Files
Odoo-Modules/Fusion Accounting/models/account_reconcile_model.py
2026-02-22 01:22:18 -05:00

668 lines
25 KiB
Python

# Fusion Accounting - Reconciliation Model Extensions
# Extends the bank reconciliation rule engine with invoice matching,
# write-off suggestion, and partner mapping capabilities.
import re
from collections import defaultdict
from dateutil.relativedelta import relativedelta
from odoo import fields, models, tools
from odoo.tools import SQL
class AccountReconcileModel(models.Model):
_inherit = 'account.reconcile.model'
# =====================================================================
# Bank Widget Line Application
# =====================================================================
def _apply_lines_for_bank_widget(self, residual_amount_currency, partner, st_line):
    """Build the journal item proposals produced by this model's lines.

    The model lines are applied one after the other. Each line receives
    the balance left over by the previously kept lines; a proposal whose
    amount is zero in the statement currency is discarded and does not
    consume any balance.

    :param residual_amount_currency: Outstanding balance in statement currency.
    :param partner: Partner associated with the statement line.
    :param st_line: The bank statement line being processed.
    :return: List of dicts representing proposed journal items.
    """
    self.ensure_one()
    # Currency used to decide whether a proposed amount is negligible.
    currency = (
        st_line.foreign_currency_id
        or st_line.journal_id.currency_id
        or st_line.company_currency_id
    )
    balance_left = residual_amount_currency
    results = []
    for rule_line in self.line_ids:
        vals = rule_line._apply_in_bank_widget(balance_left, partner, st_line)
        proposed_amount = vals['amount_currency']
        if not currency.is_zero(proposed_amount):
            results.append(vals)
            balance_left -= proposed_amount
    return results
# =====================================================================
# Rule Evaluation Engine
# =====================================================================
def _apply_rules(self, st_line, partner):
    """Run the non-button reconciliation models against a statement line.

    Models are visited in their default sort order and the first one that
    yields a result wins.

    :param st_line: Bank statement line to match.
    :param partner: Partner context for matching.
    :return: Dict with match result and model, or empty dict.
    """
    usable_models = self.filtered(
        lambda rec: rec.rule_type != 'writeoff_button'
    ).sorted()
    for rec_model in usable_models:
        if not rec_model._is_applicable_for(st_line, partner):
            continue

        kind = rec_model.rule_type
        if kind == 'writeoff_suggestion':
            # An applicable write-off suggestion is always a hit.
            return {
                'model': rec_model,
                'status': 'write_off',
                'auto_reconcile': rec_model.auto_reconcile,
            }
        if kind != 'invoice_matching':
            continue

        rules = rec_model._get_invoice_matching_rules_map()
        for rank in sorted(rules.keys()):
            for rule_fn in rules[rank]:
                found = rule_fn(st_line, partner)
                if not found:
                    continue
                if not found.get('amls'):
                    # The rule produced a result without journal items:
                    # hand it back untouched.
                    return {**found, 'model': rec_model}
                outcome = rec_model._get_invoice_matching_amls_result(
                    st_line, partner, found,
                )
                if outcome:
                    return {**outcome, 'model': rec_model}
    return {}
