880 lines
43 KiB
Python
880 lines
43 KiB
Python
# Fusion Accounting - Generic Tax Report Handlers
|
|
# Base tax-report handler, generic handler, and account/tax grouping variants
|
|
|
|
import ast
|
|
from collections import defaultdict
|
|
|
|
from odoo import models, api, fields, Command, _
|
|
from odoo.addons.web.controllers.utils import clean_action
|
|
from odoo.exceptions import UserError, RedirectWarning
|
|
from odoo.osv import expression
|
|
from odoo.tools import SQL
|
|
|
|
|
|
class FusionTaxReportHandler(models.AbstractModel):
|
|
"""Base handler providing the Closing Entry button and tax-period
|
|
configuration for all tax reports (generic and country-specific)."""
|
|
|
|
_name = 'account.tax.report.handler'
|
|
_inherit = 'account.report.custom.handler'
|
|
_description = 'Account Report Handler for Tax Reports'
|
|
|
|
def _custom_options_initializer(self, report, options, previous_options):
    """Add the closing-entry button and the tax-period settings to ``options``.

    ``options`` is modified in place:

    * a "Closing Entry" button is appended to ``options['buttons']``;
    * ``options['tax_periodicity']`` receives the periodicity, months per
      period, start day and start month reported by the current company;
    * ``options['show_tax_period_filter']`` is False only when the
      periodicity is monthly / trimester / year and the period starts on
      day 1 of month 1. In that case every ``'tax_period'`` occurrence in
      the date filter and period type is replaced by the standard name.

    ``previous_options`` is not read here.
    """
    standard_periods = {'monthly': 'month', 'trimester': 'quarter', 'year': 'year'}
    company = self.env.company

    closing_button = {
        'name': _('Closing Entry'),
        'action': 'action_periodic_vat_entries',
        'sequence': 110,
        'always_show': True,
    }
    options['buttons'].append(closing_button)
    self._enable_export_buttons_for_common_vat_groups_in_branches(options)

    start_day, start_month = company._get_tax_closing_start_date_attributes(report)
    periodicity = company._get_tax_periodicity(report)
    options['tax_periodicity'] = {
        'periodicity': periodicity,
        'months_per_period': company._get_tax_periodicity_months_delay(report),
        'start_day': start_day,
        'start_month': start_month,
    }

    matches_standard_period = (
        periodicity in standard_periods and start_day == 1 and start_month == 1
    )
    options['show_tax_period_filter'] = not matches_standard_period
    if matches_standard_period:
        replacement = standard_periods[periodicity]
        date_options = options['date']
        for date_key in ('filter', 'period_type'):
            date_options[date_key] = date_options[date_key].replace('tax_period', replacement)
|
|
|
|
def _get_custom_display_config(self):
    """Return the display configuration selecting the customizable filters template.

    The result is a ``defaultdict(dict)``, so nested sections can be
    filled by callers without creating them first.
    """
    display_config = defaultdict(dict)
    display_config['templates'].update(
        AccountReportFilters='fusion_accounting.GenericTaxReportFiltersCustomizable',
    )
    return display_config
|
|
|
|
def _customize_warnings(self, report, options, all_column_groups_expression_totals, warnings):
    """Adjust the report warnings for tax reports (``warnings`` is edited in place).

    * The draft-entries warning is dropped when no draft move dated on or
      before the report end date exists with ``tax_closing_report_id``
      unset.
    * The inactive-tags warning is added when the report query selects at
      least one journal item linked to an inactive account tag.

    ``all_column_groups_expression_totals`` is not used here.
    """
    draft_warning = 'fusion_accounting.common_warning_draft_in_period'
    if draft_warning in warnings:
        draft_domain = [
            ('state', '=', 'draft'),
            ('date', '<=', options['date']['date_to']),
            ('tax_closing_report_id', '=', False),
        ]
        # limit=1: only the existence of such a move matters.
        if not self.env['account.move'].search_count(draft_domain, limit=1):
            warnings.pop(draft_warning)

    report_query = report._get_report_query(options, 'strict_range')
    inactive_tag_check = SQL("""
        SELECT 1
        FROM %s
        JOIN account_account_tag_account_move_line_rel aml_tag
        ON account_move_line.id = aml_tag.account_move_line_id
        JOIN account_account_tag tag
        ON aml_tag.account_account_tag_id = tag.id
        WHERE %s AND NOT tag.active
        LIMIT 1
    """, report_query.from_clause, report_query.where_clause)
    if self.env.execute_query(inactive_tag_check):
        warnings['fusion_accounting.tax_report_warning_inactive_tags'] = {}
|
|
|
|
# ================================================================
|
|
# TAX CLOSING
|
|
# ================================================================
|
|
|
|
def _is_period_equal_to_options(self, report, options):
    """Tell whether the dates in ``options`` span exactly one tax period.

    The current company computes the period boundaries from the report
    end date; the result is True only when both boundaries equal the
    ``date_from`` / ``date_to`` found in ``options['date']``.
    """
    date_options = options['date']
    selected_to = fields.Date.from_string(date_options['date_to'])
    selected_from = fields.Date.from_string(date_options['date_from'])
    period_start, period_end = self.env.company._get_tax_closing_period_boundaries(selected_to, report)
    return (period_start, period_end) == (selected_from, selected_to)
|
|
|
|
def action_periodic_vat_entries(self, options, from_post=False):
    """Open the tax closing entries for ``options``, generating them if needed.

    When the period type is not ``'tax_period'``, the selected dates do
    not match a tax period and the ``override_tax_closing_warning``
    context key is unset, a confirmation dialog (client action) is
    returned instead. The dialog embeds the action to run on "Proceed";
    that action is computed immediately by re-invoking this method with
    the override key set, so the entries already exist when the dialog is
    displayed.

    :param options: report options; ``report_id``, ``date``, ``companies``
        and (for multi-company reports) ``available_tax_units`` are read.
    :param from_post: forwarded to ``_get_periodic_vat_entries``, including
        through the confirmation dialog path.
    :return: an action dictionary: the confirmation client action, or a
        window action showing the closing move(s) (form view for a single
        move, filtered list otherwise).
    """
    report = self.env['account.report'].browse(options['report_id'])
    needs_confirmation = (
        options['date']['period_type'] != 'tax_period'
        and not self._is_period_equal_to_options(report, options)
        and not self.env.context.get('override_tax_closing_warning')
    )
    if needs_confirmation:
        multi_company_without_tax_unit = len(options['companies']) > 1 and (
            report.filter_multi_company != 'tax_units'
            or not (report.country_id and options['available_tax_units'])
        )
        if multi_company_without_tax_unit:
            warning_msg = _(
                "You're about to generate closing entries for multiple companies. "
                "Each will follow its own tax periodicity."
            )
        else:
            warning_msg = _(
                "The selected dates don't match a tax period. The closing entry "
                "will target the closest matching period."
            )

        # Forward from_post: the nested call must treat the moves exactly as
        # a direct call would (previously the flag was silently dropped).
        depending_action = self.with_context(
            override_tax_closing_warning=True,
        ).action_periodic_vat_entries(options, from_post=from_post)
        return {
            'type': 'ir.actions.client',
            'tag': 'fusion_accounting.redirect_action',
            'target': 'new',
            'params': {
                'depending_action': depending_action,
                'message': warning_msg,
                'button_text': _("Proceed"),
            },
            'context': {'dialog_size': 'medium', 'override_tax_closing_warning': True},
        }

    generated_moves = self._get_periodic_vat_entries(options, from_post=from_post)
    action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line")
    action = clean_action(action, env=self.env)
    action.pop('domain', None)

    if len(generated_moves) == 1:
        action['views'] = [(self.env.ref('account.view_move_form').id, 'form')]
        action['res_id'] = generated_moves.id
    else:
        action['domain'] = [('id', 'in', generated_moves.ids)]
        # The stored context is parsed as a Python literal before editing it.
        action['context'] = dict(ast.literal_eval(action['context']))
        action['context'].pop('search_default_posted', None)
    return action
