fix(billing): resolve code-review findings (authz, cross-billing, validation, webhook integrity)

- C1/H4: rating cron only rates subs on the charge's own plan_id
- C1: _fc_rate_usage skips creating a line when amount is 0 (still updates existing)
- C2/C4: /usage authorizes each event (exists + is_subscription + linked customer)
- C3: API handlers validate input and return 4xx-shaped errors instead of raising;
       controller maps status=='error' to HTTP 400
- H1: cron uses real billing window [last_invoice_date or start_date, next_invoice_date)
- H2: _aggregate uses half-open window anchored on period_start
- H3: idempotency scoped to (subscription_id, metric_id, idempotency_key)
- H5: webhook stores canonical body, signs+POSTs it verbatim, adds X-Fusion-Event-Id,
       caps backoff at 2**min(attempts,10)
- H6: SSRF guard rejects non-https / localhost / private / link-local webhook_url
- M7: charge_model reduced to standard/package (dropped unimplemented graduated/volume)
- L1: currency_id required on charge + reconciliation
- L2: charge price non-negative + unit_batch positive DB constraints

Adds 17 regression tests (suite 22 -> 39, all green via fcb_test_on_trial.sh).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-05-27 03:27:34 -04:00
parent a5db0fe71e
commit d770c0c3a9
11 changed files with 442 additions and 32 deletions

View File

@@ -56,7 +56,10 @@ class FusionBillingApi(http.Controller):
payload = self._read_json()
if payload is None:
return self._json({"error": "invalid json"}, status=400)
return self._json(service._api_upsert_customer(payload))
result = service._api_upsert_customer(payload)
if result.get("status") == "error":
return self._json(result, status=400)
return self._json(result)
@http.route(f"{API_BASE}/usage", type="http", auth="none", methods=["POST"], csrf=False)
def post_usage(self, **kw):
@@ -66,7 +69,10 @@ class FusionBillingApi(http.Controller):
payload = self._read_json()
if payload is None:
return self._json({"error": "invalid json"}, status=400)
return self._json(service._api_record_usage(payload), status=202)
result = service._api_record_usage(payload)
if result.get("status") == "error":
return self._json(result, status=400)
return self._json(result, status=202)
@http.route(f"{API_BASE}/plans", type="http", auth="none", methods=["GET"], csrf=False)
def get_plans(self, **kw):
@@ -83,4 +89,7 @@ class FusionBillingApi(http.Controller):
payload = self._read_json()
if payload is None:
return self._json({"error": "invalid json"}, status=400)
return self._json(service._api_create_subscription(payload))
result = service._api_create_subscription(payload)
if result.get("status") == "error":
return self._json(result, status=400)
return self._json(result)

View File

@@ -43,23 +43,30 @@ class FusionBillingCharge(models.Model):
charge_model = fields.Selection(
[
("standard", "Standard (per unit)"),
("graduated", "Graduated"),
("package", "Package"),
("volume", "Volume"),
],
default="standard", required=True,
)
currency_id = fields.Many2one(
"res.currency", default=lambda self: self.env.company.currency_id,
"res.currency", required=True,
default=lambda self: self.env.company.currency_id,
)
active = fields.Boolean(default=True)
_price_non_negative = models.Constraint(
"CHECK (price_per_unit >= 0)", "Overage price per unit cannot be negative.",
)
_unit_batch_positive = models.Constraint(
"CHECK (unit_batch > 0)", "Unit batch must be greater than zero.",
)
def _compute_billable(self, total_quantity):
"""Return (overage_units, amount) for total period usage under this charge.
- overage_units = usage above included_quota (never negative)
- 'standard'/'package'/'volume': priced per `unit_batch` block, partial block rounds up.
(graduated tiers are out of scope for the core; treated as 'standard'.)
- 'standard': price the overage in (rounded-up) `unit_batch` blocks.
- 'package': price whole packages over the RAW quantity (quota ignored for
package counting); a partial package rounds up.
"""
self.ensure_one()
overage = max(0.0, (total_quantity or 0.0) - (self.included_quota or 0.0))
@@ -68,6 +75,6 @@ class FusionBillingCharge(models.Model):
# whole packages over the RAW quantity (quota ignored for package counting)
blocks = math.ceil((total_quantity or 0.0) / batch) if total_quantity else 0
return overage, round(blocks * (self.price_per_unit or 0.0), 2)
# standard / volume / graduated-fallback: price the overage in (rounded-up) batches
# standard: price the overage in (rounded-up) batches
blocks = math.ceil(overage / batch) if overage > 0 else 0
return overage, round(blocks * (self.price_per_unit or 0.0), 2)