# =====================================================================
# Applicability Checks
# =====================================================================
def _is_applicable_for(self, st_line, partner):
    """Tell whether this model's filters accept the given statement line
    and partner.

    :param st_line: Bank statement line being evaluated.
    :param partner: Partner to test against the partner filters.
    :return: True if the model's criteria are satisfied.
    """
    self.ensure_one()

    # --- Journal filter ---
    journals = self.match_journal_ids
    if journals and st_line.move_id.journal_id not in journals:
        return False

    # --- Direction of the transaction ---
    signed_amount = st_line.amount
    nature = self.match_nature
    if nature == 'amount_received' and signed_amount < 0:
        return False
    if nature == 'amount_paid' and signed_amount > 0:
        return False

    # --- Amount range (compared on the absolute value) ---
    magnitude = abs(signed_amount)
    amount_mode = self.match_amount
    if amount_mode == 'lower':
        if magnitude >= self.match_amount_max:
            return False
    elif amount_mode == 'greater':
        if magnitude <= self.match_amount_min:
            return False
    elif amount_mode == 'between':
        if not (self.match_amount_min <= magnitude <= self.match_amount_max):
            return False

    # --- Partner filters ---
    if self.match_partner:
        if not partner:
            return False
        allowed_partners = self.match_partner_ids
        if allowed_partners and partner not in allowed_partners:
            return False
        allowed_tags = self.match_partner_category_ids
        if allowed_tags and not (partner.category_id & allowed_tags):
            return False

    # --- Text filters: (record, rule field suffix, field on the record) ---
    text_sources = (
        (st_line, 'label', 'payment_ref'),
        (st_line.move_id, 'note', 'narration'),
        (st_line, 'transaction_type', 'transaction_type'),
    )
    for source, suffix, field_name in text_sources:
        # Both sides are lower-cased, so the comparison ignores case.
        needle = (self[f'match_{suffix}_param'] or '').lower()
        haystack = (source[field_name] or '').lower()
        mode = self[f'match_{suffix}']
        if mode == 'contains':
            if needle not in haystack:
                return False
        elif mode == 'not_contains':
            if needle in haystack:
                return False
        elif mode == 'match_regex':
            if not re.match(needle, haystack):
                return False
    return True
# =====================================================================
# Invoice Matching Domain & Token Extraction
# =====================================================================
def _get_invoice_matching_amls_domain(self, st_line, partner):
    """Compose the search domain selecting the journal items that are
    candidates for invoice matching.

    :param st_line: Bank statement line being matched.
    :param partner: Optional partner restricting the candidates.
    :return: The statement line's default matching domain, extended in
        place with this model's restrictions.
    """
    domain = st_line._get_default_amls_matching_domain()

    # Candidates must have a balance going the same way as the statement line.
    balance_operator = '>' if st_line.amount > 0.0 else '<'
    domain.append(('balance', balance_operator, 0.0))

    currency = st_line.foreign_currency_id or st_line.currency_id
    if self.match_same_currency:
        domain.append(('currency_id', '=', currency.id))

    if partner:
        domain.append(('partner_id', '=', partner.id))

    months_back = self.past_months_limit
    if months_back:
        oldest_date = fields.Date.context_today(self) - relativedelta(months=months_back)
        domain.append(('date', '>=', fields.Date.to_string(oldest_date)))

    return domain
def _get_st_line_text_values_for_matching(self, st_line):
    """Collect the statement line texts this model is configured to
    search in.

    :param st_line: Bank statement line providing the texts.
    :return: List of text values to search against.
    """
    self.ensure_one()
    # (configuration flag, statement line field it enables)
    toggles = (
        (self.match_text_location_label, 'payment_ref'),
        (self.match_text_location_note, 'narration'),
        (self.match_text_location_reference, 'ref'),
    )
    allowed = [field_name for is_enabled, field_name in toggles if is_enabled]
    return st_line._get_st_line_strings_for_matching(allowed_fields=allowed)
def _get_invoice_matching_st_line_tokens(self, st_line):
    """Split the statement line texts into tokens usable for matching.

    Three families of tokens are produced:
    * numerical tokens: the digits of each word, kept when at least
      4 digits remain;
    * exact tokens: every full text plus each raw word of at least
      4 characters (deduplicated);
    * text tokens: each word stripped of non-alphanumeric characters,
      kept when at least 4 characters remain.

    :param st_line: Bank statement line providing the texts.
    :return: Tuple of (numerical_tokens, exact_tokens, text_tokens).
    """
    threshold = 4
    numbers = []
    exact = set()
    texts = []
    for raw in self._get_st_line_text_values_for_matching(st_line):
        pieces = (raw or '').split()

        # The untouched text is always an exact token, whatever its length.
        exact.add(raw)
        exact.update(piece for piece in pieces if len(piece) >= threshold)

        for piece in pieces:
            alnum = ''.join(ch for ch in piece if re.match(r'[0-9a-zA-Z\s]', ch))
            if len(alnum) < threshold:
                continue
            texts.append(alnum)
            digits = ''.join(ch for ch in alnum if ch.isdecimal())
            if len(digits) >= threshold:
                numbers.append(digits)
    return numbers, list(exact), texts