|
|
|
|
def _get_periodic_vat_entries(self, options, from_post=False):
    """Return the tax closing moves for the report period, creating missing ones.

    Existing non-cancelled closing moves are looked up first; closing
    entries are then generated for the report companies that do not own
    one of them. When ``options['integer_rounding']`` is truthy,
    ``options['integer_rounding_enabled']`` is set on the caller's dict.

    :return: ``account.move`` recordset (existing moves first, then the
        generated ones).
    """
    report = self.env['account.report'].browse(options['report_id'])
    if options.get('integer_rounding'):
        options['integer_rounding_enabled'] = True

    report_companies = self.env['res.company'].browse(report.get_report_company_ids(options))
    existing_moves = self._get_tax_closing_entries_for_closed_period(
        report, options, report_companies, posted_only=False,
    )
    generated_moves = self._generate_tax_closing_entries(
        report,
        options,
        companies=report_companies - existing_moves.company_id,
        from_post=from_post,
    )
    return self.env['account.move'] + existing_moves + generated_moves
|
|
|
|
def _generate_tax_closing_entries(self, report, options, closing_moves=None, companies=None, from_post=False):
    """Create or refresh the tax closing moves and rewrite their journal items.

    :param closing_moves: already known closing moves; only those in
        draft state get their lines recomputed. Defaults to none.
    :param companies: companies to close; defaults to the report
        companies from ``options``. Companies already owning one of
        ``closing_moves`` do not get new moves.
    :param from_post: when truthy, a move equal to the closing move(s)
        registered for the current company is left untouched.
    :return: ``closing_moves`` extended with the moves obtained for the
        remaining companies.
    :raises RedirectWarning: through
        ``_redirect_to_misconfigured_tax_groups`` when the tax groups of
        an involved country are reported as misconfigured.
    """
    if companies is None:
        companies = self.env['res.company'].browse(report.get_report_company_ids(options))
    if closing_moves is None:
        closing_moves = self.env['account.move']

    closing_date = fields.Date.from_string(options['date']['date_to'])
    moves_per_company = defaultdict(lambda: self.env['account.move'])

    # Computed before closing_moves is extended further down.
    companies_to_close = companies.filtered(
        lambda company: company not in closing_moves.company_id
    )
    for draft_move in closing_moves.filtered(lambda move: move.state == 'draft'):
        moves_per_company[draft_move.company_id] |= draft_move

    for company in companies_to_close:
        include_domestic, fiscal_positions = self._get_fpos_info_for_tax_closing(company, report, options)
        company_moves = company._get_and_update_tax_closing_moves(
            closing_date, report, fiscal_positions=fiscal_positions, include_domestic=include_domestic,
        )
        moves_per_company[company] = company_moves
        closing_moves += company_moves

    for company, company_moves in moves_per_company.items():
        # Countries concerned by this company's closing moves.
        countries = self.env['res.country']
        for move in company_moves:
            fiscal_position = move.fiscal_position_id
            if fiscal_position.foreign_vat:
                countries |= fiscal_position.country_id
            else:
                countries |= company.account_fiscal_country_id

        if self.env['account.tax.group']._check_misconfigured_tax_groups(company, countries):
            self._redirect_to_misconfigured_tax_groups(company, countries)

        for move in company_moves:
            if from_post and move == moves_per_company.get(self.env.company):
                continue

            fiscal_position = move.fiscal_position_id
            move_options = dict(
                options,
                fiscal_position=fiscal_position.id if fiscal_position else 'domestic',
            )
            commands, subtotals = self._compute_vat_closing_entry(company, move_options)
            commands += self._add_tax_group_closing_items(subtotals, move)
            # Existing lines are replaced by the freshly computed ones.
            commands += [Command.delete(line.id) for line in move.line_ids]

            if commands:
                move.write({'line_ids': commands})

    return closing_moves
|
|
|
|
def _get_tax_closing_entries_for_closed_period(self, report, options, companies, posted_only=True):
    """Find the existing closing moves of ``companies`` for the report period.

    For each company, one move (``limit=1``) is searched that is dated on
    the end of the company's tax period, linked to the report through
    ``tax_closing_report_id`` and using one of the fiscal positions
    selected by ``options`` (plus "no fiscal position" when the domestic
    closing is included).

    :param posted_only: restrict to posted moves; otherwise every
        non-cancelled move qualifies.
    :return: ``account.move`` recordset, at most one move per company.
    """
    Move = self.env['account.move']
    if posted_only:
        state_leaf = ('state', '=', 'posted')
    else:
        state_leaf = ('state', '!=', 'cancel')

    matching_moves = Move
    for company in companies:
        _period_start, period_end = company._get_tax_closing_period_boundaries(
            fields.Date.from_string(options['date']['date_to']), report,
        )
        include_domestic, fiscal_positions = self._get_fpos_info_for_tax_closing(company, report, options)
        fiscal_position_ids = fiscal_positions.ids + ([False] if include_domestic else [])
        matching_moves += Move.search(
            [
                ('company_id', '=', company.id),
                ('fiscal_position_id', 'in', fiscal_position_ids),
                ('date', '=', period_end),
                ('tax_closing_report_id', '=', options['report_id']),
                state_leaf,
            ],
            limit=1,
        )
    return matching_moves
