192 lines
6.8 KiB
Python
192 lines
6.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2024-2025 Nexa Systems Inc.
|
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
|
# Part of the Fusion Claim Assistant product family.
|
|
|
|
import base64
|
|
import csv
|
|
import json
|
|
import re
|
|
import io
|
|
from odoo import models, fields, api, _
|
|
from odoo.exceptions import UserError
|
|
import logging
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DeviceCodeImportWizard(models.TransientModel):
|
|
_name = 'fusion_claims.device.import.wizard'
|
|
_description = 'Import ADP Device Codes from JSON/CSV'
|
|
|
|
file = fields.Binary(
|
|
string='File',
|
|
required=True,
|
|
help='Upload a JSON or CSV file containing device codes',
|
|
)
|
|
filename = fields.Char(string='Filename')
|
|
file_type = fields.Selection([
|
|
('json', 'JSON'),
|
|
('csv', 'CSV (ADP Mobility Manual)'),
|
|
], string='File Type', default='json', required=True)
|
|
|
|
# Results
|
|
state = fields.Selection([
|
|
('draft', 'Draft'),
|
|
('done', 'Done'),
|
|
], default='draft')
|
|
|
|
result_created = fields.Integer(string='Created', readonly=True)
|
|
result_updated = fields.Integer(string='Updated', readonly=True)
|
|
result_errors = fields.Text(string='Errors', readonly=True)
|
|
|
|
@api.onchange('filename')
|
|
def _onchange_filename(self):
|
|
"""Auto-detect file type from filename."""
|
|
if self.filename:
|
|
if self.filename.lower().endswith('.csv'):
|
|
self.file_type = 'csv'
|
|
elif self.filename.lower().endswith('.json'):
|
|
self.file_type = 'json'
|
|
|
|
def _parse_csv_content(self, content):
|
|
"""Parse CSV content to data list."""
|
|
data = []
|
|
|
|
# Try to decode as UTF-8 with BOM, then fallback
|
|
try:
|
|
text = content.decode('utf-8-sig')
|
|
except UnicodeDecodeError:
|
|
try:
|
|
text = content.decode('latin-1')
|
|
except Exception:
|
|
text = content.decode('utf-8', errors='ignore')
|
|
|
|
reader = csv.DictReader(io.StringIO(text))
|
|
|
|
# Log the column names for debugging
|
|
_logger.info("CSV columns detected: %s", reader.fieldnames)
|
|
|
|
for row in reader:
|
|
device_code = (row.get('Device Code', '') or '').strip()
|
|
if not device_code:
|
|
continue
|
|
|
|
# Find the price column - check multiple possible names
|
|
price = 0.0
|
|
price_column_names = [
|
|
'ADP Price', 'adp_price', 'Approved Price', ' Approved Price ',
|
|
'Price', 'price', 'ADP_Price', 'adp price'
|
|
]
|
|
# First try exact matches
|
|
for col_name in price_column_names:
|
|
if col_name in row:
|
|
price_str = row.get(col_name, '')
|
|
price_str = re.sub(r'[\$,"\'\s]', '', str(price_str))
|
|
try:
|
|
price = float(price_str)
|
|
if price > 0:
|
|
break
|
|
except ValueError:
|
|
pass
|
|
|
|
# If still 0, try partial match on column names
|
|
if price == 0:
|
|
for key in row.keys():
|
|
key_lower = key.lower().strip()
|
|
if 'price' in key_lower:
|
|
price_str = row.get(key, '')
|
|
price_str = re.sub(r'[\$,"\'\s]', '', str(price_str))
|
|
try:
|
|
price = float(price_str)
|
|
if price > 0:
|
|
_logger.info("Found price in column '%s': %s", key, price)
|
|
break
|
|
except ValueError:
|
|
pass
|
|
|
|
# Find the serial/SN required column - check multiple possible names
|
|
sn_required = 'No'
|
|
sn_column_names = [
|
|
'SN Required', 'sn_required', 'Serial', 'serial',
|
|
'SN', 'sn', 'Serial Number Required', 'Requires SN'
|
|
]
|
|
# First try exact matches
|
|
for col_name in sn_column_names:
|
|
if col_name in row:
|
|
sn_required = row.get(col_name, 'No')
|
|
break
|
|
|
|
# If not found, try partial match
|
|
if sn_required == 'No':
|
|
for key in row.keys():
|
|
key_lower = key.lower().strip()
|
|
if 'serial' in key_lower or 'sn' in key_lower:
|
|
sn_required = row.get(key, 'No')
|
|
_logger.info("Found SN in column '%s': %s", key, sn_required)
|
|
break
|
|
|
|
data.append({
|
|
'Device Type': row.get('Device Type', ''),
|
|
'Manufacturer': row.get('Manufacturer', ''),
|
|
'Device Description': row.get('Device Description', ''),
|
|
'Device Code': device_code,
|
|
'Quantity': row.get('Qty', 1) or row.get('Quantity', 1),
|
|
'ADP Price': price,
|
|
'SN Required': sn_required,
|
|
})
|
|
|
|
_logger.info("Parsed %d device codes from CSV", len(data))
|
|
return data
|
|
|
|
def action_import(self):
|
|
"""Import device codes from uploaded file."""
|
|
self.ensure_one()
|
|
|
|
if not self.file:
|
|
raise UserError(_("Please upload a file."))
|
|
|
|
try:
|
|
# Decode file
|
|
file_content = base64.b64decode(self.file)
|
|
|
|
if self.file_type == 'csv':
|
|
# Parse CSV
|
|
data = self._parse_csv_content(file_content)
|
|
else:
|
|
# Parse JSON
|
|
try:
|
|
text = file_content.decode('utf-8-sig')
|
|
except UnicodeDecodeError:
|
|
text = file_content.decode('utf-8')
|
|
data = json.loads(text)
|
|
|
|
except json.JSONDecodeError as e:
|
|
raise UserError(_("Invalid JSON file: %s") % str(e))
|
|
except Exception as e:
|
|
raise UserError(_("Error reading file: %s") % str(e))
|
|
|
|
if not data:
|
|
raise UserError(_("No valid data found in file."))
|
|
|
|
# Import using the model method
|
|
DeviceCode = self.env['fusion.adp.device.code']
|
|
result = DeviceCode.import_from_json(data)
|
|
|
|
# Update wizard with results
|
|
self.write({
|
|
'state': 'done',
|
|
'result_created': result['created'],
|
|
'result_updated': result['updated'],
|
|
'result_errors': '\n'.join(result['errors']) if result['errors'] else '',
|
|
})
|
|
|
|
return {
|
|
'type': 'ir.actions.act_window',
|
|
'res_model': self._name,
|
|
'res_id': self.id,
|
|
'view_mode': 'form',
|
|
'target': 'new',
|
|
}
|
|
|