# =====================================================================
# Candidate Discovery
# =====================================================================
def _get_invoice_matching_amls_candidates(self, st_line, partner):
    """Search for matching journal items using token-based and
    amount-based strategies.

    The search runs in up to two stages:

    1. Token search: tokens extracted from the statement line texts are
       compared with the journal item label, the move name and the move
       reference. Candidates are ranked by number of matching tokens.
       If text matching is enabled on the model and this stage was
       attempted without success, the search stops here.
    2. Fallback: without a partner, journal items whose residual amount
       equals the statement line residual are returned; with a partner,
       every journal item matching the domain is returned.

    :param st_line: Bank statement line to find candidates for.
    :param partner: Partner restricting the candidates (may be empty).
    :return: Dict with 'amls' recordset and 'allow_auto_reconcile' flag,
        or None if no candidates found.
    """
    def _build_sort_clause(prefix=SQL()):
        """Build ORDER BY clause based on matching_order preference.

        :param prefix: SQL fragment prepended to each column (e.g. a
            table alias followed by a dot).
        """
        sort_dir = SQL(' DESC') if self.matching_order == 'new_first' else SQL(' ASC')
        return SQL(", ").join(
            SQL("%s%s%s", prefix, SQL(col), sort_dir)
            for col in ('date_maturity', 'date', 'id')
        )

    assert self.rule_type == 'invoice_matching'

    # Raw SQL is executed below: pending ORM writes must reach the database.
    self.env['account.move'].flush_model()
    self.env['account.move.line'].flush_model()

    search_domain = self._get_invoice_matching_amls_domain(st_line, partner)
    query = self.env['account.move.line']._where_calc(search_domain)
    from_clause = query.from_clause
    where_clause = query.where_clause or SQL("TRUE")

    # Prepare CTE and sub-queries for token matching
    cte_sql = SQL()
    token_queries: list[SQL] = []
    num_tokens, exact_tokens, _txt_tokens = (
        self._get_invoice_matching_st_line_tokens(st_line)
    )

    # Text columns exposed by the candidate_lines CTE below. The
    # sub-queries read from the CTE, so they must use these aliases and
    # not the underlying table column names.
    cte_text_columns = ('aml_name', 'move_name', 'move_ref')

    if num_tokens or exact_tokens:
        cte_sql = SQL('''
            WITH candidate_lines AS (
                SELECT
                    account_move_line.id AS aml_id,
                    account_move_line.date AS aml_date,
                    account_move_line.date_maturity AS aml_maturity,
                    account_move_line.name AS aml_name,
                    account_move_line__move_id.name AS move_name,
                    account_move_line__move_id.ref AS move_ref
                FROM %s
                JOIN account_move account_move_line__move_id
                    ON account_move_line__move_id.id = account_move_line.move_id
                WHERE %s
            )
        ''', from_clause, where_clause)

    # Build sub-queries for numerical token matching: strip everything
    # but digits and whitespace, trim, then emit one row per number.
    if num_tokens:
        for cte_column in cte_text_columns:
            col_ref = SQL(cte_column)
            token_queries.append(SQL(r'''
                SELECT
                    aml_id AS id,
                    aml_date AS date,
                    aml_maturity AS date_maturity,
                    UNNEST(
                        REGEXP_SPLIT_TO_ARRAY(
                            SUBSTRING(
                                REGEXP_REPLACE(%(col)s, '[^0-9\s]', '', 'g'),
                                '\S(?:.*\S)*'
                            ),
                            '\s+'
                        )
                    ) AS token
                FROM candidate_lines
                WHERE %(col)s IS NOT NULL
            ''', col=col_ref))

    # Build sub-queries for exact token matching: the whole column value
    # is the token.
    if exact_tokens:
        for cte_column in cte_text_columns:
            col_ref = SQL(cte_column)
            token_queries.append(SQL('''
                SELECT
                    aml_id AS id,
                    aml_date AS date,
                    aml_maturity AS date_maturity,
                    %(col)s AS token
                FROM candidate_lines
                WHERE %(col)s != ''
            ''', col=col_ref))

    # Execute token-based search if queries exist
    if token_queries:
        sort_clause = _build_sort_clause(prefix=SQL('matched.'))
        all_tokens = tuple(num_tokens + exact_tokens)
        found_ids = [
            row[0] for row in self.env.execute_query(SQL(
                '''
                    %s
                    SELECT
                        matched.id,
                        COUNT(*) AS match_count
                    FROM (%s) AS matched
                    WHERE matched.token IN %s
                    GROUP BY matched.date_maturity, matched.date, matched.id
                    HAVING COUNT(*) > 0
                    ORDER BY match_count DESC, %s
                ''',
                cte_sql,
                SQL(" UNION ALL ").join(token_queries),
                all_tokens,
                sort_clause,
            ))
        ]
        if found_ids:
            return {
                'allow_auto_reconcile': True,
                'amls': self.env['account.move.line'].browse(found_ids),
            }
        elif (
            self.match_text_location_label
            or self.match_text_location_note
            or self.match_text_location_reference
        ):
            # Text location matching was enabled but found nothing - don't fall through
            return

    # Fallback: match by exact amount when no partner is set
    if not partner:
        line_currency = (
            st_line.foreign_currency_id
            or st_line.journal_id.currency_id
            or st_line.company_currency_id
        )
        # Compare against the residual expressed in the statement currency.
        if line_currency == self.company_id.currency_id:
            amt_col = SQL('amount_residual')
        else:
            amt_col = SQL('amount_residual_currency')
        sort_clause = _build_sort_clause(prefix=SQL('account_move_line.'))
        amount_rows = self.env.execute_query(SQL(
            '''
                SELECT account_move_line.id
                FROM %s
                WHERE
                    %s
                    AND account_move_line.currency_id = %s
                    AND ROUND(account_move_line.%s, %s) = ROUND(%s, %s)
                ORDER BY %s
            ''',
            from_clause,
            where_clause,
            line_currency.id,
            amt_col,
            line_currency.decimal_places,
            -st_line.amount_residual,
            line_currency.decimal_places,
            sort_clause,
        ))
        found_lines = self.env['account.move.line'].browse(
            [r[0] for r in amount_rows],
        )
    else:
        found_lines = self.env['account.move.line'].search(
            search_domain,
            order=_build_sort_clause().code,
        )

    if found_lines:
        return {
            'allow_auto_reconcile': False,
            'amls': found_lines,
        }
