# Fusion Accounting - OFX Bank Statement Parser
# Original implementation for Open Financial Exchange v1 (SGML) and v2 (XML)
# Based on the published OFX specification (https://www.ofx.net/spec)

import logging
import re
from datetime import datetime
from xml.etree import ElementTree
from xml.sax.saxutils import unescape

from odoo import _, models
from odoo.exceptions import UserError

_log = logging.getLogger(__name__)


class FusionOFXParser:
    """Standalone parser for OFX (Open Financial Exchange) files.

    Supports both OFX v1 (SGML-like markup without closing tags) and
    OFX v2 (well-formed XML).  The parser normalises either dialect into
    a common intermediate structure (an ElementTree) before extracting
    statement data.

    This is an **original** implementation written from the published
    OFX 1.6 / 2.2 specification — it is not derived from Odoo Enterprise.
    """

    # OFX date format: YYYYMMDDHHMMSS[.XXX[:TZ]] — timezone and fractional
    # seconds are optional; many banks only emit YYYYMMDD.
    _OFX_DATE_RE = re.compile(
        r'^(\d{4})(\d{2})(\d{2})'        # YYYYMMDD (required)
        r'(?:(\d{2})(\d{2})(\d{2}))?'    # HHMMSS   (optional)
        r'(?:\.\d+)?'                    # .XXX     (optional fractional)
        r'(?:\[.*\])?$'                  # [:TZ]    (optional timezone)
    )

    # Leaf tags used in OFX v1 that carry scalar data and normally have no
    # closing tag.  Any tag followed by data is treated as a leaf by the
    # SGML converter; this set is additionally used to recognise *empty*
    # leaves (e.g. a bare ``<MEMO>``) that would otherwise look like the
    # start of an aggregate.
    _SGML_LEAF_TAGS = {
        'TRNTYPE', 'DTPOSTED', 'DTUSER', 'DTSTART', 'DTEND',
        'TRNAMT', 'FITID', 'CHECKNUM', 'REFNUM', 'NAME', 'MEMO',
        'PAYEEID', 'ACCTID', 'BANKID', 'BRANCHID', 'ACCTTYPE',
        'BALAMT', 'DTASOF', 'CURDEF', 'SEVERITY', 'CODE', 'MESSAGE',
        'SIC', 'CORRECTFITID', 'CORRECTACTION', 'SRVRTID', 'CLRTID',
    }

    # Start of the OFX body in a v1 file (everything before it is header).
    _OFX_ROOT_RE = re.compile(r'<OFX\s*>', re.IGNORECASE)

    # One SGML token: an opening or closing tag followed by its (possibly
    # empty, possibly multi-line) character data.  A ``<`` that does not
    # start a real tag is kept as part of the data.
    _SGML_TOKEN_RE = re.compile(
        r'<(/?)([A-Za-z][\w.\-]*)\s*>'
        r'((?:[^<]|<(?!/?[A-Za-z][\w.\-]*\s*>))*)'
    )

    # -------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------

    def parse_ofx(self, data_file):
        """Parse an OFX file (bytes or str) and return a list of statement
        dicts compatible with the Fusion Accounting import pipeline.

        Each dict has the keys:
            - ``name``            : statement identifier
            - ``date``            : closing date (datetime.date)
            - ``balance_start``   : opening balance (float)
            - ``balance_end_real``: closing balance (float)
            - ``currency_code``   : ISO 4217 currency code
            - ``account_number``  : bank account number
            - ``transactions``    : list of transaction dicts

        Transaction dicts contain:
            - ``date``             : posting date (datetime.date)
            - ``payment_ref``      : description / memo
            - ``ref``              : FITID or reference number
            - ``amount``           : signed float (negative = debit)
            - ``unique_import_id`` : unique per-transaction identifier
            - ``transaction_type`` : OFX TRNTYPE value

        :raises UserError: when the file cannot be parsed or contains no
            bank / credit-card statement.
        """
        raw = self._to_text(data_file)

        # Determine OFX dialect and obtain an ElementTree root
        if self._is_ofx_v2(raw):
            root = self._parse_xml(raw)
        else:
            root = self._parse_sgml(raw)

        return self._extract_statements(root)

    # -------------------------------------------------------------------
    # Input normalisation
    # -------------------------------------------------------------------

    @staticmethod
    def _to_text(data_file):
        """Ensure *data_file* is a string, decoding bytes if necessary.

        Bytes are decoded as UTF-8 (with or without BOM) first, then as
        Windows-1252 (the usual ``CHARSET:1252`` of OFX v1 files) and
        finally as Latin-1, which accepts any byte sequence.  Non-bytes
        input is returned as-is, minus a leading BOM for strings.
        """
        if isinstance(data_file, (bytes, bytearray)):
            for encoding in ('utf-8-sig', 'cp1252', 'latin-1'):
                try:
                    return bytes(data_file).decode(encoding)
                except UnicodeDecodeError:
                    continue
        if isinstance(data_file, str):
            return data_file.lstrip('\ufeff')
        return data_file

    @staticmethod
    def _is_ofx_v2(text):
        """Return True when *text* looks like OFX v2 (XML) rather than
        SGML-based v1.  OFX v2 begins with an XML processing instruction
        (``<?xml …?>``) or a ``<?OFX …?>`` header."""
        return text.lstrip()[:5].upper() in ('<?XML', '<?OFX')

    # -------------------------------------------------------------------
    # OFX v2 (XML)
    # -------------------------------------------------------------------

    def _parse_xml(self, text):
        """Parse a well-formed OFX v2 document and return its root element.

        :raises UserError: when the document is not well-formed XML.
        """
        # NOTE(review): the standard-library XML parsers are documented as
        # not hardened against maliciously constructed data.
        try:
            # Leading whitespace before the XML declaration is not
            # well-formed, so drop it (``_is_ofx_v2`` tolerates it too).
            return ElementTree.fromstring(text.lstrip())
        except ElementTree.ParseError as exc:
            raise UserError(
                _("Failed to parse OFX v2 (XML) file: %s", str(exc))
            ) from exc

    # -------------------------------------------------------------------
    # OFX v1 (SGML)
    # -------------------------------------------------------------------

    def _parse_sgml(self, text):
        """Convert an OFX v1 (SGML) document to XML and return its root.

        OFX v1 uses two kinds of tags:

        * **Aggregate** (container) tags like ``<STMTTRN>`` which contain
          child elements and always have a matching ``</STMTTRN>``.
        * **Leaf** (data) tags like ``<TRNAMT>-42.50`` which carry a
          scalar value and are usually not explicitly closed.

        The conversion inserts explicit close tags for every leaf element
        so that the result is valid XML.

        :raises UserError: when no ``<OFX>`` element is present or the
            converted document still is not well-formed.
        """
        # Strip the SGML headers (everything before the first ``<OFX>``).
        # A regex search is used instead of ``text.upper().find`` because
        # upper-casing can change the length of a string and thereby
        # shift the index.
        root_match = self._OFX_ROOT_RE.search(text)
        if root_match is None:
            raise UserError(_("The file does not contain a valid OFX document."))

        xml_text = self._sgml_to_xml(text[root_match.start():])

        try:
            return ElementTree.fromstring(xml_text.encode('utf-8'))
        except ElementTree.ParseError as exc:
            _log.debug("SGML→XML conversion result:\n%s", xml_text[:2000])
            raise UserError(
                _("Failed to parse OFX v1 (SGML) file. The file may be "
                  "corrupt or in an unsupported dialect: %s", str(exc))
            ) from exc

    @classmethod
    def _sgml_to_xml(cls, body):
        """Return *body* (OFX v1 markup starting at ``<OFX>``) as XML text.

        Tag names are upper-cased.  An opening tag followed by data — or
        listed in ``_SGML_LEAF_TAGS`` — becomes a complete
        ``<TAG>data</TAG>`` element; any other opening tag is an aggregate
        and is pushed on a stack until its closing tag is seen.
        """
        xml_lines = []
        open_aggregates = []

        for token in cls._SGML_TOKEN_RE.finditer(body):
            is_close = bool(token.group(1))
            tag_name = token.group(2).upper()
            data = token.group(3).strip()

            if is_close:
                if tag_name not in open_aggregates:
                    # Explicit close of a leaf we already closed ourselves
                    # (or a stray close tag): nothing to emit.
                    continue
                # Close this aggregate together with anything left open
                # inside it (e.g. an empty leaf that is not in the set).
                while open_aggregates:
                    closing = open_aggregates.pop()
                    xml_lines.append(f'</{closing}>')
                    if closing == tag_name:
                        break
            elif data or tag_name in cls._SGML_LEAF_TAGS:
                # Data may already use the SGML escapes (&amp; &lt; &gt;);
                # decode them first so they are not escaped twice.
                value = cls._xml_escape(unescape(data))
                xml_lines.append(f'<{tag_name}>{value}</{tag_name}>')
            else:
                xml_lines.append(f'<{tag_name}>')
                open_aggregates.append(tag_name)

        # A truncated file may leave aggregates open; close them so the
        # statements that were read can still be extracted.
        while open_aggregates:
            xml_lines.append(f'</{open_aggregates.pop()}>')

        return '\n'.join(xml_lines)

    @staticmethod
    def _xml_escape(text):
        """Escape XML-special characters in *text*."""
        return (
            text.replace('&', '&amp;')   # must come first
                .replace('<', '&lt;')
                .replace('>', '&gt;')
                .replace('"', '&quot;')
                .replace("'", '&apos;')
        )

    # -------------------------------------------------------------------
    # Data extraction
    # -------------------------------------------------------------------

    def _extract_statements(self, root):
        """Walk the parsed OFX element tree and collect statement data.

        Supports ``STMTRS`` (bank accounts, under ``BANKMSGSRSV1``) and
        ``CCSTMTRS`` (credit-card accounts, under ``CCMSGSRSV1``).  Each
        statement response is extracted exactly once.

        :raises UserError: when the tree contains no statement.
        """
        statements = []

        for stmt_tag, acct_tag in (
            ('STMTRS', 'BANKACCTFROM'),
            ('CCSTMTRS', 'CCACCTFROM'),
        ):
            for stmtrs in self._find_all(root, stmt_tag):
                stmt = self._extract_single_statement(stmtrs, acct_tag)
                if stmt:
                    statements.append(stmt)

        if not statements:
            raise UserError(
                _("No bank or credit-card statements found in the OFX file.")
            )
        return statements

    def _extract_single_statement(self, stmtrs, acct_tag):
        """Extract one statement from a ``<STMTRS>`` or ``<CCSTMTRS>``
        element and return it as a statement dict (see ``parse_ofx``).

        *acct_tag* is the preferred account aggregate to read the account
        number from; the other account aggregate is used as a fallback.
        """
        # Currency
        currency = self._find_text(stmtrs, 'CURDEF') or ''

        # Account number
        acct_elem = None
        for candidate in (acct_tag, 'BANKACCTFROM', 'CCACCTFROM'):
            acct_elem = self._find_first(stmtrs, candidate)
            if acct_elem is not None:
                break
        acct_number = ''
        if acct_elem is not None:
            acct_number = self._find_text(acct_elem, 'ACCTID') or ''

        # Transaction list
        txn_list_el = self._find_first(stmtrs, 'BANKTRANLIST')
        if txn_list_el is None:
            txn_list_el = stmtrs  # CCSTMTRS may put transactions directly inside

        start_date = self._parse_ofx_date(self._find_text(txn_list_el, 'DTSTART'))
        end_date = self._parse_ofx_date(self._find_text(txn_list_el, 'DTEND'))

        transactions = []
        for stmttrn in self._find_all(txn_list_el, 'STMTTRN'):
            txn = self._extract_transaction(stmttrn)
            if txn:
                transactions.append(txn)

        # Closing balance: LEDGERBAL when present, otherwise AVAILBAL.
        balance_elem = self._find_first(stmtrs, 'LEDGERBAL')
        if balance_elem is None:
            balance_elem = self._find_first(stmtrs, 'AVAILBAL')
        balance_end = 0.0
        if balance_elem is not None:
            balance_end = self._safe_float(self._find_text(balance_elem, 'BALAMT'))

        # Derive opening balance: opening = closing − sum(transactions)
        txn_total = sum(t['amount'] for t in transactions)
        balance_start = balance_end - txn_total

        stmt_date = end_date or start_date
        stmt_name = f"OFX {acct_number}" if acct_number else "OFX Import"
        if stmt_date:
            stmt_name += f" {stmt_date.strftime('%Y-%m-%d')}"

        return {
            'name': stmt_name,
            'date': stmt_date,
            'balance_start': balance_start,
            'balance_end_real': balance_end,
            'currency_code': currency.upper() if currency else None,
            'account_number': acct_number,
            'transactions': transactions,
        }

    def _extract_transaction(self, stmttrn):
        """Extract a single transaction from a ``<STMTTRN>`` element and
        return it as a transaction dict (see ``parse_ofx``)."""
        trntype = self._find_text(stmttrn, 'TRNTYPE') or ''
        dt_posted = self._parse_ofx_date(self._find_text(stmttrn, 'DTPOSTED'))
        dt_user = self._parse_ofx_date(self._find_text(stmttrn, 'DTUSER'))
        amount = self._safe_float(self._find_text(stmttrn, 'TRNAMT'))
        fitid = self._find_text(stmttrn, 'FITID') or ''
        checknum = self._find_text(stmttrn, 'CHECKNUM') or ''
        refnum = self._find_text(stmttrn, 'REFNUM') or ''
        name = self._find_text(stmttrn, 'NAME') or ''
        memo = self._find_text(stmttrn, 'MEMO') or ''

        # Build description: prefer NAME, append MEMO if different
        description = name
        if memo and memo != name:
            description = f"{name} - {memo}" if name else memo

        # Build reference: FITID is the primary unique ID; CHECKNUM or REFNUM
        # serve as human-readable reference
        ref = checknum or refnum or fitid

        return {
            # The documented contract is the *posting* date, so DTPOSTED
            # wins; DTUSER (user-initiated date) is only a fallback.
            'date': dt_posted or dt_user,
            'payment_ref': description or ref or '/',
            'ref': ref,
            'amount': amount,
            'unique_import_id': fitid,
            'transaction_type': trntype,
        }

    # -------------------------------------------------------------------
    # Element-tree helpers (case-insensitive tag search)
    # -------------------------------------------------------------------

    @staticmethod
    def _find_all(parent, tag):
        """Find all elements in *parent*'s subtree (including *parent*
        itself) whose tag matches *tag* (case-insensitive)."""
        tag_upper = tag.upper()
        return [el for el in parent.iter() if el.tag.upper() == tag_upper]

    @staticmethod
    def _find_first(parent, tag):
        """Return the first element in *parent*'s subtree matching *tag*
        (case-insensitive) or ``None``."""
        tag_upper = tag.upper()
        for el in parent.iter():
            if el.tag.upper() == tag_upper:
                return el
        return None

    @classmethod
    def _find_text(cls, parent, tag):
        """Return stripped text content of the first element matching
        *tag*, or ``None`` when it is missing or has no text."""
        el = cls._find_first(parent, tag)
        if el is not None and el.text:
            return el.text.strip()
        return None

    # -------------------------------------------------------------------
    # Date / numeric helpers
    # -------------------------------------------------------------------

    @classmethod
    def _parse_ofx_date(cls, date_str):
        """Parse an OFX date string (``YYYYMMDD…``) into a Python date.

        Returns ``None`` (after logging a warning) for empty or
        unparseable input.  Time and timezone parts are ignored.
        """
        if not date_str:
            return None
        cleaned = date_str.strip()
        m = cls._OFX_DATE_RE.match(cleaned)
        if not m:
            # Fallback: try basic YYYYMMDD on the first eight characters
            try:
                return datetime.strptime(cleaned[:8], '%Y%m%d').date()
            except ValueError:
                _log.warning("Unparseable OFX date: %s", date_str)
                return None
        year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
        try:
            return datetime(year, month, day).date()
        except ValueError:
            _log.warning("Invalid OFX date components: %s", date_str)
            return None

    @staticmethod
    def _safe_float(value):
        """Convert *value* to float, returning 0.0 for empty / invalid.

        A comma is accepted as decimal separator.  Invalid values are
        logged so that a silently zeroed amount can be traced.
        """
        if not value:
            return 0.0
        try:
            return float(value.replace(',', '.'))
        except (ValueError, AttributeError):
            _log.warning("Invalid OFX amount, using 0.0: %r", value)
            return 0.0


