# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo invoice ledger ingester.

Reads NexaCloud's real (Stripe-billed) invoices and creates native Odoo
``account.move`` customer invoices — posted, with the Stripe payments reconciled
and HST modelled — so Odoo is the accounting system of record. Revenue is split
by service family into distinct income accounts. NexaCloud/Stripe keep doing the
billing; Odoo ingests its output.

See docs/superpowers/specs/2026-05-27-nexacloud-invoice-ledger-design.md
"""
import json
import logging
import re
from datetime import datetime, timezone

from odoo import api, fields, models
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)


class FusionBillingInvoiceLedgerWizard(models.TransientModel):
    """Wizard (and cron entry point) that mirrors NexaCloud invoices into Odoo.

    The read side (`_read_nexacloud_invoices`) is the only code that talks to
    the NexaCloud database; everything else operates on plain dicts and Odoo
    records.
    """

    _name = "fusion.billing.invoice.ledger.wizard"
    _description = "Fusion Billing — NexaCloud Invoice Ledger Ingester"

    dry_run = fields.Boolean(default=True)
    auto_post = fields.Boolean(
        default=False, help="Post invoices immediately (else leave draft for review).")
    result_summary = fields.Text(readonly=True)

    # description keyword -> service family (checked in order; hosting before managed)
    _FAMILY_KEYWORDS = [
        ("hosting", ["odoo erp hosting", "wordpress website hosting"]),
        ("managed", ["managed"]),
        ("addons", ["daily backup", "whatsapp", "forms builder", "white label"]),
    ]

    def action_run(self):
        """Run the ingest from the wizard form and show the JSON summary.

        In dry-run mode the ingest runs inside a savepoint that is always rolled
        back (via a private sentinel exception), so the summary reflects what
        WOULD happen without persisting anything. Otherwise invoices are
        ingested for real and posted when ``auto_post`` is set.

        Returns:
            An ``ir.actions.act_window`` dict re-opening this wizard.
        """
        self.ensure_one()
        data = self._read_nexacloud_invoices()
        if self.dry_run:
            class _Rollback(Exception):
                pass

            res = {}
            try:
                with self.env.cr.savepoint():
                    res.update(self._ingest_invoices(data, post=False))
                    # Force the savepoint to roll back; ``res`` keeps the summary.
                    raise _Rollback()
            except _Rollback:
                pass
            res["dry_run"] = True
        else:
            res = self._ingest_invoices(data, post=self.auto_post)
        self.result_summary = json.dumps(res, indent=2, default=str)
        if res.get("failed"):
            _logger.error("Ledger ingest: %s failed: %s", len(res["failed"]), res["failed"])
        return {"type": "ir.actions.act_window", "res_model": self._name,
                "res_id": self.id, "view_mode": "form", "target": "new"}

    # ----- read side (the ONLY code that touches NexaCloud) ------------------
    def _read_nexacloud_invoices(self, since=None):
        """Read invoices (with their items) from the NexaCloud database.

        The DSN is taken from the ``fusion_billing.nexacloud_dsn`` system
        parameter and the session is opened read-only.

        Args:
            since: optional lower bound applied to ``invoices.created_at``;
                when falsy, all invoices are read.

        Returns:
            A list of dicts (one per invoice, ordered by ``created_at``), each
            with an ``items`` list; ``id`` and ``user_external_id`` are strings.

        Raises:
            UserError: DSN missing, connection failure, or any psycopg2 error
                while querying.
        """
        import psycopg2
        import psycopg2.extras
        dsn = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.nexacloud_dsn")
        if not dsn:
            raise UserError("NexaCloud DSN not configured (fusion_billing.nexacloud_dsn).")
        try:
            conn = psycopg2.connect(dsn)
        except Exception as e:  # noqa: BLE001
            raise UserError("Could not connect to the NexaCloud database: %s" % e) from e
        try:
            conn.set_session(readonly=True)
            conn.set_client_encoding('UTF8')  # invoice descriptions contain non-ASCII (e.g. "×")
            where = "WHERE i.created_at >= %(since)s" if since else ""
            # The cursor is closed on leaving the ``with`` block.
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT i.id, i.stripe_invoice_id, i.invoice_number, "
                    "i.user_id AS user_external_id, u.full_name AS partner_name, "
                    "u.company AS partner_company, "
                    "COALESCE(u.billing_email, u.email) AS partner_email, "
                    "i.created_at AS invoice_date, i.currency, i.status, i.subtotal, i.tax, "
                    "i.amount_paid, i.paid_at "
                    "FROM invoices i JOIN users u ON u.id = i.user_id "
                    + where + " ORDER BY i.created_at", {"since": since})
                invoices = {str(r["id"]): dict(r, items=[]) for r in cur.fetchall()}
                if invoices:
                    cur.execute(
                        "SELECT ii.invoice_id, ii.description, ii.quantity, ii.unit_price, ii.amount "
                        "FROM invoice_items ii WHERE ii.invoice_id::text = ANY(%(ids)s)",
                        {"ids": list(invoices.keys())})
                    for r in cur.fetchall():
                        inv = invoices.get(str(r["invoice_id"]))
                        if inv:
                            inv["items"].append({
                                "description": r["description"],
                                "quantity": r["quantity"],
                                "unit_price": r["unit_price"],
                                "amount": r["amount"]})
            out = []
            for inv in invoices.values():
                # Normalise identifiers to strings for the ingest side.
                inv["id"] = str(inv["id"])
                inv["user_external_id"] = str(inv["user_external_id"])
                out.append(inv)
            return out
        except psycopg2.Error as e:
            raise UserError(
                "Failed reading NexaCloud invoices — the source schema may have changed. "
                "Underlying error:\n%s" % e) from e
        finally:
            conn.close()

    # ----- ingest side (pure Odoo; unit-tested) ------------------------------
    @api.model
    def _ingest_invoices(self, data, post=False, verified=None):
        """Upsert one account.move per NexaCloud invoice.

        ``verified`` (optional) maps nc_id -> the dict returned by ``_fc_verify``
        (date + paid status taken from the SOURCE billing system). When present for an
        invoice, the source invoice_date and paid status win over NexaCloud's own
        (unreliable) fields. Without it, the raw NexaCloud fields are used (manual
        backfill / dry-run path).

        Each invoice is processed in its own savepoint; a failure is logged and
        recorded under ``failed`` without affecting the other invoices. The
        ``by_family`` revenue totals only include invoices that were ingested
        successfully.

        Args:
            data: list of invoice dicts as produced by ``_read_nexacloud_invoices``.
            post: when true, post each move and attempt payment reconciliation.
            verified: optional mapping nc_id -> verification dict.

        Returns:
            Summary dict with keys ``created``, ``updated``, ``posted``,
            ``reconciled``, ``skipped``, ``failed`` and ``by_family``.
        """
        verified = verified or {}
        Move = self.env["account.move"]
        cad = self.env.ref("base.CAD", raise_if_not_found=False) or self.env.company.currency_id
        summary = {"created": 0, "updated": 0, "posted": 0, "reconciled": 0,
                   "skipped": [], "failed": [], "by_family": {}}
        for inv in data:
            nc_id = str(inv.get("id") or "")
            v = verified.get(nc_id)
            inv_date = (v or {}).get("invoice_date") or inv.get("invoice_date")
            # Per-invoice revenue tally; merged into the summary only on success so
            # that rolled-back invoices do not inflate the totals.
            family_totals = {}
            try:
                with self.env.cr.savepoint():
                    existing = Move.search(
                        [("x_fc_nexacloud_invoice_id", "=", nc_id)], limit=1)
                    if existing and existing.state != "draft":
                        summary["skipped"].append({"id": nc_id, "reason": "already posted"})
                        continue
                    partner = self._fc_partner_for(inv)
                    if existing:
                        existing.invoice_line_ids.unlink()  # draft: replace lines
                        if existing.partner_id != partner:
                            existing.partner_id = partner.id
                        if inv_date and str(existing.invoice_date) != str(inv_date):
                            existing.invoice_date = inv_date
                        move = existing
                    else:
                        move = Move.create({
                            "move_type": "out_invoice",
                            "partner_id": partner.id,
                            "invoice_date": inv_date,
                            "ref": inv.get("invoice_number"),
                            "currency_id": cad.id,
                            "x_fc_nexacloud_invoice_id": nc_id,
                            "x_fc_stripe_invoice_id": inv.get("stripe_invoice_id"),
                        })
                    tax = self._fc_tax_for(inv.get("subtotal"), inv.get("tax"))
                    line_vals = []
                    for it in inv.get("items", []):
                        fam = self._fc_family_for(it.get("description"))
                        amount = float(it.get("amount") or 0.0)
                        family_totals[fam] = round(family_totals.get(fam, 0.0) + amount, 2)
                        quantity = float(it.get("quantity") or 1.0)
                        unit_price = it.get("unit_price")
                        if unit_price:
                            price_unit = float(unit_price)
                        else:
                            # No unit price: derive it from the line total so that
                            # quantity * price_unit equals ``amount`` (quantity is
                            # never 0 here because a falsy quantity becomes 1.0).
                            price_unit = amount / quantity
                        line_vals.append((0, 0, {
                            "name": it.get("description") or "NexaCloud",
                            "quantity": quantity,
                            "price_unit": price_unit,
                            "account_id": self._fc_income_account(fam).id,
                            "tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
                        }))
                    # Many NexaCloud base-plan invoices store the charge in `subtotal` with
                    # NO invoice_items. Add a balancing line for any gap so the Odoo invoice
                    # total matches what Stripe actually billed (captures un-itemized revenue
                    # and absorbs proration credits where items exceed subtotal).
                    items_total = round(
                        sum(float(it.get("amount") or 0.0) for it in inv.get("items", [])), 2)
                    gap = round(float(inv.get("subtotal") or 0.0) - items_total, 2)
                    if abs(gap) > 0.01:
                        family_totals["base"] = round(family_totals.get("base", 0.0) + gap, 2)
                        line_vals.append((0, 0, {
                            "name": "NexaCloud base/unitemized charge",
                            "quantity": 1.0,
                            "price_unit": gap,
                            "account_id": self._fc_income_account("base").id,
                            "tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
                        }))
                    if not line_vals:
                        # zero-amount invoice (no items, $0 subtotal) — nothing to record;
                        # drop the empty move (whether just-created or a pre-existing draft).
                        move.unlink()
                        summary["skipped"].append(
                            {"id": nc_id, "reason": "zero-amount invoice"})
                        continue
                    move.write({"invoice_line_ids": line_vals})
                    summary["updated" if existing else "created"] += 1
                    if post:
                        if v and inv_date:
                            # accounting date = source invoice date (else Odoo stamps today)
                            move.write({"date": inv_date})
                        move.action_post()
                        summary["posted"] += 1
                        if self._fc_reconcile_payment(move, inv, verified=v):
                            summary["reconciled"] += 1
                # Savepoint released without error: the invoice is in the ledger.
                by_family = summary["by_family"]
                for fam, total in family_totals.items():
                    by_family[fam] = round(by_family.get(fam, 0.0) + total, 2)
            except Exception as e:  # noqa: BLE001 - per-invoice isolation
                _logger.exception("Ledger ingest: invoice %s failed", nc_id)
                summary["failed"].append({"id": nc_id,
                                          "error": "%s: %s" % (type(e).__name__, e)})
        return summary

    @api.model
    def _post_ingested(self):
        """Post every draft customer invoice that carries a NexaCloud invoice id.

        Each move is posted in its own savepoint; failures are logged and skipped.

        Returns:
            The number of moves successfully posted.
        """
        moves = self.env["account.move"].search([
            ("x_fc_nexacloud_invoice_id", "!=", False),
            ("state", "=", "draft"),
            ("move_type", "=", "out_invoice")])
        posted = 0
        for mv in moves:
            try:
                with self.env.cr.savepoint():
                    mv.action_post()
                    posted += 1
            except Exception:  # noqa: BLE001
                _logger.exception("Ledger post: move %s failed", mv.id)
        return posted

    @api.model
    def _post_and_reconcile_paid(self, data):
        """Post + reconcile ONLY the invoices NexaCloud marks paid, dating the ledger
        entry to the ORIGINAL invoice date and the payment to the actual paid_at.
        Leaves unpaid invoices as draft. Per-invoice isolated.

        Args:
            data: list of invoice dicts as produced by ``_read_nexacloud_invoices``.

        Returns:
            Summary dict with keys ``posted``, ``reconciled``, ``skipped_unpaid``,
            ``skipped_missing`` and ``failed``.
        """
        Move = self.env["account.move"]
        summary = {"posted": 0, "reconciled": 0, "skipped_unpaid": 0,
                   "skipped_missing": 0, "failed": []}
        for inv in data:
            nc_id = str(inv.get("id") or "")
            paid = float(inv.get("amount_paid") or 0.0)
            if inv.get("status") != "paid" and paid <= 0:
                summary["skipped_unpaid"] += 1
                continue
            mv = Move.search([("x_fc_nexacloud_invoice_id", "=", nc_id),
                              ("move_type", "=", "out_invoice")], limit=1)
            if not mv or not mv.invoice_line_ids:
                summary["skipped_missing"] += 1
                continue
            try:
                with self.env.cr.savepoint():
                    if mv.state == "draft":
                        inv_date = inv.get("invoice_date")
                        # keep the original invoice + accounting date (not today)
                        mv.write({"invoice_date": inv_date, "date": inv_date})
                        mv.action_post()
                        summary["posted"] += 1
                    if mv.payment_state not in ("paid", "in_payment", "reversed"):
                        if self._fc_reconcile_payment(mv, inv):
                            summary["reconciled"] += 1
            except Exception as e:  # noqa: BLE001 - per-invoice isolation
                _logger.exception("Post+pay: invoice %s failed", nc_id)
                summary["failed"].append({"id": nc_id,
                                          "error": "%s: %s" % (type(e).__name__, e)})
        return summary

    @api.model
    def _cron_sync_verified(self):
        """Daily go-forward sync (the only safe automatic path).

        Reads NexaCloud invoices, then for each one not already in the ledger verifies it
        against its SOURCE billing system (Stripe / Lago) and ingests + posts only verified
        data: the real invoice date, and a reconciled payment ONLY when the source confirms
        it is paid. Voids are skipped; anything that cannot be verified is logged and left
        for the next run (never posted on NexaCloud's own unreliable created_at / status /
        paid_at). Idempotent — already-posted invoices are left untouched.

        Returns:
            Summary dict combining the verification counters with the
            ``created`` / ``updated`` / ``posted`` / ``reconciled`` / ``failed``
            values reported by ``_ingest_invoices``.
        """
        Move = self.env["account.move"]
        data = self._read_nexacloud_invoices()
        to_ingest, verified = [], {}
        summary = {"verified": 0, "skipped_void": 0, "skipped_draft": 0,
                   "already_posted": 0, "unverified": []}
        for inv in data:
            nc_id = str(inv.get("id") or "")
            existing = Move.search(
                [("x_fc_nexacloud_invoice_id", "=", nc_id),
                 ("move_type", "=", "out_invoice")], limit=1)
            if existing and existing.state == "posted":
                summary["already_posted"] += 1
                continue
            v = self._fc_verify(inv)
            if v is None:
                summary["unverified"].append(nc_id)
                continue
            if v.get("void"):
                summary["skipped_void"] += 1
                continue
            if v.get("draft"):
                # not finalized at the source yet — will be picked up once it finalizes
                summary["skipped_draft"] += 1
                continue
            verified[nc_id] = v
            to_ingest.append(inv)
            summary["verified"] += 1
        res = self._ingest_invoices(to_ingest, post=True, verified=verified)
        for k in ("created", "updated", "posted", "reconciled", "failed"):
            summary[k] = res.get(k)
        if summary["unverified"]:
            _logger.warning("Ledger sync: %s invoice(s) unverified, will retry next run: %s",
                            len(summary["unverified"]), summary["unverified"])
        _logger.info("Ledger sync summary: %s", summary)
        return summary

    # ----- source-of-truth verification (Stripe / Lago) ----------------------
    @api.model
    def _fc_ts_to_date(self, ts):
        """Unix timestamp (Stripe) -> 'YYYY-MM-DD' (UTC). None/blank-safe (0 = epoch)."""
        if ts is None or ts == "":
            return None
        return datetime.fromtimestamp(int(ts), tz=timezone.utc).date().isoformat()

    @api.model
    def _fc_verify(self, inv):
        """Route an invoice to its source billing system for verification.

        The source is chosen from the ``stripe_invoice_id`` prefix: ``in_`` goes
        to Stripe, ``lago:`` goes to Lago (with the prefix stripped).

        Returns a dict {invoice_date, void, draft, paid, paid_at, amount_paid} or None
        if the source can't be determined / reached (caller then leaves it for the
        next run).
        """
        sid = (inv.get("stripe_invoice_id") or "").strip()
        if sid.startswith("in_"):
            return self._fc_verify_stripe(sid)
        if sid.startswith("lago:"):
            return self._fc_verify_lago(sid[len("lago:"):])
        return None

    @api.model
    def _fc_verify_stripe(self, stripe_invoice_id):
        """Fetch one invoice from the Stripe API and normalise its status.

        Uses the ``fusion_billing.stripe_api_key`` system parameter.

        Returns:
            The verification dict, or None when the key is missing, the request
            fails, the response is not HTTP 200, or the body is not a JSON object.
        """
        key = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.stripe_api_key")
        if not key:
            return None
        import requests
        try:
            resp = requests.get(
                "https://api.stripe.com/v1/invoices/%s" % stripe_invoice_id,
                auth=(key, ""), timeout=20)
        except Exception:  # noqa: BLE001 - network failure: treat as unverifiable
            _logger.exception("Stripe verify failed for %s", stripe_invoice_id)
            return None
        if resp.status_code != 200:
            _logger.warning("Stripe verify %s -> HTTP %s", stripe_invoice_id, resp.status_code)
            return None
        try:
            d = resp.json()
        except ValueError:
            # Unparseable body: unverifiable, retry on the next run instead of
            # letting the error abort the whole sync.
            _logger.warning("Stripe verify %s -> invalid JSON body", stripe_invoice_id)
            return None
        if not isinstance(d, dict):
            _logger.warning("Stripe verify %s -> unexpected payload", stripe_invoice_id)
            return None
        status = d.get("status")
        paid_ts = (d.get("status_transitions") or {}).get("paid_at")
        return {
            "invoice_date": self._fc_ts_to_date(d.get("created")),
            "void": status == "void",
            "draft": status == "draft",  # not finalized in Stripe -> not a real invoice yet
            "paid": status == "paid" or float(d.get("amount_paid") or 0) > 0,
            "paid_at": self._fc_ts_to_date(paid_ts),
            "amount_paid": float(d.get("amount_paid") or 0) / 100.0,
        }

    @api.model
    def _fc_verify_lago(self, lago_invoice_id):
        """Fetch one invoice from the Lago API and normalise its status.

        Uses the ``fusion_billing.lago_api_url`` and ``fusion_billing.lago_api_key``
        system parameters.

        Returns:
            The verification dict, or None when configuration is missing, the
            request fails, the response is not HTTP 200, or the body carries no
            usable ``invoice`` object.
        """
        cp = self.env["ir.config_parameter"].sudo()
        url = cp.get_param("fusion_billing.lago_api_url")
        key = cp.get_param("fusion_billing.lago_api_key")
        if not url or not key:
            return None
        import requests
        try:
            resp = requests.get(
                "%s/v1/invoices/%s" % (url.rstrip("/"), lago_invoice_id),
                headers={"Authorization": "Bearer %s" % key}, timeout=20)
        except Exception:  # noqa: BLE001 - network failure: treat as unverifiable
            _logger.exception("Lago verify failed for %s", lago_invoice_id)
            return None
        if resp.status_code != 200:
            _logger.warning("Lago verify %s -> HTTP %s", lago_invoice_id, resp.status_code)
            return None
        try:
            payload = resp.json()
        except ValueError:
            # Unparseable body: unverifiable, retry on the next run.
            _logger.warning("Lago verify %s -> invalid JSON body", lago_invoice_id)
            return None
        d = payload.get("invoice") if isinstance(payload, dict) else None
        if not d or not isinstance(d, dict):
            # Without an invoice object nothing has been verified; do not report
            # the invoice as "verified, unpaid".
            _logger.warning("Lago verify %s -> no invoice in response", lago_invoice_id)
            return None
        issuing = d.get("issuing_date")  # already 'YYYY-MM-DD'
        return {
            "invoice_date": issuing,
            "void": d.get("status") == "voided",
            "draft": d.get("status") == "draft",  # not finalized in Lago yet
            "paid": d.get("payment_status") == "succeeded",
            "paid_at": issuing,  # Lago exposes no clean paid-at; issuing date is the proxy
            "amount_paid": float(d.get("total_paid_amount_cents") or 0) / 100.0,
        }

    # ----- helpers ------------------------------------------------------------
    @api.model
    def _fc_family_for(self, description):
        """Classify an invoice-item description into a service family.

        Proration lines ("Remaining time on X ...") are classified by the
        prorated item X. Keywords are matched case-insensitively in the order of
        ``_FAMILY_KEYWORDS``; anything unmatched is ``"other"``.
        """
        d = (description or "").lower()
        m = re.match(r"remaining time on (.+?)(?: after| from |\s*\()", d)
        if m:
            d = m.group(1)  # classify proration by the prorated item
        for fam, kws in self._FAMILY_KEYWORDS:
            if any(k in d for k in kws):
                return fam
        return "other"

    @api.model
    def _fc_income_account(self, family):
        """Return the income account for ``family`` (code ``NCR.<FAMILY>``),
        creating it when it does not exist yet."""
        Account = self.env["account.account"]
        # Odoo 19 account codes allow only alphanumerics + dots (no hyphen).
        code = "NCR." + family.upper()
        acc = Account.search([("code", "=", code)], limit=1)
        if not acc:
            acc = Account.create({
                "code": code,
                "name": "NexaCloud %s Revenue" % family.title(),
                "account_type": "income"})
        return acc

    @api.model
    def _fc_tax_for(self, subtotal, tax_amount):
        """Map a NexaCloud invoice's (subtotal, tax) to an Odoo sale tax.

        When the subtotal or the tax is not positive, a 0% sale tax is looked up.
        Otherwise the effective rate (rounded to a whole percent) is matched
        first against percent-type sale taxes by amount, then against sale tax
        names containing the rate. The result may be an empty recordset when
        nothing matches.
        """
        Tax = self.env["account.tax"]
        sub = float(subtotal or 0.0)
        amt = float(tax_amount or 0.0)
        if sub <= 0 or amt <= 0:
            return Tax.search([("type_tax_use", "=", "sale"), ("amount", "=", 0.0)], limit=1)
        rate = round(100.0 * amt / sub)
        tax = Tax.search([("type_tax_use", "=", "sale"), ("amount_type", "=", "percent"),
                          ("amount", "=", float(rate))], limit=1)
        if not tax:
            tax = Tax.search([("type_tax_use", "=", "sale"),
                              ("name", "ilike", "%s" % rate)], limit=1)
        return tax

    @api.model
    def _fc_partner_for(self, inv):
        """Resolve the unified partner via the nexacloud account.link (by user id);
        create partner+link if missing (covers NULL-subscription invoices).

        The ``nexacloud`` billing service record is created on first use.
        """
        service = self.env["fusion.billing.service"].search(
            [("code", "=", "nexacloud")], limit=1)
        if not service:
            service = self.env["fusion.billing.service"].create(
                {"name": "NexaCloud", "code": "nexacloud"})
        company = (inv.get("partner_company") or "").strip()
        name = company or inv.get("partner_name") or str(inv.get("user_external_id"))
        link = self.env["fusion.billing.account.link"]._resolve_or_create_partner(
            service, str(inv.get("user_external_id")),
            name=name, email=inv.get("partner_email"))
        partner = link.partner_id
        # Name the partner for the BUSINESS (company), not the NexaCloud user's full_name —
        # one person (e.g. "Gurpreet Singh") can manage several distinct customer businesses.
        # Rewrite an existing partner so earlier full_name-based names get corrected.
        if company and (partner.name != company or not partner.is_company):
            partner.write({"name": company, "is_company": True})
        return partner

    @api.model
    def _fc_stripe_journal(self):
        """Return the bank journal with code ``NCSTR``, creating it if missing."""
        Journal = self.env["account.journal"]
        j = Journal.search([("code", "=", "NCSTR")], limit=1)
        if not j:
            j = Journal.create({"name": "NexaCloud Stripe", "code": "NCSTR", "type": "bank"})
        return j

    @api.model
    def _fc_reconcile_payment(self, move, inv, verified=None):
        """Register + reconcile a Stripe payment against a posted invoice.

        When ``verified`` is given, paid status / amount / date come from the SOURCE
        system (Stripe/Lago); a payment is created ONLY if the source confirms paid.
        Without it, NexaCloud's own (unreliable) fields are used (manual/backfill path).

        Returns:
            True when a payment was registered; False when the move is not
            posted or the invoice is not considered paid.
        """
        if move.state != "posted":
            return False
        if verified is not None:
            if not verified.get("paid"):
                return False
            amount = verified.get("amount_paid") or move.amount_total
            payment_date = verified.get("paid_at") or move.invoice_date or fields.Date.today()
        else:
            paid = float(inv.get("amount_paid") or 0.0)
            if inv.get("status") != "paid" and paid <= 0:
                return False
            amount = paid or move.amount_total
            payment_date = inv.get("paid_at") or move.invoice_date or fields.Date.today()
        reg = self.env["account.payment.register"].with_context(
            active_model="account.move", active_ids=move.ids).create({
                "journal_id": self._fc_stripe_journal().id,
                "payment_date": payment_date,
                "amount": amount,
            })
        reg._create_payments()
        return True

    @api.model
    def _fc_prune_metered_shadow(self):
        """Delete the superseded metered shadow data (shadow sale.orders, NC-* products,
        NexaCloud charges, reconciliation rows).

        Products that cannot be deleted are archived instead.

        Returns:
            Dict of record counts per category (``subscriptions``, ``charges``,
            ``reconciliations``, ``products`` and, when the delete failed,
            ``products_archived``).
        """
        counts = {}
        subs = self.env["sale.order"].search([("x_fc_shadow", "=", True)])
        counts["subscriptions"] = len(subs)
        subs.unlink()
        ch = self.env["fusion.billing.charge"].search([])  # before products (charge -> product)
        counts["charges"] = len(ch)
        ch.unlink()
        rec = self.env["fusion.billing.reconciliation"].search([])
        counts["reconciliations"] = len(rec)
        rec.unlink()
        prods = self.env["product.product"].search([("default_code", "=like", "NC-%")])
        counts["products"] = len(prods)
        try:
            # The savepoint keeps the transaction usable if the delete fails at the
            # database level; without it the fallback write below could not run.
            with self.env.cr.savepoint():
                prods.unlink()
        except Exception:  # noqa: BLE001 - undeletable (referenced) products: archive instead
            prods.write({"active": False})
            counts["products_archived"] = len(prods)
        return counts