def _get_invoice_matching_rules_map(self):
    """Expose the invoice matching functions grouped by priority.

    Priorities are tried in ascending order by the rule engine. The
    returned mapping creates an empty list for unknown priorities, so an
    override can append to any priority directly.

    :return: Dict mapping priority (int) to list of callables.
    """
    return defaultdict(list, {
        10: [self._get_invoice_matching_amls_candidates],
    })
# =====================================================================
# Partner Mapping
# =====================================================================
def _get_partner_from_mapping(self, st_line):
    """Find a partner through this model's regex-based partner mappings.

    A mapping is retained when each regex it defines matches from the
    start of the corresponding statement line text; a regex left empty
    is considered satisfied. The first retained mapping wins.

    :param st_line: Bank statement line to analyze.
    :return: Matched partner recordset (may be empty).
    """
    self.ensure_one()
    if self.rule_type not in ('invoice_matching', 'writeoff_suggestion'):
        return self.env['res.partner']

    for mapping_line in self.partner_mapping_line_ids:
        # Payment reference: a missing reference can never match a regex.
        ref_pattern = mapping_line.payment_ref_regex
        if ref_pattern:
            reference = st_line.payment_ref
            ref_matches = bool(re.match(ref_pattern, reference)) if reference else False
        else:
            ref_matches = True

        # Narration: compared as plain text, with DOTALL so that '.' also
        # spans line breaks.
        narration_pattern = mapping_line.narration_regex
        if narration_pattern:
            narration_text = tools.html2plaintext(st_line.narration or '').rstrip()
            narration_matches = bool(
                re.match(narration_pattern, narration_text, flags=re.DOTALL)
            )
        else:
            narration_matches = True

        if ref_matches and narration_matches:
            return mapping_line.partner_id

    return self.env['res.partner']