class FusionJournalOFXImport(models.Model):
    """Register OFX as an available bank-statement import format and
    implement the parser hook on ``account.journal``."""

    _inherit = 'account.journal'

    # ---- Format Registration ----
    def _get_bank_statements_available_import_formats(self):
        """Append OFX to the list of importable formats."""
        formats = super()._get_bank_statements_available_import_formats()
        formats.append('OFX')
        return formats

    # ---- Parser Hook ----
    def _parse_bank_statement_file(self, attachment):
        """Attempt to parse *attachment* as OFX.

        Falls through to ``super()`` when the file is not recognised as
        OFX.

        :return: ``(currency_code, account_number, statements)`` where the
            first two values come from the first statement.
        :raises UserError: when the file looks like OFX but cannot be
            parsed.
        """
        raw_data = attachment.raw
        if not self._is_ofx_file(raw_data):
            return super()._parse_bank_statement_file(attachment)

        parser = FusionOFXParser()
        try:
            statements = parser.parse_ofx(raw_data)
        except UserError:
            raise
        except Exception as exc:
            # Boundary handler: log the traceback, show a readable error.
            _log.exception("OFX parsing error")
            raise UserError(
                _("Could not parse the OFX file: %s", str(exc))
            ) from exc

        # The import pipeline expects (currency_code, account_number, stmts)
        currency_code = None
        account_number = None
        if statements:
            currency_code = statements[0].get('currency_code')
            account_number = statements[0].get('account_number')

        return currency_code, account_number, statements

    # ---- Detection ----
    @staticmethod
    def _is_ofx_file(raw_data):
        """Heuristic check: does *raw_data* look like an OFX file?

        Only the first 4096 bytes / characters are inspected.
        """
        if not raw_data:
            return False
        if isinstance(raw_data, (bytes, bytearray)):
            # Slice before decoding so large attachments are not decoded
            # in full; ``errors='ignore'`` copes with a cut multi-byte char.
            text = bytes(raw_data[:4096]).decode('utf-8-sig', errors='ignore')
        else:
            text = str(raw_data)[:4096]
        text_upper = text.upper()
        # OFX v2 (XML) root element, or OFX v1 (SGML) header marker
        return '<OFX>' in text_upper or 'OFXHEADER:' in text_upper