63 lines
2.1 KiB
Python
63 lines
2.1 KiB
Python
"""K-nearest precedent search.
|
|
|
|
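Given a new bank line, find the most similar past reconciliations for
ranking + confidence scoring. Similarity is a weighted blend of amount
closeness (70%) and memo token overlap (30%). Date recency is not part of
the score: it only decides which candidates are fetched (newest first) and
the order of equally scored matches.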
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
class PrecedentMatch:
    """One scored precedent returned by the nearest-precedent search.

    Attributes:
        precedent_id: Database id of the precedent record.
        amount: Amount stored on the precedent.
        memo_tokens: Memo tokens of the precedent as a comma-separated
            string; empty string when the precedent has none.
        matched_move_line_count: Value of the precedent's field of the same
            name, copied through unchanged.
        similarity_score: Blended amount/memo similarity; higher is closer.
    """

    precedent_id: int
    amount: float
    memo_tokens: str
    matched_move_line_count: int
    similarity_score: float
|
|
|
|
|
|
# Relative half-width of the "near amount" band, expressed as a fraction
# (0.01 == 1% of the query amount). find_nearest_precedents multiplies the
# query amount by this value and floors the resulting tolerance at 1.00.
AMOUNT_TOLERANCE_PCT = 0.01
|
|
|
|
|
|
def find_nearest_precedents(env, *, partner_id, amount, k=5, memo_tokens=None):
    """Return up to ``k`` most-similar precedents for a partner + amount.

    Candidates come from one search on ``fusion.reconcile.precedent`` (run
    through ``sudo()``), restricted to the given partner and to amounts
    inside a tolerance band around ``amount``. The search is ordered by
    ``reconciled_at`` descending and capped at ``k * 4`` records. Each
    candidate is then scored as ``0.7 * amount_score + 0.3 * memo_score``.

    Args:
        env: Mapping from model name to model object (presumably an Odoo
            ``Environment`` -- confirm against callers).
        partner_id: Value the precedent's ``partner_id`` must equal.
        amount: Amount of the new bank line. May be negative; the tolerance
            band and the score scale are derived from its magnitude.
        k: Maximum number of matches to return. A non-positive ``k`` yields
            an empty list without querying.
        memo_tokens: Optional memo tokens of the new line, forwarded to
            ``_memo_overlap``. When missing or empty, every candidate gets
            a neutral memo score of 0.5.

    Returns:
        A list of ``PrecedentMatch`` sorted by descending
        ``similarity_score``. Equal scores keep the search order, i.e. the
        most recently reconciled precedent comes first.
    """
    if k <= 0:
        # Nothing was asked for; also avoids issuing a search with limit=0.
        return []

    Precedent = env['fusion.reconcile.precedent'].sudo()

    # Work with the magnitude so outgoing (negative) lines get the same
    # relative band and score scale as incoming ones. With the signed value
    # the band collapsed to +/-1.00 and the scale to 1 for any negative
    # amount.
    magnitude = abs(amount)
    tolerance = max(magnitude * AMOUNT_TOLERANCE_PCT, 1.00)
    candidates = Precedent.search([
        ('partner_id', '=', partner_id),
        ('amount', '>=', amount - tolerance),
        ('amount', '<=', amount + tolerance),
    ], limit=k * 4, order='reconciled_at desc')

    # Floor the scale at 1 so tiny amounts do not blow up the ratio.
    scale = max(magnitude, 1)

    results = []
    for p in candidates:
        amount_score = 1.0 - min(abs(p.amount - amount) / scale, 1.0)
        memo_score = _memo_overlap(p.memo_tokens, memo_tokens) if memo_tokens else 0.5
        similarity = (amount_score * 0.7) + (memo_score * 0.3)
        results.append(PrecedentMatch(
            precedent_id=p.id,
            amount=p.amount,
            memo_tokens=p.memo_tokens or '',
            matched_move_line_count=p.matched_move_line_count,
            similarity_score=similarity,
        ))

    # list.sort is stable (also with reverse=True), so ties keep the
    # newest-first order produced by the search.
    results.sort(key=lambda r: r.similarity_score, reverse=True)
    return results[:k]
|
|
|
|
|
|
def _memo_overlap(precedent_tokens_str, new_tokens) -> float:
    """Return the Jaccard similarity between two memo token sets.

    Args:
        precedent_tokens_str: Comma-separated tokens stored on a precedent.
        new_tokens: Tokens of the new memo, either as an iterable of tokens
            or as a comma-separated string.

    Returns:
        ``len(A & B) / len(A | B)`` in ``[0.0, 1.0]``, or 0.0 when either
        input is empty or no usable token remains after normalisation.
    """
    if not precedent_tokens_str or not new_tokens:
        return 0.0

    precedent_set = _memo_token_set(precedent_tokens_str)
    new_set = _memo_token_set(new_tokens)

    union = precedent_set | new_set
    if not union:
        # Inputs held only separators/whitespace; avoid dividing by zero.
        return 0.0
    return len(precedent_set & new_set) / len(union)


def _memo_token_set(tokens):
    """Normalise a comma-separated string or an iterable of tokens to a set."""
    if isinstance(tokens, str):
        # A bare string would otherwise be iterated character by character.
        tokens = tokens.split(',')
    stripped = (t.strip() if isinstance(t, str) else t for t in tokens)
    # Drop empty tokens left by stray or trailing commas: they would inflate
    # the union and lower the score.
    return {t for t in stripped if t != ''}
|