Files
Odoo-Modules/fusion_centralize_billing/wizards/invoice_ledger.py

349 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo invoice ledger ingester.
Reads NexaCloud's real (Stripe-billed) invoices and creates native Odoo
``account.move`` customer invoices — posted, with the Stripe payments reconciled and
HST modelled — so Odoo is the accounting system of record. Revenue is split by service
family into distinct income accounts. NexaCloud/Stripe keep doing the billing; Odoo
ingests its output. See docs/superpowers/specs/2026-05-27-nexacloud-invoice-ledger-design.md
"""
import json
import logging
import re
from datetime import timedelta
from odoo import api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FusionBillingInvoiceLedgerWizard(models.TransientModel):
_name = "fusion.billing.invoice.ledger.wizard"
_description = "Fusion Billing — NexaCloud Invoice Ledger Ingester"
dry_run = fields.Boolean(default=True)
auto_post = fields.Boolean(
default=False, help="Post invoices immediately (else leave draft for review).")
result_summary = fields.Text(readonly=True)
# description keyword -> service family (checked in order; hosting before managed)
_FAMILY_KEYWORDS = [
("hosting", ["odoo erp hosting", "wordpress website hosting"]),
("managed", ["managed"]),
("addons", ["daily backup", "whatsapp", "forms builder", "white label"]),
]
def action_run(self):
self.ensure_one()
data = self._read_nexacloud_invoices()
if self.dry_run:
class _Rollback(Exception):
pass
res = {}
try:
with self.env.cr.savepoint():
res.update(self._ingest_invoices(data, post=False))
raise _Rollback()
except _Rollback:
pass
res["dry_run"] = True
else:
res = self._ingest_invoices(data, post=self.auto_post)
self.result_summary = json.dumps(res, indent=2, default=str)
if res.get("failed"):
_logger.error("Ledger ingest: %s failed: %s", len(res["failed"]), res["failed"])
return {"type": "ir.actions.act_window", "res_model": self._name,
"res_id": self.id, "view_mode": "form", "target": "new"}
# ----- read side (the ONLY code that touches NexaCloud) ------------------
def _read_nexacloud_invoices(self, since=None):
import psycopg2
import psycopg2.extras
dsn = self.env["ir.config_parameter"].sudo().get_param("fusion_billing.nexacloud_dsn")
if not dsn:
raise UserError("NexaCloud DSN not configured (fusion_billing.nexacloud_dsn).")
try:
conn = psycopg2.connect(dsn)
except Exception as e: # noqa: BLE001
raise UserError("Could not connect to the NexaCloud database: %s" % e)
try:
conn.set_session(readonly=True)
conn.set_client_encoding('UTF8') # invoice descriptions contain non-ASCII (e.g. "×")
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
where = "WHERE i.created_at >= %(since)s" if since else ""
cur.execute(
"SELECT i.id, i.stripe_invoice_id, i.invoice_number, "
"i.user_id AS user_external_id, u.full_name AS partner_name, "
"u.company AS partner_company, "
"COALESCE(u.billing_email, u.email) AS partner_email, "
"i.created_at AS invoice_date, i.currency, i.status, i.subtotal, i.tax, "
"i.amount_paid, i.paid_at "
"FROM invoices i JOIN users u ON u.id = i.user_id " + where +
" ORDER BY i.created_at", {"since": since})
invoices = {str(r["id"]): dict(r, items=[]) for r in cur.fetchall()}
if invoices:
cur.execute(
"SELECT ii.invoice_id, ii.description, ii.quantity, ii.unit_price, ii.amount "
"FROM invoice_items ii WHERE ii.invoice_id::text = ANY(%(ids)s)",
{"ids": list(invoices.keys())})
for r in cur.fetchall():
inv = invoices.get(str(r["invoice_id"]))
if inv:
inv["items"].append({
"description": r["description"], "quantity": r["quantity"],
"unit_price": r["unit_price"], "amount": r["amount"]})
out = []
for inv in invoices.values():
inv["id"] = str(inv["id"])
inv["user_external_id"] = str(inv["user_external_id"])
out.append(inv)
return out
except psycopg2.Error as e:
raise UserError(
"Failed reading NexaCloud invoices — the source schema may have changed. "
"Underlying error:\n%s" % e)
finally:
conn.close()
# ----- ingest side (pure Odoo; unit-tested) ------------------------------
@api.model
def _ingest_invoices(self, data, post=False):
Move = self.env["account.move"]
cad = self.env.ref("base.CAD", raise_if_not_found=False) or self.env.company.currency_id
summary = {"created": 0, "updated": 0, "posted": 0,
"skipped": [], "failed": [], "by_family": {}}
for inv in data:
nc_id = str(inv.get("id") or "")
try:
with self.env.cr.savepoint():
existing = Move.search(
[("x_fc_nexacloud_invoice_id", "=", nc_id)], limit=1)
if existing and existing.state != "draft":
summary["skipped"].append({"id": nc_id, "reason": "already posted"})
continue
partner = self._fc_partner_for(inv)
if existing:
existing.invoice_line_ids.unlink() # draft: replace lines
if existing.partner_id != partner:
existing.partner_id = partner.id
move = existing
else:
move = Move.create({
"move_type": "out_invoice",
"partner_id": partner.id,
"invoice_date": inv.get("invoice_date"),
"ref": inv.get("invoice_number"),
"currency_id": cad.id,
"x_fc_nexacloud_invoice_id": nc_id,
"x_fc_stripe_invoice_id": inv.get("stripe_invoice_id"),
})
tax = self._fc_tax_for(inv.get("subtotal"), inv.get("tax"))
line_vals = []
for it in inv.get("items", []):
fam = self._fc_family_for(it.get("description"))
summary["by_family"][fam] = round(
summary["by_family"].get(fam, 0.0) + float(it.get("amount") or 0.0), 2)
line_vals.append((0, 0, {
"name": it.get("description") or "NexaCloud",
"quantity": float(it.get("quantity") or 1.0),
"price_unit": float(it.get("unit_price") or it.get("amount") or 0.0),
"account_id": self._fc_income_account(fam).id,
"tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
}))
# Many NexaCloud base-plan invoices store the charge in `subtotal` with
# NO invoice_items. Add a balancing line for any gap so the Odoo invoice
# total matches what Stripe actually billed (captures un-itemized revenue
# and absorbs proration credits where items exceed subtotal).
items_total = round(sum(float(it.get("amount") or 0.0)
for it in inv.get("items", [])), 2)
gap = round(float(inv.get("subtotal") or 0.0) - items_total, 2)
if abs(gap) > 0.01:
summary["by_family"]["base"] = round(
summary["by_family"].get("base", 0.0) + gap, 2)
line_vals.append((0, 0, {
"name": "NexaCloud base/unitemized charge",
"quantity": 1.0, "price_unit": gap,
"account_id": self._fc_income_account("base").id,
"tax_ids": [(6, 0, tax.ids)] if tax else [(5, 0, 0)],
}))
if not line_vals:
# zero-amount invoice (no items, $0 subtotal) — nothing to record;
# drop the empty move (whether just-created or a pre-existing draft).
move.unlink()
summary["skipped"].append({"id": nc_id, "reason": "zero-amount invoice"})
continue
move.write({"invoice_line_ids": line_vals})
summary["updated" if existing else "created"] += 1
if post:
move.action_post()
summary["posted"] += 1
self._fc_reconcile_payment(move, inv)
except Exception as e: # noqa: BLE001 - per-invoice isolation
_logger.exception("Ledger ingest: invoice %s failed", nc_id)
summary["failed"].append({"id": nc_id, "error": "%s: %s" % (type(e).__name__, e)})
return summary
@api.model
def _post_ingested(self):
moves = self.env["account.move"].search([
("x_fc_nexacloud_invoice_id", "!=", False),
("state", "=", "draft"), ("move_type", "=", "out_invoice")])
posted = 0
for mv in moves:
try:
with self.env.cr.savepoint():
mv.action_post()
posted += 1
except Exception: # noqa: BLE001
_logger.exception("Ledger post: move %s failed", mv.id)
return posted
@api.model
def _post_and_reconcile_paid(self, data):
"""Post + reconcile ONLY the invoices NexaCloud marks paid, dating the ledger entry
to the ORIGINAL invoice date and the payment to the actual paid_at. Leaves unpaid
invoices as draft. Per-invoice isolated."""
Move = self.env["account.move"]
summary = {"posted": 0, "reconciled": 0, "skipped_unpaid": 0,
"skipped_missing": 0, "failed": []}
for inv in data:
nc_id = str(inv.get("id") or "")
paid = float(inv.get("amount_paid") or 0.0)
if inv.get("status") != "paid" and paid <= 0:
summary["skipped_unpaid"] += 1
continue
mv = Move.search([("x_fc_nexacloud_invoice_id", "=", nc_id),
("move_type", "=", "out_invoice")], limit=1)
if not mv or not mv.invoice_line_ids:
summary["skipped_missing"] += 1
continue
try:
with self.env.cr.savepoint():
if mv.state == "draft":
inv_date = inv.get("invoice_date")
# keep the original invoice + accounting date (not today)
mv.write({"invoice_date": inv_date, "date": inv_date})
mv.action_post()
summary["posted"] += 1
if mv.payment_state not in ("paid", "in_payment", "reversed"):
if self._fc_reconcile_payment(mv, inv):
summary["reconciled"] += 1
except Exception as e: # noqa: BLE001 - per-invoice isolation
_logger.exception("Post+pay: invoice %s failed", nc_id)
summary["failed"].append({"id": nc_id, "error": "%s: %s" % (type(e).__name__, e)})
return summary
def _cron_ingest_recent(self):
since = fields.Datetime.to_string(fields.Datetime.now() - timedelta(days=2))
return self._ingest_invoices(self._read_nexacloud_invoices(since=since), post=True)
# ----- helpers ------------------------------------------------------------
@api.model
def _fc_family_for(self, description):
d = (description or "").lower()
m = re.match(r"remaining time on (.+?)(?: after| from |\s*\()", d)
if m:
d = m.group(1) # classify proration by the prorated item
for fam, kws in self._FAMILY_KEYWORDS:
if any(k in d for k in kws):
return fam
return "other"
@api.model
def _fc_income_account(self, family):
Account = self.env["account.account"]
# Odoo 19 account codes allow only alphanumerics + dots (no hyphen).
code = "NCR." + family.upper()
acc = Account.search([("code", "=", code)], limit=1)
if not acc:
acc = Account.create({
"code": code, "name": "NexaCloud %s Revenue" % family.title(),
"account_type": "income"})
return acc
@api.model
def _fc_tax_for(self, subtotal, tax_amount):
"""Map a NexaCloud invoice's (subtotal, tax) to the Odoo sale tax whose computed
tax equals it. Picks by effective percent; falls back to a 0% sale tax."""
Tax = self.env["account.tax"]
sub = float(subtotal or 0.0)
amt = float(tax_amount or 0.0)
if sub <= 0 or amt <= 0:
return Tax.search([("type_tax_use", "=", "sale"), ("amount", "=", 0.0)], limit=1)
rate = round(100.0 * amt / sub)
tax = Tax.search([("type_tax_use", "=", "sale"), ("amount_type", "=", "percent"),
("amount", "=", float(rate))], limit=1)
if not tax:
tax = Tax.search([("type_tax_use", "=", "sale"), ("name", "ilike", "%s" % rate)], limit=1)
return tax
@api.model
def _fc_partner_for(self, inv):
"""Resolve the unified partner via the nexacloud account.link (by user id);
create partner+link if missing (covers NULL-subscription invoices)."""
service = self.env["fusion.billing.service"].search([("code", "=", "nexacloud")], limit=1)
if not service:
service = self.env["fusion.billing.service"].create(
{"name": "NexaCloud", "code": "nexacloud"})
company = (inv.get("partner_company") or "").strip()
name = company or inv.get("partner_name") or str(inv.get("user_external_id"))
link = self.env["fusion.billing.account.link"]._resolve_or_create_partner(
service, str(inv.get("user_external_id")), name=name, email=inv.get("partner_email"))
partner = link.partner_id
# Name the partner for the BUSINESS (company), not the NexaCloud user's full_name —
# one person (e.g. "Gurpreet Singh") can manage several distinct customer businesses.
# Rewrite an existing partner so earlier full_name-based names get corrected.
if company and (partner.name != company or not partner.is_company):
partner.write({"name": company, "is_company": True})
return partner
@api.model
def _fc_stripe_journal(self):
Journal = self.env["account.journal"]
j = Journal.search([("code", "=", "NCSTR")], limit=1)
if not j:
j = Journal.create({"name": "NexaCloud Stripe", "code": "NCSTR", "type": "bank"})
return j
@api.model
def _fc_reconcile_payment(self, move, inv):
paid = float(inv.get("amount_paid") or 0.0)
if (inv.get("status") != "paid" and paid <= 0) or move.state != "posted":
return False
reg = self.env["account.payment.register"].with_context(
active_model="account.move", active_ids=move.ids).create({
"journal_id": self._fc_stripe_journal().id,
"payment_date": inv.get("paid_at") or move.invoice_date or fields.Date.today(),
"amount": paid or move.amount_total,
})
reg._create_payments()
return True
@api.model
def _fc_prune_metered_shadow(self):
"""Delete the superseded metered shadow data (shadow sale.orders, NC-* products,
NexaCloud charges, reconciliation rows)."""
counts = {}
subs = self.env["sale.order"].search([("x_fc_shadow", "=", True)])
counts["subscriptions"] = len(subs)
subs.unlink()
ch = self.env["fusion.billing.charge"].search([]) # before products (charge -> product)
counts["charges"] = len(ch)
ch.unlink()
rec = self.env["fusion.billing.reconciliation"].search([])
counts["reconciliations"] = len(rec)
rec.unlink()
prods = self.env["product.product"].search([("default_code", "=like", "NC-%")])
counts["products"] = len(prods)
try:
prods.unlink()
except Exception: # noqa: BLE001 - undeletable (referenced) products: archive instead
prods.write({"active": False})
counts["products_archived"] = len(prods)
return counts