Files
Odoo-Modules/fusion_claims/wizard/adp_import_wizard.py
gsinghpal e56974d46f update
2026-03-16 08:14:56 -04:00

164 lines
5.3 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2024-2025 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Claim Assistant product family.
import base64
import io
import logging
import zipfile
from datetime import date
from odoo import models, fields, api, _
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class ADPImportWizard(models.TransientModel):
    """Wizard that imports ADP export ``.txt`` files as export records.

    Files can be supplied as individual attachments, as a single ZIP
    archive, or both. One ``fusion_claims.adp.export.record`` is created
    per file whose name is not already present; a textual summary of the
    run is stored on the wizard and shown to the user.
    """

    _name = 'fusion_claims.adp.import.wizard'
    _description = 'Import ADP Export Files'

    # Individually uploaded export files; entries not ending in .txt are
    # ignored (and counted) by _collect_files().
    txt_files = fields.Many2many(
        'ir.attachment',
        string='TXT Files',
        help='Select one or more ADP export .txt files to import',
    )
    # Optional archive (base64 payload) scanned for .txt members.
    zip_file = fields.Binary(string='ZIP File')
    # Not read by the code in this class; presumably the filename companion
    # of zip_file for the form view -- confirm against the view definition.
    zip_filename = fields.Char()
    # Human-readable summary written by action_import().
    result_message = fields.Text(string='Import Results', readonly=True)
    # 'draft' while waiting for the upload, 'done' once an import has run.
    state = fields.Selection([
        ('draft', 'Upload'),
        ('done', 'Done'),
    ], default='draft')

    def action_import(self):
        """Process uploaded files and create export records.

        Every file is handled inside its own database savepoint so that a
        failure on one file (for instance a constraint violation) is rolled
        back in isolation and cannot abort the surrounding transaction,
        which would otherwise make every following file -- and the final
        write of the summary -- fail as well.

        :return: an ``ir.actions.act_window`` dict re-opening this wizard
            so the user can read the import summary.
        :raises UserError: if neither .txt attachments nor a ZIP file were
            provided, or if file collection rejects the upload.
        """
        self.ensure_one()
        if not self.txt_files and not self.zip_file:
            raise UserError(_('Please upload .txt files or a ZIP file.'))

        ExportRecord = self.env['fusion_claims.adp.export.record']
        # Use the date as seen in the user's timezone rather than the
        # server's local date, and compute it once for the whole run.
        today = fields.Date.context_today(self)

        file_list, skipped_non_txt = self._collect_files()

        imported = 0
        skipped = 0
        errors = []
        for filename, file_data_b64 in file_list:
            try:
                with self.env.cr.savepoint():
                    created = self._import_single_file(
                        ExportRecord, filename, file_data_b64, today,
                    )
            except Exception as e:
                # Best effort: report the failure and carry on with the
                # remaining files; the savepoint has already been rolled
                # back at this point.
                errors.append('%s: %s' % (filename, str(e)))
                _logger.exception('Error importing file %s', filename)
            else:
                if created:
                    imported += 1
                else:
                    skipped += 1

        lines = [
            'Import Complete!',
            '- Files imported: %d' % imported,
            '- Files skipped (already exist): %d' % skipped,
            '- Non-txt files ignored: %d' % skipped_non_txt,
        ]
        if errors:
            lines.append('\nErrors (%d):' % len(errors))
            lines.extend(' - %s' % err for err in errors)

        self.write({
            'result_message': '\n'.join(lines),
            'state': 'done',
        })
        return {
            'type': 'ir.actions.act_window',
            'res_model': self._name,
            'view_mode': 'form',
            'res_id': self.id,
            'target': 'new',
        }

    def _import_single_file(self, ExportRecord, filename, file_data_b64, today):
        """Create the export record for one file unless it already exists.

        :param ExportRecord: the ``fusion_claims.adp.export.record`` model.
        :param filename: base file name, used as the record ``name`` and as
            the duplicate-detection key.
        :param file_data_b64: base64-encoded file content.
        :param today: fallback date when no date can be parsed from the
            file name; also used in the record's note.
        :return: ``True`` if a record was created, ``False`` if a record
            with the same name already existed.
        """
        if ExportRecord.search([('name', '=', filename)], limit=1):
            return False

        # Both helpers live on the export record model (not visible here);
        # file_date may be falsy, in which case today's date is used.
        vendor_code, file_date = ExportRecord._parse_export_filename(filename)
        posting_date = ExportRecord._get_posting_period_for_file(
            file_date if file_date else today
        )
        ExportRecord.create({
            'name': filename,
            'filename': filename,
            'file_data': file_data_b64,
            'export_date': fields.Datetime.now(),
            'posting_period_date': posting_date,
            'vendor_code': vendor_code or '',
            'user_id': self.env.uid,
            'company_id': self.env.company.id,
            'notes': 'Imported on %s' % today,
        })
        return True

    def _collect_files(self):
        """Collect all .txt files from both individual uploads and ZIP.

        :return: ``(file_list, skipped_count)`` where ``file_list`` is a
            list of ``(filename, base64_data)`` tuples and ``skipped_count``
            is the number of attachments ignored because their name does
            not end in ``.txt``.
        :raises UserError: if only non-.txt attachments were supplied and
            there is no ZIP file, or if the ZIP cannot be used (see
            ``_extract_txt_from_zip``).
        """
        files = []
        skipped_types = []
        for attachment in self.txt_files:
            name = attachment.name or ''
            if name.lower().endswith('.txt'):
                files.append((name, attachment.datas))
            else:
                skipped_types.append(name)

        # Nothing usable at all: tell the user why instead of reporting an
        # "import" of zero files.
        if skipped_types and not files and not self.zip_file:
            raise UserError(_(
                'Only .txt files are supported. Skipped: %s',
                ', '.join(skipped_types),
            ))

        if self.zip_file:
            files.extend(self._extract_txt_from_zip())
        return files, len(skipped_types)

    def _extract_txt_from_zip(self):
        """Scan all folders/subfolders in the ZIP and extract .txt files.

        Only the base file name (no folder path) is used as the record
        name; when several members share a base name the first one wins.
        Members located under a ``__MACOSX`` folder at any depth are
        ignored, and both ``/`` and ``\\`` are treated as path separators.

        :return: list of ``(filename, base64_data)`` tuples.
        :raises UserError: if the upload is not a readable ZIP archive or
            contains no .txt files.
        """
        buf = io.BytesIO(base64.b64decode(self.zip_file))
        if not zipfile.is_zipfile(buf):
            raise UserError(_('The uploaded file is not a valid ZIP archive.'))
        buf.seek(0)

        results = []
        seen = set()
        try:
            with zipfile.ZipFile(buf, 'r') as zf:
                for entry in zf.infolist():
                    if entry.is_dir():
                        continue
                    # Some archivers store Windows-style separators.
                    parts = entry.filename.replace('\\', '/').split('/')
                    basename = parts[-1]
                    if not basename.lower().endswith('.txt'):
                        continue
                    # macOS metadata folder; may be nested below a top
                    # level directory inside the archive.
                    if any(part.lower() == '__macosx' for part in parts[:-1]):
                        continue
                    if basename in seen:
                        continue
                    seen.add(basename)
                    # Read through the ZipInfo object so the right member
                    # is used even if the archive repeats a member name.
                    data = zf.read(entry)
                    results.append(
                        (basename, base64.b64encode(data).decode('ascii'))
                    )
        except zipfile.BadZipFile as err:
            # is_zipfile() only checks for the end-of-central-directory
            # record; a damaged archive can still fail while being read.
            raise UserError(
                _('The uploaded file is not a valid ZIP archive.')
            ) from err

        if not results:
            raise UserError(_('No .txt files found in the ZIP archive.'))
        return results