|
|
|
|
@api.model
def _compute_vat_closing_entry(self, company, options):
    """Compute the journal item commands of a tax closing entry.

    Tax amounts are summed per tax and account over the company's tax
    period (derived from ``options['date']['date_to']``), restricted to
    repartition lines flagged ``use_in_tax_closing``. One balancing line
    is produced per result row of every tax group that has both a
    receivable and a payable account.

    :param company: company to close; the method runs with it as the
        active company.
    :param options: report options; ``report_id``, ``date`` and
        ``fiscal_position`` are read.
    :return: ``(line_commands, subtotals)`` where ``subtotals`` maps
        ``(advance account id or False, receivable account id, payable
        account id)`` to the non-zero total of the matching tax groups.
        When no line is produced, two zero-valued adjustment lines are
        returned if both a sale and a purchase invoice tax repartition
        line with an account exist.
    """
    self = self.with_company(company)
    # Flush the fields read by the raw SQL query below.
    self.env['account.tax'].flush_model(['name', 'tax_group_id'])
    self.env['account.tax.repartition.line'].flush_model(['use_in_tax_closing'])
    self.env['account.move.line'].flush_model([
        'account_id', 'debit', 'credit', 'move_id', 'tax_line_id',
        'date', 'company_id', 'display_type', 'parent_state',
    ])
    self.env['account.move'].flush_model(['state'])

    # Rebuild the options on the exact tax period, posted entries only.
    period_options = dict(options, all_entries=False, date=dict(options['date']))
    report = self.env['account.report'].browse(options['report_id'])
    period_start, period_end = company._get_tax_closing_period_boundaries(
        fields.Date.from_string(options['date']['date_to']), report,
    )
    period_options['date'].update(
        date_from=fields.Date.to_string(period_start),
        date_to=fields.Date.to_string(period_end),
        period_type='custom',
        filter='custom',
    )
    period_options = report.with_context(
        allowed_company_ids=company.ids,
    ).get_options(previous_options=period_options)
    period_options['fiscal_position'] = options['fiscal_position']

    report_query = self.env.ref('account.generic_tax_report')._get_report_query(
        period_options, 'strict_range',
        domain=self._get_vat_closing_entry_additional_domain(),
    )
    tax_name_sql = self.env['account.tax']._field_to_sql('tax', 'name')
    closing_query = SQL("""
        SELECT "account_move_line".tax_line_id as tax_id,
        tax.tax_group_id as tax_group_id,
        %(tax_name)s as tax_name,
        "account_move_line".account_id,
        COALESCE(SUM("account_move_line".balance), 0) as amount
        FROM account_tax tax, account_tax_repartition_line repartition, %(tbl)s
        WHERE %(where)s
        AND tax.id = "account_move_line".tax_line_id
        AND repartition.id = "account_move_line".tax_repartition_line_id
        AND repartition.use_in_tax_closing
        GROUP BY tax.tax_group_id, "account_move_line".tax_line_id, tax.name, "account_move_line".account_id
    """, tax_name=tax_name_sql, tbl=report_query.from_clause, where=report_query.where_clause)
    self.env.cr.execute(closing_query)
    rows = self.env.cr.dictfetchall()
    rows = self._postprocess_vat_closing_entry_results(company, period_options, rows)

    # tax group record -> tax id -> [(tax name, account id, amount), ...]
    group_ids = [row['tax_group_id'] for row in rows]
    entries_by_group = {}
    for tax_group, row in zip(self.env['account.tax.group'].browse(group_ids), rows):
        entries_by_tax = entries_by_group.setdefault(tax_group, {})
        entries_by_tax.setdefault(row.get('tax_id'), []).append(
            (row.get('tax_name'), row.get('account_id'), row.get('amount'))
        )

    line_commands = []
    subtotals = {}
    currency = self.env.company.currency_id

    for tax_group, entries_by_tax in entries_by_group.items():
        # Groups lacking a receivable or payable account are ignored.
        if not (tax_group.tax_receivable_account_id and tax_group.tax_payable_account_id):
            continue
        group_total = 0
        for entries in entries_by_tax.values():
            for tax_name, account_id, amount in entries:
                line_commands.append((0, 0, {
                    'name': tax_name,
                    'debit': abs(amount) if amount < 0 else 0,
                    'credit': amount if amount > 0 else 0,
                    'account_id': account_id,
                }))
                group_total += amount

        if currency.is_zero(group_total):
            continue
        subtotal_key = (
            tax_group.advance_tax_payment_account_id.id or False,
            tax_group.tax_receivable_account_id.id,
            tax_group.tax_payable_account_id.id,
        )
        subtotals[subtotal_key] = subtotals.get(subtotal_key, 0) + group_total

    if not line_commands:
        RepartitionLine = self.env['account.tax.repartition.line']

        def first_invoice_tax_line(type_tax_use):
            # First invoice tax repartition line of the company for that tax use.
            return RepartitionLine.search([
                *RepartitionLine._check_company_domain(company),
                ('repartition_type', '=', 'tax'),
                ('document_type', '=', 'invoice'),
                ('tax_id.type_tax_use', '=', type_tax_use),
            ], limit=1)

        purchase_line = first_invoice_tax_line('purchase')
        sale_line = first_invoice_tax_line('sale')
        if sale_line.account_id and purchase_line.account_id:
            line_commands = [
                Command.create({'name': _('Tax Received Adjustment'), 'debit': 0, 'credit': 0.0, 'account_id': sale_line.account_id.id}),
                Command.create({'name': _('Tax Paid Adjustment'), 'debit': 0.0, 'credit': 0, 'account_id': purchase_line.account_id.id}),
            ]

    return line_commands, subtotals
|
|
|
|
def _get_vat_closing_entry_additional_domain(self):
    """Return extra domain leaves for the closing-entry query.

    Extension hook: the result is passed as ``domain`` to the report
    query built in ``_compute_vat_closing_entry``. The base
    implementation adds no restriction and returns a new empty list.
    """
    return []
|
|
|
|
def _postprocess_vat_closing_entry_results(self, company, options, results):
    """Return the closing-entry query rows, unchanged in the base handler.

    Extension hook called by ``_compute_vat_closing_entry`` on the rows
    fetched from its SQL query; ``company`` and ``options`` are ignored
    here and the very same ``results`` object is returned.
    """
    return results
|
|
|
|
def _vat_closing_entry_results_rounding(self, company, options, results, rounding_accounts, vat_results_summary):
    """Append a rounding line to ``results`` when the report and the entry differ.

    The total starts as the sum of the result amounts; for every
    ``(operation, report line id, expression label)`` of
    ``vat_results_summary`` the matching report column value is added
    (``'due'`` / ``'total'``) or subtracted (``'deductible'``). A non-zero
    rounded total yields one extra row carrying the opposite amount, on
    the profit account when the difference is negative and on the loss
    account otherwise.

    :param rounding_accounts: mapping with ``'profit'`` and ``'loss'``
        accounts; when either is missing or falsy, ``results`` is
        returned untouched.
    :return: the ``results`` list (the same object, possibly extended).
    """
    if not rounding_accounts.get('profit') or not rounding_accounts.get('loss'):
        return results

    running_total = sum(row['amount'] for row in results)
    # The extra line reuses the tax group of the last result row.
    rounding_group_id = results[-1]['tax_group_id'] if results else None
    report = self.env['account.report'].browse(options['report_id'])

    for report_line in report._get_lines(options):
        model_name, record_id = report._get_model_info_from_id(report_line['id'])
        if model_name != 'account.report.line':
            continue
        for operation, summary_line_id, summary_label in vat_results_summary:
            for column in report_line['columns']:
                if record_id == summary_line_id and column['expression_label'] == summary_label:
                    if operation in {'due', 'total'}:
                        running_total += column['no_format']
                    elif operation == 'deductible':
                        running_total -= column['no_format']

    difference = company.currency_id.round(running_total)
    if company.currency_id.is_zero(difference):
        return results

    if difference < 0:
        rounding_account = rounding_accounts['profit']
    else:
        rounding_account = rounding_accounts['loss']
    results.append({
        'tax_name': _('Difference from rounding taxes'),
        'amount': difference * -1,
        'tax_group_id': rounding_group_id,
        'account_id': rounding_account.id,
    })
    return results
