# Fusion Accounting - CAMT.053 Bank Statement Parser
# Original implementation for ISO 20022 camt.053 bank-to-customer statement
# Based on the published ISO 20022 message definitions

|
import logging
|
|
from datetime import datetime
|
|
from xml.etree import ElementTree
|
|
|
|
from odoo import _, models
|
|
from odoo.exceptions import UserError
|
|
|
|
# Module-level logger used by the parser and the journal import hook below.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class FusionCAMTParser:
    """Standalone parser for ISO 20022 CAMT.053 XML bank statements.

    CAMT.053 (Bank-to-Customer Statement) is the international standard
    for electronic bank statements. This parser supports the following
    schema versions:

    * ``camt.053.001.02`` — original version
    * ``camt.053.001.03`` through ``camt.053.001.08`` — subsequent
      revisions (structurally compatible for the fields we consume)

    The parser auto-detects the XML namespace from the document (the root
    element, or the first CAMT.053 element nested inside an envelope).

    This is an **original** implementation written from the published
    ISO 20022 message definitions — it is not derived from Odoo Enterprise.
    """

    # Namespace prefixes we recognise (base URI without version suffix)
    _CAMT_NS_BASE = 'urn:iso:std:iso:20022:tech:xsd:camt.053.001.'

    # Placeholder strings treated as "no reference supplied".
    _EMPTY_REF_SENTINELS = frozenset({'NOTPROVIDED', 'NOTAVAILABLE', 'NONE'})

    # -------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------

    def parse_camt(self, data_file):
        """Parse a CAMT.053 XML file and return a list of statement dicts
        compatible with the Fusion Accounting import pipeline.

        Each dict has the keys:
            - ``name``            : statement identification (from ``<Id>``),
                                    suffixed with the statement date if known
            - ``date``            : creation date
            - ``balance_start``   : opening booked balance
            - ``balance_end_real``: closing booked balance
            - ``currency_code``   : ISO 4217 currency
            - ``account_number``  : IBAN or other account identifier
            - ``transactions``    : list of transaction dicts

        Transaction dicts contain:
            - ``date``            : booking date (value date as fallback)
            - ``payment_ref``     : combined reference / remittance info
            - ``ref``             : end-to-end reference or instruction id
            - ``amount``          : signed float (negative for debits)
            - ``unique_import_id``: generated unique key
            - ``partner_name``    : counterparty name (only when found)
            - ``account_number``  : counterparty account (only when found)

        :param data_file: the XML document as ``bytes`` or ``str``
        :raises UserError: if the XML is malformed or holds no statement
        """
        raw_xml = self._to_bytes(data_file)
        root = self._parse_xml(raw_xml)
        ns = self._detect_namespace(root)
        return self._extract_statements(root, ns)

    # -------------------------------------------------------------------
    # Input handling
    # -------------------------------------------------------------------

    @staticmethod
    def _to_bytes(data_file):
        """Ensure *data_file* is bytes for XML parsing."""
        if isinstance(data_file, str):
            return data_file.encode('utf-8')
        return data_file

    @staticmethod
    def _parse_xml(raw_xml):
        """Parse raw XML bytes and return the root Element.

        :raises UserError: if the document is not well-formed XML
        """
        # NOTE(review): the stdlib ElementTree parser is not hardened against
        # maliciously constructed documents; consider a hardened parser if
        # uploads can come from untrusted users.
        try:
            return ElementTree.fromstring(raw_xml)
        except ElementTree.ParseError as exc:
            raise UserError(
                _("Failed to parse CAMT.053 XML: %s", str(exc))
            ) from exc

    @staticmethod
    def _namespace_of(tag):
        """Return the namespace URI of a ``{uri}local`` tag, or ``''``."""
        if isinstance(tag, str) and tag.startswith('{') and '}' in tag:
            return tag[1:].split('}', 1)[0]
        return ''

    def _detect_namespace(self, root):
        """Auto-detect the CAMT.053 namespace of the document.

        The root element's namespace is used when it is a CAMT.053 one.
        Otherwise (e.g. the ``<Document>`` is wrapped in an envelope
        element) the first descendant carrying a CAMT.053 namespace decides.
        If none is found, the root namespace is kept and a warning is logged.

        Returns a dict ``{'ns': 'urn:...'}`` (or ``{}`` when the document is
        not namespaced) suitable for the ``_tag()`` helper.
        """
        ns_uri = self._namespace_of(root.tag)
        if not ns_uri.startswith(self._CAMT_NS_BASE):
            for element in root.iter():
                candidate = self._namespace_of(element.tag)
                if candidate.startswith(self._CAMT_NS_BASE):
                    ns_uri = candidate
                    break
            else:
                if ns_uri:
                    _log.warning(
                        "Unexpected CAMT namespace: %s (expected %s*)",
                        ns_uri, self._CAMT_NS_BASE,
                    )
        return {'ns': ns_uri} if ns_uri else {}

    # -------------------------------------------------------------------
    # Convenience helpers for namespaced tag access
    # -------------------------------------------------------------------

    @staticmethod
    def _tag(ns_map, local_name):
        """Build a namespaced tag string for ElementTree lookups."""
        ns = ns_map.get('ns', '')
        if ns:
            return f'{{{ns}}}{local_name}'
        return local_name

    def _find(self, parent, ns, path):
        """Find the first child element matching a ``/``-separated
        *path* of local tag names. Returns ``None`` if any step is missing."""
        current = parent
        for part in path.split('/'):
            if current is None:
                return None
            current = current.find(self._tag(ns, part))
        return current

    def _find_text(self, parent, ns, path):
        """Return the stripped text of the element at *path*, or ``None``."""
        el = self._find(parent, ns, path)
        if el is not None and el.text:
            return el.text.strip()
        return None

    def _findall(self, parent, ns, local_name):
        """Return all direct children matching *local_name*."""
        return parent.findall(self._tag(ns, local_name))

    def _iter(self, parent, ns, local_name):
        """Iterate over all descendant elements matching *local_name*."""
        return parent.iter(self._tag(ns, local_name))

    # -------------------------------------------------------------------
    # Statement-level extraction
    # -------------------------------------------------------------------

    def _extract_statements(self, root, ns):
        """Extract all ``<Stmt>`` elements from the document.

        :raises UserError: if no statement could be extracted
        """
        # CAMT.053 structure: Document > BkToCstmrStmt > Stmt (repeating)
        statements = []
        for stmt_el in self._iter(root, ns, 'Stmt'):
            stmt = self._extract_single_statement(stmt_el, ns)
            if stmt:
                statements.append(stmt)

        if not statements:
            raise UserError(
                _("No statements found in the CAMT.053 file.")
            )
        return statements

    def _extract_single_statement(self, stmt_el, ns):
        """Extract one ``<Stmt>`` element into a statement dict."""
        stmt_id = self._find_text(stmt_el, ns, 'Id') or ''
        stmt_date = self._parse_camt_datetime(
            self._find_text(stmt_el, ns, 'CreDtTm')
        )

        # Account identification: IBAN first, then generic Id/Othr/Id
        account_number = ''
        currency_code = None
        acct_el = self._find(stmt_el, ns, 'Acct')
        if acct_el is not None:
            account_number = (
                self._find_text(acct_el, ns, 'Id/IBAN')
                or self._find_text(acct_el, ns, 'Id/Othr/Id')
                or ''
            )
            ccy_text = self._find_text(acct_el, ns, 'Ccy')
            if ccy_text:
                currency_code = ccy_text.upper()

        # Collect balances per type code so that the *booked* balances can
        # be preferred regardless of the order in which the bank lists them.
        balances = {}
        for bal_el in self._findall(stmt_el, ns, 'Bal'):
            bal_code = (
                self._find_text(bal_el, ns, 'Tp/CdOrPrtry/Cd') or ''
            ).upper()

            amt_el = self._find(bal_el, ns, 'Amt')
            amt_val = 0.0
            if amt_el is not None and amt_el.text:
                amt_val = self._safe_float(amt_el.text)
                # Also capture currency from balance if not yet known
                if not currency_code:
                    currency_code = (amt_el.get('Ccy') or '').upper() or None

            cdi = self._find_text(bal_el, ns, 'CdtDbtInd')
            if cdi and cdi.upper() == 'DBIT':
                amt_val = -amt_val

            if bal_code:
                balances[bal_code] = amt_val

            # Fall back to the closing-balance date as statement date
            if bal_code == 'CLBD' and not stmt_date:
                dt_text = self._find_text(bal_el, ns, 'Dt/Dt')
                if dt_text:
                    stmt_date = self._parse_camt_date(dt_text)

        # OPBD (opening booked) wins over PRCD (previous closing booked);
        # CLBD (closing booked) wins over CLAV (closing available).
        balance_start = balances.get('OPBD', balances.get('PRCD', 0.0))
        balance_end = balances.get('CLBD', balances.get('CLAV', 0.0))

        # Transactions — Ntry elements
        transactions = []
        seen_ids = {}
        for ntry_el in self._findall(stmt_el, ns, 'Ntry'):
            for txn in self._extract_entry(ntry_el, ns, stmt_id, account_number):
                # Two genuinely identical movements would otherwise share
                # one import id. The first occurrence keeps the plain id,
                # so only repeats get a suffix.
                base_id = txn['unique_import_id']
                repeats = seen_ids.get(base_id, 0)
                seen_ids[base_id] = repeats + 1
                if repeats:
                    txn['unique_import_id'] = f"{base_id}-dup{repeats}"
                transactions.append(txn)

        stmt_name = stmt_id or f"CAMT {account_number}"
        if stmt_date:
            stmt_name += f" {stmt_date.strftime('%Y-%m-%d')}"

        return {
            'name': stmt_name,
            'date': stmt_date,
            'balance_start': balance_start,
            'balance_end_real': balance_end,
            'currency_code': currency_code,
            'account_number': account_number,
            'transactions': transactions,
        }

    # -------------------------------------------------------------------
    # Entry / transaction extraction
    # -------------------------------------------------------------------

    def _extract_entry(self, ntry_el, ns, stmt_id, acct_number):
        """Extract transactions from a single ``<Ntry>`` element.

        An entry may contain one or more ``<NtryDtls>/<TxDtls>`` detail
        blocks. If no detail blocks exist, we create a single transaction
        from the entry-level data.

        :return: list of transaction dicts (one per detail block, or one
                 for the whole entry)
        """
        # Entry-level amount, signed by the credit/debit indicator.
        entry_amt = abs(self._safe_float(
            self._find_text(ntry_el, ns, 'Amt') or '0'
        ))
        entry_cdi = self._find_text(ntry_el, ns, 'CdtDbtInd') or ''
        if entry_cdi.upper() == 'DBIT':
            entry_amt = -entry_amt
        # <RvslInd> is intentionally NOT used to flip the sign: per the
        # ISO 20022 definition, CdtDbtInd already states the direction of
        # the reversal entry itself (a reversed debit is booked as CRDT).

        booking_date = self._parse_camt_date(
            self._find_text(ntry_el, ns, 'BookgDt/Dt')
        )
        if not booking_date:
            booking_date = self._parse_camt_datetime(
                self._find_text(ntry_el, ns, 'BookgDt/DtTm')
            )
        value_date = self._parse_camt_date(
            self._find_text(ntry_el, ns, 'ValDt/Dt')
        )

        entry_ref = self._find_text(ntry_el, ns, 'NtryRef') or ''
        entry_addl_info = self._find_text(ntry_el, ns, 'AddtlNtryInf') or ''

        # Detail-level transactions
        tx_details = [
            tx_dtls
            for ntry_dtls in self._findall(ntry_el, ns, 'NtryDtls')
            for tx_dtls in self._findall(ntry_dtls, ns, 'TxDtls')
        ]

        if not tx_details:
            # No detail blocks — create transaction from entry-level data
            description = entry_addl_info or entry_ref or '/'
            unique_id = self._make_unique_id(
                stmt_id, acct_number, entry_ref,
                booking_date, entry_amt, description,
            )
            return [{
                'date': booking_date or value_date,
                'payment_ref': description,
                'ref': entry_ref,
                'amount': entry_amt,
                'unique_import_id': unique_id,
            }]

        transactions = []
        for idx, tx_dtls in enumerate(tx_details):
            txn = self._extract_tx_details(
                tx_dtls, ns, stmt_id, acct_number,
                entry_amt, entry_cdi, booking_date, value_date,
                entry_ref, entry_addl_info, idx,
            )
            if txn:
                transactions.append(txn)
        return transactions

    def _extract_tx_details(
        self, tx_dtls, ns, stmt_id, acct_number,
        entry_amt, entry_cdi, booking_date, value_date,
        entry_ref, entry_addl_info, detail_idx,
    ):
        """Extract a single transaction from a ``<TxDtls>`` element.

        Entry-level values (*entry_amt*, *entry_cdi*, dates, *entry_ref*,
        *entry_addl_info*) are used as fallbacks when the detail block
        does not provide its own. *detail_idx* is the position of the
        block inside its entry and is mixed into the unique import id.
        """
        # Amount — detail may override entry amount
        detail_amt_text = self._find_text(tx_dtls, ns, 'Amt')
        if detail_amt_text:
            amount = abs(self._safe_float(detail_amt_text))
            cdi = self._find_text(tx_dtls, ns, 'CdtDbtInd') or entry_cdi
            if cdi.upper() == 'DBIT':
                amount = -amount
        else:
            amount = entry_amt

        # References
        end_to_end_id = ''
        instruction_id = ''
        msg_id = ''
        refs = self._find(tx_dtls, ns, 'Refs')
        if refs is not None:
            end_to_end_id = self._find_text(refs, ns, 'EndToEndId') or ''
            instruction_id = self._find_text(refs, ns, 'InstrId') or ''
            msg_id = self._find_text(refs, ns, 'MsgId') or ''

        # Filter out NOTPROVIDED-style sentinel values
        if end_to_end_id.upper() in self._EMPTY_REF_SENTINELS:
            end_to_end_id = ''
        if instruction_id.upper() in self._EMPTY_REF_SENTINELS:
            instruction_id = ''

        ref = end_to_end_id or instruction_id or msg_id or entry_ref

        # Remittance information: unstructured lines first, then the
        # structured creditor reference as a fallback.
        remittance_info = ''
        rmt_inf = self._find(tx_dtls, ns, 'RmtInf')
        if rmt_inf is not None:
            remittance_info = ' '.join(
                ustrd.text.strip()
                for ustrd in self._findall(rmt_inf, ns, 'Ustrd')
                if ustrd.text and ustrd.text.strip()
            )
            if not remittance_info:
                remittance_info = self._find_text(
                    rmt_inf, ns, 'Strd/CdtrRefInf/Ref'
                ) or ''

        # Additional transaction info
        addl_tx_info = self._find_text(tx_dtls, ns, 'AddtlTxInf') or ''

        # Build description from all available text fields
        desc_parts = [p for p in (remittance_info, addl_tx_info, entry_addl_info) if p]
        description = ' | '.join(desc_parts) if desc_parts else ref or '/'

        # Debtor / Creditor information.
        # For credits (incoming), the relevant party is the debtor;
        # for debits (outgoing), the relevant party is the creditor.
        # The other side is only used when the preferred one is absent.
        party_order = ('Cdtr', 'Dbtr') if amount < 0 else ('Dbtr', 'Cdtr')
        partner_name = ''
        partner_account = ''
        for party in party_order:
            acct_base = f'RltdPties/{party}Acct/Id'
            partner_account = (
                self._find_text(tx_dtls, ns, f'{acct_base}/IBAN')
                or self._find_text(tx_dtls, ns, f'{acct_base}/Othr/Id')
                or ''
            )
            # Later schema revisions nest the party name under <Pty>.
            partner_name = (
                self._find_text(tx_dtls, ns, f'RltdPties/{party}/Nm')
                or self._find_text(tx_dtls, ns, f'RltdPties/{party}/Pty/Nm')
                or ''
            )
            # Take name and account from the same party, never mixed.
            if partner_name or partner_account:
                break

        unique_id = self._make_unique_id(
            stmt_id, acct_number, ref,
            booking_date, amount, f"{description}-{detail_idx}",
        )

        txn = {
            'date': booking_date or value_date,
            'payment_ref': description,
            'ref': ref,
            'amount': amount,
            'unique_import_id': unique_id,
        }
        if partner_name:
            txn['partner_name'] = partner_name
        if partner_account:
            txn['account_number'] = partner_account
        return txn

    # -------------------------------------------------------------------
    # Unique-ID generation
    # -------------------------------------------------------------------

    @staticmethod
    def _make_unique_id(stmt_id, acct_number, ref, date, amount, extra=''):
        """Generate a deterministic unique import ID from available data.

        Empty components are skipped; the rest are joined with ``-``.
        """
        parts = [
            'CAMT',
            stmt_id or '',
            acct_number or '',
            ref or '',
            date.isoformat() if date else '',
            str(amount),
        ]
        if extra:
            parts.append(extra)
        return '-'.join(p for p in parts if p)

    # -------------------------------------------------------------------
    # Date helpers
    # -------------------------------------------------------------------

    @staticmethod
    def _parse_camt_date(date_str):
        """Parse an ISO 8601 date (``YYYY-MM-DD``) to ``datetime.date``.

        Returns ``None`` (after logging a warning) for unparseable input.
        """
        if not date_str:
            return None
        try:
            return datetime.strptime(date_str.strip()[:10], '%Y-%m-%d').date()
        except ValueError:
            _log.warning("Unparseable CAMT date: %s", date_str)
            return None

    @staticmethod
    def _parse_camt_datetime(dt_str):
        """Parse an ISO 8601 datetime to ``datetime.date``.

        Only the leading ``YYYY-MM-DD`` part is interpreted, so the time,
        fractional seconds and any timezone suffix are ignored and the date
        is taken as written in the file. Returns ``None`` (after logging a
        warning) for unparseable input.
        """
        if not dt_str:
            return None
        try:
            return datetime.strptime(dt_str.strip()[:10], '%Y-%m-%d').date()
        except ValueError:
            _log.warning("Unparseable CAMT datetime: %s", dt_str)
            return None

    # -------------------------------------------------------------------
    # Numeric helper
    # -------------------------------------------------------------------

    @staticmethod
    def _safe_float(value):
        """Convert *value* to float, returning 0.0 on failure.

        A decimal comma is accepted in place of the decimal point.
        """
        if not value:
            return 0.0
        try:
            return float(value.strip().replace(',', '.'))
        except (ValueError, AttributeError):
            return 0.0
