refactor(fusion_accounting): move AI module code into fusion_accounting_ai sub-module

git mv preserves history. fusion_accounting/ retains only __manifest__.py,
__init__.py, CLAUDE.md, and docs/ — the meta-module shell. All Python,
data, views, security, services, static, tests, wizards, report move to
fusion_accounting_ai/. Manifest data list updated; security.xml move to
_core deferred to Task 12.

Made-with: Cursor
This commit is contained in:
gsinghpal
2026-04-18 21:45:06 -04:00
parent b7483d5177
commit 6c72f2ab49
74 changed files with 76 additions and 60 deletions

View File

@@ -0,0 +1,19 @@
from .bank_reconciliation import TOOLS as BANK_RECON_TOOLS
from .hst_management import TOOLS as HST_TOOLS
from .accounts_receivable import TOOLS as AR_TOOLS
from .accounts_payable import TOOLS as AP_TOOLS
from .journal_review import TOOLS as JOURNAL_TOOLS
from .month_end import TOOLS as MONTH_END_TOOLS
from .payroll import TOOLS as PAYROLL_TOOLS
from .inventory import TOOLS as INVENTORY_TOOLS
from .adp import TOOLS as ADP_TOOLS
from .reporting import TOOLS as REPORTING_TOOLS
from .audit import TOOLS as AUDIT_TOOLS
TOOL_DISPATCH = {}
for tools_dict in [
BANK_RECON_TOOLS, HST_TOOLS, AR_TOOLS, AP_TOOLS, JOURNAL_TOOLS,
MONTH_END_TOOLS, PAYROLL_TOOLS, INVENTORY_TOOLS, ADP_TOOLS,
REPORTING_TOOLS, AUDIT_TOOLS,
]:
TOOL_DISPATCH.update(tools_dict)

View File

@@ -0,0 +1,406 @@
import logging
from odoo import fields
from datetime import timedelta
_logger = logging.getLogger(__name__)
def get_ap_aging(env, params):
today = fields.Date.today()
domain = [
('account_id.account_type', '=', 'liability_payable'),
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
]
amls = env['account.move.line'].search(domain)
buckets = {'current': 0, '1_30': 0, '31_60': 0, '61_90': 0, '90_plus': 0}
for aml in amls:
amt = abs(aml.amount_residual)
if not aml.date_maturity or aml.date_maturity >= today:
buckets['current'] += amt
else:
days = (today - aml.date_maturity).days
if days <= 30:
buckets['1_30'] += amt
elif days <= 60:
buckets['31_60'] += amt
elif days <= 90:
buckets['61_90'] += amt
else:
buckets['90_plus'] += amt
return {'total': sum(buckets.values()), 'buckets': buckets, 'line_count': len(amls)}
def find_duplicate_bills(env, params):
window_days = int(params.get('window_days', 7))
bills = env['account.move'].search([
('move_type', 'in', ('in_invoice', 'in_refund')),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
], order='partner_id, amount_total, date')
duplicates = []
prev = None
for bill in bills:
if prev and (
prev.partner_id == bill.partner_id
and abs(prev.amount_total - bill.amount_total) < 0.01
and abs((prev.date - bill.date).days) <= window_days
):
duplicates.append({
'bill_1': {'id': prev.id, 'name': prev.name, 'date': str(prev.date)},
'bill_2': {'id': bill.id, 'name': bill.name, 'date': str(bill.date)},
'partner': bill.partner_id.name,
'amount': bill.amount_total,
})
prev = bill
return {'count': len(duplicates), 'duplicates': duplicates[:20]}
def match_bill_to_po(env, params):
bill_id = int(params['bill_id'])
bill = env['account.move'].browse(bill_id)
if not bill.exists():
return {'error': 'Bill not found'}
matches = []
for line in bill.invoice_line_ids:
if line.purchase_line_id:
matches.append({
'bill_line': line.name or '',
'po': line.purchase_line_id.order_id.name,
'po_line': line.purchase_line_id.name,
'po_qty': line.purchase_line_id.product_qty,
'bill_qty': line.quantity,
'match': abs(line.quantity - line.purchase_line_id.product_qty) < 0.01,
})
return {'bill': bill.name, 'matches': matches, 'unmatched_lines': len(bill.invoice_line_ids) - len(matches)}
def get_unpaid_bills(env, params):
domain = [
('move_type', 'in', ('in_invoice', 'in_refund')),
('state', '=', 'posted'),
('payment_state', 'in', ('not_paid', 'partial')),
('company_id', '=', env.company.id),
]
if params.get('partner_id'):
domain.append(('partner_id', '=', int(params['partner_id'])))
bills = env['account.move'].search(domain, order='invoice_date_due asc', limit=int(params.get('limit', 50)))
return {
'count': len(bills),
'total': sum(b.amount_residual for b in bills),
'bills': [{
'id': b.id, 'name': b.name,
'partner': b.partner_id.name if b.partner_id else '',
'amount_total': b.amount_total,
'amount_residual': b.amount_residual,
'date_due': str(b.invoice_date_due) if b.invoice_date_due else '',
} for b in bills],
}
def verify_bill_taxes(env, params):
bill_id = int(params['bill_id'])
bill = env['account.move'].browse(bill_id)
if not bill.exists():
return {'error': 'Bill not found'}
issues = []
for line in bill.invoice_line_ids:
if line.product_id and not line.tax_ids:
issues.append({
'line': line.name or line.product_id.name,
'issue': 'No tax applied to product line',
})
return {'bill': bill.name, 'issues': issues, 'clean': len(issues) == 0}
def get_payment_schedule(env, params):
days_ahead = int(params.get('days_ahead', 30))
cutoff = fields.Date.today() + timedelta(days=days_ahead)
bills = env['account.move'].search([
('move_type', '=', 'in_invoice'),
('state', '=', 'posted'),
('payment_state', 'in', ('not_paid', 'partial')),
('invoice_date_due', '<=', cutoff),
('company_id', '=', env.company.id),
], order='invoice_date_due asc')
return {
'period': f'Next {days_ahead} days',
'total': sum(b.amount_residual for b in bills),
'bills': [{
'id': b.id, 'name': b.name,
'partner': b.partner_id.name if b.partner_id else '',
'amount_residual': b.amount_residual,
'date_due': str(b.invoice_date_due) if b.invoice_date_due else '',
} for b in bills[:50]],
}
def search_partners(env, params):
"""Search for partners/vendors by name keyword."""
keyword = params.get('keyword', '')
if not keyword or len(keyword) < 2:
return {'error': 'Keyword must be at least 2 characters'}
domain = [('name', 'ilike', keyword), ('company_id', 'in', [env.company.id, False])]
if params.get('supplier_only'):
domain.append(('supplier_rank', '>', 0))
partners = env['res.partner'].search(domain, limit=int(params.get('limit', 20)))
return {
'count': len(partners),
'partners': [{
'id': p.id,
'name': p.name,
'supplier_rank': p.supplier_rank,
'customer_rank': p.customer_rank,
'vat': p.vat or '',
'email': p.email or '',
'phone': p.phone or '',
} for p in partners],
}
def find_similar_bank_lines(env, params):
"""Find past reconciled bank lines with similar description to suggest coding patterns.
Also checks vendor bill tax patterns if a partner is identified."""
keyword = params.get('keyword', '')
if not keyword or len(keyword) < 3:
return {'error': 'Keyword must be at least 3 characters'}
# Find reconciled bank lines with matching payment_ref
lines = env['account.bank.statement.line'].search([
('is_reconciled', '=', True),
('payment_ref', 'ilike', keyword),
('company_id', '=', env.company.id),
], order='date desc', limit=int(params.get('limit', 10)))
matches = []
found_partner_id = None
for line in lines:
move = line.move_id
if not move:
continue
expense_info = {'account_code': '', 'account_name': '', 'tax_applied': False, 'tax_amount': 0.0}
for ml in move.line_ids:
if ml.account_id.account_type in ('expense', 'expense_direct_cost', 'expense_depreciation'):
expense_info['account_code'] = ml.account_id.code
expense_info['account_name'] = ml.account_id.name
expense_info['tax_applied'] = bool(ml.tax_ids)
expense_info['tax_amount'] = sum(t.amount for t in ml.tax_ids) if ml.tax_ids else 0.0
break
if line.partner_id and not found_partner_id:
found_partner_id = line.partner_id.id
matches.append({
'id': line.id,
'date': str(line.date),
'payment_ref': line.payment_ref or '',
'amount': line.amount,
'partner': line.partner_id.name if line.partner_id else '',
'partner_id': line.partner_id.id if line.partner_id else None,
'expense_account': expense_info['account_code'],
'expense_account_name': expense_info['account_name'],
'tax_applied': expense_info['tax_applied'],
'tax_rate': expense_info['tax_amount'],
})
result = {
'keyword': keyword,
'count': len(matches),
'matches': matches,
'suggestion': matches[0] if matches else None,
}
# Check vendor tax profile cache first (fast), fall back to live query
partner_id = found_partner_id or (int(params['partner_id']) if params.get('partner_id') else None)
if partner_id:
profile = env['fusion.vendor.tax.profile'].search([
('partner_id', '=', partner_id),
('company_id', '=', env.company.id),
], limit=1)
if profile:
result['vendor_tax_pattern'] = {
'source': 'cached_profile',
'total_bills': profile.total_bills,
'bills_with_tax': profile.bills_with_hst,
'bills_no_tax': profile.bills_zero_rated,
'avg_tax_pct': profile.avg_tax_pct,
'tax_classification': profile.tax_classification,
'tax_note': profile.tax_note,
'primary_account_id': profile.primary_account_id.id if profile.primary_account_id else None,
'primary_account_code': profile.primary_account_code or '',
'is_foreign': profile.is_foreign,
'is_po_vendor': profile.is_po_vendor,
'po_count': profile.po_count,
}
else:
# No cached profile — live query for new/small vendors
bills = env['account.move'].search([
('move_type', '=', 'in_invoice'), ('state', '=', 'posted'),
('partner_id', '=', partner_id),
], order='date desc', limit=10)
tax_stats = {'source': 'live_query', 'total_bills': len(bills),
'bills_with_tax': 0, 'bills_no_tax': 0,
'avg_tax_pct': 0.0, 'tax_note': ''}
tax_pcts = []
for bill in bills:
if bill.amount_tax > 0.01:
tax_stats['bills_with_tax'] += 1
if bill.amount_untaxed > 0:
tax_pcts.append(round(bill.amount_tax / bill.amount_untaxed * 100, 2))
else:
tax_stats['bills_no_tax'] += 1
if tax_pcts:
tax_stats['avg_tax_pct'] = round(sum(tax_pcts) / len(tax_pcts), 2)
if tax_stats['total_bills'] > 0:
if tax_stats['bills_no_tax'] == tax_stats['total_bills']:
tax_stats['tax_note'] = 'This vendor NEVER charges HST. All bills are zero-rated.'
elif tax_stats['avg_tax_pct'] < 2.0 and tax_stats['bills_with_tax'] > 0:
tax_stats['tax_note'] = (
f'HST only on shipping (avg {tax_stats["avg_tax_pct"]}%). '
f'Do NOT apply HST to full amount.'
)
elif tax_stats['avg_tax_pct'] >= 12.0:
tax_stats['tax_note'] = f'Consistently charges HST at ~{tax_stats["avg_tax_pct"]}%.'
result['vendor_tax_pattern'] = tax_stats
return result
def create_vendor_bill(env, params):
"""[Tier 3] Create a vendor bill (account.move with move_type='in_invoice').
Requires user approval before execution."""
partner_id = int(params['partner_id'])
invoice_date = params.get('invoice_date', str(fields.Date.today()))
bill_lines = params.get('lines', [])
if not bill_lines:
return {'error': 'At least one invoice line is required'}
partner = env['res.partner'].browse(partner_id)
if not partner.exists():
return {'error': f'Partner not found: {partner_id}'}
invoice_line_vals = []
for line in bill_lines:
line_vals = {
'name': line.get('description', 'Expense'),
'price_unit': float(line.get('price_unit', 0)),
'quantity': float(line.get('quantity', 1)),
}
if line.get('account_id'):
line_vals['account_id'] = int(line['account_id'])
if line.get('tax_ids'):
line_vals['tax_ids'] = [(6, 0, [int(t) for t in line['tax_ids']])]
invoice_line_vals.append((0, 0, line_vals))
try:
bill = env['account.move'].create({
'move_type': 'in_invoice',
'partner_id': partner_id,
'invoice_date': invoice_date,
'date': invoice_date,
'invoice_line_ids': invoice_line_vals,
'company_id': env.company.id,
})
if params.get('post', False):
bill.action_post()
return {
'status': 'created',
'bill_id': bill.id,
'bill_name': bill.name,
'partner': partner.name,
'amount_total': bill.amount_total,
'state': bill.state,
}
except Exception as e:
_logger.error("Failed to create vendor bill: %s", e)
return {'error': str(e)}
def register_bill_payment(env, params):
"""[Tier 3] Register payment on a posted vendor bill and optionally reconcile to bank line.
Requires user approval before execution."""
bill_id = int(params['bill_id'])
journal_id = int(params['journal_id'])
bill = env['account.move'].browse(bill_id)
if not bill.exists() or bill.state != 'posted':
return {'error': 'Bill not found or not posted'}
payment_date = params.get('payment_date', str(fields.Date.today()))
try:
# Use the payment register wizard
ctx = {
'active_model': 'account.move',
'active_ids': [bill_id],
}
wizard = env['account.payment.register'].with_context(**ctx).create({
'journal_id': journal_id,
'payment_date': payment_date,
})
# Optionally set amount if provided (otherwise defaults to bill amount)
if params.get('amount'):
wizard.amount = float(params['amount'])
payments = wizard.action_create_payments()
# Find the created payment
payment = None
if isinstance(payments, dict) and payments.get('res_id'):
payment = env['account.payment'].browse(payments['res_id'])
elif isinstance(payments, dict) and payments.get('domain'):
payment = env['account.payment'].search(payments['domain'], limit=1)
else:
# Fallback: find the latest payment for this bill
payment = env['account.payment'].search([
('partner_id', '=', bill.partner_id.id),
], order='create_date desc', limit=1)
result = {
'status': 'paid',
'bill_id': bill_id,
'bill_name': bill.name,
'payment_state': bill.payment_state,
}
if payment:
result['payment_id'] = payment.id
result['payment_name'] = payment.name
# Optionally reconcile to a bank statement line
if params.get('statement_line_id') and payment:
try:
st_line = env['account.bank.statement.line'].browse(int(params['statement_line_id']))
if st_line.exists() and not st_line.is_reconciled:
# Find the payment's move lines on the bank's outstanding account
pay_move_lines = payment.move_id.line_ids.filtered(
lambda l: l.account_id.reconcile and not l.reconciled
)
if pay_move_lines:
st_line.set_line_bank_statement_line(pay_move_lines.ids)
result['reconciled'] = True
result['statement_line_id'] = st_line.id
except Exception as e:
_logger.warning("Payment created but bank reconciliation failed: %s", e)
result['reconcile_error'] = str(e)
return result
except Exception as e:
_logger.error("Failed to register payment: %s", e)
return {'error': str(e)}
TOOLS = {
'get_ap_aging': get_ap_aging,
'find_duplicate_bills': find_duplicate_bills,
'match_bill_to_po': match_bill_to_po,
'get_unpaid_bills': get_unpaid_bills,
'verify_bill_taxes': verify_bill_taxes,
'get_payment_schedule': get_payment_schedule,
'search_partners': search_partners,
'find_similar_bank_lines': find_similar_bank_lines,
'create_vendor_bill': create_vendor_bill,
'register_bill_payment': register_bill_payment,
}

