Files
Odoo-Modules/fusion_accounting_followup/models/fusion_followup_engine.py
gsinghpal 9b6d6b3895 test(fusion_accounting_followup): engine integration tests for full lifecycle
End-to-end flows over a real posted receivable line: aging discovery,
level resolution, send-with-cache reuse, pause+force override, and
audit history growth. Adds ignore_pause kwarg to compute_followup_level
so force=True in send_followup_email reaches level resolution.

Made-with: Cursor
2026-04-19 20:54:13 -04:00

380 lines
15 KiB
Python

"""The follow-up engine — orchestrator for customer follow-ups.
7-method public API. All controllers, AI tools, wizards, cron must
go through this engine; no direct ORM writes to fusion.followup.run
from elsewhere."""
import logging
from datetime import date, timedelta
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError, UserError
from ..services.overdue_aging import compute_aging
from ..services.level_resolver import resolve_level, FollowupLevelSpec
from ..services.risk_scorer import score_partner
from ..services.tone_selector import select_tone
from ..services.followup_text_generator import generate_followup_text
_logger = logging.getLogger(__name__)
# Follow-up engine: an Odoo AbstractModel registered under the technical name
# "fusion.followup.engine". It exposes the public follow-up API (aging lookup,
# level resolution, sending, escalation, pause/reset, history) plus the
# private helpers those methods share.
class FusionFollowupEngine(models.AbstractModel):
_name = "fusion.followup.engine"
_description = "Fusion Follow-up Engine"
# ============================================================
# PUBLIC API (7 methods)
# ============================================================
@api.model
def get_overdue_for_partner(self, partner) -> dict:
    """Build the aging report and risk summary for a single partner.

    :param partner: partner recordset; must contain exactly one record.
    :return: dict with ``partner_id``, ``as_of`` (today's date as a string),
        ``aging`` (the aging report's ``to_dict()`` output), ``risk``
        (``score``, ``band``, ``drivers``) and ``overdue_line_count``
        (number of lines returned by ``_fetch_overdue_lines``).
    """
    partner.ensure_one()
    today = fields.Date.today()
    open_lines = self._fetch_overdue_lines(partner)

    # The aging service works on plain dicts, not on ORM records.
    aging_input = [
        {
            'date_maturity': line.date_maturity,
            'amount_residual': line.amount_residual,
        }
        for line in open_lines
    ]
    aging_report = compute_aging(move_lines=aging_input, as_of=today)
    risk_result = self._compute_risk(partner, open_lines)

    summary = {
        'partner_id': partner.id,
        'as_of': str(today),
        'aging': aging_report.to_dict(),
    }
    summary['risk'] = {
        'score': risk_result.score,
        'band': risk_result.band,
        'drivers': risk_result.drivers,
    }
    summary['overdue_line_count'] = len(open_lines)
    return summary
@api.model
def compute_followup_level(self, partner, *, ignore_pause=False):
    """Resolve which follow-up level should fire for the partner right now.

    :param partner: partner recordset; must contain exactly one record.
    :param ignore_pause: when True, skip the "paused until" check.
    :return: a ``fusion.followup.level`` recordset holding at most one
        record; empty when the partner is paused, has no open lines, no
        active level is configured, or the resolver picks nothing.
    """
    partner.ensure_one()
    Level = self.env['fusion.followup.level']
    today = fields.Date.today()

    if not ignore_pause:
        paused_until = partner.fusion_followup_paused_until
        # The pause only holds while its end date is strictly in the future.
        if paused_until and paused_until > today:
            return Level

    open_lines = self._fetch_overdue_lines(partner)
    if not open_lines:
        return Level

    aging_report = compute_aging(
        move_lines=[
            {
                'date_maturity': line.date_maturity,
                'amount_residual': line.amount_residual,
            }
            for line in open_lines
        ],
        as_of=today,
    )

    # Levels of the partner's company (or the current company when the
    # partner has none), plus levels not tied to any company.
    if partner.company_id:
        company_id = partner.company_id.id
    else:
        company_id = self.env.company.id
    candidates = Level.search(
        [
            ('active', '=', True),
            '|', ('company_id', '=', company_id), ('company_id', '=', False),
        ],
        order='sequence',
    )
    if not candidates:
        return Level

    specs = [
        FollowupLevelSpec(
            sequence=candidate.sequence,
            name=candidate.name,
            delay_days=candidate.delay_days,
            tone=candidate.tone,
        )
        for candidate in candidates
    ]
    picked = resolve_level(aging_report=aging_report, levels=specs)
    if picked is None:
        return Level

    # Map the chosen spec back to its record; the first match by sequence wins.
    return candidates.filtered(lambda lvl: lvl.sequence == picked.sequence)[:1]