|
|
|
|
@api.model
def _add_tax_group_closing_items(self, tg_subtotals, closing_move):
    """Build the lines balancing the tax group accounts of a closing move.

    For every ``(advance, receivable, payable)`` account triple of
    ``tg_subtotals``, the posted balance of each account (up to the
    closing move date, for the move's company) is zeroed by a dedicated
    line, each account being handled only once. Those balances are added
    to the subtotal, and a non-zero remainder is booked on the payable
    account (negative remainder) or the receivable account.

    :param tg_subtotals: mapping as returned by
        ``_compute_vat_closing_entry`` (account id triple -> amount).
    :param closing_move: move providing the date, company and currency.
    :return: list of journal item creation commands.
    """
    balance_query = '''
        SELECT SUM(aml.balance) AS balance
        FROM account_move_line aml
        LEFT JOIN account_move move ON move.id = aml.move_id
        WHERE aml.account_id = %s AND aml.date <= %s AND move.state = 'posted' AND aml.company_id = %s
    '''
    currency = closing_move.company_id.currency_id
    commands = []
    settled_account_ids = []

    def settle_account(account_id, label):
        # Add the line zeroing the account's posted balance; return that balance.
        self.env.cr.execute(balance_query, (account_id, closing_move.date, closing_move.company_id.id))
        balance = self.env.cr.dictfetchone().get('balance') or 0
        if not currency.is_zero(balance):
            commands.append((0, 0, {
                'name': label,
                'debit': abs(balance) if balance < 0 else 0,
                'credit': abs(balance) if balance > 0 else 0,
                'account_id': account_id,
            }))
        return balance

    for account_ids, subtotal in tg_subtotals.items():
        remainder = subtotal
        labelled_accounts = (
            (account_ids[0], _('Balance tax advance payment account')),
            (account_ids[1], _('Balance tax current account (receivable)')),
            (account_ids[2], _('Balance tax current account (payable)')),
        )
        for account_id, label in labelled_accounts:
            if account_id and account_id not in settled_account_ids:
                remainder += settle_account(account_id, label)
                settled_account_ids.append(account_id)

        if currency.is_zero(remainder):
            continue
        is_payable = remainder < 0
        commands.append(Command.create({
            'name': _('Payable tax amount') if is_payable else _('Receivable tax amount'),
            'debit': remainder if remainder > 0 else 0,
            'credit': abs(remainder) if is_payable else 0,
            'account_id': account_ids[2] if is_payable else account_ids[1],
        }))
    return commands
|
|
|
|
@api.model
def _redirect_to_misconfigured_tax_groups(self, company, countries):
    """Raise a redirection to the tax groups that need closing accounts.

    :param company: company whose name is shown on the redirect button.
    :param countries: countries used to filter the listed tax groups
        (groups without country are listed as well).
    :raises RedirectWarning: always.
    """
    tax_groups_action = {
        'type': 'ir.actions.act_window',
        'name': 'Tax groups',
        'res_model': 'account.tax.group',
        'view_mode': 'list',
        'views': [[False, 'list']],
        'domain': ['|', ('country_id', 'in', countries.ids), ('country_id', '=', False)],
    }
    message = _('Please specify the accounts necessary for the Tax Closing Entry.')
    button_label = _('Configure your TAX accounts - %s', company.display_name)
    raise RedirectWarning(message, tax_groups_action, button_label)
|
|
|
|
def _get_fpos_info_for_tax_closing(self, company, report, options):
    """Resolve the fiscal positions concerned by a closing for ``company``.

    Depending on ``options['fiscal_position']``:

    * ``'domestic'``: no fiscal position, domestic closing included;
    * ``'all'``: every fiscal position of the company with a foreign VAT
      number; the domestic closing is included when none exists, when
      the report has no country, or when the first of them belongs to
      the company's fiscal country;
    * any other value: treated as a fiscal position id, domestic closing
      excluded.

    :return: ``(include_domestic, fiscal_positions)``.
    """
    FiscalPosition = self.env['account.fiscal.position']
    selection = options['fiscal_position']

    if selection == 'domestic':
        return True, FiscalPosition
    if selection != 'all':
        return False, FiscalPosition.browse([selection])

    foreign_vat_positions = FiscalPosition.search([
        *FiscalPosition._check_company_domain(company),
        ('foreign_vat', '!=', False),
    ])
    include_domestic = (
        not foreign_vat_positions
        or not report.country_id
        or company.account_fiscal_country_id == foreign_vat_positions[0].country_id
    )
    return include_domestic, foreign_vat_positions
|
|
|
|
def _get_amls_with_archived_tags_domain(self, options):
    """Build the domain selecting posted journal items with archived tax tags.

    Items are dated on or after ``options['date']['date_from']``; the
    upper bound ``date_to`` is only applied when the date mode is
    ``'single'``.
    """
    date_options = options['date']
    leaves = [
        ('tax_tag_ids.active', '=', False),
        ('parent_state', '=', 'posted'),
        ('date', '>=', date_options['date_from']),
    ]
    if date_options['mode'] == 'single':
        leaves += [('date', '<=', date_options['date_to'])]
    return leaves
|
|
|
|
def action_open_amls_with_archived_tags(self, options, params=None):
    """Return the window action listing journal items with archived tax tags.

    The domain comes from ``_get_amls_with_archived_tags_domain`` and the
    action context disables the active test. ``params`` is not used.
    """
    list_view = self.env.ref('fusion_accounting.view_archived_tag_move_tree')
    action = {
        'name': _("Journal items with archived tax tags"),
        'type': 'ir.actions.act_window',
        'res_model': 'account.move.line',
        'domain': self._get_amls_with_archived_tags_domain(options),
        'context': {'active_test': False},
        'views': [(list_view.id, 'list')],
    }
    return action
|
|
|
|
|
|
class FusionGenericTaxReportHandler(models.AbstractModel):
|
|
"""Handler for the standard generic tax report (Tax -> Tax grouping)."""
|
|
|
|
_name = 'account.generic.tax.report.handler'
|
|
_inherit = 'account.tax.report.handler'
|
|
_description = 'Generic Tax Report Custom Handler'
|
|
|
|
def _get_custom_display_config(self):
    """Extend the inherited display configuration for the generic tax report.

    Adds the ``generic_tax_report`` CSS class and the custom template
    used to render line names.
    """
    display_config = super()._get_custom_display_config()
    display_config.update(css_custom_class='generic_tax_report')
    display_config['templates'].update(
        AccountReportLineName='fusion_accounting.TaxReportLineName',
    )
    return display_config
|
|
|
|
def _custom_options_initializer(self, report, options, previous_options=None):
    """Initialise the options, then simplify the fiscal position filter.

    After the inherited initialisation, when the report has no country,
    at most one company is selected and the number of available VAT
    fiscal positions does not exceed 0 (domestic allowed) or 1 (domestic
    not allowed), the domestic choice is disabled and the fiscal position
    is forced to ``'all'``.
    """
    super()._custom_options_initializer(report, options, previous_options=previous_options)

    if report.country_id:
        return
    max_positions = 0 if options['allow_domestic'] else 1
    few_positions = len(options['available_vat_fiscal_positions']) <= max_positions
    if few_positions and len(options['companies']) <= 1:
        options['allow_domestic'] = False
        options['fiscal_position'] = 'all'
