Files
Odoo-Modules/fusion_centralize_billing/wizards/import_wizard.py
gsinghpal 2bdf4ef6a0 feat(billing): 2d dual-run reconciliation (Odoo-computed vs NexaCloud-actual)
fusion.billing.reconciliation gains the compute: _compute_reconciliation
(flat + charge overage vs external, status match/delta at a tolerance) and
_reconcile_rows (resolve shadow sub -> flat + charge, upsert one row per
service/partner/period, per-row isolated). The wizard gains a read-only
_read_reconciliation_rows (NexaCloud usage cpu_hours*3600 + invoice-item
subtotals per YYYY-MM) and a "Run Reconciliation" button. 2a amended to
stamp x_fc_nexacloud_plan_id on shadow subs so reconciliation can find the
charge. Read-only on NexaCloud; writes only reconciliation rows (shadow
guarantees intact). 8 new tests, full suite green on odoo-trial.
2026-05-27 14:34:23 -04:00

448 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo billing importer (sub-project #2a).
One-time, re-runnable, read-only backfill: read the NexaCloud Postgres and create the
equivalent Odoo records (partners + links, a cpu_seconds charge catalog, one DRAFT
shadow ``sale.order`` per deployment). Shadow-safe by construction — see the design spec
``docs/superpowers/specs/2026-05-27-nexacloud-billing-importer-design.md``.
Logic lives in model methods so it is unit-testable headless; the wizard button only
calls ``_read_nexacloud_rows()`` → ``_import_rows()``.
"""
import json
import logging
from odoo import api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
NEXACLOUD_CODE = "nexacloud"
CPU_METRIC_CODE = "cpu_seconds"
CPU_RATE_PER_CORE_HOUR = 0.0075 # NexaCloud CPU rate, CAD per core-hour
CPU_SECONDS_PER_CORE_HOUR = 3600.0 # one core-hour = 3600 cpu-seconds
class FusionBillingImportWizard(models.TransientModel):
_name = "fusion.billing.import.wizard"
_description = "Fusion Billing — NexaCloud Importer"
dry_run = fields.Boolean(
default=True,
help="Read and report what would be imported, without writing anything.")
result_summary = fields.Text(readonly=True)
failed_count = fields.Integer(readonly=True)
skipped_count = fields.Integer(readonly=True)
def action_run_import(self):
self.ensure_one()
data = self._read_nexacloud_rows()
summary = self._import_rows(data, dry_run=self.dry_run)
failed = summary.get("failed") or []
skipped = summary.get("skipped") or []
self.result_summary = json.dumps(summary, indent=2, default=str)
self.failed_count = len(failed)
self.skipped_count = len(skipped)
# A partial billing import must be loud, not buried in the JSON. Log at ERROR
# so it survives nexa's log_level=warn (INFO is suppressed there).
if failed:
_logger.error("NexaCloud import: %s row(s) FAILED%s: %s",
len(failed), " (dry-run)" if self.dry_run else "", failed)
if skipped:
_logger.warning("NexaCloud import: %s row(s) skipped: %s", len(skipped), skipped)
return {
"type": "ir.actions.act_window",
"res_model": self._name,
"res_id": self.id,
"view_mode": "form",
"target": "new",
}
def action_test_connection(self):
"""Read-only connectivity + schema check: connect, read the source tables, and
report row counts WITHOUT importing anything. The safe first step before a
dry-run — surfaces a bad DSN, no network route, or a schema drift up front."""
self.ensure_one()
data = self._read_nexacloud_rows()
msg = "Connected. Read %s user(s), %s plan(s), %s subscription(s)." % (
len(data.get("users", [])), len(data.get("plans", [])),
len(data.get("subscriptions", [])))
return {
"type": "ir.actions.client",
"tag": "display_notification",
"params": {"title": "NexaCloud connection OK", "message": msg,
"type": "success", "sticky": False},
}
def action_run_reconciliation(self):
"""Read NexaCloud usage + invoice actuals and record per-subscription/period
Odoo-vs-NexaCloud deltas in fusion.billing.reconciliation. Read-only on
NexaCloud; writes only reconciliation rows (shadow-safe)."""
self.ensure_one()
rows = self._read_reconciliation_rows()
summary = self.env["fusion.billing.reconciliation"]._reconcile_rows(rows)
self.result_summary = json.dumps(summary, indent=2, default=str)
self.failed_count = len(summary.get("failed") or [])
self.skipped_count = len(summary.get("skipped") or [])
if summary.get("delta") or summary.get("failed"):
_logger.error(
"NexaCloud reconciliation: %s delta, %s failed, %s skipped row(s): %s",
summary.get("delta"), len(summary.get("failed") or []),
len(summary.get("skipped") or []), summary)
return {
"type": "ir.actions.act_window", "res_model": self._name,
"res_id": self.id, "view_mode": "form", "target": "new",
}
# ----- read side (the ONLY code that touches NexaCloud) ------------------
def _read_nexacloud_rows(self):
"""Open a READ-ONLY psycopg2 connection to the nexacloud Postgres (DSN in
ir.config_parameter 'fusion_billing.nexacloud_dsn') and return rows as dicts.
Raises UserError on a missing DSN or a failed connection."""
import psycopg2
import psycopg2.extras
dsn = self.env["ir.config_parameter"].sudo().get_param(
"fusion_billing.nexacloud_dsn")
if not dsn:
raise UserError(
"NexaCloud DSN not configured. Set the 'fusion_billing.nexacloud_dsn' "
"system parameter to a read-only Postgres connection string.")
try:
conn = psycopg2.connect(dsn)
except Exception as e: # noqa: BLE001 - surface as a user error
raise UserError("Could not connect to the NexaCloud database: %s" % e)
try:
conn.set_session(readonly=True)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
data = {}
cur.execute(
"SELECT id, email, full_name, company, billing_email, billing_address, "
"billing_city, billing_state, billing_postal_code, billing_country, "
"tax_id, stripe_customer_id FROM users")
data["users"] = [dict(r) for r in cur.fetchall()]
cur.execute(
"SELECT id, name, price_monthly, price_yearly, cpu_seconds_quota, "
"is_active FROM plans")
data["plans"] = [dict(r) for r in cur.fetchall()]
cur.execute(
"SELECT id, user_id, deployment_id, plan_id, status, billing_cycle, "
"current_period_start, current_period_end FROM subscriptions")
data["subscriptions"] = [dict(r) for r in cur.fetchall()]
return data
except psycopg2.Error as e:
# A query/schema error (e.g. a renamed/missing column) gets the same clean
# operator message as a connection failure — not a raw SQL traceback. We
# never return a partial `data` (the return is the last statement in `try`).
raise UserError(
"Failed reading from the NexaCloud database — the source schema may "
"have changed. Underlying error:\n%s" % e)
finally:
conn.close()
def _read_reconciliation_rows(self):
"""Read-only: per (subscription, YYYY-MM period), NexaCloud's CPU usage
(cpu_hours*3600 = cpu_seconds) and its actual pre-tax invoice amount. Shaped for
fusion.billing.reconciliation._reconcile_rows. Reuses the 2a DSN + guards.
(Integration glue — validate the SQL against the live schema, like the importer
reader; the reconciliation math itself is unit-tested.)"""
import psycopg2
import psycopg2.extras
dsn = self.env["ir.config_parameter"].sudo().get_param(
"fusion_billing.nexacloud_dsn")
if not dsn:
raise UserError("NexaCloud DSN not configured (fusion_billing.nexacloud_dsn).")
try:
conn = psycopg2.connect(dsn)
except Exception as e: # noqa: BLE001
raise UserError("Could not connect to the NexaCloud database: %s" % e)
try:
conn.set_session(readonly=True)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(
"SELECT subscription_id::text AS sub, "
"to_char(period_start, 'YYYY-MM') AS period, "
"COALESCE(SUM(cpu_hours), 0) * 3600.0 AS cpu_seconds "
"FROM usage_records "
"GROUP BY subscription_id, to_char(period_start, 'YYYY-MM')")
usage = {(r["sub"], r["period"]): float(r["cpu_seconds"] or 0.0)
for r in cur.fetchall()}
cur.execute(
"SELECT i.subscription_id::text AS sub, "
"to_char(ii.period_start, 'YYYY-MM') AS period, "
"COALESCE(SUM(ii.amount), 0) AS external_amount "
"FROM invoices i JOIN invoice_items ii ON ii.invoice_id = i.id "
"GROUP BY i.subscription_id, to_char(ii.period_start, 'YYYY-MM')")
rows = []
for r in cur.fetchall():
key = (r["sub"], r["period"])
rows.append({
"subscription_external_id": r["sub"], "period": r["period"],
"cpu_seconds": usage.get(key, 0.0),
"external_amount": float(r["external_amount"] or 0.0)})
return rows
except psycopg2.Error as e:
raise UserError(
"Failed reading NexaCloud actuals — the source schema may have changed. "
"Underlying error:\n%s" % e)
finally:
conn.close()
# ----- import side (pure Odoo; unit-tested) ------------------------------
@api.model
def _import_rows(self, data, dry_run=False):
"""Upsert NexaCloud rows into Odoo. Idempotent. With dry_run=True the writes
happen inside a savepoint that is rolled back, so nothing persists (the summary
is still returned)."""
if not dry_run:
return self._do_import(data)
result = {}
class _Rollback(Exception):
pass
try:
with self.env.cr.savepoint():
result.update(self._do_import(data))
raise _Rollback()
except _Rollback:
pass
result["dry_run"] = True
return result
@api.model
def _do_import(self, data):
service = self._fc_service()
metric = self._fc_cpu_metric()
recurrence_plans = {
"monthly": self._fc_recurrence_plan("month"),
"yearly": self._fc_recurrence_plan("year"),
}
summary = {"created": {}, "updated": {}, "skipped": [], "failed": []}
partner_by_user = {}
plan_ctx_by_id = {}
for u in data.get("users", []):
try:
with self.env.cr.savepoint():
link, created = self._import_user(service, u)
partner_by_user[str(u["id"])] = link.partner_id
self._bump(summary, created, "partners")
except Exception as e: # noqa: BLE001 - per-row isolation
_logger.exception("NexaCloud import: user row %s failed", u.get("id"))
summary["failed"].append(
{"kind": "user", "id": str(u.get("id")),
"error": "%s: %s" % (type(e).__name__, e)})
for p in data.get("plans", []):
try:
with self.env.cr.savepoint():
ctx, created = self._import_plan(metric, p)
plan_ctx_by_id[str(p["id"])] = ctx
self._bump(summary, created, "plans")
except Exception as e: # noqa: BLE001
_logger.exception("NexaCloud import: plan row %s failed", p.get("id"))
summary["failed"].append(
{"kind": "plan", "id": str(p.get("id")),
"error": "%s: %s" % (type(e).__name__, e)})
for s in data.get("subscriptions", []):
partner = partner_by_user.get(str(s.get("user_id") or ""))
ctx = plan_ctx_by_id.get(str(s.get("plan_id") or ""))
if not partner or not ctx:
summary["skipped"].append({
"kind": "subscription", "id": str(s.get("id")),
"reason": "unresolved %s" % ("user" if not partner else "plan")})
continue
try:
with self.env.cr.savepoint():
_order, created = self._import_subscription(
service, partner, ctx, recurrence_plans, s)
self._bump(summary, created, "subscriptions")
except Exception as e: # noqa: BLE001
_logger.exception("NexaCloud import: subscription row %s failed", s.get("id"))
summary["failed"].append(
{"kind": "subscription", "id": str(s.get("id")),
"error": "%s: %s" % (type(e).__name__, e)})
_logger.info("NexaCloud import summary: %s", summary)
return summary
# ----- find-or-create helpers --------------------------------------------
@api.model
def _fc_service(self):
Service = self.env["fusion.billing.service"]
svc = Service.search([("code", "=", NEXACLOUD_CODE)], limit=1)
return svc or Service.create({"name": "NexaCloud", "code": NEXACLOUD_CODE})
@api.model
def _fc_cpu_metric(self):
Metric = self.env["fusion.billing.metric"]
m = Metric.search([("code", "=", CPU_METRIC_CODE)], limit=1)
return m or Metric.create({
"name": "CPU seconds", "code": CPU_METRIC_CODE,
"aggregation": "sum", "unit_label": "CPU-seconds"})
@api.model
def _fc_recurrence_plan(self, unit):
Plan = self.env["sale.subscription.plan"]
plan = Plan.search(
[("billing_period_value", "=", 1), ("billing_period_unit", "=", unit)], limit=1)
if plan:
return plan
label = "Monthly" if unit == "month" else "Yearly"
return Plan.create(
{"name": label, "billing_period_value": 1, "billing_period_unit": unit})
@api.model
def _fc_resolve_country(self, value):
Country = self.env["res.country"]
if not value:
return Country.browse()
v = value.strip()
return Country.search(
["|", ("code", "=ilike", v), ("name", "=ilike", v)], limit=1)
@staticmethod
def _bump(summary, created, key):
bucket = "created" if created else "updated"
summary[bucket][key] = summary[bucket].get(key, 0) + 1
# ----- per-entity import --------------------------------------------------
@api.model
def _import_user(self, service, urow):
Link = self.env["fusion.billing.account.link"]
ext = str(urow["id"])
email = (urow.get("billing_email") or urow.get("email") or "").strip().lower() or None
name = urow.get("full_name") or urow.get("company") or email or ext
existed = bool(Link.search(
[("service_id", "=", service.id), ("external_id", "=", ext)], limit=1))
link = Link._resolve_or_create_partner(service, ext, name=name, email=email)
vals = {}
if urow.get("billing_address"):
vals["street"] = urow["billing_address"]
if urow.get("billing_city"):
vals["city"] = urow["billing_city"]
if urow.get("billing_postal_code"):
vals["zip"] = urow["billing_postal_code"]
if urow.get("tax_id"):
vals["vat"] = urow["tax_id"]
if urow.get("stripe_customer_id"):
vals["x_fc_stripe_customer_id"] = urow["stripe_customer_id"]
country = self._fc_resolve_country(urow.get("billing_country"))
if country:
vals["country_id"] = country.id
if vals:
link.partner_id.write(vals)
return link, not existed
@api.model
def _import_plan(self, metric, prow):
Product = self.env["product.product"]
Charge = self.env["fusion.billing.charge"]
plan_code = str(prow["id"])
name = prow.get("name") or plan_code
# Preserve NULL vs 0.0: a missing price must NOT silently become a $0 line.
# The subscription import raises on a missing price for its cycle (-> failed[]).
raw_monthly = prow.get("price_monthly")
raw_yearly = prow.get("price_yearly")
price_monthly = float(raw_monthly) if raw_monthly is not None else None
price_yearly = float(raw_yearly) if raw_yearly is not None else None
created = False
sub_code = "NC-PLAN-%s" % plan_code
sub_product = Product.search([("default_code", "=", sub_code)], limit=1)
if not sub_product:
sub_product = Product.create({
"name": "NexaCloud %s" % name, "default_code": sub_code,
"type": "service", "recurring_invoice": True,
"list_price": price_monthly or 0.0})
created = True
ov_code = "NC-CPU-OVG-%s" % plan_code
ov_product = Product.search([("default_code", "=", ov_code)], limit=1)
if not ov_product:
ov_product = Product.create({
"name": "NexaCloud CPU overage (%s)" % name, "default_code": ov_code,
"type": "service", "list_price": 0.0})
charge_vals = {
"name": "NexaCloud CPU overage — %s" % name,
"plan_code": plan_code, "metric_id": metric.id, "product_id": ov_product.id,
"included_quota": float(prow.get("cpu_seconds_quota") or 0.0),
"price_per_unit": CPU_RATE_PER_CORE_HOUR,
"unit_batch": CPU_SECONDS_PER_CORE_HOUR,
"charge_model": "standard",
# Shadow safety guarantee #3: plan_id MUST stay NULL so the rating cron
# never auto-mutates order lines. Set it explicitly (not just omitted) so a
# re-run re-asserts NULL even if someone set it on the charge between runs.
"plan_id": False,
}
charge = Charge.search(
[("plan_code", "=", plan_code), ("metric_id", "=", metric.id)], limit=1)
if charge:
charge.write(charge_vals)
else:
charge = Charge.create(charge_vals)
created = True
return {
"sub_product": sub_product, "overage_product": ov_product, "charge": charge,
"price_monthly": price_monthly, "price_yearly": price_yearly,
}, created
@api.model
def _import_subscription(self, service, partner, plan_ctx, recurrence_plans, srow):
SaleOrder = self.env["sale.order"]
SaleOrderLine = self.env["sale.order.line"]
sub_ext = str(srow["id"])
cycle = (srow.get("billing_cycle") or "").strip().lower()
if cycle not in ("monthly", "yearly"):
raise UserError(
"Subscription %s has an unrecognized billing_cycle %r — cannot pick a "
"plan/price." % (sub_ext, srow.get("billing_cycle")))
rec_plan = recurrence_plans["yearly"] if cycle == "yearly" else recurrence_plans["monthly"]
price = plan_ctx["price_yearly"] if cycle == "yearly" else plan_ctx["price_monthly"]
if price is None:
raise UserError(
"Subscription %s is billed %s but its plan has no %s price." % (
sub_ext, cycle, cycle))
product = plan_ctx["sub_product"]
# x_fc_* are always (re-)written; identity fields (partner_id/plan_id/order_line)
# are set ONLY at creation, so a re-run never rewrites immutable fields on an
# order that may since have been confirmed.
shadow_vals = {
"x_fc_nexacloud_deployment_id": str(srow.get("deployment_id") or ""),
"x_fc_nexacloud_plan_id": str(srow.get("plan_id") or ""),
"x_fc_billing_service_id": service.id, "x_fc_shadow": True,
}
existing = SaleOrder.search(
[("x_fc_nexacloud_subscription_id", "=", sub_ext)], limit=1)
if existing:
existing.write(shadow_vals)
line = existing.order_line.filtered(lambda l: l.product_id == product)
line_vals = {"product_uom_qty": 1, "price_unit": price}
if line:
line.write(line_vals)
else:
SaleOrderLine.create(
dict(order_id=existing.id, product_id=product.id, **line_vals))
order = existing
created = False
else:
order = SaleOrder.create({
"partner_id": partner.id, "plan_id": rec_plan.id,
"x_fc_nexacloud_subscription_id": sub_ext,
"order_line": [(0, 0, {
"product_id": product.id, "product_uom_qty": 1, "price_unit": price})],
**shadow_vals,
})
created = True
# guarantee the explicit price stuck (a pricelist compute may have overwritten it)
line = order.order_line.filtered(lambda l: l.product_id == product)
if line and line.price_unit != price:
line.price_unit = price
return order, created