# Fusion Accounting - Generic Tax Report Handlers
# Base tax-report handler, generic handler, and account/tax grouping variants

import ast
from collections import defaultdict

from odoo import models, api, fields, Command, _
from odoo.addons.web.controllers.utils import clean_action
from odoo.exceptions import UserError, RedirectWarning
from odoo.osv import expression
from odoo.tools import SQL


class FusionTaxReportHandler(models.AbstractModel):
    """Base handler providing the Closing Entry button and tax-period
    configuration for all tax reports (generic and country-specific)."""

    _name = 'account.tax.report.handler'
    _inherit = 'account.report.custom.handler'
    _description = 'Account Report Handler for Tax Reports'

    def _custom_options_initializer(self, report, options, previous_options):
        """Add the "Closing Entry" button and the tax periodicity data to ``options``.

        When the company periodicity maps to a standard period starting on
        January 1st, the dedicated tax-period filter is hidden and any
        ``tax_period`` marker in the date options is replaced by the matching
        standard period name.
        """
        period_type_map = {'monthly': 'month', 'trimester': 'quarter', 'year': 'year'}

        options['buttons'].append({
            'name': _('Closing Entry'),
            'action': 'action_periodic_vat_entries',
            'sequence': 110,
            'always_show': True,
        })
        self._enable_export_buttons_for_common_vat_groups_in_branches(options)

        start_day, start_month = self.env.company._get_tax_closing_start_date_attributes(report)
        tax_period = self.env.company._get_tax_periodicity(report)
        options['tax_periodicity'] = {
            'periodicity': tax_period,
            'months_per_period': self.env.company._get_tax_periodicity_months_delay(report),
            'start_day': start_day,
            'start_month': start_month,
        }

        options['show_tax_period_filter'] = (
            tax_period not in period_type_map or start_day != 1 or start_month != 1
        )
        if not options['show_tax_period_filter']:
            std_period = period_type_map[tax_period]
            options['date']['filter'] = options['date']['filter'].replace('tax_period', std_period)
            options['date']['period_type'] = options['date']['period_type'].replace('tax_period', std_period)

    def _get_custom_display_config(self):
        """Return the display configuration, overriding the filters template."""
        cfg = defaultdict(dict)
        cfg['templates']['AccountReportFilters'] = 'fusion_accounting.GenericTaxReportFiltersCustomizable'
        return cfg

    def _customize_warnings(self, report, options, all_column_groups_expression_totals, warnings):
        """Adjust the report warnings in place.

        * Drops the "draft in period" warning when the only draft moves up to
          ``date_to`` are linked to a tax closing report.
        * Adds the "inactive tags" warning when a journal item selected by the
          report query carries an inactive tag.
        """
        if 'fusion_accounting.common_warning_draft_in_period' in warnings:
            has_non_closing_drafts = self.env['account.move'].search_count([
                ('state', '=', 'draft'),
                ('date', '<=', options['date']['date_to']),
                ('tax_closing_report_id', '=', False),
            ], limit=1)
            if not has_non_closing_drafts:
                warnings.pop('fusion_accounting.common_warning_draft_in_period')

        qry = report._get_report_query(options, 'strict_range')
        inactive_rows = self.env.execute_query(SQL(
            """
            SELECT 1
              FROM %s
              JOIN account_account_tag_account_move_line_rel aml_tag
                ON account_move_line.id = aml_tag.account_move_line_id
              JOIN account_account_tag tag
                ON aml_tag.account_account_tag_id = tag.id
             WHERE %s
               AND NOT tag.active
             LIMIT 1
            """,
            qry.from_clause,
            qry.where_clause,
        ))
        if inactive_rows:
            warnings['fusion_accounting.tax_report_warning_inactive_tags'] = {}

    # ================================================================
    # TAX CLOSING
    # ================================================================

    def _is_period_equal_to_options(self, report, options):
        """Tell whether the dates in ``options`` are exactly the tax closing
        period boundaries computed by the current company for ``date_to``."""
        opt_to = fields.Date.from_string(options['date']['date_to'])
        opt_from = fields.Date.from_string(options['date']['date_from'])
        boundary_from, boundary_to = self.env.company._get_tax_closing_period_boundaries(opt_to, report)
        return boundary_from == opt_from and boundary_to == opt_to

    def action_periodic_vat_entries(self, options, from_post=False):
        """Return the action opening the tax closing entries for ``options``.

        When the selected dates are not a tax period (and the warning was not
        already acknowledged through the ``override_tax_closing_warning``
        context key), a client action showing a confirmation message is
        returned instead; it embeds the real action as ``depending_action``.
        """
        report = self.env['account.report'].browse(options['report_id'])

        if (
            options['date']['period_type'] != 'tax_period'
            and not self._is_period_equal_to_options(report, options)
            and not self.env.context.get('override_tax_closing_warning')
        ):
            if len(options['companies']) > 1 and (
                report.filter_multi_company != 'tax_units'
                or not (report.country_id and options['available_tax_units'])
            ):
                warning_msg = _(
                    "You're about to generate closing entries for multiple companies. "
                    "Each will follow its own tax periodicity."
                )
            else:
                warning_msg = _(
                    "The selected dates don't match a tax period. The closing entry "
                    "will target the closest matching period."
                )

            return {
                'type': 'ir.actions.client',
                'tag': 'fusion_accounting.redirect_action',
                'target': 'new',
                'params': {
                    # Evaluated right away: the nested call bypasses the
                    # warning thanks to the context key.
                    'depending_action': self.with_context(
                        override_tax_closing_warning=True,
                    ).action_periodic_vat_entries(options),
                    'message': warning_msg,
                    'button_text': _("Proceed"),
                },
                'context': {'dialog_size': 'medium', 'override_tax_closing_warning': True},
            }

        generated_moves = self._get_periodic_vat_entries(options, from_post=from_post)

        action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line")
        action = clean_action(action, env=self.env)
        action.pop('domain', None)

        if len(generated_moves) == 1:
            # Single entry: open it directly in form view.
            action['views'] = [(self.env.ref('account.view_move_form').id, 'form')]
            action['res_id'] = generated_moves.id
        else:
            action['domain'] = [('id', 'in', generated_moves.ids)]
            action['context'] = dict(ast.literal_eval(action['context']))
            action['context'].pop('search_default_posted', None)
        return action

    def _get_periodic_vat_entries(self, options, from_post=False):
        """Return the closing moves for ``options``: the ones already existing
        for the period plus newly generated ones for the remaining companies."""
        report = self.env['account.report'].browse(options['report_id'])
        if options.get('integer_rounding'):
            options['integer_rounding_enabled'] = True

        result_moves = self.env['account.move']
        company_set = self.env['res.company'].browse(report.get_report_company_ids(options))

        existing = self._get_tax_closing_entries_for_closed_period(
            report, options, company_set, posted_only=False,
        )
        result_moves += existing
        result_moves += self._generate_tax_closing_entries(
            report,
            options,
            companies=company_set - existing.company_id,
            from_post=from_post,
        )
        return result_moves

    def _generate_tax_closing_entries(self, report, options, closing_moves=None, companies=None, from_post=False):
        """Fetch or create the closing moves and (re)write their lines.

        :param closing_moves: optional pre-existing closing moves; only the
            draft ones get their lines recomputed.
        :param companies: companies to process; defaults to the companies of
            the report for ``options``.
        :param from_post: when set, the closing moves of the current company
            are left untouched.
        :return: the closing moves (given ones plus the ones fetched here).
        :raise RedirectWarning: through ``_redirect_to_misconfigured_tax_groups``
            when tax groups are misconfigured for the involved countries.
        """
        if companies is None:
            companies = self.env['res.company'].browse(report.get_report_company_ids(options))
        if closing_moves is None:
            closing_moves = self.env['account.move']

        period_end = fields.Date.from_string(options['date']['date_to'])
        moves_by_company = defaultdict(lambda: self.env['account.move'])

        remaining_cos = companies.filtered(lambda c: c not in closing_moves.company_id)
        if closing_moves:
            for mv in closing_moves.filtered(lambda m: m.state == 'draft'):
                moves_by_company[mv.company_id] |= mv

        for co in remaining_cos:
            include_dom, fpos_set = self._get_fpos_info_for_tax_closing(co, report, options)
            co_moves = co._get_and_update_tax_closing_moves(
                period_end,
                report,
                fiscal_positions=fpos_set,
                include_domestic=include_dom,
            )
            moves_by_company[co] = co_moves
            closing_moves += co_moves

        for co, co_moves in moves_by_company.items():
            # Countries concerned by the closing of this company.
            countries = self.env['res.country']
            for mv in co_moves:
                if mv.fiscal_position_id.foreign_vat:
                    countries |= mv.fiscal_position_id.country_id
                else:
                    countries |= co.account_fiscal_country_id

            if self.env['account.tax.group']._check_misconfigured_tax_groups(co, countries):
                self._redirect_to_misconfigured_tax_groups(co, countries)

            for mv in co_moves:
                if from_post and mv == moves_by_company.get(self.env.company):
                    continue

                mv_opts = {
                    **options,
                    'fiscal_position': mv.fiscal_position_id.id if mv.fiscal_position_id else 'domestic',
                }
                line_cmds, tg_subtotals = self._compute_vat_closing_entry(co, mv_opts)
                line_cmds += self._add_tax_group_closing_items(tg_subtotals, mv)

                # Existing lines are replaced by the freshly computed ones.
                if mv.line_ids:
                    line_cmds += [Command.delete(aml.id) for aml in mv.line_ids]

                if line_cmds:
                    mv.write({'line_ids': line_cmds})

        return closing_moves

    def _get_tax_closing_entries_for_closed_period(self, report, options, companies, posted_only=True):
        """Search, for each company, one closing move dated at the end of the
        tax period containing ``date_to``.

        :param posted_only: restrict to posted moves; otherwise every
            non-cancelled move is accepted.
        :return: ``account.move`` recordset (at most one move per company).
        """
        found = self.env['account.move']
        for co in companies:
            _period_start, p_end = co._get_tax_closing_period_boundaries(
                fields.Date.from_string(options['date']['date_to']),
                report,
            )
            inc_dom, fpos = self._get_fpos_info_for_tax_closing(co, report, options)
            # ``False`` matches moves without fiscal position (domestic closing).
            fpos_ids = fpos.ids + ([False] if inc_dom else [])
            state_cond = ('state', '=', 'posted') if posted_only else ('state', '!=', 'cancel')
            found += self.env['account.move'].search([
                ('company_id', '=', co.id),
                ('fiscal_position_id', 'in', fpos_ids),
                ('date', '=', p_end),
                ('tax_closing_report_id', '=', options['report_id']),
                state_cond,
            ], limit=1)
        return found

    @api.model
    def _compute_vat_closing_entry(self, company, options):
        """Compute the balancing lines of the closing entry of ``company``.

        :return: tuple ``(line_cmds, tg_subtotals)`` where ``line_cmds`` is a
            list of creation commands for ``line_ids`` and ``tg_subtotals``
            maps ``(advance account id or False, receivable account id,
            payable account id)`` to the summed amount of the tax groups
            sharing these accounts.
        """
        self = self.with_company(company)

        # The SQL below reads these columns directly: flush pending writes.
        self.env['account.tax'].flush_model(['name', 'tax_group_id'])
        self.env['account.tax.repartition.line'].flush_model(['use_in_tax_closing'])
        self.env['account.move.line'].flush_model([
            'account_id', 'debit', 'credit', 'move_id', 'tax_line_id',
            'date', 'company_id', 'display_type', 'parent_state',
        ])
        self.env['account.move'].flush_model(['state'])

        # Work on a copy of the options restricted to the closing period.
        adjusted_opts = {**options, 'all_entries': False, 'date': dict(options['date'])}
        report = self.env['account.report'].browse(options['report_id'])
        p_start, p_end = company._get_tax_closing_period_boundaries(
            fields.Date.from_string(options['date']['date_to']),
            report,
        )
        adjusted_opts['date']['date_from'] = fields.Date.to_string(p_start)
        adjusted_opts['date']['date_to'] = fields.Date.to_string(p_end)
        adjusted_opts['date']['period_type'] = 'custom'
        adjusted_opts['date']['filter'] = 'custom'
        adjusted_opts = report.with_context(
            allowed_company_ids=company.ids,
        ).get_options(previous_options=adjusted_opts)
        adjusted_opts['fiscal_position'] = options['fiscal_position']

        qry = self.env.ref('account.generic_tax_report')._get_report_query(
            adjusted_opts,
            'strict_range',
            domain=self._get_vat_closing_entry_additional_domain(),
        )
        tax_name_expr = self.env['account.tax']._field_to_sql('tax', 'name')
        stmt = SQL(
            """
            SELECT "account_move_line".tax_line_id as tax_id,
                   tax.tax_group_id as tax_group_id,
                   %(tax_name)s as tax_name,
                   "account_move_line".account_id,
                   COALESCE(SUM("account_move_line".balance), 0) as amount
              FROM account_tax tax, account_tax_repartition_line repartition, %(tbl)s
             WHERE %(where)s
               AND tax.id = "account_move_line".tax_line_id
               AND repartition.id = "account_move_line".tax_repartition_line_id
               AND repartition.use_in_tax_closing
          GROUP BY tax.tax_group_id, "account_move_line".tax_line_id, tax.name, "account_move_line".account_id
            """,
            tax_name=tax_name_expr,
            tbl=qry.from_clause,
            where=qry.where_clause,
        )
        self.env.cr.execute(stmt)
        raw_results = self.env.cr.dictfetchall()
        raw_results = self._postprocess_vat_closing_entry_results(company, adjusted_opts, raw_results)

        # Group rows as {tax group: {tax id: [(tax name, account id, amount)]}}.
        tg_ids = [r['tax_group_id'] for r in raw_results]
        tax_groups = {}
        for tg, row in zip(self.env['account.tax.group'].browse(tg_ids), raw_results):
            tax_groups.setdefault(tg, {}).setdefault(row.get('tax_id'), []).append(
                (row.get('tax_name'), row.get('account_id'), row.get('amount'))
            )

        line_cmds = []
        tg_subtotals = {}
        cur = self.env.company.currency_id
        for tg, tax_entries in tax_groups.items():
            # Tax groups without receivable/payable accounts are skipped.
            if not tg.tax_receivable_account_id or not tg.tax_payable_account_id:
                continue

            tg_total = 0
            for vals_list in tax_entries.values():
                for t_name, acct_id, amt in vals_list:
                    # Counterpart line bringing the tax account balance back to zero.
                    line_cmds.append((0, 0, {
                        'name': t_name,
                        'debit': abs(amt) if amt < 0 else 0,
                        'credit': amt if amt > 0 else 0,
                        'account_id': acct_id,
                    }))
                    tg_total += amt

            if not cur.is_zero(tg_total):
                key = (
                    tg.advance_tax_payment_account_id.id or False,
                    tg.tax_receivable_account_id.id,
                    tg.tax_payable_account_id.id,
                )
                tg_subtotals[key] = tg_subtotals.get(key, 0) + tg_total

        if not line_cmds:
            # Nothing to close: fall back on two zero-valued lines using the
            # first sale / purchase tax repartition accounts found.
            rep_in = self.env['account.tax.repartition.line'].search([
                *self.env['account.tax.repartition.line']._check_company_domain(company),
                ('repartition_type', '=', 'tax'),
                ('document_type', '=', 'invoice'),
                ('tax_id.type_tax_use', '=', 'purchase'),
            ], limit=1)
            rep_out = self.env['account.tax.repartition.line'].search([
                *self.env['account.tax.repartition.line']._check_company_domain(company),
                ('repartition_type', '=', 'tax'),
                ('document_type', '=', 'invoice'),
                ('tax_id.type_tax_use', '=', 'sale'),
            ], limit=1)
            if rep_out.account_id and rep_in.account_id:
                line_cmds = [
                    Command.create({
                        'name': _('Tax Received Adjustment'),
                        'debit': 0,
                        'credit': 0.0,
                        'account_id': rep_out.account_id.id,
                    }),
                    Command.create({
                        'name': _('Tax Paid Adjustment'),
                        'debit': 0.0,
                        'credit': 0,
                        'account_id': rep_in.account_id.id,
                    }),
                ]

        return line_cmds, tg_subtotals

    def _get_vat_closing_entry_additional_domain(self):
        """Hook: extra domain applied to the closing entry query (none here)."""
        return []

    def _postprocess_vat_closing_entry_results(self, company, options, results):
        """Hook: post-process the closing entry query rows (returned as-is here)."""
        return results

    def _vat_closing_entry_results_rounding(self, company, options, results, rounding_accounts, vat_results_summary):
        """Append a rounding-difference row to ``results`` when needed.

        :param rounding_accounts: mapping with ``'profit'`` and ``'loss'``
            accounts; ``results`` is returned untouched if either is missing.
        :param vat_results_summary: iterable of
            ``(operation type, report line id, column expression label)``;
            ``'due'``/``'total'`` values are added to the total and
            ``'deductible'`` values are subtracted from it.
        :return: ``results`` (mutated in place when a row is appended).
        """
        if not rounding_accounts.get('profit') or not rounding_accounts.get('loss'):
            return results

        total_amt = sum(r['amount'] for r in results)
        last_tg_id = results[-1]['tax_group_id'] if results else None

        report = self.env['account.report'].browse(options['report_id'])
        for ln in report._get_lines(options):
            mdl, rec_id = report._get_model_info_from_id(ln['id'])
            if mdl != 'account.report.line':
                continue

            for (op_type, rpt_line_id, col_label) in vat_results_summary:
                for col in ln['columns']:
                    if rec_id != rpt_line_id or col['expression_label'] != col_label:
                        continue
                    if op_type in {'due', 'total'}:
                        total_amt += col['no_format']
                    elif op_type == 'deductible':
                        total_amt -= col['no_format']

        diff = company.currency_id.round(total_amt)
        if not company.currency_id.is_zero(diff):
            results.append({
                'tax_name': _('Difference from rounding taxes'),
                'amount': diff * -1,
                'tax_group_id': last_tg_id,
                'account_id': rounding_accounts['profit'].id if diff < 0 else rounding_accounts['loss'].id,
            })
        return results

    @api.model
    def _add_tax_group_closing_items(self, tg_subtotals, closing_move):
        """Build the lines balancing the advance / receivable / payable tax
        accounts and booking the resulting payable or receivable amount.

        :param tg_subtotals: mapping as returned by ``_compute_vat_closing_entry``.
        :return: list of creation commands for ``line_ids``.
        """
        sql_balance = '''
            SELECT SUM(aml.balance) AS balance
              FROM account_move_line aml
         LEFT JOIN account_move move ON move.id = aml.move_id
             WHERE aml.account_id = %s
               AND aml.date <= %s
               AND move.state = 'posted'
               AND aml.company_id = %s
        '''
        cur = closing_move.company_id.currency_id
        cmds = []
        balanced_accounts = []

        def _balance_account(acct_id, lbl):
            """Add a line zeroing the posted balance of ``acct_id`` and return that balance."""
            self.env.cr.execute(sql_balance, (acct_id, closing_move.date, closing_move.company_id.id))
            row = self.env.cr.dictfetchone()
            bal = row.get('balance') or 0
            if not cur.is_zero(bal):
                cmds.append((0, 0, {
                    'name': lbl,
                    'debit': abs(bal) if bal < 0 else 0,
                    'credit': abs(bal) if bal > 0 else 0,
                    'account_id': acct_id,
                }))
            return bal

        for key, val in tg_subtotals.items():
            running = val
            # Each account is balanced once, even if shared by several keys.
            if key[0] and key[0] not in balanced_accounts:
                running += _balance_account(key[0], _('Balance tax advance payment account'))
                balanced_accounts.append(key[0])
            if key[1] and key[1] not in balanced_accounts:
                running += _balance_account(key[1], _('Balance tax current account (receivable)'))
                balanced_accounts.append(key[1])
            if key[2] and key[2] not in balanced_accounts:
                running += _balance_account(key[2], _('Balance tax current account (payable)'))
                balanced_accounts.append(key[2])

            if not cur.is_zero(running):
                cmds.append(Command.create({
                    'name': _('Payable tax amount') if running < 0 else _('Receivable tax amount'),
                    'debit': running if running > 0 else 0,
                    'credit': abs(running) if running < 0 else 0,
                    'account_id': key[2] if running < 0 else key[1],
                }))
        return cmds

    @api.model
    def _redirect_to_misconfigured_tax_groups(self, company, countries):
        """Raise a ``RedirectWarning`` pointing to the tax groups of ``countries``
        (and the ones without country).

        :raise RedirectWarning: always.
        """
        raise RedirectWarning(
            _('Please specify the accounts necessary for the Tax Closing Entry.'),
            {
                'type': 'ir.actions.act_window',
                'name': 'Tax groups',
                'res_model': 'account.tax.group',
                'view_mode': 'list',
                'views': [[False, 'list']],
                'domain': ['|', ('country_id', 'in', countries.ids), ('country_id', '=', False)],
            },
            _('Configure your TAX accounts - %s', company.display_name),
        )

    def _get_fpos_info_for_tax_closing(self, company, report, options):
        """Return ``(include_domestic, fiscal_positions)`` for the closing of
        ``company`` according to ``options['fiscal_position']``
        (``'domestic'``, ``'all'`` or a fiscal position id)."""
        if options['fiscal_position'] == 'domestic':
            fpos = self.env['account.fiscal.position']
        elif options['fiscal_position'] == 'all':
            fpos = self.env['account.fiscal.position'].search([
                *self.env['account.fiscal.position']._check_company_domain(company),
                ('foreign_vat', '!=', False),
            ])
        else:
            fpos = self.env['account.fiscal.position'].browse([options['fiscal_position']])

        if options['fiscal_position'] == 'all':
            fiscal_country = company.account_fiscal_country_id
            include_dom = (
                not fpos
                or not report.country_id
                or fiscal_country == fpos[0].country_id
            )
        else:
            include_dom = options['fiscal_position'] == 'domestic'

        return include_dom, fpos

    def _get_amls_with_archived_tags_domain(self, options):
        """Return the domain of posted journal items using archived tax tags."""
        domain = [
            ('tax_tag_ids.active', '=', False),
            ('parent_state', '=', 'posted'),
            ('date', '>=', options['date']['date_from']),
        ]
        # NOTE(review): the upper bound is only applied in 'single' mode, so in
        # other modes the domain is open-ended after date_from, whereas the
        # warning itself is computed on a strict range. Confirm this is intended.
        if options['date']['mode'] == 'single':
            domain.append(('date', '<=', options['date']['date_to']))
        return domain

    def action_open_amls_with_archived_tags(self, options, params=None):
        """Return the window action listing journal items with archived tax tags."""
        return {
            'name': _("Journal items with archived tax tags"),
            'type': 'ir.actions.act_window',
            'res_model': 'account.move.line',
            'domain': self._get_amls_with_archived_tags_domain(options),
            'context': {'active_test': False},
            'views': [(self.env.ref('fusion_accounting.view_archived_tag_move_tree').id, 'list')],
        }