|
|
|
|
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
    """Generate the report lines using the ``'default'`` grouping.

    ``all_column_groups_expression_totals`` is not used; the work is
    delegated to ``_get_dynamic_lines``.
    """
    grouping = 'default'
    return self._get_dynamic_lines(report, options, grouping, warnings)
|
|
|
|
def _caret_options_initializer(self):
    """Return the caret menu of generic tax report lines: a single "Audit" entry."""
    audit_entry = {'name': _("Audit"), 'action': 'caret_option_audit_tax'}
    return {'generic_tax_report': [audit_entry]}
|
|
|
|
def _get_dynamic_lines(self, report, options, grouping, warnings=None):
    """Build the dynamic lines of the generic tax report.

    :param grouping: ``'tax_account'`` (tax type > tax > account),
        ``'account_tax'`` (tax type > account > tax) or anything else for
        the default tax type > tax hierarchy. The two detailed groupings
        read their amounts through ``_read_generic_tax_report_amounts``,
        the default one through
        ``_read_generic_tax_report_amounts_no_tax_details``.
    :return: the list filled by ``_populate_lines_recursively``.
    """
    options_per_column_group = report._split_options_per_column_group(options)

    detailed_groupings = {
        'tax_account': (
            [('src_tax', 'type_tax_use'), ('src_tax', 'id'), ('account', 'id')],
            [None, 'account.tax', 'account.account'],
        ),
        'account_tax': (
            [('src_tax', 'type_tax_use'), ('account', 'id'), ('src_tax', 'id')],
            [None, 'account.account', 'account.tax'],
        ),
    }
    if grouping in detailed_groupings:
        groupby_fields, comodels = detailed_groupings[grouping]
        amount_hierarchy = self._read_generic_tax_report_amounts(
            report, options_per_column_group, groupby_fields,
        )
    else:
        groupby_fields = [('src_tax', 'type_tax_use'), ('src_tax', 'id')]
        comodels = [None, 'account.tax']
        amount_hierarchy = self._read_generic_tax_report_amounts_no_tax_details(
            report, options, options_per_column_group,
        )

    # One set of keys per hierarchy level, used to order the lines below.
    keys_per_level = [set() for _level in groupby_fields]

    def gather_keys(level_nodes, level=0):
        for node_key, node_values in level_nodes.items():
            if not node_key:
                continue
            keys_per_level[level].add(node_key)
            children = node_values.get('children')
            if children:
                gather_keys(children, level + 1)

    gather_keys(amount_hierarchy)

    # Per level: key -> (record or selection entry, position in the sort order).
    sorting_maps = []
    for level, comodel in enumerate(comodels):
        if comodel:
            records = self.env[comodel].with_context(active_test=False).search(
                [('id', 'in', tuple(keys_per_level[level]))]
            )
            sorting_maps.append({
                record.id: (record, position) for position, record in enumerate(records)
            })
        else:
            selection = self.env['account.tax']._fields['type_tax_use'].selection
            sorting_maps.append({
                entry[0]: (entry, position)
                for position, entry in enumerate(selection)
                if entry[0] in keys_per_level[level]
            })

    report_lines = []
    self._populate_lines_recursively(
        report, options, report_lines, sorting_maps, groupby_fields, amount_hierarchy, warnings=warnings,
    )
    return report_lines