View File

@@ -25,7 +25,8 @@ class FusionBillingReconciliation(models.Model):
external_amount = fields.Monetary(string="App-actual Amount")
delta = fields.Monetary(help="odoo_amount - external_amount.")
currency_id = fields.Many2one(
"res.currency", default=lambda self: self.env.company.currency_id,
"res.currency", required=True,
default=lambda self: self.env.company.currency_id,
)
status = fields.Selection(
[

View File

@@ -10,13 +10,19 @@ class SaleOrder(models.Model):
def _fc_rate_usage(self, charge, period_start, period_end):
"""Aggregate this subscription's usage for `charge`'s metric in the period,
compute the overage amount, and upsert a matching overage order line.
Returns the amount."""
Returns the amount.
A zero amount never *creates* a new line (no $0.00 overage clutter); if a
line already exists it is still updated so a dropped-to-zero overage clears.
"""
self.ensure_one()
Usage = self.env['fusion.billing.usage']
total = Usage._aggregate(self, charge.metric_id, period_start, period_end)
_overage, amount = charge._compute_billable(total)
if charge.product_id:
line = self.order_line.filtered(lambda l: l.product_id == charge.product_id)
if not line and amount == 0:
return amount
vals = {'product_uom_qty': 1, 'price_unit': amount}
if line:
line.write(vals)

View File

@@ -2,9 +2,12 @@
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
import hashlib
import ipaddress
import secrets
from urllib.parse import urlparse
from odoo import api, fields, models
from odoo.exceptions import ValidationError
class FusionBillingService(models.Model):
@@ -45,6 +48,33 @@ class FusionBillingService(models.Model):
for rec in self:
rec.account_link_count = len(rec.account_link_ids)
@api.constrains("webhook_url")
def _check_webhook_url(self):
"""Reject SSRF-prone webhook targets: a non-empty URL must be https and must
not point at localhost or a private / link-local / loopback IP literal. Empty
is allowed (no webhook configured)."""
for rec in self:
url = (rec.webhook_url or "").strip()
if not url:
continue
parsed = urlparse(url)
if parsed.scheme != "https":
raise ValidationError("Webhook URL must use https.")
host = parsed.hostname or ""
if not host or host.lower() in ("localhost", "ip6-localhost", "ip6-loopback"):
raise ValidationError(
"Webhook URL must not target localhost or a private address.")
try:
ip = ipaddress.ip_address(host)
except ValueError:
ip = None
if ip is not None and (
ip.is_private or ip.is_loopback or ip.is_link_local
or ip.is_reserved or ip.is_unspecified or ip.is_multicast
):
raise ValidationError(
"Webhook URL must not target a private or loopback address.")
def action_generate_api_key(self):
"""Generate a fresh bearer key, store only its hash, return the raw key.
@@ -64,7 +94,14 @@ class FusionBillingService(models.Model):
return self.search([('api_key_hash', '=', key_hash), ('active', '=', True)], limit=1)
def _api_upsert_customer(self, payload):
"""Resolve/create the partner link for an external account.
Defensive: a non-dict payload or a missing/empty ``external_id`` returns a
4xx-shaped error instead of raising (C3).
"""
self.ensure_one()
if not isinstance(payload, dict):
return {'status': 'error', 'error': 'invalid payload'}
ext = payload.get('external_id')
if not ext:
return {'status': 'error', 'error': 'external_id required'}
@@ -73,15 +110,53 @@ class FusionBillingService(models.Model):
return {'status': 'ok', 'partner_id': link.partner_id.id, 'external_id': ext}
def _api_record_usage(self, payload):
"""Ingest a batch of usage events.
Authorization (C2/C4): each event must target a subscription sale.order that
(a) exists, (b) is actually a subscription, and (c) belongs to a customer THIS
service is linked to. Any failing event is rejected and stops processing for
that event without writing a usage row.
Validation (C3): a non-dict payload, a non-list ``events``, missing required
keys, or non-numeric ``quantity``/ids return a 4xx-shaped error instead of
raising (no 500s).
"""
self.ensure_one()
events = payload.get('events') or []
if not isinstance(payload, dict):
return {'status': 'error', 'error': 'invalid payload'}
events = payload.get('events')
if events is None:
events = []
if not isinstance(events, list):
return {'status': 'error', 'error': 'events must be a list'}
Usage = self.env['fusion.billing.usage']
linked_partners = self.account_link_ids.mapped('partner_id')
accepted = 0
for ev in events:
sub = self.env['sale.order'].browse(int(ev['subscription_external_id']))
Usage._record_usage(
sub, ev['metric_code'], float(ev['quantity']),
ev['period_start'], ev['period_end'], idem=ev.get('idempotency_key'))
if not isinstance(ev, dict):
return {'status': 'error', 'error': 'invalid event'}
for key in ('subscription_external_id', 'metric_code', 'quantity',
'period_start', 'period_end'):
if ev.get(key) in (None, ''):
return {'status': 'error', 'error': 'missing %s' % key}
try:
sub_id = int(ev['subscription_external_id'])
except (TypeError, ValueError):
return {'status': 'error', 'error': 'invalid subscription_external_id'}
try:
quantity = float(ev['quantity'])
except (TypeError, ValueError):
return {'status': 'error', 'error': 'invalid quantity'}
sub = self.env['sale.order'].browse(sub_id)
if not sub.exists() or not sub.is_subscription \
or sub.partner_id not in linked_partners:
return {'status': 'error', 'error': 'unknown subscription'}
try:
Usage._record_usage(
sub, ev['metric_code'], quantity,
ev['period_start'], ev['period_end'], idem=ev.get('idempotency_key'))
except ValueError as e:
return {'status': 'error', 'error': str(e)}
accepted += 1
return {'status': 'ok', 'accepted': accepted}
@@ -100,21 +175,49 @@ class FusionBillingService(models.Model):
The product on each line must have recurring_invoice=True so that
Odoo recognises the order as a subscription with has_recurring_line and
action_confirm() reaches subscription_state='3_progress'.
Validation (C3): a non-dict payload, a missing/unknown customer, a missing
``plan_id``, a non-list ``lines``, or a non-numeric product id/quantity
return a 4xx-shaped error instead of raising (no 500s).
"""
self.ensure_one()
if not isinstance(payload, dict):
return {'status': 'error', 'error': 'invalid payload'}
if not payload.get('external_customer_id'):
return {'status': 'error', 'error': 'external_customer_id required'}
if not payload.get('plan_id'):
return {'status': 'error', 'error': 'plan_id required'}
try:
plan_id = int(payload['plan_id'])
except (TypeError, ValueError):
return {'status': 'error', 'error': 'invalid plan_id'}
link = self.env['fusion.billing.account.link'].search([
('service_id', '=', self.id),
('external_id', '=', payload.get('external_customer_id')),
], limit=1)
if not link:
return {'status': 'error', 'error': 'unknown customer'}
order_lines = [(0, 0, {
'product_id': line['product_id'],
'product_uom_qty': line.get('quantity', 1),
}) for line in payload.get('lines', [])]
lines = payload.get('lines')
if lines is None:
lines = []
if not isinstance(lines, list):
return {'status': 'error', 'error': 'lines must be a list'}
order_lines = []
for line in lines:
if not isinstance(line, dict) or line.get('product_id') in (None, ''):
return {'status': 'error', 'error': 'invalid line'}
try:
product_id = int(line['product_id'])
quantity = float(line.get('quantity', 1))
except (TypeError, ValueError):
return {'status': 'error', 'error': 'invalid line'}
order_lines.append((0, 0, {
'product_id': product_id,
'product_uom_qty': quantity,
}))
sub = self.env['sale.order'].sudo().create({
'partner_id': link.partner_id.id,
'plan_id': payload['plan_id'],
'plan_id': plan_id,
'order_line': order_lines,
})
sub.action_confirm()

View File

@@ -35,12 +35,14 @@ class FusionBillingUsage(models.Model):
)
_idempotency_uniq = models.Constraint(
"unique(idempotency_key)", "Usage idempotency key must be unique.",
"unique(subscription_id, metric_id, idempotency_key)",
"Usage idempotency key must be unique per subscription and metric.",
)
@api.model
def _record_usage(self, subscription, metric_code, quantity, period_start, period_end, idem=None):
"""Upsert one aggregated usage row. Same idempotency key updates in place (no double-count)."""
"""Upsert one aggregated usage row. Same idempotency key (scoped to the same
subscription + metric) updates in place (no double-count)."""
metric = self.env['fusion.billing.metric'].search([('code', '=', metric_code)], limit=1)
if not metric:
raise ValueError("Unknown metric code: %s" % metric_code)
@@ -53,7 +55,11 @@ class FusionBillingUsage(models.Model):
'idempotency_key': idem,
}
if idem:
existing = self.search([('idempotency_key', '=', idem)], limit=1)
existing = self.search([
('subscription_id', '=', subscription.id),
('metric_id', '=', metric.id),
('idempotency_key', '=', idem),
], limit=1)
if existing:
existing.write({'quantity': quantity})
return existing
@@ -62,31 +68,42 @@ class FusionBillingUsage(models.Model):
@api.model
def _cron_rate_open_periods(self):
"""Hourly cron: for every active charge, aggregate usage and upsert overage lines
on all in-progress subscriptions whose next invoice date is set."""
on the in-progress subscriptions that are on the charge's own plan.
A charge only rates subscriptions whose ``plan_id`` matches the charge's
``plan_id`` — never every subscription against every charge (C1/H4). The
billing-period window is the subscription's real open period
``[last_invoice_date or start_date, next_invoice_date)`` (H1)."""
Charge = self.env['fusion.billing.charge'].search([('active', '=', True)])
SaleOrder = self.env['sale.order']
for charge in Charge:
if not charge.plan_id:
continue
subs = SaleOrder.search([
('is_subscription', '=', True),
('subscription_state', '=', '3_progress'),
('plan_id.name', '!=', False),
('plan_id', '=', charge.plan_id.id),
])
for sub in subs:
if not sub.next_invoice_date:
continue
period_end = fields.Datetime.to_datetime(sub.next_invoice_date)
period_start = period_end.replace(day=1)
period_start = fields.Datetime.to_datetime(
sub.last_invoice_date or sub.start_date)
if not period_start:
continue
sub._fc_rate_usage(charge, period_start, period_end)
@api.model
def _aggregate(self, subscription, metric, period_start, period_end):
"""Aggregate stored usage for a subscription+metric within [period_start, period_end)
"""Aggregate stored usage for a subscription+metric over the half-open window
``[period_start, period_end)``, anchored on each rollup's ``period_start``,
using the metric's aggregation function."""
rows = self.search([
('subscription_id', '=', subscription.id),
('metric_id', '=', metric.id),
('period_start', '>=', period_start),
('period_end', '<=', period_end),
('period_start', '<', period_end),
])
qtys = rows.mapped('quantity')
if not qtys:

View File

@@ -39,6 +39,10 @@ class FusionBillingWebhook(models.Model):
"subscription.terminated / subscription.reactivated / usage.threshold_reached",
)
payload = fields.Json()
body = fields.Text(
help="Canonical JSON body that was signed and is POSTed verbatim "
"(so the signature always matches the bytes on the wire).",
)
state = fields.Selection(
[
("pending", "Pending"),
@@ -59,11 +63,14 @@ class FusionBillingWebhook(models.Model):
@api.model
def _enqueue(self, service, event_type, payload):
# Serialize the canonical body ONCE, store it, and sign that exact string so
# the dispatched bytes always match the signature (no re-serialization drift).
body = json.dumps(payload, sort_keys=True, separators=(',', ':'))
return self.create({
'service_id': service.id,
'event_type': event_type,
'payload': payload,
'body': body,
'signature': self._sign(service.webhook_secret, body),
'state': 'pending',
'next_retry_at': fields.Datetime.now(),
@@ -77,14 +84,18 @@ class FusionBillingWebhook(models.Model):
('next_retry_at', '<=', now),
], limit=100)
for wh in due:
body = json.dumps(wh.payload, sort_keys=True, separators=(',', ':'))
# POST the exact bytes that were signed at enqueue time. Fall back to
# re-serializing the payload only for legacy rows enqueued before `body`
# existed (the signature was computed over the same canonical form).
body = wh.body or json.dumps(wh.payload, sort_keys=True, separators=(',', ':'))
try:
resp = requests.post(
wh.service_id.webhook_url,
data=body,
headers={'Content-Type': 'application/json',
'X-Fusion-Signature': wh.signature,
'X-Fusion-Event': wh.event_type},
'X-Fusion-Event': wh.event_type,
'X-Fusion-Event-Id': str(wh.id)},
timeout=10,
)
ok = 200 <= resp.status_code < 300
@@ -98,4 +109,5 @@ class FusionBillingWebhook(models.Model):
wh.state = 'dead'
else:
wh.state = 'failed'
wh.next_retry_at = now + timedelta(minutes=2 ** wh.attempts)
# Cap the exponential backoff so the interval can't overflow.
wh.next_retry_at = now + timedelta(minutes=2 ** min(wh.attempts, 10))

View File

@@ -61,3 +61,79 @@ class TestApiHandlers(TransactionCase):
self.assertTrue(sub.is_subscription)
self.assertEqual(sub.plan_id, self.plan)
self.assertEqual(sub.subscription_state, '3_progress')
# ── item 4 (C3): malformed input returns a 4xx-shaped error, never raises ──
def test_record_usage_missing_metric_code_returns_error(self):
self.service._api_upsert_customer({'external_id': 'client-9', 'name': 'Globex'})
partner = self.env['fusion.billing.account.link'].search(
[('external_id', '=', 'client-9')]).partner_id
sub = self.env['sale.order'].sudo().create(
{'partner_id': partner.id, 'is_subscription': True, 'plan_id': self.plan.id})
# metric_code intentionally omitted — must return an error dict, not raise
res = self.service._api_record_usage({'events': [{
'subscription_external_id': str(sub.id),
'quantity': 10.0, 'period_start': '2026-05-01', 'period_end': '2026-06-01',
}]})
self.assertEqual(res['status'], 'error')
# no usage row written
usage = self.env['fusion.billing.usage'].search([('subscription_id', '=', sub.id)])
self.assertFalse(usage)
@tagged('post_install', '-at_install')
class TestUsageAuthorization(TransactionCase):
"""/usage must only accept events for subscriptions the calling service is linked
to, and reject unknown / non-subscription targets (items 3/C2/C4)."""
def setUp(self):
super().setUp()
self.metric = self.env['fusion.billing.metric'].sudo().create(
{'name': 'API Calls', 'code': 'api_calls', 'aggregation': 'sum'})
self.plan = self.env['sale.subscription.plan'].sudo().create(
{'name': 'Monthly', 'billing_period_value': 1, 'billing_period_unit': 'month'})
self.service_a = self.env['fusion.billing.service'].sudo().create(
{'name': 'Service A', 'code': 'svc_a'})
self.service_b = self.env['fusion.billing.service'].sudo().create(
{'name': 'Service B', 'code': 'svc_b'})
# Service A owns customer + subscription
self.service_a._api_upsert_customer({'external_id': 'cust-a', 'name': 'Cust A'})
self.partner_a = self.env['fusion.billing.account.link'].search(
[('service_id', '=', self.service_a.id), ('external_id', '=', 'cust-a')]).partner_id
self.sub_a = self.env['sale.order'].sudo().create(
{'partner_id': self.partner_a.id, 'is_subscription': True, 'plan_id': self.plan.id})
self.Usage = self.env['fusion.billing.usage'].sudo()
def _event(self, sub_id, idem):
return {'events': [{
'subscription_external_id': str(sub_id), 'metric_code': 'api_calls',
'quantity': 42.0, 'period_start': '2026-05-01', 'period_end': '2026-06-01',
'idempotency_key': idem,
}]}
def test_cross_service_usage_rejected(self):
"""Service B pushing usage onto Service A's subscription is rejected, no row."""
res = self.service_b._api_record_usage(self._event(self.sub_a.id, 'cross-1'))
self.assertEqual(res['status'], 'error')
self.assertEqual(res['error'], 'unknown subscription')
self.assertFalse(self.Usage.search([('subscription_id', '=', self.sub_a.id)]))
def test_same_service_usage_accepted(self):
"""Positive control: Service A pushing onto its own subscription is accepted."""
res = self.service_a._api_record_usage(self._event(self.sub_a.id, 'ok-1'))
self.assertEqual(res['status'], 'ok')
self.assertEqual(res['accepted'], 1)
self.assertTrue(self.Usage.search([('subscription_id', '=', self.sub_a.id)]))
def test_nonexistent_subscription_rejected(self):
res = self.service_a._api_record_usage(self._event(999_999_999, 'ghost-1'))
self.assertEqual(res['status'], 'error')
self.assertEqual(res['error'], 'unknown subscription')
def test_non_subscription_order_rejected(self):
"""A plain (non-subscription) sale.order owned by the linked customer is rejected."""
plain = self.env['sale.order'].sudo().create({'partner_id': self.partner_a.id})
self.assertFalse(plain.is_subscription)
res = self.service_a._api_record_usage(self._event(plain.id, 'plain-1'))
self.assertEqual(res['status'], 'error')
self.assertEqual(res['error'], 'unknown subscription')
self.assertFalse(self.Usage.search([('subscription_id', '=', plain.id)]))

View File

@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
from psycopg2 import IntegrityError
from odoo.tests.common import TransactionCase, tagged
from odoo.tools.misc import mute_logger
@tagged('post_install', '-at_install')
@@ -43,3 +46,29 @@ class TestChargeMath(TransactionCase):
# 2,001 units -> 3 packages -> $6.00
_, amount = charge._compute_billable(2_001.0)
self.assertAlmostEqual(amount, 6.0, places=2)
# ── item 10 (M7): only the two implemented charge models remain ──
def test_charge_model_selection_limited(self):
field = self.env['fusion.billing.charge']._fields['charge_model']
keys = [k for k, _label in field.selection]
self.assertEqual(sorted(keys), ['package', 'standard'])
self.assertNotIn('graduated', keys)
self.assertNotIn('volume', keys)
# ── item 11 (L1): currency is required and defaults to company currency ──
def test_currency_required_and_defaulted(self):
field = self.env['fusion.billing.charge']._fields['currency_id']
self.assertTrue(field.required)
charge = self._charge()
self.assertEqual(charge.currency_id, self.env.company.currency_id)
# ── item 12 (L2): non-negative price + positive batch DB constraints ──
def test_negative_price_rejected(self):
with self.assertRaises(IntegrityError), mute_logger('odoo.sql_db'):
with self.env.cr.savepoint():
self._charge(price_per_unit=-1.0)
def test_zero_batch_rejected(self):
with self.assertRaises(IntegrityError), mute_logger('odoo.sql_db'):
with self.env.cr.savepoint():
self._charge(unit_batch=0.0)

View File

@@ -2,6 +2,66 @@
from odoo.tests.common import TransactionCase, tagged
@tagged('post_install', '-at_install')
class TestRatingCron(TransactionCase):
"""The rating cron must only rate a subscription against charges on its OWN plan
(items 1/C1/H4) and over the subscription's real open billing period (item 5/H1)."""
def setUp(self):
super().setUp()
self.metric = self.env['fusion.billing.metric'].sudo().create(
{'name': 'CPU seconds', 'code': 'cpu_seconds', 'aggregation': 'sum'})
self.plan_a = self.env['sale.subscription.plan'].sudo().create(
{'name': 'Plan A', 'billing_period_value': 1, 'billing_period_unit': 'month'})
self.plan_b = self.env['sale.subscription.plan'].sudo().create(
{'name': 'Plan B', 'billing_period_value': 1, 'billing_period_unit': 'month'})
self.partner = self.env['res.partner'].sudo().create({'name': 'Acme'})
self.recurring_product = self.env['product.product'].sudo().create(
{'name': 'Plan seat', 'type': 'service', 'recurring_invoice': True,
'list_price': 10.0})
self.overage_product = self.env['product.product'].sudo().create(
{'name': 'CPU overage', 'type': 'service', 'list_price': 0.0})
self.Usage = self.env['fusion.billing.usage'].sudo()
def _confirmed_sub(self, plan):
sub = self.env['sale.order'].sudo().create({
'partner_id': self.partner.id, 'plan_id': plan.id,
'order_line': [(0, 0, {'product_id': self.recurring_product.id,
'product_uom_qty': 1})],
})
sub.action_confirm()
# widen the computed billing window so usage in May is in-period
sub.write({'start_date': '2026-05-01', 'next_invoice_date': '2026-06-01'})
return sub
def test_cron_rates_only_matching_plan(self):
sub_a = self._confirmed_sub(self.plan_a)
sub_b = self._confirmed_sub(self.plan_b)
self.assertEqual(sub_a.subscription_state, '3_progress')
self.assertEqual(sub_b.subscription_state, '3_progress')
# one charge, linked to Plan A only
charge = self.env['fusion.billing.charge'].sudo().create({
'name': 'CPU overage', 'plan_code': 'plan-a', 'plan_id': self.plan_a.id,
'metric_id': self.metric.id, 'product_id': self.overage_product.id,
'included_quota': 100.0, 'price_per_unit': 0.10, 'unit_batch': 1000.0,
'charge_model': 'standard'})
# usage recorded on BOTH subs, in the open period
self.Usage._record_usage(sub_a, 'cpu_seconds', 1100.0,
'2026-05-10 00:00:00', '2026-05-11 00:00:00', idem='a1')
self.Usage._record_usage(sub_b, 'cpu_seconds', 1100.0,
'2026-05-10 00:00:00', '2026-05-11 00:00:00', idem='b1')
self.Usage._cron_rate_open_periods()
# Plan A sub IS rated (window captured the usage → overage line present)
line_a = sub_a.order_line.filtered(lambda l: l.product_id == self.overage_product)
self.assertTrue(line_a, "Plan A subscription should be rated by the Plan A charge")
self.assertAlmostEqual(line_a.price_unit, 0.10, places=2)
# Plan B sub is NOT rated by the Plan A charge
line_b = sub_b.order_line.filtered(lambda l: l.product_id == self.overage_product)
self.assertFalse(line_b, "Plan B subscription must NOT be rated by the Plan A charge")
@tagged('post_install', '-at_install')
class TestUsageIngestion(TransactionCase):
@@ -65,3 +125,47 @@ class TestUsageIngestion(TransactionCase):
self.assertAlmostEqual(amount, 0.10, places=2)
line = self.sub.order_line.filtered(lambda l: l.product_id == product)
self.assertTrue(line)
# ── item 6 (H2): half-open aggregation window anchored on period_start ──
def test_aggregate_daily_rollups_in_window(self):
"""Three DAILY rollups (period_start 05-01/-08/-15, each period_end +1 day)
sum correctly for the half-open window ['2026-05-01', '2026-06-01')."""
rollups = [
('2026-05-01 00:00:00', '2026-05-02 00:00:00', 3.0),
('2026-05-08 00:00:00', '2026-05-09 00:00:00', 5.0),
('2026-05-15 00:00:00', '2026-05-16 00:00:00', 7.0),
]
for i, (ps, pe, q) in enumerate(rollups):
self.Usage._record_usage(self.sub, 'cpu_seconds', q, ps, pe, idem='daily-%d' % i)
total = self.Usage._aggregate(
self.sub, self.metric, '2026-05-01 00:00:00', '2026-06-01 00:00:00')
self.assertEqual(total, 15.0) # 3 + 5 + 7
# ── item 7 (H3): idempotency key is scoped per (subscription, metric) ──
def test_same_idempotency_key_distinct_subscriptions(self):
"""The SAME idempotency key on two DIFFERENT subscriptions creates TWO rows."""
sub2 = self.env['sale.order'].sudo().create({
'partner_id': self.partner.id, 'is_subscription': True, 'plan_id': self.plan.id,
})
key = 'shared-idem-key'
a = self.Usage._record_usage(self.sub, 'cpu_seconds', 10.0, '2026-05-01', '2026-06-01', idem=key)
b = self.Usage._record_usage(sub2, 'cpu_seconds', 20.0, '2026-05-01', '2026-06-01', idem=key)
self.assertNotEqual(a, b) # distinct rows, no collision
rows = self.Usage.search([('idempotency_key', '=', key)])
self.assertEqual(len(rows), 2)
self.assertEqual(a.quantity, 10.0)
self.assertEqual(b.quantity, 20.0)
# ── item 2 (C1): zero aggregated usage creates no overage line ──
def test_zero_usage_creates_no_line(self):
product = self.env['product.product'].sudo().create(
{'name': 'API overage', 'type': 'service', 'list_price': 0.0})
charge = self.env['fusion.billing.charge'].sudo().create({
'name': 'overage', 'plan_code': 'p', 'metric_id': self.metric.id,
'product_id': product.id, 'included_quota': 100.0,
'price_per_unit': 0.10, 'unit_batch': 1000.0, 'charge_model': 'standard'})
# no usage recorded → aggregate is 0 → amount 0 → no line created
amount = self.sub._fc_rate_usage(charge, '2026-05-01', '2026-06-01')
self.assertEqual(amount, 0.0)
line = self.sub.order_line.filtered(lambda l: l.product_id == product)
self.assertFalse(line)

View File

@@ -4,6 +4,7 @@ import hmac
import json
from unittest.mock import patch
from odoo.exceptions import ValidationError
from odoo.tests.common import TransactionCase, tagged
@@ -51,3 +52,48 @@ class TestWebhookEngine(TransactionCase):
return_value=_Resp()):
self.Webhook._cron_dispatch()
self.assertEqual(wh.state, 'dead')
# ── item 8 (H5): dispatch POSTs the stored body verbatim + event-id header ──
def test_dispatch_posts_stored_body_and_event_id(self):
wh = self.Webhook._enqueue(self.service, 'invoice.payment_failed', {'invoice': 'INV-9'})
class _Resp:
status_code = 200
text = 'ok'
with patch('odoo.addons.fusion_centralize_billing.models.webhook.requests.post',
return_value=_Resp()) as mock_post:
self.Webhook._cron_dispatch()
self.assertTrue(mock_post.called)
_args, kwargs = mock_post.call_args
# the exact stored body is POSTed (not a re-serialized payload)
self.assertEqual(kwargs['data'], wh.body)
self.assertEqual(wh.body, json.dumps(
{'invoice': 'INV-9'}, sort_keys=True, separators=(',', ':')))
# signature matches the bytes on the wire
expected = hmac.new(b'whsec_test', wh.body.encode(), hashlib.sha256).hexdigest()
self.assertEqual(kwargs['headers']['X-Fusion-Signature'], expected)
# event id header present and correct
self.assertEqual(kwargs['headers']['X-Fusion-Event-Id'], str(wh.id))
# ── item 9 (H6): SSRF guard on webhook_url ──
def test_webhook_url_rejects_loopback(self):
with self.assertRaises(ValidationError):
self.env['fusion.billing.service'].sudo().create({
'name': 'Evil', 'code': 'evil', 'webhook_url': 'http://127.0.0.1/x'})
def test_webhook_url_rejects_private_and_http(self):
for bad in ('http://10.0.0.5/hook', # private + non-https
'https://192.168.1.10/hook', # private
'https://localhost/hook', # localhost host
'https://169.254.169.254/latest', # link-local metadata
'http://api.example.com/hook'): # non-https
with self.assertRaises(ValidationError):
self.env['fusion.billing.service'].sudo().create({
'name': 'Bad', 'code': 'bad-%s' % bad, 'webhook_url': bad})
def test_webhook_url_allows_public_https(self):
svc = self.env['fusion.billing.service'].sudo().create({
'name': 'Good', 'code': 'good',
'webhook_url': 'https://api.vps.nexasystems.ca/billing/webhook'})
self.assertTrue(svc.id)