# Fusion Accounting - Unified Bank Statement Import Wizard
# Accepts file uploads, auto-detects format (CSV, OFX, QIF, CAMT.053),
# routes to the correct parser, and creates bank statement lines.
|
import base64
|
|
import logging
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import UserError
|
|
|
|
from ..models.bank_statement_import_ofx import FusionOFXParser
|
|
from ..models.bank_statement_import_qif import FusionQIFParser
|
|
from ..models.bank_statement_import_camt import FusionCAMTParser
|
|
|
|
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class FusionBankStatementImportWizard(models.TransientModel):
    """Transient wizard offering a single entry point for importing bank
    statements from several file formats.

    The wizard:

    1. Accepts a binary file upload from the user
    2. Auto-detects the file format (OFX, QIF, CAMT.053, or CSV)
    3. Routes the file to the matching parser
    4. Asks the selected journal to create the statement records from the
       parsed data
    5. Reports the import results (or errors) to the user
    """

    _name = 'fusion.bank.statement.import'
    _description = 'Import Bank Statement'

    # ---- Fields ----
    journal_id = fields.Many2one(
        'account.journal',
        string='Bank Journal',
        required=True,
        domain="[('type', 'in', ('bank', 'credit', 'cash'))]",
        default=lambda self: self._default_journal_id(),
        help="The journal to import the bank statement into.",
    )
    data_file = fields.Binary(
        string='Bank Statement File',
        required=True,
        help=(
            "Upload a bank statement file. Supported formats: "
            "OFX, QIF, CAMT.053 (XML), and CSV."
        ),
    )
    filename = fields.Char(
        string='Filename',
        help="Name of the uploaded file.",
    )
    detected_format = fields.Char(
        string='Detected Format',
        readonly=True,
        help="The file format detected by the auto-detection engine.",
    )
    import_result = fields.Text(
        string='Import Result',
        readonly=True,
        help="Summary of the import operation.",
    )

    # ---- Defaults ----
    @api.model
    def _default_journal_id(self):
        """Return the default journal id for a new wizard.

        Uses ``default_journal_id`` from the context when it is set;
        otherwise the id of the first journal of type ``bank`` returned by
        a search, or ``False`` when there is none.
        """
        journal_id = self.env.context.get('default_journal_id')
        if journal_id:
            return journal_id
        return self.env['account.journal'].search(
            [('type', '=', 'bank')], limit=1,
        ).id or False

    # ---- Onchange: auto-detect on file upload ----
    @api.onchange('data_file', 'filename')
    def _onchange_data_file(self):
        """Refresh ``detected_format`` whenever the file or its name changes.

        Clears the field when no file is set.
        """
        if not self.data_file:
            self.detected_format = ''
            return
        raw_data = base64.b64decode(self.data_file)
        self.detected_format = self._detect_format(raw_data, self.filename or '')

    # ---- Format detection ----
    @staticmethod
    def _detect_format(raw_data, filename=''):
        """Guess the statement format from the content and the filename.

        Only the first 8192 bytes of ``raw_data`` are inspected. Content
        markers are checked before the filename extension for each format,
        in the order CAMT.053, OFX, QIF, CSV.

        :param raw_data: decoded file content (bytes)
        :param filename: optional name of the uploaded file
        :return: one of ``'OFX'``, ``'QIF'``, ``'CAMT.053'``, ``'CSV'``
            or ``'Unknown'``
        """
        # With errors='ignore' the decode itself cannot fail; only a
        # non-bytes argument (None, str, ...) can raise here.
        try:
            preview = raw_data[:8192].decode('utf-8-sig', errors='ignore')
        except (AttributeError, TypeError):
            preview = ''

        preview_upper = preview.upper().strip()
        fname_lower = (filename or '').lower().strip()

        # --- CAMT.053 (XML with ISO 20022 namespace) ---
        if 'camt.053' in preview.lower() or 'BkToCstmrStmt' in preview:
            return 'CAMT.053'

        # --- OFX (v1 SGML or v2 XML) ---
        if (preview_upper.startswith('<?OFX')
                or '<OFX>' in preview_upper
                or 'OFXHEADER:' in preview_upper):
            return 'OFX'
        if fname_lower.endswith('.ofx'):
            return 'OFX'

        # --- QIF ---
        if preview_upper.startswith(('!TYPE:', '!ACCOUNT:')):
            return 'QIF'
        if fname_lower.endswith('.qif'):
            return 'QIF'

        # --- CSV / spreadsheet ---
        if fname_lower.endswith(('.csv', '.xls', '.xlsx')):
            return 'CSV'

        # Content-based CSV heuristic: the first line must contain a
        # separator, and the second line must contain roughly the same
        # number of them. Line endings are normalised first so that files
        # using bare '\r' are not seen as a single line.
        lines = preview.replace('\r\n', '\n').replace('\r', '\n').split('\n')
        if len(lines) >= 2:
            first_line = lines[0].strip()
            sep = next((c for c in (',', ';', '\t') if c in first_line), None)
            if sep is not None:
                if abs(first_line.count(sep) - lines[1].strip().count(sep)) <= 1:
                    return 'CSV'

        return 'Unknown'

    # ---- Main import action ----
    def action_import(self):
        """Decode the upload, detect its format and run the matching import.

        :return: whatever the format-specific handler returns (an action
            for the client)
        :raises UserError: when no file is set, the file is empty, or the
            format cannot be determined
        """
        self.ensure_one()

        if not self.data_file:
            raise UserError(_("Please upload a bank statement file."))

        raw_data = base64.b64decode(self.data_file)
        if not raw_data:
            raise UserError(_("The uploaded file is empty."))

        fmt = self._detect_format(raw_data, self.filename or '')
        self.detected_format = fmt

        if fmt == 'CSV':
            return self._import_csv(raw_data)
        if fmt == 'OFX':
            return self._import_parsed(raw_data, FusionOFXParser(), 'parse_ofx', 'OFX')
        if fmt == 'QIF':
            return self._import_qif(raw_data)
        if fmt == 'CAMT.053':
            return self._import_parsed(raw_data, FusionCAMTParser(), 'parse_camt', 'CAMT.053')

        raise UserError(
            _("Could not determine the file format. "
              "Supported formats are: OFX, QIF, CAMT.053, and CSV.")
        )

    # ---- Format-specific import handlers ----

    def _import_parsed(self, raw_data, parser, method_name, label):
        """Run a parser method that yields a list of statement dicts.

        Used for OFX and CAMT.053.

        :param raw_data: decoded file content
        :param parser: parser instance
        :param method_name: name of the method to call on ``parser``
        :param label: format name used in messages
        :raises UserError: re-raised untouched from the parser; raised for
            any other parser exception (which is logged first) and when
            the parser returns nothing
        """
        try:
            parse_fn = getattr(parser, method_name)
            statements = parse_fn(raw_data)
        except UserError:
            raise
        except Exception as exc:
            _log.exception("%s parsing failed", label)
            raise UserError(
                _("Failed to parse the %(format)s file: %(error)s",
                  format=label, error=str(exc))
            ) from exc

        if not statements:
            raise UserError(
                _("No statements found in the %(format)s file.", format=label)
            )

        return self._create_statements_from_parsed(statements, label)

    def _import_qif(self, raw_data):
        """Handle QIF import; the parser result is treated as one statement.

        :raises UserError: re-raised untouched from the parser; raised for
            any other parser exception (which is logged first) and when
            the parser returns nothing
        """
        parser = FusionQIFParser()
        try:
            stmt = parser.parse_qif(raw_data)
        except UserError:
            raise
        except Exception as exc:
            _log.exception("QIF parsing failed")
            raise UserError(
                _("Failed to parse the QIF file: %s", str(exc))
            ) from exc

        if not stmt:
            raise UserError(_("No transactions found in the QIF file."))

        return self._create_statements_from_parsed([stmt], 'QIF')

    def _import_csv(self, raw_data):
        """Store the upload as an attachment and pass it to the journal's
        ``_import_bank_statement``.

        NOTE(review): that method is presumably the column-mapping import
        flow for tabular files -- confirm against the journal model.
        """
        name = self.filename or 'bank_statement.csv'
        lowered = name.lower().strip()
        # _detect_format also routes .xls/.xlsx uploads here, so the
        # attachment must not be labelled as CSV text in those cases.
        if lowered.endswith('.xlsx'):
            mimetype = (
                'application/vnd.openxmlformats-officedocument'
                '.spreadsheetml.sheet'
            )
        elif lowered.endswith('.xls'):
            mimetype = 'application/vnd.ms-excel'
        else:
            mimetype = 'text/csv'

        attachment = self.env['ir.attachment'].create({
            'name': name,
            'type': 'binary',
            'raw': raw_data,
            'mimetype': mimetype,
        })
        return self.journal_id._import_bank_statement(attachment)

    # ---- Statement creation ----

    def _create_statements_from_parsed(self, statements, format_label):
        """Create bank statement records from parsed statement dicts and
        open the reconciliation widget on the imported lines.

        Validation, value completion and record creation are delegated to
        helper methods on the journal (``_check_parsed_data``,
        ``_find_additional_data``, ``_complete_bank_statement_vals`` and
        ``_create_bank_statements``).

        :param statements: list of statement dicts produced by a parser
        :param format_label: format name used in messages
        :return: the action returned by
            ``_action_open_bank_reconciliation_widget``
        :raises UserError: when no journal is selected, the journal has no
            default account, or no statement was created
        """
        journal = self.journal_id
        if not journal:
            raise UserError(_("Please select a bank journal."))

        if not journal.default_account_id:
            raise UserError(
                _("You must set a Default Account for the journal: %s",
                  journal.name)
            )

        # First non-empty currency / account number across the statements.
        # Each is looked up independently so that a later statement cannot
        # override a value already found in an earlier one.
        currency_code = next(
            (s['currency_code'] for s in statements if s.get('currency_code')),
            None,
        )
        account_number = next(
            (s['account_number'] for s in statements if s.get('account_number')),
            None,
        )

        # Validate through the journal pipeline
        journal._check_parsed_data(statements, account_number)

        # Find / validate journal match
        target_journal = journal._find_additional_data(currency_code, account_number)

        # Create an attachment reference for the import
        attachment = self.env['ir.attachment'].create({
            'name': self.filename or f'{format_label}_import',
            'type': 'binary',
            'raw': base64.b64decode(self.data_file),
            'mimetype': 'application/octet-stream',
        })

        # Complete statement values
        statements = target_journal._complete_bank_statement_vals(
            statements, target_journal, account_number, attachment,
        )

        # Create statement records
        stmt_ids, line_ids, notifications = target_journal._create_bank_statements(
            statements,
        )

        if not stmt_ids:
            raise UserError(
                _("No new transactions were imported. "
                  "The file may contain only duplicates.")
            )

        # Build a summary message
        stmt_records = self.env['account.bank.statement'].browse(stmt_ids)
        summary = _(
            "Successfully imported %(lines)d transaction(s) into "
            "%(stmts)d statement(s) from %(format)s file.",
            lines=len(line_ids),
            stmts=len(stmt_ids),
            format=format_label,
        )
        for notif in notifications:
            summary += f"\n{notif.get('message', '')}"

        # Keep the summary on the wizard so the declared result field is
        # actually populated.
        self.import_result = summary

        # Open the reconciliation widget for the new lines
        return stmt_records.line_ids._action_open_bank_reconciliation_widget(
            extra_domain=[('statement_id', 'in', stmt_ids)],
            default_context={
                'search_default_not_matched': True,
                'default_journal_id': target_journal.id,
                'notifications': {
                    self.filename or format_label: summary,
                },
            },
        )
|