# =====================================================================
# Match Result Processing
# =====================================================================
def _get_invoice_matching_amls_result(self, st_line, partner, candidate_vals):
    """Select, among the candidate journal items, those forming a valid
    match for the statement line.

    The selection is attempted in three steps: a perfect match using
    early payment discount amounts, then a same-currency batch match on
    the regular residual amounts, and finally all candidates. The
    selection is then validated against the payment tolerance rules.

    :param st_line: Bank statement line being matched.
    :param partner: Partner context (not used by this implementation).
    :param candidate_vals: Dict holding the 'amls' candidates and the
        'allow_auto_reconcile' flag.
    :return: Dict with matched amls and status flags, or None.
    """
    currency = st_line.foreign_currency_id or st_line.currency_id
    st_amount = st_line._prepare_move_line_default_vals()[1]['amount_currency']
    sign = 1 if st_amount > 0.0 else -1
    amls = candidate_vals['amls']
    is_same_currency = amls.currency_id == currency
    epd_move_types = ('out_invoice', 'out_receipt', 'in_invoice', 'in_receipt')

    # Two parallel lists: regular residual amounts, and the same entries
    # with early-payment-discount amounts substituted where eligible.
    plain_entries = []
    discounted_entries = []
    for aml in amls:
        entry = {
            'aml': aml,
            'amount_residual': aml.amount_residual,
            'amount_residual_currency': aml.amount_residual_currency,
        }
        plain_entries.append(entry)

        term = aml.move_id.invoice_payment_term_id
        discount_deadline = (
            term._get_last_discount_date(aml.move_id.date) if term else False
        )
        discount_applies = (
            is_same_currency
            and aml.move_id.move_type in epd_move_types
            and not aml.matched_debit_ids
            and not aml.matched_credit_ids
            and discount_deadline
            and st_line.date <= discount_deadline
        )
        if not discount_applies:
            discounted_entries.append(entry)
            continue

        # Ratio used to convert the discounted amount into company currency.
        rate = abs(aml.amount_currency) / abs(aml.balance) if aml.balance else 1.0
        discounted_entries.append({
            **entry,
            'amount_residual': st_line.company_currency_id.round(
                aml.discount_amount_currency / rate,
            ),
            'amount_residual_currency': aml.discount_amount_currency,
        })

    def _select_batch(entries):
        """Pick entries covering the statement amount (same currency only)."""
        if not is_same_currency:
            return None, []
        picked = []
        accumulated = 0.0
        for candidate in entries:
            residual = candidate['amount_residual_currency']
            if currency.compare_amounts(st_amount, -residual) == 0:
                # A single entry with exactly the right amount is preferred.
                return 'perfect', [candidate]
            if currency.compare_amounts(sign * (st_amount + accumulated), 0.0) > 0:
                # Still some amount to cover: keep this entry.
                picked.append(candidate)
                accumulated += residual
        if currency.is_zero(sign * (st_amount + accumulated)):
            return 'perfect', picked
        if picked:
            return 'partial', picked
        return None, []

    # Priority 1: early payment discount amounts, perfect match only.
    verdict, selection = _select_batch(discounted_entries)
    if verdict != 'perfect':
        selection = []
    # Priority 2: regular amounts in same-currency mode.
    if not selection:
        selection = _select_batch(plain_entries)[1]
    # Priority 3: every candidate.
    if not selection:
        selection = plain_entries
    if not selection:
        return None

    # Validate the final selection against tolerance rules.
    status = self._check_rule_propositions(st_line, selection)
    if 'rejected' in status:
        return None

    matched = self.env['account.move.line']
    for kept in selection:
        matched |= kept['aml']
    result = {'amls': matched}
    if 'allow_write_off' in status and self.line_ids:
        result['status'] = 'write_off'
    if (
        'allow_auto_reconcile' in status
        and candidate_vals['allow_auto_reconcile']
        and self.auto_reconcile
    ):
        result['auto_reconcile'] = True
    return result
