update
This commit is contained in:
163
fusion_claims/wizard/adp_import_wizard.py
Normal file
163
fusion_claims/wizard/adp_import_wizard.py
Normal file
@@ -0,0 +1,163 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024-2025 Nexa Systems Inc.
|
||||
# License OPL-1 (Odoo Proprietary License v1.0)
|
||||
# Part of the Fusion Claim Assistant product family.
|
||||
|
||||
import base64
|
||||
import io
|
||||
import logging
|
||||
import zipfile
|
||||
from datetime import date
|
||||
|
||||
from odoo import models, fields, api, _
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ADPImportWizard(models.TransientModel):
|
||||
_name = 'fusion_claims.adp.import.wizard'
|
||||
_description = 'Import ADP Export Files'
|
||||
|
||||
txt_files = fields.Many2many(
|
||||
'ir.attachment',
|
||||
string='TXT Files',
|
||||
help='Select one or more ADP export .txt files to import',
|
||||
)
|
||||
zip_file = fields.Binary(string='ZIP File')
|
||||
zip_filename = fields.Char()
|
||||
result_message = fields.Text(string='Import Results', readonly=True)
|
||||
state = fields.Selection([
|
||||
('draft', 'Upload'),
|
||||
('done', 'Done'),
|
||||
], default='draft')
|
||||
|
||||
def action_import(self):
|
||||
"""Process uploaded files and create export records."""
|
||||
self.ensure_one()
|
||||
if not self.txt_files and not self.zip_file:
|
||||
raise UserError(_('Please upload .txt files or a ZIP file.'))
|
||||
|
||||
ExportRecord = self.env['fusion_claims.adp.export.record']
|
||||
imported = 0
|
||||
skipped = 0
|
||||
errors = []
|
||||
|
||||
file_list, skipped_non_txt = self._collect_files()
|
||||
|
||||
for filename, file_data_b64 in file_list:
|
||||
try:
|
||||
existing = ExportRecord.search([('name', '=', filename)], limit=1)
|
||||
if existing:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
vendor_code, file_date = ExportRecord._parse_export_filename(filename)
|
||||
posting_date = ExportRecord._get_posting_period_for_file(
|
||||
file_date if file_date else date.today()
|
||||
)
|
||||
|
||||
ExportRecord.create({
|
||||
'name': filename,
|
||||
'filename': filename,
|
||||
'file_data': file_data_b64,
|
||||
'export_date': fields.Datetime.now(),
|
||||
'posting_period_date': posting_date,
|
||||
'vendor_code': vendor_code or '',
|
||||
'user_id': self.env.uid,
|
||||
'company_id': self.env.company.id,
|
||||
'notes': 'Imported on %s' % date.today(),
|
||||
})
|
||||
imported += 1
|
||||
|
||||
except Exception as e:
|
||||
errors.append('%s: %s' % (filename, str(e)))
|
||||
_logger.exception('Error importing file %s', filename)
|
||||
|
||||
lines = [
|
||||
'Import Complete!',
|
||||
'- Files imported: %d' % imported,
|
||||
'- Files skipped (already exist): %d' % skipped,
|
||||
'- Non-txt files ignored: %d' % skipped_non_txt,
|
||||
]
|
||||
if errors:
|
||||
lines.append('\nErrors (%d):' % len(errors))
|
||||
for err in errors:
|
||||
lines.append(' - %s' % err)
|
||||
|
||||
self.result_message = '\n'.join(lines)
|
||||
self.state = 'done'
|
||||
|
||||
return {
|
||||
'type': 'ir.actions.act_window',
|
||||
'res_model': self._name,
|
||||
'view_mode': 'form',
|
||||
'res_id': self.id,
|
||||
'target': 'new',
|
||||
}
|
||||
|
||||
def _collect_files(self):
|
||||
"""Collect all .txt files from both individual uploads and ZIP.
|
||||
|
||||
Returns (file_list, skipped_count) where file_list is
|
||||
a list of (filename, base64_data) tuples.
|
||||
"""
|
||||
files = []
|
||||
skipped_types = []
|
||||
|
||||
for attachment in self.txt_files:
|
||||
name = attachment.name or ''
|
||||
if not name.lower().endswith('.txt'):
|
||||
skipped_types.append(name)
|
||||
continue
|
||||
files.append((name, attachment.datas))
|
||||
|
||||
if skipped_types and not files and not self.zip_file:
|
||||
raise UserError(_(
|
||||
'Only .txt files are supported. Skipped: %s',
|
||||
', '.join(skipped_types),
|
||||
))
|
||||
|
||||
if self.zip_file:
|
||||
files.extend(self._extract_txt_from_zip())
|
||||
|
||||
return files, len(skipped_types)
|
||||
|
||||
def _extract_txt_from_zip(self):
|
||||
"""Scan all folders/subfolders in the ZIP and extract .txt files.
|
||||
|
||||
Returns list of (filename, base64_data) tuples.
|
||||
Uses only the base filename (no folder path) as the record name.
|
||||
"""
|
||||
raw = base64.b64decode(self.zip_file)
|
||||
buf = io.BytesIO(raw)
|
||||
|
||||
if not zipfile.is_zipfile(buf):
|
||||
raise UserError(_('The uploaded file is not a valid ZIP archive.'))
|
||||
|
||||
buf.seek(0)
|
||||
results = []
|
||||
seen = set()
|
||||
|
||||
with zipfile.ZipFile(buf, 'r') as zf:
|
||||
for entry in zf.infolist():
|
||||
if entry.is_dir():
|
||||
continue
|
||||
lower = entry.filename.lower()
|
||||
if not lower.endswith('.txt'):
|
||||
continue
|
||||
if lower.startswith('__macosx'):
|
||||
continue
|
||||
|
||||
basename = entry.filename.rsplit('/', 1)[-1]
|
||||
if not basename or basename in seen:
|
||||
continue
|
||||
seen.add(basename)
|
||||
|
||||
data = zf.read(entry.filename)
|
||||
results.append((basename, base64.b64encode(data).decode('ascii')))
|
||||
|
||||
if not results:
|
||||
raise UserError(_('No .txt files found in the ZIP archive.'))
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user