""" Fusion Accounting - SAF-T Import Provides a transient wizard that reads a Standard Audit File for Tax (SAF-T) XML document and creates the corresponding Odoo records: * Chart-of-accounts entries (``account.account``) * Partners — customers and suppliers (``res.partner``) * Journal entries with their lines (``account.move`` / ``account.move.line``) The parser is namespace-agnostic: it strips any XML namespace prefix so that files from different country variants can be imported without modification. Original implementation by Nexa Systems Inc. """ import base64 import logging import xml.etree.ElementTree as ET from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError _log = logging.getLogger(__name__) class FusionSAFTImport(models.TransientModel): """ Wizard to import accounting data from an SAF-T XML file. The user uploads a file, optionally previews the contents, then clicks **Import** to create the matching Odoo records. Records that already exist (matched by code or VAT) are skipped to allow safe re-imports of the same file. """ _name = "fusion.saft.import" _description = "SAF-T Import Wizard" # ------------------------------------------------------------------ # Fields # ------------------------------------------------------------------ saft_file = fields.Binary( string="SAF-T File", required=True, help="Upload an SAF-T XML file to import.", ) saft_filename = fields.Char( string="Filename", ) company_id = fields.Many2one( comodel_name="res.company", string="Company", required=True, default=lambda self: self.env.company, ) state = fields.Selection( selection=[ ("upload", "Upload"), ("preview", "Preview"), ("done", "Done"), ], string="Status", default="upload", readonly=True, ) import_log = fields.Text( string="Import Log", readonly=True, help="Summary of records created, updated, and skipped.", ) # Counters (populated after import) accounts_created = fields.Integer(string="Accounts Created", readonly=True) partners_created = fields.Integer(string="Partners Created", readonly=True) moves_created = fields.Integer(string="Journal Entries Created", readonly=True) # ------------------------------------------------------------------ # Actions # ------------------------------------------------------------------ def action_preview(self): """Parse the uploaded file and display a summary before import.""" self.ensure_one() root = self._parse_saft_file() account_count = len(self._find_all(root, "Account")) customer_count = len(self._find_all(root, "Customer")) supplier_count = len(self._find_all(root, "Supplier")) txn_count = len(self._find_all(root, "Transaction")) self.write({ "state": "preview", "import_log": _( "File parsed successfully.\n\n" "Records found:\n" " Accounts: %(accounts)s\n" " Customers: %(customers)s\n" " Suppliers: %(suppliers)s\n" " Journal Entries: %(entries)s\n\n" "Click 'Import' to create these records.", accounts=account_count, customers=customer_count, suppliers=supplier_count, entries=txn_count, ), }) return self._reopen_wizard() def action_import(self): """Parse the SAF-T XML and create Odoo records.""" self.ensure_one() root = self._parse_saft_file() log_lines = [] n_accounts = self._import_accounts(root, log_lines) n_customers = self._import_partners(root, "Customer", log_lines) n_suppliers = self._import_partners(root, "Supplier", log_lines) n_moves = self._import_journal_entries(root, log_lines) self.write({ "state": "done", "accounts_created": n_accounts, "partners_created": n_customers + n_suppliers, "moves_created": n_moves, "import_log": "\n".join(log_lines) or _("Import completed with no issues."), }) return self._reopen_wizard() # ------------------------------------------------------------------ # Parsing helpers # ------------------------------------------------------------------ def _parse_saft_file(self): """Decode the uploaded binary and return the XML root element. All namespace prefixes are stripped so that element lookups work regardless of the SAF-T variant used in the file. """ if not self.saft_file: raise UserError(_("Please upload an SAF-T XML file.")) try: raw = base64.b64decode(self.saft_file) root = ET.fromstring(raw) except ET.ParseError as exc: raise UserError( _("The uploaded file is not valid XML:\n%(error)s", error=exc) ) from exc # Strip namespaces for easy tag matching self._strip_namespaces(root) return root @staticmethod def _strip_namespaces(element): """Remove namespace URIs from all tags and attributes in-place.""" for el in element.iter(): if "}" in el.tag: el.tag = el.tag.split("}", 1)[1] for attr_key in list(el.attrib): if "}" in attr_key: new_key = attr_key.split("}", 1)[1] el.attrib[new_key] = el.attrib.pop(attr_key) @staticmethod def _find_all(root, tag): """Recursively find all elements with the given *tag*.""" return root.iter(tag) @staticmethod def _get_text(element, tag, default=""): """Return the text of the first child with *tag*, or *default*.""" child = element.find(tag) if child is not None and child.text: return child.text.strip() return default # ------------------------------------------------------------------ # Import: Chart of Accounts # ------------------------------------------------------------------ def _import_accounts(self, root, log_lines): """Create ``account.account`` records for each ```` element that does not already exist (matched by code). """ Account = self.env["account.account"] created = 0 for acct_el in self._find_all(root, "Account"): code = self._get_text(acct_el, "AccountID") name = self._get_text(acct_el, "AccountDescription") if not code: continue existing = Account.search([ ("code", "=", code), ("company_id", "=", self.company_id.id), ], limit=1) if existing: log_lines.append(_("Account [%(code)s] already exists — skipped.", code=code)) continue grouping = self._get_text(acct_el, "GroupingCategory", "GM") account_type = self._map_grouping_to_account_type(grouping) Account.create({ "code": code, "name": name or code, "account_type": account_type, "company_id": self.company_id.id, }) created += 1 log_lines.append(_("Account [%(code)s] %(name)s created.", code=code, name=name)) return created @staticmethod def _map_grouping_to_account_type(grouping_code): """Convert an SAF-T GroupingCategory to an Odoo account_type. Mapping: GR → income GP → expense GA → asset_current GL → liability_current GM → off_balance (fallback) """ mapping = { "GR": "income", "GP": "expense", "GA": "asset_current", "GL": "liability_current", } return mapping.get(grouping_code, "off_balance") # ------------------------------------------------------------------ # Import: Partners (Customers / Suppliers) # ------------------------------------------------------------------ def _import_partners(self, root, partner_tag, log_lines): """Create ``res.partner`` records for each ```` or ```` element. Duplicates are detected by VAT number. """ Partner = self.env["res.partner"] created = 0 is_customer = partner_tag == "Customer" id_tag = "CustomerID" if is_customer else "SupplierID" tax_id_tag = "CustomerTaxID" if is_customer else "SupplierTaxID" for partner_el in self._find_all(root, partner_tag): ext_id = self._get_text(partner_el, id_tag) vat = self._get_text(partner_el, tax_id_tag) name = self._get_text(partner_el, "CompanyName") if not name: continue # Try matching by VAT first, then by name domain = [("company_id", "in", [self.company_id.id, False])] if vat and vat != "999999990": domain.append(("vat", "=", vat)) else: domain.append(("name", "=ilike", name)) existing = Partner.search(domain, limit=1) if existing: log_lines.append( _("%(type)s '%(name)s' already exists (ID %(id)s) — skipped.", type=partner_tag, name=name, id=existing.id) ) continue # Address extraction addr_tag = "BillingAddress" if is_customer else "SupplierAddress" addr_el = partner_el.find(addr_tag) vals = { "name": name, "company_id": self.company_id.id, "customer_rank": 1 if is_customer else 0, "supplier_rank": 0 if is_customer else 1, } if vat and vat != "999999990": vals["vat"] = vat if addr_el is not None: vals["street"] = self._get_text(addr_el, "StreetName") vals["city"] = self._get_text(addr_el, "City") vals["zip"] = self._get_text(addr_el, "PostalCode") country_code = self._get_text(addr_el, "Country") if country_code: country = self.env["res.country"].search( [("code", "=ilike", country_code)], limit=1 ) if country: vals["country_id"] = country.id Partner.create(vals) created += 1 log_lines.append( _("%(type)s '%(name)s' created.", type=partner_tag, name=name) ) return created # ------------------------------------------------------------------ # Import: Journal Entries # ------------------------------------------------------------------ def _import_journal_entries(self, root, log_lines): """Create ``account.move`` records from ```` elements nested inside ```` sections. """ Move = self.env["account.move"] Account = self.env["account.account"] created = 0 for journal_el in self._find_all(root, "Journal"): journal_code = self._get_text(journal_el, "JournalID") journal = self.env["account.journal"].search([ ("code", "=", journal_code), ("company_id", "=", self.company_id.id), ], limit=1) if not journal: # Fall back to the company's miscellaneous journal journal = self.env["account.journal"].search([ ("type", "=", "general"), ("company_id", "=", self.company_id.id), ], limit=1) if not journal: log_lines.append( _("No journal found for code '%(code)s' — transactions skipped.", code=journal_code) ) continue for txn_el in journal_el.iter("Transaction"): txn_id = self._get_text(txn_el, "TransactionID") txn_date = self._get_text(txn_el, "TransactionDate") description = self._get_text(txn_el, "Description") # Check for duplicates by reference if txn_id and Move.search([ ("ref", "=", txn_id), ("company_id", "=", self.company_id.id), ], limit=1): log_lines.append( _("Transaction '%(id)s' already imported — skipped.", id=txn_id) ) continue move_lines = [] for line_el in txn_el.iter("Line"): account_code = self._get_text(line_el, "AccountID") account = Account.search([ ("code", "=", account_code), ("company_id", "=", self.company_id.id), ], limit=1) if not account: log_lines.append( _("Account '%(code)s' not found — line skipped.", code=account_code) ) continue debit_el = line_el.find("DebitAmount") credit_el = line_el.find("CreditAmount") debit = 0.0 credit = 0.0 if debit_el is not None: debit = float(self._get_text(debit_el, "Amount", "0")) if credit_el is not None: credit = float(self._get_text(credit_el, "Amount", "0")) line_name = self._get_text(line_el, "Description", description) move_lines.append((0, 0, { "account_id": account.id, "name": line_name, "debit": debit, "credit": credit, })) if not move_lines: continue Move.create({ "journal_id": journal.id, "date": txn_date or fields.Date.context_today(self), "ref": txn_id, "narration": description, "company_id": self.company_id.id, "line_ids": move_lines, }) created += 1 log_lines.append( _("Journal entry '%(id)s' created with %(lines)s lines.", id=txn_id, lines=len(move_lines)) ) return created # ------------------------------------------------------------------ # Utility # ------------------------------------------------------------------ def _reopen_wizard(self): """Return an action that re-opens this wizard at its current state so the user sees updated information. """ return { "type": "ir.actions.act_window", "res_model": self._name, "res_id": self.id, "view_mode": "form", "target": "new", }