|
|
|
|
# ================================================================
|
|
# AMOUNT COMPUTATION
|
|
# ================================================================
|
|
|
|
@api.model
def _read_generic_tax_report_amounts_no_tax_details(self, report, options, opts_per_cg):
    """Compute base and tax amounts per tax directly from the journal items.

    Group-of-taxes information is loaded first: a group is "to expand"
    when its children are not all of type ``'none'``. Then, for every
    column group, one query sums the base amounts and another the tax
    amounts; amounts of a non-expanded group are booked on the group
    itself, otherwise on its child taxes. When the column-group options
    enable ``account_journal_report_tax_deductibility_columns``, the tax
    amounts are additionally split into non-deductible, deductible and
    due amounts.

    :param opts_per_cg: mapping column group key -> options of that
        column group.
    :return: ``defaultdict`` keyed by ``type_tax_use``; every value holds
        ``base_amount``, ``tax_amount``, ``tax_non_deductible``,
        ``tax_deductible`` and ``tax_due`` (each a dict per column group)
        plus ``children``, a ``defaultdict`` keyed by tax id with the
        same amount keys.
    """
    Tax = self.env['account.tax']
    company_ids = report.get_report_company_ids(options)
    company_domain = Tax._check_company_domain(company_ids)
    company_where = Tax.with_context(active_test=False)._where_calc(company_domain)
    self.env.cr.execute(SQL('''
        SELECT account_tax.id, account_tax.type_tax_use,
        ARRAY_AGG(child_tax.id) AS child_tax_ids,
        ARRAY_AGG(DISTINCT child_tax.type_tax_use) AS child_types
        FROM account_tax_filiation_rel account_tax_rel
        JOIN account_tax ON account_tax.id = account_tax_rel.parent_tax
        JOIN account_tax child_tax ON child_tax.id = account_tax_rel.child_tax
        WHERE account_tax.amount_type = 'group' AND %s
        GROUP BY account_tax.id
    ''', company_where.where_clause or SQL("TRUE")))

    groups_by_id = {}
    group_of_child = {}
    for group_row in self.env.cr.dictfetchall():
        group_row['to_expand'] = group_row['child_types'] != ['none']
        groups_by_id[group_row['id']] = group_row
        group_of_child.update(dict.fromkeys(group_row['child_tax_ids'], group_row['id']))

    amount_labels = ('base_amount', 'tax_amount', 'tax_non_deductible', 'tax_deductible', 'tax_due')

    def zeroed_amounts():
        # One zero per column group for every amount label.
        return {
            label: {cg: 0.0 for cg in options['column_groups']}
            for label in amount_labels
        }

    def new_tax_type_entry():
        entry = zeroed_amounts()
        entry['children'] = defaultdict(zeroed_amounts)
        return entry

    results = defaultdict(new_tax_type_entry)

    for cg_key, cg_opts in opts_per_cg.items():
        report_query = report._get_report_query(cg_opts, 'strict_range')

        # ---- Base amounts ----
        self.env.cr.execute(SQL('''
            SELECT tax.id AS tax_id, tax.type_tax_use AS tax_type_tax_use,
            src_group_tax.id AS src_group_tax_id, src_group_tax.type_tax_use AS src_group_tax_type_tax_use,
            src_tax.id AS src_tax_id, src_tax.type_tax_use AS src_tax_type_tax_use,
            SUM(account_move_line.balance) AS base_amount
            FROM %(tbl)s
            JOIN account_move_line_account_tax_rel tax_rel ON account_move_line.id = tax_rel.account_move_line_id
            JOIN account_tax tax ON tax.id = tax_rel.account_tax_id
            LEFT JOIN account_tax src_tax ON src_tax.id = account_move_line.tax_line_id
            LEFT JOIN account_tax src_group_tax ON src_group_tax.id = account_move_line.group_tax_id
            WHERE %(where)s
            AND (account_move_line__move_id.always_tax_exigible OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL OR tax.tax_exigibility != 'on_payment')
            AND (
            (account_move_line.tax_line_id IS NOT NULL AND (src_tax.type_tax_use IN ('sale','purchase') OR src_group_tax.type_tax_use IN ('sale','purchase')))
            OR (account_move_line.tax_line_id IS NULL AND tax.type_tax_use IN ('sale','purchase'))
            )
            GROUP BY tax.id, src_group_tax.id, src_tax.id
            ORDER BY src_group_tax.sequence, src_group_tax.id, src_tax.sequence, src_tax.id, tax.sequence, tax.id
        ''', tbl=report_query.from_clause, where=report_query.where_clause))

        # Groups that already received an extra base from a tax line.
        groups_with_extra_base = set()
        for row in self.env.cr.dictfetchall():
            tax_id = row['tax_id']
            base_amount = row['base_amount']

            if not row['src_tax_id']:
                # Base line: spread over the children of an expanded group,
                # otherwise book it on the tax itself.
                if tax_id in groups_by_id and groups_by_id[tax_id]['to_expand']:
                    group = groups_by_id[tax_id]
                    for child_id in group['child_tax_ids']:
                        results[group['type_tax_use']]['children'][child_id]['base_amount'][cg_key] += base_amount
                else:
                    results[row['tax_type_tax_use']]['children'][tax_id]['base_amount'][cg_key] += base_amount
                continue

            # Tax line affecting the base of another tax.
            src_group = groups_by_id.get(row['src_group_tax_id'], {})
            if (
                row['src_group_tax_id']
                and not src_group.get('to_expand')
                and tax_id in src_group.get('child_tax_ids', [])
            ):
                # Affects a child of its own, non-expanded group: ignored.
                continue

            if row['tax_type_tax_use'] == 'none' and group_of_child.get(tax_id):
                # Extra base for the group owning that 'none' tax, counted once.
                group_id = group_of_child[tax_id]
                if group_id not in groups_with_extra_base:
                    group = groups_by_id[group_id]
                    results[group['type_tax_use']]['children'][group_id]['base_amount'][cg_key] += base_amount
                    groups_with_extra_base.add(group_id)
            else:
                tax_use = row['src_group_tax_type_tax_use'] or row['src_tax_type_tax_use']
                results[tax_use]['children'][tax_id]['base_amount'][cg_key] += base_amount

        # ---- Tax amounts ----
        with_deductibility = cg_opts.get('account_journal_report_tax_deductibility_columns')
        select_deductible = join_deductible = groupby_deductible = SQL()
        if with_deductibility:
            select_deductible = SQL(", repartition.use_in_tax_closing AS trl_tax_closing, SIGN(repartition.factor_percent) AS trl_factor")
            join_deductible = SQL("JOIN account_tax_repartition_line repartition ON account_move_line.tax_repartition_line_id = repartition.id")
            groupby_deductible = SQL(', repartition.use_in_tax_closing, SIGN(repartition.factor_percent)')

        self.env.cr.execute(SQL('''
            SELECT tax.id AS tax_id, tax.type_tax_use AS tax_type_tax_use,
            group_tax.id AS group_tax_id, group_tax.type_tax_use AS group_tax_type_tax_use,
            SUM(account_move_line.balance) AS tax_amount %(sel_ded)s
            FROM %(tbl)s
            JOIN account_tax tax ON tax.id = account_move_line.tax_line_id
            %(join_ded)s
            LEFT JOIN account_tax group_tax ON group_tax.id = account_move_line.group_tax_id
            WHERE %(where)s
            AND (account_move_line__move_id.always_tax_exigible OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL OR tax.tax_exigibility != 'on_payment')
            AND ((group_tax.id IS NULL AND tax.type_tax_use IN ('sale','purchase')) OR (group_tax.id IS NOT NULL AND group_tax.type_tax_use IN ('sale','purchase')))
            GROUP BY tax.id, group_tax.id %(gb_ded)s
        ''', sel_ded=select_deductible, tbl=report_query.from_clause, join_ded=join_deductible,
            where=report_query.where_clause, gb_ded=groupby_deductible))

        for row in self.env.cr.dictfetchall():
            group_tax_id = row['group_tax_id']
            if group_tax_id:
                tax_use = row['group_tax_type_tax_use']
                # A non-expanded group receives the amounts of its children.
                if groups_by_id.get(group_tax_id, {}).get('to_expand'):
                    target_tax_id = row['tax_id']
                else:
                    target_tax_id = group_tax_id
            else:
                tax_use = row['group_tax_type_tax_use'] or row['tax_type_tax_use']
                target_tax_id = row['tax_id']

            type_totals = results[tax_use]
            tax_totals = type_totals['children'][target_tax_id]
            for totals in (type_totals, tax_totals):
                totals['tax_amount'][cg_key] += row['tax_amount']

            if not with_deductibility:
                continue

            positive_factor = row['trl_factor'] > 0
            detail_label = False
            if positive_factor and tax_use == 'purchase':
                detail_label = 'tax_deductible' if row['trl_tax_closing'] else 'tax_non_deductible'
            elif row['trl_tax_closing'] and (positive_factor, tax_use) in ((False, 'purchase'), (True, 'sale')):
                detail_label = 'tax_due'

            if detail_label:
                signed_amount = row['tax_amount'] * row['trl_factor']
                for totals in (type_totals, tax_totals):
                    totals[detail_label][cg_key] += signed_amount

    return results
