feat(billing): 2a NexaCloud→Odoo importer (read-only, idempotent, shadow-safe)

fusion.billing.import.wizard backfills NexaCloud into Odoo: read-only
psycopg2 reader (_read_nexacloud_rows, DSN from ir.config_parameter)
split from pure-Odoo writes (_import_rows/_do_import) so the logic is
unit-tested headless. Maps users→partners+links (reusing
_resolve_or_create_partner, stashing stripe_customer_id), plans→a
cpu_seconds charge catalog (included_quota=cpu_seconds_quota,
unit_batch=3600, $0.0075/core-hour, plan_id NULL), and deployments→one
DRAFT shadow sale.order per deployment with the flat price set
explicitly. Shadow-safe by construction: draft + no payment token +
charge plan_id NULL (rating cron is a no-op). Idempotent re-runs;
per-row savepoints isolate bad rows; dry-run rolls back. 11 tests,
50/50 green on odoo-trial.
This commit is contained in:
gsinghpal
2026-05-27 13:34:47 -04:00
parent 3e0b531110
commit 6f060896bf
11 changed files with 601 additions and 0 deletions

View File

@@ -1,2 +1,3 @@
from . import models
from . import controllers
from . import wizards

View File

@@ -48,6 +48,7 @@ reference files from the container before implementing subscription/account inte
"data": [
"security/ir.model.access.csv",
"data/ir_cron.xml",
"views/import_wizard_views.xml",
],
"installable": True,
"application": False,

View File

@@ -6,3 +6,4 @@ from . import usage
from . import webhook
from . import reconciliation
from . import sale_order
from . import res_partner

View File

@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
from odoo import fields, models
class ResPartner(models.Model):
_inherit = "res.partner"
x_fc_stripe_customer_id = fields.Char(
index=True, copy=False,
help="Existing Stripe customer id imported from a source app, reused at flip.")

View File

