164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2024-2025 Nexa Systems Inc.
|
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
|
# Part of the Fusion Claim Assistant product family.
|
|
|
|
import base64
|
|
import io
|
|
import logging
|
|
import zipfile
|
|
from datetime import date
|
|
|
|
from odoo import models, fields, api, _
|
|
from odoo.exceptions import UserError
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ADPImportWizard(models.TransientModel):
|
|
_name = 'fusion_claims.adp.import.wizard'
|
|
_description = 'Import ADP Export Files'
|
|
|
|
txt_files = fields.Many2many(
|
|
'ir.attachment',
|
|
string='TXT Files',
|
|
help='Select one or more ADP export .txt files to import',
|
|
)
|
|
zip_file = fields.Binary(string='ZIP File')
|
|
zip_filename = fields.Char()
|
|
result_message = fields.Text(string='Import Results', readonly=True)
|
|
state = fields.Selection([
|
|
('draft', 'Upload'),
|
|
('done', 'Done'),
|
|
], default='draft')
|
|
|
|
def action_import(self):
|
|
"""Process uploaded files and create export records."""
|
|
self.ensure_one()
|
|
if not self.txt_files and not self.zip_file:
|
|
raise UserError(_('Please upload .txt files or a ZIP file.'))
|
|
|
|
ExportRecord = self.env['fusion_claims.adp.export.record']
|
|
imported = 0
|
|
skipped = 0
|
|
errors = []
|
|
|
|
file_list, skipped_non_txt = self._collect_files()
|
|
|
|
for filename, file_data_b64 in file_list:
|
|
try:
|
|
existing = ExportRecord.search([('name', '=', filename)], limit=1)
|
|
if existing:
|
|
skipped += 1
|
|
continue
|
|
|
|
vendor_code, file_date = ExportRecord._parse_export_filename(filename)
|
|
posting_date = ExportRecord._get_posting_period_for_file(
|
|
file_date if file_date else date.today()
|
|
)
|
|
|
|
ExportRecord.create({
|
|
'name': filename,
|
|
'filename': filename,
|
|
'file_data': file_data_b64,
|
|
'export_date': fields.Datetime.now(),
|
|
'posting_period_date': posting_date,
|
|
'vendor_code': vendor_code or '',
|
|
'user_id': self.env.uid,
|
|
'company_id': self.env.company.id,
|
|
'notes': 'Imported on %s' % date.today(),
|
|
})
|
|
imported += 1
|
|
|
|
except Exception as e:
|
|
errors.append('%s: %s' % (filename, str(e)))
|
|
_logger.exception('Error importing file %s', filename)
|
|
|
|
lines = [
|
|
'Import Complete!',
|
|
'- Files imported: %d' % imported,
|
|
'- Files skipped (already exist): %d' % skipped,
|
|
'- Non-txt files ignored: %d' % skipped_non_txt,
|
|
]
|
|
if errors:
|
|
lines.append('\nErrors (%d):' % len(errors))
|
|
for err in errors:
|
|
lines.append(' - %s' % err)
|
|
|
|
self.result_message = '\n'.join(lines)
|
|
self.state = 'done'
|
|
|
|
return {
|
|
'type': 'ir.actions.act_window',
|
|
'res_model': self._name,
|
|
'view_mode': 'form',
|
|
'res_id': self.id,
|
|
'target': 'new',
|
|
}
|
|
|
|
def _collect_files(self):
|
|
"""Collect all .txt files from both individual uploads and ZIP.
|
|
|
|
Returns (file_list, skipped_count) where file_list is
|
|
a list of (filename, base64_data) tuples.
|
|
"""
|
|
files = []
|
|
skipped_types = []
|
|
|
|
for attachment in self.txt_files:
|
|
name = attachment.name or ''
|
|
if not name.lower().endswith('.txt'):
|
|
skipped_types.append(name)
|
|
continue
|
|
files.append((name, attachment.datas))
|
|
|
|
if skipped_types and not files and not self.zip_file:
|
|
raise UserError(_(
|
|
'Only .txt files are supported. Skipped: %s',
|
|
', '.join(skipped_types),
|
|
))
|
|
|
|
if self.zip_file:
|
|
files.extend(self._extract_txt_from_zip())
|
|
|
|
return files, len(skipped_types)
|
|
|
|
def _extract_txt_from_zip(self):
|
|
"""Scan all folders/subfolders in the ZIP and extract .txt files.
|
|
|
|
Returns list of (filename, base64_data) tuples.
|
|
Uses only the base filename (no folder path) as the record name.
|
|
"""
|
|
raw = base64.b64decode(self.zip_file)
|
|
buf = io.BytesIO(raw)
|
|
|
|
if not zipfile.is_zipfile(buf):
|
|
raise UserError(_('The uploaded file is not a valid ZIP archive.'))
|
|
|
|
buf.seek(0)
|
|
results = []
|
|
seen = set()
|
|
|
|
with zipfile.ZipFile(buf, 'r') as zf:
|
|
for entry in zf.infolist():
|
|
if entry.is_dir():
|
|
continue
|
|
lower = entry.filename.lower()
|
|
if not lower.endswith('.txt'):
|
|
continue
|
|
if lower.startswith('__macosx'):
|
|
continue
|
|
|
|
basename = entry.filename.rsplit('/', 1)[-1]
|
|
if not basename or basename in seen:
|
|
continue
|
|
seen.add(basename)
|
|
|
|
data = zf.read(entry.filename)
|
|
results.append((basename, base64.b64encode(data).decode('ascii')))
|
|
|
|
if not results:
|
|
raise UserError(_('No .txt files found in the ZIP archive.'))
|
|
|
|
return results
|