@api.model
def send_followup_email(self, partner, *, level=None, force=False) -> dict:
    """Send a follow-up email at the given level (auto-resolved when None).

    Creates a ``fusion.followup.run`` record for every attempt that reaches
    the manual-review or send stage, and reuses cached text when available.

    :param partner: partner recordset; must contain exactly one record.
    :param level: ``fusion.followup.level`` record to use; when falsy the
        level is resolved through ``compute_followup_level``.
    :param force: when True, bypass the pause check and the level's
        ``requires_manual_review`` gate.
    :return: dict whose ``status`` is one of ``paused_until_<date>``,
        ``no_action``, ``manual_review``, ``no_overdue``, ``sent`` or
        ``failed``. ``sent`` and ``failed`` results also carry ``run_id``,
        ``level_id`` and ``tone``; ``failed`` additionally carries ``error``.
    """
    partner.ensure_one()
    if not force and partner.fusion_followup_paused_until and \
            partner.fusion_followup_paused_until > fields.Date.today():
        return {
            'status': 'paused_until_' + str(partner.fusion_followup_paused_until),
            'partner_id': partner.id,
        }
    if not level:
        # force must also reach level resolution, otherwise a paused partner
        # would resolve to "no level" even though the caller asked to override.
        level = self.compute_followup_level(partner, ignore_pause=force)
    if not level:
        return {'status': 'no_action', 'partner_id': partner.id}
    if level.requires_manual_review and not force:
        run = self._create_run(partner, level, state='manual_review')
        return {
            'status': 'manual_review',
            'partner_id': partner.id,
            'run_id': run.id,
        }
    overdue_data = self.get_overdue_for_partner(partner)
    if overdue_data['overdue_line_count'] == 0:
        return {'status': 'no_overdue', 'partner_id': partner.id}

    aging = overdue_data['aging']
    risk = overdue_data['risk']
    # Computed once and shared by text generation and the run record.
    longest_overdue_days = self._max_overdue_days_from_aging(aging)

    tone = select_tone(
        level_sequence=level.sequence,
        risk_score=risk['score'],
    )
    text_data = self._get_or_generate_text(
        partner=partner, level=level,
        overdue_amount=aging['total_overdue_amount'],
        longest_overdue_days=longest_overdue_days,
        invoice_count=overdue_data['overdue_line_count'],
        tone=tone, risk_drivers=risk['drivers'],
    )
    run = self._create_run(
        partner, level, state='draft',
        overdue_amount=aging['total_overdue_amount'],
        longest_overdue_days=longest_overdue_days,
        risk_score=risk['score'],
        risk_band=risk['band'],
        subject=text_data['subject'],
        body=text_data['body'],
        tone_used=text_data['tone_used'],
        text_was_ai_generated=text_data.get('_was_ai', False),
    )
    try:
        self._send_email(partner, run)
        run.write({'state': 'sent'})
        partner.write({
            'fusion_followup_status': 'no_action',
            'fusion_followup_last_level_id': level.id,
            'fusion_followup_last_run_date': fields.Datetime.now(),
        })
    except Exception as e:
        # Best-effort send: record the failure on the run instead of raising,
        # but report it to the caller rather than claiming success.
        _logger.warning("Email send failed for partner %s: %s", partner.id, e)
        run.write({'state': 'failed', 'error_message': str(e)})
        return {
            'status': 'failed', 'partner_id': partner.id,
            'run_id': run.id, 'level_id': level.id, 'tone': tone,
            'error': str(e),
        }
    return {
        'status': 'sent', 'partner_id': partner.id,
        'run_id': run.id, 'level_id': level.id, 'tone': tone,
    }
@api.model
def escalate_to_next_level(self, partner) -> dict:
    """Force a send at the next level above the partner's last level.

    When the partner has no last level yet, the lowest active level is used
    regardless of its sequence value.

    :param partner: partner recordset; must contain exactly one record.
    :return: ``{'status': 'at_max_level', ...}`` when no higher active level
        exists, otherwise the result of ``send_followup_email`` called with
        ``force=True``.
    """
    partner.ensure_one()
    Level = self.env['fusion.followup.level']
    current = partner.fusion_followup_last_level_id
    company_id = partner.company_id.id if partner.company_id else self.env.company.id

    domain = [('active', '=', True)]
    if current:
        # Strictly above the last level. With no last level there is no
        # sequence filter, so a first level at sequence 0 is still reachable.
        domain.append(('sequence', '>', current.sequence))
    domain += ['|', ('company_id', '=', company_id), ('company_id', '=', False)]

    next_level = Level.search(domain, order='sequence', limit=1)
    if not next_level:
        return {'status': 'at_max_level', 'partner_id': partner.id}
    return self.send_followup_email(partner, level=next_level, force=True)
