Files
Odoo-Modules/fusion_centralize_billing/wizards/invoice_ledger.py
gsinghpal 72d3130c88 feat(billing): NexaCloud invoice ledger — ingest invoices to account.move
Odoo becomes the accounting SoR by ingesting NexaCloud's real Stripe
invoices (read-only via the existing DSN) into native account.move
customer invoices: per-service-family income accounts, tax derived to
match the source invoice.tax, Stripe payments reconciled via
account.payment.register (invoice shows paid), idempotent on
x_fc_nexacloud_invoice_id, draft-first with bulk-post + a daily cron
(inactive). Plus a prune helper for the now-obsolete metered shadow data.
73 tests green on odoo-trial. Account codes use dots (Odoo 19 rejects '-').
2026-05-27 16:50:31 -04:00

276 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo invoice ledger ingester.
Reads NexaCloud's real (Stripe-billed) invoices and creates native Odoo
``account.move`` customer invoices — posted, with the Stripe payments reconciled and
HST modelled — so Odoo is the accounting system of record. Revenue is split by service
family into distinct income accounts. NexaCloud/Stripe keep doing the billing; Odoo
ingests its output. See docs/superpowers/specs/2026-05-27-nexacloud-invoice-ledger-design.md
"""
import json
import logging
import re
from datetime import timedelta
from odoo import api, fields, models
from odoo.exceptions import UserError
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
class FusionBillingInvoiceLedgerWizard(models.TransientModel):
    """Wizard that ingests NexaCloud invoices into ``account.move`` customer invoices.

    The read side (``_read_nexacloud_invoices``) is the only code that talks to the
    NexaCloud database; everything else works on plain dicts and the Odoo ORM.
    Ingestion is idempotent on ``x_fc_nexacloud_invoice_id``: an existing draft move
    gets its lines replaced, an existing non-draft move is skipped.
    """

    _name = "fusion.billing.invoice.ledger.wizard"
    _description = "Fusion Billing — NexaCloud Invoice Ledger Ingester"

    dry_run = fields.Boolean(default=True)
    auto_post = fields.Boolean(
        default=False, help="Post invoices immediately (else leave draft for review).")
    result_summary = fields.Text(readonly=True)

    # description keyword -> service family (checked in order; hosting before managed)
    _FAMILY_KEYWORDS = [
        ("hosting", ["odoo erp hosting", "wordpress website hosting"]),
        ("managed", ["managed"]),
        ("addons", ["daily backup", "whatsapp", "forms builder", "white label"]),
    ]
    # Captures the prorated item's name from a lower-cased proration description
    # ("remaining time on <item> after ..." / "... from ..." / "... (...").
    _PRORATION_RE = re.compile(r"remaining time on (.+?)(?: after| from |\s*\()")

    def action_run(self):
        """Read all NexaCloud invoices, ingest them, and re-open the wizard with a summary.

        With ``dry_run`` set, the ingest runs inside a savepoint that is always rolled
        back, so the summary shows what *would* happen without keeping any record.
        Otherwise invoices are ingested for real and posted only if ``auto_post`` is set.

        Returns:
            dict: an ``ir.actions.act_window`` action re-opening this wizard record.
        """
        self.ensure_one()
        data = self._read_nexacloud_invoices()
        if self.dry_run:
            class _Rollback(Exception):
                """Private signal used only to force the savepoint to roll back."""

            res = {}
            try:
                with self.env.cr.savepoint():
                    res.update(self._ingest_invoices(data, post=False))
                    raise _Rollback()
            except _Rollback:
                pass
            res["dry_run"] = True
        else:
            res = self._ingest_invoices(data, post=self.auto_post)
        # default=str: the summary may carry values json cannot serialise natively.
        self.result_summary = json.dumps(res, indent=2, default=str)
        if res.get("failed"):
            _logger.error("Ledger ingest: %s failed: %s", len(res["failed"]), res["failed"])
        return {"type": "ir.actions.act_window", "res_model": self._name,
                "res_id": self.id, "view_mode": "form", "target": "new"}

    # ----- read side (the ONLY code that touches NexaCloud) ------------------
    def _read_nexacloud_invoices(self, since=None):
        """Fetch invoices (with their items) from the NexaCloud database, read-only.

        Args:
            since: optional lower bound on ``invoices.created_at``; when falsy, every
                invoice is read.

        Returns:
            list[dict]: one dict per invoice, ordered by ``created_at``, with ``id`` and
            ``user_external_id`` coerced to ``str`` and an ``items`` list of
            ``{"description", "quantity", "unit_price", "amount"}`` dicts.

        Raises:
            UserError: the DSN parameter is missing, the connection fails, or a query
                fails (e.g. because the source schema changed).
        """
        import psycopg2
        import psycopg2.extras

        dsn = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.nexacloud_dsn")
        if not dsn:
            raise UserError("NexaCloud DSN not configured (fusion_billing.nexacloud_dsn).")
        try:
            conn = psycopg2.connect(dsn)
        except Exception as e:  # noqa: BLE001
            raise UserError("Could not connect to the NexaCloud database: %s" % e) from e
        try:
            # Belt and braces: this session must never write to the billing system.
            conn.set_session(readonly=True)
            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            where = "WHERE i.created_at >= %(since)s" if since else ""
            cur.execute(
                "SELECT i.id, i.stripe_invoice_id, i.invoice_number, "
                "i.user_id AS user_external_id, u.full_name AS partner_name, "
                "COALESCE(u.billing_email, u.email) AS partner_email, "
                "i.created_at AS invoice_date, i.currency, i.status, i.subtotal, i.tax, "
                "i.amount_paid, i.paid_at "
                "FROM invoices i JOIN users u ON u.id = i.user_id " + where +
                " ORDER BY i.created_at", {"since": since})
            # Keyed by str(id) so item rows can be matched whatever the id column type.
            invoices = {str(r["id"]): dict(r, items=[]) for r in cur.fetchall()}
            if invoices:
                # NOTE(review): the ids are passed as a list of strings; if
                # invoice_items.invoice_id is a uuid/integer column this comparison may
                # need an explicit cast — confirm against the NexaCloud schema.
                cur.execute(
                    "SELECT ii.invoice_id, ii.description, ii.quantity, ii.unit_price, ii.amount "
                    "FROM invoice_items ii WHERE ii.invoice_id = ANY(%(ids)s)",
                    {"ids": list(invoices.keys())})
                for r in cur.fetchall():
                    inv = invoices.get(str(r["invoice_id"]))
                    if inv:
                        inv["items"].append({
                            "description": r["description"], "quantity": r["quantity"],
                            "unit_price": r["unit_price"], "amount": r["amount"]})
            out = []
            for inv in invoices.values():
                inv["id"] = str(inv["id"])
                inv["user_external_id"] = str(inv["user_external_id"])
                out.append(inv)
            return out
        except psycopg2.Error as e:
            raise UserError(
                "Failed reading NexaCloud invoices — the source schema may have changed. "
                "Underlying error:\n%s" % e) from e
        finally:
            conn.close()

    # ----- ingest side (pure Odoo; unit-tested) ------------------------------
    @api.model
    def _ingest_invoices(self, data, post=False):
        """Create or refresh one customer invoice per source invoice dict.

        Each invoice is processed in its own savepoint so one bad invoice cannot abort
        the batch. Counters are only updated once that savepoint has been released, so
        an invoice that fails while posting or reconciling is reported under
        ``failed`` and nowhere else.

        Args:
            data: iterable of invoice dicts as returned by ``_read_nexacloud_invoices``.
            post: post each ingested move and register its payment.

        Returns:
            dict: ``created`` / ``updated`` / ``posted`` counts, ``skipped`` and
            ``failed`` lists of ``{"id", ...}`` dicts, and ``by_family`` amount totals.
        """
        cad = self.env.ref("base.CAD", raise_if_not_found=False) or self.env.company.currency_id
        summary = {"created": 0, "updated": 0, "posted": 0,
                   "skipped": [], "failed": [], "by_family": {}}
        for inv in data:
            nc_id = str(inv.get("id") or "")
            try:
                if not nc_id:
                    # An empty key must never be searched for: it could match (and
                    # then rewrite) a draft move that has no NexaCloud id at all.
                    raise ValueError("missing NexaCloud invoice id")
                with self.env.cr.savepoint():
                    outcome = self._fc_ingest_one(inv, nc_id, cad, post)
            except Exception as e:  # noqa: BLE001 - per-invoice isolation
                _logger.exception("Ledger ingest: invoice %s failed", nc_id)
                summary["failed"].append({"id": nc_id, "error": "%s: %s" % (type(e).__name__, e)})
                continue
            if outcome is None:
                summary["skipped"].append({"id": nc_id, "reason": "already posted"})
                continue
            summary[outcome["action"]] += 1
            if outcome["posted"]:
                summary["posted"] += 1
            totals = summary["by_family"]
            for fam, amount in outcome["by_family"].items():
                totals[fam] = round(totals.get(fam, 0.0) + amount, 2)
        return summary

    @api.model
    def _fc_ingest_one(self, inv, nc_id, default_currency, post):
        """Ingest a single invoice dict; meant to run inside the caller's savepoint.

        Returns:
            dict | None: ``None`` when a non-draft move already exists for ``nc_id``
            (nothing is touched); otherwise ``{"action": "created" | "updated",
            "posted": bool, "by_family": {family: amount}}``.
        """
        Move = self.env["account.move"]
        existing = Move.search([("x_fc_nexacloud_invoice_id", "=", nc_id)], limit=1)
        if existing and existing.state != "draft":
            # NOTE(review): an invoice that gets paid in NexaCloud after it was posted
            # here is skipped, so its payment is never registered — confirm whether a
            # separate payment sync covers that case.
            return None
        if existing:
            existing.invoice_line_ids.unlink()  # draft: replace lines
            move = existing
        else:
            move = Move.create({
                "move_type": "out_invoice",
                "partner_id": self._fc_partner_for(inv).id,
                "invoice_date": inv.get("invoice_date"),
                "ref": inv.get("invoice_number"),
                "currency_id": self._fc_currency_for(inv, default_currency).id,
                "x_fc_nexacloud_invoice_id": nc_id,
                "x_fc_stripe_invoice_id": inv.get("stripe_invoice_id"),
            })
        tax = self._fc_tax_for(inv.get("subtotal"), inv.get("tax"))
        line_vals, family_totals = self._fc_line_vals(inv.get("items") or [], tax)
        move.write({"invoice_line_ids": line_vals})
        if post:
            move.action_post()
            self._fc_reconcile_payment(move, inv)
        return {"action": "updated" if existing else "created",
                "posted": bool(post), "by_family": family_totals}

    @api.model
    def _fc_line_vals(self, items, tax):
        """Build invoice-line create commands and per-family amount totals for ``items``.

        Returns:
            tuple[list, dict]: ``(0, 0, vals)`` commands and ``{family: summed amount}``.
        """
        line_vals = []
        family_totals = {}
        for it in items:
            fam = self._fc_family_for(it.get("description"))
            amount = float(it.get("amount") or 0.0)
            family_totals[fam] = family_totals.get(fam, 0.0) + amount
            quantity = float(it.get("quantity") or 1.0)
            unit_price = it.get("unit_price")
            # Without a unit price, derive it from the line total; using the total
            # itself would multiply the line by its quantity.
            price_unit = float(unit_price) if unit_price else amount / quantity
            line_vals.append((0, 0, {
                "name": it.get("description") or "NexaCloud",
                "quantity": quantity,
                "price_unit": price_unit,
                "account_id": self._fc_income_account(fam).id,
                "tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
            }))
        return line_vals, family_totals

    @api.model
    def _fc_currency_for(self, inv, default):
        """Return the ``res.currency`` named by ``inv["currency"]``, else ``default``.

        The code is compared upper-cased. ``default`` is used when the invoice carries
        no currency, when it names the default itself, or when no matching (active)
        currency is found — the last case is logged.
        """
        code = str(inv.get("currency") or "").strip().upper()
        if not code or code == default.name:
            return default
        currency = self.env["res.currency"].search([("name", "=", code)], limit=1)
        if not currency:
            _logger.warning(
                "Ledger ingest: currency %s of invoice %s not found; using %s",
                code, inv.get("id"), default.name)
            return default
        return currency

    @api.model
    def _post_ingested(self):
        """Post every draft customer invoice that carries a NexaCloud invoice id.

        Each move is posted in its own savepoint; failures are logged and skipped.
        NOTE(review): no payment is registered here (the source invoice data is not
        available), so moves posted this way stay unpaid — confirm that is intended.

        Returns:
            int: number of moves posted.
        """
        moves = self.env["account.move"].search([
            ("x_fc_nexacloud_invoice_id", "!=", False),
            ("state", "=", "draft"), ("move_type", "=", "out_invoice")])
        posted = 0
        for mv in moves:
            try:
                with self.env.cr.savepoint():
                    mv.action_post()
                posted += 1
            except Exception:  # noqa: BLE001
                _logger.exception("Ledger post: move %s failed", mv.id)
        return posted

    def _cron_ingest_recent(self, days=2):
        """Ingest and post the invoices created in the last ``days`` days.

        Args:
            days: size of the look-back window on the source ``created_at`` (default 2).

        Returns:
            dict: the ``_ingest_invoices`` summary.
        """
        since = fields.Datetime.to_string(fields.Datetime.now() - timedelta(days=days))
        return self._ingest_invoices(self._read_nexacloud_invoices(since=since), post=True)

    # ----- helpers ------------------------------------------------------------
    @api.model
    def _fc_family_for(self, description):
        """Classify a line description into a service family.

        Returns the first family in ``_FAMILY_KEYWORDS`` with a keyword contained in
        the lower-cased description, or ``"other"``. Proration lines are classified by
        the prorated item's name.
        """
        d = (description or "").lower()
        m = self._PRORATION_RE.match(d)
        if m:
            d = m.group(1)  # classify proration by the prorated item
        for fam, kws in self._FAMILY_KEYWORDS:
            if any(k in d for k in kws):
                return fam
        return "other"

    @api.model
    def _fc_income_account(self, family):
        """Return the income account for ``family`` (code ``NCR.<FAMILY>``), creating it if absent."""
        Account = self.env["account.account"]
        # Odoo 19 account codes allow only alphanumerics + dots (no hyphen).
        code = "NCR." + family.upper()
        acc = Account.search([("code", "=", code)], limit=1)
        if not acc:
            acc = Account.create({
                "code": code, "name": "NexaCloud %s Revenue" % family.title(),
                "account_type": "income"})
        return acc

    @api.model
    def _fc_tax_for(self, subtotal, tax_amount):
        """Map a NexaCloud invoice's (subtotal, tax) to a matching Odoo sale tax.

        A non-positive subtotal or tax yields a 0% sale tax. Otherwise the effective
        rate is rounded to a whole percent and matched first against percent-type sale
        taxes, then against sale taxes whose name carries that rate as a standalone
        number (so 5 never matches "15%").

        Returns:
            account.tax recordset: at most one tax; empty when nothing matches, in
            which case a warning is logged and the lines are created untaxed.
        """
        Tax = self.env["account.tax"]
        sale = [("type_tax_use", "=", "sale")]
        sub = float(subtotal or 0.0)
        amt = float(tax_amount or 0.0)
        if sub <= 0 or amt <= 0:
            return Tax.search(sale + [("amount", "=", 0.0)], limit=1)
        rate = round(100.0 * amt / sub)
        tax = Tax.search(
            sale + [("amount_type", "=", "percent"), ("amount", "=", float(rate))], limit=1)
        if not tax:
            # Name fallback for taxes not expressed as a single percent. ilike only
            # narrows the candidates; the regex rejects 113, 1.13, 13.5, ...
            standalone = re.compile(r"(?<![\d.])%d(?:\.0+)?(?!\d|\.\d)" % rate)
            candidates = Tax.search(sale + [("name", "ilike", "%s" % rate)])
            tax = candidates.filtered(lambda t: standalone.search(t.name or ""))[:1]
        if not tax:
            _logger.warning(
                "Ledger ingest: no sale tax matches an effective rate of %s%%", rate)
        return tax

    @api.model
    def _fc_partner_for(self, inv):
        """Resolve the unified partner via the nexacloud account.link (by user id);
        create partner+link if missing (covers NULL-subscription invoices).

        The ``nexacloud`` billing service record is created on first use. Resolution is
        delegated to ``fusion.billing.account.link._resolve_or_create_partner``.
        """
        Service = self.env["fusion.billing.service"]
        service = Service.search([("code", "=", "nexacloud")], limit=1)
        if not service:
            service = Service.create({"name": "NexaCloud", "code": "nexacloud"})
        link = self.env["fusion.billing.account.link"]._resolve_or_create_partner(
            service, str(inv.get("user_external_id")),
            name=inv.get("partner_name"), email=inv.get("partner_email"))
        return link.partner_id

    @api.model
    def _fc_stripe_journal(self):
        """Return the bank journal with code ``NCSTR``, creating it if absent."""
        Journal = self.env["account.journal"]
        j = Journal.search([("code", "=", "NCSTR")], limit=1)
        if not j:
            j = Journal.create({"name": "NexaCloud Stripe", "code": "NCSTR", "type": "bank"})
        return j

    @api.model
    def _fc_reconcile_payment(self, move, inv):
        """Register the source invoice's payment against a posted ``move``.

        Nothing is done when the move is not posted, when the source invoice is neither
        ``paid`` nor carries a positive ``amount_paid``, or when the move has nothing
        left to pay (e.g. a zero-total invoice) — the payment wizard would raise on it.

        Returns:
            bool: ``True`` when a payment was created, ``False`` otherwise.
        """
        if move.state != "posted":
            return False
        paid = float(inv.get("amount_paid") or 0.0)
        if inv.get("status") != "paid" and paid <= 0:
            return False
        if move.currency_id.is_zero(move.amount_residual):
            return False
        reg = self.env["account.payment.register"].with_context(
            active_model="account.move", active_ids=move.ids).create({
                "journal_id": self._fc_stripe_journal().id,
                "payment_date": inv.get("paid_at") or move.invoice_date or fields.Date.today(),
                # Source says "paid" without an amount: settle the whole invoice.
                "amount": paid or move.amount_total,
            })
        reg._create_payments()
        return True

    @api.model
    def _fc_prune_metered_shadow(self):
        """Delete the superseded metered shadow data (shadow sale.orders, NC-* products,
        NexaCloud charges, reconciliation rows).

        NOTE(review): the charge and reconciliation searches use an empty domain, so
        *every* row of those two models is deleted, not only NexaCloud ones — confirm
        no other service stores data there before running.

        Returns:
            dict: number of records found (and unlinked) per category.
        """
        counts = {}
        subs = self.env["sale.order"].search([("x_fc_shadow", "=", True)])
        counts["subscriptions"] = len(subs)
        subs.unlink()
        prods = self.env["product.product"].search([("default_code", "=like", "NC-%")])
        counts["products"] = len(prods)
        prods.unlink()
        ch = self.env["fusion.billing.charge"].search([])
        counts["charges"] = len(ch)
        ch.unlink()
        rec = self.env["fusion.billing.reconciliation"].search([])
        counts["reconciliations"] = len(rec)
        rec.unlink()
        return counts