View File

@@ -0,0 +1,194 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def get_ar_aging(env, params):
today = fields.Date.today()
domain = [
('account_id.account_type', '=', 'asset_receivable'),
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
]
amls = env['account.move.line'].search(domain)
buckets = {'current': 0, '1_30': 0, '31_60': 0, '61_90': 0, '90_plus': 0}
for aml in amls:
if not aml.date_maturity or aml.date_maturity >= today:
buckets['current'] += aml.amount_residual
else:
days = (today - aml.date_maturity).days
if days <= 30:
buckets['1_30'] += aml.amount_residual
elif days <= 60:
buckets['31_60'] += aml.amount_residual
elif days <= 90:
buckets['61_90'] += aml.amount_residual
else:
buckets['90_plus'] += aml.amount_residual
return {
'total': sum(buckets.values()),
'buckets': buckets,
'line_count': len(amls),
}
def get_overdue_invoices(env, params):
today = fields.Date.today()
days_overdue = int(params.get('min_days_overdue', 1))
from datetime import timedelta
cutoff = today - timedelta(days=days_overdue)
invoices = env['account.move'].search([
('move_type', '=', 'out_invoice'),
('state', '=', 'posted'),
('payment_state', 'in', ('not_paid', 'partial')),
('invoice_date_due', '<', cutoff),
('company_id', '=', env.company.id),
], order='invoice_date_due asc', limit=int(params.get('limit', 50)))
return {
'count': len(invoices),
'invoices': [{
'id': inv.id,
'name': inv.name,
'partner': inv.partner_id.name if inv.partner_id else '',
'email': inv.partner_id.email or '' if inv.partner_id else '',
'phone': inv.partner_id.phone or '' if inv.partner_id else '',
'amount_total': inv.amount_total,
'amount_residual': inv.amount_residual,
'date_due': str(inv.invoice_date_due),
'days_overdue': (today - inv.invoice_date_due).days,
} for inv in invoices],
}
def get_partner_balance(env, params):
"""Get AR and AP balance for a partner. Accepts partner_id or partner_name."""
partner = None
if params.get('partner_id'):
partner = env['res.partner'].browse(int(params['partner_id']))
elif params.get('partner_name'):
partner = env['res.partner'].search([
('name', 'ilike', params['partner_name']),
], limit=1)
if not partner or not partner.exists():
return {'error': f"Partner not found: {params.get('partner_name', params.get('partner_id', '?'))}"}
# AR balance (receivable)
ar_amls = env['account.move.line'].search([
('partner_id', '=', partner.id),
('account_id.account_type', '=', 'asset_receivable'),
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
])
ar_balance = sum(aml.amount_residual for aml in ar_amls)
# AP balance (payable)
ap_amls = env['account.move.line'].search([
('partner_id', '=', partner.id),
('account_id.account_type', '=', 'liability_payable'),
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
])
ap_balance = sum(aml.amount_residual for aml in ap_amls)
open_items = [{
'id': aml.id,
'move_name': aml.move_id.name,
'ref': aml.ref or '',
'date': str(aml.date),
'amount_residual': aml.amount_residual,
'type': 'receivable' if aml.account_id.account_type == 'asset_receivable' else 'payable',
'date_maturity': str(aml.date_maturity) if aml.date_maturity else '',
} for aml in (ar_amls | ap_amls)[:30]]
return {
'partner': partner.name,
'partner_id': partner.id,
'ar_balance': ar_balance,
'ap_balance': ap_balance,
'net_balance': ar_balance + ap_balance,
'they_owe_us': ar_balance if ar_balance > 0 else 0,
'we_owe_them': abs(ap_balance) if ap_balance < 0 else 0,
'open_items': open_items,
}
def send_followup(env, params):
partner_id = int(params['partner_id'])
partner = env['res.partner'].browse(partner_id)
if not partner.exists():
return {'error': 'Partner not found'}
options = {
'partner_id': partner_id,
'email': params.get('send_email', False),
'print': params.get('print_letter', False),
'sms': False,
}
if params.get('email_subject'):
options['email_subject'] = params['email_subject']
if params.get('body'):
options['body'] = params['body']
result = partner.execute_followup(options)
return {'status': 'sent', 'partner': partner.name, 'result': str(result) if result else 'done'}
def get_followup_report(env, params):
partner_id = int(params['partner_id'])
partner = env['res.partner'].browse(partner_id)
if not partner.exists():
return {'error': 'Partner not found'}
try:
report = env['account.followup.report']
html = report._get_followup_report_html(partner)
return {'partner': partner.name, 'html': html}
except Exception as e:
return {'error': str(e)}
def reconcile_payment_to_invoice(env, params):
move_line_ids = [int(x) for x in params['move_line_ids']]
amls = env['account.move.line'].browse(move_line_ids)
if len(amls) < 2:
return {'error': 'Need at least 2 journal items to reconcile'}
amls.reconcile()
return {
'status': 'reconciled',
'move_line_ids': move_line_ids,
}
def get_unmatched_payments(env, params):
domain = [
('account_id.account_type', '=', 'asset_receivable'),
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('move_id.payment_id', '!=', False),
('company_id', '=', env.company.id),
]
amls = env['account.move.line'].search(domain, order='date desc')
return {
'count': len(amls),
'payments': [{
'id': aml.id,
'date': str(aml.date),
'ref': aml.ref or aml.move_id.name,
'partner': aml.partner_id.name if aml.partner_id else '',
'amount': abs(aml.amount_residual),
} for aml in amls[:50]],
}
TOOLS = {
'get_ar_aging': get_ar_aging,
'get_overdue_invoices': get_overdue_invoices,
'get_partner_balance': get_partner_balance,
'send_followup': send_followup,
'get_followup_report': get_followup_report,
'reconcile_payment_to_invoice': reconcile_payment_to_invoice,
'get_unmatched_payments': get_unmatched_payments,
}

View File

