Files
Odoo-Modules/Fusion Accounting/models/bank_statement_import_ofx.py
2026-02-22 01:22:18 -05:00

459 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Fusion Accounting - OFX Bank Statement Parser
# Original implementation for Open Financial Exchange v1 (SGML) and v2 (XML)
# Based on the published OFX specification (https://www.ofx.net/spec)
import logging
import re
from datetime import datetime
from xml.etree import ElementTree
from odoo import _, models
from odoo.exceptions import UserError
_log = logging.getLogger(__name__)
class FusionOFXParser:
    """Standalone parser for OFX (Open Financial Exchange) files.

    Supports both OFX v1 (SGML-like markup where leaf tags are usually
    not closed) and OFX v2 (well-formed XML).  The parser normalises
    either dialect into an ``ElementTree`` before extracting statement
    data.

    This is an **original** implementation written from the published
    OFX 1.6 / 2.2 specification — it is not derived from Odoo Enterprise.
    """

    # OFX date format: YYYYMMDDHHMMSS[.XXX[:TZ]] — timezone and fractional
    # seconds are optional; many banks only emit YYYYMMDD.
    _OFX_DATE_RE = re.compile(
        r'^(\d{4})(\d{2})(\d{2})'        # YYYYMMDD (required)
        r'(?:(\d{2})(\d{2})(\d{2}))?'    # HHMMSS   (optional)
        r'(?:\.\d+)?'                    # .XXX     (optional fractional)
        r'(?:\[.*\])?$'                  # [:TZ]    (optional timezone)
    )

    # Tags that are always scalar (leaf) elements in OFX v1.  The set is
    # only consulted for tags that carry *no* data: any tag followed by
    # data is treated as a leaf regardless of whether it is listed here.
    _SGML_LEAF_TAGS = {
        'TRNTYPE', 'DTPOSTED', 'DTUSER', 'DTSTART', 'DTEND',
        'TRNAMT', 'FITID', 'CHECKNUM', 'REFNUM', 'NAME', 'MEMO',
        'PAYEEID', 'ACCTID', 'BANKID', 'BRANCHID', 'ACCTTYPE',
        'BALAMT', 'DTASOF', 'CURDEF', 'SEVERITY', 'CODE', 'MESSAGE',
        'SIC', 'CORRECTFITID', 'CORRECTACTION',
        'SRVRTID', 'CLRTID',
    }

    # Splits an SGML body into tags (groups 1-2) and text runs (group 3).
    # Tag names may contain dots/hyphens (e.g. ``INTU.BID``).  A ``<``
    # that does not start a tag is returned as a one-character text run.
    _SGML_TOKEN_RE = re.compile(r'<(/?)([A-Za-z][A-Za-z0-9_.\-]*)\s*>|([^<]+|<)')

    # Entities that may already be escaped in the SGML data.  ``&amp;``
    # is deliberately last so ``&amp;lt;`` decodes to the text ``&lt;``.
    _SGML_ENTITIES = (
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&quot;', '"'),
        ('&apos;', "'"),
        ('&amp;', '&'),
    )

    # -------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------
    def parse_ofx(self, data_file):
        """Parse an OFX file (bytes or str) and return a list of statement
        dicts compatible with the Fusion Accounting import pipeline.

        Each dict has the keys:

        - ``name``            : statement identifier
        - ``date``            : closing date (datetime.date)
        - ``balance_start``   : opening balance (float)
        - ``balance_end_real``: closing balance (float)
        - ``currency_code``   : ISO 4217 currency code
        - ``account_number``  : bank account number
        - ``transactions``    : list of transaction dicts

        Transaction dicts contain:

        - ``date``             : posting date (datetime.date)
        - ``payment_ref``      : description / memo
        - ``ref``              : FITID or reference number
        - ``amount``           : signed float (negative = debit)
        - ``unique_import_id`` : unique per-transaction identifier
        - ``transaction_type`` : OFX TRNTYPE value

        :raises UserError: when the file cannot be parsed or contains no
            statement.
        """
        raw = self._to_text(data_file)
        # Determine OFX dialect and obtain an ElementTree root
        if self._is_ofx_v2(raw):
            root = self._parse_xml(raw)
        else:
            root = self._parse_sgml(raw)
        return self._extract_statements(root)

    # -------------------------------------------------------------------
    # Input normalisation
    # -------------------------------------------------------------------
    @staticmethod
    def _to_text(data_file):
        """Ensure *data_file* is a string, decoding bytes if necessary.

        UTF-8 (with or without BOM) is tried first, then Windows-1252
        (the usual ``CHARSET:1252`` of OFX v1 files) and finally Latin-1,
        which accepts every byte and therefore never fails.
        """
        if isinstance(data_file, (bytearray, memoryview)):
            data_file = bytes(data_file)
        if isinstance(data_file, bytes):
            for encoding in ('utf-8-sig', 'cp1252', 'latin-1'):
                try:
                    return data_file.decode(encoding)
                except UnicodeDecodeError:
                    continue
        return data_file

    @staticmethod
    def _is_ofx_v2(text):
        """Return True when *text* looks like OFX v2 (XML) rather than
        SGML-based v1.  OFX v2 begins with an XML declaration or a
        ``<?OFX …?>`` processing instruction."""
        return text.lstrip().startswith(('<?xml', '<?OFX'))

    # -------------------------------------------------------------------
    # OFX v2 (XML) parser
    # -------------------------------------------------------------------
    def _parse_xml(self, text):
        """Parse well-formed OFX v2 XML and return the root Element.

        The already-decoded string is handed to the parser as-is: feeding
        re-encoded UTF-8 bytes would let an ``encoding="…"`` attribute in
        the XML declaration mis-decode non-ASCII characters.  Leading
        whitespace is dropped because the XML declaration must be the
        very first thing in the document.

        :raises UserError: when the document is not well-formed.
        """
        try:
            return ElementTree.fromstring(text.lstrip())
        except ElementTree.ParseError as exc:
            raise UserError(
                _("Failed to parse OFX XML file: %s", str(exc))
            ) from exc

    # -------------------------------------------------------------------
    # OFX v1 (SGML) parser — convert to XML then parse
    # -------------------------------------------------------------------
    def _parse_sgml(self, text):
        """Convert an OFX v1 SGML document into well-formed XML and
        return the root Element.

        The SGML dialect used by OFX v1 has two kinds of tags:

        * **Aggregate** tags like ``<STMTTRNRS>`` which contain child
          elements and have a matching ``</STMTTRNRS>``.
        * **Leaf** (data) tags like ``<TRNAMT>-42.50`` which carry a
          scalar value and are usually (but not always) left unclosed.

        :raises UserError: when no ``<OFX>`` element is present or the
            converted document is not well-formed.
        """
        # Strip the SGML headers (everything before the first ``<OFX>``).
        # A case-insensitive search is used instead of ``text.upper()``
        # because upper-casing can change the length of the string.
        start = re.search(r'<OFX>', text, re.IGNORECASE)
        if start is None:
            raise UserError(_("The file does not contain a valid OFX document."))
        xml_text = self._sgml_to_xml(text[start.start():])
        try:
            return ElementTree.fromstring(xml_text)
        except ElementTree.ParseError as exc:
            _log.debug("SGML→XML conversion result:\n%s", xml_text[:2000])
            raise UserError(
                _("Failed to parse OFX v1 (SGML) file. The file may be "
                  "corrupt or in an unsupported dialect: %s", str(exc))
            ) from exc

    @classmethod
    def _tokenise_sgml(cls, body):
        """Split *body* into ``('open'|'close'|'text', value)`` tuples.

        Tag names are upper-cased, adjacent text runs are merged, text is
        stripped line by line and whitespace-only text is dropped.
        """
        tokens = []
        for match in cls._SGML_TOKEN_RE.finditer(body):
            if match.group(2):
                kind = 'close' if match.group(1) else 'open'
                tokens.append((kind, match.group(2).upper()))
            elif tokens and tokens[-1][0] == 'text':
                tokens[-1] = ('text', tokens[-1][1] + match.group(3))
            else:
                tokens.append(('text', match.group(3)))

        cleaned = []
        for kind, value in tokens:
            if kind == 'text':
                value = ' '.join(
                    part.strip() for part in value.splitlines() if part.strip()
                )
                if not value:
                    continue
            cleaned.append((kind, value))
        return cleaned

    @classmethod
    def _decode_sgml_entities(cls, text):
        """Replace entities already present in SGML data by their
        characters so they are not escaped a second time."""
        for entity, char in cls._SGML_ENTITIES:
            text = text.replace(entity, char)
        return text

    def _sgml_to_xml(self, body):
        """Return *body* (OFX v1 SGML starting at ``<OFX>``) as XML text.

        An opening tag directly followed by data is a leaf and is closed
        right after its value; an explicit closing tag that follows the
        value is consumed rather than emitted twice.  A data-less tag is
        a leaf only when listed in ``_SGML_LEAF_TAGS``; otherwise it is an
        aggregate.  When an aggregate is closed, any element opened inside
        it and still unclosed is closed first.  Stray closing tags and
        aggregates left open at the end of the file are *not* repaired, so
        corrupt or truncated files are still rejected by the XML parser.
        """
        tokens = self._tokenise_sgml(body)
        total = len(tokens)
        out = []
        open_tags = []
        idx = 0
        while idx < total:
            kind, value = tokens[idx]
            idx += 1

            if kind == 'text':
                # Text that does not follow an opening tag: keep it, escaped.
                out.append(self._xml_escape(self._decode_sgml_entities(value)))
                continue

            if kind == 'close':
                if value in open_tags:
                    # Close dangling inner elements up to the matching one.
                    while True:
                        top = open_tags.pop()
                        out.append(f'</{top}>')
                        if top == value:
                            break
                else:
                    out.append(f'</{value}>')
                continue

            # Opening tag: look ahead for its data value.
            data = None
            if idx < total and tokens[idx][0] == 'text':
                data = tokens[idx][1]
                idx += 1
            if data is None and value not in self._SGML_LEAF_TAGS:
                out.append(f'<{value}>')
                open_tags.append(value)
                continue

            escaped = self._xml_escape(self._decode_sgml_entities(data or ''))
            out.append(f'<{value}>{escaped}</{value}>')
            # Some banks do close their leaf tags; skip that closing tag.
            if idx < total and tokens[idx] == ('close', value):
                idx += 1
        return '\n'.join(out)

    @staticmethod
    def _xml_escape(text):
        """Escape XML-special characters in *text*."""
        return (
            text.replace('&', '&amp;')
            .replace('<', '&lt;')
            .replace('>', '&gt;')
            .replace('"', '&quot;')
            .replace("'", '&apos;')
        )

    # -------------------------------------------------------------------
    # Data extraction
    # -------------------------------------------------------------------
    def _extract_statements(self, root):
        """Walk the parsed OFX element tree and collect statement data.

        Supports ``STMTRS`` (bank accounts) and ``CCSTMTRS`` (credit-card
        accounts).  Each statement response is visited exactly once.

        :raises UserError: when no statement is found.
        """
        statements = []
        for container_tag, acct_tag in (
            ('STMTRS', 'BANKACCTFROM'),
            ('CCSTMTRS', 'CCACCTFROM'),
        ):
            for stmtrs in self._find_all(root, container_tag):
                stmt = self._extract_single_statement(stmtrs, acct_tag)
                if stmt:
                    statements.append(stmt)
        if not statements:
            raise UserError(
                _("No bank or credit-card statements found in the OFX file.")
            )
        return statements

    def _extract_single_statement(self, stmtrs, acct_tag):
        """Extract one statement from a ``<STMTRS>`` or ``<CCSTMTRS>``
        element and return it as a statement dict (see ``parse_ofx``).

        *acct_tag* is the preferred account aggregate to read the account
        number from; the other account aggregate is used as a fallback.
        """
        # Currency
        currency = self._find_text(stmtrs, 'CURDEF') or ''

        # Account number
        acct_elem = None
        for candidate in (acct_tag, 'BANKACCTFROM', 'CCACCTFROM'):
            acct_elem = self._find_first(stmtrs, candidate)
            if acct_elem is not None:
                break
        acct_number = ''
        if acct_elem is not None:
            acct_number = self._find_text(acct_elem, 'ACCTID') or ''

        # Transaction list
        txn_list_el = self._find_first(stmtrs, 'BANKTRANLIST')
        if txn_list_el is None:
            txn_list_el = stmtrs  # CCSTMTRS may put transactions directly inside
        start_date = self._parse_ofx_date(self._find_text(txn_list_el, 'DTSTART'))
        end_date = self._parse_ofx_date(self._find_text(txn_list_el, 'DTEND'))
        transactions = []
        for stmttrn in self._find_all(txn_list_el, 'STMTTRN'):
            txn = self._extract_transaction(stmttrn)
            if txn:
                transactions.append(txn)

        # Closing balance: LEDGERBAL when present, otherwise AVAILBAL.
        balance_end = 0.0
        ledger_bal = self._find_first(stmtrs, 'LEDGERBAL')
        if ledger_bal is not None:
            balance_end = self._safe_float(self._find_text(ledger_bal, 'BALAMT'))
        else:
            avail_bal = self._find_first(stmtrs, 'AVAILBAL')
            if avail_bal is not None:
                balance_end = self._safe_float(self._find_text(avail_bal, 'BALAMT'))

        # Derive opening balance: opening = closing - sum(transactions)
        txn_total = sum(t['amount'] for t in transactions)
        balance_start = balance_end - txn_total

        stmt_date = end_date or start_date
        stmt_name = f"OFX {acct_number}" if acct_number else "OFX Import"
        if stmt_date:
            stmt_name += f" {stmt_date.strftime('%Y-%m-%d')}"
        return {
            'name': stmt_name,
            'date': stmt_date,
            'balance_start': balance_start,
            'balance_end_real': balance_end,
            'currency_code': currency.upper() if currency else None,
            'account_number': acct_number,
            'transactions': transactions,
        }

    def _extract_transaction(self, stmttrn):
        """Extract a single transaction dict from a ``<STMTTRN>`` element."""
        trntype = self._find_text(stmttrn, 'TRNTYPE') or ''
        dt_posted = self._parse_ofx_date(self._find_text(stmttrn, 'DTPOSTED'))
        dt_user = self._parse_ofx_date(self._find_text(stmttrn, 'DTUSER'))
        amount = self._safe_float(self._find_text(stmttrn, 'TRNAMT'))
        fitid = self._find_text(stmttrn, 'FITID') or ''
        checknum = self._find_text(stmttrn, 'CHECKNUM') or ''
        refnum = self._find_text(stmttrn, 'REFNUM') or ''
        name = self._find_text(stmttrn, 'NAME') or ''
        memo = self._find_text(stmttrn, 'MEMO') or ''

        # Build description: prefer NAME, append MEMO if different
        description = name
        if memo and memo != name:
            description = f"{name} - {memo}" if name else memo

        # Build reference: CHECKNUM or REFNUM serve as human-readable
        # reference, FITID is the fallback and the unique import id.
        ref = checknum or refnum or fitid
        return {
            # ``date`` is documented as the posting date, so DTPOSTED wins;
            # DTUSER is only used when the posting date is missing/invalid.
            'date': dt_posted or dt_user,
            'payment_ref': description or ref or '/',
            'ref': ref,
            'amount': amount,
            'unique_import_id': fitid,
            'transaction_type': trntype,
        }

    # -------------------------------------------------------------------
    # Element-tree helpers (case-insensitive tag search)
    # -------------------------------------------------------------------
    @staticmethod
    def _tag_name(element):
        """Return the upper-cased tag of *element* without any
        ``{namespace}`` prefix, or ``''`` for non-element nodes."""
        tag = element.tag
        if not isinstance(tag, str):
            return ''
        return tag.rsplit('}', 1)[-1].upper()

    @classmethod
    def _find_all(cls, parent, tag):
        """Return every element of the *parent* subtree (``parent``
        included) whose tag matches *tag*, case-insensitively."""
        tag_upper = tag.upper()
        return [el for el in parent.iter() if cls._tag_name(el) == tag_upper]

    @classmethod
    def _find_first(cls, parent, tag):
        """Return the first element of the *parent* subtree matching
        *tag* (case-insensitive), or ``None``."""
        tag_upper = tag.upper()
        for el in parent.iter():
            if cls._tag_name(el) == tag_upper:
                return el
        return None

    @classmethod
    def _find_text(cls, parent, tag):
        """Return stripped text content of the first element matching
        *tag*, or ``None`` when it is missing or empty."""
        el = cls._find_first(parent, tag)
        if el is not None and el.text:
            return el.text.strip()
        return None

    # -------------------------------------------------------------------
    # Date / numeric helpers
    # -------------------------------------------------------------------
    @classmethod
    def _parse_ofx_date(cls, date_str):
        """Parse an OFX date string (``YYYYMMDD…``) into a Python date.

        Time, fractional seconds and timezone are ignored.  Returns
        ``None`` (after logging a warning) for unparseable input.
        """
        if not date_str:
            return None
        cleaned = date_str.strip()
        m = cls._OFX_DATE_RE.match(cleaned)
        if not m:
            # Fallback: try basic YYYYMMDD
            try:
                return datetime.strptime(cleaned[:8], '%Y%m%d').date()
            except (ValueError, IndexError):
                _log.warning("Unparseable OFX date: %s", date_str)
                return None
        year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
        try:
            return datetime(year, month, day).date()
        except ValueError:
            _log.warning("Invalid OFX date components: %s", date_str)
            return None

    @staticmethod
    def _safe_float(value):
        """Convert *value* to float, returning 0.0 for empty / invalid.

        A lone comma is a decimal mark (``-42,50``).  When both ``,`` and
        ``.`` occur, the right-most one is the decimal mark and the other
        is a thousands separator (``1,234.56`` / ``1.234,56``).
        """
        if not value:
            return 0.0
        try:
            text = value.strip()
            if ',' in text and '.' in text:
                if text.rfind(',') > text.rfind('.'):
                    text = text.replace('.', '').replace(',', '.')
                else:
                    text = text.replace(',', '')
            else:
                text = text.replace(',', '.')
            return float(text)
        except (ValueError, AttributeError):
            _log.warning("Unparseable OFX amount: %r", value)
            return 0.0