class FusionGenericTaxReportHandler(models.AbstractModel):
    """Handler for the standard generic tax report (Tax -> Tax grouping)."""

    _name = 'account.generic.tax.report.handler'
    _inherit = 'account.tax.report.handler'
    _description = 'Generic Tax Report Custom Handler'

    def _get_custom_display_config(self):
        """Extend the base display configuration with the generic tax report
        CSS class and line-name template."""
        cfg = super()._get_custom_display_config()
        cfg['css_custom_class'] = 'generic_tax_report'
        cfg['templates']['AccountReportLineName'] = 'fusion_accounting.TaxReportLineName'
        return cfg

    def _custom_options_initializer(self, report, options, previous_options=None):
        """Initialize the options, forcing ``fiscal_position`` to ``'all'`` for a
        country-less, single-company report without a meaningful choice of
        VAT fiscal positions."""
        super()._custom_options_initializer(report, options, previous_options=previous_options)

        if (
            not report.country_id
            and len(options['available_vat_fiscal_positions']) <= (0 if options['allow_domestic'] else 1)
            and len(options['companies']) <= 1
        ):
            options['allow_domestic'] = False
            options['fiscal_position'] = 'all'

    def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
        """Generate the report lines with the default (tax only) grouping."""
        return self._get_dynamic_lines(report, options, 'default', warnings)

    def _caret_options_initializer(self):
        """Declare the caret menu available on tax lines."""
        return {
            'generic_tax_report': [
                {'name': _("Audit"), 'action': 'caret_option_audit_tax'},
            ],
        }

    def _get_dynamic_lines(self, report, options, grouping, warnings=None):
        """Compute the report lines for the requested ``grouping``.

        :param grouping: ``'tax_account'``, ``'account_tax'`` or anything else
            for the default grouping by tax only.
        :return: list of ``(0, line_dict)`` tuples.
        """
        opts_per_cg = report._split_options_per_column_group(options)

        if grouping == 'tax_account':
            gb_fields = [('src_tax', 'type_tax_use'), ('src_tax', 'id'), ('account', 'id')]
            comodel_list = [None, 'account.tax', 'account.account']
        elif grouping == 'account_tax':
            gb_fields = [('src_tax', 'type_tax_use'), ('account', 'id'), ('src_tax', 'id')]
            comodel_list = [None, 'account.account', 'account.tax']
        else:
            gb_fields = [('src_tax', 'type_tax_use'), ('src_tax', 'id')]
            comodel_list = [None, 'account.tax']

        if grouping in ('tax_account', 'account_tax'):
            amount_tree = self._read_generic_tax_report_amounts(report, opts_per_cg, gb_fields)
        else:
            amount_tree = self._read_generic_tax_report_amounts_no_tax_details(report, options, opts_per_cg)

        # Collect, per grouping level, the keys found in the amounts tree.
        id_sets = [set() for _field in gb_fields]

        def _collect_ids(node, depth=0):
            for k, v in node.items():
                if k:
                    id_sets[depth].add(k)
                    if v.get('children'):
                        _collect_ids(v['children'], depth + 1)

        _collect_ids(amount_tree)

        # One map per level: key -> (record or selection item, sort position).
        sort_maps = []
        for i, cm in enumerate(comodel_list):
            if cm:
                recs = self.env[cm].with_context(active_test=False).search([('id', 'in', tuple(id_sets[i]))])
                sort_maps.append({r.id: (r, j) for j, r in enumerate(recs)})
            else:
                # Level without comodel: keys are 'type_tax_use' selection values.
                sel = self.env['account.tax']._fields['type_tax_use'].selection
                sort_maps.append({v[0]: (v, j) for j, v in enumerate(sel) if v[0] in id_sets[i]})

        output = []
        self._populate_lines_recursively(
            report, options, output, sort_maps, gb_fields, amount_tree, warnings=warnings,
        )
        return output

    # ================================================================
    # AMOUNT COMPUTATION
    # ================================================================

    @api.model
    def _read_generic_tax_report_amounts_no_tax_details(self, report, options, opts_per_cg):
        """Read base and tax amounts per tax without using the tax details query.

        :return: mapping ``{type_tax_use: {amount keys..., 'children': {tax id:
            {amount keys...}}}}`` where each amount key maps column group keys
            to a float.
        """
        co_ids = report.get_report_company_ids(options)
        co_domain = self.env['account.tax']._check_company_domain(co_ids)
        co_where = self.env['account.tax'].with_context(active_test=False)._where_calc(co_domain)

        # Fetch the groups of taxes together with their children.
        self.env.cr.execute(SQL(
            '''
            SELECT account_tax.id,
                   account_tax.type_tax_use,
                   ARRAY_AGG(child_tax.id) AS child_tax_ids,
                   ARRAY_AGG(DISTINCT child_tax.type_tax_use) AS child_types
              FROM account_tax_filiation_rel account_tax_rel
              JOIN account_tax ON account_tax.id = account_tax_rel.parent_tax
              JOIN account_tax child_tax ON child_tax.id = account_tax_rel.child_tax
             WHERE account_tax.amount_type = 'group'
               AND %s
          GROUP BY account_tax.id
            ''',
            co_where.where_clause or SQL("TRUE"),
        ))
        group_info = {}
        child_map = {}
        for row in self.env.cr.dictfetchall():
            # A group is expanded unless all its children are 'none' taxes.
            row['to_expand'] = row['child_types'] != ['none']
            group_info[row['id']] = row
            for cid in row['child_tax_ids']:
                child_map[cid] = row['id']

        def _zero_by_column_group():
            return {cg: 0.0 for cg in options['column_groups']}

        def _zero_amounts():
            return {
                'base_amount': _zero_by_column_group(),
                'tax_amount': _zero_by_column_group(),
                'tax_non_deductible': _zero_by_column_group(),
                'tax_deductible': _zero_by_column_group(),
                'tax_due': _zero_by_column_group(),
            }

        results = defaultdict(lambda: {
            **_zero_amounts(),
            'children': defaultdict(_zero_amounts),
        })

        for cg_key, cg_opts in opts_per_cg.items():
            qry = report._get_report_query(cg_opts, 'strict_range')

            # Base amounts
            self.env.cr.execute(SQL(
                '''
                SELECT tax.id AS tax_id,
                       tax.type_tax_use AS tax_type_tax_use,
                       src_group_tax.id AS src_group_tax_id,
                       src_group_tax.type_tax_use AS src_group_tax_type_tax_use,
                       src_tax.id AS src_tax_id,
                       src_tax.type_tax_use AS src_tax_type_tax_use,
                       SUM(account_move_line.balance) AS base_amount
                  FROM %(tbl)s
                  JOIN account_move_line_account_tax_rel tax_rel
                    ON account_move_line.id = tax_rel.account_move_line_id
                  JOIN account_tax tax ON tax.id = tax_rel.account_tax_id
             LEFT JOIN account_tax src_tax ON src_tax.id = account_move_line.tax_line_id
             LEFT JOIN account_tax src_group_tax ON src_group_tax.id = account_move_line.group_tax_id
                 WHERE %(where)s
                   AND (account_move_line__move_id.always_tax_exigible
                        OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL
                        OR tax.tax_exigibility != 'on_payment')
                   AND (
                        (account_move_line.tax_line_id IS NOT NULL
                         AND (src_tax.type_tax_use IN ('sale','purchase')
                              OR src_group_tax.type_tax_use IN ('sale','purchase')))
                        OR (account_move_line.tax_line_id IS NULL
                            AND tax.type_tax_use IN ('sale','purchase'))
                   )
              GROUP BY tax.id, src_group_tax.id, src_tax.id
              ORDER BY src_group_tax.sequence, src_group_tax.id,
                       src_tax.sequence, src_tax.id,
                       tax.sequence, tax.id
                ''',
                tbl=qry.from_clause,
                where=qry.where_clause,
            ))

            groups_with_extra = set()
            for r in self.env.cr.dictfetchall():
                is_tax_ln = bool(r['src_tax_id'])
                if is_tax_ln:
                    if (
                        r['src_group_tax_id']
                        and not group_info.get(r['src_group_tax_id'], {}).get('to_expand')
                        and r['tax_id'] in group_info.get(r['src_group_tax_id'], {}).get('child_tax_ids', [])
                    ):
                        # Tax line affecting the base of its own, non-expanded
                        # group: ignored.
                        pass
                    elif r['tax_type_tax_use'] == 'none' and child_map.get(r['tax_id']):
                        # Extra base of a group through one of its 'none'
                        # children: counted once per group.
                        gid = child_map[r['tax_id']]
                        if gid not in groups_with_extra:
                            gi = group_info[gid]
                            results[gi['type_tax_use']]['children'][gid]['base_amount'][cg_key] += r['base_amount']
                            groups_with_extra.add(gid)
                    else:
                        ttu = r['src_group_tax_type_tax_use'] or r['src_tax_type_tax_use']
                        results[ttu]['children'][r['tax_id']]['base_amount'][cg_key] += r['base_amount']
                else:
                    if r['tax_id'] in group_info and group_info[r['tax_id']]['to_expand']:
                        # Expanded group: the base goes to each child.
                        gi = group_info[r['tax_id']]
                        for child_id in gi['child_tax_ids']:
                            results[gi['type_tax_use']]['children'][child_id]['base_amount'][cg_key] += r['base_amount']
                    else:
                        results[r['tax_type_tax_use']]['children'][r['tax_id']]['base_amount'][cg_key] += r['base_amount']

            # Tax amounts
            sel_ded = join_ded = gb_ded = SQL()
            if cg_opts.get('account_journal_report_tax_deductibility_columns'):
                sel_ded = SQL(", repartition.use_in_tax_closing AS trl_tax_closing, SIGN(repartition.factor_percent) AS trl_factor")
                join_ded = SQL("JOIN account_tax_repartition_line repartition ON account_move_line.tax_repartition_line_id = repartition.id")
                gb_ded = SQL(', repartition.use_in_tax_closing, SIGN(repartition.factor_percent)')

            self.env.cr.execute(SQL(
                '''
                SELECT tax.id AS tax_id,
                       tax.type_tax_use AS tax_type_tax_use,
                       group_tax.id AS group_tax_id,
                       group_tax.type_tax_use AS group_tax_type_tax_use,
                       SUM(account_move_line.balance) AS tax_amount
                       %(sel_ded)s
                  FROM %(tbl)s
                  JOIN account_tax tax ON tax.id = account_move_line.tax_line_id
                  %(join_ded)s
             LEFT JOIN account_tax group_tax ON group_tax.id = account_move_line.group_tax_id
                 WHERE %(where)s
                   AND (account_move_line__move_id.always_tax_exigible
                        OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL
                        OR tax.tax_exigibility != 'on_payment')
                   AND ((group_tax.id IS NULL AND tax.type_tax_use IN ('sale','purchase'))
                        OR (group_tax.id IS NOT NULL AND group_tax.type_tax_use IN ('sale','purchase')))
              GROUP BY tax.id, group_tax.id %(gb_ded)s
                ''',
                sel_ded=sel_ded,
                tbl=qry.from_clause,
                join_ded=join_ded,
                where=qry.where_clause,
                gb_ded=gb_ded,
            ))

            for r in self.env.cr.dictfetchall():
                tid = r['tax_id']
                if r['group_tax_id']:
                    ttu = r['group_tax_type_tax_use']
                    # Non-expanded group: amounts are reported on the group.
                    if not group_info.get(r['group_tax_id'], {}).get('to_expand'):
                        tid = r['group_tax_id']
                else:
                    ttu = r['group_tax_type_tax_use'] or r['tax_type_tax_use']

                results[ttu]['tax_amount'][cg_key] += r['tax_amount']
                results[ttu]['children'][tid]['tax_amount'][cg_key] += r['tax_amount']

                if cg_opts.get('account_journal_report_tax_deductibility_columns'):
                    detail_label = False
                    if r['trl_factor'] > 0 and ttu == 'purchase':
                        detail_label = 'tax_deductible' if r['trl_tax_closing'] else 'tax_non_deductible'
                    elif r['trl_tax_closing'] and (r['trl_factor'] > 0, ttu) in ((False, 'purchase'), (True, 'sale')):
                        detail_label = 'tax_due'

                    if detail_label:
                        results[ttu][detail_label][cg_key] += r['tax_amount'] * r['trl_factor']
                        results[ttu]['children'][tid][detail_label][cg_key] += r['tax_amount'] * r['trl_factor']

        return results

    def _read_generic_tax_report_amounts(self, report, opts_per_cg, gb_fields):
        """Read base and tax amounts from the tax details query, nested
        following ``gb_fields``.

        :param gb_fields: list of ``(alias, field)`` pairs, one per level.
        :return: nested mapping ``{key: {'base_amount': {...}, 'tax_amount':
            {...}, 'children': {...}}}``.
        """
        needs_group = False
        select_parts, gb_parts = [], []
        for alias, fld in gb_fields:
            select_parts.append(SQL("%s AS %s", SQL.identifier(alias, fld), SQL.identifier(f'{alias}_{fld}')))
            gb_parts.append(SQL.identifier(alias, fld))
            if alias == 'src_tax':
                # Also fetch the same field on the tax itself, used when a
                # group of taxes has to be expanded.
                select_parts.append(SQL("%s AS %s", SQL.identifier('tax', fld), SQL.identifier(f'tax_{fld}')))
                gb_parts.append(SQL.identifier('tax', fld))
                needs_group = True

        # Groups of taxes to expand: those whose children are not all 'none'.
        expand_set = set()
        if needs_group:
            groups = self.env['account.tax'].with_context(active_test=False).search([('amount_type', '=', 'group')])
            for g in groups:
                if set(g.children_tax_ids.mapped('type_tax_use')) != {'none'}:
                    expand_set.add(g.id)

        tree = {}
        for cg_key, cg_opts in opts_per_cg.items():
            qry = report._get_report_query(cg_opts, 'strict_range')
            td_qry = self.env['account.move.line']._get_query_tax_details(qry.from_clause, qry.where_clause)
            # Cumulated keys already seen, to add each base amount only once.
            seen_keys = set()

            self.env.cr.execute(SQL(
                '''
                SELECT %(sel)s,
                       trl.document_type = 'refund' AS is_refund,
                       SUM(CASE WHEN tdr.display_type = 'rounding' THEN 0 ELSE tdr.base_amount END) AS base_amount,
                       SUM(tdr.tax_amount) AS tax_amount
                  FROM (%(td)s) AS tdr
                  JOIN account_tax_repartition_line trl ON trl.id = tdr.tax_repartition_line_id
                  JOIN account_tax tax ON tax.id = tdr.tax_id
                  JOIN account_tax src_tax
                    ON src_tax.id = COALESCE(tdr.group_tax_id, tdr.tax_id)
                   AND src_tax.type_tax_use IN ('sale','purchase')
                  JOIN account_account account ON account.id = tdr.base_account_id
                 WHERE tdr.tax_exigible
              GROUP BY tdr.tax_repartition_line_id, trl.document_type, %(gb)s
              ORDER BY src_tax.sequence, src_tax.id, tax.sequence, tax.id
                ''',
                sel=SQL(',').join(select_parts),
                td=td_qry,
                gb=SQL(',').join(gb_parts),
            ))

            for row in self.env.cr.dictfetchall():
                node = tree
                cum_key = [row['is_refund']]
                for alias, fld in gb_fields:
                    gk = f'{alias}_{fld}'
                    if gk == 'src_tax_id' and row['src_tax_id'] in expand_set:
                        # Keep the originating group in the key, then use the
                        # child tax instead of the group.
                        cum_key.append(row[gk])
                        gk = 'tax_id'
                    rk = row[gk]
                    cum_key.append(rk)
                    ck_tuple = tuple(cum_key)

                    node.setdefault(rk, {
                        'base_amount': {k: 0.0 for k in cg_opts['column_groups']},
                        'tax_amount': {k: 0.0 for k in cg_opts['column_groups']},
                        'children': {},
                    })
                    sub = node[rk]

                    if ck_tuple not in seen_keys:
                        sub['base_amount'][cg_key] += row['base_amount']
                    sub['tax_amount'][cg_key] += row['tax_amount']

                    node = sub['children']
                    seen_keys.add(ck_tuple)

        return tree

    def _populate_lines_recursively(
        self, report, options, lines, sort_maps, gb_fields, node,
        index=0, type_tax_use=None, parent_line_id=None, warnings=None,
    ):
        """Append to ``lines`` the report lines of ``node`` and of its children.

        :param lines: output list, extended in place with ``(0, line)`` tuples.
        :param sort_maps: per level, ``{key: (value, sort position)}``.
        :param node: amounts mapping of the current level.
        :param index: current level in ``gb_fields``.
        """
        if index == len(gb_fields):
            return

        alias, fld = gb_fields[index]
        gk = f'{alias}_{fld}'
        smap = sort_maps[index]
        sorted_keys = sorted(node.keys(), key=lambda k: smap[k][1])

        for key in sorted_keys:
            if gk == 'src_tax_type_tax_use':
                type_tax_use = key
            # Amounts of sale taxes are displayed with the opposite sign.
            sign = -1 if type_tax_use == 'sale' else 1

            amounts = node[key]
            cols = []
            for col in options['columns']:
                el = col.get('expression_label')
                # Reset for every column: labels other than 'net' / 'tax' must
                # give an empty cell rather than an unbound or stale value.
                cv = ''
                if el == 'net':
                    # The base amount is only shown on the deepest level.
                    cv = sign * amounts['base_amount'][col['column_group_key']] if index == len(gb_fields) - 1 else ''
                if el == 'tax':
                    cv = sign * amounts['tax_amount'][col['column_group_key']]
                cols.append(report._build_column_dict(cv, col, options=options))

                if el == 'tax' and options.get('account_journal_report_tax_deductibility_columns'):
                    for dt in ('tax_non_deductible', 'tax_deductible', 'tax_due'):
                        cols.append(report._build_column_dict(
                            col_value=sign * amounts[dt][col['column_group_key']],
                            col_data={
                                'figure_type': 'monetary',
                                'column_group_key': col['column_group_key'],
                                'expression_label': dt,
                            },
                            options=options,
                        ))

            defaults = {
                'columns': cols,
                'level': index if index == 0 else index + 1,
                'unfoldable': False,
            }
            rpt_line = self._build_report_line(report, options, defaults, gk, smap[key][0], parent_line_id, warnings)

            if gk == 'src_tax_id':
                rpt_line['caret_options'] = 'generic_tax_report'

            lines.append((0, rpt_line))

            self._populate_lines_recursively(
                report,
                options,
                lines,
                sort_maps,
                gb_fields,
                amounts.get('children'),
                index=index + 1,
                type_tax_use=type_tax_use,
                parent_line_id=rpt_line['id'],
                warnings=warnings,
            )

    def _build_report_line(self, report, options, defaults, gk, value, parent_id, warnings=None):
        """Build a report line from ``defaults`` for the grouping key ``gk``.

        :param value: selection item ``(key, label)`` for
            ``'src_tax_type_tax_use'``, otherwise the tax or account record.
        :return: the line dict, with ``id``, ``name`` and (when given)
            ``parent_id`` filled in.
        """
        ln = dict(defaults)
        if parent_id is not None:
            ln['parent_id'] = parent_id

        if gk == 'src_tax_type_tax_use':
            ln['id'] = report._get_generic_line_id(None, None, markup=value[0], parent_line_id=parent_id)
            ln['name'] = value[1]

        elif gk == 'src_tax_id':
            tax = value
            ln['id'] = report._get_generic_line_id(tax._name, tax.id, parent_line_id=parent_id)

            if tax.amount_type == 'percent':
                ln['name'] = f"{tax.name} ({tax.amount}%)"
                if warnings is not None:
                    self._check_line_consistency(report, options, ln, tax, warnings)
            elif tax.amount_type == 'fixed':
                ln['name'] = f"{tax.name} ({tax.amount})"
            else:
                ln['name'] = tax.name

            if options.get('multi-company'):
                ln['name'] = f"{ln['name']} - {tax.company_id.display_name}"

        elif gk == 'account_id':
            acct = value
            ln['id'] = report._get_generic_line_id(acct._name, acct.id, parent_line_id=parent_id)
            ln['name'] = (
                f"{acct.display_name} - {acct.company_id.display_name}"
                if options.get('multi-company')
                else acct.display_name
            )

        return ln

    def _check_line_consistency(self, report, options, ln, tax, warnings=None):
        """Flag ``ln`` when its tax amount deviates from ``net * rate`` by more
        than 0.1% of the net amount.

        On the first inconsistent column group, ``ln['alert']`` is set and,
        when ``warnings`` is given, the lines-consistency warning is added.
        """
        eff_rate = tax.amount * sum(
            tax.invoice_repartition_line_ids.filtered(
                lambda r: r.repartition_type == 'tax'
            ).mapped('factor')
        ) / 100

        for cg_key in report._split_options_per_column_group(options):
            net = next(
                (c['no_format'] for c in ln['columns']
                 if c['column_group_key'] == cg_key and c['expression_label'] == 'net'),
                0,
            )
            tax_val = next(
                (c['no_format'] for c in ln['columns']
                 if c['column_group_key'] == cg_key and c['expression_label'] == 'tax'),
                0,
            )

            # Empty net cell (line not on the deepest level): nothing to check.
            if net == '':
                continue

            cur = self.env.company.currency_id
            expected = float(net or 0) * eff_rate
            if cur.compare_amounts(expected, tax_val):
                # Relative error against the magnitude of the net amount, so
                # that a negative net still yields a positive ratio.
                err = abs(abs(tax_val) - abs(expected)) / abs(float(net or 1))
                if err > 0.001:
                    ln['alert'] = True
                    if warnings is not None:
                        warnings['fusion_accounting.tax_report_warning_lines_consistency'] = {'alert_type': 'danger'}
                    return

    # ================================================================
    # CARET / AUDIT
    # ================================================================

    def caret_option_audit_tax(self, options, params):
        """Return the action listing the journal items behind a tax line.

        :raise UserError: when the line id in ``params`` does not refer to
            an ``account.tax`` record.
        """
        report = self.env['account.report'].browse(options['report_id'])
        mdl, tax_id = report._get_model_info_from_id(params['line_id'])
        if mdl != 'account.tax':
            raise UserError(_("Cannot audit tax from a non-tax model."))

        tax = self.env['account.tax'].browse(tax_id)

        if tax.amount_type == 'group':
            affect_domain = [
                ('tax_ids', 'in', tax.children_tax_ids.ids),
                ('tax_repartition_line_id', '!=', False),
            ]
        else:
            affect_domain = [
                ('tax_ids', '=', tax.id),
                ('tax_ids.type_tax_use', '=', tax.type_tax_use),
                ('tax_repartition_line_id', '!=', False),
            ]

        domain = report._get_options_domain(options, 'strict_range') + expression.OR((
            # Base lines
            [
                ('tax_ids', 'in', tax.ids),
                ('tax_ids.type_tax_use', '=', tax.type_tax_use),
                ('tax_repartition_line_id', '=', False),
            ],
            # Tax lines
            [
                ('group_tax_id', '=', tax.id) if tax.amount_type == 'group' else ('tax_line_id', '=', tax.id),
            ],
            # Tax lines also acting as base lines
            affect_domain,
        ))

        return {
            'type': 'ir.actions.act_window',
            'name': _('Journal Items for Tax Audit'),
            'res_model': 'account.move.line',
            'views': [[self.env.ref('account.view_move_line_tax_audit_tree').id, 'list']],
            'domain': domain,
            'context': {**self.env.context, 'search_default_group_by_account': 2, 'expand': 1},
        }


class FusionGenericTaxReportHandlerAT(models.AbstractModel):
    """Handler for the generic tax report grouped by account, then tax."""

    _name = 'account.generic.tax.report.handler.account.tax'
    _inherit = 'account.generic.tax.report.handler'
    _description = 'Generic Tax Report Custom Handler (Account -> Tax)'

    def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
        """Generate the report lines with the 'account_tax' grouping."""
        return super()._get_dynamic_lines(report, options, 'account_tax', warnings)


class FusionGenericTaxReportHandlerTA(models.AbstractModel):
    """Handler for the generic tax report grouped by tax, then account."""

    _name = 'account.generic.tax.report.handler.tax.account'
    _inherit = 'account.generic.tax.report.handler'
    _description = 'Generic Tax Report Custom Handler (Tax -> Account)'

    def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
        """Generate the report lines with the 'tax_account' grouping."""
        return super()._get_dynamic_lines(report, options, 'tax_account', warnings)