def _check_rule_propositions(self, st_line, amls_values_list):
    """Check the selected candidates against the payment tolerance rules.

    :param st_line: Bank statement line being matched.
    :param amls_values_list: List of dicts with the keys 'aml',
        'amount_residual' and 'amount_residual_currency'.
    :return: Set of status strings indicating the verdict.
    """
    self.ensure_one()
    if not self.allow_payment_tolerance:
        return {'allow_write_off', 'allow_auto_reconcile'}

    currency = st_line.foreign_currency_id or st_line.currency_id
    st_amount = st_line._prepare_move_line_default_vals()[1]['amount_currency']

    # Total of the candidates, converted with the statement line rate.
    candidates_total = 0
    for values in amls_values_list:
        converted = st_line._prepare_counterpart_amounts_using_st_line_rate(
            values['aml'].currency_id,
            values['amount_residual'],
            values['amount_residual_currency'],
        )
        candidates_total += converted['amount_currency']

    sign = 1 if st_amount > 0.0 else -1
    # Signed so that a positive value means the payment exceeds the candidates.
    balance_after = currency.round(sign * (candidates_total + st_amount))

    # Nothing left, or the payment is larger than the invoices:
    # accepted, without write-off.
    if currency.is_zero(balance_after):
        return {'allow_auto_reconcile'}
    if balance_after > 0.0:
        return {'allow_auto_reconcile'}

    tolerance = self.payment_tolerance_param
    if tolerance == 0:
        # A shortfall exists and no tolerance is configured.
        return {'rejected'}

    tolerance_type = self.payment_tolerance_type
    if tolerance_type == 'fixed_amount':
        if currency.compare_amounts(-balance_after, tolerance) <= 0:
            return {'allow_write_off', 'allow_auto_reconcile'}

    # Shortfall as a percentage of the candidates total.
    shortfall_pct = abs(balance_after / candidates_total) * 100.0
    if tolerance_type == 'percentage':
        if currency.compare_amounts(shortfall_pct, tolerance) <= 0:
            return {'allow_write_off', 'allow_auto_reconcile'}

    return {'rejected'}
# =====================================================================
# Auto-Reconciliation Cron
# =====================================================================
def run_auto_reconciliation(self):
    """Launch the automatic reconciliation of statement lines.

    The run is capped at 180 seconds, or at the configured
    'limit_time_real_cron' when that value is positive and shorter.
    """
    ceiling = 180
    configured = tools.config['limit_time_real_cron'] or -1
    time_budget = configured if 0 < configured < ceiling else ceiling
    statement_lines = self.env['account.bank.statement.line']
    statement_lines._cron_try_auto_reconcile_statement_lines(
        limit_time=time_budget,
    )