# -*- coding: utf-8 -*-
# Copyright 2024-2025 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Claim Assistant product family.

import base64
import io
import logging
import zipfile
from datetime import date

from odoo import models, fields, api, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)


class ADPImportWizard(models.TransientModel):
    """Wizard that imports ADP export ``.txt`` files as export records.

    Files can be supplied as individual attachments (``txt_files``), as a
    single ZIP archive (``zip_file``), or both. One
    ``fusion_claims.adp.export.record`` is created per file name that does
    not already exist.
    """

    _name = 'fusion_claims.adp.import.wizard'
    _description = 'Import ADP Export Files'

    txt_files = fields.Many2many(
        'ir.attachment',
        string='TXT Files',
        help='Select one or more ADP export .txt files to import',
    )
    zip_file = fields.Binary(string='ZIP File')
    zip_filename = fields.Char()
    result_message = fields.Text(string='Import Results', readonly=True)
    state = fields.Selection([
        ('draft', 'Upload'),
        ('done', 'Done'),
    ], default='draft')

    def action_import(self):
        """Process uploaded files and create export records.

        Each collected file is imported independently: a file whose name
        already exists as an export record is counted as skipped, and a file
        that raises during import is reported in the result message without
        stopping the remaining files.

        Returns:
            dict: a window action that re-opens this wizard (now in the
            ``done`` state) so the user can read ``result_message``.

        Raises:
            UserError: if neither ``txt_files`` nor ``zip_file`` is set, or
                if file collection itself fails (see ``_collect_files``).
        """
        self.ensure_one()
        if not self.txt_files and not self.zip_file:
            raise UserError(_('Please upload .txt files or a ZIP file.'))

        ExportRecord = self.env['fusion_claims.adp.export.record']
        imported = 0
        skipped = 0
        errors = []

        file_list, skipped_non_txt = self._collect_files()

        for filename, file_data_b64 in file_list:
            try:
                # Isolate each file in its own savepoint. Without it, a
                # database-level failure leaves the transaction aborted, so
                # every following file (and the final write of the result
                # message) would fail as well, defeating the per-file
                # error handling below.
                with self.env.cr.savepoint():
                    existing = ExportRecord.search(
                        [('name', '=', filename)], limit=1,
                    )
                    if existing:
                        skipped += 1
                        continue

                    # Both helpers live on the export record model (not
                    # visible here); presumably the first returns a
                    # (vendor_code, date-or-falsy) pair - confirm there.
                    vendor_code, file_date = (
                        ExportRecord._parse_export_filename(filename)
                    )
                    posting_date = ExportRecord._get_posting_period_for_file(
                        file_date if file_date else date.today()
                    )

                    ExportRecord.create({
                        'name': filename,
                        'filename': filename,
                        'file_data': file_data_b64,
                        'export_date': fields.Datetime.now(),
                        'posting_period_date': posting_date,
                        'vendor_code': vendor_code or '',
                        'user_id': self.env.uid,
                        'company_id': self.env.company.id,
                        'notes': 'Imported on %s' % date.today(),
                    })
                    imported += 1
            except Exception as e:
                # Deliberate best-effort boundary: record the failure, log
                # the traceback, and carry on with the next file.
                errors.append('%s: %s' % (filename, str(e)))
                _logger.exception('Error importing file %s', filename)

        lines = [
            'Import Complete!',
            '- Files imported: %d' % imported,
            '- Files skipped (already exist): %d' % skipped,
            '- Non-txt files ignored: %d' % skipped_non_txt,
        ]
        if errors:
            lines.append('\nErrors (%d):' % len(errors))
            for err in errors:
                lines.append(' - %s' % err)

        self.result_message = '\n'.join(lines)
        self.state = 'done'
        return {
            'type': 'ir.actions.act_window',
            'res_model': self._name,
            'view_mode': 'form',
            'res_id': self.id,
            'target': 'new',
        }

    def _collect_files(self):
        """Collect all .txt files from both individual uploads and ZIP.

        Returns:
            tuple: ``(file_list, skipped_count)`` where ``file_list`` is a
            list of ``(filename, base64_data)`` tuples and ``skipped_count``
            is the number of attachments in ``txt_files`` whose name does not
            end in ``.txt``. Non-txt entries inside the ZIP are not counted.

        Raises:
            UserError: if every attachment was rejected and no ZIP file was
                provided, or if ZIP extraction fails.
        """
        files = []
        skipped_types = []

        for attachment in self.txt_files:
            name = attachment.name or ''
            if not name.lower().endswith('.txt'):
                skipped_types.append(name)
                continue
            files.append((name, attachment.datas))

        # Only fail hard when nothing usable is left at all; otherwise the
        # rejected names are just reported as a count by the caller.
        if skipped_types and not files and not self.zip_file:
            raise UserError(_(
                'Only .txt files are supported. Skipped: %s',
                ', '.join(skipped_types),
            ))

        if self.zip_file:
            files.extend(self._extract_txt_from_zip())

        return files, len(skipped_types)

    def _extract_txt_from_zip(self):
        """Scan all folders/subfolders in the ZIP and extract .txt files.

        Uses only the base filename (no folder path) as the record name.
        When several entries share a base filename, only the first one
        encountered is kept. Entries under a ``__MACOSX`` folder are ignored.

        Returns:
            list: ``(filename, base64_data)`` tuples, ``base64_data`` being
            an ASCII ``str``.

        Raises:
            UserError: if the upload is not a readable ZIP archive, or if it
                contains no .txt files.
        """
        raw = base64.b64decode(self.zip_file)
        buf = io.BytesIO(raw)
        if not zipfile.is_zipfile(buf):
            raise UserError(_('The uploaded file is not a valid ZIP archive.'))
        buf.seek(0)

        results = []
        seen = set()
        try:
            with zipfile.ZipFile(buf, 'r') as zf:
                for entry in zf.infolist():
                    if entry.is_dir():
                        continue
                    # The ZIP format mandates '/', but some tools write
                    # '\\'; normalise so the folder path is really stripped.
                    path = entry.filename.replace('\\', '/')
                    lower = path.lower()
                    if not lower.endswith('.txt'):
                        continue
                    # Skip macOS metadata whether the __MACOSX folder sits
                    # at the archive root or inside a wrapping folder.
                    folders = lower.split('/')[:-1]
                    if lower.startswith('__macosx') or '__macosx' in folders:
                        continue
                    basename = path.rsplit('/', 1)[-1]
                    if not basename or basename in seen:
                        continue
                    seen.add(basename)
                    # Read via the ZipInfo object: reading by name would
                    # return the *last* member with that path if the archive
                    # contains the same path more than once.
                    data = zf.read(entry)
                    results.append(
                        (basename, base64.b64encode(data).decode('ascii'))
                    )
        except zipfile.BadZipFile as exc:
            # is_zipfile() only checks for the end-of-central-directory
            # record; a damaged archive can still fail while being read.
            raise UserError(
                _('The uploaded file is not a valid ZIP archive.')
            ) from exc

        if not results:
            raise UserError(_('No .txt files found in the ZIP archive.'))

        return results