@api.model
def pause_followup(self, partner, until_date: date = None) -> dict:
    """Suspend follow-ups for a partner.

    :param partner: partner recordset; must contain exactly one record.
    :param until_date: pause end date; any falsy value falls back to
        today + 30 days.
    :return: dict with ``partner_id`` and ``paused_until`` (date as string).
    """
    partner.ensure_one()
    if until_date:
        pause_end = until_date
    else:
        pause_end = fields.Date.today() + timedelta(days=30)
    pause_vals = {
        'fusion_followup_paused_until': pause_end,
        'fusion_followup_status': 'paused',
    }
    partner.write(pause_vals)
    return {'partner_id': partner.id, 'paused_until': str(pause_end)}
@api.model
def reset_followup(self, partner) -> dict:
    """Clear the partner's follow-up state.

    Sets the status back to ``no_action`` and empties both the pause date
    and the last-level pointer.

    :param partner: partner recordset; must contain exactly one record.
    :return: dict with ``partner_id`` and ``status`` set to ``'reset'``.
    """
    partner.ensure_one()
    cleared_state = dict(
        fusion_followup_status='no_action',
        fusion_followup_paused_until=False,
        fusion_followup_last_level_id=False,
    )
    partner.write(cleared_state)
    return {'partner_id': partner.id, 'status': 'reset'}
@api.model
def snapshot_followup_history(self, partner, *, limit: int = 50) -> dict:
    """Return the partner's follow-up runs, most recent execution first.

    :param partner: partner recordset; must contain exactly one record.
    :param limit: maximum number of runs to return (coerced with ``int``).
    :return: dict with ``partner_id``, ``count`` and ``runs`` (one plain
        dict per ``fusion.followup.run`` record).
    """
    partner.ensure_one()
    history = self.env['fusion.followup.run'].search(
        [('partner_id', '=', partner.id)],
        order='execution_date desc',
        limit=int(limit),
    )

    def _as_dict(run):
        # A run may have no level; fall back to None / empty string.
        level = run.level_id
        return {
            'id': run.id,
            'date': str(run.execution_date),
            'level_id': level.id if level else None,
            'level_name': level.name if level else '',
            'state': run.state,
            'overdue_amount': run.overdue_amount,
            'longest_overdue_days': run.longest_overdue_days,
            'tone_used': run.tone_used,
            'risk_score': run.risk_score,
            'subject': run.subject or '',
            'text_was_ai_generated': run.text_was_ai_generated,
        }

    return {
        'partner_id': partner.id,
        'count': len(history),
        'runs': [_as_dict(run) for run in history],
    }
# ============================================================
# PRIVATE HELPERS
# ============================================================
def _fetch_overdue_lines(self, partner):
    """Return the partner's open receivable journal items (via sudo).

    Matches posted, unreconciled lines on ``asset_receivable`` accounts
    with a positive residual. The domain has no due-date condition, so
    lines that are not yet due are returned as well.
    """
    open_receivable_domain = [
        ('partner_id', '=', partner.id),
        ('parent_state', '=', 'posted'),
        ('account_id.account_type', '=', 'asset_receivable'),
        ('reconciled', '=', False),
        ('amount_residual', '>', 0),
    ]
    return self.env['account.move.line'].sudo().search(open_receivable_domain)
def _compute_risk(self, partner, overdue_lines):
    """Score the partner's payment risk from their receivable lines.

    :param partner: partner whose posted receivable lines are searched.
    :param overdue_lines: open lines used for the longest-overdue and
        open-amount inputs.
    :return: whatever ``score_partner`` returns.
    """
    receivable_lines = self.env['account.move.line'].sudo().search([
        ('partner_id', '=', partner.id),
        ('parent_state', '=', 'posted'),
        ('account_id.account_type', '=', 'asset_receivable'),
    ])
    invoice_count = len(receivable_lines)

    today = fields.Date.today()
    # Lines without a due date are ignored; the result never goes below 0.
    longest = max(
        [0] + [
            (today - entry.date_maturity).days
            for entry in overdue_lines
            if entry.date_maturity
        ]
    )
    outstanding = sum(entry.amount_residual for entry in overdue_lines)

    # 1000.0 is the fallback when there are no lines or the net balance is 0.
    # NOTE(review): the average is taken over the net ``balance`` of every
    # receivable line, so payments presumably offset invoices here - confirm
    # this is the intended "average invoice amount".
    typical_amount = 1000.0
    if invoice_count > 0:
        net_balance = sum(receivable_lines.mapped('balance'))
        if net_balance:
            typical_amount = abs(net_balance) / invoice_count

    return score_partner(
        total_invoices=invoice_count,
        # Heavy paid-late computation deferred to Phase 4.5
        paid_late_count=0,
        avg_days_late=0.0,
        longest_overdue_days=longest,
        open_overdue_amount=outstanding,
        average_invoice_amount=typical_amount,
    )