@@ -7,6 +7,16 @@ from odoo import api, fields, models
class SaleOrder(models.Model):
_inherit = "sale.order"
x_fc_nexacloud_subscription_id = fields.Char(
index=True, copy=False,
help="Source NexaCloud subscription id — the importer's idempotency key.")
x_fc_nexacloud_deployment_id = fields.Char(index=True, copy=False)
x_fc_billing_service_id = fields.Many2one(
"fusion.billing.service", index=True, copy=False, ondelete="set null")
x_fc_shadow = fields.Boolean(
default=False, copy=False,
help="Imported in shadow mode: Odoo computes but must not charge/post/email.")
def _fc_rate_usage(self, charge, period_start, period_end):
"""Aggregate this subscription's usage for `charge`'s metric in the period,
compute the overage amount, and upsert a matching overage order line.

View File

@@ -9,3 +9,4 @@ access_fusion_billing_reconciliation_admin,fusion.billing.reconciliation admin,m
access_fusion_billing_metric_acct,fusion.billing.metric accountant,model_fusion_billing_metric,account.group_account_manager,1,1,1,0
access_fusion_billing_charge_acct,fusion.billing.charge accountant,model_fusion_billing_charge,account.group_account_manager,1,1,1,0
access_fusion_billing_reconciliation_acct,fusion.billing.reconciliation accountant,model_fusion_billing_reconciliation,account.group_account_manager,1,1,1,0
access_fusion_billing_import_wizard,fusion.billing.import.wizard,model_fusion_billing_import_wizard,base.group_system,1,1,1,1
1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
9 access_fusion_billing_metric_acct fusion.billing.metric accountant model_fusion_billing_metric account.group_account_manager 1 1 1 0
10 access_fusion_billing_charge_acct fusion.billing.charge accountant model_fusion_billing_charge account.group_account_manager 1 1 1 0
11 access_fusion_billing_reconciliation_acct fusion.billing.reconciliation accountant model_fusion_billing_reconciliation account.group_account_manager 1 1 1 0
12 access_fusion_billing_import_wizard fusion.billing.import.wizard model_fusion_billing_import_wizard base.group_system 1 1 1 1

View File

@@ -3,3 +3,4 @@ from . import test_charge
from . import test_usage
from . import test_api
from . import test_webhook
from . import test_importer

View File

@@ -0,0 +1,222 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
from odoo.exceptions import UserError
from odoo.tests.common import TransactionCase, tagged
def _fixture():
"""Two users, one plan, two subscriptions (monthly + yearly) — the canonical
NexaCloud row dicts the importer consumes."""
return {
"users": [
{"id": "u-1", "email": "ar@acme.test", "full_name": "Acme Inc",
"company": "Acme", "billing_email": "billing@acme.test",
"billing_address": "1 Main St", "billing_city": "Toronto",
"billing_state": "ON", "billing_postal_code": "M1M1M1",
"billing_country": "CA", "tax_id": "123456789RT0001",
"stripe_customer_id": "cus_ACME"},
{"id": "u-2", "email": "ops@globex.test", "full_name": "Globex",
"company": "Globex", "billing_email": None, "billing_address": None,
"billing_city": None, "billing_state": None, "billing_postal_code": None,
"billing_country": None, "tax_id": None, "stripe_customer_id": "cus_GLBX"},
],
"plans": [
{"id": "p-1", "name": "Starter", "price_monthly": 20.0,
"price_yearly": 200.0, "cpu_seconds_quota": 18000.0, "is_active": True},
],
"subscriptions": [
{"id": "s-1", "user_id": "u-1", "deployment_id": "d-1", "plan_id": "p-1",
"status": "active", "billing_cycle": "monthly",
"current_period_start": "2026-05-01", "current_period_end": "2026-06-01"},
{"id": "s-2", "user_id": "u-2", "deployment_id": "d-2", "plan_id": "p-1",
"status": "active", "billing_cycle": "yearly",
"current_period_start": "2026-05-01", "current_period_end": "2027-05-01"},
],
}
@tagged('post_install', '-at_install')
class TestImporterIdentity(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
self.Link = self.env['fusion.billing.account.link'].sudo()
def test_imports_users_as_partners_and_links(self):
self.Wizard._import_rows({'users': _fixture()['users']})
svc = self.env['fusion.billing.service'].search([('code', '=', 'nexacloud')])
self.assertTrue(svc, "importer must find-or-create the nexacloud service")
link1 = self.Link.search([('service_id', '=', svc.id), ('external_id', '=', 'u-1')])
self.assertEqual(len(link1), 1)
self.assertEqual(link1.partner_id.email, 'billing@acme.test') # billing_email wins
self.assertEqual(link1.partner_id.city, 'Toronto')
self.assertEqual(link1.partner_id.vat, '123456789RT0001')
self.assertEqual(link1.partner_id.x_fc_stripe_customer_id, 'cus_ACME')
self.assertEqual(link1.partner_id.country_id.code, 'CA')
link2 = self.Link.search([('service_id', '=', svc.id), ('external_id', '=', 'u-2')])
self.assertEqual(link2.partner_id.email, 'ops@globex.test') # falls back to email
@tagged('post_install', '-at_install')
class TestImporterCatalog(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
def test_imports_plan_as_charge_with_null_plan_id(self):
self.Wizard._import_rows({'plans': _fixture()['plans']})
metric = self.env['fusion.billing.metric'].search([('code', '=', 'cpu_seconds')])
self.assertTrue(metric)
charge = self.env['fusion.billing.charge'].search([('plan_code', '=', 'p-1')])
self.assertEqual(len(charge), 1)
self.assertEqual(charge.metric_id, metric)
self.assertEqual(charge.included_quota, 18000.0) # = plan.cpu_seconds_quota
self.assertEqual(charge.unit_batch, 3600.0) # one core-hour
self.assertAlmostEqual(charge.price_per_unit, 0.0075) # CAD per core-hour
self.assertEqual(charge.charge_model, 'standard')
self.assertFalse(charge.plan_id, "shadow: charge.plan_id must be NULL so the "
"rating cron never auto-mutates order lines")
self.assertTrue(charge.product_id, "charge needs an overage product")
# the subscription product is a recurring product (so orders using it are subs)
sub_product = self.env['product.product'].search(
[('default_code', '=', 'NC-PLAN-p-1')])
self.assertTrue(sub_product.recurring_invoice)
def test_charge_math_matches_nexacloud(self):
# 18000 quota + 2 core-hours overage (7200s) -> 2 batches * $0.0075 = $0.015
self.Wizard._import_rows({'plans': _fixture()['plans']})
charge = self.env['fusion.billing.charge'].search([('plan_code', '=', 'p-1')])
_overage, amount = charge._compute_billable(18000.0 + 7200.0)
self.assertAlmostEqual(amount, 0.015, places=4)
@tagged('post_install', '-at_install')
class TestImporterSubscriptions(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
def test_imports_one_draft_shadow_subscription_per_deployment(self):
self.Wizard._import_rows(_fixture())
SaleOrder = self.env['sale.order']
sub1 = SaleOrder.search([('x_fc_nexacloud_subscription_id', '=', 's-1')])
self.assertEqual(len(sub1), 1)
self.assertTrue(sub1.is_subscription)
self.assertTrue(sub1.x_fc_shadow)
self.assertEqual(sub1.x_fc_nexacloud_deployment_id, 'd-1')
self.assertNotEqual(sub1.subscription_state, '3_progress') # left in draft
plan_line = sub1.order_line.filtered(
lambda l: l.product_id.default_code == 'NC-PLAN-p-1')
self.assertEqual(len(plan_line), 1)
self.assertAlmostEqual(plan_line.price_unit, 20.0) # price_monthly
sub2 = SaleOrder.search([('x_fc_nexacloud_subscription_id', '=', 's-2')])
line2 = sub2.order_line.filtered(lambda l: l.product_id.default_code == 'NC-PLAN-p-1')
self.assertAlmostEqual(line2.price_unit, 200.0) # price_yearly
self.assertEqual(sub2.plan_id.billing_period_unit, 'year')
def test_subscription_skipped_when_user_or_plan_unresolved(self):
data = _fixture()
data['subscriptions'].append(
{"id": "s-3", "user_id": "u-missing", "deployment_id": "d-3", "plan_id": "p-1",
"status": "active", "billing_cycle": "monthly",
"current_period_start": "2026-05-01", "current_period_end": "2026-06-01"})
summary = self.Wizard._import_rows(data)
self.assertFalse(self.env['sale.order'].search(
[('x_fc_nexacloud_subscription_id', '=', 's-3')]))
self.assertTrue(any(s.get('id') == 's-3' for s in summary['skipped']))
@tagged('post_install', '-at_install')
class TestImporterIdempotencyDryRun(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
def _counts(self):
return (
self.env['fusion.billing.account.link'].search_count([]),
self.env['fusion.billing.charge'].search_count([]),
self.env['sale.order'].search_count([('x_fc_shadow', '=', True)]),
)
def test_rerun_updates_not_duplicates(self):
self.Wizard._import_rows(_fixture())
before = self._counts()
data = _fixture()
data['plans'][0]['cpu_seconds_quota'] = 99999.0
self.Wizard._import_rows(data)
self.assertEqual(self._counts(), before, "re-run must upsert, not duplicate")
charge = self.env['fusion.billing.charge'].search([('plan_code', '=', 'p-1')])
self.assertEqual(charge.included_quota, 99999.0)
def test_dry_run_writes_nothing(self):
summary = self.Wizard._import_rows(_fixture(), dry_run=True)
self.assertTrue(summary.get('dry_run'))
self.assertEqual(self._counts(), (0, 0, 0), "dry-run must not persist anything")
self.assertFalse(
self.env['fusion.billing.service'].search([('code', '=', 'nexacloud')]))
@tagged('post_install', '-at_install')
class TestImporterShadowSafety(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
def test_import_creates_no_invoice_and_no_payment_token(self):
self.Wizard._import_rows(_fixture())
subs = self.env['sale.order'].search([('x_fc_shadow', '=', True)])
self.assertTrue(subs)
partners = subs.mapped('partner_id')
invoices = self.env['account.move'].search([
('partner_id', 'in', partners.ids), ('move_type', '=', 'out_invoice')])
self.assertFalse(invoices, "shadow import must not create any invoice")
tokens = self.env['payment.token'].search([('partner_id', 'in', partners.ids)])
self.assertFalse(tokens, "shadow import must not attach a payment token")
charges = self.env['fusion.billing.charge'].search([('plan_code', '=', 'p-1')])
self.assertTrue(charges)
self.assertFalse(any(charges.mapped('plan_id')))
def test_rating_cron_leaves_shadow_subscriptions_untouched(self):
self.Wizard._import_rows(_fixture())
subs = self.env['sale.order'].search([('x_fc_shadow', '=', True)])
lines_before = sum(len(s.order_line) for s in subs)
self.env['fusion.billing.usage']._cron_rate_open_periods()
subs.invalidate_recordset()
lines_after = sum(len(s.order_line) for s in subs)
self.assertEqual(lines_before, lines_after,
"charges with NULL plan_id must keep the rating cron a no-op")
@tagged('post_install', '-at_install')
class TestImporterErrorIsolation(TransactionCase):
def setUp(self):
super().setUp()
self.Wizard = self.env['fusion.billing.import.wizard'].sudo()
def test_one_bad_user_does_not_abort_the_batch(self):
data = _fixture()
# a row with no id -> str(urow['id']) raises KeyError, must be caught per-row
data['users'].insert(0, {"email": "broken@x.test"})
summary = self.Wizard._import_rows(data)
self.assertEqual(
self.env['fusion.billing.account.link'].search_count([]), 2)
self.assertTrue(summary['failed'], "the bad row must be recorded in failed[]")
self.assertTrue(any(f['kind'] == 'user' for f in summary['failed']))
@tagged('post_install', '-at_install')
class TestImporterReadGuard(TransactionCase):
def test_missing_dsn_raises_usererror(self):
self.env['ir.config_parameter'].sudo().set_param('fusion_billing.nexacloud_dsn', '')
wiz = self.env['fusion.billing.import.wizard'].sudo().create({'dry_run': True})
with self.assertRaises(UserError):
wiz._read_nexacloud_rows()

View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<record id="view_fusion_billing_import_wizard_form" model="ir.ui.view">
<field name="name">fusion.billing.import.wizard.form</field>
<field name="model">fusion.billing.import.wizard</field>
<field name="arch" type="xml">
<form string="Import from NexaCloud">
<group>
<field name="dry_run"/>
</group>
<group string="Result" invisible="not result_summary">
<field name="result_summary" nolabel="1" widget="text"/>
</group>
<footer>
<button name="action_run_import" type="object" string="Run Import"
class="btn-primary"/>
<button string="Close" class="btn-secondary" special="cancel"/>
</footer>
</form>
</field>
</record>
<record id="action_fusion_billing_import_wizard" model="ir.actions.act_window">
<field name="name">Import from NexaCloud</field>
<field name="res_model">fusion.billing.import.wizard</field>
<field name="view_mode">form</field>
<field name="target">new</field>
</record>
<menuitem id="menu_fusion_billing_root" name="Fusion Billing"
parent="account.menu_finance" sequence="90"/>
<menuitem id="menu_fusion_billing_import" name="Import from NexaCloud"
parent="menu_fusion_billing_root"
action="action_fusion_billing_import_wizard" sequence="10"
groups="base.group_system"/>
</odoo>

View File

@@ -0,0 +1 @@
from . import import_wizard

View File

@@ -0,0 +1,315 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
"""NexaCloud → Odoo billing importer (sub-project #2a).
One-time, re-runnable, read-only backfill: read the NexaCloud Postgres and create the
equivalent Odoo records (partners + links, a cpu_seconds charge catalog, one DRAFT
shadow ``sale.order`` per deployment). Shadow-safe by construction — see the design spec
``docs/superpowers/specs/2026-05-27-nexacloud-billing-importer-design.md``.
Logic lives in model methods so it is unit-testable headless; the wizard button only
calls ``_read_nexacloud_rows()`` → ``_import_rows()``.
"""
import json
import logging
from odoo import api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
NEXACLOUD_CODE = "nexacloud"
CPU_METRIC_CODE = "cpu_seconds"
CPU_RATE_PER_CORE_HOUR = 0.0075 # NexaCloud CPU rate, CAD per core-hour
CPU_SECONDS_PER_CORE_HOUR = 3600.0 # one core-hour = 3600 cpu-seconds
class FusionBillingImportWizard(models.TransientModel):
_name = "fusion.billing.import.wizard"
_description = "Fusion Billing — NexaCloud Importer"
dry_run = fields.Boolean(
default=True,
help="Read and report what would be imported, without writing anything.")
result_summary = fields.Text(readonly=True)
def action_run_import(self):
self.ensure_one()
data = self._read_nexacloud_rows()
summary = self._import_rows(data, dry_run=self.dry_run)
self.result_summary = json.dumps(summary, indent=2, default=str)
return {
"type": "ir.actions.act_window",
"res_model": self._name,
"res_id": self.id,
"view_mode": "form",
"target": "new",
}
# ----- read side (the ONLY code that touches NexaCloud) ------------------
def _read_nexacloud_rows(self):
"""Open a READ-ONLY psycopg2 connection to the nexacloud Postgres (DSN in
ir.config_parameter 'fusion_billing.nexacloud_dsn') and return rows as dicts.
Raises UserError on a missing DSN or a failed connection."""
import psycopg2
import psycopg2.extras
dsn = self.env["ir.config_parameter"].sudo().get_param(
"fusion_billing.nexacloud_dsn")
if not dsn:
raise UserError(
"NexaCloud DSN not configured. Set the 'fusion_billing.nexacloud_dsn' "
"system parameter to a read-only Postgres connection string.")
try:
conn = psycopg2.connect(dsn)
except Exception as e: # noqa: BLE001 - surface as a user error
raise UserError("Could not connect to the NexaCloud database: %s" % e)
try:
conn.set_session(readonly=True)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
data = {}
cur.execute(
"SELECT id, email, full_name, company, billing_email, billing_address, "
"billing_city, billing_state, billing_postal_code, billing_country, "
"tax_id, stripe_customer_id FROM users")
data["users"] = [dict(r) for r in cur.fetchall()]
cur.execute(
"SELECT id, name, price_monthly, price_yearly, cpu_seconds_quota, "
"is_active FROM plans")
data["plans"] = [dict(r) for r in cur.fetchall()]
cur.execute(
"SELECT id, user_id, deployment_id, plan_id, status, billing_cycle, "
"current_period_start, current_period_end FROM subscriptions")
data["subscriptions"] = [dict(r) for r in cur.fetchall()]
return data
finally:
conn.close()
# ----- import side (pure Odoo; unit-tested) ------------------------------
@api.model
def _import_rows(self, data, dry_run=False):
"""Upsert NexaCloud rows into Odoo. Idempotent. With dry_run=True the writes
happen inside a savepoint that is rolled back, so nothing persists (the summary
is still returned)."""
if not dry_run:
return self._do_import(data)
result = {}
class _Rollback(Exception):
pass
try:
with self.env.cr.savepoint():
result.update(self._do_import(data))
raise _Rollback()
except _Rollback:
pass
result["dry_run"] = True
return result
@api.model
def _do_import(self, data):
service = self._fc_service()
metric = self._fc_cpu_metric()
recurrence_plans = {
"monthly": self._fc_recurrence_plan("month"),
"yearly": self._fc_recurrence_plan("year"),
}
summary = {"created": {}, "updated": {}, "skipped": [], "failed": []}
partner_by_user = {}
plan_ctx_by_id = {}
for u in data.get("users", []):
try:
with self.env.cr.savepoint():
link, created = self._import_user(service, u)
partner_by_user[str(u["id"])] = link.partner_id
self._bump(summary, created, "partners")
except Exception as e: # noqa: BLE001 - per-row isolation
summary["failed"].append(
{"kind": "user", "id": str(u.get("id")), "error": str(e)})
for p in data.get("plans", []):
try:
with self.env.cr.savepoint():
ctx, created = self._import_plan(metric, p)
plan_ctx_by_id[str(p["id"])] = ctx
self._bump(summary, created, "plans")
except Exception as e: # noqa: BLE001
summary["failed"].append(
{"kind": "plan", "id": str(p.get("id")), "error": str(e)})
for s in data.get("subscriptions", []):
partner = partner_by_user.get(str(s.get("user_id") or ""))
ctx = plan_ctx_by_id.get(str(s.get("plan_id") or ""))
if not partner or not ctx:
summary["skipped"].append({
"kind": "subscription", "id": str(s.get("id")),
"reason": "unresolved %s" % ("user" if not partner else "plan")})
continue
try:
with self.env.cr.savepoint():
_order, created = self._import_subscription(
service, partner, ctx, recurrence_plans, s)
self._bump(summary, created, "subscriptions")
except Exception as e: # noqa: BLE001
summary["failed"].append(
{"kind": "subscription", "id": str(s.get("id")), "error": str(e)})
_logger.info("NexaCloud import summary: %s", summary)
return summary
# ----- find-or-create helpers --------------------------------------------
@api.model
def _fc_service(self):
Service = self.env["fusion.billing.service"]
svc = Service.search([("code", "=", NEXACLOUD_CODE)], limit=1)
return svc or Service.create({"name": "NexaCloud", "code": NEXACLOUD_CODE})
@api.model
def _fc_cpu_metric(self):
Metric = self.env["fusion.billing.metric"]
m = Metric.search([("code", "=", CPU_METRIC_CODE)], limit=1)
return m or Metric.create({
"name": "CPU seconds", "code": CPU_METRIC_CODE,
"aggregation": "sum", "unit_label": "CPU-seconds"})
@api.model
def _fc_recurrence_plan(self, unit):
Plan = self.env["sale.subscription.plan"]
plan = Plan.search(
[("billing_period_value", "=", 1), ("billing_period_unit", "=", unit)], limit=1)
if plan:
return plan
label = "Monthly" if unit == "month" else "Yearly"
return Plan.create(
{"name": label, "billing_period_value": 1, "billing_period_unit": unit})
@api.model
def _fc_resolve_country(self, value):
Country = self.env["res.country"]
if not value:
return Country.browse()
v = value.strip()
return Country.search(
["|", ("code", "=ilike", v), ("name", "=ilike", v)], limit=1)
@staticmethod
def _bump(summary, created, key):
bucket = "created" if created else "updated"
summary[bucket][key] = summary[bucket].get(key, 0) + 1
# ----- per-entity import --------------------------------------------------
@api.model
def _import_user(self, service, urow):
Link = self.env["fusion.billing.account.link"]
ext = str(urow["id"])
email = (urow.get("billing_email") or urow.get("email") or "").strip().lower() or None
name = urow.get("full_name") or urow.get("company") or email or ext
existed = bool(Link.search(
[("service_id", "=", service.id), ("external_id", "=", ext)], limit=1))
link = Link._resolve_or_create_partner(service, ext, name=name, email=email)
vals = {}
if urow.get("billing_address"):
vals["street"] = urow["billing_address"]
if urow.get("billing_city"):
vals["city"] = urow["billing_city"]
if urow.get("billing_postal_code"):
vals["zip"] = urow["billing_postal_code"]
if urow.get("tax_id"):
vals["vat"] = urow["tax_id"]
if urow.get("stripe_customer_id"):
vals["x_fc_stripe_customer_id"] = urow["stripe_customer_id"]
country = self._fc_resolve_country(urow.get("billing_country"))
if country:
vals["country_id"] = country.id
if vals:
link.partner_id.write(vals)
return link, not existed
@api.model
def _import_plan(self, metric, prow):
Product = self.env["product.product"]
Charge = self.env["fusion.billing.charge"]
plan_code = str(prow["id"])
name = prow.get("name") or plan_code
price_monthly = float(prow.get("price_monthly") or 0.0)
price_yearly = float(prow.get("price_yearly") or 0.0)
created = False
sub_code = "NC-PLAN-%s" % plan_code
sub_product = Product.search([("default_code", "=", sub_code)], limit=1)
if not sub_product:
sub_product = Product.create({
"name": "NexaCloud %s" % name, "default_code": sub_code,
"type": "service", "recurring_invoice": True,
"list_price": price_monthly})
created = True
ov_code = "NC-CPU-OVG-%s" % plan_code
ov_product = Product.search([("default_code", "=", ov_code)], limit=1)
if not ov_product:
ov_product = Product.create({
"name": "NexaCloud CPU overage (%s)" % name, "default_code": ov_code,
"type": "service", "list_price": 0.0})
charge_vals = {
"name": "NexaCloud CPU overage — %s" % name,
"plan_code": plan_code, "metric_id": metric.id, "product_id": ov_product.id,
"included_quota": float(prow.get("cpu_seconds_quota") or 0.0),
"price_per_unit": CPU_RATE_PER_CORE_HOUR,
"unit_batch": CPU_SECONDS_PER_CORE_HOUR,
"charge_model": "standard",
# plan_id intentionally omitted (NULL) — shadow safety guarantee #3
}
charge = Charge.search(
[("plan_code", "=", plan_code), ("metric_id", "=", metric.id)], limit=1)
if charge:
charge.write(charge_vals)
else:
charge = Charge.create(charge_vals)
created = True
return {
"sub_product": sub_product, "overage_product": ov_product, "charge": charge,
"price_monthly": price_monthly, "price_yearly": price_yearly,
}, created
@api.model
def _import_subscription(self, service, partner, plan_ctx, recurrence_plans, srow):
SaleOrder = self.env["sale.order"]
SaleOrderLine = self.env["sale.order.line"]
sub_ext = str(srow["id"])
cycle = (srow.get("billing_cycle") or "monthly").lower()
rec_plan = recurrence_plans["yearly"] if cycle == "yearly" else recurrence_plans["monthly"]
price = plan_ctx["price_yearly"] if cycle == "yearly" else plan_ctx["price_monthly"]
product = plan_ctx["sub_product"]
order_vals = {
"partner_id": partner.id, "plan_id": rec_plan.id,
"x_fc_nexacloud_subscription_id": sub_ext,
"x_fc_nexacloud_deployment_id": str(srow.get("deployment_id") or ""),
"x_fc_billing_service_id": service.id, "x_fc_shadow": True,
}
existing = SaleOrder.search(
[("x_fc_nexacloud_subscription_id", "=", sub_ext)], limit=1)
if existing:
existing.write(order_vals)
line = existing.order_line.filtered(lambda l: l.product_id == product)
line_vals = {"product_uom_qty": 1, "price_unit": price}
if line:
line.write(line_vals)
else:
SaleOrderLine.create(
dict(order_id=existing.id, product_id=product.id, **line_vals))
order = existing
created = False
else:
order_vals["order_line"] = [(0, 0, {
"product_id": product.id, "product_uom_qty": 1, "price_unit": price})]
order = SaleOrder.create(order_vals)
created = True
# guarantee the explicit price stuck (a pricelist compute may have overwritten it)
line = order.order_line.filtered(lambda l: l.product_id == product)
if line and line.price_unit != price:
line.price_unit = price
return order, created