"""Anomaly detection for financial reports. Compares each row's current-period amount to its comparison-period amount and flags variances exceeding a threshold. Uses both: - Absolute threshold ($X minimum movement) - Percentage threshold (Y% min variance) Pure-Python: callers pass the engine's compute_*() result; we return a list of anomaly dicts.""" from dataclasses import dataclass @dataclass class Anomaly: row_id: str label: str current_amount: float comparison_amount: float variance_amount: float variance_pct: float severity: str # 'low', 'medium', 'high' direction: str # 'increase', 'decrease' def to_dict(self): return { 'row_id': self.row_id, 'label': self.label, 'current_amount': self.current_amount, 'comparison_amount': self.comparison_amount, 'variance_amount': self.variance_amount, 'variance_pct': self.variance_pct, 'severity': self.severity, 'direction': self.direction, } # Defaults -- tunable per company via ir.config_parameter DEFAULT_MIN_ABSOLUTE_THRESHOLD = 100.0 DEFAULT_MIN_PCT_THRESHOLD = 10.0 # 10% DEFAULT_HIGH_PCT_THRESHOLD = 50.0 # 50%+ flagged 'high' def detect(report_result: dict, *, min_absolute: float = None, min_pct: float = None, high_pct: float = None) -> list[dict]: """Detect anomalies in a report_result dict (engine output). Returns list of anomaly dicts ordered by severity desc, variance_amount desc. Returns empty list if no comparison period was computed.""" if not report_result.get('comparison_period'): return [] min_absolute = min_absolute if min_absolute is not None else DEFAULT_MIN_ABSOLUTE_THRESHOLD min_pct = min_pct if min_pct is not None else DEFAULT_MIN_PCT_THRESHOLD high_pct = high_pct if high_pct is not None else DEFAULT_HIGH_PCT_THRESHOLD anomalies = [] for row in report_result.get('rows', []): comparison = row.get('amount_comparison') current = row.get('amount', 0.0) if comparison is None: continue variance_amount = current - comparison variance_pct = abs(row.get('variance_pct') or 0.0) if abs(variance_amount) < min_absolute: continue if variance_pct < min_pct: continue severity = 'high' if variance_pct >= high_pct else 'medium' if variance_pct >= min_pct * 2 else 'low' direction = 'increase' if variance_amount > 0 else 'decrease' anomalies.append(Anomaly( row_id=row['id'], label=row.get('label', ''), current_amount=current, comparison_amount=comparison, variance_amount=variance_amount, variance_pct=variance_pct, severity=severity, direction=direction, ).to_dict()) severity_order = {'high': 0, 'medium': 1, 'low': 2} anomalies.sort(key=lambda a: (severity_order[a['severity']], -abs(a['variance_amount']))) return anomalies