Files
gsinghpal 9ebf89bde2 changes
2026-05-16 13:18:52 -04:00

82 lines
3.0 KiB
Python

"""Anomaly detection for financial reports.
Compares each row's current-period amount to its comparison-period
amount and flags variances exceeding a threshold. Uses both:
- Absolute threshold ($X minimum movement)
- Percentage threshold (Y% min variance)
Pure-Python: callers pass the engine's compute_*() result; we return
a list of anomaly dicts."""
from dataclasses import dataclass
@dataclass
class Anomaly:
row_id: str
label: str
current_amount: float
comparison_amount: float
variance_amount: float
variance_pct: float
severity: str # 'low', 'medium', 'high'
direction: str # 'increase', 'decrease'
def to_dict(self):
return {
'row_id': self.row_id, 'label': self.label,
'current_amount': self.current_amount,
'comparison_amount': self.comparison_amount,
'variance_amount': self.variance_amount,
'variance_pct': self.variance_pct,
'severity': self.severity, 'direction': self.direction,
}
# Defaults -- tunable per company via ir.config_parameter
DEFAULT_MIN_ABSOLUTE_THRESHOLD = 100.0
DEFAULT_MIN_PCT_THRESHOLD = 10.0 # 10%
DEFAULT_HIGH_PCT_THRESHOLD = 50.0 # 50%+ flagged 'high'
def detect(report_result: dict, *, min_absolute: float = None,
min_pct: float = None, high_pct: float = None) -> list[dict]:
"""Detect anomalies in a report_result dict (engine output).
Returns list of anomaly dicts ordered by severity desc, variance_amount desc.
Returns empty list if no comparison period was computed."""
if not report_result.get('comparison_period'):
return []
min_absolute = min_absolute if min_absolute is not None else DEFAULT_MIN_ABSOLUTE_THRESHOLD
min_pct = min_pct if min_pct is not None else DEFAULT_MIN_PCT_THRESHOLD
high_pct = high_pct if high_pct is not None else DEFAULT_HIGH_PCT_THRESHOLD
anomalies = []
for row in report_result.get('rows', []):
comparison = row.get('amount_comparison')
current = row.get('amount', 0.0)
if comparison is None:
continue
variance_amount = current - comparison
variance_pct = abs(row.get('variance_pct') or 0.0)
if abs(variance_amount) < min_absolute:
continue
if variance_pct < min_pct:
continue
severity = 'high' if variance_pct >= high_pct else 'medium' if variance_pct >= min_pct * 2 else 'low'
direction = 'increase' if variance_amount > 0 else 'decrease'
anomalies.append(Anomaly(
row_id=row['id'],
label=row.get('label', ''),
current_amount=current,
comparison_amount=comparison,
variance_amount=variance_amount,
variance_pct=variance_pct,
severity=severity,
direction=direction,
).to_dict())
severity_order = {'high': 0, 'medium': 1, 'low': 2}
anomalies.sort(key=lambda a: (severity_order[a['severity']], -abs(a['variance_amount'])))
return anomalies