The orchestrator AbstractModel for follow-up lifecycle. get_overdue_for_partner, compute_followup_level, send_followup_email, escalate_to_next_level, pause_followup, reset_followup, snapshot_followup_history. All controllers, AI tools, wizards, cron must route through these methods; no direct ORM writes to fusion.followup.run from anywhere else. Made-with: Cursor
75 lines
3.1 KiB
Python
75 lines
3.1 KiB
Python
"""Unit tests for the fusion.followup.engine 7-method API."""
|
|
|
|
from datetime import date, timedelta
|
|
from odoo.tests.common import TransactionCase
|
|
from odoo.tests import tagged
|
|
|
|
|
|
@tagged('post_install', '-at_install')
|
|
class TestFusionFollowupEngine(TransactionCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.engine = self.env['fusion.followup.engine']
|
|
self.partner = self.env['res.partner'].create({
|
|
'name': 'Engine Test Partner', 'email': 'engine@test.local',
|
|
})
|
|
for seq, name, days, tone in [(901, 'Reminder', 7, 'gentle'),
|
|
(902, 'Warning', 30, 'firm'),
|
|
(903, 'Legal', 60, 'legal')]:
|
|
self.env['fusion.followup.level'].create({
|
|
'name': name, 'sequence': seq,
|
|
'delay_days': days, 'tone': tone,
|
|
})
|
|
|
|
def test_engine_model_exists(self):
|
|
self.assertIn('fusion.followup.engine', self.env.registry)
|
|
|
|
def test_get_overdue_returns_dict(self):
|
|
result = self.engine.get_overdue_for_partner(self.partner)
|
|
self.assertIn('aging', result)
|
|
self.assertIn('risk', result)
|
|
self.assertEqual(result['partner_id'], self.partner.id)
|
|
|
|
def test_compute_followup_level_no_overdue_returns_empty(self):
|
|
result = self.engine.compute_followup_level(self.partner)
|
|
self.assertFalse(result)
|
|
|
|
def test_pause_sets_partner_state(self):
|
|
until = date.today() + timedelta(days=14)
|
|
self.engine.pause_followup(self.partner, until_date=until)
|
|
self.partner.invalidate_recordset(['fusion_followup_paused_until', 'fusion_followup_status'])
|
|
self.assertEqual(self.partner.fusion_followup_paused_until, until)
|
|
self.assertEqual(self.partner.fusion_followup_status, 'paused')
|
|
|
|
def test_reset_clears_state(self):
|
|
self.engine.pause_followup(self.partner)
|
|
self.engine.reset_followup(self.partner)
|
|
self.partner.invalidate_recordset([
|
|
'fusion_followup_status', 'fusion_followup_paused_until',
|
|
'fusion_followup_last_level_id',
|
|
])
|
|
self.assertEqual(self.partner.fusion_followup_status, 'no_action')
|
|
self.assertFalse(self.partner.fusion_followup_paused_until)
|
|
|
|
def test_snapshot_history_returns_runs(self):
|
|
Run = self.env['fusion.followup.run']
|
|
run = Run.create({
|
|
'partner_id': self.partner.id,
|
|
'state': 'sent',
|
|
'overdue_amount': 500,
|
|
})
|
|
result = self.engine.snapshot_followup_history(self.partner)
|
|
self.assertEqual(result['count'], 1)
|
|
self.assertEqual(result['runs'][0]['id'], run.id)
|
|
|
|
def test_send_no_overdue_returns_no_action(self):
|
|
Level = self.env['fusion.followup.level']
|
|
level = Level.search([('sequence', '=', 901)], limit=1)
|
|
result = self.engine.send_followup_email(self.partner, level=level, force=True)
|
|
self.assertEqual(result['status'], 'no_overdue')
|
|
|
|
def test_escalate_when_no_current_level(self):
|
|
result = self.engine.escalate_to_next_level(self.partner)
|
|
self.assertIn('partner_id', result)
|