|
|
|
|
def _read_generic_tax_report_amounts(self, report, opts_per_cg, gb_fields):
    """Read base and tax amounts from the tax details, as a nested hierarchy.

    :param opts_per_cg: mapping column group key -> options of that
        column group.
    :param gb_fields: ordered ``(alias, field)`` pairs; each pair adds a
        level to the hierarchy. When grouping on ``src_tax.id`` and the
        originator tax is a group whose children are not all of type
        ``'none'``, the child tax id is used as key instead of the group.
    :return: nested dict: key -> ``{'base_amount': {...}, 'tax_amount':
        {...}, 'children': {...}}``, amounts being stored per column
        group. A base amount is added only once per cumulated key (refund
        flag followed by the keys of the levels walked so far).
    """
    fetch_tax_groups = False
    select_terms = []
    groupby_terms = []
    for alias, field_name in gb_fields:
        select_terms.append(SQL("%s AS %s", SQL.identifier(alias, field_name), SQL.identifier(f'{alias}_{field_name}')))
        groupby_terms.append(SQL.identifier(alias, field_name))
        if alias == 'src_tax':
            # Also fetch the same field on the (child) tax itself.
            select_terms.append(SQL("%s AS %s", SQL.identifier('tax', field_name), SQL.identifier(f'tax_{field_name}')))
            groupby_terms.append(SQL.identifier('tax', field_name))
            fetch_tax_groups = True

    expandable_group_ids = set()
    if fetch_tax_groups:
        tax_groups = self.env['account.tax'].with_context(active_test=False).search([('amount_type', '=', 'group')])
        expandable_group_ids = {
            tax_group.id
            for tax_group in tax_groups
            if set(tax_group.children_tax_ids.mapped('type_tax_use')) != {'none'}
        }

    hierarchy = {}
    for cg_key, cg_opts in opts_per_cg.items():
        report_query = report._get_report_query(cg_opts, 'strict_range')
        tax_details_query = self.env['account.move.line']._get_query_tax_details(
            report_query.from_clause, report_query.where_clause,
        )
        counted_base_keys = set()

        self.env.cr.execute(SQL('''
            SELECT %(sel)s, trl.document_type = 'refund' AS is_refund,
            SUM(CASE WHEN tdr.display_type = 'rounding' THEN 0 ELSE tdr.base_amount END) AS base_amount,
            SUM(tdr.tax_amount) AS tax_amount
            FROM (%(td)s) AS tdr
            JOIN account_tax_repartition_line trl ON trl.id = tdr.tax_repartition_line_id
            JOIN account_tax tax ON tax.id = tdr.tax_id
            JOIN account_tax src_tax ON src_tax.id = COALESCE(tdr.group_tax_id, tdr.tax_id) AND src_tax.type_tax_use IN ('sale','purchase')
            JOIN account_account account ON account.id = tdr.base_account_id
            WHERE tdr.tax_exigible
            GROUP BY tdr.tax_repartition_line_id, trl.document_type, %(gb)s
            ORDER BY src_tax.sequence, src_tax.id, tax.sequence, tax.id
        ''', sel=SQL(',').join(select_terms), td=tax_details_query, gb=SQL(',').join(groupby_terms)))

        for row in self.env.cr.dictfetchall():
            level_nodes = hierarchy
            key_path = [row['is_refund']]
            for alias, field_name in gb_fields:
                column = f'{alias}_{field_name}'
                if column == 'src_tax_id' and row['src_tax_id'] in expandable_group_ids:
                    # Keep the group in the key path, but file the amounts
                    # under the child tax.
                    key_path.append(row[column])
                    column = 'tax_id'
                node_key = row[column]
                key_path.append(node_key)
                cumulated_key = tuple(key_path)

                entry = level_nodes.setdefault(node_key, {
                    'base_amount': {k: 0.0 for k in cg_opts['column_groups']},
                    'tax_amount': {k: 0.0 for k in cg_opts['column_groups']},
                    'children': {},
                })
                if cumulated_key not in counted_base_keys:
                    entry['base_amount'][cg_key] += row['base_amount']
                entry['tax_amount'][cg_key] += row['tax_amount']

                level_nodes = entry['children']
                counted_base_keys.add(cumulated_key)
    return hierarchy
|
|
|
|
def _populate_lines_recursively(
    self, report, options, lines, sort_maps, gb_fields, node,
    index=0, type_tax_use=None, parent_line_id=None, warnings=None,
):
    """Append one report line per entry of ``node`` to ``lines``, depth first.

    Each grouping level of ``gb_fields`` produces one layer of lines; the
    method recurses into the ``'children'`` mapping of every entry until all
    grouping levels have been consumed.

    :param report: object providing ``_build_column_dict``; also forwarded
        to ``_build_report_line``.
    :param options: report options; ``options['columns']`` is iterated and
        ``'account_journal_report_tax_deductibility_columns'`` is read.
    :param lines: list mutated in place; ``(0, line_dict)`` tuples are
        appended to it.
    :param sort_maps: one mapping per grouping level. For a key ``k`` of
        ``node``, ``sort_maps[index][k][1]`` is the sort position and
        ``sort_maps[index][k][0]`` is the value handed to
        ``_build_report_line``.
    :param gb_fields: sequence of ``(alias, field)`` pairs, one per level.
    :param node: mapping ``key -> {'base_amount': {...}, 'tax_amount': {...},
        'children': {...}}`` where the amount dicts are keyed by column
        group key.
    :param index: current depth in ``gb_fields``.
    :param type_tax_use: tax use inherited from the enclosing level; it is
        overwritten with the key when the current level is
        ``src_tax_type_tax_use``.
    :param parent_line_id: id of the parent report line, if any.
    :param warnings: forwarded untouched to ``_build_report_line``.
    :return: ``None``; results are delivered through ``lines``.
    """
    if index == len(gb_fields):
        return

    alias, fld = gb_fields[index]
    gk = f'{alias}_{fld}'
    smap = sort_maps[index]
    # The base amount is only displayed on the deepest grouping level.
    is_last_level = index == len(gb_fields) - 1

    for key in sorted(node, key=lambda k: smap[k][1]):
        if gk == 'src_tax_type_tax_use':
            type_tax_use = key
        # Amounts under the 'sale' tax use are negated for display.
        sign = -1 if type_tax_use == 'sale' else 1

        amounts = node[key]
        cols = []
        for col in options['columns']:
            el = col.get('expression_label')
            if el == 'net':
                cv = sign * amounts['base_amount'][col['column_group_key']] if is_last_level else ''
            elif el == 'tax':
                cv = sign * amounts['tax_amount'][col['column_group_key']]
            else:
                # Previously ``cv`` stayed unbound (or kept the preceding
                # column's value) for any other label; render an empty cell.
                cv = ''
            cols.append(report._build_column_dict(cv, col, options=options))

            # NOTE(review): the source indentation was lost; this block is
            # assumed to belong to the column loop so that every column
            # group gets its own deductibility columns - confirm.
            # NOTE(review): requires 'tax_non_deductible', 'tax_deductible'
            # and 'tax_due' entries in ``amounts``; the node initialiser
            # visible earlier in this file only creates 'base_amount',
            # 'tax_amount' and 'children' - verify they are added elsewhere.
            if el == 'tax' and options.get('account_journal_report_tax_deductibility_columns'):
                for dt in ('tax_non_deductible', 'tax_deductible', 'tax_due'):
                    cols.append(report._build_column_dict(
                        col_value=sign * amounts[dt][col['column_group_key']],
                        col_data={'figure_type': 'monetary', 'column_group_key': col['column_group_key'], 'expression_label': dt},
                        options=options,
                    ))

        defaults = {'columns': cols, 'level': index if index == 0 else index + 1, 'unfoldable': False}
        rpt_line = self._build_report_line(report, options, defaults, gk, smap[key][0], parent_line_id, warnings)

        if gk == 'src_tax_id':
            rpt_line['caret_options'] = 'generic_tax_report'

        lines.append((0, rpt_line))
        self._populate_lines_recursively(
            report, options, lines, sort_maps, gb_fields,
            amounts.get('children'), index=index + 1,
            type_tax_use=type_tax_use, parent_line_id=rpt_line['id'],
            warnings=warnings,
        )
