# -*- coding: utf-8 -*-
# Copyright 2024-2025 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Claim Assistant product family.

from odoo import models, fields, api, _
from odoo.exceptions import UserError
from odoo.tools.misc import file_path as get_resource_path
import json
import re
import os
import logging

_logger = logging.getLogger(__name__)

# Typographic quotes mapped to their ASCII equivalents. Written as escapes so
# the table survives any re-encoding of this source file.
_CURLY_QUOTE_MAP = str.maketrans({
    '\u201c': '"',   # left double quotation mark
    '\u201d': '"',   # right double quotation mark
    '\u2018': "'",   # left single quotation mark
    '\u2019': "'",   # right single quotation mark
})


class FusionADPDeviceCode(models.Model):
    """Reference table of ADP device codes (one record per device code)."""

    _name = 'fusion.adp.device.code'
    _description = 'ADP Device Code Reference (Mobility Manual)'
    _order = 'device_type, device_code'

    def _register_hook(self):
        """
        Re-import the packaged device codes whenever this hook runs.

        The import is best-effort: any exception is logged as a warning and
        swallowed so that a bad data file cannot prevent the model from
        loading.

        NOTE(review): this hook performs a full import each time it is
        invoked, with no check that a module upgrade is in progress -
        confirm that this is the intended trigger.
        """
        super()._register_hook()
        try:
            self.sudo()._load_packaged_device_codes()
        except Exception as e:
            _logger.warning("Could not auto-load device codes: %s", str(e))

    # ==========================================================================
    # MAIN FIELDS
    # ==========================================================================
    device_code = fields.Char(
        string='Device Code',
        required=True,
        index=True,
        help='ADP device code from the mobility manual',
    )
    name = fields.Char(
        string='Description',
        compute='_compute_name',
        store=True,
    )
    device_type = fields.Char(
        string='Device Type',
        index=True,
        help='Device type/category (e.g., Adult Wheeled Walker Type 1)',
    )
    manufacturer = fields.Char(
        string='Manufacturer',
        index=True,
        help='Device manufacturer',
    )
    build_type = fields.Selection(
        [('modular', 'Modular'), ('custom_fabricated', 'Custom Fabricated')],
        string='Build Type',
        index=True,
        help='Build type for positioning/seating devices: Modular or Custom Fabricated',
    )
    device_description = fields.Char(
        string='Device Description',
        help='Detailed device description from mobility manual',
    )
    max_quantity = fields.Integer(
        string='Max Quantity',
        default=1,
        help='Maximum quantity that can be billed per claim',
    )
    adp_price = fields.Float(
        string='ADP Price',
        digits='Product Price',
        help='Maximum price ADP will cover for this device',
    )
    sn_required = fields.Boolean(
        string='Serial Number Required',
        default=False,
        help='Is serial number required for this device?',
    )

    # ==========================================================================
    # TRACKING
    # ==========================================================================
    active = fields.Boolean(
        string='Active',
        default=True,
    )
    last_updated = fields.Datetime(
        string='Last Updated',
        default=fields.Datetime.now,
    )

    # ==========================================================================
    # SQL CONSTRAINTS
    # ==========================================================================
    _sql_constraints = [
        ('device_code_uniq', 'unique(device_code)', 'Device code must be unique!'),
    ]

    # ==========================================================================
    # COMPUTED FIELDS
    # ==========================================================================
    @api.depends('device_code', 'adp_price', 'device_type', 'device_description')
    def _compute_name(self):
        """Build ``name`` as ``CODE - DESCRIPTION ($PRICE)``.

        The description part is omitted when empty, and the name is an empty
        string when the record has no device code.
        """
        for record in self:
            if not record.device_code:
                record.name = ''
            elif record.device_description:
                record.name = f"{record.device_code} - {record.device_description} (${record.adp_price:.2f})"
            else:
                record.name = f"{record.device_code} (${record.adp_price:.2f})"

    # ==========================================================================
    # DEVICE TYPE LOOKUP (for wizard display)
    # ==========================================================================
    @api.model
    def get_device_type_for_code(self, device_code):
        """Return the device type of the active record with ``device_code``.

        Returns an empty string when the code is falsy, unknown, or the
        matching record has no device type.
        """
        if not device_code:
            return ''
        device = self.search(
            [('device_code', '=', device_code), ('active', '=', True)],
            limit=1,
        )
        return device.device_type or ''

    @api.model
    def get_unique_device_types(self):
        """Return the sorted list of distinct, non-empty device types of active records."""
        # Flush first so the raw SQL below sees values still pending in the ORM.
        self.flush_model()
        self.env.cr.execute("""
            SELECT DISTINCT device_type
            FROM fusion_adp_device_code
            WHERE device_type IS NOT NULL AND device_type != '' AND active = TRUE
            ORDER BY device_type
        """)
        return [row[0] for row in self.env.cr.fetchall()]

    # ==========================================================================
    # LOOKUP METHODS
    # ==========================================================================
    @api.model
    def get_device_info(self, device_code):
        """Return a dict of billing info for an active device code.

        The dict has the keys ``device_code``, ``max_quantity``, ``adp_price``
        and ``sn_required``. Returns ``None`` when the code is falsy or no
        active record matches.
        """
        if not device_code:
            return None
        device = self.search(
            [('device_code', '=', device_code), ('active', '=', True)],
            limit=1,
        )
        if not device:
            return None
        return {
            'device_code': device.device_code,
            'max_quantity': device.max_quantity,
            'adp_price': device.adp_price,
            'sn_required': device.sn_required,
        }

    @api.model
    def validate_device_code(self, device_code):
        """Return ``True`` when an active record exists for ``device_code``."""
        if not device_code:
            return False
        return bool(self.search(
            [('device_code', '=', device_code), ('active', '=', True)],
            limit=1,
        ))

    # ==========================================================================
    # TEXT CLEANING UTILITIES
    # ==========================================================================
    @staticmethod
    def _clean_text(text):
        """Return ``text`` as a single-line, whitespace-normalized string.

        Falsy input yields ``''``. Otherwise the value is converted with
        ``str()``, typographic quotes are replaced by straight quotes,
        non-printable characters become spaces, whitespace runs collapse to
        one space, and the result is stripped.
        """
        if not text:
            return ''
        text = str(text).translate(_CURLY_QUOTE_MAP)
        # Newlines and tabs are not "printable"; keep them here so the
        # whitespace collapse below handles them with other whitespace.
        text = ''.join(
            char if char.isprintable() or char in '\n\r\t' else ' '
            for char in text
        )
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    @staticmethod
    def _parse_price(price_str):
        """Parse a price such as ``'$64.00'`` or ``'$2,578.00'`` into a float.

        Dollar signs, commas, quotes and whitespace are removed before
        conversion. Returns ``0.0`` for falsy or unparseable input.
        """
        if not price_str:
            return 0.0
        price_str = str(price_str).strip()
        price_str = re.sub(r'[\$,"\'\s]', '', price_str)
        try:
            return float(price_str)
        except ValueError:
            return 0.0

    # ==========================================================================
    # IMPORT FROM JSON
    # ==========================================================================
    @api.model
    def import_from_json(self, json_data):
        """
        Create or update device codes from JSON data.

        ``json_data`` is either a JSON string or an already-decoded list of
        dicts. Expected format (each key also accepted in snake_case, e.g.
        ``device_code``):
        [
            {
                "Device Type": "Adult Wheeled Walker Type 1",
                "Manufacturer": "Drive Medical",
                "Device Description": "One Button Or Dual Trigger Release",
                "Device Code": "MW1D50005",
                "Quantity": 1,
                "ADP Price": 64.00,
                "SN Required": "Yes"
            },
            ...
        ]

        Rows are matched on device code. A failing row is recorded in the
        ``errors`` list and does not stop the import.

        :return: dict with ``created`` and ``updated`` counts and an
            ``errors`` list of ``"Row N: message"`` strings
        :raises UserError: if the string is not valid JSON or the decoded
            data is not a list
        """
        if isinstance(json_data, str):
            try:
                data = json.loads(json_data)
            except json.JSONDecodeError as e:
                raise UserError(_("Invalid JSON data: %s") % str(e)) from e
        else:
            data = json_data

        if not isinstance(data, list):
            raise UserError(_("Expected a list of device codes"))

        # Archived records must be found too; otherwise create() would hit
        # the unique(device_code) constraint instead of reactivating them.
        all_codes = self.with_context(active_test=False)

        created = 0
        updated = 0
        errors = []

        for idx, item in enumerate(data):
            try:
                device_code = self._clean_text(item.get('Device Code', '') or item.get('device_code', ''))
                if not device_code:
                    errors.append(f"Row {idx + 1}: Missing device code")
                    continue

                # Parse fields with cleaning
                device_type = self._clean_text(item.get('Device Type', '') or item.get('device_type', ''))
                manufacturer = self._clean_text(item.get('Manufacturer', '') or item.get('manufacturer', ''))
                device_description = self._clean_text(
                    item.get('Device Description', '') or item.get('device_description', '')
                )

                # Parse build type (Modular / Custom Fabricated)
                build_type_raw = self._clean_text(item.get('Build Type', '') or item.get('build_type', ''))
                build_type = False
                if build_type_raw:
                    bt_lower = build_type_raw.lower().strip()
                    if bt_lower in ('modular', 'mod'):
                        build_type = 'modular'
                    elif bt_lower in ('custom fabricated', 'custom_fabricated', 'custom'):
                        build_type = 'custom_fabricated'

                # Parse quantity. No default on the individual lookups: a
                # truthy default on the first one would hide the other keys.
                qty_val = item.get('Quantity') or item.get('Qty') or item.get('quantity') or 1
                max_qty = int(qty_val)

                # Parse price (handles both raw number and string format)
                price_val = item.get('ADP Price', 0) or item.get('Approved Price', 0) or item.get('adp_price', 0)
                if isinstance(price_val, (int, float)):
                    adp_price = float(price_val)
                else:
                    adp_price = self._parse_price(price_val)

                # Parse serial requirement - boolean or one of several string spellings
                sn_raw = (
                    item.get('SN Required')
                    or item.get('Serial')
                    or item.get('SN')
                    or item.get('sn_required')
                    or 'No'
                )
                if isinstance(sn_raw, bool):
                    sn_required = sn_raw
                else:
                    sn_val = str(sn_raw).upper().strip()
                    sn_required = sn_val in ('YES', 'Y', 'TRUE', '1', 'T')

                vals = {
                    'device_type': device_type,
                    'manufacturer': manufacturer,
                    'device_description': device_description,
                    'max_quantity': max_qty,
                    'adp_price': adp_price,
                    'sn_required': sn_required,
                    'last_updated': fields.Datetime.now(),
                    'active': True,
                }
                if build_type:
                    vals['build_type'] = build_type

                # Isolate each row in a savepoint: a database error on one
                # row must not abort the transaction for the remaining rows.
                with self.env.cr.savepoint():
                    existing = all_codes.search([('device_code', '=', device_code)], limit=1)
                    if existing:
                        existing.write(vals)
                    else:
                        vals['device_code'] = device_code
                        self.create(vals)

                # Count only once the savepoint has been released successfully.
                if existing:
                    updated += 1
                else:
                    created += 1

            except Exception as e:
                errors.append(f"Row {idx + 1}: {str(e)}")

        return {
            'created': created,
            'updated': updated,
            'errors': errors,
        }

    @api.model
    def import_from_csv_file(self, file_path):
        """Import device codes from a CSV file (ADP Mobility Manual format).

        Expected CSV columns: Device Type, Manufacturer, Device Description,
        Device Code, Qty, Approved Price, Serial

        Rows without a device code are skipped. The remaining rows are passed
        to ``import_from_json`` and its result dict is returned.

        :raises UserError: if the file is missing or cannot be read
        """
        import csv

        try:
            data = []
            # newline='' lets the csv module handle line endings itself.
            with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    # Skip empty rows
                    device_code = (row.get('Device Code', '') or '').strip()
                    if not device_code:
                        continue

                    data.append({
                        'Device Type': row.get('Device Type', ''),
                        'Manufacturer': row.get('Manufacturer', ''),
                        'Device Description': row.get('Device Description', ''),
                        'Device Code': device_code,
                        'Quantity': row.get('Qty', 1),
                        # The header is looked up both with and without
                        # surrounding spaces.
                        'ADP Price': row.get(' Approved Price ', '') or row.get('Approved Price', ''),
                        'SN Required': row.get('Serial', 'No'),
                    })

            return self.import_from_json(data)
        except UserError:
            # Already a user-facing message; do not re-wrap it.
            raise
        except FileNotFoundError as e:
            raise UserError(_("File not found: %s") % file_path) from e
        except Exception as e:
            raise UserError(_("Error reading CSV file: %s") % str(e)) from e

    @api.model
    def import_from_file(self, file_path):
        """Import device codes from a JSON file.

        The decoded content is passed to ``import_from_json`` and its result
        dict is returned.

        :raises UserError: if the file is missing, is not valid JSON, cannot
            be read, or does not contain a list
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return self.import_from_json(data)
        except UserError:
            # Keep validation messages from import_from_json unchanged
            # instead of re-wrapping them as "Error reading file".
            raise
        except FileNotFoundError as e:
            raise UserError(_("File not found: %s") % file_path) from e
        except json.JSONDecodeError as e:
            raise UserError(_("Invalid JSON file: %s") % str(e)) from e
        except Exception as e:
            raise UserError(_("Error reading file: %s") % str(e)) from e

    # ==========================================================================
    # AUTO-LOAD FROM PACKAGED DATA FILE
    # ==========================================================================
    @api.model
    def _load_packaged_device_codes(self):
        """
        Load device codes from the packaged JSON file.

        Called from ``_register_hook``.
        The JSON file is located at: fusion_claims/data/device_codes/adp_mobility_manual.json

        Never raises: a missing file or any import failure is logged and
        reported through the ``errors`` entry of the returned dict.

        :return: dict with ``created``, ``updated`` and ``errors`` keys
        """
        _logger.info("Loading ADP Mobility Manual device codes from packaged data file...")

        # Get the path to the packaged JSON file
        try:
            json_path = get_resource_path('fusion_claims/data/device_codes/adp_mobility_manual.json')
        except FileNotFoundError:
            json_path = None

        if not json_path or not os.path.exists(json_path):
            _logger.warning("ADP Mobility Manual JSON file not found at expected location.")
            return {'created': 0, 'updated': 0, 'errors': ['JSON file not found']}

        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            result = self.import_from_json(data)

            _logger.info(
                "ADP Mobility Manual import complete: %d created, %d updated, %d errors",
                result.get('created', 0),
                result.get('updated', 0),
                len(result.get('errors', []))
            )

            if result.get('errors'):
                for error in result['errors'][:10]:  # Log first 10 errors
                    _logger.warning("Import error: %s", error)

            return result
        except Exception as e:
            _logger.error("Error loading ADP Mobility Manual: %s", str(e))
            return {'created': 0, 'updated': 0, 'errors': [str(e)]}