|
|
|
|
|
|
class FusionJournalCAMTImport(models.Model):
    """Register CAMT.053 as an available bank-statement import format
    and implement the parser hook on ``account.journal``."""

    _inherit = 'account.journal'

    # ---- Format Registration ----
    def _get_bank_statements_available_import_formats(self):
        """Append CAMT.053 to the list of importable formats."""
        formats = super()._get_bank_statements_available_import_formats()
        formats.append('CAMT.053')
        return formats

    # ---- Parser Hook ----
    def _parse_bank_statement_file(self, attachment):
        """Attempt to parse *attachment* as CAMT.053. Falls through to
        ``super()`` when the file is not recognised as CAMT.

        :return: tuple ``(currency_code, account_number, statements)``
                 where the first two come from the first statement
        :raises UserError: if the file looks like CAMT but cannot be parsed
        """
        raw_data = attachment.raw
        if not self._is_camt_file(raw_data):
            return super()._parse_bank_statement_file(attachment)

        parser = FusionCAMTParser()
        try:
            statements = parser.parse_camt(raw_data)
        except UserError:
            # Already user-facing: propagate unchanged.
            raise
        except Exception as exc:
            # Boundary handler: log the traceback, show a readable error.
            _log.exception("CAMT.053 parsing error")
            raise UserError(
                _("Could not parse the CAMT.053 file: %s", str(exc))
            ) from exc

        # Extract currency and account from the first statement
        currency_code = None
        account_number = None
        if statements:
            currency_code = statements[0].get('currency_code')
            account_number = statements[0].get('account_number')

        return currency_code, account_number, statements

    # ---- Detection ----
    @staticmethod
    def _is_camt_file(raw_data):
        """Heuristic check: does *raw_data* look like a CAMT.053 file?

        Only the first 4096 characters are inspected.
        """
        sniff_chars = 4096
        if isinstance(raw_data, (bytes, bytearray)):
            # Decode only a prefix instead of the whole attachment: a UTF-8
            # character takes at most 4 bytes and a BOM takes 3.
            head = bytes(raw_data[:sniff_chars * 4 + 3])
            text = head.decode('utf-8-sig', errors='ignore')[:sniff_chars]
        else:
            text = str(raw_data)[:sniff_chars]

        # Look for the CAMT namespace URI
        if 'camt.053' in text.lower():
            return True
        # Also accept documents with BkToCstmrStmt element (in case the
        # namespace URI uses a different casing or custom prefix)
        return 'BkToCstmrStmt' in text
|