feat(fusion_accounting_reports): anomaly_detection service
Made-with: Cursor
This commit is contained in:
@@ -4,3 +4,4 @@ from . import totaling
|
||||
from . import currency_conversion
|
||||
from . import line_resolver
|
||||
from . import drill_down_resolver
|
||||
from . import anomaly_detection
|
||||
|
||||
81
fusion_accounting_reports/services/anomaly_detection.py
Normal file
81
fusion_accounting_reports/services/anomaly_detection.py
Normal file
@@ -0,0 +1,81 @@
|
||||
"""Anomaly detection for financial reports.
|
||||
|
||||
Compares each row's current-period amount to its comparison-period
|
||||
amount and flags variances exceeding a threshold. Uses both:
|
||||
- Absolute threshold ($X minimum movement)
|
||||
- Percentage threshold (Y% min variance)
|
||||
|
||||
Pure-Python: callers pass the engine's compute_*() result; we return
|
||||
a list of anomaly dicts."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Anomaly:
|
||||
row_id: str
|
||||
label: str
|
||||
current_amount: float
|
||||
comparison_amount: float
|
||||
variance_amount: float
|
||||
variance_pct: float
|
||||
severity: str # 'low', 'medium', 'high'
|
||||
direction: str # 'increase', 'decrease'
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'row_id': self.row_id, 'label': self.label,
|
||||
'current_amount': self.current_amount,
|
||||
'comparison_amount': self.comparison_amount,
|
||||
'variance_amount': self.variance_amount,
|
||||
'variance_pct': self.variance_pct,
|
||||
'severity': self.severity, 'direction': self.direction,
|
||||
}
|
||||
|
||||
|
||||
# Defaults -- tunable per company via ir.config_parameter
|
||||
DEFAULT_MIN_ABSOLUTE_THRESHOLD = 100.0
|
||||
DEFAULT_MIN_PCT_THRESHOLD = 10.0 # 10%
|
||||
DEFAULT_HIGH_PCT_THRESHOLD = 50.0 # 50%+ flagged 'high'
|
||||
|
||||
|
||||
def detect(report_result: dict, *, min_absolute: float = None,
|
||||
min_pct: float = None, high_pct: float = None) -> list[dict]:
|
||||
"""Detect anomalies in a report_result dict (engine output).
|
||||
|
||||
Returns list of anomaly dicts ordered by severity desc, variance_amount desc.
|
||||
Returns empty list if no comparison period was computed."""
|
||||
if not report_result.get('comparison_period'):
|
||||
return []
|
||||
min_absolute = min_absolute if min_absolute is not None else DEFAULT_MIN_ABSOLUTE_THRESHOLD
|
||||
min_pct = min_pct if min_pct is not None else DEFAULT_MIN_PCT_THRESHOLD
|
||||
high_pct = high_pct if high_pct is not None else DEFAULT_HIGH_PCT_THRESHOLD
|
||||
|
||||
anomalies = []
|
||||
for row in report_result.get('rows', []):
|
||||
comparison = row.get('amount_comparison')
|
||||
current = row.get('amount', 0.0)
|
||||
if comparison is None:
|
||||
continue
|
||||
variance_amount = current - comparison
|
||||
variance_pct = abs(row.get('variance_pct') or 0.0)
|
||||
if abs(variance_amount) < min_absolute:
|
||||
continue
|
||||
if variance_pct < min_pct:
|
||||
continue
|
||||
severity = 'high' if variance_pct >= high_pct else 'medium' if variance_pct >= min_pct * 2 else 'low'
|
||||
direction = 'increase' if variance_amount > 0 else 'decrease'
|
||||
anomalies.append(Anomaly(
|
||||
row_id=row['id'],
|
||||
label=row.get('label', ''),
|
||||
current_amount=current,
|
||||
comparison_amount=comparison,
|
||||
variance_amount=variance_amount,
|
||||
variance_pct=variance_pct,
|
||||
severity=severity,
|
||||
direction=direction,
|
||||
).to_dict())
|
||||
|
||||
severity_order = {'high': 0, 'medium': 1, 'low': 2}
|
||||
anomalies.sort(key=lambda a: (severity_order[a['severity']], -abs(a['variance_amount'])))
|
||||
return anomalies
|
||||
Reference in New Issue
Block a user