Files
Odoo-Modules/Fusion Accounting/models/account_analytic_report.py
2026-02-22 01:22:18 -05:00

342 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Fusion Accounting - Analytic Group By for Financial Reports
# Enables analytic plan / account column grouping on account reports
from odoo import models, fields, api, osv
from odoo.addons.web.controllers.utils import clean_action
from odoo.tools import SQL, Query
class FusionReportAnalyticGroupby(models.AbstractModel):
"""Extends the accounting report engine to support grouping by
analytic accounts or plans via shadow-table substitution."""
_inherit = 'account.report'
filter_analytic_groupby = fields.Boolean(
string="Analytic Group By",
compute=lambda self: self._compute_report_option_filter('filter_analytic_groupby'),
readonly=False,
store=True,
depends=['root_report_id'],
)
# ------------------------------------------------------------------
# Initialization sequencing
# ------------------------------------------------------------------
def _get_options_initializers_forced_sequence_map(self):
"""Insert the analytic-groupby initializer between column-header
creation and final column building (sequence 995)."""
seq = super()._get_options_initializers_forced_sequence_map()
seq[self._init_options_analytic_groupby] = 995
return seq
# ------------------------------------------------------------------
# Option initializer
# ------------------------------------------------------------------
def _init_options_analytic_groupby(self, options, previous_options):
"""Populate analytic groupby filters in *options* when the report
advertises ``filter_analytic_groupby`` and the user has the
analytic-accounting group."""
if not self.filter_analytic_groupby:
return
has_analytic_perm = self.env.user.has_group('analytic.group_analytic_accounting')
if not has_analytic_perm:
return
options['display_analytic_groupby'] = True
options['display_analytic_plan_groupby'] = True
# --- analytic-without-aml toggle ---
options['include_analytic_without_aml'] = previous_options.get('include_analytic_without_aml', False)
# --- analytic accounts ---
prev_account_ids = [int(v) for v in previous_options.get('analytic_accounts_groupby', [])]
chosen_accounts = (
self.env['account.analytic.account']
.with_context(active_test=False)
.search([('id', 'in', prev_account_ids)])
)
options['analytic_accounts_groupby'] = chosen_accounts.ids
options['selected_analytic_account_groupby_names'] = chosen_accounts.mapped('name')
# --- analytic plans ---
prev_plan_ids = [int(v) for v in previous_options.get('analytic_plans_groupby', [])]
chosen_plans = self.env['account.analytic.plan'].search([('id', 'in', prev_plan_ids)])
options['analytic_plans_groupby'] = chosen_plans.ids
options['selected_analytic_plan_groupby_names'] = chosen_plans.mapped('name')
self._build_analytic_column_headers(options)
# ------------------------------------------------------------------
# Readonly-query interaction
# ------------------------------------------------------------------
def _init_options_readonly_query(self, options, previous_options):
super()._init_options_readonly_query(options, previous_options)
# Analytic columns use a shadow table ⇒ disable readonly shortcut
options['readonly_query'] = (
options['readonly_query'] and not options.get('analytic_groupby_option')
)
# ------------------------------------------------------------------
# Column header generation
# ------------------------------------------------------------------
def _build_analytic_column_headers(self, options):
"""Create extra column headers for every selected analytic plan
or individual analytic account."""
extra_headers = []
# --- plans → accounts within each plan ---
plan_recs = self.env['account.analytic.plan'].browse(options.get('analytic_plans_groupby'))
for plan in plan_recs:
child_accounts = self.env['account.analytic.account'].search([
('plan_id', 'child_of', plan.id),
])
extra_headers.append({
'name': plan.name,
'forced_options': {
'analytic_groupby_option': True,
'analytic_accounts_list': tuple(child_accounts.ids),
},
})
# --- individual accounts ---
acct_recs = self.env['account.analytic.account'].browse(options.get('analytic_accounts_groupby'))
for acct in acct_recs:
extra_headers.append({
'name': acct.name,
'forced_options': {
'analytic_groupby_option': True,
'analytic_accounts_list': (acct.id,),
},
})
if not extra_headers:
return
budget_active = any(b for b in options.get('budgets', []) if b.get('selected'))
if budget_active:
# Place analytic headers next to budget headers on the same level
options['column_headers'][-1] = extra_headers + options['column_headers'][-1]
else:
# Append a new header tier for analytic columns + a blank for totals
extra_headers.append({'name': ''})
options['column_headers'] = [
*options['column_headers'],
extra_headers,
]
# ------------------------------------------------------------------
# Shadow-table preparation
# ------------------------------------------------------------------
@api.model
def _prepare_lines_for_analytic_groupby(self):
"""Build a temporary ``analytic_temp_account_move_line`` table that
mirrors the *account_move_line* schema but is populated from
*account_analytic_line*. Created once per SQL transaction."""
self.env.cr.execute(
"SELECT 1 FROM information_schema.tables "
"WHERE table_name = 'analytic_temp_account_move_line'"
)
if self.env.cr.fetchone():
return # already prepared in this transaction
root_plan, additional_plans = self.env['account.analytic.plan']._get_all_plans()
all_plans = root_plan + additional_plans
analytic_col_refs = SQL(", ").join(
SQL('"account_analytic_line".%s', SQL.identifier(p._column_name()))
for p in all_plans
)
distribution_expr = SQL(
'to_jsonb(UNNEST(ARRAY[%s]))', analytic_col_refs,
)
field_mapping = {
'id': SQL("account_analytic_line.id"),
'balance': SQL("-amount"),
'display_type': 'product',
'parent_state': 'posted',
'account_id': SQL.identifier("general_account_id"),
'debit': SQL("CASE WHEN amount < 0 THEN -amount ELSE 0 END"),
'credit': SQL("CASE WHEN amount > 0 THEN amount ELSE 0 END"),
'analytic_distribution': distribution_expr,
}
# Fill in the remaining stored fields with values from the linked AML
aml_fields_meta = self.env['account.move.line'].fields_get()
persisted_fields = {
fname
for fname, meta in aml_fields_meta.items()
if meta['type'] not in ('many2many', 'one2many') and meta.get('store')
}
for fname in persisted_fields:
if fname not in field_mapping:
field_mapping[fname] = SQL('"account_move_line".%s', SQL.identifier(fname))
col_names_sql, val_exprs_sql = (
self.env['account.move.line']._prepare_aml_shadowing_for_report(field_mapping)
)
shadow_sql = SQL("""
-- Build temporary shadow table inheriting AML schema
CREATE TEMPORARY TABLE IF NOT EXISTS analytic_temp_account_move_line ()
INHERITS (account_move_line) ON COMMIT DROP;
ALTER TABLE analytic_temp_account_move_line NO INHERIT account_move_line;
ALTER TABLE analytic_temp_account_move_line
DROP CONSTRAINT IF EXISTS account_move_line_check_amount_currency_balance_sign;
ALTER TABLE analytic_temp_account_move_line ALTER COLUMN move_id DROP NOT NULL;
ALTER TABLE analytic_temp_account_move_line ALTER COLUMN currency_id DROP NOT NULL;
INSERT INTO analytic_temp_account_move_line (%(col_names)s)
SELECT %(val_exprs)s
FROM account_analytic_line
LEFT JOIN account_move_line
ON account_analytic_line.move_line_id = account_move_line.id
WHERE account_analytic_line.general_account_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS analytic_temp_aml__composite_idx
ON analytic_temp_account_move_line (analytic_distribution, journal_id, date, company_id);
ANALYZE analytic_temp_account_move_line;
""", col_names=col_names_sql, val_exprs=val_exprs_sql)
self.env.cr.execute(shadow_sql)
# ------------------------------------------------------------------
# Query overrides
# ------------------------------------------------------------------
def _get_report_query(self, options, date_scope, domain=None) -> Query:
"""When analytic-groupby columns are active, inject the context
flag that causes `_where_calc` to swap the AML table."""
ctx_self = self.with_context(
account_report_analytic_groupby=options.get('analytic_groupby_option'),
)
query = super(FusionReportAnalyticGroupby, ctx_self)._get_report_query(options, date_scope, domain)
if options.get('analytic_accounts'):
if 'analytic_accounts_list' in options:
# Shadow table stores bare integer ids in analytic_distribution
acct_str_ids = tuple(str(aid) for aid in options['analytic_accounts'])
query.add_where(SQL(
"account_move_line.analytic_distribution IN %s",
acct_str_ids,
))
else:
# Real AML table JSON distribution with percentages
acct_id_list = [[str(aid) for aid in options['analytic_accounts']]]
query.add_where(SQL(
'%s && %s',
acct_id_list,
self.env['account.move.line']._query_analytic_accounts(),
))
return query
# ------------------------------------------------------------------
# Audit action
# ------------------------------------------------------------------
def action_audit_cell(self, options, params):
"""Redirect the audit action to analytic lines when the column
being audited belongs to an analytic-groupby column group."""
col_opts = self._get_column_group_options(options, params['column_group_key'])
if not col_opts.get('analytic_groupby_option'):
return super().action_audit_cell(options, params)
# Translate AML domain → analytic line domain
rpt_line = self.env['account.report.line'].browse(params['report_line_id'])
expr = rpt_line.expression_ids.filtered(lambda e: e.label == params['expression_label'])
raw_domain = self._get_audit_line_domain(col_opts, expr, params)
AnalyticLine = self.env['account.analytic.line']
converted_domain = []
for leaf in raw_domain:
if len(leaf) == 1:
converted_domain.append(leaf)
continue
fld, op, val = leaf
root_field = fld.split('.')[0]
if root_field == 'account_id':
converted_domain.append((fld.replace('account_id', 'general_account_id'), op, val))
elif fld == 'analytic_distribution':
converted_domain.append(('auto_account_id', 'in', val))
elif root_field not in AnalyticLine._fields:
expr_leaf = [(f'move_line_id.{fld}', op, val)]
if options.get('include_analytic_without_aml'):
expr_leaf = osv.expression.OR([
[('move_line_id', '=', False)],
expr_leaf,
])
converted_domain.extend(expr_leaf)
else:
converted_domain.append(leaf)
act = clean_action(
self.env.ref('analytic.account_analytic_line_action_entries')._get_action_dict(),
env=self.env,
)
act['domain'] = converted_domain
return act
# ------------------------------------------------------------------
# Journal domain
# ------------------------------------------------------------------
@api.model
def _get_options_journals_domain(self, options):
"""Allow journal-less lines when analytic lines without a parent
move line are included."""
base_domain = super()._get_options_journals_domain(options)
if options.get('include_analytic_without_aml'):
base_domain = osv.expression.OR([
base_domain,
[('journal_id', '=', False)],
])
return base_domain
# ------------------------------------------------------------------
# Options domain
# ------------------------------------------------------------------
def _get_options_domain(self, options, date_scope):
self.ensure_one()
base = super()._get_options_domain(options, date_scope)
acct_filter = options.get('analytic_accounts_list')
if acct_filter:
base = osv.expression.AND([
base,
[('analytic_distribution', 'in', list(acct_filter))],
])
return base
class FusionAMLAnalyticShadow(models.Model):
"""Hooks into `_where_calc` to swap the AML table for the analytic
shadow table when the report context flag is set."""
_inherit = "account.move.line"
def _where_calc(self, domain, active_test=True):
"""Replace the base ``account_move_line`` table reference with the
``analytic_temp_account_move_line`` shadow table whenever the
``account_report_analytic_groupby`` context key is truthy, unless
a cash-basis report is active (which already replaces the table)."""
qry = super()._where_calc(domain, active_test)
ctx = self.env.context
if ctx.get('account_report_analytic_groupby') and not ctx.get('account_report_cash_basis'):
self.env['account.report']._prepare_lines_for_analytic_groupby()
qry._tables['account_move_line'] = SQL.identifier('analytic_temp_account_move_line')
return qry