def _max_overdue_days_from_aging(self, aging_dict):
"""Extract longest overdue days from aging dict."""
tracked = aging_dict.get('max_days_overdue', 0) or 0
if tracked:
return tracked
max_days = 0
for b in aging_dict.get('buckets', []):
if b['name'] == 'current' or b['amount'] <= 0:
continue
if b['days_max'] is None:
max_days = max(max_days, b['days_min'])
else:
max_days = max(max_days, b['days_max'])
return max_days
def _get_or_generate_text(self, *, partner, level, overdue_amount,
                          longest_overdue_days, invoice_count, tone,
                          risk_drivers=None) -> dict:
    """Return follow-up text from the cache, generating it on a miss.

    On a cache hit the entry's use counter is incremented and its stored
    text is returned. On a miss the text is generated through
    ``generate_followup_text`` and a cache entry is written on a
    best-effort basis.

    :return: dict with at least ``subject`` and ``body``, plus ``_was_ai``;
        the cache-hit form also carries ``tone_used`` and ``key_points``.
    """
    Cache = self.env['fusion.followup.text.cache']
    cached = Cache.lookup(
        partner_id=partner.id, level_id=level.id,
        overdue_amount=overdue_amount,
        longest_overdue_days=longest_overdue_days,
        invoice_count=invoice_count, tone=tone,
    )
    if cached:
        cached.action_increment_use()
        return {
            'subject': cached.subject, 'body': cached.body,
            'tone_used': cached.tone_used,
            'key_points': cached.key_points or [],
            '_was_ai': bool(cached.provider),
        }
    company = partner.company_id or self.env.company
    currency = company.currency_id
    text = generate_followup_text(
        self.env,
        partner_name=partner.name,
        total_overdue=overdue_amount,
        currency_code=currency.name or 'USD',
        longest_overdue_days=longest_overdue_days,
        tone=tone, invoice_count=invoice_count,
        risk_drivers=risk_drivers,
    )
    try:
        # The savepoint keeps a failed INSERT (e.g. a constraint violation)
        # from aborting the surrounding transaction. Without it the caught
        # exception would still break every later query on this cursor.
        with self.env.cr.savepoint():
            Cache.sudo().create({
                'partner_id': partner.id, 'level_id': level.id,
                'company_id': company.id,
                'fingerprint': Cache.compute_fingerprint(
                    partner_id=partner.id, level_id=level.id,
                    overdue_amount=overdue_amount,
                    longest_overdue_days=longest_overdue_days,
                    invoice_count=invoice_count, tone=tone,
                ),
                'subject': text['subject'], 'body': text['body'],
                'tone_used': text.get('tone_used', tone),
                'key_points': text.get('key_points', []),
            })
    except Exception as e:
        # Caching is an optimisation only; the generated text is still used.
        _logger.debug("Cache create failed (non-fatal): %s", e)
    # NOTE(review): freshly generated text is always flagged as not
    # AI-generated here - confirm this matches what the generator produces.
    text['_was_ai'] = False
    return text
def _create_run(self, partner, level, *, state='draft', **vals):
    """Create a ``fusion.followup.run`` record (via sudo) and return it.

    :param partner: partner the run belongs to.
    :param level: follow-up level, or a falsy value for a run with no level.
    :param state: initial run state.
    :param vals: extra field values; these override the computed defaults.
    """
    company = partner.company_id or self.env.company
    run_vals = {
        'partner_id': partner.id,
        'company_id': company.id,
        'level_id': level.id if level else False,
        'state': state,
        **vals,
    }
    return self.env['fusion.followup.run'].sudo().create(run_vals)
def _send_email(self, partner, run):
"""Best-effort email send. Uses level's mail_template if set, else
creates a simple message."""
if not partner.email:
raise UserError(_("Partner %s has no email address.") % partner.name)
if run.level_id and run.level_id.mail_template_id:
run.level_id.mail_template_id.send_mail(partner.id, force_send=True)
else:
body_text = (run.body or '').replace('<', '&lt;').replace('>', '&gt;')
self.env['mail.mail'].sudo().create({
'subject': run.subject or 'Follow-up',
'body_html': '<pre>{}</pre>'.format(body_text),
'email_to': partner.email,
'recipient_ids': [(4, partner.id)],
}).send()
run.write({'sent_to_email': partner.email})