# Fusion Accounting - CAMT.053 Bank Statement Parser
# Original implementation for ISO 20022 camt.053 bank-to-customer statement
# Based on the published ISO 20022 message definitions

import logging
from datetime import datetime
from xml.etree import ElementTree

from odoo import _, models
from odoo.exceptions import UserError

_log = logging.getLogger(__name__)


class FusionCAMTParser:
    """Standalone parser for ISO 20022 CAMT.053 XML bank statements.

    CAMT.053 (Bank-to-Customer Statement) is the international standard for
    electronic bank statements. This parser supports the following schema
    versions:

    * ``camt.053.001.02`` — original version
    * ``camt.053.001.03`` through ``camt.053.001.08`` — subsequent revisions
      (structurally compatible for the fields we consume)

    The parser auto-detects the XML namespace from the document root.

    This is an **original** implementation written from the published
    ISO 20022 message definitions — it is not derived from Odoo Enterprise.
    """

    # Namespace prefixes we recognise (base URI without version suffix)
    _CAMT_NS_BASE = 'urn:iso:std:iso:20022:tech:xsd:camt.053.001.'

    # Values banks send in reference fields to mean "no reference"
    _EMPTY_REF_SENTINELS = ('NOTPROVIDED', 'NOTAVAILABLE', 'NONE')

    # -------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------

    def parse_camt(self, data_file):
        """Parse a CAMT.053 XML file and return a list of statement dicts
        compatible with the Fusion Accounting import pipeline.

        Each dict has the keys:
            - ``name``            : statement identification (from ``<Id>``),
              suffixed with the statement date when known
            - ``date``            : creation date
            - ``balance_start``   : opening booked balance
            - ``balance_end_real``: closing booked balance
            - ``currency_code``   : ISO 4217 currency
            - ``account_number``  : IBAN or other account identifier
            - ``transactions``    : list of transaction dicts

        Transaction dicts contain:
            - ``date``            : booking date (value date as fallback)
            - ``payment_ref``     : combined reference / remittance info
            - ``ref``             : end-to-end reference or instruction id
            - ``amount``          : signed float (negative for debits)
            - ``unique_import_id``: generated unique key
            - ``partner_name``    : debtor or creditor name (when present)
            - ``account_number``  : debtor/creditor IBAN (when present)

        :param data_file: the XML document as ``bytes`` or ``str``
        :raises UserError: if the XML is malformed or holds no statement
        """
        raw_xml = self._to_bytes(data_file)
        root = self._parse_xml(raw_xml)
        ns = self._detect_namespace(root)
        return self._extract_statements(root, ns)

    # -------------------------------------------------------------------
    # Input handling
    # -------------------------------------------------------------------

    @staticmethod
    def _to_bytes(data_file):
        """Ensure *data_file* is bytes for XML parsing.

        ``str`` input is encoded as UTF-8; anything else is returned as-is.
        """
        if isinstance(data_file, str):
            return data_file.encode('utf-8')
        return data_file

    @staticmethod
    def _parse_xml(raw_xml):
        """Parse raw XML bytes and return the root Element.

        :raises UserError: wrapping ``ElementTree.ParseError`` on bad XML
        """
        # NOTE(review): bank files are user-uploaded (untrusted). The Python
        # docs warn that xml.etree is not hardened against maliciously
        # constructed documents; consider a hardened parser if available.
        try:
            return ElementTree.fromstring(raw_xml)
        except ElementTree.ParseError as exc:
            raise UserError(
                _("Failed to parse CAMT.053 XML: %s", str(exc))
            ) from exc

    def _detect_namespace(self, root):
        """Auto-detect the CAMT.053 namespace from the document root.

        Returns a dict ``{'ns': 'urn:...'}`` suitable for the ``_tag`` /
        ``_find*`` helpers, or an empty dict when the root tag carries no
        namespace. An unexpected namespace is logged but still used.
        """
        tag = root.tag
        if '}' in tag:
            # ElementTree spells qualified tags as '{uri}local'
            ns_uri = tag.split('}')[0].lstrip('{')
        else:
            ns_uri = ''

        if ns_uri and not ns_uri.startswith(self._CAMT_NS_BASE):
            _log.warning(
                "Unexpected CAMT namespace: %s (expected %s*)",
                ns_uri, self._CAMT_NS_BASE,
            )

        return {'ns': ns_uri} if ns_uri else {}

    # -------------------------------------------------------------------
    # Convenience helpers for namespaced tag access
    # -------------------------------------------------------------------

    @staticmethod
    def _tag(ns_map, local_name):
        """Build a namespaced tag string for ElementTree lookups."""
        ns = ns_map.get('ns', '')
        if ns:
            return f'{{{ns}}}{local_name}'
        return local_name

    def _find(self, parent, ns, path):
        """Find the first element matching a ``/``-separated *path* of local
        tag names, descending one direct child per path segment.

        Returns ``None`` as soon as a segment is missing.
        """
        current = parent
        for part in path.split('/'):
            if current is None:
                return None
            current = current.find(self._tag(ns, part))
        return current

    def _find_text(self, parent, ns, path):
        """Return the stripped text of the element at *path*, or ``None``
        when the element is missing or has no text."""
        el = self._find(parent, ns, path)
        if el is not None and el.text:
            return el.text.strip()
        return None

    def _findall(self, parent, ns, local_name):
        """Return all direct children matching *local_name*."""
        return parent.findall(self._tag(ns, local_name))

    def _iter(self, parent, ns, local_name):
        """Iterate over all descendant elements matching *local_name*."""
        return parent.iter(self._tag(ns, local_name))

    # -------------------------------------------------------------------
    # Statement-level extraction
    # -------------------------------------------------------------------

    def _extract_statements(self, root, ns):
        """Extract all ``<Stmt>`` elements from the document.

        :raises UserError: when the document contains no statement
        """
        statements = []

        # CAMT.053 structure: Document > BkToCstmrStmt > Stmt (repeating)
        for stmt_el in self._iter(root, ns, 'Stmt'):
            stmt = self._extract_single_statement(stmt_el, ns)
            if stmt:
                statements.append(stmt)

        if not statements:
            raise UserError(
                _("No statements found in the CAMT.053 file.")
            )
        return statements

    def _extract_single_statement(self, stmt_el, ns):
        """Extract one ``<Stmt>`` element into a statement dict.

        See :meth:`parse_camt` for the keys of the returned dict.
        """
        # Statement ID
        stmt_id = self._find_text(stmt_el, ns, 'Id') or ''

        # Creation date/time
        creation_dt = self._find_text(stmt_el, ns, 'CreDtTm')
        stmt_date = self._parse_camt_datetime(creation_dt)

        # Account identification
        acct_el = self._find(stmt_el, ns, 'Acct')
        account_number = ''
        currency_code = None
        if acct_el is not None:
            # Try IBAN first, then generic Id/Othr/Id
            account_number = (
                self._find_text(acct_el, ns, 'Id/IBAN')
                or self._find_text(acct_el, ns, 'Id/Othr/Id')
                or ''
            )
            # Currency from the account's Ccy element
            ccy_text = self._find_text(acct_el, ns, 'Ccy')
            if ccy_text:
                currency_code = ccy_text.upper()

        # Balances — collect every coded balance first, then choose by
        # preference so the result does not depend on document order.
        balances = {}
        for bal_el in self._findall(stmt_el, ns, 'Bal'):
            bal_code = (
                self._find_text(bal_el, ns, 'Tp/CdOrPrtry/Cd') or ''
            ).upper()

            amt_el = self._find(bal_el, ns, 'Amt')
            amt_val = 0.0
            if amt_el is not None and amt_el.text:
                amt_val = self._safe_float(amt_el.text)
                # Also capture currency from balance if not yet known
                if not currency_code:
                    currency_code = (amt_el.get('Ccy') or '').upper() or None

            # Credit/Debit indicator
            cdi = self._find_text(bal_el, ns, 'CdtDbtInd')
            if cdi and cdi.upper() == 'DBIT':
                amt_val = -amt_val

            if bal_code:
                balances[bal_code] = amt_val

            # Also capture statement date from closing balance if missing
            if bal_code == 'CLBD' and not stmt_date:
                dt_text = self._find_text(bal_el, ns, 'Dt/Dt')
                if dt_text:
                    stmt_date = self._parse_camt_date(dt_text)

        # Opening booked, falling back to previous closing booked
        balance_start = balances.get('OPBD', balances.get('PRCD', 0.0))
        # Closing booked, falling back to closing available
        balance_end = balances.get('CLBD', balances.get('CLAV', 0.0))

        # Transactions — Ntry elements
        transactions = []
        for ntry_el in self._findall(stmt_el, ns, 'Ntry'):
            transactions.extend(
                self._extract_entry(ntry_el, ns, stmt_id, account_number)
            )

        stmt_name = stmt_id or f"CAMT {account_number}"
        if stmt_date:
            stmt_name += f" {stmt_date.strftime('%Y-%m-%d')}"

        return {
            'name': stmt_name,
            'date': stmt_date,
            'balance_start': balance_start,
            'balance_end_real': balance_end,
            'currency_code': currency_code,
            'account_number': account_number,
            'transactions': transactions,
        }

    # -------------------------------------------------------------------
    # Entry / transaction extraction
    # -------------------------------------------------------------------

    def _extract_entry(self, ntry_el, ns, stmt_id, acct_number):
        """Extract transactions from a single ``<Ntry>`` element.

        An entry may contain one or more ``<NtryDtls>/<TxDtls>`` detail
        blocks. If no detail blocks exist, we create a single transaction
        from the entry-level data.

        :return: list of transaction dicts (see :meth:`parse_camt`)
        """
        # Entry-level fields
        entry_amt = self._safe_float(
            self._find_text(ntry_el, ns, 'Amt') or '0'
        )
        entry_cdi = self._find_text(ntry_el, ns, 'CdtDbtInd') or ''
        if entry_cdi.upper() == 'DBIT':
            entry_amt = -abs(entry_amt)
        else:
            entry_amt = abs(entry_amt)

        # Reversal indicator
        # NOTE(review): the sign is flipped for reversals here, but a detail
        # block carrying its own <Amt> recomputes the sign without it —
        # confirm the intended convention against real bank files.
        rvsl = self._find_text(ntry_el, ns, 'RvslInd')
        if rvsl and rvsl.upper() in ('TRUE', 'Y', '1'):
            entry_amt = -entry_amt

        booking_date = self._parse_camt_date(
            self._find_text(ntry_el, ns, 'BookgDt/Dt')
        )
        if not booking_date:
            booking_date = self._parse_camt_datetime(
                self._find_text(ntry_el, ns, 'BookgDt/DtTm')
            )
        value_date = self._parse_camt_date(
            self._find_text(ntry_el, ns, 'ValDt/Dt')
        )

        entry_ref = self._find_text(ntry_el, ns, 'NtryRef') or ''
        entry_addl_info = self._find_text(ntry_el, ns, 'AddtlNtryInf') or ''

        # Check for detail-level transactions
        tx_details = [
            tx_dtls
            for ntry_dtls in self._findall(ntry_el, ns, 'NtryDtls')
            for tx_dtls in self._findall(ntry_dtls, ns, 'TxDtls')
        ]

        if not tx_details:
            # No detail blocks — create transaction from entry-level data
            description = entry_addl_info or entry_ref or '/'
            unique_id = self._make_unique_id(
                stmt_id, acct_number, entry_ref,
                booking_date, entry_amt, description,
            )
            return [{
                'date': booking_date or value_date,
                'payment_ref': description,
                'ref': entry_ref,
                'amount': entry_amt,
                'unique_import_id': unique_id,
            }]

        # Process each detail block
        transactions = []
        for idx, tx_dtls in enumerate(tx_details):
            txn = self._extract_tx_details(
                tx_dtls, ns, stmt_id, acct_number,
                entry_amt, entry_cdi, booking_date, value_date,
                entry_ref, entry_addl_info, idx,
            )
            if txn:
                transactions.append(txn)
        return transactions

    def _extract_tx_details(
        self, tx_dtls, ns, stmt_id, acct_number,
        entry_amt, entry_cdi, booking_date, value_date,
        entry_ref, entry_addl_info, detail_idx,
    ):
        """Extract a single transaction from a ``<TxDtls>`` element.

        A detail block without its own ``<Amt>`` inherits the (signed)
        entry amount; entry-level dates, reference and additional info are
        used as fallbacks / complements for the detail-level values.

        :return: transaction dict (see :meth:`parse_camt`)
        """
        # Amount — detail may override entry amount
        detail_amt_text = self._find_text(tx_dtls, ns, 'Amt')
        if detail_amt_text:
            amount = self._safe_float(detail_amt_text)
            cdi = self._find_text(tx_dtls, ns, 'CdtDbtInd') or entry_cdi
            if cdi.upper() == 'DBIT':
                amount = -abs(amount)
            else:
                amount = abs(amount)
        else:
            amount = entry_amt

        # References
        refs = self._find(tx_dtls, ns, 'Refs')
        end_to_end_id = ''
        instruction_id = ''
        msg_id = ''
        if refs is not None:
            end_to_end_id = self._find_text(refs, ns, 'EndToEndId') or ''
            instruction_id = self._find_text(refs, ns, 'InstrId') or ''
            msg_id = self._find_text(refs, ns, 'MsgId') or ''

        # Filter out NOTPROVIDED sentinel values
        if end_to_end_id.upper() in self._EMPTY_REF_SENTINELS:
            end_to_end_id = ''
        if instruction_id.upper() in self._EMPTY_REF_SENTINELS:
            instruction_id = ''

        ref = end_to_end_id or instruction_id or msg_id or entry_ref

        # Remittance information (unstructured)
        remittance_info = ''
        rmt_inf = self._find(tx_dtls, ns, 'RmtInf')
        if rmt_inf is not None:
            ustrd_parts = [
                ustrd.text.strip()
                for ustrd in self._findall(rmt_inf, ns, 'Ustrd')
                if ustrd.text and ustrd.text.strip()
            ]
            remittance_info = ' '.join(ustrd_parts)

        # Structured remittance: creditor reference
        if not remittance_info and rmt_inf is not None:
            cred_ref = self._find_text(rmt_inf, ns, 'Strd/CdtrRefInf/Ref')
            if cred_ref:
                remittance_info = cred_ref

        # Additional transaction info
        addl_tx_info = self._find_text(tx_dtls, ns, 'AddtlTxInf') or ''

        # Build description from all available text fields
        desc_parts = [
            p for p in (remittance_info, addl_tx_info, entry_addl_info) if p
        ]
        description = ' | '.join(desc_parts) if desc_parts else ref or '/'

        # Debtor / Creditor information
        partner_name, partner_account = self._extract_counterparty(
            tx_dtls, ns, amount,
        )

        # Unique ID — the detail index keeps sibling details distinct
        unique_id = self._make_unique_id(
            stmt_id, acct_number, ref,
            booking_date, amount, f"{description}-{detail_idx}",
        )

        txn = {
            'date': booking_date or value_date,
            'payment_ref': description,
            'ref': ref,
            'amount': amount,
            'unique_import_id': unique_id,
        }
        if partner_name:
            txn['partner_name'] = partner_name
        if partner_account:
            txn['account_number'] = partner_account
        return txn

    def _extract_counterparty(self, tx_dtls, ns, amount):
        """Return ``(partner_name, partner_account)`` for a ``<TxDtls>``.

        For credits (incoming, ``amount >= 0``) the relevant party is the
        debtor; for debits (outgoing) it is the creditor. The other role is
        only used as a fallback, so a file listing both parties no longer
        reports the account holder as its own partner. Name and account are
        looked up independently; either may be an empty string.
        """
        roles = ('Cdtr', 'Dbtr') if amount < 0 else ('Dbtr', 'Cdtr')

        partner_account = ''
        for role in roles:
            acct_path = f'RltdPties/{role}Acct/Id'
            partner_account = (
                self._find_text(tx_dtls, ns, f'{acct_path}/IBAN')
                or self._find_text(tx_dtls, ns, f'{acct_path}/Othr/Id')
                or ''
            )
            if partner_account:
                break

        partner_name = ''
        for role in roles:
            # Later schema revisions wrap the party in a <Pty> choice element
            partner_name = (
                self._find_text(tx_dtls, ns, f'RltdPties/{role}/Nm')
                or self._find_text(tx_dtls, ns, f'RltdPties/{role}/Pty/Nm')
                or ''
            )
            if partner_name:
                break

        return partner_name, partner_account

    # -------------------------------------------------------------------
    # Unique-ID generation
    # -------------------------------------------------------------------

    @staticmethod
    def _make_unique_id(stmt_id, acct_number, ref, date, amount, extra=''):
        """Generate a deterministic unique import ID from available data.

        Non-empty parts are joined with ``-`` behind a ``CAMT`` prefix.
        """
        parts = [
            'CAMT',
            stmt_id or '',
            acct_number or '',
            ref or '',
            date.isoformat() if date else '',
            str(amount),
        ]
        if extra:
            parts.append(extra)
        return '-'.join(p for p in parts if p)

    # -------------------------------------------------------------------
    # Date helpers
    # -------------------------------------------------------------------

    @staticmethod
    def _parse_camt_date(date_str):
        """Parse an ISO 8601 date (``YYYY-MM-DD``) to ``datetime.date``.

        Only the first 10 characters are considered. Returns ``None`` for
        empty or unparseable input (the latter is logged).
        """
        if not date_str:
            return None
        try:
            return datetime.strptime(date_str.strip()[:10], '%Y-%m-%d').date()
        except ValueError:
            _log.warning("Unparseable CAMT date: %s", date_str)
            return None

    @staticmethod
    def _parse_camt_datetime(dt_str):
        """Parse an ISO 8601 datetime to ``datetime.date``.

        Fractional seconds and timezone suffixes are ignored: the date is
        taken as written in the document (no timezone conversion). A bare
        date, with or without an offset suffix, is accepted as well.
        Returns ``None`` for empty or unparseable input (the latter is
        logged).
        """
        if not dt_str:
            return None
        cleaned = dt_str.strip()
        # (prefix length, format): full timestamp first, then date only
        for length, fmt in ((19, '%Y-%m-%dT%H:%M:%S'), (10, '%Y-%m-%d')):
            try:
                return datetime.strptime(cleaned[:length], fmt).date()
            except ValueError:
                continue
        _log.warning("Unparseable CAMT datetime: %s", dt_str)
        return None

    # -------------------------------------------------------------------
    # Numeric helper
    # -------------------------------------------------------------------

    @staticmethod
    def _safe_float(value):
        """Convert *value* to float, returning 0.0 on failure.

        A decimal comma is accepted in place of a decimal point.
        """
        if not value:
            return 0.0
        try:
            return float(value.strip().replace(',', '.'))
        except (ValueError, AttributeError):
            return 0.0