|
|
|
|
def _build_report_line(self, report, options, defaults, gk, value, parent_id, warnings=None):
    """Return a report line dict for one grouping key.

    The line starts as a shallow copy of ``defaults`` and receives a
    ``'parent_id'`` (when ``parent_id`` is not ``None``) plus an ``'id'``
    and a ``'name'`` that depend on the grouping key ``gk``:

    * ``'src_tax_type_tax_use'``: ``value`` is indexable; ``value[0]`` is
      used as the id markup and ``value[1]`` as the name.
    * ``'src_tax_id'``: ``value`` is a tax; the name carries the amount for
      ``percent`` and ``fixed`` taxes, and percent taxes are additionally
      passed to ``_check_line_consistency`` when ``warnings`` is given.
    * ``'account_id'``: ``value`` is an account; its display name is used.

    With the ``'multi-company'`` option set, tax and account names are
    suffixed with the company display name. Any other ``gk`` returns the
    copy without ``'id'``/``'name'``.
    """
    line = {**defaults}
    if parent_id is not None:
        line['parent_id'] = parent_id

    if gk == 'src_tax_type_tax_use':
        line['id'] = report._get_generic_line_id(None, None, markup=value[0], parent_line_id=parent_id)
        line['name'] = value[1]
        return line

    if gk == 'src_tax_id':
        line['id'] = report._get_generic_line_id(value._name, value.id, parent_line_id=parent_id)
        amount_kind = value.amount_type
        if amount_kind == 'percent':
            line['name'] = f"{value.name} ({value.amount}%)"
            # Only percentage taxes can be checked against their base.
            if warnings is not None:
                self._check_line_consistency(report, options, line, value, warnings)
        elif amount_kind == 'fixed':
            line['name'] = f"{value.name} ({value.amount})"
        else:
            line['name'] = value.name
        if options.get('multi-company'):
            line['name'] = f"{line['name']} - {value.company_id.display_name}"
        return line

    if gk == 'account_id':
        line['id'] = report._get_generic_line_id(value._name, value.id, parent_line_id=parent_id)
        if options.get('multi-company'):
            line['name'] = f"{value.display_name} - {value.company_id.display_name}"
        else:
            line['name'] = value.display_name

    return line
|
|
|
|
def _check_line_consistency(self, report, options, ln, tax, warnings=None):
    """Flag ``ln`` when its tax amount does not match its net amount.

    The expected tax is ``net * rate`` where ``rate`` is ``tax.amount``
    multiplied by the summed ``factor`` of the tax's invoice repartition
    lines of type ``'tax'``, divided by 100. For every column group the
    ``'net'`` and ``'tax'`` values (``no_format``) are read from
    ``ln['columns']``; column groups whose net value is the empty string
    are skipped. When the currency comparison reports a difference and the
    relative error exceeds 0.1 %, ``ln['alert']`` is set and a
    ``'danger'`` entry is stored in ``warnings`` (if provided); checking
    stops at the first flagged column group.

    :param report: object providing ``_split_options_per_column_group``.
    :param options: report options, split per column group by ``report``.
    :param ln: report line dict; mutated in place.
    :param tax: tax providing ``amount`` and ``invoice_repartition_line_ids``.
    :param warnings: optional dict collecting report warnings.
    """
    tax_factors = tax.invoice_repartition_line_ids.filtered(
        lambda r: r.repartition_type == 'tax'
    ).mapped('factor')
    eff_rate = tax.amount * sum(tax_factors) / 100
    currency = self.env.company.currency_id

    def column_value(cg_key, label):
        # First matching column wins; 0 when the line has no such column.
        return next(
            (
                c['no_format'] for c in ln['columns']
                if c['column_group_key'] == cg_key and c['expression_label'] == label
            ),
            0,
        )

    for cg_key in report._split_options_per_column_group(options):
        net = column_value(cg_key, 'net')
        if net == '':
            continue
        tax_val = column_value(cg_key, 'tax')

        expected = float(net or 0) * eff_rate
        # A falsy comparison result is treated as "amounts agree"
        # (presumably 0 means equal at currency precision - confirm).
        if not currency.compare_amounts(expected, tax_val):
            continue

        # Divide by the magnitude of the base: with a signed denominator a
        # negative base made the ratio negative and the check never fired.
        err = abs(abs(tax_val) - abs(expected)) / abs(float(net or 1))
        if err > 0.001:
            ln['alert'] = True
            # ``warnings`` is optional in the signature; do not subscript None.
            if warnings is not None:
                warnings['fusion_accounting.tax_report_warning_lines_consistency'] = {'alert_type': 'danger'}
            return
|
|
|
|
# ================================================================
|
|
# CARET / AUDIT
|
|
# ================================================================
|
|
|
|
def caret_option_audit_tax(self, options, params):
    """Return a window action listing the journal items behind a tax line.

    The report is browsed from ``options['report_id']`` and the model and
    record id are resolved from ``params['line_id']``. The resulting domain
    is the report's ``strict_range`` options domain combined with an OR of
    three branches built below.

    :raises UserError: when the line does not refer to ``account.tax``.
    """
    report = self.env['account.report'].browse(options['report_id'])
    model_name, record_id = report._get_model_info_from_id(params['line_id'])
    if model_name != 'account.tax':
        raise UserError(_("Cannot audit tax from a non-tax model."))

    tax = self.env['account.tax'].browse(record_id)
    is_group = tax.amount_type == 'group'

    # Items that carry a repartition line and are themselves taxed by this
    # tax (or by the children of a group tax).
    if is_group:
        taxed_tax_items = [
            ('tax_ids', 'in', tax.children_tax_ids.ids),
            ('tax_repartition_line_id', '!=', False),
        ]
    else:
        taxed_tax_items = [
            ('tax_ids', '=', tax.id),
            ('tax_ids.type_tax_use', '=', tax.type_tax_use),
            ('tax_repartition_line_id', '!=', False),
        ]

    period_domain = report._get_options_domain(options, 'strict_range')

    # Items taxed by this tax that have no repartition line.
    untaxed_origin_items = [
        ('tax_ids', 'in', tax.ids),
        ('tax_ids.type_tax_use', '=', tax.type_tax_use),
        ('tax_repartition_line_id', '=', False),
    ]
    # Items generated by this tax (matched through the group for group taxes).
    if is_group:
        generated_items = [('group_tax_id', '=', tax.id)]
    else:
        generated_items = [('tax_line_id', '=', tax.id)]

    domain = period_domain + expression.OR((
        untaxed_origin_items,
        generated_items,
        taxed_tax_items,
    ))

    list_view = self.env.ref('account.view_move_line_tax_audit_tree')
    action_context = dict(self.env.context, search_default_group_by_account=2, expand=1)
    return {
        'type': 'ir.actions.act_window',
        'name': _('Journal Items for Tax Audit'),
        'res_model': 'account.move.line',
        'views': [[list_view.id, 'list']],
        'domain': domain,
        'context': action_context,
    }
|
|
|
|
|
|
class FusionGenericTaxReportHandlerAT(models.AbstractModel):
    """Generic tax report handler variant using the ``'account_tax'`` grouping."""

    _name = 'account.generic.tax.report.handler.account.tax'
    _inherit = 'account.generic.tax.report.handler'
    _description = 'Generic Tax Report Custom Handler (Account -> Tax)'

    def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
        """Return the parent handler's dynamic lines for the ``'account_tax'`` grouping.

        ``all_column_groups_expression_totals`` is accepted but not used here.
        """
        grouping = 'account_tax'
        return super()._get_dynamic_lines(report, options, grouping, warnings)
|
|
|
|
|
|
class FusionGenericTaxReportHandlerTA(models.AbstractModel):
    """Generic tax report handler variant using the ``'tax_account'`` grouping."""

    _name = 'account.generic.tax.report.handler.tax.account'
    _inherit = 'account.generic.tax.report.handler'
    _description = 'Generic Tax Report Custom Handler (Tax -> Account)'

    def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
        """Return the parent handler's dynamic lines for the ``'tax_account'`` grouping.

        ``all_column_groups_expression_totals`` is accepted but not used here.
        """
        grouping = 'tax_account'
        return super()._get_dynamic_lines(report, options, grouping, warnings)
|