Files
Odoo-Modules/fusion_centralize_billing/wizards/invoice_ledger.py
gsinghpal e36318f7a5 feat(billing): Stripe/Lago-verified go-forward sync + activate daily cron
The NexaCloud->Odoo ledger now verifies every new invoice against its
SOURCE billing system before posting, instead of trusting NexaCloud's
unreliable created_at/status/paid_at:

- _fc_verify routes by stripe_invoice_id prefix (in_ -> Stripe REST,
  lago: -> Lago REST) and returns source-truth
  {invoice_date, void, draft, paid, paid_at, amount_paid}, or None when it
  can't be determined/reached (left for the next run).
- _ingest_invoices(post=True, verified=...) uses the source invoice date
  (and accounting date), and reconciles a payment ONLY when the source
  confirms paid.
- _cron_sync_verified posts only finalized invoices; skips void + draft,
  logs unverified for retry. Replaces the old _cron_ingest_recent.

Cron cron_fc_invoice_ledger is enabled daily on nexamain. First live run:
23 already-posted, 1 void + 2 Stripe drafts + 5 zero-amount all skipped,
0 new posted, ledger intact at $3,403.46.

Tests: routing/guards (no network), verified date+reconcile, and the cron's
void/draft/unverified filtering (sources patched). FCB_EXIT=0 on odoo-trial.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 18:37:36 -04:00

