import logging from odoo import fields _logger = logging.getLogger(__name__) def audit_posted_entry(env, params): move_id = int(params['move_id']) move = env['account.move'].browse(move_id) if not move.exists(): return {'error': 'Entry not found'} issues = [] total_debit = sum(l.debit for l in move.line_ids) total_credit = sum(l.credit for l in move.line_ids) if abs(total_debit - total_credit) > 0.01: issues.append({'severity': 'critical', 'issue': f'Unbalanced entry: debit={total_debit}, credit={total_credit}'}) for line in move.line_ids: if not line.account_id: issues.append({'severity': 'critical', 'issue': f'Line missing account: {line.name}'}) if not move.line_ids: issues.append({'severity': 'warning', 'issue': 'Entry has no lines'}) return { 'move': move.name, 'date': str(move.date), 'issues': issues, 'clean': len(issues) == 0, } def audit_account_balances(env, params): from .journal_review import find_wrong_direction_balances return find_wrong_direction_balances(env, params) def audit_tax_compliance(env, params): from .hst_management import find_missing_tax_invoices, find_missing_itc_bills invoices = find_missing_tax_invoices(env, params) bills = find_missing_itc_bills(env, params) return { 'missing_tax_invoices': invoices.get('missing_tax_count', 0), 'missing_itc_bills': bills.get('missing_itc_count', 0), 'total_issues': invoices.get('missing_tax_count', 0) + bills.get('missing_itc_count', 0), } def audit_reconciliation_integrity(env, params): from .journal_review import verify_reconciliation_integrity return verify_reconciliation_integrity(env, params) def check_hash_chain(env, params): from .month_end import run_hash_integrity_check return run_hash_integrity_check(env, params) def check_sequence_gaps(env, params): from .journal_review import find_sequence_gaps return find_sequence_gaps(env, params) def flag_entry(env, params): move_id = int(params['move_id']) flag = params.get('flag', 'Review Required') recommendation = params.get('recommendation', '') move = env['account.move'].browse(move_id) if not move.exists(): return {'error': 'Entry not found'} body = f'🏴 {flag}
{recommendation}' move.message_post(body=body, message_type='comment', subtype_xmlid='mail.mt_note') return {'status': 'flagged', 'move': move.name, 'flag': flag} def get_audit_status(env, params): try: AuditStatus = env['account.audit.account.status'] except KeyError: return {'error': 'Audit status model (account.audit.account.status) is not available. The account_audit Enterprise module may not be installed.'} statuses = AuditStatus.search([]) return { 'statuses': [{ 'id': s.id, 'account': s.account_id.name, 'status': s.status, 'audit': s.audit_id.display_name if s.audit_id else '', } for s in statuses[:50]], } def set_audit_status(env, params): try: AuditStatus = env['account.audit.account.status'] except KeyError: return {'error': 'Audit status model (account.audit.account.status) is not available. The account_audit Enterprise module may not be installed.'} status_id = int(params['status_id']) new_status = params['status'] rec = AuditStatus.browse(status_id) if not rec.exists(): return {'error': 'Audit status record not found'} rec.status = new_status return {'status': 'updated', 'id': status_id, 'new_status': new_status} def get_audit_trail(env, params): move_id = int(params['move_id']) move = env['account.move'].browse(move_id) if not move.exists(): return {'error': 'Entry not found'} messages = env['mail.message'].search([ ('model', '=', 'account.move'), ('res_id', '=', move_id), ], order='date desc', limit=20) return { 'move': move.name, 'messages': [{ 'date': str(m.date), 'author': m.author_id.name if m.author_id else '', 'body': m.body or '', 'type': m.message_type, } for m in messages], } def run_full_audit(env, params): results = {} results['account_balances'] = audit_account_balances(env, params) results['tax_compliance'] = audit_tax_compliance(env, params) results['reconciliation'] = audit_reconciliation_integrity(env, params) results['hash_chain'] = check_hash_chain(env, params) results['sequence_gaps'] = check_sequence_gaps(env, params) total_issues = 0 for key, val in results.items(): total_issues += val.get('count', 0) + val.get('total_issues', 0) score = max(0, 100 - total_issues * 5) return { 'score': min(100, score), 'total_issues': total_issues, 'details': results, } def get_audit_report(env, params): audit = run_full_audit(env, params) report_lines = [f"Audit Score: {audit['score']}/100", f"Total Issues: {audit['total_issues']}", ''] for domain, detail in audit.get('details', {}).items(): report_lines.append(f"--- {domain.replace('_', ' ').title()} ---") count = detail.get('count', detail.get('total_issues', 0)) report_lines.append(f" Issues: {count}") return {'report': '\n'.join(report_lines), 'score': audit['score']} TOOLS = { 'audit_posted_entry': audit_posted_entry, 'audit_account_balances': audit_account_balances, 'audit_tax_compliance': audit_tax_compliance, 'audit_reconciliation_integrity': audit_reconciliation_integrity, 'check_hash_chain': check_hash_chain, 'check_sequence_gaps': check_sequence_gaps, 'flag_entry': flag_entry, 'get_audit_status': get_audit_status, 'set_audit_status': set_audit_status, 'get_audit_trail': get_audit_trail, 'run_full_audit': run_full_audit, 'get_audit_report': get_audit_report, }