342 lines
15 KiB
Python
342 lines
15 KiB
Python
# Fusion Accounting - Analytic Group By for Financial Reports
|
||
# Enables analytic plan / account column grouping on account reports
|
||
|
||
from odoo import models, fields, api, osv
|
||
from odoo.addons.web.controllers.utils import clean_action
|
||
from odoo.tools import SQL, Query
|
||
|
||
|
||
class FusionReportAnalyticGroupby(models.AbstractModel):
|
||
"""Extends the accounting report engine to support grouping by
|
||
analytic accounts or plans via shadow-table substitution."""
|
||
|
||
_inherit = 'account.report'
|
||
|
||
filter_analytic_groupby = fields.Boolean(
|
||
string="Analytic Group By",
|
||
compute=lambda self: self._compute_report_option_filter('filter_analytic_groupby'),
|
||
readonly=False,
|
||
store=True,
|
||
depends=['root_report_id'],
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Initialization sequencing
|
||
# ------------------------------------------------------------------
|
||
|
||
def _get_options_initializers_forced_sequence_map(self):
|
||
"""Insert the analytic-groupby initializer between column-header
|
||
creation and final column building (sequence 995)."""
|
||
seq = super()._get_options_initializers_forced_sequence_map()
|
||
seq[self._init_options_analytic_groupby] = 995
|
||
return seq
|
||
|
||
# ------------------------------------------------------------------
|
||
# Option initializer
|
||
# ------------------------------------------------------------------
|
||
|
||
def _init_options_analytic_groupby(self, options, previous_options):
|
||
"""Populate analytic groupby filters in *options* when the report
|
||
advertises ``filter_analytic_groupby`` and the user has the
|
||
analytic-accounting group."""
|
||
if not self.filter_analytic_groupby:
|
||
return
|
||
|
||
has_analytic_perm = self.env.user.has_group('analytic.group_analytic_accounting')
|
||
if not has_analytic_perm:
|
||
return
|
||
|
||
options['display_analytic_groupby'] = True
|
||
options['display_analytic_plan_groupby'] = True
|
||
|
||
# --- analytic-without-aml toggle ---
|
||
options['include_analytic_without_aml'] = previous_options.get('include_analytic_without_aml', False)
|
||
|
||
# --- analytic accounts ---
|
||
prev_account_ids = [int(v) for v in previous_options.get('analytic_accounts_groupby', [])]
|
||
chosen_accounts = (
|
||
self.env['account.analytic.account']
|
||
.with_context(active_test=False)
|
||
.search([('id', 'in', prev_account_ids)])
|
||
)
|
||
options['analytic_accounts_groupby'] = chosen_accounts.ids
|
||
options['selected_analytic_account_groupby_names'] = chosen_accounts.mapped('name')
|
||
|
||
# --- analytic plans ---
|
||
prev_plan_ids = [int(v) for v in previous_options.get('analytic_plans_groupby', [])]
|
||
chosen_plans = self.env['account.analytic.plan'].search([('id', 'in', prev_plan_ids)])
|
||
options['analytic_plans_groupby'] = chosen_plans.ids
|
||
options['selected_analytic_plan_groupby_names'] = chosen_plans.mapped('name')
|
||
|
||
self._build_analytic_column_headers(options)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Readonly-query interaction
|
||
# ------------------------------------------------------------------
|
||
|
||
def _init_options_readonly_query(self, options, previous_options):
|
||
super()._init_options_readonly_query(options, previous_options)
|
||
# Analytic columns use a shadow table ⇒ disable readonly shortcut
|
||
options['readonly_query'] = (
|
||
options['readonly_query'] and not options.get('analytic_groupby_option')
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Column header generation
|
||
# ------------------------------------------------------------------
|
||
|
||
def _build_analytic_column_headers(self, options):
|
||
"""Create extra column headers for every selected analytic plan
|
||
or individual analytic account."""
|
||
extra_headers = []
|
||
|
||
# --- plans → accounts within each plan ---
|
||
plan_recs = self.env['account.analytic.plan'].browse(options.get('analytic_plans_groupby'))
|
||
for plan in plan_recs:
|
||
child_accounts = self.env['account.analytic.account'].search([
|
||
('plan_id', 'child_of', plan.id),
|
||
])
|
||
extra_headers.append({
|
||
'name': plan.name,
|
||
'forced_options': {
|
||
'analytic_groupby_option': True,
|
||
'analytic_accounts_list': tuple(child_accounts.ids),
|
||
},
|
||
})
|
||
|
||
# --- individual accounts ---
|
||
acct_recs = self.env['account.analytic.account'].browse(options.get('analytic_accounts_groupby'))
|
||
for acct in acct_recs:
|
||
extra_headers.append({
|
||
'name': acct.name,
|
||
'forced_options': {
|
||
'analytic_groupby_option': True,
|
||
'analytic_accounts_list': (acct.id,),
|
||
},
|
||
})
|
||
|
||
if not extra_headers:
|
||
return
|
||
|
||
budget_active = any(b for b in options.get('budgets', []) if b.get('selected'))
|
||
if budget_active:
|
||
# Place analytic headers next to budget headers on the same level
|
||
options['column_headers'][-1] = extra_headers + options['column_headers'][-1]
|
||
else:
|
||
# Append a new header tier for analytic columns + a blank for totals
|
||
extra_headers.append({'name': ''})
|
||
options['column_headers'] = [
|
||
*options['column_headers'],
|
||
extra_headers,
|
||
]
|
||
|
||
# ------------------------------------------------------------------
|
||
# Shadow-table preparation
|
||
# ------------------------------------------------------------------
|
||
|
||
@api.model
|
||
def _prepare_lines_for_analytic_groupby(self):
|
||
"""Build a temporary ``analytic_temp_account_move_line`` table that
|
||
mirrors the *account_move_line* schema but is populated from
|
||
*account_analytic_line*. Created once per SQL transaction."""
|
||
self.env.cr.execute(
|
||
"SELECT 1 FROM information_schema.tables "
|
||
"WHERE table_name = 'analytic_temp_account_move_line'"
|
||
)
|
||
if self.env.cr.fetchone():
|
||
return # already prepared in this transaction
|
||
|
||
root_plan, additional_plans = self.env['account.analytic.plan']._get_all_plans()
|
||
all_plans = root_plan + additional_plans
|
||
|
||
analytic_col_refs = SQL(", ").join(
|
||
SQL('"account_analytic_line".%s', SQL.identifier(p._column_name()))
|
||
for p in all_plans
|
||
)
|
||
distribution_expr = SQL(
|
||
'to_jsonb(UNNEST(ARRAY[%s]))', analytic_col_refs,
|
||
)
|
||
|
||
field_mapping = {
|
||
'id': SQL("account_analytic_line.id"),
|
||
'balance': SQL("-amount"),
|
||
'display_type': 'product',
|
||
'parent_state': 'posted',
|
||
'account_id': SQL.identifier("general_account_id"),
|
||
'debit': SQL("CASE WHEN amount < 0 THEN -amount ELSE 0 END"),
|
||
'credit': SQL("CASE WHEN amount > 0 THEN amount ELSE 0 END"),
|
||
'analytic_distribution': distribution_expr,
|
||
}
|
||
|
||
# Fill in the remaining stored fields with values from the linked AML
|
||
aml_fields_meta = self.env['account.move.line'].fields_get()
|
||
persisted_fields = {
|
||
fname
|
||
for fname, meta in aml_fields_meta.items()
|
||
if meta['type'] not in ('many2many', 'one2many') and meta.get('store')
|
||
}
|
||
for fname in persisted_fields:
|
||
if fname not in field_mapping:
|
||
field_mapping[fname] = SQL('"account_move_line".%s', SQL.identifier(fname))
|
||
|
||
col_names_sql, val_exprs_sql = (
|
||
self.env['account.move.line']._prepare_aml_shadowing_for_report(field_mapping)
|
||
)
|
||
|
||
shadow_sql = SQL("""
|
||
-- Build temporary shadow table inheriting AML schema
|
||
CREATE TEMPORARY TABLE IF NOT EXISTS analytic_temp_account_move_line ()
|
||
INHERITS (account_move_line) ON COMMIT DROP;
|
||
ALTER TABLE analytic_temp_account_move_line NO INHERIT account_move_line;
|
||
ALTER TABLE analytic_temp_account_move_line
|
||
DROP CONSTRAINT IF EXISTS account_move_line_check_amount_currency_balance_sign;
|
||
ALTER TABLE analytic_temp_account_move_line ALTER COLUMN move_id DROP NOT NULL;
|
||
ALTER TABLE analytic_temp_account_move_line ALTER COLUMN currency_id DROP NOT NULL;
|
||
|
||
INSERT INTO analytic_temp_account_move_line (%(col_names)s)
|
||
SELECT %(val_exprs)s
|
||
FROM account_analytic_line
|
||
LEFT JOIN account_move_line
|
||
ON account_analytic_line.move_line_id = account_move_line.id
|
||
WHERE account_analytic_line.general_account_id IS NOT NULL;
|
||
|
||
CREATE INDEX IF NOT EXISTS analytic_temp_aml__composite_idx
|
||
ON analytic_temp_account_move_line (analytic_distribution, journal_id, date, company_id);
|
||
|
||
ANALYZE analytic_temp_account_move_line;
|
||
""", col_names=col_names_sql, val_exprs=val_exprs_sql)
|
||
|
||
self.env.cr.execute(shadow_sql)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Query overrides
|
||
# ------------------------------------------------------------------
|
||
|
||
def _get_report_query(self, options, date_scope, domain=None) -> Query:
|
||
"""When analytic-groupby columns are active, inject the context
|
||
flag that causes `_where_calc` to swap the AML table."""
|
||
ctx_self = self.with_context(
|
||
account_report_analytic_groupby=options.get('analytic_groupby_option'),
|
||
)
|
||
query = super(FusionReportAnalyticGroupby, ctx_self)._get_report_query(options, date_scope, domain)
|
||
|
||
if options.get('analytic_accounts'):
|
||
if 'analytic_accounts_list' in options:
|
||
# Shadow table stores bare integer ids in analytic_distribution
|
||
acct_str_ids = tuple(str(aid) for aid in options['analytic_accounts'])
|
||
query.add_where(SQL(
|
||
"account_move_line.analytic_distribution IN %s",
|
||
acct_str_ids,
|
||
))
|
||
else:
|
||
# Real AML table – JSON distribution with percentages
|
||
acct_id_list = [[str(aid) for aid in options['analytic_accounts']]]
|
||
query.add_where(SQL(
|
||
'%s && %s',
|
||
acct_id_list,
|
||
self.env['account.move.line']._query_analytic_accounts(),
|
||
))
|
||
|
||
return query
|
||
|
||
# ------------------------------------------------------------------
|
||
# Audit action
|
||
# ------------------------------------------------------------------
|
||
|
||
def action_audit_cell(self, options, params):
|
||
"""Redirect the audit action to analytic lines when the column
|
||
being audited belongs to an analytic-groupby column group."""
|
||
col_opts = self._get_column_group_options(options, params['column_group_key'])
|
||
|
||
if not col_opts.get('analytic_groupby_option'):
|
||
return super().action_audit_cell(options, params)
|
||
|
||
# Translate AML domain → analytic line domain
|
||
rpt_line = self.env['account.report.line'].browse(params['report_line_id'])
|
||
expr = rpt_line.expression_ids.filtered(lambda e: e.label == params['expression_label'])
|
||
raw_domain = self._get_audit_line_domain(col_opts, expr, params)
|
||
|
||
AnalyticLine = self.env['account.analytic.line']
|
||
converted_domain = []
|
||
for leaf in raw_domain:
|
||
if len(leaf) == 1:
|
||
converted_domain.append(leaf)
|
||
continue
|
||
|
||
fld, op, val = leaf
|
||
root_field = fld.split('.')[0]
|
||
|
||
if root_field == 'account_id':
|
||
converted_domain.append((fld.replace('account_id', 'general_account_id'), op, val))
|
||
elif fld == 'analytic_distribution':
|
||
converted_domain.append(('auto_account_id', 'in', val))
|
||
elif root_field not in AnalyticLine._fields:
|
||
expr_leaf = [(f'move_line_id.{fld}', op, val)]
|
||
if options.get('include_analytic_without_aml'):
|
||
expr_leaf = osv.expression.OR([
|
||
[('move_line_id', '=', False)],
|
||
expr_leaf,
|
||
])
|
||
converted_domain.extend(expr_leaf)
|
||
else:
|
||
converted_domain.append(leaf)
|
||
|
||
act = clean_action(
|
||
self.env.ref('analytic.account_analytic_line_action_entries')._get_action_dict(),
|
||
env=self.env,
|
||
)
|
||
act['domain'] = converted_domain
|
||
return act
|
||
|
||
# ------------------------------------------------------------------
|
||
# Journal domain
|
||
# ------------------------------------------------------------------
|
||
|
||
@api.model
|
||
def _get_options_journals_domain(self, options):
|
||
"""Allow journal-less lines when analytic lines without a parent
|
||
move line are included."""
|
||
base_domain = super()._get_options_journals_domain(options)
|
||
if options.get('include_analytic_without_aml'):
|
||
base_domain = osv.expression.OR([
|
||
base_domain,
|
||
[('journal_id', '=', False)],
|
||
])
|
||
return base_domain
|
||
|
||
# ------------------------------------------------------------------
|
||
# Options domain
|
||
# ------------------------------------------------------------------
|
||
|
||
def _get_options_domain(self, options, date_scope):
|
||
self.ensure_one()
|
||
base = super()._get_options_domain(options, date_scope)
|
||
|
||
acct_filter = options.get('analytic_accounts_list')
|
||
if acct_filter:
|
||
base = osv.expression.AND([
|
||
base,
|
||
[('analytic_distribution', 'in', list(acct_filter))],
|
||
])
|
||
|
||
return base
|
||
|
||
|
||
class FusionAMLAnalyticShadow(models.Model):
|
||
"""Hooks into `_where_calc` to swap the AML table for the analytic
|
||
shadow table when the report context flag is set."""
|
||
|
||
_inherit = "account.move.line"
|
||
|
||
def _where_calc(self, domain, active_test=True):
|
||
"""Replace the base ``account_move_line`` table reference with the
|
||
``analytic_temp_account_move_line`` shadow table whenever the
|
||
``account_report_analytic_groupby`` context key is truthy, unless
|
||
a cash-basis report is active (which already replaces the table)."""
|
||
qry = super()._where_calc(domain, active_test)
|
||
ctx = self.env.context
|
||
if ctx.get('account_report_analytic_groupby') and not ctx.get('account_report_cash_basis'):
|
||
self.env['account.report']._prepare_lines_for_analytic_groupby()
|
||
qry._tables['account_move_line'] = SQL.identifier('analytic_temp_account_move_line')
|
||
return qry
|