# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo billing importer (sub-project #2a).

One-time, re-runnable, read-only backfill: read the NexaCloud Postgres and create
the equivalent Odoo records (partners + links, a cpu_seconds charge catalog, one
DRAFT shadow ``sale.order`` per deployment). Shadow-safe by construction — see the
design spec ``docs/superpowers/specs/2026-05-27-nexacloud-billing-importer-design.md``.

Logic lives in model methods so it is unit-testable headless; the wizard button only
calls ``_read_nexacloud_rows()`` → ``_import_rows()``.
"""
import json
import logging

from odoo import api, fields, models
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

NEXACLOUD_CODE = "nexacloud"
CPU_METRIC_CODE = "cpu_seconds"
CPU_RATE_PER_CORE_HOUR = 0.0075  # NexaCloud CPU rate, CAD per core-hour
CPU_SECONDS_PER_CORE_HOUR = 3600.0  # one core-hour = 3600 cpu-seconds


class FusionBillingImportWizard(models.TransientModel):
    """Wizard that reads NexaCloud rows and upserts them into Odoo.

    The read side (``_read_nexacloud_rows``) is the only code that talks to the
    NexaCloud database; the import side (``_import_rows`` and the helpers below it)
    works purely on the returned dicts.
    """

    _name = "fusion.billing.import.wizard"
    _description = "Fusion Billing — NexaCloud Importer"

    # Defaults to True, so a run only reports unless the operator unticks it.
    dry_run = fields.Boolean(
        default=True,
        help="Read and report what would be imported, without writing anything.")
    # JSON dump of the summary dict returned by ``_import_rows``.
    result_summary = fields.Text(readonly=True)
    # Lengths of the summary's "failed" / "skipped" lists.
    failed_count = fields.Integer(readonly=True)
    skipped_count = fields.Integer(readonly=True)

    def action_run_import(self):
        """Run the import for this wizard record and re-open it with the results.

        Reads the NexaCloud rows, imports them (honouring ``dry_run``), stores the
        JSON summary and the failed/skipped counts on the wizard, and logs any
        failed or skipped rows.

        :return: an ``ir.actions.act_window`` dict pointing back at this record.
        :raises UserError: propagated from ``_read_nexacloud_rows``.
        """
        self.ensure_one()
        data = self._read_nexacloud_rows()
        summary = self._import_rows(data, dry_run=self.dry_run)
        failed = summary.get("failed") or []
        skipped = summary.get("skipped") or []
        # default=str keeps json.dumps from choking on non-JSON values in the summary.
        self.result_summary = json.dumps(summary, indent=2, default=str)
        self.failed_count = len(failed)
        self.skipped_count = len(skipped)
        # A partial billing import must be loud, not buried in the JSON. Log at ERROR
        # so it survives nexa's log_level=warn (INFO is suppressed there).
        if failed:
            _logger.error("NexaCloud import: %s row(s) FAILED%s: %s", len(failed),
                          " (dry-run)" if self.dry_run else "", failed)
        if skipped:
            _logger.warning("NexaCloud import: %s row(s) skipped: %s",
                            len(skipped), skipped)
        return {
            "type": "ir.actions.act_window",
            "res_model": self._name,
            "res_id": self.id,
            "view_mode": "form",
            "target": "new",
        }

    # ----- read side (the ONLY code that touches NexaCloud) ------------------

    def _read_nexacloud_rows(self):
        """Open a READ-ONLY psycopg2 connection to the nexacloud Postgres (DSN in
        ir.config_parameter 'fusion_billing.nexacloud_dsn') and return rows as dicts.

        :return: dict with the keys ``"users"``, ``"plans"`` and ``"subscriptions"``,
            each a list of plain dicts (one per source row).
        :raises UserError: on a missing DSN, a failed connection, or a psycopg2
            error while reading (the original exception is chained as the cause).
        """
        # Imported lazily, as in the original, so psycopg2 is only needed when the
        # read side actually runs.
        import psycopg2
        import psycopg2.extras

        dsn = self.env["ir.config_parameter"].sudo().get_param(
            "fusion_billing.nexacloud_dsn")
        if not dsn:
            raise UserError(
                "NexaCloud DSN not configured. Set the 'fusion_billing.nexacloud_dsn' "
                "system parameter to a read-only Postgres connection string.")
        try:
            conn = psycopg2.connect(dsn)
        except Exception as e:  # noqa: BLE001 - surface as a user error
            raise UserError(
                "Could not connect to the NexaCloud database: %s" % e) from e
        try:
            conn.set_session(readonly=True)
            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            data = {}
            # NOTE(review): billing_state is selected here but the user import does
            # not map it onto the partner — confirm whether that is intentional.
            cur.execute(
                "SELECT id, email, full_name, company, billing_email, billing_address, "
                "billing_city, billing_state, billing_postal_code, billing_country, "
                "tax_id, stripe_customer_id FROM users")
            data["users"] = [dict(r) for r in cur.fetchall()]
            cur.execute(
                "SELECT id, name, price_monthly, price_yearly, cpu_seconds_quota, "
                "is_active FROM plans")
            data["plans"] = [dict(r) for r in cur.fetchall()]
            cur.execute(
                "SELECT id, user_id, deployment_id, plan_id, status, billing_cycle, "
                "current_period_start, current_period_end FROM subscriptions")
            data["subscriptions"] = [dict(r) for r in cur.fetchall()]
            return data
        except psycopg2.Error as e:
            # A query/schema error (e.g. a renamed/missing column) gets the same clean
            # operator message as a connection failure — not a raw SQL traceback. We
            # never return a partial `data` (the return is the last statement in `try`).
            raise UserError(
                "Failed reading from the NexaCloud database — the source schema may "
                "have changed. Underlying error:\n%s" % e) from e
        finally:
            conn.close()

    # ----- import side (pure Odoo; unit-tested) ------------------------------

    @api.model
    def _import_rows(self, data, dry_run=False):
        """Upsert NexaCloud rows into Odoo. Idempotent. With dry_run=True the writes
        happen inside a savepoint that is rolled back, so nothing persists (the
        summary is still returned).

        :param data: dict of row lists as returned by ``_read_nexacloud_rows``.
        :param dry_run: when true, roll every write back and add ``"dry_run": True``
            to the summary.
        :return: the summary dict built by ``_do_import``.
        """
        if not dry_run:
            return self._do_import(data)

        result = {}

        class _Rollback(Exception):
            """Private sentinel raised only to make the savepoint roll back."""

        try:
            with self.env.cr.savepoint():
                result.update(self._do_import(data))
                raise _Rollback()
        except _Rollback:
            pass
        result["dry_run"] = True
        return result

    @api.model
    def _do_import(self, data):
        """Import users, then plans, then subscriptions, isolating each row.

        Every row runs in its own savepoint; a row that raises is logged and
        appended to ``summary["failed"]`` without aborting the rest. A subscription
        whose user or plan did not import is appended to ``summary["skipped"]``.

        :param data: dict with optional ``"users"``, ``"plans"`` and
            ``"subscriptions"`` lists of row dicts.
        :return: ``{"created": {...}, "updated": {...}, "skipped": [...],
            "failed": [...]}``.
        """
        service = self._fc_service()
        metric = self._fc_cpu_metric()
        recurrence_plans = {
            "monthly": self._fc_recurrence_plan("month"),
            "yearly": self._fc_recurrence_plan("year"),
        }
        summary = {"created": {}, "updated": {}, "skipped": [], "failed": []}
        # Lookups keyed by the stringified source id, filled only by rows that
        # imported successfully, then consulted by the subscription pass.
        partner_by_user = {}
        plan_ctx_by_id = {}

        for u in data.get("users", []):
            try:
                with self.env.cr.savepoint():
                    link, created = self._import_user(service, u)
                    partner_by_user[str(u["id"])] = link.partner_id
                    self._bump(summary, created, "partners")
            except Exception as e:  # noqa: BLE001 - per-row isolation
                _logger.exception("NexaCloud import: user row %s failed", u.get("id"))
                summary["failed"].append(
                    {"kind": "user", "id": str(u.get("id")),
                     "error": "%s: %s" % (type(e).__name__, e)})

        for p in data.get("plans", []):
            try:
                with self.env.cr.savepoint():
                    ctx, created = self._import_plan(metric, p)
                    plan_ctx_by_id[str(p["id"])] = ctx
                    self._bump(summary, created, "plans")
            except Exception as e:  # noqa: BLE001
                _logger.exception("NexaCloud import: plan row %s failed", p.get("id"))
                summary["failed"].append(
                    {"kind": "plan", "id": str(p.get("id")),
                     "error": "%s: %s" % (type(e).__name__, e)})

        for s in data.get("subscriptions", []):
            partner = partner_by_user.get(str(s.get("user_id") or ""))
            ctx = plan_ctx_by_id.get(str(s.get("plan_id") or ""))
            if not partner or not ctx:
                # When both are unresolved only the user is reported.
                summary["skipped"].append({
                    "kind": "subscription", "id": str(s.get("id")),
                    "reason": "unresolved %s" % ("user" if not partner else "plan")})
                continue
            try:
                with self.env.cr.savepoint():
                    _order, created = self._import_subscription(
                        service, partner, ctx, recurrence_plans, s)
                    self._bump(summary, created, "subscriptions")
            except Exception as e:  # noqa: BLE001
                _logger.exception(
                    "NexaCloud import: subscription row %s failed", s.get("id"))
                summary["failed"].append(
                    {"kind": "subscription", "id": str(s.get("id")),
                     "error": "%s: %s" % (type(e).__name__, e)})

        _logger.info("NexaCloud import summary: %s", summary)
        return summary

    # ----- find-or-create helpers --------------------------------------------

    @api.model
    def _fc_service(self):
        """Return the ``fusion.billing.service`` with the NexaCloud code, creating
        it when none exists."""
        Service = self.env["fusion.billing.service"]
        svc = Service.search([("code", "=", NEXACLOUD_CODE)], limit=1)
        return svc or Service.create({"name": "NexaCloud", "code": NEXACLOUD_CODE})

    @api.model
    def _fc_cpu_metric(self):
        """Return the ``fusion.billing.metric`` for CPU seconds, creating it when
        none exists."""
        Metric = self.env["fusion.billing.metric"]
        m = Metric.search([("code", "=", CPU_METRIC_CODE)], limit=1)
        return m or Metric.create({
            "name": "CPU seconds", "code": CPU_METRIC_CODE,
            "aggregation": "sum", "unit_label": "CPU-seconds"})

    @api.model
    def _fc_recurrence_plan(self, unit):
        """Return a ``sale.subscription.plan`` billing every 1 ``unit``, creating it
        when none exists.

        :param unit: billing period unit; the caller passes ``"month"`` or
            ``"year"``. Any value other than ``"month"`` is labelled "Yearly".
        """
        Plan = self.env["sale.subscription.plan"]
        plan = Plan.search(
            [("billing_period_value", "=", 1), ("billing_period_unit", "=", unit)],
            limit=1)
        if plan:
            return plan
        label = "Monthly" if unit == "month" else "Yearly"
        return Plan.create(
            {"name": label, "billing_period_value": 1, "billing_period_unit": unit})

    @api.model
    def _fc_resolve_country(self, value):
        """Resolve a free-text country to a ``res.country`` record.

        Matches the stripped value case-insensitively against the country code or
        name.

        :param value: country code or name from the source row; may be empty.
        :return: a ``res.country`` recordset with at most one record (empty when
            ``value`` is falsy or nothing matches).
        """
        Country = self.env["res.country"]
        if not value:
            return Country.browse()
        v = value.strip()
        # NOTE(review): "=ilike" treats "%" and "_" in ``v`` as wildcards — confirm
        # the source data cannot contain them, or escape before searching.
        return Country.search(
            ["|", ("code", "=ilike", v), ("name", "=ilike", v)], limit=1)

    @staticmethod
    def _bump(summary, created, key):
        """Increment ``summary["created"][key]`` or ``summary["updated"][key]``.

        :param summary: summary dict holding ``"created"`` and ``"updated"`` dicts.
        :param created: truthy selects the "created" bucket, falsy "updated".
        :param key: counter name inside the bucket (starts at 0 when absent).
        """
        bucket = "created" if created else "updated"
        summary[bucket][key] = summary[bucket].get(key, 0) + 1

    # ----- per-entity import --------------------------------------------------

    @api.model
    def _import_user(self, service, urow):
        """Upsert one NexaCloud user as an account link and partner.

        :param service: the NexaCloud ``fusion.billing.service`` record.
        :param urow: user row dict; ``"id"`` is required.
        :return: ``(link, created)`` where ``created`` is true when no link for this
            service and external id existed before the call.
        :raises KeyError: when ``urow`` has no ``"id"``.
        """
        Link = self.env["fusion.billing.account.link"]
        ext = str(urow["id"])
        # Billing email wins over the login email; normalised, or None when empty.
        email = (urow.get("billing_email") or urow.get("email") or "").strip().lower() or None
        name = urow.get("full_name") or urow.get("company") or email or ext
        # Checked before resolving so the summary can tell create from update.
        existed = bool(Link.search(
            [("service_id", "=", service.id), ("external_id", "=", ext)], limit=1))
        # NOTE(review): presumably finds or creates the link and its partner —
        # the helper is defined outside this file.
        link = Link._resolve_or_create_partner(service, ext, name=name, email=email)

        # Only non-empty source values are written, so a blank never clears a field.
        vals = {}
        if urow.get("billing_address"):
            vals["street"] = urow["billing_address"]
        if urow.get("billing_city"):
            vals["city"] = urow["billing_city"]
        if urow.get("billing_postal_code"):
            vals["zip"] = urow["billing_postal_code"]
        if urow.get("tax_id"):
            vals["vat"] = urow["tax_id"]
        if urow.get("stripe_customer_id"):
            vals["x_fc_stripe_customer_id"] = urow["stripe_customer_id"]
        country = self._fc_resolve_country(urow.get("billing_country"))
        if country:
            vals["country_id"] = country.id
        if vals:
            link.partner_id.write(vals)
        return link, not existed

    @api.model
    def _import_plan(self, metric, prow):
        """Upsert the products and CPU-overage charge for one NexaCloud plan.

        Finds or creates a recurring subscription product (``NC-PLAN-<id>``) and an
        overage product (``NC-CPU-OVG-<id>``), then creates or rewrites the charge
        for this plan code and metric. Existing products are left as they are.

        :param metric: the CPU-seconds ``fusion.billing.metric`` record.
        :param prow: plan row dict; ``"id"`` is required.
        :return: ``(ctx, created)``. ``ctx`` holds ``sub_product``,
            ``overage_product``, ``charge``, ``price_monthly`` and ``price_yearly``
            (prices are floats, or None when the source value was NULL).
            ``created`` is true when the subscription product or the charge was
            created by this call.
        :raises KeyError: when ``prow`` has no ``"id"``.
        """
        Product = self.env["product.product"]
        Charge = self.env["fusion.billing.charge"]
        plan_code = str(prow["id"])
        name = prow.get("name") or plan_code
        # Preserve NULL vs 0.0: a missing price must NOT silently become a $0 line.
        # The subscription import raises on a missing price for its cycle (-> failed[]).
        raw_monthly = prow.get("price_monthly")
        raw_yearly = prow.get("price_yearly")
        price_monthly = float(raw_monthly) if raw_monthly is not None else None
        price_yearly = float(raw_yearly) if raw_yearly is not None else None
        created = False

        sub_code = "NC-PLAN-%s" % plan_code
        sub_product = Product.search([("default_code", "=", sub_code)], limit=1)
        if not sub_product:
            sub_product = Product.create({
                "name": "NexaCloud %s" % name, "default_code": sub_code,
                "type": "service", "recurring_invoice": True,
                "list_price": price_monthly or 0.0})
            created = True

        ov_code = "NC-CPU-OVG-%s" % plan_code
        ov_product = Product.search([("default_code", "=", ov_code)], limit=1)
        if not ov_product:
            ov_product = Product.create({
                "name": "NexaCloud CPU overage (%s)" % name,
                "default_code": ov_code, "type": "service", "list_price": 0.0})

        charge_vals = {
            "name": "NexaCloud CPU overage — %s" % name,
            "plan_code": plan_code,
            "metric_id": metric.id,
            "product_id": ov_product.id,
            "included_quota": float(prow.get("cpu_seconds_quota") or 0.0),
            "price_per_unit": CPU_RATE_PER_CORE_HOUR,
            "unit_batch": CPU_SECONDS_PER_CORE_HOUR,
            "charge_model": "standard",
            # Shadow safety guarantee #3: plan_id MUST stay NULL so the rating cron
            # never auto-mutates order lines. Set it explicitly (not just omitted) so a
            # re-run re-asserts NULL even if someone set it on the charge between runs.
            "plan_id": False,
        }
        charge = Charge.search(
            [("plan_code", "=", plan_code), ("metric_id", "=", metric.id)], limit=1)
        if charge:
            charge.write(charge_vals)
        else:
            charge = Charge.create(charge_vals)
            created = True

        return {
            "sub_product": sub_product,
            "overage_product": ov_product,
            "charge": charge,
            "price_monthly": price_monthly,
            "price_yearly": price_yearly,
        }, created

    @api.model
    def _import_subscription(self, service, partner, plan_ctx, recurrence_plans, srow):
        """Upsert the shadow ``sale.order`` for one NexaCloud subscription.

        :param service: the NexaCloud ``fusion.billing.service`` record.
        :param partner: partner the order is created for.
        :param plan_ctx: context dict returned by ``_import_plan`` for the plan.
        :param recurrence_plans: dict with ``"monthly"`` and ``"yearly"``
            ``sale.subscription.plan`` records.
        :param srow: subscription row dict; ``"id"`` is required.
        :return: ``(order, created)``.
        :raises UserError: when ``billing_cycle`` is not monthly/yearly, or the plan
            has no price for that cycle.
        :raises KeyError: when ``srow`` has no ``"id"``.
        """
        SaleOrder = self.env["sale.order"]
        SaleOrderLine = self.env["sale.order.line"]
        sub_ext = str(srow["id"])
        cycle = (srow.get("billing_cycle") or "").strip().lower()
        if cycle not in ("monthly", "yearly"):
            raise UserError(
                "Subscription %s has an unrecognized billing_cycle %r — cannot pick a "
                "plan/price." % (sub_ext, srow.get("billing_cycle")))
        rec_plan = recurrence_plans["yearly"] if cycle == "yearly" else recurrence_plans["monthly"]
        price = plan_ctx["price_yearly"] if cycle == "yearly" else plan_ctx["price_monthly"]
        if price is None:
            raise UserError(
                "Subscription %s is billed %s but its plan has no %s price." % (
                    sub_ext, cycle, cycle))
        product = plan_ctx["sub_product"]

        # x_fc_* are always (re-)written; identity fields (partner_id/plan_id/order_line)
        # are set ONLY at creation, so a re-run never rewrites immutable fields on an
        # order that may since have been confirmed.
        shadow_vals = {
            "x_fc_nexacloud_deployment_id": str(srow.get("deployment_id") or ""),
            "x_fc_billing_service_id": service.id,
            "x_fc_shadow": True,
        }
        existing = SaleOrder.search(
            [("x_fc_nexacloud_subscription_id", "=", sub_ext)], limit=1)
        if existing:
            existing.write(shadow_vals)
            line = existing.order_line.filtered(lambda ln: ln.product_id == product)
            line_vals = {"product_uom_qty": 1, "price_unit": price}
            if line:
                line.write(line_vals)
            else:
                SaleOrderLine.create(
                    dict(order_id=existing.id, product_id=product.id, **line_vals))
            order = existing
            created = False
        else:
            order = SaleOrder.create({
                "partner_id": partner.id,
                "plan_id": rec_plan.id,
                "x_fc_nexacloud_subscription_id": sub_ext,
                "order_line": [(0, 0, {
                    "product_id": product.id,
                    "product_uom_qty": 1,
                    "price_unit": price})],
                **shadow_vals,
            })
            created = True

        # Guarantee the explicit price stuck (a pricelist compute may have overwritten
        # it). Check line by line instead of reading ``price_unit`` off the filtered
        # recordset: an order holding several lines for this product would otherwise
        # raise "Expected singleton" and fail this row on every run.
        for order_line in order.order_line.filtered(
                lambda ln: ln.product_id == product):
            if order_line.price_unit != price:
                order_line.price_unit = price
        return order, created