- C1/H4: rating cron only rates subs on the charge's own plan_id
- C1: _fc_rate_usage skips creating a line when amount is 0 (still updates existing)
- C2/C4: /usage authorizes each event (exists + is_subscription + linked customer)
- C3: API handlers validate input and return 4xx-shaped errors instead of raising;
  controller maps status=='error' to HTTP 400
- H1: cron uses real billing window [last_invoice_date or start_date, next_invoice_date)
- H2: _aggregate uses half-open window anchored on period_start
- H3: idempotency scoped to (subscription_id, metric_id, idempotency_key)
- H5: webhook stores canonical body, signs+POSTs it verbatim, adds X-Fusion-Event-Id,
  caps backoff at 2**min(attempts,10) minutes
- H6: SSRF guard rejects non-https / localhost / private / link-local webhook_url
- M7: charge_model reduced to standard/package (dropped unimplemented graduated/volume)
- L1: currency_id required on charge + reconciliation
- L2: charge price non-negative + unit_batch positive DB constraints
Adds 17 regression tests (suite 22 -> 39, all green via fcb_test_on_trial.sh).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
114 lines
4.1 KiB
Python
114 lines
4.1 KiB
Python
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
from datetime import timedelta
|
|
|
|
import requests
|
|
|
|
from odoo import api, fields, models
|
|
|
|
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Number of delivery attempts after which a webhook row is dead-lettered
# (state 'dead') instead of being scheduled for another retry.
MAX_ATTEMPTS = 8
|
|
|
|
|
|
class FusionBillingWebhook(models.Model):
    """Outbound webhook queue: lifecycle events delivered to source apps.

    Rows are created by ``_enqueue`` with a canonical JSON body and its
    HMAC-SHA256 signature, then processed by a cron (``_cron_dispatch``) with
    exponential backoff and dead-lettered after ``MAX_ATTEMPTS`` attempts
    (mirror the proven retry pattern in NexaDesk's lago-payment-retry-job).
    Apps react: suspend / restore / deprovision. See spec §8.
    """

    _name = "fusion.billing.webhook"
    _description = "Fusion Billing — Outbound Webhook Event"
    _order = "create_date desc"

    # Service whose webhook_url / webhook_secret are used for delivery.
    service_id = fields.Many2one(
        "fusion.billing.service", required=True, ondelete="cascade", index=True,
    )
    event_type = fields.Char(
        required=True, index=True,
        help="invoice.payment_failed / invoice.payment_succeeded / "
             "subscription.terminated / subscription.reactivated / usage.threshold_reached",
    )
    # Structured form of the event; `body` is its canonical serialization.
    payload = fields.Json()
    body = fields.Text(
        help="Canonical JSON body that was signed and is POSTed verbatim "
             "(so the signature always matches the bytes on the wire).",
    )
    state = fields.Selection(
        [
            ("pending", "Pending"),
            ("sent", "Sent"),
            ("failed", "Failed"),
            ("dead", "Dead-lettered"),
        ],
        default="pending", required=True, index=True,
    )
    # Number of delivery attempts made so far.
    attempts = fields.Integer(default=0)
    # Earliest time at which the dispatcher may (re)try this row.
    next_retry_at = fields.Datetime()
    signature = fields.Char(help="HMAC-SHA256 of the payload using the service webhook_secret.")
    # Failure description of the most recent attempt; cleared on success.
    last_error = fields.Text()

    @api.model
    def _sign(self, secret, body):
        """Return the hex HMAC-SHA256 digest of ``body`` keyed with ``secret``.

        :param secret: shared secret string; a falsy value is signed as ``''``.
        :param body: text to sign (encoded with ``str.encode()`` defaults).
        :return: hexadecimal digest string.
        """
        return hmac.new((secret or '').encode(), body.encode(), hashlib.sha256).hexdigest()

    @api.model
    def _enqueue(self, service, event_type, payload):
        """Create a pending webhook row for ``service`` that is due immediately.

        :param service: record providing ``id`` and ``webhook_secret``.
        :param event_type: event name stored on the row and sent as a header.
        :param payload: JSON-serializable event data.
        :return: the created webhook record.
        :raises TypeError: if ``payload`` is not JSON-serializable.
        """
        # Serialize the canonical body ONCE, store it, and sign that exact string so
        # the dispatched bytes always match the signature (no re-serialization drift).
        body = json.dumps(payload, sort_keys=True, separators=(',', ':'))
        return self.create({
            'service_id': service.id,
            'event_type': event_type,
            'payload': payload,
            'body': body,
            'signature': self._sign(service.webhook_secret, body),
            'state': 'pending',
            'next_retry_at': fields.Datetime.now(),
        })

    @api.model
    def _cron_dispatch(self):
        """Deliver due webhooks (at most 100 per run) and record the outcome.

        A row is due when its state is ``pending`` or ``failed`` and
        ``next_retry_at`` is not in the future. Each row is POSTed to its
        service's ``webhook_url``; afterwards ``attempts`` is incremented and:

        * a 2xx response marks the row ``sent`` and clears ``last_error``;
        * otherwise, once ``attempts`` reaches ``MAX_ATTEMPTS`` the row is
          marked ``dead`` and a warning is logged;
        * otherwise the row is marked ``failed`` and rescheduled
          ``2 ** min(attempts, 10)`` minutes from the time of the failure.

        Any exception raised while posting is recorded in ``last_error`` and
        treated as a failed attempt, so one bad row cannot abort the batch.
        """
        # NOTE(review): all updates happen in the caller's transaction; confirm
        # whether a per-row commit is wanted to avoid re-sending on rollback.
        due = self.search([
            ('state', 'in', ('pending', 'failed')),
            ('next_retry_at', '<=', fields.Datetime.now()),
        ], limit=100)
        for wh in due:
            # POST the exact bytes that were signed at enqueue time. Fall back to
            # re-serializing the payload only for legacy rows enqueued before `body`
            # existed (the signature was computed over the same canonical form).
            body = wh.body or json.dumps(wh.payload, sort_keys=True, separators=(',', ':'))
            ok = False
            error = False
            try:
                resp = requests.post(
                    wh.service_id.webhook_url,
                    data=body,
                    headers={
                        'Content-Type': 'application/json',
                        'X-Fusion-Signature': wh.signature,
                        'X-Fusion-Event': wh.event_type,
                        'X-Fusion-Event-Id': str(wh.id),
                    },
                    timeout=10,
                    # Do not follow redirects: requests turns a redirected POST
                    # into a body-less GET for 301/302/303, so a 2xx from the
                    # target would not mean the event was delivered, and the
                    # target is not covered by any validation of webhook_url.
                    allow_redirects=False,
                )
                ok = 200 <= resp.status_code < 300
                if not ok:
                    # Keep the reason for the failure instead of a stale/empty error.
                    error = 'HTTP %s' % resp.status_code
            except Exception as e:  # noqa: BLE001 - record and retry
                error = str(e)[:500]

            attempts = wh.attempts + 1
            vals = {'attempts': attempts, 'last_error': error}
            if ok:
                vals['state'] = 'sent'
            elif attempts >= MAX_ATTEMPTS:
                vals['state'] = 'dead'
                _logger.warning(
                    "Webhook %s (%s) dead-lettered after %s attempts: %s",
                    wh.id, wh.event_type, attempts, error,
                )
            else:
                vals['state'] = 'failed'
                # Anchor the backoff on the current time, not the batch start:
                # earlier slow deliveries in this run must not shorten the delay.
                # The exponent is capped so the interval can't overflow.
                vals['next_retry_at'] = fields.Datetime.now() + timedelta(
                    minutes=2 ** min(attempts, 10)
                )
            wh.write(vals)
|