# Fusion Accounting - Analytic Group By for Financial Reports
# Enables analytic plan / account column grouping on account reports

from odoo import models, fields, api, osv
from odoo.addons.web.controllers.utils import clean_action
from odoo.tools import SQL, Query


class FusionReportAnalyticGroupby(models.AbstractModel):
    """Extends the accounting report engine to support grouping by
    analytic accounts or plans via shadow-table substitution."""

    _inherit = 'account.report'

    # Per-report switch; the option initializer below does nothing when
    # this flag is falsy on the report.
    filter_analytic_groupby = fields.Boolean(
        string="Analytic Group By",
        compute=lambda self: self._compute_report_option_filter('filter_analytic_groupby'),
        readonly=False,
        store=True,
        depends=['root_report_id'],
    )

    # ------------------------------------------------------------------
    # Initialization sequencing
    # ------------------------------------------------------------------
    def _get_options_initializers_forced_sequence_map(self):
        """Insert the analytic-groupby initializer between column-header
        creation and final column building (sequence 995).

        :return: the parent sequence map, extended with an entry for
            ``_init_options_analytic_groupby``.
        """
        seq = super()._get_options_initializers_forced_sequence_map()
        seq[self._init_options_analytic_groupby] = 995
        return seq

    # ------------------------------------------------------------------
    # Option initializer
    # ------------------------------------------------------------------
    def _init_options_analytic_groupby(self, options, previous_options):
        """Populate analytic groupby filters in *options* when the report
        advertises ``filter_analytic_groupby`` and the user has the
        analytic-accounting group.

        :param options: report options dict, mutated in place.
        :param previous_options: options of the previous rendering; the
            previously selected plan / account ids are read from it.
        """
        if not self.filter_analytic_groupby:
            return

        has_analytic_perm = self.env.user.has_group('analytic.group_analytic_accounting')
        if not has_analytic_perm:
            return

        options['display_analytic_groupby'] = True
        options['display_analytic_plan_groupby'] = True

        # --- analytic-without-aml toggle ---
        options['include_analytic_without_aml'] = previous_options.get(
            'include_analytic_without_aml', False,
        )

        # --- analytic accounts ---
        # Re-searching (instead of trusting the ids) drops ids that no
        # longer exist; archived accounts are kept via active_test=False.
        prev_account_ids = [
            int(v) for v in previous_options.get('analytic_accounts_groupby', [])
        ]
        chosen_accounts = (
            self.env['account.analytic.account']
            .with_context(active_test=False)
            .search([('id', 'in', prev_account_ids)])
        )
        options['analytic_accounts_groupby'] = chosen_accounts.ids
        options['selected_analytic_account_groupby_names'] = chosen_accounts.mapped('name')

        # --- analytic plans ---
        prev_plan_ids = [
            int(v) for v in previous_options.get('analytic_plans_groupby', [])
        ]
        chosen_plans = self.env['account.analytic.plan'].search([('id', 'in', prev_plan_ids)])
        options['analytic_plans_groupby'] = chosen_plans.ids
        options['selected_analytic_plan_groupby_names'] = chosen_plans.mapped('name')

        self._build_analytic_column_headers(options)

    # ------------------------------------------------------------------
    # Readonly-query interaction
    # ------------------------------------------------------------------
    def _init_options_readonly_query(self, options, previous_options):
        """Turn ``readonly_query`` off whenever ``analytic_groupby_option``
        is set in *options*."""
        super()._init_options_readonly_query(options, previous_options)
        # Analytic columns use a shadow table ⇒ disable readonly shortcut
        options['readonly_query'] = (
            options['readonly_query']
            and not options.get('analytic_groupby_option')
        )

    # ------------------------------------------------------------------
    # Column header generation
    # ------------------------------------------------------------------
    def _build_analytic_column_headers(self, options):
        """Create extra column headers for every selected analytic plan or
        individual analytic account.

        Each header carries ``forced_options`` with
        ``analytic_groupby_option`` set and the tuple of analytic account
        ids the column is restricted to (``analytic_accounts_list``).

        :param options: report options dict; ``column_headers`` is
            updated in place when at least one header is produced.
        """
        extra_headers = []

        # --- plans → accounts within each plan ---
        plan_recs = self.env['account.analytic.plan'].browse(
            options.get('analytic_plans_groupby')
        )
        for plan in plan_recs:
            child_accounts = self.env['account.analytic.account'].search([
                ('plan_id', 'child_of', plan.id),
            ])
            extra_headers.append({
                'name': plan.name,
                'forced_options': {
                    'analytic_groupby_option': True,
                    # May be an empty tuple when the plan has no account.
                    'analytic_accounts_list': tuple(child_accounts.ids),
                },
            })

        # --- individual accounts ---
        acct_recs = self.env['account.analytic.account'].browse(
            options.get('analytic_accounts_groupby')
        )
        for acct in acct_recs:
            extra_headers.append({
                'name': acct.name,
                'forced_options': {
                    'analytic_groupby_option': True,
                    'analytic_accounts_list': (acct.id,),
                },
            })

        if not extra_headers:
            return

        budget_active = any(b.get('selected') for b in options.get('budgets', []))
        if budget_active:
            # Place analytic headers next to budget headers on the same level
            options['column_headers'][-1] = extra_headers + options['column_headers'][-1]
        else:
            # Append a new header tier for analytic columns + a blank for totals
            extra_headers.append({'name': ''})
            options['column_headers'] = [
                *options['column_headers'],
                extra_headers,
            ]

    # ------------------------------------------------------------------
    # Shadow-table preparation
    # ------------------------------------------------------------------
    @api.model
    def _prepare_lines_for_analytic_groupby(self):
        """Build a temporary ``analytic_temp_account_move_line`` table that
        mirrors the *account_move_line* schema but is populated from
        *account_analytic_line*.

        Created once per SQL transaction: the table is declared
        ``ON COMMIT DROP`` and the method returns early when it already
        exists.
        """
        self.env.cr.execute(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_name = 'analytic_temp_account_move_line'"
        )
        if self.env.cr.fetchone():
            return  # already prepared in this transaction

        root_plan, additional_plans = self.env['account.analytic.plan']._get_all_plans()
        all_plans = root_plan + additional_plans

        # One analytic-line column per plan; UNNEST emits one row per
        # column, each wrapped as a bare jsonb value.
        analytic_col_refs = SQL(", ").join(
            SQL('"account_analytic_line".%s', SQL.identifier(p._column_name()))
            for p in all_plans
        )
        distribution_expr = SQL(
            'to_jsonb(UNNEST(ARRAY[%s]))',
            analytic_col_refs,
        )

        # Columns whose value comes from the analytic line (or a constant)
        # rather than from the linked journal item.
        field_mapping = {
            'id': SQL("account_analytic_line.id"),
            'balance': SQL("-amount"),
            'display_type': 'product',
            'parent_state': 'posted',
            'account_id': SQL.identifier("general_account_id"),
            'debit': SQL("CASE WHEN amount < 0 THEN -amount ELSE 0 END"),
            'credit': SQL("CASE WHEN amount > 0 THEN amount ELSE 0 END"),
            'analytic_distribution': distribution_expr,
        }

        # Fill in the remaining stored fields with values from the linked AML
        aml_fields_meta = self.env['account.move.line'].fields_get()
        persisted_fields = {
            fname
            for fname, meta in aml_fields_meta.items()
            if meta['type'] not in ('many2many', 'one2many') and meta.get('store')
        }
        for fname in persisted_fields:
            if fname not in field_mapping:
                field_mapping[fname] = SQL('"account_move_line".%s', SQL.identifier(fname))

        col_names_sql, val_exprs_sql = (
            self.env['account.move.line']._prepare_aml_shadowing_for_report(field_mapping)
        )

        # NOTE: the leading "--" SQL comment must stay on its own line,
        # otherwise it comments out the statements that follow it.
        shadow_sql = SQL(
            """
            -- Build temporary shadow table inheriting AML schema
            CREATE TEMPORARY TABLE IF NOT EXISTS analytic_temp_account_move_line ()
                INHERITS (account_move_line) ON COMMIT DROP;
            ALTER TABLE analytic_temp_account_move_line NO INHERIT account_move_line;
            ALTER TABLE analytic_temp_account_move_line
                DROP CONSTRAINT IF EXISTS account_move_line_check_amount_currency_balance_sign;
            ALTER TABLE analytic_temp_account_move_line ALTER COLUMN move_id DROP NOT NULL;
            ALTER TABLE analytic_temp_account_move_line ALTER COLUMN currency_id DROP NOT NULL;

            INSERT INTO analytic_temp_account_move_line (%(col_names)s)
            SELECT %(val_exprs)s
            FROM account_analytic_line
            LEFT JOIN account_move_line
                ON account_analytic_line.move_line_id = account_move_line.id
            WHERE account_analytic_line.general_account_id IS NOT NULL;

            CREATE INDEX IF NOT EXISTS analytic_temp_aml__composite_idx
                ON analytic_temp_account_move_line (analytic_distribution, journal_id, date, company_id);

            ANALYZE analytic_temp_account_move_line;
            """,
            col_names=col_names_sql,
            val_exprs=val_exprs_sql,
        )
        self.env.cr.execute(shadow_sql)

    # ------------------------------------------------------------------
    # Query overrides
    # ------------------------------------------------------------------
    def _get_report_query(self, options, date_scope, domain=None) -> Query:
        """When analytic-groupby columns are active, inject the context
        flag that causes `_where_calc` to swap the AML table.

        Also adds a WHERE clause for the ``analytic_accounts`` option,
        whose form depends on whether the column group carries an
        ``analytic_accounts_list`` key.

        :return: the query built by the parent implementation, possibly
            with the extra analytic condition.
        """
        ctx_self = self.with_context(
            account_report_analytic_groupby=options.get('analytic_groupby_option'),
        )
        query = super(FusionReportAnalyticGroupby, ctx_self)._get_report_query(
            options, date_scope, domain,
        )

        if options.get('analytic_accounts'):
            if 'analytic_accounts_list' in options:
                # Shadow table stores bare integer ids in analytic_distribution
                acct_str_ids = tuple(str(aid) for aid in options['analytic_accounts'])
                query.add_where(SQL(
                    "account_move_line.analytic_distribution IN %s",
                    acct_str_ids,
                ))
            else:
                # Real AML table – JSON distribution with percentages
                acct_id_list = [[str(aid) for aid in options['analytic_accounts']]]
                query.add_where(SQL(
                    '%s && %s',
                    acct_id_list,
                    self.env['account.move.line']._query_analytic_accounts(),
                ))

        return query

    # ------------------------------------------------------------------
    # Audit action
    # ------------------------------------------------------------------
    def action_audit_cell(self, options, params):
        """Redirect the audit action to analytic lines when the column
        being audited belongs to an analytic-groupby column group.

        :param options: report options dict.
        :param params: audit parameters; ``column_group_key``,
            ``report_line_id`` and ``expression_label`` are read here.
        :return: an action dict; the parent's action for regular columns,
            otherwise the analytic-entries action with a converted domain.
        """
        col_opts = self._get_column_group_options(options, params['column_group_key'])
        if not col_opts.get('analytic_groupby_option'):
            return super().action_audit_cell(options, params)

        # Translate AML domain → analytic line domain
        rpt_line = self.env['account.report.line'].browse(params['report_line_id'])
        expr = rpt_line.expression_ids.filtered(
            lambda e: e.label == params['expression_label']
        )
        raw_domain = self._get_audit_line_domain(col_opts, expr, params)

        AnalyticLine = self.env['account.analytic.line']
        converted_domain = []
        for leaf in raw_domain:
            # Length-1 items are domain operators ('&', '|', '!'): keep as is.
            if len(leaf) == 1:
                converted_domain.append(leaf)
                continue

            fld, op, val = leaf
            root_field = fld.split('.')[0]

            if root_field == 'account_id':
                # Rename only the leading segment; later path segments that
                # happen to contain "account_id" must stay untouched.
                converted_domain.append(
                    (fld.replace('account_id', 'general_account_id', 1), op, val)
                )
            elif fld == 'analytic_distribution':
                converted_domain.append(('auto_account_id', 'in', val))
            elif root_field not in AnalyticLine._fields:
                # Field only exists on journal items: reach it through the
                # linked move line, optionally accepting lines without one.
                expr_leaf = [(f'move_line_id.{fld}', op, val)]
                if options.get('include_analytic_without_aml'):
                    expr_leaf = osv.expression.OR([
                        [('move_line_id', '=', False)],
                        expr_leaf,
                    ])
                converted_domain.extend(expr_leaf)
            else:
                converted_domain.append(leaf)

        act = clean_action(
            self.env.ref('analytic.account_analytic_line_action_entries')._get_action_dict(),
            env=self.env,
        )
        act['domain'] = converted_domain
        return act

    # ------------------------------------------------------------------
    # Journal domain
    # ------------------------------------------------------------------
    @api.model
    def _get_options_journals_domain(self, options):
        """Allow journal-less lines when analytic lines without a parent
        move line are included.

        :return: the parent domain, OR-ed with ``journal_id = False`` when
            ``include_analytic_without_aml`` is set.
        """
        base_domain = super()._get_options_journals_domain(options)
        if options.get('include_analytic_without_aml'):
            base_domain = osv.expression.OR([
                base_domain,
                [('journal_id', '=', False)],
            ])
        return base_domain

    # ------------------------------------------------------------------
    # Options domain
    # ------------------------------------------------------------------
    def _get_options_domain(self, options, date_scope):
        """Restrict the options domain to the analytic accounts of the
        current analytic column.

        The restriction is keyed on the *presence* of
        ``analytic_accounts_list`` rather than on its truthiness: a plan
        without any analytic account produces an empty tuple, and its
        column must then match nothing instead of silently falling back
        to every line.

        :return: the parent domain, AND-ed with the analytic filter when
            the key is present in *options*.
        """
        self.ensure_one()
        base = super()._get_options_domain(options, date_scope)
        if 'analytic_accounts_list' in options:
            acct_filter = options.get('analytic_accounts_list') or ()
            base = osv.expression.AND([
                base,
                [('analytic_distribution', 'in', list(acct_filter))],
            ])
        return base


class FusionAMLAnalyticShadow(models.Model):
    """Hooks into `_where_calc` to swap the AML table for the analytic
    shadow table when the report context flag is set."""

    _inherit = "account.move.line"

    def _where_calc(self, domain, active_test=True):
        """Replace the base ``account_move_line`` table reference with the
        ``analytic_temp_account_move_line`` shadow table whenever the
        ``account_report_analytic_groupby`` context key is truthy, unless
        a cash-basis report is active (which already replaces the table).

        :return: the query from the parent implementation, with its
            ``account_move_line`` table entry possibly redirected.
        """
        qry = super()._where_calc(domain, active_test)
        ctx = self.env.context
        if ctx.get('account_report_analytic_groupby') and not ctx.get('account_report_cash_basis'):
            # Make sure the shadow table exists before pointing the query at it.
            self.env['account.report']._prepare_lines_for_analytic_groupby()
            qry._tables['account_move_line'] = SQL.identifier('analytic_temp_account_move_line')
        return qry