Initial commit

This commit is contained in:
gsinghpal
2026-02-22 01:22:18 -05:00
commit 5200d5baf0
2394 changed files with 386834 additions and 0 deletions

View File

@@ -0,0 +1,410 @@
"""
Fusion Accounting - SAF-T Import
Provides a transient wizard that reads a Standard Audit File for Tax
(SAF-T) XML document and creates the corresponding Odoo records:
* Chart-of-accounts entries (``account.account``)
* Partners — customers and suppliers (``res.partner``)
* Journal entries with their lines (``account.move`` / ``account.move.line``)
The parser is namespace-agnostic: it strips any XML namespace prefix so
that files from different country variants can be imported without
modification.
Original implementation by Nexa Systems Inc.
"""
import base64
import logging
import xml.etree.ElementTree as ET
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
_log = logging.getLogger(__name__)
class FusionSAFTImport(models.TransientModel):
"""
Wizard to import accounting data from an SAF-T XML file.
The user uploads a file, optionally previews the contents, then
clicks **Import** to create the matching Odoo records. Records that
already exist (matched by code or VAT) are skipped to allow safe
re-imports of the same file.
"""
_name = "fusion.saft.import"
_description = "SAF-T Import Wizard"
# ------------------------------------------------------------------
# Fields
# ------------------------------------------------------------------
saft_file = fields.Binary(
string="SAF-T File",
required=True,
help="Upload an SAF-T XML file to import.",
)
saft_filename = fields.Char(
string="Filename",
)
company_id = fields.Many2one(
comodel_name="res.company",
string="Company",
required=True,
default=lambda self: self.env.company,
)
state = fields.Selection(
selection=[
("upload", "Upload"),
("preview", "Preview"),
("done", "Done"),
],
string="Status",
default="upload",
readonly=True,
)
import_log = fields.Text(
string="Import Log",
readonly=True,
help="Summary of records created, updated, and skipped.",
)
# Counters (populated after import)
accounts_created = fields.Integer(string="Accounts Created", readonly=True)
partners_created = fields.Integer(string="Partners Created", readonly=True)
moves_created = fields.Integer(string="Journal Entries Created", readonly=True)
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def action_preview(self):
"""Parse the uploaded file and display a summary before import."""
self.ensure_one()
root = self._parse_saft_file()
account_count = len(self._find_all(root, "Account"))
customer_count = len(self._find_all(root, "Customer"))
supplier_count = len(self._find_all(root, "Supplier"))
txn_count = len(self._find_all(root, "Transaction"))
self.write({
"state": "preview",
"import_log": _(
"File parsed successfully.\n\n"
"Records found:\n"
" Accounts: %(accounts)s\n"
" Customers: %(customers)s\n"
" Suppliers: %(suppliers)s\n"
" Journal Entries: %(entries)s\n\n"
"Click 'Import' to create these records.",
accounts=account_count,
customers=customer_count,
suppliers=supplier_count,
entries=txn_count,
),
})
return self._reopen_wizard()
def action_import(self):
"""Parse the SAF-T XML and create Odoo records."""
self.ensure_one()
root = self._parse_saft_file()
log_lines = []
n_accounts = self._import_accounts(root, log_lines)
n_customers = self._import_partners(root, "Customer", log_lines)
n_suppliers = self._import_partners(root, "Supplier", log_lines)
n_moves = self._import_journal_entries(root, log_lines)
self.write({
"state": "done",
"accounts_created": n_accounts,
"partners_created": n_customers + n_suppliers,
"moves_created": n_moves,
"import_log": "\n".join(log_lines) or _("Import completed with no issues."),
})
return self._reopen_wizard()
# ------------------------------------------------------------------
# Parsing helpers
# ------------------------------------------------------------------
def _parse_saft_file(self):
"""Decode the uploaded binary and return the XML root element.
All namespace prefixes are stripped so that element lookups work
regardless of the SAF-T variant used in the file.
"""
if not self.saft_file:
raise UserError(_("Please upload an SAF-T XML file."))
try:
raw = base64.b64decode(self.saft_file)
root = ET.fromstring(raw)
except ET.ParseError as exc:
raise UserError(
_("The uploaded file is not valid XML:\n%(error)s", error=exc)
) from exc
# Strip namespaces for easy tag matching
self._strip_namespaces(root)
return root
@staticmethod
def _strip_namespaces(element):
"""Remove namespace URIs from all tags and attributes in-place."""
for el in element.iter():
if "}" in el.tag:
el.tag = el.tag.split("}", 1)[1]
for attr_key in list(el.attrib):
if "}" in attr_key:
new_key = attr_key.split("}", 1)[1]
el.attrib[new_key] = el.attrib.pop(attr_key)
@staticmethod
def _find_all(root, tag):
"""Recursively find all elements with the given *tag*."""
return root.iter(tag)
@staticmethod
def _get_text(element, tag, default=""):
"""Return the text of the first child with *tag*, or *default*."""
child = element.find(tag)
if child is not None and child.text:
return child.text.strip()
return default
# ------------------------------------------------------------------
# Import: Chart of Accounts
# ------------------------------------------------------------------
def _import_accounts(self, root, log_lines):
"""Create ``account.account`` records for each ``<Account>``
element that does not already exist (matched by code).
"""
Account = self.env["account.account"]
created = 0
for acct_el in self._find_all(root, "Account"):
code = self._get_text(acct_el, "AccountID")
name = self._get_text(acct_el, "AccountDescription")
if not code:
continue
existing = Account.search([
("code", "=", code),
("company_id", "=", self.company_id.id),
], limit=1)
if existing:
log_lines.append(_("Account [%(code)s] already exists — skipped.", code=code))
continue
grouping = self._get_text(acct_el, "GroupingCategory", "GM")
account_type = self._map_grouping_to_account_type(grouping)
Account.create({
"code": code,
"name": name or code,
"account_type": account_type,
"company_id": self.company_id.id,
})
created += 1
log_lines.append(_("Account [%(code)s] %(name)s created.", code=code, name=name))
return created
@staticmethod
def _map_grouping_to_account_type(grouping_code):
"""Convert an SAF-T GroupingCategory to an Odoo account_type.
Mapping:
GR → income
GP → expense
GA → asset_current
GL → liability_current
GM → off_balance (fallback)
"""
mapping = {
"GR": "income",
"GP": "expense",
"GA": "asset_current",
"GL": "liability_current",
}
return mapping.get(grouping_code, "off_balance")
# ------------------------------------------------------------------
# Import: Partners (Customers / Suppliers)
# ------------------------------------------------------------------
def _import_partners(self, root, partner_tag, log_lines):
"""Create ``res.partner`` records for each ``<Customer>`` or
``<Supplier>`` element. Duplicates are detected by VAT number.
"""
Partner = self.env["res.partner"]
created = 0
is_customer = partner_tag == "Customer"
id_tag = "CustomerID" if is_customer else "SupplierID"
tax_id_tag = "CustomerTaxID" if is_customer else "SupplierTaxID"
for partner_el in self._find_all(root, partner_tag):
ext_id = self._get_text(partner_el, id_tag)
vat = self._get_text(partner_el, tax_id_tag)
name = self._get_text(partner_el, "CompanyName")
if not name:
continue
# Try matching by VAT first, then by name
domain = [("company_id", "in", [self.company_id.id, False])]
if vat and vat != "999999990":
domain.append(("vat", "=", vat))
else:
domain.append(("name", "=ilike", name))
existing = Partner.search(domain, limit=1)
if existing:
log_lines.append(
_("%(type)s '%(name)s' already exists (ID %(id)s) — skipped.",
type=partner_tag, name=name, id=existing.id)
)
continue
# Address extraction
addr_tag = "BillingAddress" if is_customer else "SupplierAddress"
addr_el = partner_el.find(addr_tag)
vals = {
"name": name,
"company_id": self.company_id.id,
"customer_rank": 1 if is_customer else 0,
"supplier_rank": 0 if is_customer else 1,
}
if vat and vat != "999999990":
vals["vat"] = vat
if addr_el is not None:
vals["street"] = self._get_text(addr_el, "StreetName")
vals["city"] = self._get_text(addr_el, "City")
vals["zip"] = self._get_text(addr_el, "PostalCode")
country_code = self._get_text(addr_el, "Country")
if country_code:
country = self.env["res.country"].search(
[("code", "=ilike", country_code)], limit=1
)
if country:
vals["country_id"] = country.id
Partner.create(vals)
created += 1
log_lines.append(
_("%(type)s '%(name)s' created.", type=partner_tag, name=name)
)
return created
# ------------------------------------------------------------------
# Import: Journal Entries
# ------------------------------------------------------------------
def _import_journal_entries(self, root, log_lines):
"""Create ``account.move`` records from ``<Transaction>``
elements nested inside ``<Journal>`` sections.
"""
Move = self.env["account.move"]
Account = self.env["account.account"]
created = 0
for journal_el in self._find_all(root, "Journal"):
journal_code = self._get_text(journal_el, "JournalID")
journal = self.env["account.journal"].search([
("code", "=", journal_code),
("company_id", "=", self.company_id.id),
], limit=1)
if not journal:
# Fall back to the company's miscellaneous journal
journal = self.env["account.journal"].search([
("type", "=", "general"),
("company_id", "=", self.company_id.id),
], limit=1)
if not journal:
log_lines.append(
_("No journal found for code '%(code)s' — transactions skipped.",
code=journal_code)
)
continue
for txn_el in journal_el.iter("Transaction"):
txn_id = self._get_text(txn_el, "TransactionID")
txn_date = self._get_text(txn_el, "TransactionDate")
description = self._get_text(txn_el, "Description")
# Check for duplicates by reference
if txn_id and Move.search([
("ref", "=", txn_id),
("company_id", "=", self.company_id.id),
], limit=1):
log_lines.append(
_("Transaction '%(id)s' already imported — skipped.", id=txn_id)
)
continue
move_lines = []
for line_el in txn_el.iter("Line"):
account_code = self._get_text(line_el, "AccountID")
account = Account.search([
("code", "=", account_code),
("company_id", "=", self.company_id.id),
], limit=1)
if not account:
log_lines.append(
_("Account '%(code)s' not found — line skipped.", code=account_code)
)
continue
debit_el = line_el.find("DebitAmount")
credit_el = line_el.find("CreditAmount")
debit = 0.0
credit = 0.0
if debit_el is not None:
debit = float(self._get_text(debit_el, "Amount", "0"))
if credit_el is not None:
credit = float(self._get_text(credit_el, "Amount", "0"))
line_name = self._get_text(line_el, "Description", description)
move_lines.append((0, 0, {
"account_id": account.id,
"name": line_name,
"debit": debit,
"credit": credit,
}))
if not move_lines:
continue
Move.create({
"journal_id": journal.id,
"date": txn_date or fields.Date.context_today(self),
"ref": txn_id,
"narration": description,
"company_id": self.company_id.id,
"line_ids": move_lines,
})
created += 1
log_lines.append(
_("Journal entry '%(id)s' created with %(lines)s lines.",
id=txn_id, lines=len(move_lines))
)
return created
# ------------------------------------------------------------------
# Utility
# ------------------------------------------------------------------
def _reopen_wizard(self):
"""Return an action that re-opens this wizard at its current
state so the user sees updated information.
"""
return {
"type": "ir.actions.act_window",
"res_model": self._name,
"res_id": self.id,
"view_mode": "form",
"target": "new",
}