@@ -0,0 +1,237 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def get_adp_receivable_aging(env, params):
accounts = env['account.account'].search([
('code', '=like', '1101%'),
('company_ids', 'in', env.company.id),
])
today = fields.Date.today()
amls = env['account.move.line'].search([
('account_id', 'in', accounts.ids),
('reconciled', '=', False),
('parent_state', '=', 'posted'),
])
buckets = {'current': 0, '1_30': 0, '31_60': 0, '61_90': 0, '90_plus': 0}
for aml in amls:
amt = abs(aml.amount_residual)
if not aml.date_maturity or aml.date_maturity >= today:
buckets['current'] += amt
else:
days = (today - aml.date_maturity).days
if days <= 30:
buckets['1_30'] += amt
elif days <= 60:
buckets['31_60'] += amt
elif days <= 90:
buckets['61_90'] += amt
else:
buckets['90_plus'] += amt
return {'total': sum(buckets.values()), 'buckets': buckets}
def match_adp_payment_to_invoice(env, params):
move_line_ids = [int(x) for x in params['move_line_ids']]
amls = env['account.move.line'].browse(move_line_ids).exists()
if len(amls) < 2:
return {'error': 'Need at least 2 existing journal items to reconcile'}
amls.reconcile()
return {'status': 'matched', 'move_line_ids': amls.ids}
def verify_adp_split(env, params):
invoice_id = int(params['invoice_id'])
invoice = env['account.move'].browse(invoice_id)
if not invoice.exists():
return {'error': 'Invoice not found'}
lines = invoice.invoice_line_ids
total = invoice.amount_untaxed
return {
'invoice': invoice.name,
'total_untaxed': total,
'total_with_tax': invoice.amount_total,
'lines': [{'name': l.name, 'subtotal': l.price_subtotal, 'total': l.price_total} for l in lines],
'balanced': abs(sum(l.price_subtotal for l in lines) - total) < 0.01,
}
def find_adp_without_payment(env, params):
adp_partner = env['res.partner'].search([('name', 'ilike', 'ADP')], limit=1)
if not adp_partner:
return {'status': 'info', 'message': 'No ADP partner found in the system.'}
invoices = env['account.move'].search([
('partner_id', '=', adp_partner.id),
('move_type', '=', 'out_invoice'),
('state', '=', 'posted'),
('payment_state', 'in', ('not_paid', 'partial')),
])
return {
'count': len(invoices),
'invoices': [{
'id': inv.id, 'name': inv.name,
'amount': inv.amount_residual, 'date': str(inv.date),
} for inv in invoices[:20]],
}
def get_adp_summary(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
accounts = env['account.account'].search([
('code', '=like', '1101%'), ('company_ids', 'in', env.company.id),
])
domain = [
('account_id', 'in', accounts.ids),
('parent_state', '=', 'posted'),
]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
lines = env['account.move.line'].search(domain)
total_debit = sum(l.debit for l in lines)
total_credit = sum(l.credit for l in lines)
return {
'period': f'{date_from or "all"} to {date_to or "now"}',
'billed': total_debit,
'collected': total_credit,
'outstanding': total_debit - total_credit,
}
def register_adp_batch_payment(env, params):
"""Register payments for a batch of ADP invoices from a remittance advice.
Takes a list of invoice numbers with payment amounts and a payment date.
Registers a payment for each invoice via Odoo's payment wizard, which
creates outstanding receipt entries (PBNK2) on account 1050.
After calling this, use suggest_bank_line_matches on the bank deposit line
to match the outstanding receipts against the bank line.
"""
invoices_data = params.get('invoices', [])
payment_date = params.get('payment_date')
journal_id = int(params.get('journal_id', 50)) # Default Scotia Current
if not invoices_data:
return {'error': 'No invoices provided'}
if not payment_date:
return {'error': 'payment_date is required (YYYY-MM-DD)'}
ADP_PARTNER_ID = 3421 # ADP (Assistive Device Program)
results = []
total_paid = 0.0
errors = []
for inv_data in invoices_data:
inv_number = str(inv_data.get('invoice_number', '')).strip()
amount = float(inv_data.get('amount', 0))
if not inv_number or not amount:
errors.append(f"Skipped: missing invoice_number or amount in {inv_data}")
continue
# Find the invoice by name/number
invoice = env['account.move'].search([
('name', 'ilike', inv_number),
('move_type', '=', 'out_invoice'),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
], limit=1)
if not invoice:
# Try without leading zeros or with different format
invoice = env['account.move'].search([
('name', '=like', f'%{inv_number}'),
('move_type', '=', 'out_invoice'),
('state', '=', 'posted'),
], limit=1)
if not invoice:
errors.append(f"Invoice {inv_number} not found")
continue
if invoice.payment_state == 'paid':
results.append({
'invoice': inv_number,
'status': 'already_paid',
'move_id': invoice.id,
})
continue
# Check if amount matches residual (allow partial)
if amount > invoice.amount_residual + 0.01:
errors.append(
f"Invoice {inv_number}: payment ${amount:.2f} exceeds "
f"residual ${invoice.amount_residual:.2f}"
)
continue
# Register payment via the payment wizard
try:
payment_vals = {
'payment_type': 'inbound',
'partner_type': 'customer',
'partner_id': invoice.partner_id.id or ADP_PARTNER_ID,
'amount': amount,
'date': payment_date,
'journal_id': journal_id,
'ref': f'ADP Remittance - {inv_number}',
}
# Use the payment register wizard
ctx = {
'active_model': 'account.move',
'active_ids': [invoice.id],
}
wizard = env['account.payment.register'].with_context(**ctx).create({
'payment_date': payment_date,
'amount': amount,
'journal_id': journal_id,
'payment_method_line_id': env['account.payment.method.line'].search([
('journal_id', '=', journal_id),
('payment_type', '=', 'inbound'),
], limit=1).id,
})
wizard.action_create_payments()
results.append({
'invoice': inv_number,
'status': 'paid',
'amount': amount,
'move_id': invoice.id,
'move_name': invoice.name,
})
total_paid += amount
except Exception as e:
_logger.warning("ADP payment failed for %s: %s", inv_number, e)
errors.append(f"Invoice {inv_number}: payment failed — {e}")
env.cr.commit()
return {
'status': 'completed',
'paid_count': len([r for r in results if r.get('status') == 'paid']),
'already_paid_count': len([r for r in results if r.get('status') == 'already_paid']),
'total_paid': total_paid,
'results': results,
'errors': errors,
'message': (
f"Registered payments for {len([r for r in results if r.get('status') == 'paid'])} invoices "
f"totalling ${total_paid:,.2f}. "
+ (f"{len(errors)} errors." if errors else "No errors.")
+ " Now use suggest_bank_line_matches to match the bank deposit."
),
}
TOOLS = {
'get_adp_receivable_aging': get_adp_receivable_aging,
'match_adp_payment_to_invoice': match_adp_payment_to_invoice,
'verify_adp_split': verify_adp_split,
'find_adp_without_payment': find_adp_without_payment,
'get_adp_summary': get_adp_summary,
'register_adp_batch_payment': register_adp_batch_payment,
}

View File

@@ -0,0 +1,164 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def audit_posted_entry(env, params):
move_id = int(params['move_id'])
move = env['account.move'].browse(move_id)
if not move.exists():
return {'error': 'Entry not found'}
issues = []
total_debit = sum(l.debit for l in move.line_ids)
total_credit = sum(l.credit for l in move.line_ids)
if abs(total_debit - total_credit) > 0.01:
issues.append({'severity': 'critical', 'issue': f'Unbalanced entry: debit={total_debit}, credit={total_credit}'})
for line in move.line_ids:
if not line.account_id:
issues.append({'severity': 'critical', 'issue': f'Line missing account: {line.name}'})
if not move.line_ids:
issues.append({'severity': 'warning', 'issue': 'Entry has no lines'})
return {
'move': move.name, 'date': str(move.date),
'issues': issues, 'clean': len(issues) == 0,
}
def audit_account_balances(env, params):
from .journal_review import find_wrong_direction_balances
return find_wrong_direction_balances(env, params)
def audit_tax_compliance(env, params):
from .hst_management import find_missing_tax_invoices, find_missing_itc_bills
invoices = find_missing_tax_invoices(env, params)
bills = find_missing_itc_bills(env, params)
return {
'missing_tax_invoices': invoices.get('missing_tax_count', 0),
'missing_itc_bills': bills.get('missing_itc_count', 0),
'total_issues': invoices.get('missing_tax_count', 0) + bills.get('missing_itc_count', 0),
}
def audit_reconciliation_integrity(env, params):
from .journal_review import verify_reconciliation_integrity
return verify_reconciliation_integrity(env, params)
def check_hash_chain(env, params):
from .month_end import run_hash_integrity_check
return run_hash_integrity_check(env, params)
def check_sequence_gaps(env, params):
from .journal_review import find_sequence_gaps
return find_sequence_gaps(env, params)
def flag_entry(env, params):
move_id = int(params['move_id'])
flag = params.get('flag', 'Review Required')
recommendation = params.get('recommendation', '')
move = env['account.move'].browse(move_id)
if not move.exists():
return {'error': 'Entry not found'}
body = f'<strong>🏴 {flag}</strong><br/>{recommendation}'
move.message_post(body=body, message_type='comment', subtype_xmlid='mail.mt_note')
return {'status': 'flagged', 'move': move.name, 'flag': flag}
def get_audit_status(env, params):
try:
AuditStatus = env['account.audit.account.status']
except KeyError:
return {'error': 'Audit status model (account.audit.account.status) is not available. The account_audit Enterprise module may not be installed.'}
statuses = AuditStatus.search([])
return {
'statuses': [{
'id': s.id,
'account': s.account_id.name,
'status': s.status,
'audit': s.audit_id.display_name if s.audit_id else '',
} for s in statuses[:50]],
}
def set_audit_status(env, params):
try:
AuditStatus = env['account.audit.account.status']
except KeyError:
return {'error': 'Audit status model (account.audit.account.status) is not available. The account_audit Enterprise module may not be installed.'}
status_id = int(params['status_id'])
new_status = params['status']
rec = AuditStatus.browse(status_id)
if not rec.exists():
return {'error': 'Audit status record not found'}
rec.status = new_status
return {'status': 'updated', 'id': status_id, 'new_status': new_status}
def get_audit_trail(env, params):
move_id = int(params['move_id'])
move = env['account.move'].browse(move_id)
if not move.exists():
return {'error': 'Entry not found'}
messages = env['mail.message'].search([
('model', '=', 'account.move'),
('res_id', '=', move_id),
], order='date desc', limit=20)
return {
'move': move.name,
'messages': [{
'date': str(m.date),
'author': m.author_id.name if m.author_id else '',
'body': m.body or '',
'type': m.message_type,
} for m in messages],
}
def run_full_audit(env, params):
results = {}
results['account_balances'] = audit_account_balances(env, params)
results['tax_compliance'] = audit_tax_compliance(env, params)
results['reconciliation'] = audit_reconciliation_integrity(env, params)
results['hash_chain'] = check_hash_chain(env, params)
results['sequence_gaps'] = check_sequence_gaps(env, params)
total_issues = 0
for key, val in results.items():
total_issues += val.get('count', 0) + val.get('total_issues', 0)
score = max(0, 100 - total_issues * 5)
return {
'score': min(100, score),
'total_issues': total_issues,
'details': results,
}
def get_audit_report(env, params):
audit = run_full_audit(env, params)
report_lines = [f"Audit Score: {audit['score']}/100", f"Total Issues: {audit['total_issues']}", '']
for domain, detail in audit.get('details', {}).items():
report_lines.append(f"--- {domain.replace('_', ' ').title()} ---")
count = detail.get('count', detail.get('total_issues', 0))
report_lines.append(f" Issues: {count}")
return {'report': '\n'.join(report_lines), 'score': audit['score']}
TOOLS = {
'audit_posted_entry': audit_posted_entry,
'audit_account_balances': audit_account_balances,
'audit_tax_compliance': audit_tax_compliance,
'audit_reconciliation_integrity': audit_reconciliation_integrity,
'check_hash_chain': check_hash_chain,
'check_sequence_gaps': check_sequence_gaps,
'flag_entry': flag_entry,
'get_audit_status': get_audit_status,
'set_audit_status': set_audit_status,
'get_audit_trail': get_audit_trail,
'run_full_audit': run_full_audit,
'get_audit_report': get_audit_report,
}

View File

@@ -0,0 +1,961 @@
import logging
from datetime import datetime
from odoo import fields
_logger = logging.getLogger(__name__)
def get_unreconciled_bank_lines(env, params):
domain = [('is_reconciled', '=', False), ('company_id', '=', env.company.id)]
if params.get('journal_id'):
domain.append(('journal_id', '=', int(params['journal_id'])))
if params.get('date_from'):
domain.append(('date', '>=', params['date_from']))
if params.get('date_to'):
domain.append(('date', '<=', params['date_to']))
if params.get('min_amount'):
domain.append(('amount', '>=', float(params['min_amount'])))
limit = int(params.get('limit', 50))
lines = env['account.bank.statement.line'].search(domain, limit=limit, order='date desc')
return {
'count': len(lines),
'total_amount': sum(abs(l.amount) for l in lines),
'lines': [{
'id': l.id,
'date': str(l.date),
'payment_ref': l.payment_ref or '',
'partner_name': l.partner_name or (l.partner_id.name if l.partner_id else ''),
'amount': l.amount,
'journal': l.journal_id.name,
} for l in lines],
}
def get_unreconciled_receipts(env, params):
account_code = params.get('account_code', '1122')
accounts = env['account.account'].search([
('code', '=like', f'{account_code}%'),
('company_ids', 'in', env.company.id),
])
domain = [
('account_id', 'in', accounts.ids),
('reconciled', '=', False),
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
]
lines = env['account.move.line'].search(domain, order='date desc')
return {
'count': len(lines),
'total_amount': sum(abs(l.amount_residual) for l in lines),
'lines': [{
'id': l.id,
'date': str(l.date),
'ref': l.ref or l.move_id.name,
'partner': l.partner_id.name if l.partner_id else '',
'amount_residual': l.amount_residual,
} for l in lines],
}
def match_bank_line_to_payments(env, params):
st_line_id = int(params['statement_line_id'])
move_line_ids = [int(x) for x in params['move_line_ids']]
st_line = env['account.bank.statement.line'].browse(st_line_id)
if not st_line.exists():
return {'error': 'Statement line not found'}
st_line.set_line_bank_statement_line(move_line_ids)
return {
'status': 'matched',
'statement_line_id': st_line_id,
'matched_move_lines': move_line_ids,
'is_reconciled': st_line.is_reconciled,
}
def auto_reconcile_bank_lines(env, params):
company_id = params.get('company_id', env.company.id)
lines = env['account.bank.statement.line'].search([
('is_reconciled', '=', False),
('company_id', '=', int(company_id)),
])
before_count = len(lines)
lines._try_auto_reconcile_statement_lines(company_id=int(company_id))
still_unreconciled = env['account.bank.statement.line'].search([
('is_reconciled', '=', False),
('company_id', '=', int(company_id)),
])
reconciled_count = before_count - len(still_unreconciled)
return {
'status': 'completed',
'lines_before': before_count,
'lines_reconciled': reconciled_count,
'lines_remaining': len(still_unreconciled),
}
def apply_reconcile_model(env, params):
model_id = int(params['model_id'])
st_line_id = int(params['statement_line_id'])
reco_model = env['account.reconcile.model'].browse(model_id)
st_line = env['account.bank.statement.line'].browse(st_line_id)
if not reco_model.exists() or not st_line.exists():
return {'error': 'Model or statement line not found'}
_liquidity_lines, suspense_lines, _other_lines = st_line._seek_for_lines()
residual = sum(l.amount_residual for l in suspense_lines) if suspense_lines else st_line.amount
write_off_vals = reco_model._get_write_off_move_lines_dict(st_line, residual)
if write_off_vals:
line_ids_create_command = [(0, 0, vals) for vals in write_off_vals]
st_line.move_id.write({'line_ids': line_ids_create_command})
return {
'status': 'applied',
'model': reco_model.name,
'write_off_lines': len(write_off_vals) if write_off_vals else 0,
}
def unmatch_bank_line(env, params):
st_line_id = int(params['statement_line_id'])
st_line = env['account.bank.statement.line'].browse(st_line_id)
if not st_line.exists():
return {'error': 'Statement line not found'}
st_line.action_unreconcile_entry()
return {'status': 'unmatched', 'statement_line_id': st_line_id}
def get_reconcile_suggestions(env, params):
st_line_id = int(params['statement_line_id'])
st_line = env['account.bank.statement.line'].browse(st_line_id)
if not st_line.exists():
return {'error': 'Statement line not found'}
models = env['account.reconcile.model'].search([
('company_id', '=', env.company.id),
])
return {
'models': [{
'id': m.id,
'name': m.name,
'trigger': m.trigger if hasattr(m, 'trigger') else 'manual',
} for m in models],
}
def sum_payments_by_date(env, params):
"""Sum payment/journal activity for a date range.
IMPORTANT: Always pass journal_ids to filter to specific journals.
Without journal_ids, returns totals across ALL journals which is
almost never what you want for reconciliation."""
date_from = params.get('date_from')
date_to = params.get('date_to')
if not date_from or not date_to:
return {'error': 'date_from and date_to are required'}
journal_ids = params.get('journal_ids', [])
domain = [
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
('date', '>=', date_from),
('date', '<=', date_to),
]
scope = 'all journals'
if journal_ids:
jids = [int(j) for j in journal_ids]
domain.append(('journal_id', 'in', jids))
journals = env['account.journal'].browse(jids)
scope = ', '.join(j.name for j in journals if j.exists())
else:
# Without journal filter, include a warning and break down by journal
pass
lines = env['account.move.line'].search(domain)
total_debit = sum(l.debit for l in lines)
total_credit = sum(l.credit for l in lines)
result = {
'date_from': date_from,
'date_to': date_to,
'total_debit': total_debit,
'total_credit': total_credit,
'net': total_debit - total_credit,
'line_count': len(lines),
'scope': scope,
}
# If no journal filter, add per-journal breakdown so AI doesn't
# mistake company-wide totals for a specific journal's activity
if not journal_ids:
result['warning'] = (
'No journal_ids filter was provided. These totals are across ALL '
'journals in the company. To get card payment totals, pass the '
'specific card/POS journal IDs.'
)
journal_totals = {}
for l in lines:
jname = l.journal_id.name
if jname not in journal_totals:
journal_totals[jname] = {'debit': 0.0, 'credit': 0.0, 'count': 0}
journal_totals[jname]['debit'] += l.debit
journal_totals[jname]['credit'] += l.credit
journal_totals[jname]['count'] += 1
result['by_journal'] = [
{'journal': jn, 'debit': v['debit'], 'credit': v['credit'], 'count': v['count']}
for jn, v in sorted(journal_totals.items(), key=lambda x: -x[1]['debit'])
][:15]
return result
def get_bank_line_details(env, params):
"""Get full details of a single bank statement line plus matching suggestions."""
line_id = int(params['line_id'])
line = env['account.bank.statement.line'].browse(line_id)
if not line.exists():
return {'error': 'Bank statement line not found'}
result = {
'id': line.id,
'date': str(line.date),
'payment_ref': line.payment_ref or '',
'partner_name': line.partner_name or (line.partner_id.name if line.partner_id else ''),
'partner_id': line.partner_id.id if line.partner_id else None,
'amount': line.amount,
'journal': line.journal_id.name,
'journal_id': line.journal_id.id,
'is_reconciled': line.is_reconciled,
'existing_bills': [],
'suggested_partner': None,
}
# Search for existing vendor bills matching amount ± $0.50 and date ± 3 days
abs_amount = abs(line.amount)
from datetime import timedelta as td
date_from = line.date - td(days=3)
date_to = line.date + td(days=3)
matching_bills = env['account.move'].search([
('move_type', '=', 'in_invoice'),
('state', '=', 'posted'),
('amount_total', '>=', abs_amount - 0.50),
('amount_total', '<=', abs_amount + 0.50),
('date', '>=', str(date_from)),
('date', '<=', str(date_to)),
('company_id', '=', env.company.id),
], limit=5)
for bill in matching_bills:
result['existing_bills'].append({
'id': bill.id,
'name': bill.name,
'partner': bill.partner_id.name if bill.partner_id else '',
'amount_total': bill.amount_total,
'date': str(bill.date),
'payment_state': bill.payment_state,
})
# Try to suggest a partner from payment_ref keyword
if line.payment_ref and not line.partner_id:
# Extract meaningful words from payment_ref (skip common banking terms)
skip_words = {'misc', 'payment', 'online', 'banking', 'pad', 'business',
'deposit', 'cheque', 'transfer', 'e-transfer', 'sent', 'autodeposit'}
words = [w for w in line.payment_ref.split() if len(w) > 2 and w.lower() not in skip_words]
for word in words[:3]:
partners = env['res.partner'].search([
('name', 'ilike', word),
('supplier_rank', '>', 0),
], limit=3)
if partners:
result['suggested_partner'] = {
'id': partners[0].id,
'name': partners[0].name,
'match_word': word,
}
break
return result
def check_recurring_pattern(env, params):
"""Check if a bank line matches a known recurring payment pattern.
Returns the historical coding (account, HST, partner, reconcile model) if found."""
line_id = params.get('line_id')
payment_ref = params.get('payment_ref', '')
amount = params.get('amount')
# If line_id provided, get the ref and amount from the line
if line_id:
line = env['account.bank.statement.line'].browse(int(line_id))
if line.exists():
payment_ref = line.payment_ref or ''
amount = line.amount
if not payment_ref:
return {'match': False, 'reason': 'No payment reference to match'}
# Search cached patterns by keyword
patterns = env['fusion.recurring.pattern'].search([
('company_id', '=', env.company.id),
])
best_match = None
for pat in patterns:
if not pat.ref_keyword:
continue
# Check if the pattern keyword appears in the payment_ref
if pat.ref_keyword.lower()[:30] in payment_ref.lower():
# If amount matches too, it's a strong match
if amount and pat.amount_is_fixed and abs(pat.amount - amount) < 0.01:
best_match = pat
break
# Keyword-only match (amount may vary)
if not best_match or pat.occurrences > best_match.occurrences:
best_match = pat
if not best_match:
return {'match': False, 'payment_ref': payment_ref}
result = {
'match': True,
'pattern_id': best_match.id,
'pattern_name': best_match.name,
'occurrences': best_match.occurrences,
'first_seen': str(best_match.first_seen) if best_match.first_seen else '',
'last_seen': str(best_match.last_seen) if best_match.last_seen else '',
'expense_account_id': best_match.expense_account_id.id if best_match.expense_account_id else None,
'expense_account_code': best_match.expense_account_code or '',
'expense_account_name': best_match.expense_account_id.name if best_match.expense_account_id else '',
'has_hst': best_match.has_hst,
'partner_id': best_match.partner_id.id if best_match.partner_id else None,
'partner_name': best_match.partner_id.name if best_match.partner_id else '',
'action_note': best_match.action_note or '',
'amount_is_fixed': best_match.amount_is_fixed,
}
if best_match.reconcile_model_id:
result['reconcile_model_id'] = best_match.reconcile_model_id.id
result['reconcile_model_name'] = best_match.reconcile_model_id.name
return result
def match_internal_transfers(env, params):
"""[Tier 3] Find and match inter-account transfers between two bank journals.
Matches exact amounts within a date window. Only matches when there is exactly
ONE candidate on each side (no ambiguous matches). Requires user approval.
Typical use: Scotia Current Account ↔ Scotia Visa payments."""
journal_a_id = int(params['journal_a_id']) # e.g., Scotia Current (50)
journal_b_id = int(params['journal_b_id']) # e.g., Scotia Visa (51)
date_from = params.get('date_from', '2025-01-01')
date_to = params.get('date_to', '2025-03-31')
max_days_apart = int(params.get('max_days_apart', 2))
# Get unreconciled positive lines from both journals
# (transfers show as positive on the RECEIVING side)
lines_a = env['account.bank.statement.line'].search([
('is_reconciled', '=', False),
('journal_id', '=', journal_a_id),
('company_id', '=', env.company.id),
])
lines_a = lines_a.filtered(
lambda l: l.move_id.date >= fields.Date.from_string(date_from)
and l.move_id.date <= fields.Date.from_string(date_to)
and l.amount > 0 # money coming IN on this account
)
lines_b = env['account.bank.statement.line'].search([
('is_reconciled', '=', False),
('journal_id', '=', journal_b_id),
('company_id', '=', env.company.id),
])
lines_b = lines_b.filtered(
lambda l: l.move_id.date >= fields.Date.from_string(date_from)
and l.move_id.date <= fields.Date.from_string(date_to)
and l.amount > 0 # money coming IN on this account
)
matched_pairs = []
used_a = set()
used_b = set()
# For each line in A, find exact-amount match in B within date window
for la in sorted(lines_a, key=lambda l: l.move_id.date):
if la.id in used_a:
continue
candidates = []
for lb in lines_b:
if lb.id in used_b:
continue
if abs(la.amount - lb.amount) < 0.01:
days = abs((la.move_id.date - lb.move_id.date).days)
if days <= max_days_apart:
candidates.append(lb)
# Only match if EXACTLY ONE candidate — skip ambiguous
if len(candidates) == 1:
lb = candidates[0]
matched_pairs.append({
'line_a_id': la.id,
'line_a_date': str(la.move_id.date),
'line_a_ref': la.payment_ref or '',
'line_a_journal': la.journal_id.name,
'line_b_id': lb.id,
'line_b_date': str(lb.move_id.date),
'line_b_ref': lb.payment_ref or '',
'line_b_journal': lb.journal_id.name,
'amount': la.amount,
'days_apart': abs((la.move_id.date - lb.move_id.date).days),
})
used_a.add(la.id)
used_b.add(lb.id)
if not matched_pairs:
return {
'status': 'no_matches',
'message': 'No unambiguous transfer pairs found.',
'lines_a_checked': len(lines_a),
'lines_b_checked': len(lines_b),
}
# If this is just a dry-run check (no execute flag), return the pairs for review
if not params.get('execute', False):
return {
'status': 'pairs_found',
'count': len(matched_pairs),
'pairs': matched_pairs,
'message': f'Found {len(matched_pairs)} unambiguous transfer pairs. Set execute=true to reconcile them.',
}
# Execute: create internal transfer journal entries to reconcile both sides
reconciled = []
for pair in matched_pairs:
try:
line_a = env['account.bank.statement.line'].browse(pair['line_a_id'])
line_b = env['account.bank.statement.line'].browse(pair['line_b_id'])
# Create an internal transfer payment
payment = env['account.payment'].create({
'payment_type': 'outbound',
'partner_type': 'supplier',
'partner_id': env.company.partner_id.id, # Self as partner for internal transfer
'amount': pair['amount'],
'journal_id': journal_a_id,
'destination_journal_id': journal_b_id,
'date': line_a.move_id.date,
'ref': f'Internal Transfer: {pair["line_a_ref"]}{pair["line_b_ref"]}',
'is_internal_transfer': True,
})
payment.action_post()
# Now match the payment's move lines to the bank statement lines
# The payment creates lines on both journals' outstanding accounts
for move_line in payment.move_id.line_ids:
if move_line.journal_id.id == journal_a_id and not move_line.reconciled:
try:
line_a.set_line_bank_statement_line(move_line.ids)
except Exception:
pass
# Check paired transfer for the other side
if payment.paired_internal_transfer_payment_id:
paired = payment.paired_internal_transfer_payment_id
for move_line in paired.move_id.line_ids:
if move_line.journal_id.id == journal_b_id and not move_line.reconciled:
try:
line_b.set_line_bank_statement_line(move_line.ids)
except Exception:
pass
reconciled.append({
'line_a_id': pair['line_a_id'],
'line_b_id': pair['line_b_id'],
'amount': pair['amount'],
'payment_id': payment.id,
'status': 'reconciled',
})
except Exception as e:
_logger.error("Failed to reconcile transfer pair %s: %s", pair, e)
reconciled.append({
'line_a_id': pair['line_a_id'],
'line_b_id': pair['line_b_id'],
'amount': pair['amount'],
'status': 'error',
'error': str(e),
})
return {
'status': 'executed',
'total_pairs': len(matched_pairs),
'reconciled': len([r for r in reconciled if r['status'] == 'reconciled']),
'errors': len([r for r in reconciled if r['status'] == 'error']),
'details': reconciled,
}
def find_unreconciled_cheques(env, params):
"""Find unreconciled cheque bank lines and classify as payroll vs non-payroll
by checking if the amount matches an existing payroll liability entry."""
PAYROLL_ACCT = 433 # 2201 Payroll Liabilities
journal_id = int(params.get('journal_id', 50)) # Default Scotia Current
limit = int(params.get('limit', 50))
AML = env['account.move.line'].sudo()
BSL = env['account.bank.statement.line'].sudo()
# Build set of known payroll liability amounts
payroll_amounts = set()
for aml in AML.search([
('account_id', '=', PAYROLL_ACCT),
('parent_state', '=', 'posted'),
('credit', '>', 0),
]):
payroll_amounts.add(round(aml.credit, 2))
cheque_lines = BSL.search([
('journal_id', '=', journal_id),
('is_reconciled', '=', False),
('payment_ref', 'ilike', 'cheque'),
('amount', '<', 0),
('company_id', '=', env.company.id),
], limit=limit, order='move_id desc')
payroll = []
non_payroll = []
for line in cheque_lines:
amt = round(abs(line.amount), 2)
entry = {
'id': line.id,
'date': str(line.move_id.date),
'ref': line.payment_ref or '',
'amount': amt,
'journal': line.journal_id.name,
}
if amt in payroll_amounts:
entry['type'] = 'payroll'
payroll.append(entry)
else:
entry['type'] = 'non_payroll'
non_payroll.append(entry)
return {
'count': len(cheque_lines),
'payroll_count': len(payroll),
'non_payroll_count': len(non_payroll),
'payroll': payroll,
'non_payroll': non_payroll,
}
def reconcile_payroll_cheques(env, params):
"""Reconcile payroll cheque bank lines by applying the Payroll Cheque Clearing
reconcile model. Only reconciles cheques whose amount matches an existing
payroll liability entry on account 2201. Non-payroll cheques are skipped.
Params:
journal_id (int): Bank journal ID (default 50 = Scotia Current)
line_ids (list): Optional list of specific bank line IDs to reconcile.
If not provided, reconciles all matching payroll cheques.
"""
PAYROLL_ACCT = 433
journal_id = int(params.get('journal_id', 50))
AML = env['account.move.line'].sudo()
BSL = env['account.bank.statement.line'].sudo()
RecModel = env['account.reconcile.model'].sudo()
model = RecModel.search([
('name', 'ilike', 'Payroll Cheque'),
('company_id', '=', env.company.id),
], limit=1)
if not model:
return {'error': 'No "Payroll Cheque Clearing" reconcile model found. Create one first.'}
# Get lines to process
if params.get('line_ids'):
cheque_lines = BSL.browse([int(x) for x in params['line_ids']])
cheque_lines = cheque_lines.filtered(lambda l: not l.is_reconciled)
else:
cheque_lines = BSL.search([
('journal_id', '=', journal_id),
('is_reconciled', '=', False),
('payment_ref', 'ilike', 'cheque'),
('amount', '<', 0),
('company_id', '=', env.company.id),
])
# Filter post-lock-date
lock = env.company.fiscalyear_lock_date
if lock:
cheque_lines = cheque_lines.filtered(lambda l: l.move_id.date > lock)
# Filter to payroll-only amounts
payroll_amounts = set()
for aml in AML.search([
('account_id', '=', PAYROLL_ACCT),
('parent_state', '=', 'posted'),
('credit', '>', 0),
]):
payroll_amounts.add(round(aml.credit, 2))
payroll_lines = cheque_lines.filtered(
lambda l: round(abs(l.amount), 2) in payroll_amounts
)
skipped = len(cheque_lines) - len(payroll_lines)
if not payroll_lines:
return {
'status': 'nothing_to_do',
'message': f'No payroll cheques to reconcile ({skipped} non-payroll cheques skipped)',
}
try:
model._apply_reconcile_models(payroll_lines)
env.cr.commit()
except Exception as e:
return {'error': f'Reconciliation failed: {e}'}
still = payroll_lines.filtered(lambda l: not l.is_reconciled)
reconciled = len(payroll_lines) - len(still)
return {
'status': 'completed',
'reconciled': reconciled,
'still_unreconciled': len(still),
'non_payroll_skipped': skipped,
'message': f'Reconciled {reconciled} payroll cheques. {skipped} non-payroll cheques skipped.',
}
def _extract_partner_from_ref(env, payment_ref):
"""Extract a partner from a bank line payment_ref using keyword matching."""
if not payment_ref:
return None
skip_words = {
'misc', 'payment', 'online', 'banking', 'pad', 'business', 'deposit',
'cheque', 'transfer', 'e-transfer', 'sent', 'autodeposit', 'credit',
'debit', 'memo', 'free', 'interac', 'from', 'the', 'and', 'for',
'miscellaneous', 'bill', 'correction', 'adjustment', 'other',
}
# Strip common suffixes like colons and split
clean_ref = payment_ref.replace(':', ' ').replace('-', ' ')
words = [w for w in clean_ref.split() if len(w) > 2 and w.lower() not in skip_words]
# Try progressively shorter phrases
for n in range(min(len(words), 4), 0, -1):
for i in range(len(words) - n + 1):
phrase = ' '.join(words[i:i+n])
partners = env['res.partner'].search([
('name', 'ilike', phrase),
('company_id', 'in', [env.company.id, False]),
], limit=3)
if partners:
return partners[0]
# Fallback: try each word individually with supplier/customer rank
for word in words:
if len(word) < 4:
continue
partners = env['res.partner'].search([
('name', 'ilike', word),
('company_id', 'in', [env.company.id, False]),
'|', ('customer_rank', '>', 0), ('supplier_rank', '>', 0),
], limit=3)
if partners:
return partners[0]
return None
def _find_best_subset(candidates, target, max_items=8):
"""Find the subset of candidates whose amounts sum closest to target.
Returns (aml_ids, total) for the best combination."""
items = candidates[:max_items]
if not items:
return [], 0.0
best_ids = []
best_total = 0.0
best_diff = abs(target)
n = len(items)
# Brute force all subsets (2^n, max 256)
for mask in range(1, 1 << n):
subset_ids = []
subset_total = 0.0
for j in range(n):
if mask & (1 << j):
subset_ids.append(items[j]['aml_id'])
subset_total += items[j]['amount_residual']
diff = abs(subset_total - target)
if diff < best_diff:
best_diff = diff
best_ids = subset_ids
best_total = subset_total
if diff < 0.01:
break # Exact match found
return best_ids, round(best_total, 2)
def suggest_bank_line_matches(env, params):
"""Find candidate journal items (invoices/bills) that could match a bank statement line.
Scores and ranks matches, finds best subset-sum combination.
Returns data for a reconciliation-mode fusion-table."""
line_id = int(params['statement_line_id'])
line = env['account.bank.statement.line'].browse(line_id)
if not line.exists():
return {'error': 'Bank statement line not found'}
if line.is_reconciled:
return {'error': 'Bank statement line is already reconciled'}
AML = env['account.move.line'].sudo()
bank_amount = abs(line.amount)
line_date = line.move_id.date
is_incoming = line.amount > 0 # positive = customer payment, negative = vendor payment
from datetime import timedelta as td
# Determine partner
partner = line.partner_id
if not partner:
partner = _extract_partner_from_ref(env, line.payment_ref)
# Base domain common to all searches
base_domain = [
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
('display_type', 'not in', ('line_section', 'line_subsection', 'line_note')),
('statement_line_id', '=', False),
]
# --- PRIORITY 1: Outstanding payments/receipts on bank journal accounts ---
# These are registered payments waiting to be matched to bank lines.
# For incoming bank lines → look for outstanding receipts (credit on outstanding account)
# For outgoing bank lines → look for outstanding payments (debit on outstanding account)
outstanding_acct_ids = env['account.account'].search([
('name', 'ilike', 'outstanding'),
('company_ids', 'in', env.company.id),
]).ids
outstanding_amls = AML
if outstanding_acct_ids:
os_domain = base_domain + [('account_id', 'in', outstanding_acct_ids)]
if is_incoming:
os_domain.append(('amount_residual', '>', 0)) # Debit residual on outstanding receipts
else:
os_domain.append(('amount_residual', '<', 0)) # Credit residual on outstanding payments
if partner:
outstanding_amls = AML.search(os_domain + [('partner_id', '=', partner.id)], limit=30)
if not outstanding_amls:
outstanding_amls = AML.search(os_domain, limit=30)
else:
outstanding_amls = AML.search(os_domain, limit=30)
# --- PRIORITY 2: Open invoices/bills (receivable/payable accounts) ---
inv_domain = list(base_domain)
if is_incoming:
inv_domain.append(('account_id.account_type', '=', 'asset_receivable'))
inv_domain.append(('amount_residual', '>', 0))
else:
inv_domain.append(('account_id.account_type', '=', 'liability_payable'))
inv_domain.append(('amount_residual', '<', 0))
inv_domain.append(('date', '>=', str(line_date - td(days=90))))
inv_domain.append(('date', '<=', str(line_date + td(days=30))))
invoice_amls = AML
if partner:
invoice_amls = AML.search(inv_domain + [('partner_id', '=', partner.id)], limit=30)
if not invoice_amls:
invoice_amls = AML.search(inv_domain, limit=30)
else:
invoice_amls = AML.search(inv_domain, limit=30)
# Merge: outstanding payments first (priority), then invoices/bills
combined = outstanding_amls | invoice_amls
# Score and format candidates
outstanding_ids = set(outstanding_amls.ids) if outstanding_amls else set()
candidates = []
seen_ids = set()
for aml in combined:
if aml.id in seen_ids:
continue
seen_ids.add(aml.id)
residual = abs(aml.amount_residual)
score = 0
reasons = []
is_payment = aml.id in outstanding_ids
# Source type: payment entries get a boost (preferred match)
if is_payment:
score += 15
reasons.append('payment')
# Amount scoring
if abs(residual - bank_amount) < 0.01:
score += 40
reasons.append('exact amount')
elif residual <= bank_amount * 1.05:
score += 20
reasons.append('close amount')
# Partner scoring
if partner and aml.partner_id.id == partner.id:
score += 25
reasons.append('partner')
elif partner and aml.partner_id and partner.name and aml.partner_id.name:
p1_words = set(partner.name.upper().split())
p2_words = set(aml.partner_id.name.upper().split())
if p1_words & p2_words:
score += 10
reasons.append('partial partner')
# Date proximity scoring
days_apart = abs((aml.date - line_date).days)
if days_apart <= 3:
score += 15
reasons.append(f'{days_apart}d')
elif days_apart <= 7:
score += 10
elif days_apart <= 14:
score += 5
# Reference matching
if line.payment_ref and aml.move_id.ref:
if any(w.upper() in (aml.move_id.ref or '').upper()
for w in line.payment_ref.split() if len(w) > 3):
score += 10
reasons.append('ref match')
# Determine entry type label
entry_type = 'payment' if is_payment else 'invoice'
if aml.move_id.move_type == 'in_invoice':
entry_type = 'bill'
elif aml.move_id.move_type == 'out_invoice':
entry_type = 'invoice'
elif aml.move_id.move_type in ('in_refund', 'out_refund'):
entry_type = 'credit note'
elif aml.payment_id:
entry_type = 'payment'
candidates.append({
'aml_id': aml.id,
'move_id': aml.move_id.id,
'name': aml.move_id.name or '',
'ref': aml.move_id.ref or '',
'partner': aml.partner_id.name if aml.partner_id else '',
'partner_id': aml.partner_id.id if aml.partner_id else None,
'date': str(aml.date),
'amount_total': abs(aml.balance),
'amount_residual': residual,
'account': aml.account_id.code if hasattr(aml.account_id, 'code') else '',
'type': entry_type,
'score': score,
'reasons': ', '.join(reasons) if reasons else '',
})
# Sort by score descending
candidates.sort(key=lambda c: -c['score'])
# Find best subset-sum combination
best_combo_ids, best_combo_total = _find_best_subset(candidates, bank_amount)
# Mark which candidates are in the best combination
for c in candidates:
c['in_best_combo'] = c['aml_id'] in best_combo_ids
return {
'bank_line': {
'id': line.id,
'date': str(line_date),
'ref': line.payment_ref or '',
'amount': line.amount,
'abs_amount': bank_amount,
'journal': line.journal_id.name,
'partner': partner.name if partner else '',
'partner_id': partner.id if partner else None,
'direction': 'incoming' if is_incoming else 'outgoing',
},
'candidates': candidates[:20],
'best_combination': best_combo_ids,
'best_combination_total': best_combo_total,
'is_exact_match': abs(best_combo_total - bank_amount) < 0.01,
'count': len(candidates),
}
def search_matching_entries(env, params):
"""Search open journal items by query (invoice/bill number, amount, or partner name).
Used by the reconciliation table search bar via direct RPC."""
query = (params.get('query') or '').strip()
line_id = params.get('statement_line_id')
if not query:
return {'candidates': []}
AML = env['account.move.line'].sudo()
# Search across receivable, payable, AND outstanding accounts
outstanding_acct_ids = env['account.account'].search([
('name', 'ilike', 'outstanding'),
('company_ids', 'in', env.company.id),
]).ids
domain = [
('parent_state', '=', 'posted'),
('reconciled', '=', False),
('company_id', '=', env.company.id),
('display_type', 'not in', ('line_section', 'line_subsection', 'line_note')),
'|',
('account_id.account_type', 'in', ('asset_receivable', 'liability_payable')),
('account_id', 'in', outstanding_acct_ids),
]
# Try as amount first
try:
amount = float(query.replace('$', '').replace(',', ''))
amount_domain = domain + [
'|',
'&', ('amount_residual', '>=', amount - 0.50), ('amount_residual', '<=', amount + 0.50),
'&', ('amount_residual', '>=', -amount - 0.50), ('amount_residual', '<=', -amount + 0.50),
]
amls = AML.search(amount_domain, limit=15)
if amls:
return {'candidates': _format_aml_candidates(amls)}
except ValueError:
pass
# Search by move name (invoice/bill number)
name_amls = AML.search(domain + [('move_id.name', 'ilike', query)], limit=15)
if name_amls:
return {'candidates': _format_aml_candidates(name_amls)}
# Search by move ref
ref_amls = AML.search(domain + [('move_id.ref', 'ilike', query)], limit=15)
if ref_amls:
return {'candidates': _format_aml_candidates(ref_amls)}
# Search by partner name
partner_amls = AML.search(domain + [('partner_id.name', 'ilike', query)], limit=15)
return {'candidates': _format_aml_candidates(partner_amls)}
def _format_aml_candidates(amls):
"""Format AMLs as candidate dicts for the reconciliation table."""
return [{
'aml_id': aml.id,
'move_id': aml.move_id.id,
'name': aml.move_id.name or '',
'ref': aml.move_id.ref or '',
'partner': aml.partner_id.name if aml.partner_id else '',
'partner_id': aml.partner_id.id if aml.partner_id else None,
'date': str(aml.date),
'amount_total': abs(aml.balance),
'amount_residual': abs(aml.amount_residual),
'account': aml.account_id.code if hasattr(aml.account_id, 'code') else '',
'score': 0,
'reasons': 'manual search',
'in_best_combo': False,
} for aml in amls]
TOOLS = {
'get_unreconciled_bank_lines': get_unreconciled_bank_lines,
'get_unreconciled_receipts': get_unreconciled_receipts,
'match_bank_line_to_payments': match_bank_line_to_payments,
'auto_reconcile_bank_lines': auto_reconcile_bank_lines,
'apply_reconcile_model': apply_reconcile_model,
'unmatch_bank_line': unmatch_bank_line,
'get_reconcile_suggestions': get_reconcile_suggestions,
'sum_payments_by_date': sum_payments_by_date,
'get_bank_line_details': get_bank_line_details,
'check_recurring_pattern': check_recurring_pattern,
'match_internal_transfers': match_internal_transfers,
'find_unreconciled_cheques': find_unreconciled_cheques,
'reconcile_payroll_cheques': reconcile_payroll_cheques,
'suggest_bank_line_matches': suggest_bank_line_matches,
'search_matching_entries': search_matching_entries,
}

View File

@@ -0,0 +1,299 @@
import logging
_logger = logging.getLogger(__name__)
def calculate_hst_balance(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
base_domain = [
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
base_domain.append(('date', '>=', date_from))
if date_to:
base_domain.append(('date', '<=', date_to))
# Odoo 19 Enterprise: account.account may not have company_id field
# (shared chart of accounts). Use try/except to handle both cases.
try:
collected_accounts = env['account.account'].search([
('code', '=like', '2005%'), ('company_ids', 'in', env.company.id),
])
itc_accounts = env['account.account'].search([
('code', '=like', '2006%'), ('company_ids', 'in', env.company.id),
])
except Exception:
collected_accounts = env['account.account'].search([
('code', '=like', '2005%'),
])
itc_accounts = env['account.account'].search([
('code', '=like', '2006%'),
])
collected_lines = env['account.move.line'].search(
base_domain + [('account_id', 'in', collected_accounts.ids)]
)
itc_lines = env['account.move.line'].search(
base_domain + [('account_id', 'in', itc_accounts.ids)]
)
hst_collected = abs(sum(l.balance for l in collected_lines))
itcs = abs(sum(l.balance for l in itc_lines))
return {
'hst_collected': hst_collected,
'input_tax_credits': itcs,
'net_hst': hst_collected - itcs,
'status': 'owing' if (hst_collected - itcs) > 0 else 'refund',
'period': f'{date_from or "all"} to {date_to or "now"}',
}
def get_tax_report(env, params):
report_ref = params.get('report_ref', 'account.generic_tax_report')
try:
report = env.ref(report_ref)
except Exception:
return {'error': f'Report not found: {report_ref}'}
options = report.get_options({
'date': {
'date_from': params.get('date_from', ''),
'date_to': params.get('date_to', ''),
}
})
lines = report._get_lines(options)
return {
'report_name': report.name,
'lines': [{
'name': l.get('name', ''),
'columns': [c.get('no_format', c.get('name', '')) for c in l.get('columns', [])],
} for l in lines[:50]],
}
def find_missing_tax_invoices(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('move_type', 'in', ('out_invoice', 'out_refund')),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
invoices = env['account.move'].search(domain)
missing = invoices.filtered(
lambda inv: not any(line.tax_ids for line in inv.invoice_line_ids)
)
return {
'total_invoices': len(invoices),
'missing_tax_count': len(missing),
'invoices': [{
'id': inv.id,
'name': inv.name,
'partner': inv.partner_id.name if inv.partner_id else '',
'amount_total': inv.amount_total,
'date': str(inv.date),
} for inv in missing[:30]],
}
def find_missing_itc_bills(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('move_type', 'in', ('in_invoice', 'in_refund')),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
bills = env['account.move'].search(domain)
missing = bills.filtered(
lambda b: not any(line.tax_ids for line in b.invoice_line_ids)
)
return {
'total_bills': len(bills),
'missing_itc_count': len(missing),
'bills': [{
'id': b.id,
'name': b.name,
'partner': b.partner_id.name if b.partner_id else '',
'amount_total': b.amount_total,
'date': str(b.date),
} for b in missing[:30]],
}
def get_tax_return_status(env, params):
try:
AccountReturn = env['account.return']
except KeyError:
return {'error': 'Tax return model (account.return) is not available. The account_tax_report or related Enterprise module may not be installed.'}
returns = AccountReturn.search([
('company_id', '=', env.company.id),
], order='date_start desc', limit=10)
return {
'returns': [{
'id': r.id,
'name': r.display_name,
'date_start': str(r.date_start) if hasattr(r, 'date_start') else '',
'date_end': str(r.date_end) if hasattr(r, 'date_end') else '',
'state': r.state if hasattr(r, 'state') else '',
} for r in returns],
}
def generate_tax_return(env, params):
try:
AccountReturn = env['account.return']
except KeyError:
return {'error': 'Tax return model (account.return) is not available.'}
try:
AccountReturn._generate_or_refresh_all_returns(
company=env.company
)
return {'status': 'generated', 'message': 'Tax returns refreshed successfully.'}
except Exception as e:
return {'error': str(e)}
def validate_tax_return(env, params):
try:
AccountReturn = env['account.return']
except KeyError:
return {'error': 'Tax return model (account.return) is not available.'}
return_id = int(params['return_id'])
tax_return = AccountReturn.browse(return_id)
if not tax_return.exists():
return {'error': 'Tax return not found'}
try:
tax_return.action_validate()
return {'status': 'validated', 'return_id': return_id}
except Exception as e:
return {'error': str(e)}
def create_expense_entry(env, params):
"""[Tier 3] Create a direct GL expense entry in the Misc journal with optional HST split.
This is the 'old school' way of recording expenses without a formal vendor bill.
Requires user approval before execution."""
date = params.get('date', str(env['account.move']._fields['date'].default(env['account.move'])))
description = params.get('description', 'Expense')
expense_account_id = int(params['expense_account_id'])
amount = abs(float(params['amount']))
has_hst = params.get('has_hst', False)
bank_journal_id = int(params.get('bank_journal_id', 0))
# Find the MISC journal
misc_journal = env['account.journal'].search([
('code', '=', 'MISC'), ('company_id', '=', env.company.id),
], limit=1)
if not misc_journal:
return {'error': 'Miscellaneous Operations journal (MISC) not found'}
expense_account = env['account.account'].browse(expense_account_id)
if not expense_account.exists():
return {'error': f'Expense account not found: {expense_account_id}'}
# Determine credit account (bank outstanding or AP)
credit_account = None
if bank_journal_id:
bank_journal = env['account.journal'].browse(bank_journal_id)
if bank_journal.exists():
# Use the bank journal's default debit/credit account
credit_account = (bank_journal.default_account_id
or bank_journal.company_id.account_journal_payment_credit_account_id)
if not credit_account:
# Fallback to AP account
credit_account = env['account.account'].search([
('account_type', '=', 'liability_payable'),
('company_ids', 'in', env.company.id),
], limit=1)
if not credit_account:
return {'error': 'Could not determine credit account for the expense entry'}
line_ids = []
if has_hst:
# Split: net expense + 13% HST ITC
hst_rate = 0.13
net_amount = round(amount / (1 + hst_rate), 2)
hst_amount = round(amount - net_amount, 2)
# Find HST ITC account (2006%)
itc_account = env['account.account'].search([
('code', '=like', '2006%'),
], limit=1)
if not itc_account:
# Fallback: use the HST purchase tax account
hst_tax = env['account.tax'].search([
('type_tax_use', '=', 'purchase'), ('amount', '=', 13.0),
('company_id', '=', env.company.id),
], limit=1)
if hst_tax and hst_tax.invoice_repartition_line_ids:
for rep in hst_tax.invoice_repartition_line_ids:
if rep.repartition_type == 'tax' and rep.account_id:
itc_account = rep.account_id
break
if not itc_account:
return {'error': 'HST ITC account (2006) not found'}
line_ids = [
(0, 0, {'name': description, 'account_id': expense_account_id,
'debit': net_amount, 'credit': 0.0}),
(0, 0, {'name': f'HST ITC - {description}', 'account_id': itc_account.id,
'debit': hst_amount, 'credit': 0.0}),
(0, 0, {'name': description, 'account_id': credit_account.id,
'debit': 0.0, 'credit': amount}),
]
else:
# Simple: debit expense / credit bank
line_ids = [
(0, 0, {'name': description, 'account_id': expense_account_id,
'debit': amount, 'credit': 0.0}),
(0, 0, {'name': description, 'account_id': credit_account.id,
'debit': 0.0, 'credit': amount}),
]
try:
move = env['account.move'].create({
'move_type': 'entry',
'journal_id': misc_journal.id,
'date': date,
'ref': description,
'line_ids': line_ids,
'company_id': env.company.id,
})
move.action_post()
return {
'status': 'posted',
'move_id': move.id,
'move_name': move.name,
'amount': amount,
'has_hst': has_hst,
'hst_amount': round(amount - amount / 1.13, 2) if has_hst else 0.0,
}
except Exception as e:
_logger.error("Failed to create expense entry: %s", e)
return {'error': str(e)}
TOOLS = {
'calculate_hst_balance': calculate_hst_balance,
'get_tax_report': get_tax_report,
'find_missing_tax_invoices': find_missing_tax_invoices,
'find_missing_itc_bills': find_missing_itc_bills,
'get_tax_return_status': get_tax_return_status,
'generate_tax_return': generate_tax_return,
'validate_tax_return': validate_tax_return,
'create_expense_entry': create_expense_entry,
}

View File

@@ -0,0 +1,113 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def get_stock_valuation(env, params):
accounts = env['account.account'].search([
('code', '=like', '1069%'),
('company_ids', 'in', env.company.id),
])
result = []
for acct in accounts:
balance = sum(env['account.move.line'].search([
('account_id', '=', acct.id),
('parent_state', '=', 'posted'),
]).mapped('balance'))
result.append({'code': acct.code, 'name': acct.name, 'balance': balance})
return {'accounts': result, 'total': sum(r['balance'] for r in result)}
def get_price_differences(env, params):
accounts = env['account.account'].search([
('code', '=like', '5010%'),
('company_ids', 'in', env.company.id),
])
domain = [
('account_id', 'in', accounts.ids),
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if params.get('date_from'):
domain.append(('date', '>=', params['date_from']))
if params.get('date_to'):
domain.append(('date', '<=', params['date_to']))
lines = env['account.move.line'].search(domain, order='date desc', limit=50)
return {
'total': sum(l.balance for l in lines),
'entries': [{
'id': l.id, 'date': str(l.date),
'move': l.move_id.name, 'amount': l.balance,
'partner': l.partner_id.name if l.partner_id else '',
} for l in lines],
}
def get_cogs_ratio_by_category(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
base_domain = [
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
base_domain.append(('date', '>=', date_from))
if date_to:
base_domain.append(('date', '<=', date_to))
revenue_lines = env['account.move.line'].search(
base_domain + [('account_id.account_type', '=', 'income')]
)
cogs_lines = env['account.move.line'].search(
base_domain + [('account_id.account_type', '=', 'expense_direct_cost')]
)
revenue = abs(sum(l.balance for l in revenue_lines))
cogs = abs(sum(l.balance for l in cogs_lines))
ratio = (cogs / revenue * 100) if revenue else 0
return {'revenue': revenue, 'cogs': cogs, 'ratio_pct': round(ratio, 2)}
def find_unusual_adjustments(env, params):
threshold = float(params.get('threshold', 1000))
domain = [
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
('account_id.account_type', '=', 'expense_direct_cost'),
]
lines = env['account.move.line'].search(domain)
unusual = lines.filtered(lambda l: abs(l.balance) > threshold)
return {
'count': len(unusual),
'adjustments': [{
'id': l.id, 'date': str(l.date), 'move': l.move_id.name,
'amount': l.balance, 'name': l.name or '',
} for l in unusual[:20]],
}
def get_inventory_turnover(env, params):
from .reporting import get_profit_loss
pl = get_profit_loss(env, params)
stock = get_stock_valuation(env, params)
avg_inventory = stock.get('total', 0)
cogs = 0
for line in pl.get('lines', []):
if 'cost' in line.get('name', '').lower():
cols = line.get('columns', [])
if cols:
try:
cogs = float(cols[0])
except (ValueError, TypeError):
pass
turnover = (cogs / avg_inventory) if avg_inventory else 0
return {'cogs': cogs, 'avg_inventory': avg_inventory, 'turnover': round(turnover, 2)}
TOOLS = {
'get_stock_valuation': get_stock_valuation,
'get_price_differences': get_price_differences,
'get_cogs_ratio_by_category': get_cogs_ratio_by_category,
'find_unusual_adjustments': find_unusual_adjustments,
'get_inventory_turnover': get_inventory_turnover,
}

View File

@@ -0,0 +1,220 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
ACCOUNT_TYPE_EXPECTED_DIRECTION = {
'asset_receivable': 'debit',
'asset_cash': 'debit',
'asset_current': 'debit',
'asset_non_current': 'debit',
'asset_prepayments': 'debit',
'asset_fixed': 'debit',
'liability_payable': 'credit',
'liability_credit_card': 'credit',
'liability_current': 'credit',
'liability_non_current': 'credit',
'equity': 'credit',
'equity_unaffected': 'credit',
'income': 'credit',
'income_other': 'credit',
'expense': 'debit',
'expense_depreciation': 'debit',
'expense_direct_cost': 'debit',
'off_balance': None,
}
def find_wrong_direction_balances(env, params):
balance_data = env['account.move.line'].read_group(
[('parent_state', '=', 'posted'), ('company_id', '=', env.company.id)],
['balance:sum'], ['account_id'],
)
acct_ids = [row['account_id'][0] for row in balance_data if row.get('account_id')]
acct_map = {}
if acct_ids:
for acct in env['account.account'].browse(acct_ids):
acct_map[acct.id] = acct
issues = []
for row in balance_data:
if not row.get('account_id'):
continue
acct = acct_map.get(row['account_id'][0])
if not acct:
continue
expected = ACCOUNT_TYPE_EXPECTED_DIRECTION.get(acct.account_type)
if not expected:
continue
balance = row.get('balance', 0) or 0
if (expected == 'debit' and balance < -0.01) or (expected == 'credit' and balance > 0.01):
issues.append({
'account_id': acct.id,
'code': acct.code,
'name': acct.name,
'balance': balance,
'expected': expected,
'actual': 'credit' if balance < 0 else 'debit',
})
return {'count': len(issues), 'issues': issues}
def find_duplicate_entries(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
moves = env['account.move'].search(domain, order='partner_id, amount_total, date')
duplicates = []
prev = None
for move in moves:
if prev and (
prev.partner_id == move.partner_id and prev.partner_id
and abs(prev.amount_total - move.amount_total) < 0.01
and prev.date == move.date
and prev.journal_id == move.journal_id
):
duplicates.append({
'entry_1': {'id': prev.id, 'name': prev.name},
'entry_2': {'id': move.id, 'name': move.name},
'partner': move.partner_id.name,
'amount': move.amount_total,
'date': str(move.date),
})
prev = move
return {'count': len(duplicates), 'duplicates': duplicates[:20]}
def find_wrong_account_entries(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('parent_state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
issues = []
tax_accounts = env['account.account'].search([
('account_type', 'in', ('liability_current', 'asset_current')),
('code', '=like', '2005%'),
('company_ids', 'in', env.company.id),
])
if tax_accounts:
revenue_on_tax = env['account.move.line'].search(
domain + [
('account_id', 'in', tax_accounts.ids),
('product_id', '!=', False),
]
)
for line in revenue_on_tax[:20]:
issues.append({
'id': line.id,
'move': line.move_id.name,
'account': f'{line.account_id.code} {line.account_id.name}',
'product': line.product_id.name,
'amount': line.balance,
'issue': 'Product line on tax account',
})
return {'count': len(issues), 'issues': issues}
def find_sequence_gaps(env, params):
moves = env['account.move'].search([
('state', '=', 'posted'),
('company_id', '=', env.company.id),
('made_sequence_gap', '=', True),
], order='date desc', limit=50)
return {
'count': len(moves),
'gaps': [{
'id': m.id,
'name': m.name,
'date': str(m.date),
'journal': m.journal_id.name,
} for m in moves],
}
def find_draft_entries(env, params):
min_age_days = int(params.get('min_age_days', 30))
from datetime import timedelta
cutoff = fields.Date.today() - timedelta(days=min_age_days)
drafts = env['account.move'].search([
('state', '=', 'draft'),
('date', '<=', cutoff),
('company_id', '=', env.company.id),
], order='date asc', limit=50)
return {
'count': len(drafts),
'entries': [{
'id': d.id,
'name': d.name or 'Draft',
'date': str(d.date),
'journal': d.journal_id.name,
'amount': d.amount_total,
'partner': d.partner_id.name if d.partner_id else '',
} for d in drafts],
}
def find_unreconciled_suspense(env, params):
suspense_accounts = env['account.account'].search([
('code', '=like', '999%'),
('company_ids', 'in', env.company.id),
])
issues = []
for acct in suspense_accounts:
balance = sum(env['account.move.line'].search([
('account_id', '=', acct.id),
('parent_state', '=', 'posted'),
]).mapped('balance'))
if abs(balance) > 0.01:
issues.append({
'account_id': acct.id,
'code': acct.code,
'name': acct.name,
'balance': balance,
})
return {'count': len(issues), 'accounts': issues}
def verify_reconciliation_integrity(env, params):
partials = env['account.partial.reconcile'].search([
('company_id', '=', env.company.id),
], limit=500)
issues = []
for p in partials:
debit_ok = p.debit_move_id.reconciled or abs(p.debit_move_id.amount_residual) < 0.01
credit_ok = p.credit_move_id.reconciled or abs(p.credit_move_id.amount_residual) < 0.01
if not debit_ok and not credit_ok:
issues.append({
'id': p.id,
'debit_move': p.debit_move_id.move_id.name,
'credit_move': p.credit_move_id.move_id.name,
'amount': p.amount,
'debit_residual': p.debit_move_id.amount_residual,
'credit_residual': p.credit_move_id.amount_residual,
})
return {'count': len(issues), 'issues': issues[:20]}
TOOLS = {
'find_wrong_direction_balances': find_wrong_direction_balances,
'find_duplicate_entries': find_duplicate_entries,
'find_wrong_account_entries': find_wrong_account_entries,
'find_sequence_gaps': find_sequence_gaps,
'find_draft_entries': find_draft_entries,
'find_unreconciled_suspense': find_unreconciled_suspense,
'verify_reconciliation_integrity': verify_reconciliation_integrity,
}

View File

@@ -0,0 +1,130 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def get_close_checklist(env, params):
from .bank_reconciliation import get_unreconciled_bank_lines
from .journal_review import find_draft_entries, find_sequence_gaps
from .hst_management import calculate_hst_balance
period = params.get('period', str(fields.Date.today())[:7])
date_from = f'{period}-01'
import calendar
year, month = int(period[:4]), int(period[5:7])
last_day = calendar.monthrange(year, month)[1]
date_to = f'{period}-{last_day:02d}'
p = {'date_from': date_from, 'date_to': date_to}
bank = get_unreconciled_bank_lines(env, p)
drafts = find_draft_entries(env, {'min_age_days': '0'})
gaps = find_sequence_gaps(env, p)
hst = calculate_hst_balance(env, p)
checklist = [
{'item': 'Bank Reconciliation', 'status': 'ok' if bank['count'] == 0 else 'attention', 'detail': f"{bank['count']} unreconciled lines"},
{'item': 'Draft Entries', 'status': 'ok' if drafts['count'] == 0 else 'attention', 'detail': f"{drafts['count']} draft entries"},
{'item': 'Sequence Gaps', 'status': 'ok' if gaps['count'] == 0 else 'warning', 'detail': f"{gaps['count']} gaps found"},
{'item': 'HST Balance', 'status': 'info', 'detail': f"Net HST: ${hst['net_hst']:.2f}"},
]
return {'period': period, 'checklist': checklist}
def get_unreconciled_counts(env, params):
accounts = env['account.account'].search([
('reconcile', '=', True),
('company_ids', 'in', env.company.id),
])
result = []
for acct in accounts:
count = env['account.move.line'].search_count([
('account_id', '=', acct.id),
('reconciled', '=', False),
('parent_state', '=', 'posted'),
])
if count > 0:
result.append({
'account_id': acct.id,
'code': acct.code,
'name': acct.name,
'unreconciled_count': count,
})
return {'accounts': sorted(result, key=lambda x: -x['unreconciled_count'])}
def find_entries_in_locked_period(env, params):
company = env.company
lock_date = company.fiscalyear_lock_date
if not lock_date:
return {'status': 'no_lock_date', 'entries': []}
entries = env['account.move'].search([
('date', '<=', lock_date),
('state', '=', 'draft'),
('company_id', '=', company.id),
])
return {
'lock_date': str(lock_date),
'count': len(entries),
'entries': [{'id': e.id, 'name': e.name, 'date': str(e.date)} for e in entries[:20]],
}
def get_accrual_status(env, params):
accrual_codes = params.get('account_codes', ['2100', '2110', '2120'])
result = []
for code in accrual_codes:
accounts = env['account.account'].search([
('code', '=like', f'{code}%'),
('company_ids', 'in', env.company.id),
])
for acct in accounts:
balance = sum(env['account.move.line'].search([
('account_id', '=', acct.id),
('parent_state', '=', 'posted'),
]).mapped('balance'))
result.append({'code': acct.code, 'name': acct.name, 'balance': balance})
return {'accruals': result}
def run_hash_integrity_check(env, params):
try:
result = env.company._check_hash_integrity()
return {
'status': 'completed',
'results': result.get('results', []),
'printing_date': result.get('printing_date', ''),
}
except Exception as e:
return {'error': str(e)}
def get_period_summary(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
try:
report = env.ref('account_reports.trial_balance_report')
except Exception:
report = env.ref('account.trial_balance_report', raise_if_not_found=False)
if not report:
return {'error': 'Trial balance report not found'}
options = report.get_options({'date': {'date_from': date_from, 'date_to': date_to}})
lines = report._get_lines(options)
return {
'period': f'{date_from} to {date_to}',
'lines': [{
'name': l.get('name', ''),
'columns': [c.get('no_format', c.get('name', '')) for c in l.get('columns', [])],
} for l in lines[:100]],
}
TOOLS = {
'get_close_checklist': get_close_checklist,
'get_unreconciled_counts': get_unreconciled_counts,
'find_entries_in_locked_period': find_entries_in_locked_period,
'get_accrual_status': get_accrual_status,
'run_hash_integrity_check': run_hash_integrity_check,
'get_period_summary': get_period_summary,
}

View File

@@ -0,0 +1,256 @@
import logging
from odoo import fields
_logger = logging.getLogger(__name__)
def get_payroll_entries(env, params):
payroll_journals = env['account.journal'].search([
('name', 'ilike', 'payroll'),
('company_id', '=', env.company.id),
])
if not payroll_journals and params.get('journal_id'):
payroll_journals = env['account.journal'].browse(int(params['journal_id']))
domain = [
('journal_id', 'in', payroll_journals.ids),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if params.get('date_from'):
domain.append(('date', '>=', params['date_from']))
if params.get('date_to'):
domain.append(('date', '<=', params['date_to']))
entries = env['account.move'].search(domain, order='date desc', limit=50)
return {
'count': len(entries),
'entries': [{
'id': e.id, 'name': e.name, 'date': str(e.date),
'amount': e.amount_total, 'ref': e.ref or '',
} for e in entries],
}
def compare_payroll_to_bank(env, params):
date_from = params.get('date_from')
date_to = params.get('date_to')
if not date_from or not date_to:
return {'error': 'date_from and date_to are required'}
payroll_journals = env['account.journal'].search([
('name', 'ilike', 'payroll'), ('company_id', '=', env.company.id),
])
payroll_entries = env['account.move'].search([
('journal_id', 'in', payroll_journals.ids),
('state', '=', 'posted'),
('date', '>=', date_from), ('date', '<=', date_to),
])
bank_lines = env['account.bank.statement.line'].search([
('date', '>=', date_from), ('date', '<=', date_to),
('company_id', '=', env.company.id),
])
payroll_total = sum(e.amount_total for e in payroll_entries)
bank_payroll = sum(abs(l.amount) for l in bank_lines if 'payroll' in (l.payment_ref or '').lower())
return {
'payroll_journal_total': payroll_total,
'bank_payroll_total': bank_payroll,
'difference': payroll_total - bank_payroll,
}
def verify_source_deductions(env, params):
return {
'status': 'info',
'message': 'Source deduction verification requires CRA rate tables. Use fusion_payroll for full verification.',
}
def get_cra_remittance_status(env, params):
cra_accounts = env['account.account'].search([
('name', 'ilike', 'CRA'),
('company_ids', 'in', env.company.id),
])
result = []
for acct in cra_accounts:
balance = sum(env['account.move.line'].search([
('account_id', '=', acct.id),
('parent_state', '=', 'posted'),
]).mapped('balance'))
result.append({'code': acct.code, 'name': acct.name, 'balance': balance})
return {'accounts': result}
def find_unmatched_payroll_cheques(env, params):
bank_lines = env['account.bank.statement.line'].search([
('is_reconciled', '=', False),
('company_id', '=', env.company.id),
('payment_ref', 'ilike', 'cheque'),
])
return {
'count': len(bank_lines),
'cheques': [{
'id': l.id, 'date': str(l.date),
'ref': l.payment_ref, 'amount': l.amount,
} for l in bank_lines[:30]],
}
def parse_payroll_summary(env, params):
import re
raw_data = params.get('data', '')
if not raw_data:
return {'error': 'No payroll data provided'}
lines = raw_data.strip().split('\n')
entries = []
totals = {'gross': 0, 'cpp': 0, 'ei': 0, 'tax': 0, 'net': 0}
for line in lines:
amounts = re.findall(r'\$?([\d,]+\.?\d*)', line)
if len(amounts) >= 2:
name_part = re.sub(r'\$?[\d,]+\.?\d*', '', line).strip(' \t,|-')
parsed_amounts = [float(a.replace(',', '')) for a in amounts]
entry = {'name': name_part or 'Employee', 'amounts': parsed_amounts}
if len(parsed_amounts) >= 5:
entry.update({
'gross': parsed_amounts[0],
'cpp': parsed_amounts[1],
'ei': parsed_amounts[2],
'tax': parsed_amounts[3],
'net': parsed_amounts[4] if len(parsed_amounts) > 4 else parsed_amounts[0] - sum(parsed_amounts[1:4]),
})
for k in ('gross', 'cpp', 'ei', 'tax', 'net'):
totals[k] += entry.get(k, 0)
entries.append(entry)
return {
'status': 'parsed',
'employee_count': len(entries),
'entries': entries,
'totals': totals,
'raw_lines': len(lines),
}
def _resolve_account_id(env, val):
"""Resolve an account code or ID to a valid account ID.
Accepts: integer ID, string ID, or account code string like '2201'."""
if not val:
return False
val_str = str(val).strip()
# Try as a direct ID first
try:
acct = env['account.account'].browse(int(val_str))
if acct.exists():
return acct.id
except (ValueError, TypeError):
pass
# Try as an account code
acct = env['account.account'].search([
('code', '=', val_str),
('company_ids', 'in', env.company.id),
], limit=1)
if acct:
return acct.id
return False
def create_payroll_journal_entry(env, params):
journal_id = int(params['journal_id'])
date = params['date']
ref = params.get('ref', 'Payroll Entry')
lines_data = params['lines']
# Duplicate check: same journal + date + ref + similar amount
total_debit = sum(float(l.get('debit', 0)) for l in lines_data)
existing = env['account.move'].search([
('journal_id', '=', journal_id),
('date', '=', date),
('ref', 'ilike', ref[:30]),
('state', 'in', ('draft', 'posted')),
], limit=1)
if existing:
return {
'status': 'duplicate',
'error': f'Entry already exists: {existing.name} (ref: {existing.ref}) on {existing.date} '
f'for ${existing.amount_total:,.2f}. Skipping to avoid duplicate.',
'existing_move_id': existing.id,
'existing_name': existing.name,
}
# Resolve account codes to IDs
resolved_lines = []
for line in lines_data:
account_id = _resolve_account_id(env, line['account_id'])
if not account_id:
return {'error': f"Account not found: {line['account_id']}. "
f"Provide a valid account code (e.g. '2201') or database ID."}
resolved_lines.append((0, 0, {
'account_id': account_id,
'name': line.get('name', 'Payroll'),
'debit': float(line.get('debit', 0)),
'credit': float(line.get('credit', 0)),
'partner_id': int(line['partner_id']) if line.get('partner_id') else False,
}))
move_vals = {
'journal_id': journal_id,
'date': date,
'ref': ref,
'line_ids': resolved_lines,
}
move = env['account.move'].create(move_vals)
return {'status': 'created', 'move_id': move.id, 'name': move.name}
def get_payroll_schedule(env, params):
return {'status': 'info', 'message': 'Payroll schedule available via fusion_payroll module.'}
def match_payroll_cheques(env, params):
st_line_id = int(params['statement_line_id'])
move_line_ids = [int(x) for x in params['move_line_ids']]
st_line = env['account.bank.statement.line'].browse(st_line_id)
st_line.set_line_bank_statement_line(move_line_ids)
return {'status': 'matched', 'statement_line_id': st_line_id}
def verify_payroll_deductions(env, params):
return verify_source_deductions(env, params)
def get_cra_remittance_due(env, params):
return get_cra_remittance_status(env, params)
def prepare_cra_payment(env, params):
return create_payroll_journal_entry(env, params)
def generate_t4(env, params):
return {'status': 'info', 'message': 'T4 generation available via fusion_payroll module.'}
def generate_roe(env, params):
return {'status': 'info', 'message': 'ROE generation available via fusion_payroll module.'}
def get_payroll_cost_report(env, params):
return get_payroll_entries(env, params)
TOOLS = {
'get_payroll_entries': get_payroll_entries,
'compare_payroll_to_bank': compare_payroll_to_bank,
'verify_source_deductions': verify_source_deductions,
'get_cra_remittance_status': get_cra_remittance_status,
'find_unmatched_payroll_cheques': find_unmatched_payroll_cheques,
'parse_payroll_summary': parse_payroll_summary,
'create_payroll_journal_entry': create_payroll_journal_entry,
'get_payroll_schedule': get_payroll_schedule,
'match_payroll_cheques': match_payroll_cheques,
'verify_payroll_deductions': verify_payroll_deductions,
'get_cra_remittance_due': get_cra_remittance_due,
'prepare_cra_payment': prepare_cra_payment,
'generate_t4': generate_t4,
'generate_roe': generate_roe,
'get_payroll_cost_report': get_payroll_cost_report,
}

View File

@@ -0,0 +1,285 @@
import logging
import base64
_logger = logging.getLogger(__name__)
def _get_report(env, ref_id):
try:
return env.ref(ref_id)
except Exception:
return None
def _run_report(env, report_ref, params):
report = _get_report(env, report_ref)
if not report:
return {'error': f'Report {report_ref} not found'}
date_opts = {}
if params.get('date_from'):
date_opts['date_from'] = params['date_from']
if params.get('date_to'):
date_opts['date_to'] = params['date_to']
options = report.get_options({'date': date_opts} if date_opts else {})
lines = report._get_lines(options)
return {
'report_name': report.name,
'lines': [{
'name': l.get('name', ''),
'level': l.get('level', 0),
'columns': [c.get('no_format', c.get('name', '')) for c in l.get('columns', [])],
} for l in lines[:100]],
}
def get_profit_loss(env, params):
return _run_report(env, 'account_reports.profit_and_loss', params)
def get_balance_sheet(env, params):
return _run_report(env, 'account_reports.balance_sheet', params)
def get_trial_balance(env, params):
return _run_report(env, 'account_reports.trial_balance_report', params)
def get_cash_flow(env, params):
return _run_report(env, 'account_reports.cash_flow_statement', params)
def compare_periods(env, params):
report_ref = params.get('report_ref', 'account_reports.profit_and_loss')
report = _get_report(env, report_ref)
if not report:
return {'error': f'Report {report_ref} not found'}
period1 = _run_report(env, report_ref, {
'date_from': params.get('period1_from'),
'date_to': params.get('period1_to'),
})
period2 = _run_report(env, report_ref, {
'date_from': params.get('period2_from'),
'date_to': params.get('period2_to'),
})
return {'period_1': period1, 'period_2': period2}
def answer_financial_question(env, params):
question = params.get('question', '')
sql_query = params.get('sql_query')
if sql_query:
return {'error': 'Direct SQL not permitted. Use report tools instead.'}
return {'status': 'info', 'message': f'Use specific report tools to answer: {question}'}
def export_report(env, params):
report_ref = params.get('report_ref', 'account_reports.profit_and_loss')
fmt = params.get('format', 'pdf')
report = _get_report(env, report_ref)
if not report:
return {'error': f'Report {report_ref} not found'}
date_opts = {}
if params.get('date_from'):
date_opts['date_from'] = params['date_from']
if params.get('date_to'):
date_opts['date_to'] = params['date_to']
options = report.get_options({'date': date_opts} if date_opts else {})
try:
if fmt == 'xlsx':
result = report.dispatch_report_action(options, 'export_to_xlsx')
else:
result = report.dispatch_report_action(options, 'export_to_pdf')
if isinstance(result, dict) and result.get('file_content'):
return {
'file_name': result.get('file_name', f'report.{fmt}'),
'file_type': result.get('file_type', fmt),
'file_content_b64': base64.b64encode(result['file_content']).decode(),
}
return {
'status': 'generated',
'message': f'Report exported as {fmt}. Use the Odoo UI to download.',
}
except Exception as e:
return {'error': f'Export failed: {str(e)}'}
def get_invoicing_summary(env, params):
"""Get invoicing summary — total invoiced by month, by partner, or for a date range.
Supports: monthly breakdown for a year, current month totals, or filtered by partner."""
from datetime import date, timedelta
import calendar
year = int(params.get('year', date.today().year))
partner_name = params.get('partner_name')
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('move_type', '=', 'out_invoice'),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if partner_name:
partner = env['res.partner'].search([('name', 'ilike', partner_name)], limit=1)
if partner:
domain.append(('partner_id', '=', partner.id))
else:
return {'error': f'Partner not found: {partner_name}'}
if date_from and date_to:
domain += [('date', '>=', date_from), ('date', '<=', date_to)]
invoices = env['account.move'].search(domain, order='date desc')
total = sum(inv.amount_total for inv in invoices)
return {
'period': f'{date_from} to {date_to}',
'count': len(invoices),
'total': total,
'invoices': [{
'id': inv.id, 'name': inv.name, 'partner': inv.partner_id.name,
'date': str(inv.date), 'amount': inv.amount_total,
'payment_state': inv.payment_state,
} for inv in invoices[:30]],
}
# Monthly breakdown for the year
months = []
grand_total = 0
for month in range(1, 13):
m_start = f'{year}-{month:02d}-01'
last_day = calendar.monthrange(year, month)[1]
m_end = f'{year}-{month:02d}-{last_day}'
m_domain = domain + [('date', '>=', m_start), ('date', '<=', m_end)]
invoices = env['account.move'].search(m_domain)
total = sum(inv.amount_total for inv in invoices)
grand_total += total
months.append({
'month': f'{year}-{month:02d}',
'month_name': calendar.month_name[month],
'count': len(invoices),
'total': round(total, 2),
})
return {
'year': year,
'grand_total': round(grand_total, 2),
'months': months,
'partner': partner_name or 'All',
}
def get_billing_summary(env, params):
"""Get billing (vendor bills) summary — total billed by month or date range."""
from datetime import date
import calendar
year = int(params.get('year', date.today().year))
partner_name = params.get('partner_name')
date_from = params.get('date_from')
date_to = params.get('date_to')
domain = [
('move_type', '=', 'in_invoice'),
('state', '=', 'posted'),
('company_id', '=', env.company.id),
]
if partner_name:
partner = env['res.partner'].search([('name', 'ilike', partner_name)], limit=1)
if partner:
domain.append(('partner_id', '=', partner.id))
else:
return {'error': f'Partner not found: {partner_name}'}
if date_from and date_to:
domain += [('date', '>=', date_from), ('date', '<=', date_to)]
bills = env['account.move'].search(domain, order='date desc')
total = sum(b.amount_total for b in bills)
return {
'period': f'{date_from} to {date_to}',
'count': len(bills),
'total': total,
'bills': [{
'id': b.id, 'name': b.name, 'partner': b.partner_id.name,
'date': str(b.date), 'amount': b.amount_total,
'payment_state': b.payment_state,
} for b in bills[:30]],
}
# Monthly breakdown
months = []
grand_total = 0
for month in range(1, 13):
m_start = f'{year}-{month:02d}-01'
last_day = calendar.monthrange(year, month)[1]
m_end = f'{year}-{month:02d}-{last_day}'
m_domain = domain + [('date', '>=', m_start), ('date', '<=', m_end)]
bills = env['account.move'].search(m_domain)
total = sum(b.amount_total for b in bills)
grand_total += total
months.append({
'month': f'{year}-{month:02d}',
'month_name': calendar.month_name[month],
'count': len(bills),
'total': round(total, 2),
})
return {
'year': year,
'grand_total': round(grand_total, 2),
'months': months,
'partner': partner_name or 'All',
}
def get_collections_summary(env, params):
"""Get payment collections summary — how much was collected (received) in a period."""
date_from = params.get('date_from')
date_to = params.get('date_to')
if not date_from or not date_to:
from datetime import date
today = date.today()
date_from = date_from or f'{today.year}-{today.month:02d}-01'
date_to = date_to or str(today)
payments = env['account.payment'].search([
('payment_type', '=', 'inbound'),
('state', '=', 'posted'),
('date', '>=', date_from),
('date', '<=', date_to),
('company_id', '=', env.company.id),
], order='date desc')
total = sum(p.amount for p in payments)
by_partner = {}
for p in payments:
pname = p.partner_id.name if p.partner_id else 'Unknown'
by_partner.setdefault(pname, {'count': 0, 'total': 0})
by_partner[pname]['count'] += 1
by_partner[pname]['total'] += p.amount
top_partners = sorted(by_partner.items(), key=lambda x: -x[1]['total'])[:15]
return {
'period': f'{date_from} to {date_to}',
'total_collected': round(total, 2),
'payment_count': len(payments),
'by_partner': [{'partner': k, 'count': v['count'], 'total': round(v['total'], 2)} for k, v in top_partners],
}
TOOLS = {
'get_profit_loss': get_profit_loss,
'get_balance_sheet': get_balance_sheet,
'get_trial_balance': get_trial_balance,
'get_cash_flow': get_cash_flow,
'compare_periods': compare_periods,
'answer_financial_question': answer_financial_question,
'export_report': export_report,
'get_invoicing_summary': get_invoicing_summary,
'get_billing_summary': get_billing_summary,
'get_collections_summary': get_collections_summary,
}