Files
Odoo-Modules/fusion_accounting_reports/models/fusion_report_engine.py
gsinghpal ff6d21a561 feat(fusion_accounting_reports): partner-grouped engine method
Adds engine.compute_partner_grouped(period, account_type=...) that
returns per-partner aggregations with aging buckets (current/1-30/
31-60/61-90/90+). SQL-direct for performance — single GROUP BY query
with conditional sum per bucket.

Foundation for the 3 partner-grouped reports landing in commit 3:
Aged Receivable, Aged Payable, Partner Ledger.

Made-with: Cursor
2026-04-19 23:54:32 -04:00

370 lines
13 KiB
Python

"""The reports engine - orchestrator for all report computation.
6-method public API. All controllers, AI tools, wizards, exports must
go through these methods; no direct ORM aggregation queries from
anywhere else.
Internal pipeline (per report run):
1. Validate (period valid, company allowed, report exists)
2. Fetch account hierarchy (cached per (company, fiscal_year))
3. Aggregate move lines per account (the SQL workhorse)
4. Resolve line_specs into report rows
5. (Optional) Compute comparison-period rows
6. (Optional) Detect anomalies (deferred to later tasks)
"""
import logging
from datetime import date, timedelta
from odoo import _, api, models
from odoo.exceptions import ValidationError
from ..services.account_hierarchy import build_tree
from ..services.date_periods import Period, comparison_period as _comp_period
from ..services.drill_down_resolver import fetch_drill_down
from ..services.line_resolver import resolve as _resolve_lines
from ..services.totaling import TotalLine
_logger = logging.getLogger(__name__)
class FusionReportEngine(models.AbstractModel):
    """Single entry point for computing Fusion financial reports.

    Public methods return plain ``dict``/``list`` payloads. The statement
    reports (P&L, balance sheet, trial balance, general ledger) share the
    ``_compute`` pipeline; the partner-grouped report runs its own SQL.
    """

    _name = "fusion.report.engine"
    _description = "Fusion Financial Reports Engine"

    # ============================================================
    # PUBLIC API (6 methods)
    # ============================================================
    @api.model
    def compute_pnl(
        self, period: Period, *, comparison: str = 'none',
        company_id: int | None = None,
    ) -> dict:
        """Compute the income statement (P&L) for ``period``.

        :param period: reporting period (``date_from``/``date_to``/``label``).
        :param comparison: comparison mode forwarded to ``_compute``;
            ``'none'`` disables the comparison column.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: the result dict built by ``_compute``.
        :raises ValidationError: if no active ``pnl`` report definition exists.
        """
        report = self._get_report('pnl', company_id=company_id)
        return self._compute(
            report, period, comparison=comparison, company_id=company_id,
        )

    @api.model
    def compute_balance_sheet(
        self, date_to: date, *, comparison: str = 'none',
        company_id: int | None = None,
    ) -> dict:
        """Compute the balance sheet AS OF ``date_to``.

        The period start is pinned to 1970-01-01 so that the aggregated
        balances are cumulative since inception.

        :param date_to: as-of date of the balance sheet.
        :param comparison: comparison mode forwarded to ``_compute``.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: the result dict built by ``_compute``.
        :raises ValidationError: if no active ``balance_sheet`` report
            definition exists.
        """
        report = self._get_report('balance_sheet', company_id=company_id)
        # NOTE(review): the comparison period is derived from this
        # 1970-based period; confirm the date_periods helper handles that.
        cumulative_period = Period(
            date_from=date(1970, 1, 1),
            date_to=date_to,
            label=f"As of {date_to}",
        )
        return self._compute(
            report, cumulative_period,
            comparison=comparison, company_id=company_id,
        )

    @api.model
    def compute_trial_balance(
        self, period: Period, *, company_id: int | None = None,
    ) -> dict:
        """Compute the trial balance for ``period`` (never with comparison).

        :param period: reporting period.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: the result dict built by ``_compute``.
        :raises ValidationError: if no active ``trial_balance`` report
            definition exists.
        """
        report = self._get_report('trial_balance', company_id=company_id)
        return self._compute(
            report, period, comparison='none', company_id=company_id,
        )

    @api.model
    def compute_gl(
        self, period: Period, *, account_ids: list | None = None,
        company_id: int | None = None,
    ) -> dict:
        """Compute the general ledger for ``period``.

        Runs the shared pipeline, then attaches ``gl_by_account``: a mapping
        of account id to the journal items returned by ``fetch_drill_down``
        (requested with ``limit=200`` per account).

        :param period: reporting period.
        :param account_ids: accounts to list. When empty or ``None``, every
            account that has posted lines in the period is listed.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: the ``_compute`` result dict plus the ``gl_by_account`` key.
        :raises ValidationError: if no active ``general_ledger`` report
            definition exists.
        """
        report = self._get_report('general_ledger', company_id=company_id)
        company_id = company_id or self.env.company.id
        result = self._compute(
            report, period, comparison='none', company_id=company_id,
        )
        target_ids = account_ids or list(result.get('account_totals', {}))
        # One drill-down call per account, each capped at 200 entries.
        result['gl_by_account'] = {
            acct_id: fetch_drill_down(
                self.env,
                account_id=acct_id,
                date_from=period.date_from,
                date_to=period.date_to,
                company_id=company_id,
                limit=200,
            )
            for acct_id in target_ids
        }
        return result

    @api.model
    def drill_down(
        self, *, account_id: int, period: Period,
        company_id: int | None = None,
    ) -> list:
        """List the journal items behind a report line.

        :param account_id: account whose journal items are requested.
        :param period: date window of the drill-down.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: whatever ``fetch_drill_down`` returns (requested with
            ``limit=500``).
        """
        company_id = company_id or self.env.company.id
        return fetch_drill_down(
            self.env,
            account_id=account_id,
            date_from=period.date_from,
            date_to=period.date_to,
            company_id=company_id,
            limit=500,
        )

    @api.model
    def compute_partner_grouped(
        self, period: Period, *, account_type: str = 'asset_receivable',
        comparison: str = 'none', company_id: int | None = None,
    ) -> dict:
        """Per-partner aggregation report (Aged Receivable, Aged Payable,
        Partner Ledger).

        Each row carries ``partner_id``, ``partner_name``, ``total``
        residual, ``line_count`` and the aging buckets current / 1-30 /
        31-60 / 61-90 / 90+, measured in days between the line's
        ``date_maturity`` and ``period.date_to``. Lines without a maturity
        date are counted as current.

        Only posted, un-reconciled lines with a non-zero residual and dated
        on or before ``period.date_to`` are included.

        NOTE(review): ``amount_residual`` is read as stored on the line; the
        query does not rebuild the residual as it stood on
        ``period.date_to`` - confirm this is acceptable for back-dated runs.

        :param period: only ``date_to`` drives the query; ``date_from`` and
            ``label`` are echoed in the result.
        :param account_type: ``account.account.account_type`` to select.
        :param comparison: accepted for signature parity with the other
            ``compute_*`` methods; currently not used.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: dict with ``report_type``, ``account_type``, ``period``,
            ``company_id``, ``rows``, ``total`` and ``partner_count``. The
            same keys are present when no account matches (empty ``rows``).
        """
        company_id = company_id or self.env.company.id
        accounts = self.env['account.account'].sudo().search([
            ('account_type', '=', account_type),
            ('company_ids', 'in', company_id),
        ])
        # Skip the SQL round-trip when no account matches, but keep the
        # result shape identical to the populated case.
        rows = (
            self._fetch_partner_aging_rows(
                accounts.ids, company_id, period.date_to,
            )
            if accounts
            else []
        )
        return {
            'report_type': 'partner_grouped',
            'account_type': account_type,
            'period': self._period_payload(period),
            'company_id': company_id,
            'rows': rows,
            # Float start value keeps ``total`` a float for empty reports.
            'total': sum((row['total'] for row in rows), 0.0),
            'partner_count': len(rows),
        }

    # ============================================================
    # PRIVATE HELPERS
    # ============================================================
    def _period_payload(self, period) -> dict:
        """Serialise a period into the dict embedded in report results."""
        return {
            'date_from': str(period.date_from),
            'date_to': str(period.date_to),
            'label': period.label,
        }

    def _fetch_partner_aging_rows(self, account_ids, company_id, as_of) -> list:
        """Run the single GROUP BY aging query and return one dict per partner.

        :param account_ids: ids of the accounts whose lines are aggregated.
        :param company_id: company the move lines must belong to.
        :param as_of: date the aging buckets are measured against.
        :return: partner rows ordered by total residual, largest first;
            partners whose residuals sum to zero are left out.
        """
        self.env.cr.execute(
            """
            SELECT
                COALESCE(p.id, 0) AS partner_id,
                COALESCE(p.name, '(no partner)') AS partner_name,
                SUM(aml.amount_residual) AS total_residual,
                SUM(CASE
                    WHEN aml.date_maturity >= %(as_of)s
                        OR aml.date_maturity IS NULL
                    THEN aml.amount_residual ELSE 0
                END) AS bucket_current,
                SUM(CASE
                    WHEN aml.date_maturity < %(as_of)s
                        AND aml.date_maturity >= %(d30)s
                    THEN aml.amount_residual ELSE 0
                END) AS bucket_1_30,
                SUM(CASE
                    WHEN aml.date_maturity < %(d30)s
                        AND aml.date_maturity >= %(d60)s
                    THEN aml.amount_residual ELSE 0
                END) AS bucket_31_60,
                SUM(CASE
                    WHEN aml.date_maturity < %(d60)s
                        AND aml.date_maturity >= %(d90)s
                    THEN aml.amount_residual ELSE 0
                END) AS bucket_61_90,
                SUM(CASE
                    WHEN aml.date_maturity < %(d90)s
                    THEN aml.amount_residual ELSE 0
                END) AS bucket_90_plus,
                COUNT(*) AS line_count
            FROM account_move_line aml
            LEFT JOIN res_partner p ON p.id = aml.partner_id
            WHERE aml.account_id = ANY(%(account_ids)s)
                AND aml.parent_state = 'posted'
                AND aml.reconciled = false
                AND aml.amount_residual != 0
                AND aml.company_id = %(company_id)s
                AND aml.date <= %(as_of)s
            GROUP BY p.id, p.name
            HAVING SUM(aml.amount_residual) != 0
            ORDER BY total_residual DESC
            """,
            # Named parameters: each boundary date is bound once instead of
            # being repeated positionally.
            {
                'as_of': as_of,
                'd30': as_of - timedelta(days=30),
                'd60': as_of - timedelta(days=60),
                'd90': as_of - timedelta(days=90),
                'account_ids': list(account_ids),
                'company_id': company_id,
            },
        )
        return [
            {
                # The SQL maps "no partner" to id 0; expose it as False.
                'partner_id': rec['partner_id'] or False,
                'partner_name': rec['partner_name'] or '(no partner)',
                'total': float(rec['total_residual'] or 0),
                'bucket_current': float(rec['bucket_current'] or 0),
                'bucket_1_30': float(rec['bucket_1_30'] or 0),
                'bucket_31_60': float(rec['bucket_31_60'] or 0),
                'bucket_61_90': float(rec['bucket_61_90'] or 0),
                'bucket_90_plus': float(rec['bucket_90_plus'] or 0),
                'line_count': rec['line_count'],
            }
            for rec in self.env.cr.dictfetchall()
        ]

    def _get_report(self, report_type: str, *, company_id: int | None = None):
        """Return the active ``fusion.report`` definition for a type+company.

        Searches definitions bound to the company or to no company
        (``company_id = False``), ordered ``company_id desc nulls last`` so
        that a company-specific definition is preferred over the global one.

        :param report_type: value of ``fusion.report.report_type`` to match.
        :param company_id: company to look up; defaults to
            ``self.env.company``.
        :raises ValidationError: if no active definition matches.
        """
        Report = self.env['fusion.report'].sudo()
        company_id = company_id or self.env.company.id
        report = Report.search(
            [
                ('report_type', '=', report_type),
                ('active', '=', True),
                '|',
                ('company_id', '=', company_id),
                ('company_id', '=', False),
            ],
            order='company_id desc nulls last',
            limit=1,
        )
        if not report:
            # Let _() interpolate so the placeholder is substituted in the
            # translated string.
            raise ValidationError(
                _("No active fusion.report definition for type '%s'", report_type)
            )
        return report

    def _fetch_accounts(self, company_id):
        """Fetch all accounts of a company.

        :return: ``(accounts_by_id, tree)`` - a dict of account dicts keyed
            by id, and the result of ``build_tree`` over the same dicts.
        """
        Account = self.env['account.account'].sudo()
        records = Account.search([('company_ids', 'in', company_id)])
        # account.account doesn't carry a parent_id in V19 - we use
        # account_type prefixes instead, so parent_id is always None here.
        flat = [
            {
                'id': account.id,
                'code': account.code,
                'name': account.name,
                'account_type': account.account_type or '',
                'parent_id': None,
            }
            for account in records
        ]
        accounts_by_id = {entry['id']: entry for entry in flat}
        return accounts_by_id, build_tree(flat)

    def _aggregate_period(self, period: Period, company_id: int) -> dict:
        """Aggregate posted move lines per account for ``period``.

        Raw SQL for performance; this is the perf-critical step.

        :return: dict mapping account id to a ``TotalLine`` holding the
            summed debit, credit and balance. Accounts without posted lines
            in the period are absent.
        """
        self.env.cr.execute(
            """
            SELECT account_id,
                   COALESCE(SUM(debit), 0) AS d,
                   COALESCE(SUM(credit), 0) AS c,
                   COALESCE(SUM(balance), 0) AS b
            FROM account_move_line
            WHERE parent_state = 'posted'
              AND company_id = %s
              AND date >= %s
              AND date <= %s
            GROUP BY account_id
            """,
            (company_id, period.date_from, period.date_to),
        )
        return {
            account_id: TotalLine(
                debit=float(debit or 0),
                credit=float(credit or 0),
                balance=float(balance or 0),
            )
            for account_id, debit, credit, balance in self.env.cr.fetchall()
        }

    def _compute(
        self, report, period: Period, *, comparison: str,
        company_id: int | None = None,
    ) -> dict:
        """Shared computation pipeline.

        Loads the company's accounts, aggregates move lines for ``period``
        (and for the comparison period when one is requested and can be
        derived), then resolves ``report.line_specs`` into rows.

        :param report: ``fusion.report`` record driving the layout.
        :param period: reporting period.
        :param comparison: comparison mode; falsy or ``'none'`` disables it.
        :param company_id: company to report on; defaults to
            ``self.env.company``.
        :return: dict with ``report_id``, ``report_name``, ``report_type``,
            ``period``, ``comparison_period`` (``None`` when unused),
            ``company_id``, ``rows`` and ``account_totals`` (account id to
            balance).
        """
        company_id = company_id or self.env.company.id
        # The account tree is built by the helper but not consumed here.
        accounts_by_id, _tree = self._fetch_accounts(company_id)
        account_totals = self._aggregate_period(period, company_id)

        comp_period = None
        comp_totals = None
        if comparison and comparison != 'none':
            comp_period = _comp_period(period, comparison)
            if comp_period:
                comp_totals = self._aggregate_period(comp_period, company_id)

        rows = _resolve_lines(
            report.line_specs or [],
            account_totals=account_totals,
            accounts_by_id=accounts_by_id,
            comparison_totals=comp_totals,
        )
        return {
            'report_id': report.id,
            'report_name': report.name,
            'report_type': report.report_type,
            'period': self._period_payload(period),
            'comparison_period': (
                self._period_payload(comp_period) if comp_period else None
            ),
            'company_id': company_id,
            'rows': rows,
            'account_totals': {
                aid: tl.balance for aid, tl in account_totals.items()
            },
        }