class FusionJournalCAMTImport(models.Model):
    """Register CAMT.053 as an available bank-statement import format
    and implement the parser hook on ``account.journal``."""

    _inherit = 'account.journal'

    # ---- Format Registration ----
    def _get_bank_statements_available_import_formats(self):
        """Append CAMT.053 to the list of importable formats."""
        formats = super()._get_bank_statements_available_import_formats()
        formats.append('CAMT.053')
        return formats

    # ---- Parser Hook ----
    def _parse_bank_statement_file(self, attachment):
        """Attempt to parse *attachment* as CAMT.053.

        Falls through to ``super()`` when the file is not recognised as
        CAMT.

        :return: ``(currency_code, account_number, statements)`` where the
            first two values are taken from the first statement
        :raises UserError: when the file looks like CAMT but cannot be parsed
        """
        raw_data = attachment.raw
        if not self._is_camt_file(raw_data):
            return super()._parse_bank_statement_file(attachment)

        parser = FusionCAMTParser()
        try:
            statements = parser.parse_camt(raw_data)
        except UserError:
            raise
        except Exception as exc:
            # Boundary handler: log the traceback, show a readable error
            _log.exception("CAMT.053 parsing error")
            raise UserError(
                _("Could not parse the CAMT.053 file: %s", str(exc))
            ) from exc

        # Extract currency and account from the first statement
        currency_code = None
        account_number = None
        if statements:
            currency_code = statements[0].get('currency_code')
            account_number = statements[0].get('account_number')

        return currency_code, account_number, statements

    # ---- Detection ----
    @staticmethod
    def _is_camt_file(raw_data):
        """Heuristic check: does *raw_data* look like a CAMT.053 file?

        Only the head of the file (first 4096 bytes) is inspected.
        """
        try:
            # Slice before decoding so large attachments are not decoded whole
            text = raw_data[:4096].decode('utf-8-sig', errors='ignore')
        except (UnicodeDecodeError, AttributeError, TypeError):
            text = str(raw_data)[:4096]

        # Look for the CAMT namespace URI
        if 'camt.053' in text.lower():
            return True

        # Also accept documents with BkToCstmrStmt element (in case the
        # namespace URI uses a different casing or custom prefix)
        return 'BkToCstmrStmt' in text