# Fusion Accounting - Reconciliation Model Extensions
# Extends the bank reconciliation rule engine with invoice matching,
# write-off suggestion, and partner mapping capabilities.

import re
from collections import defaultdict

from dateutil.relativedelta import relativedelta

from odoo import fields, models, tools
from odoo.tools import SQL


class AccountReconcileModel(models.Model):
    _inherit = 'account.reconcile.model'

    # =====================================================================
    # Bank Widget Line Application
    # =====================================================================

    def _apply_lines_for_bank_widget(self, residual_amount_currency, partner, st_line):
        """Generate journal item values by applying this model's lines to a
        bank statement line in the reconciliation widget.

        Each model line is applied against the balance left over by the
        previous lines; lines producing a zero amount are dropped.

        :param residual_amount_currency: Outstanding balance in statement currency.
        :param partner: Partner associated with the statement line.
        :param st_line: The bank statement line being processed.
        :return: List of dicts representing proposed journal items.
        """
        self.ensure_one()
        stmt_currency = (
            st_line.foreign_currency_id
            or st_line.journal_id.currency_id
            or st_line.company_currency_id
        )
        proposed_items = []
        remaining = residual_amount_currency
        for model_line in self.line_ids:
            item_vals = model_line._apply_in_bank_widget(remaining, partner, st_line)
            line_amount = item_vals['amount_currency']
            if stmt_currency.is_zero(line_amount):
                continue
            proposed_items.append(item_vals)
            # The next model line only sees what is still left to allocate.
            remaining -= line_amount
        return proposed_items

    # =====================================================================
    # Rule Evaluation Engine
    # =====================================================================

    def _apply_rules(self, st_line, partner):
        """Evaluate all non-button reconciliation models against a statement
        line and return the first matching result.

        Models are visited in their default sort order. For invoice-matching
        models, the matching functions are tried by ascending priority.

        :param st_line: Bank statement line to match.
        :param partner: Partner context for matching.
        :return: Dict with match result and model, or empty dict.
        """
        eligible_models = self.filtered(
            lambda m: m.rule_type != 'writeoff_button'
        ).sorted()

        for model in eligible_models:
            if not model._is_applicable_for(st_line, partner):
                continue

            if model.rule_type == 'invoice_matching':
                priority_map = model._get_invoice_matching_rules_map()
                for priority in sorted(priority_map.keys()):
                    for matching_fn in priority_map[priority]:
                        candidates = matching_fn(st_line, partner)
                        if not candidates:
                            continue

                        if candidates.get('amls'):
                            match_result = model._get_invoice_matching_amls_result(
                                st_line, partner, candidates,
                            )
                            if match_result:
                                return {**match_result, 'model': model}
                        else:
                            # A matching function may return a result that
                            # carries no journal items; it is returned as is.
                            return {**candidates, 'model': model}

            elif model.rule_type == 'writeoff_suggestion':
                return {
                    'model': model,
                    'status': 'write_off',
                    'auto_reconcile': model.auto_reconcile,
                }

        return {}

    # =====================================================================
    # Applicability Checks
    # =====================================================================

    def _is_applicable_for(self, st_line, partner):
        """Determine whether this model's filters allow it to be used for
        the given statement line and partner combination.

        :param st_line: Bank statement line to check.
        :param partner: Partner to check (may be empty).
        :return: True if the model's criteria are satisfied.
        """
        self.ensure_one()

        # --- Amount and journal filters ---
        if self.match_journal_ids and st_line.move_id.journal_id not in self.match_journal_ids:
            return False
        if self.match_nature == 'amount_received' and st_line.amount < 0:
            return False
        if self.match_nature == 'amount_paid' and st_line.amount > 0:
            return False

        abs_amount = abs(st_line.amount)
        if self.match_amount == 'lower' and abs_amount >= self.match_amount_max:
            return False
        if self.match_amount == 'greater' and abs_amount <= self.match_amount_min:
            return False
        if self.match_amount == 'between' and not (
            self.match_amount_min <= abs_amount <= self.match_amount_max
        ):
            return False

        # --- Partner filters ---
        if self.match_partner:
            if not partner:
                return False
            if self.match_partner_ids and partner not in self.match_partner_ids:
                return False
            if (
                self.match_partner_category_ids
                and not (partner.category_id & self.match_partner_category_ids)
            ):
                return False

        # --- Text matching on label, note, and transaction type ---
        # Each tuple is (record holding the text, rule field suffix, record field).
        text_checks = [
            (st_line, 'label', 'payment_ref'),
            (st_line.move_id, 'note', 'narration'),
            (st_line, 'transaction_type', 'transaction_type'),
        ]
        for record, rule_suffix, record_field in text_checks:
            # Both sides are lower-cased, so the comparison is case-insensitive.
            configured_term = (self[f'match_{rule_suffix}_param'] or '').lower()
            actual_value = (record[record_field] or '').lower()
            match_mode = self[f'match_{rule_suffix}']

            if match_mode == 'contains' and configured_term not in actual_value:
                return False
            if match_mode == 'not_contains' and configured_term in actual_value:
                return False
            if match_mode == 'match_regex' and not re.match(configured_term, actual_value):
                return False

        return True

    # =====================================================================
    # Invoice Matching Domain & Token Extraction
    # =====================================================================

    def _get_invoice_matching_amls_domain(self, st_line, partner):
        """Build the search domain for candidate journal items when
        performing invoice matching.

        :param st_line: Bank statement line being matched.
        :param partner: Partner to restrict the candidates to (may be empty).
        :return: Domain list usable on ``account.move.line``.
        """
        base_domain = st_line._get_default_amls_matching_domain()

        # Filter by balance direction matching the statement line
        if st_line.amount > 0.0:
            base_domain.append(('balance', '>', 0.0))
        else:
            base_domain.append(('balance', '<', 0.0))

        line_currency = st_line.foreign_currency_id or st_line.currency_id
        if self.match_same_currency:
            base_domain.append(('currency_id', '=', line_currency.id))

        if partner:
            base_domain.append(('partner_id', '=', partner.id))

        if self.past_months_limit:
            cutoff = (
                fields.Date.context_today(self)
                - relativedelta(months=self.past_months_limit)
            )
            base_domain.append(('date', '>=', fields.Date.to_string(cutoff)))

        return base_domain

    def _get_st_line_text_values_for_matching(self, st_line):
        """Gather text fields from the statement line that are enabled
        for matching in this model's configuration.

        :param st_line: Bank statement line to read the text from.
        :return: List of text values to search against.
        """
        self.ensure_one()
        enabled_fields = []
        if self.match_text_location_label:
            enabled_fields.append('payment_ref')
        if self.match_text_location_note:
            enabled_fields.append('narration')
        if self.match_text_location_reference:
            enabled_fields.append('ref')
        return st_line._get_st_line_strings_for_matching(
            allowed_fields=enabled_fields,
        )

    def _get_invoice_matching_st_line_tokens(self, st_line):
        """Parse statement line text into tokens for matching.

        Words shorter than four characters are ignored, both before and
        after non-alphanumeric / non-digit characters are stripped.

        :param st_line: Bank statement line to tokenize.
        :return: Tuple of (numerical_tokens, exact_tokens, text_tokens).
        """
        raw_texts = self._get_st_line_text_values_for_matching(st_line)
        min_token_len = 4

        numeric_tokens = []
        exact_token_set = set()
        text_tokens = []

        for text_val in raw_texts:
            words = (text_val or '').split()

            # Exact tokens: the full text plus every sufficiently long word.
            exact_token_set.add(text_val)
            exact_token_set.update(
                w for w in words if len(w) >= min_token_len
            )

            # Text tokens: words reduced to ASCII letters and digits.
            cleaned_words = [
                ''.join(ch for ch in w if re.match(r'[0-9a-zA-Z\s]', ch))
                for w in words
            ]

            for cleaned in cleaned_words:
                if len(cleaned) < min_token_len:
                    continue
                text_tokens.append(cleaned)

                # Numerical tokens: the digits of the cleaned word.
                digits_only = ''.join(ch for ch in cleaned if ch.isdecimal())
                if len(digits_only) >= min_token_len:
                    numeric_tokens.append(digits_only)

        return numeric_tokens, list(exact_token_set), text_tokens

    # =====================================================================
    # Candidate Discovery
    # =====================================================================

    def _get_invoice_matching_amls_candidates(self, st_line, partner):
        """Search for matching journal items using token-based and
        amount-based strategies.

        The token-based search runs first; its hits may be auto-reconciled.
        If it finds nothing while any text location option is enabled, the
        method stops there. Otherwise it falls back to an exact-amount
        search when no partner is given, or to a plain domain search when
        one is; fallback hits are never flagged for auto-reconciliation.

        :param st_line: Bank statement line to match.
        :param partner: Partner context for matching (may be empty).
        :return: Dict with 'amls' recordset and 'allow_auto_reconcile' flag,
                 or None if no candidates found.
        """
        def _build_sort_clause(prefix=SQL()):
            """Build ORDER BY clause based on matching_order preference.

            :param prefix: SQL fragment prepended to each column name
                           (e.g. a table alias followed by a dot).
            """
            sort_dir = SQL(' DESC') if self.matching_order == 'new_first' else SQL(' ASC')
            return SQL(", ").join(
                SQL("%s%s%s", prefix, SQL(col), sort_dir)
                for col in ('date_maturity', 'date', 'id')
            )

        assert self.rule_type == 'invoice_matching'
        # Raw SQL is used below: pending ORM writes must reach the database first.
        self.env['account.move'].flush_model()
        self.env['account.move.line'].flush_model()

        search_domain = self._get_invoice_matching_amls_domain(st_line, partner)
        query = self.env['account.move.line']._where_calc(search_domain)
        from_clause = query.from_clause
        where_clause = query.where_clause or SQL("TRUE")

        # Prepare CTE and sub-queries for token matching
        cte_sql = SQL()
        token_queries: list[SQL] = []
        num_tokens, exact_tokens, _txt_tokens = (
            self._get_invoice_matching_st_line_tokens(st_line)
        )

        # Text columns exposed by the ``candidate_lines`` CTE below. The
        # sub-queries must reference these aliases, since the underlying
        # table columns are not visible outside the CTE.
        token_columns = (SQL('aml_name'), SQL('move_name'), SQL('move_ref'))

        if num_tokens or exact_tokens:
            cte_sql = SQL(
                '''
                    WITH candidate_lines AS (
                        SELECT
                            account_move_line.id AS aml_id,
                            account_move_line.date AS aml_date,
                            account_move_line.date_maturity AS aml_maturity,
                            account_move_line.name AS aml_name,
                            account_move_line__move_id.name AS move_name,
                            account_move_line__move_id.ref AS move_ref
                        FROM %s
                        JOIN account_move account_move_line__move_id
                            ON account_move_line__move_id.id = account_move_line.move_id
                        WHERE %s
                    )
                ''',
                from_clause,
                where_clause,
            )

        # Build sub-queries for numerical token matching
        if num_tokens:
            for col_ref in token_columns:
                # Strip everything but digits and whitespace, trim, then
                # split on whitespace to get one row per numeric token.
                token_queries.append(SQL(
                    r'''
                        SELECT
                            aml_id AS id,
                            aml_date AS date,
                            aml_maturity AS date_maturity,
                            UNNEST(
                                REGEXP_SPLIT_TO_ARRAY(
                                    SUBSTRING(
                                        REGEXP_REPLACE(%(col)s, '[^0-9\s]', '', 'g'),
                                        '\S(?:.*\S)*'
                                    ),
                                    '\s+'
                                )
                            ) AS token
                        FROM candidate_lines
                        WHERE %(col)s IS NOT NULL
                    ''',
                    col=col_ref,
                ))

        # Build sub-queries for exact token matching
        if exact_tokens:
            for col_ref in token_columns:
                token_queries.append(SQL(
                    '''
                        SELECT
                            aml_id AS id,
                            aml_date AS date,
                            aml_maturity AS date_maturity,
                            %(col)s AS token
                        FROM candidate_lines
                        WHERE %(col)s != ''
                    ''',
                    col=col_ref,
                ))

        # Execute token-based search if queries exist
        if token_queries:
            sort_clause = _build_sort_clause(prefix=SQL('matched.'))
            all_tokens = tuple(num_tokens + exact_tokens)
            found_ids = [
                row[0]
                for row in self.env.execute_query(SQL(
                    '''
                        %s
                        SELECT
                            matched.id,
                            COUNT(*) AS match_count
                        FROM (%s) AS matched
                        WHERE matched.token IN %s
                        GROUP BY matched.date_maturity, matched.date, matched.id
                        HAVING COUNT(*) > 0
                        ORDER BY match_count DESC, %s
                    ''',
                    cte_sql,
                    SQL(" UNION ALL ").join(token_queries),
                    all_tokens,
                    sort_clause,
                ))
            ]
            if found_ids:
                return {
                    'allow_auto_reconcile': True,
                    'amls': self.env['account.move.line'].browse(found_ids),
                }
            elif (
                self.match_text_location_label
                or self.match_text_location_note
                or self.match_text_location_reference
            ):
                # Text location matching was enabled but found nothing - don't fall through
                return

        # Fallback: match by exact amount when no partner is set
        if not partner:
            line_currency = (
                st_line.foreign_currency_id
                or st_line.journal_id.currency_id
                or st_line.company_currency_id
            )
            # Compare against the residual expressed in the line's currency.
            if line_currency == self.company_id.currency_id:
                amt_col = SQL('amount_residual')
            else:
                amt_col = SQL('amount_residual_currency')

            sort_clause = _build_sort_clause(prefix=SQL('account_move_line.'))
            amount_rows = self.env.execute_query(SQL(
                '''
                    SELECT account_move_line.id
                    FROM %s
                    WHERE
                        %s
                        AND account_move_line.currency_id = %s
                        AND ROUND(account_move_line.%s, %s) = ROUND(%s, %s)
                    ORDER BY %s
                ''',
                from_clause,
                where_clause,
                line_currency.id,
                amt_col,
                line_currency.decimal_places,
                -st_line.amount_residual,
                line_currency.decimal_places,
                sort_clause,
            ))
            found_lines = self.env['account.move.line'].browse(
                [r[0] for r in amount_rows],
            )
        else:
            found_lines = self.env['account.move.line'].search(
                search_domain,
                order=_build_sort_clause().code,
            )

        if found_lines:
            return {
                'allow_auto_reconcile': False,
                'amls': found_lines,
            }

    def _get_invoice_matching_rules_map(self):
        """Return the priority-ordered mapping of matching rule functions.

        Override this in other modules to inject additional matching logic.

        :return: Dict mapping priority (int) to list of callables.
        """
        priority_map = defaultdict(list)
        priority_map[10].append(self._get_invoice_matching_amls_candidates)
        return priority_map

    # =====================================================================
    # Partner Mapping
    # =====================================================================

    def _get_partner_from_mapping(self, st_line):
        """Attempt to identify a partner using the model's regex mappings.

        A mapping matches when every regex it defines matches; the first
        matching mapping wins.

        :param st_line: Bank statement line to analyze.
        :return: Matched partner recordset (may be empty).
        """
        self.ensure_one()

        if self.rule_type not in ('invoice_matching', 'writeoff_suggestion'):
            return self.env['res.partner']

        for mapping in self.partner_mapping_line_ids:
            # Check payment reference regex
            ref_ok = True
            if mapping.payment_ref_regex:
                ref_ok = bool(
                    re.match(mapping.payment_ref_regex, st_line.payment_ref)
                    if st_line.payment_ref else False
                )

            # Check narration regex
            narration_ok = True
            if mapping.narration_regex:
                plain_narration = tools.html2plaintext(
                    st_line.narration or '',
                ).rstrip()
                # DOTALL lets '.' span the line breaks of a multi-line narration.
                narration_ok = bool(re.match(
                    mapping.narration_regex,
                    plain_narration,
                    flags=re.DOTALL,
                ))

            if ref_ok and narration_ok:
                return mapping.partner_id

        return self.env['res.partner']

    # =====================================================================
    # Match Result Processing
    # =====================================================================

    def _get_invoice_matching_amls_result(self, st_line, partner, candidate_vals):
        """Process candidate journal items and determine whether they form
        a valid match for the statement line.

        :param st_line: Bank statement line being matched.
        :param partner: Partner context (not used by this implementation).
        :param candidate_vals: Dict holding the 'amls' recordset and the
                               'allow_auto_reconcile' flag.
        :return: Dict with matched amls and status flags, or None.
        """
        def _build_result(kept_values, match_status):
            """Construct the result dict from kept candidates and status."""
            if 'rejected' in match_status:
                return None

            output = {'amls': self.env['account.move.line']}
            for val_entry in kept_values:
                output['amls'] |= val_entry['aml']

            if 'allow_write_off' in match_status and self.line_ids:
                output['status'] = 'write_off'

            if (
                'allow_auto_reconcile' in match_status
                and candidate_vals['allow_auto_reconcile']
                and self.auto_reconcile
            ):
                output['auto_reconcile'] = True

            return output

        line_currency = st_line.foreign_currency_id or st_line.currency_id
        line_amount = st_line._prepare_move_line_default_vals()[1]['amount_currency']
        direction = 1 if line_amount > 0.0 else -1

        candidates = candidate_vals['amls']
        standard_values = []
        epd_values = []
        same_cur = candidates.currency_id == line_currency

        for aml in candidates:
            base_vals = {
                'aml': aml,
                'amount_residual': aml.amount_residual,
                'amount_residual_currency': aml.amount_residual_currency,
            }
            standard_values.append(base_vals)

            # Handle early payment discount eligibility
            payment_term = aml.move_id.invoice_payment_term_id
            last_disc_date = (
                payment_term._get_last_discount_date(aml.move_id.date)
                if payment_term else False
            )

            if (
                same_cur
                and aml.move_id.move_type in (
                    'out_invoice', 'out_receipt', 'in_invoice', 'in_receipt',
                )
                and not aml.matched_debit_ids
                and not aml.matched_credit_ids
                and last_disc_date
                and st_line.date <= last_disc_date
            ):
                # Ratio between the foreign and company amounts of the item,
                # used to convert the discounted amount to company currency.
                rate_factor = (
                    abs(aml.amount_currency) / abs(aml.balance)
                    if aml.balance else 1.0
                )
                epd_values.append({
                    **base_vals,
                    'amount_residual': st_line.company_currency_id.round(
                        aml.discount_amount_currency / rate_factor,
                    ),
                    'amount_residual_currency': aml.discount_amount_currency,
                })
            else:
                epd_values.append(base_vals)

        def _try_batch_match(values_list):
            """Attempt to match items as a batch in same-currency mode.

            :return: Tuple (match type, kept values) where the match type
                     is 'perfect', 'partial' or None.
            """
            if not same_cur:
                return None, []

            kept = []
            running_total = 0.0
            for vals in values_list:
                # A single candidate with the exact amount wins outright.
                if line_currency.compare_amounts(
                    line_amount, -vals['amount_residual_currency'],
                ) == 0:
                    return 'perfect', [vals]

                # Keep the candidate only while some amount is still open.
                # Iteration continues afterwards in case a later candidate
                # is an exact match.
                if line_currency.compare_amounts(
                    direction * (line_amount + running_total), 0.0,
                ) > 0:
                    kept.append(vals)
                    running_total += vals['amount_residual_currency']

            if line_currency.is_zero(direction * (line_amount + running_total)):
                return 'perfect', kept
            elif kept:
                return 'partial', kept
            return None, []

        # Priority 1: Try early payment discount match (only accept perfect)
        batch_type, kept_list = _try_batch_match(epd_values)
        if batch_type != 'perfect':
            kept_list = []

        # Priority 2: Try standard same-currency match
        if not kept_list:
            _batch_type, kept_list = _try_batch_match(standard_values)

        # Priority 3: Use all candidates as fallback
        if not kept_list:
            kept_list = standard_values

        # Validate the final selection against tolerance rules
        if kept_list:
            rule_status = self._check_rule_propositions(st_line, kept_list)
            output = _build_result(kept_list, rule_status)
            if output:
                return output

    def _check_rule_propositions(self, st_line, amls_values_list):
        """Validate the aggregate match against payment tolerance rules.

        :param st_line: Bank statement line being matched.
        :param amls_values_list: List of dicts with the keys 'aml',
                                 'amount_residual' and
                                 'amount_residual_currency'.
        :return: Set of status strings indicating the verdict.
        """
        self.ensure_one()

        if not self.allow_payment_tolerance:
            return {'allow_write_off', 'allow_auto_reconcile'}

        line_currency = st_line.foreign_currency_id or st_line.currency_id
        line_amt = st_line._prepare_move_line_default_vals()[1]['amount_currency']

        total_candidate_amt = sum(
            st_line._prepare_counterpart_amounts_using_st_line_rate(
                v['aml'].currency_id,
                v['amount_residual'],
                v['amount_residual_currency'],
            )['amount_currency']
            for v in amls_values_list
        )

        direction = 1 if line_amt > 0.0 else -1
        # Sign-normalised balance left once the candidates are applied:
        # positive means the payment exceeds the candidates, negative means
        # it falls short of them.
        post_reco_balance = line_currency.round(
            direction * (total_candidate_amt + line_amt),
        )

        # Exact zero balance - perfect match
        if line_currency.is_zero(post_reco_balance):
            return {'allow_auto_reconcile'}

        # Payment exceeds invoices - always allow
        if post_reco_balance > 0.0:
            return {'allow_auto_reconcile'}

        # Zero tolerance configured - reject
        if self.payment_tolerance_param == 0:
            return {'rejected'}

        # Fixed amount tolerance check
        if (
            self.payment_tolerance_type == 'fixed_amount'
            and line_currency.compare_amounts(
                -post_reco_balance, self.payment_tolerance_param,
            ) <= 0
        ):
            return {'allow_write_off', 'allow_auto_reconcile'}

        # Percentage tolerance check
        pct_remaining = abs(post_reco_balance / total_candidate_amt) * 100.0
        if (
            self.payment_tolerance_type == 'percentage'
            and line_currency.compare_amounts(
                pct_remaining, self.payment_tolerance_param,
            ) <= 0
        ):
            return {'allow_write_off', 'allow_auto_reconcile'}

        return {'rejected'}

    # =====================================================================
    # Auto-Reconciliation Cron
    # =====================================================================

    def run_auto_reconciliation(self):
        """Trigger automatic reconciliation for statement lines, with a
        time limit to prevent long-running operations.

        The limit is the configured ``limit_time_real_cron`` when it lies
        strictly between 0 and 180 seconds, and 180 seconds otherwise.
        """
        cron_time_limit = tools.config['limit_time_real_cron'] or -1
        max_seconds = cron_time_limit if 0 < cron_time_limit < 180 else 180
        self.env['account.bank.statement.line']._cron_try_auto_reconcile_statement_lines(
            limit_time=max_seconds,
        )