class FusionJournalOFXImport(models.Model):
    """Register OFX as an available bank-statement import format and
    implement the parser hook on ``account.journal``."""

    _inherit = 'account.journal'

    # ---- Format Registration ----
    def _get_bank_statements_available_import_formats(self):
        """Append OFX to the list of importable formats returned by
        ``super()``."""
        formats = super()._get_bank_statements_available_import_formats()
        formats.append('OFX')
        return formats

    # ---- Parser Hook ----
    def _parse_bank_statement_file(self, attachment):
        """Attempt to parse *attachment* as OFX.

        Falls through to ``super()`` when the file is not recognised as
        OFX.  Otherwise returns ``(currency_code, account_number,
        statements)`` where the currency and account number are taken
        from the first parsed statement.

        :raises UserError: when the file looks like OFX but cannot be
            parsed; unexpected exceptions are logged and wrapped.
        """
        raw_data = attachment.raw
        if not self._is_ofx_file(raw_data):
            return super()._parse_bank_statement_file(attachment)

        parser = FusionOFXParser()
        try:
            statements = parser.parse_ofx(raw_data)
        except UserError:
            # Already user-facing: propagate unchanged.
            raise
        except Exception as exc:
            _log.exception("OFX parsing error")
            raise UserError(
                _("Could not parse the OFX file: %s", str(exc))
            ) from exc

        # The import pipeline expects (currency_code, account_number, stmts)
        currency_code = None
        account_number = None
        if statements:
            first = statements[0]
            currency_code = first.get('currency_code')
            account_number = first.get('account_number')
        return currency_code, account_number, statements

    # ---- Detection ----
    @staticmethod
    def _is_ofx_file(raw_data):
        """Heuristic check: does *raw_data* look like an OFX file?

        Only the first 4096 bytes/characters are inspected.  Bytes are
        sliced *before* decoding so a large attachment is not decoded in
        full just to sniff its header; undecodable bytes are ignored.
        Anything that is not bytes-like is inspected through ``str()``.
        """
        if isinstance(raw_data, (bytes, bytearray)):
            text = bytes(raw_data[:4096]).decode('utf-8-sig', errors='ignore')
        else:
            text = str(raw_data)[:4096]
        text_upper = text.upper()
        # ``<?OFX`` processing instruction (v2) or a bare ``<OFX>`` root
        if '<?OFX' in text_upper or '<OFX>' in text_upper:
            return True
        # OFX v1 (SGML header markers)
        return 'OFXHEADER:' in text_upper