499 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo invoice ledger ingester.
Reads NexaCloud's real (Stripe-billed) invoices and creates native Odoo
``account.move`` customer invoices — posted, with the Stripe payments reconciled and
HST modelled — so Odoo is the accounting system of record. Revenue is split by service
family into distinct income accounts. NexaCloud/Stripe keep doing the billing; Odoo
ingests its output. See docs/superpowers/specs/2026-05-27-nexacloud-invoice-ledger-design.md
"""
import json
import logging
import re
from datetime import datetime, timezone
from odoo import api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FusionBillingInvoiceLedgerWizard(models.TransientModel):
_name = "fusion.billing.invoice.ledger.wizard"
_description = "Fusion Billing — NexaCloud Invoice Ledger Ingester"
dry_run = fields.Boolean(default=True)
auto_post = fields.Boolean(
default=False, help="Post invoices immediately (else leave draft for review).")
result_summary = fields.Text(readonly=True)
# description keyword -> service family (checked in order; hosting before managed)
_FAMILY_KEYWORDS = [
("hosting", ["odoo erp hosting", "wordpress website hosting"]),
("managed", ["managed"]),
("addons", ["daily backup", "whatsapp", "forms builder", "white label"]),
]
def action_run(self):
self.ensure_one()
data = self._read_nexacloud_invoices()
if self.dry_run:
class _Rollback(Exception):
pass
res = {}
try:
with self.env.cr.savepoint():
res.update(self._ingest_invoices(data, post=False))
raise _Rollback()
except _Rollback:
pass
res["dry_run"] = True
else:
res = self._ingest_invoices(data, post=self.auto_post)
self.result_summary = json.dumps(res, indent=2, default=str)
if res.get("failed"):
_logger.error("Ledger ingest: %s failed: %s", len(res["failed"]), res["failed"])
return {"type": "ir.actions.act_window", "res_model": self._name,
"res_id": self.id, "view_mode": "form", "target": "new"}
# ----- read side (the ONLY code that touches NexaCloud) ------------------
def _read_nexacloud_invoices(self, since=None):
import psycopg2
import psycopg2.extras
dsn = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.nexacloud_dsn")
if not dsn:
raise UserError("NexaCloud DSN not configured (fusion_billing.nexacloud_dsn).")
try:
conn = psycopg2.connect(dsn)
except Exception as e: # noqa: BLE001
raise UserError("Could not connect to the NexaCloud database: %s" % e)
try:
conn.set_session(readonly=True)
conn.set_client_encoding('UTF8') # invoice descriptions contain non-ASCII (e.g. "×")
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
where = "WHERE i.created_at >= %(since)s" if since else ""
cur.execute(
"SELECT i.id, i.stripe_invoice_id, i.invoice_number, "
"i.user_id AS user_external_id, u.full_name AS partner_name, "
"u.company AS partner_company, "
"COALESCE(u.billing_email, u.email) AS partner_email, "
"i.created_at AS invoice_date, i.currency, i.status, i.subtotal, i.tax, "
"i.amount_paid, i.paid_at "
"FROM invoices i JOIN users u ON u.id = i.user_id " + where +
" ORDER BY i.created_at", {"since": since})
invoices = {str(r["id"]): dict(r, items=[]) for r in cur.fetchall()}
if invoices:
cur.execute(
"SELECT ii.invoice_id, ii.description, ii.quantity, ii.unit_price, ii.amount "
"FROM invoice_items ii WHERE ii.invoice_id::text = ANY(%(ids)s)",
{"ids": list(invoices.keys())})
for r in cur.fetchall():
inv = invoices.get(str(r["invoice_id"]))
if inv:
inv["items"].append({
"description": r["description"], "quantity": r["quantity"],
"unit_price": r["unit_price"], "amount": r["amount"]})
out = []
for inv in invoices.values():
inv["id"] = str(inv["id"])
inv["user_external_id"] = str(inv["user_external_id"])
out.append(inv)
return out
except psycopg2.Error as e:
raise UserError(
"Failed reading NexaCloud invoices — the source schema may have changed. "
"Underlying error:\n%s" % e)
finally:
conn.close()
# ----- ingest side (pure Odoo; unit-tested) ------------------------------
@api.model
def _ingest_invoices(self, data, post=False, verified=None):
"""Upsert one account.move per NexaCloud invoice.
``verified`` (optional) maps nc_id -> the dict returned by ``_fc_verify``
(date + paid status taken from the SOURCE billing system). When present for
an invoice, the source invoice_date and paid status win over NexaCloud's own
(unreliable) fields. Without it, the raw NexaCloud fields are used (manual
backfill / dry-run path)."""
verified = verified or {}
Move = self.env["account.move"]
cad = self.env.ref("base.CAD", raise_if_not_found=False) or self.env.company.currency_id
summary = {"created": 0, "updated": 0, "posted": 0, "reconciled": 0,
"skipped": [], "failed": [], "by_family": {}}
for inv in data:
nc_id = str(inv.get("id") or "")
v = verified.get(nc_id)
inv_date = (v or {}).get("invoice_date") or inv.get("invoice_date")
try:
with self.env.cr.savepoint():
existing = Move.search(
[("x_fc_nexacloud_invoice_id", "=", nc_id)], limit=1)
if existing and existing.state != "draft":
summary["skipped"].append({"id": nc_id, "reason": "already posted"})
continue
partner = self._fc_partner_for(inv)
if existing:
existing.invoice_line_ids.unlink() # draft: replace lines
if existing.partner_id != partner:
existing.partner_id = partner.id
if inv_date and str(existing.invoice_date) != str(inv_date):
existing.invoice_date = inv_date
move = existing
else:
move = Move.create({
"move_type": "out_invoice",
"partner_id": partner.id,
"invoice_date": inv_date,
"ref": inv.get("invoice_number"),
"currency_id": cad.id,
"x_fc_nexacloud_invoice_id": nc_id,
"x_fc_stripe_invoice_id": inv.get("stripe_invoice_id"),
})
tax = self._fc_tax_for(inv.get("subtotal"), inv.get("tax"))
line_vals = []
for it in inv.get("items", []):
fam = self._fc_family_for(it.get("description"))
summary["by_family"][fam] = round(
summary["by_family"].get(fam, 0.0) + float(it.get("amount") or 0.0), 2)
line_vals.append((0, 0, {
"name": it.get("description") or "NexaCloud",
"quantity": float(it.get("quantity") or 1.0),
"price_unit": float(it.get("unit_price") or it.get("amount") or 0.0),
"account_id": self._fc_income_account(fam).id,
"tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
}))
# Many NexaCloud base-plan invoices store the charge in `subtotal` with
# NO invoice_items. Add a balancing line for any gap so the Odoo invoice
# total matches what Stripe actually billed (captures un-itemized revenue
# and absorbs proration credits where items exceed subtotal).
items_total = round(sum(float(it.get("amount") or 0.0)
for it in inv.get("items", [])), 2)
gap = round(float(inv.get("subtotal") or 0.0) - items_total, 2)
if abs(gap) > 0.01:
summary["by_family"]["base"] = round(
summary["by_family"].get("base", 0.0) + gap, 2)
line_vals.append((0, 0, {
"name": "NexaCloud base/unitemized charge",
"quantity": 1.0, "price_unit": gap,
"account_id": self._fc_income_account("base").id,
"tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
}))
if not line_vals:
# zero-amount invoice (no items, $0 subtotal) — nothing to record;
# drop the empty move (whether just-created or a pre-existing draft).
move.unlink()
summary["skipped"].append({"id": nc_id, "reason": "zero-amount invoice"})
continue
move.write({"invoice_line_ids": line_vals})
summary["updated" if existing else "created"] += 1
if post:
if v and inv_date:
# accounting date = source invoice date (else Odoo stamps today)
move.write({"date": inv_date})
move.action_post()
summary["posted"] += 1
if self._fc_reconcile_payment(move, inv, verified=v):
summary["reconciled"] += 1
except Exception as e: # noqa: BLE001 - per-invoice isolation
_logger.exception("Ledger ingest: invoice %s failed", nc_id)
summary["failed"].append({"id": nc_id, "error": "%s: %s" % (type(e).__name__, e)})
return summary
@api.model
def _post_ingested(self):
moves = self.env["account.move"].search([
("x_fc_nexacloud_invoice_id", "!=", False),
("state", "=", "draft"), ("move_type", "=", "out_invoice")])
posted = 0
for mv in moves:
try:
with self.env.cr.savepoint():
mv.action_post()
posted += 1
except Exception: # noqa: BLE001
_logger.exception("Ledger post: move %s failed", mv.id)
return posted
@api.model
def _post_and_reconcile_paid(self, data):
"""Post + reconcile ONLY the invoices NexaCloud marks paid, dating the ledger entry
to the ORIGINAL invoice date and the payment to the actual paid_at. Leaves unpaid
invoices as draft. Per-invoice isolated."""
Move = self.env["account.move"]
summary = {"posted": 0, "reconciled": 0, "skipped_unpaid": 0,
"skipped_missing": 0, "failed": []}
for inv in data:
nc_id = str(inv.get("id") or "")
paid = float(inv.get("amount_paid") or 0.0)
if inv.get("status") != "paid" and paid <= 0:
summary["skipped_unpaid"] += 1
continue
mv = Move.search([("x_fc_nexacloud_invoice_id", "=", nc_id),
("move_type", "=", "out_invoice")], limit=1)
if not mv or not mv.invoice_line_ids:
summary["skipped_missing"] += 1
continue
try:
with self.env.cr.savepoint():
if mv.state == "draft":
inv_date = inv.get("invoice_date")
# keep the original invoice + accounting date (not today)
mv.write({"invoice_date": inv_date, "date": inv_date})
mv.action_post()
summary["posted"] += 1
if mv.payment_state not in ("paid", "in_payment", "reversed"):
if self._fc_reconcile_payment(mv, inv):
summary["reconciled"] += 1
except Exception as e: # noqa: BLE001 - per-invoice isolation
_logger.exception("Post+pay: invoice %s failed", nc_id)
summary["failed"].append({"id": nc_id, "error": "%s: %s" % (type(e).__name__, e)})
return summary
def _cron_sync_verified(self):
"""Daily go-forward sync (the only safe automatic path).
Reads NexaCloud invoices, then for each one not already in the ledger verifies
it against its SOURCE billing system (Stripe / Lago) and ingests + posts only
verified data: the real invoice date, and a reconciled payment ONLY when the
source confirms it is paid. Voids are skipped; anything that cannot be verified
is logged and left for the next run (never posted on NexaCloud's own unreliable
created_at / status / paid_at). Idempotent — already-posted invoices are left
untouched."""
Move = self.env["account.move"]
data = self._read_nexacloud_invoices()
to_ingest, verified = [], {}
summary = {"verified": 0, "skipped_void": 0, "skipped_draft": 0,
"already_posted": 0, "unverified": []}
for inv in data:
nc_id = str(inv.get("id") or "")
existing = Move.search(
[("x_fc_nexacloud_invoice_id", "=", nc_id),
("move_type", "=", "out_invoice")], limit=1)
if existing and existing.state == "posted":
summary["already_posted"] += 1
continue
v = self._fc_verify(inv)
if v is None:
summary["unverified"].append(nc_id)
continue
if v.get("void"):
summary["skipped_void"] += 1
continue
if v.get("draft"):
# not finalized at the source yet — will be picked up once it finalizes
summary["skipped_draft"] += 1
continue
verified[nc_id] = v
to_ingest.append(inv)
summary["verified"] += 1
res = self._ingest_invoices(to_ingest, post=True, verified=verified)
for k in ("created", "updated", "posted", "reconciled", "failed"):
summary[k] = res.get(k)
if summary["unverified"]:
_logger.warning("Ledger sync: %s invoice(s) unverified, will retry next run: %s",
len(summary["unverified"]), summary["unverified"])
_logger.info("Ledger sync summary: %s", summary)
return summary
# ----- source-of-truth verification (Stripe / Lago) ----------------------
@api.model
def _fc_ts_to_date(self, ts):
"""Unix timestamp (Stripe) -> 'YYYY-MM-DD' (UTC). None/blank-safe (0 = epoch)."""
if ts is None or ts == "":
return None
return datetime.fromtimestamp(int(ts), tz=timezone.utc).date().isoformat()
@api.model
def _fc_verify(self, inv):
"""Route an invoice to its source billing system for verification.
Returns a dict {invoice_date, void, paid, paid_at, amount_paid} or None if the
source can't be determined / reached (caller then leaves it for the next run)."""
sid = (inv.get("stripe_invoice_id") or "").strip()
if sid.startswith("in_"):
return self._fc_verify_stripe(sid)
if sid.startswith("lago:"):
return self._fc_verify_lago(sid[len("lago:"):])
return None
@api.model
def _fc_verify_stripe(self, stripe_invoice_id):
key = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.stripe_api_key")
if not key:
return None
import requests
try:
resp = requests.get(
"https://api.stripe.com/v1/invoices/%s" % stripe_invoice_id,
auth=(key, ""), timeout=20)
except Exception: # noqa: BLE001 - network failure: treat as unverifiable
_logger.exception("Stripe verify failed for %s", stripe_invoice_id)
return None
if resp.status_code != 200:
_logger.warning("Stripe verify %s -> HTTP %s", stripe_invoice_id, resp.status_code)
return None
d = resp.json()
status = d.get("status")
paid_ts = (d.get("status_transitions") or {}).get("paid_at")
return {
"invoice_date": self._fc_ts_to_date(d.get("created")),
"void": status == "void",
"draft": status == "draft", # not finalized in Stripe -> not a real invoice yet
"paid": status == "paid" or float(d.get("amount_paid") or 0) > 0,
"paid_at": self._fc_ts_to_date(paid_ts),
"amount_paid": float(d.get("amount_paid") or 0) / 100.0,
}
@api.model
def _fc_verify_lago(self, lago_invoice_id):
cp = self.env["ir.config_parameter"].sudo()
url = cp.get_param("fusion_billing.lago_api_url")
key = cp.get_param("fusion_billing.lago_api_key")
if not url or not key:
return None
import requests
try:
resp = requests.get(
"%s/v1/invoices/%s" % (url.rstrip("/"), lago_invoice_id),
headers={"Authorization": "Bearer %s" % key}, timeout=20)
except Exception: # noqa: BLE001 - network failure: treat as unverifiable
_logger.exception("Lago verify failed for %s", lago_invoice_id)
return None
if resp.status_code != 200:
_logger.warning("Lago verify %s -> HTTP %s", lago_invoice_id, resp.status_code)
return None
d = (resp.json() or {}).get("invoice") or {}
issuing = d.get("issuing_date") # already 'YYYY-MM-DD'
return {
"invoice_date": issuing,
"void": d.get("status") == "voided",
"draft": d.get("status") == "draft", # not finalized in Lago yet
"paid": d.get("payment_status") == "succeeded",
"paid_at": issuing, # Lago exposes no clean paid-at; issuing date is the proxy
"amount_paid": float(d.get("total_paid_amount_cents") or 0) / 100.0,
}
# ----- helpers ------------------------------------------------------------
@api.model
def _fc_family_for(self, description):
d = (description or "").lower()
m = re.match(r"remaining time on (.+?)(?: after| from |\s*\()", d)
if m:
d = m.group(1) # classify proration by the prorated item
for fam, kws in self._FAMILY_KEYWORDS:
if any(k in d for k in kws):
return fam
return "other"
@api.model
def _fc_income_account(self, family):
Account = self.env["account.account"]
# Odoo 19 account codes allow only alphanumerics + dots (no hyphen).
code = "NCR." + family.upper()
acc = Account.search([("code", "=", code)], limit=1)
if not acc:
acc = Account.create({
"code": code, "name": "NexaCloud %s Revenue" % family.title(),
"account_type": "income"})
return acc
@api.model
def _fc_tax_for(self, subtotal, tax_amount):
"""Map a NexaCloud invoice's (subtotal, tax) to the Odoo sale tax whose computed
tax equals it. Picks by effective percent; falls back to a 0% sale tax."""
Tax = self.env["account.tax"]
sub = float(subtotal or 0.0)
amt = float(tax_amount or 0.0)
if sub <= 0 or amt <= 0:
return Tax.search([("type_tax_use", "=", "sale"), ("amount", "=", 0.0)], limit=1)
rate = round(100.0 * amt / sub)
tax = Tax.search([("type_tax_use", "=", "sale"), ("amount_type", "=", "percent"),
("amount", "=", float(rate))], limit=1)
if not tax:
tax = Tax.search([("type_tax_use", "=", "sale"), ("name", "ilike", "%s" % rate)], limit=1)
return tax
@api.model
def _fc_partner_for(self, inv):
"""Resolve the unified partner via the nexacloud account.link (by user id);
create partner+link if missing (covers NULL-subscription invoices)."""
service = self.env["fusion.billing.service"].search([("code", "=", "nexacloud")], limit=1)
if not service:
service = self.env["fusion.billing.service"].create(
{"name": "NexaCloud", "code": "nexacloud"})
company = (inv.get("partner_company") or "").strip()
name = company or inv.get("partner_name") or str(inv.get("user_external_id"))
link = self.env["fusion.billing.account.link"]._resolve_or_create_partner(
service, str(inv.get("user_external_id")), name=name, email=inv.get("partner_email"))
partner = link.partner_id
# Name the partner for the BUSINESS (company), not the NexaCloud user's full_name —
# one person (e.g. "Gurpreet Singh") can manage several distinct customer businesses.
# Rewrite an existing partner so earlier full_name-based names get corrected.
if company and (partner.name != company or not partner.is_company):
partner.write({"name": company, "is_company": True})
return partner
@api.model
def _fc_stripe_journal(self):
Journal = self.env["account.journal"]
j = Journal.search([("code", "=", "NCSTR")], limit=1)
if not j:
j = Journal.create({"name": "NexaCloud Stripe", "code": "NCSTR", "type": "bank"})
return j
@api.model
def _fc_reconcile_payment(self, move, inv, verified=None):
"""Register + reconcile a Stripe payment against a posted invoice.
When ``verified`` is given, paid status / amount / date come from the SOURCE
system (Stripe/Lago); a payment is created ONLY if the source confirms paid.
Without it, NexaCloud's own (unreliable) fields are used (manual/backfill path)."""
if move.state != "posted":
return False
if verified is not None:
if not verified.get("paid"):
return False
amount = verified.get("amount_paid") or move.amount_total
payment_date = verified.get("paid_at") or move.invoice_date or fields.Date.today()
else:
paid = float(inv.get("amount_paid") or 0.0)
if inv.get("status") != "paid" and paid <= 0:
return False
amount = paid or move.amount_total
payment_date = inv.get("paid_at") or move.invoice_date or fields.Date.today()
reg = self.env["account.payment.register"].with_context(
active_model="account.move", active_ids=move.ids).create({
"journal_id": self._fc_stripe_journal().id,
"payment_date": payment_date,
"amount": amount,
})
reg._create_payments()
return True
@api.model
def _fc_prune_metered_shadow(self):
"""Delete the superseded metered shadow data (shadow sale.orders, NC-* products,
NexaCloud charges, reconciliation rows)."""
counts = {}
subs = self.env["sale.order"].search([("x_fc_shadow", "=", True)])
counts["subscriptions"] = len(subs)
subs.unlink()
ch = self.env["fusion.billing.charge"].search([]) # before products (charge -> product)
counts["charges"] = len(ch)
ch.unlink()
rec = self.env["fusion.billing.reconciliation"].search([])
counts["reconciliations"] = len(rec)
rec.unlink()
prods = self.env["product.product"].search([("default_code", "=like", "NC-%")])
counts["products"] = len(prods)
try:
prods.unlink()
except Exception: # noqa: BLE001 - undeletable (referenced) products: archive instead
prods.write({"active": False})
counts["products_archived"] = len(prods)
return counts