# -*- coding: utf-8 -*- import hashlib import hmac import json from unittest.mock import patch from odoo.tests.common import TransactionCase, tagged @tagged('post_install', '-at_install') class TestWebhookEngine(TransactionCase): def setUp(self): super().setUp() self.service = self.env['fusion.billing.service'].sudo().create({ 'name': 'NexaCloud', 'code': 'nexacloud', 'webhook_url': 'https://api.vps.nexasystems.ca/billing/webhook', 'webhook_secret': 'whsec_test', }) self.Webhook = self.env['fusion.billing.webhook'].sudo() def test_enqueue_signs_payload(self): wh = self.Webhook._enqueue(self.service, 'invoice.payment_failed', {'invoice': 'INV-1'}) self.assertEqual(wh.state, 'pending') body = json.dumps({'invoice': 'INV-1'}, sort_keys=True, separators=(',', ':')) expected = hmac.new(b'whsec_test', body.encode(), hashlib.sha256).hexdigest() self.assertEqual(wh.signature, expected) def test_dispatch_marks_sent_on_2xx(self): wh = self.Webhook._enqueue(self.service, 'invoice.paid', {'invoice': 'INV-2'}) class _Resp: status_code = 200 text = 'ok' with patch('odoo.addons.fusion_centralize_billing.models.webhook.requests.post', return_value=_Resp()) as mock_post: self.Webhook._cron_dispatch() self.assertTrue(mock_post.called) self.assertEqual(wh.state, 'sent') def test_dispatch_retries_then_deadletters(self): wh = self.Webhook._enqueue(self.service, 'invoice.paid', {'invoice': 'INV-3'}) wh.write({'attempts': 7}) # already past max class _Resp: status_code = 500 text = 'err' with patch('odoo.addons.fusion_centralize_billing.models.webhook.requests.post', return_value=_Resp()): self.Webhook._cron_dispatch() self.assertEqual(wh.state, 'dead')