test(fusion_accounting_followup): engine integration tests for full lifecycle

End-to-end flows over a real posted receivable line: aging discovery,
level resolution, send-with-cache reuse, pause+force override, and
audit history growth. Adds ignore_pause kwarg to compute_followup_level
so force=True in send_followup_email reaches level resolution.

Made-with: Cursor
This commit is contained in:
gsinghpal
2026-04-19 20:54:13 -04:00
parent 6802d60e44
commit 9b6d6b3895
4 changed files with 85 additions and 8 deletions

View File

@@ -1,6 +1,6 @@
{ {
'name': 'Fusion Accounting Follow-up', 'name': 'Fusion Accounting Follow-up',
'version': '19.0.1.0.13', 'version': '19.0.1.0.14',
'category': 'Accounting/Accounting', 'category': 'Accounting/Accounting',
'summary': 'AI-augmented customer follow-ups (dunning) for unpaid invoices.', 'summary': 'AI-augmented customer follow-ups (dunning) for unpaid invoices.',
'description': """ 'description': """

View File

@@ -54,12 +54,12 @@ class FusionFollowupEngine(models.AbstractModel):
} }
@api.model @api.model
def compute_followup_level(self, partner): def compute_followup_level(self, partner, *, ignore_pause=False):
"""Return the fusion.followup.level recordset that should fire now, """Return the fusion.followup.level recordset that should fire now,
or empty recordset if no action needed.""" or empty recordset if no action needed."""
partner.ensure_one() partner.ensure_one()
Level = self.env['fusion.followup.level'] Level = self.env['fusion.followup.level']
if partner.fusion_followup_paused_until and \ if not ignore_pause and partner.fusion_followup_paused_until and \
partner.fusion_followup_paused_until > fields.Date.today(): partner.fusion_followup_paused_until > fields.Date.today():
return Level return Level
@@ -101,11 +101,6 @@ class FusionFollowupEngine(models.AbstractModel):
Creates a fusion.followup.run record. Uses cached text if available.""" Creates a fusion.followup.run record. Uses cached text if available."""
partner.ensure_one() partner.ensure_one()
if not level:
level = self.compute_followup_level(partner)
if not level:
return {'status': 'no_action', 'partner_id': partner.id}
if not force and partner.fusion_followup_paused_until and \ if not force and partner.fusion_followup_paused_until and \
partner.fusion_followup_paused_until > fields.Date.today(): partner.fusion_followup_paused_until > fields.Date.today():
return { return {
@@ -113,6 +108,11 @@ class FusionFollowupEngine(models.AbstractModel):
'partner_id': partner.id, 'partner_id': partner.id,
} }
if not level:
level = self.compute_followup_level(partner, ignore_pause=force)
if not level:
return {'status': 'no_action', 'partner_id': partner.id}
if level.requires_manual_review and not force: if level.requires_manual_review and not force:
run = self._create_run(partner, level, state='manual_review') run = self._create_run(partner, level, state='manual_review')
return { return {

View File

@@ -9,3 +9,4 @@ from . import test_fusion_followup_text_cache
from . import test_res_partner_inherit from . import test_res_partner_inherit
from . import test_account_move_line_inherit from . import test_account_move_line_inherit
from . import test_fusion_followup_engine from . import test_fusion_followup_engine
from . import test_engine_integration

View File

@@ -0,0 +1,76 @@
"""Integration tests: full follow-up flow with real overdue invoices."""
from datetime import date, timedelta
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
@tagged('post_install', '-at_install', 'integration')
class TestFollowupEngineIntegration(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.followup.engine']
self.partner = self.env['res.partner'].create({
'name': 'Integration Partner', 'email': 'integ@test.local',
})
for seq, name, days, tone in [(801, 'Test Reminder', 7, 'gentle'),
(802, 'Test Warning', 30, 'firm'),
(803, 'Test Legal', 60, 'legal')]:
self.env['fusion.followup.level'].create({
'name': name, 'sequence': seq, 'delay_days': days, 'tone': tone,
})
line = self.env['account.move.line'].search([
('parent_state', '=', 'posted'),
('account_id.account_type', '=', 'asset_receivable'),
('reconciled', '=', False),
('amount_residual', '>', 0),
], limit=1)
if not line:
self.skipTest("No posted unreconciled receivable lines in test DB")
line.write({
'partner_id': self.partner.id,
'date_maturity': date.today() - timedelta(days=20),
})
def test_get_overdue_finds_lines(self):
result = self.engine.get_overdue_for_partner(self.partner)
self.assertGreater(result['overdue_line_count'], 0)
self.assertGreater(result['aging']['total_overdue_amount'], 0)
def test_compute_level_picks_reminder_at_20_days(self):
level = self.engine.compute_followup_level(self.partner)
self.assertTrue(level)
self.assertGreater(level.delay_days, 0)
def test_send_followup_creates_run(self):
result = self.engine.send_followup_email(self.partner, force=True)
self.assertIn(result['status'], ('sent', 'manual_review'))
if 'run_id' in result:
run = self.env['fusion.followup.run'].browse(result['run_id'])
self.assertEqual(run.partner_id, self.partner)
def test_pause_blocks_send_unless_force(self):
self.engine.pause_followup(self.partner,
until_date=date.today() + timedelta(days=30))
result = self.engine.send_followup_email(self.partner)
self.assertTrue(result['status'].startswith('paused'))
result_force = self.engine.send_followup_email(self.partner, force=True)
self.assertIn(result_force['status'], ('sent', 'manual_review'))
def test_history_grows_with_each_send(self):
Run = self.env['fusion.followup.run']
before = Run.search_count([('partner_id', '=', self.partner.id)])
self.engine.send_followup_email(self.partner, force=True)
after = Run.search_count([('partner_id', '=', self.partner.id)])
self.assertGreater(after, before)
def test_text_cache_used_on_repeat_call(self):
Cache = self.env['fusion.followup.text.cache']
self.engine.send_followup_email(self.partner, force=True)
cache_count_after_first = Cache.search_count([('partner_id', '=', self.partner.id)])
self.engine.send_followup_email(self.partner, force=True)
cache_count_after_second = Cache.search_count([('partner_id', '=', self.partner.id)])
self.assertEqual(cache_count_after_first, cache_count_after_second,
"Repeat send with same params should not create new cache row")