changes
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
from . import date_periods
|
||||
from . import account_hierarchy
|
||||
from . import totaling
|
||||
from . import currency_conversion
|
||||
from . import line_resolver
|
||||
from . import drill_down_resolver
|
||||
from . import anomaly_detection
|
||||
from . import commentary_prompt
|
||||
from . import commentary_generator
|
||||
@@ -0,0 +1,62 @@
|
||||
"""Account hierarchy walker.
|
||||
|
||||
Given a flat list of accounts with parent_id pointers, build a tree and
|
||||
provide a recursive walker that yields (account, depth, ancestors) tuples.
|
||||
Used by report line resolvers to render group sub-totals."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterator
|
||||
|
||||
|
||||
@dataclass
|
||||
class AccountNode:
|
||||
id: int
|
||||
code: str
|
||||
name: str
|
||||
account_type: str
|
||||
parent_id: int | None
|
||||
children: list['AccountNode'] = field(default_factory=list)
|
||||
|
||||
|
||||
def build_tree(accounts: list[dict]) -> list[AccountNode]:
|
||||
"""Build a forest from a flat list of account dicts.
|
||||
|
||||
Each dict must have keys: id, code, name, account_type, parent_id (nullable)."""
|
||||
nodes: dict[int, AccountNode] = {}
|
||||
for acc in accounts:
|
||||
nodes[acc['id']] = AccountNode(
|
||||
id=acc['id'], code=acc['code'], name=acc['name'],
|
||||
account_type=acc['account_type'],
|
||||
parent_id=acc.get('parent_id'),
|
||||
)
|
||||
roots: list[AccountNode] = []
|
||||
for node in nodes.values():
|
||||
if node.parent_id and node.parent_id in nodes:
|
||||
nodes[node.parent_id].children.append(node)
|
||||
else:
|
||||
roots.append(node)
|
||||
for node in nodes.values():
|
||||
node.children.sort(key=lambda n: n.code)
|
||||
roots.sort(key=lambda n: n.code)
|
||||
return roots
|
||||
|
||||
|
||||
def walk(roots: list[AccountNode], *, max_depth: int = 10) -> Iterator[tuple[AccountNode, int, list[AccountNode]]]:
|
||||
"""Depth-first walk yielding (node, depth, ancestors)."""
|
||||
def _walk(node: AccountNode, depth: int, ancestors: list[AccountNode]):
|
||||
yield (node, depth, ancestors)
|
||||
if depth < max_depth:
|
||||
for child in node.children:
|
||||
yield from _walk(child, depth + 1, ancestors + [node])
|
||||
for root in roots:
|
||||
yield from _walk(root, 0, [])
|
||||
|
||||
|
||||
def filter_by_account_type(roots: list[AccountNode], type_prefix: str) -> list[AccountNode]:
|
||||
"""Return all nodes whose account_type starts with type_prefix
|
||||
(e.g. 'asset_' returns asset_receivable, asset_cash, etc.)."""
|
||||
matches: list[AccountNode] = []
|
||||
for node, _depth, _ancestors in walk(roots):
|
||||
if node.account_type.startswith(type_prefix):
|
||||
matches.append(node)
|
||||
return matches
|
||||
@@ -0,0 +1,81 @@
|
||||
"""Anomaly detection for financial reports.
|
||||
|
||||
Compares each row's current-period amount to its comparison-period
|
||||
amount and flags variances exceeding a threshold. Uses both:
|
||||
- Absolute threshold ($X minimum movement)
|
||||
- Percentage threshold (Y% min variance)
|
||||
|
||||
Pure-Python: callers pass the engine's compute_*() result; we return
|
||||
a list of anomaly dicts."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Anomaly:
|
||||
row_id: str
|
||||
label: str
|
||||
current_amount: float
|
||||
comparison_amount: float
|
||||
variance_amount: float
|
||||
variance_pct: float
|
||||
severity: str # 'low', 'medium', 'high'
|
||||
direction: str # 'increase', 'decrease'
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'row_id': self.row_id, 'label': self.label,
|
||||
'current_amount': self.current_amount,
|
||||
'comparison_amount': self.comparison_amount,
|
||||
'variance_amount': self.variance_amount,
|
||||
'variance_pct': self.variance_pct,
|
||||
'severity': self.severity, 'direction': self.direction,
|
||||
}
|
||||
|
||||
|
||||
# Defaults -- tunable per company via ir.config_parameter
|
||||
DEFAULT_MIN_ABSOLUTE_THRESHOLD = 100.0
|
||||
DEFAULT_MIN_PCT_THRESHOLD = 10.0 # 10%
|
||||
DEFAULT_HIGH_PCT_THRESHOLD = 50.0 # 50%+ flagged 'high'
|
||||
|
||||
|
||||
def detect(report_result: dict, *, min_absolute: float = None,
|
||||
min_pct: float = None, high_pct: float = None) -> list[dict]:
|
||||
"""Detect anomalies in a report_result dict (engine output).
|
||||
|
||||
Returns list of anomaly dicts ordered by severity desc, variance_amount desc.
|
||||
Returns empty list if no comparison period was computed."""
|
||||
if not report_result.get('comparison_period'):
|
||||
return []
|
||||
min_absolute = min_absolute if min_absolute is not None else DEFAULT_MIN_ABSOLUTE_THRESHOLD
|
||||
min_pct = min_pct if min_pct is not None else DEFAULT_MIN_PCT_THRESHOLD
|
||||
high_pct = high_pct if high_pct is not None else DEFAULT_HIGH_PCT_THRESHOLD
|
||||
|
||||
anomalies = []
|
||||
for row in report_result.get('rows', []):
|
||||
comparison = row.get('amount_comparison')
|
||||
current = row.get('amount', 0.0)
|
||||
if comparison is None:
|
||||
continue
|
||||
variance_amount = current - comparison
|
||||
variance_pct = abs(row.get('variance_pct') or 0.0)
|
||||
if abs(variance_amount) < min_absolute:
|
||||
continue
|
||||
if variance_pct < min_pct:
|
||||
continue
|
||||
severity = 'high' if variance_pct >= high_pct else 'medium' if variance_pct >= min_pct * 2 else 'low'
|
||||
direction = 'increase' if variance_amount > 0 else 'decrease'
|
||||
anomalies.append(Anomaly(
|
||||
row_id=row['id'],
|
||||
label=row.get('label', ''),
|
||||
current_amount=current,
|
||||
comparison_amount=comparison,
|
||||
variance_amount=variance_amount,
|
||||
variance_pct=variance_pct,
|
||||
severity=severity,
|
||||
direction=direction,
|
||||
).to_dict())
|
||||
|
||||
severity_order = {'high': 0, 'medium': 1, 'low': 2}
|
||||
anomalies.sort(key=lambda a: (severity_order[a['severity']], -abs(a['variance_amount'])))
|
||||
return anomalies
|
||||
@@ -0,0 +1,103 @@
|
||||
"""AI-generated narrative commentary for financial reports.
|
||||
|
||||
Takes a report_result dict + optional anomalies list, builds an LLM
|
||||
prompt, parses the structured output. Output contract:
|
||||
{
|
||||
'summary': str, # 2-3 sentence executive summary
|
||||
'highlights': [str, ...], # 3-5 bullet observations
|
||||
'concerns': [str, ...], # things that warrant investigation
|
||||
'next_actions': [str, ...] # suggested follow-ups
|
||||
}
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_commentary(env, *, report_result: dict, anomalies: list = None,
|
||||
provider=None) -> dict:
|
||||
"""Generate narrative commentary via LLM. Returns dict per the contract.
|
||||
|
||||
If no provider configured, returns a templated fallback (no LLM)."""
|
||||
if provider is None:
|
||||
provider = _get_provider(env)
|
||||
if provider is None:
|
||||
return _templated_fallback(report_result, anomalies)
|
||||
|
||||
try:
|
||||
from odoo.addons.fusion_accounting_reports.services.commentary_prompt import build_prompt
|
||||
except ImportError:
|
||||
_logger.debug("commentary_prompt module not yet available; using fallback")
|
||||
return _templated_fallback(report_result, anomalies)
|
||||
|
||||
system, user = build_prompt(report_result, anomalies or [])
|
||||
try:
|
||||
response = provider.complete(
|
||||
system=system,
|
||||
messages=[{'role': 'user', 'content': user}],
|
||||
max_tokens=1200,
|
||||
temperature=0.2,
|
||||
)
|
||||
content = response.get('content') if isinstance(response, dict) else response
|
||||
parsed = json.loads(content)
|
||||
# Validate shape
|
||||
for key in ('summary', 'highlights', 'concerns', 'next_actions'):
|
||||
parsed.setdefault(key, [] if key != 'summary' else '')
|
||||
return parsed
|
||||
except Exception as e:
|
||||
_logger.warning("AI commentary generation failed: %s", e)
|
||||
return _templated_fallback(report_result, anomalies)
|
||||
|
||||
|
||||
def _templated_fallback(report_result: dict, anomalies: list = None) -> dict:
|
||||
"""No-LLM fallback that produces a basic narrative from the report data."""
|
||||
anomalies = anomalies or []
|
||||
rows = report_result.get('rows', [])
|
||||
period = report_result.get('period', {})
|
||||
period_label = period.get('label', 'this period')
|
||||
|
||||
# Find subtotal rows for the summary
|
||||
subtotals = [r for r in rows if r.get('is_subtotal')]
|
||||
summary_parts = [f"{report_result.get('report_name', 'Report')} for {period_label}."]
|
||||
if subtotals:
|
||||
last = subtotals[-1]
|
||||
summary_parts.append(f"{last['label']}: ${last['amount']:,.2f}.")
|
||||
|
||||
highlights = []
|
||||
for row in subtotals[:3]:
|
||||
highlights.append(f"{row['label']}: ${row['amount']:,.2f}")
|
||||
|
||||
concerns = []
|
||||
for a in anomalies[:3]:
|
||||
concerns.append(
|
||||
f"{a['label']} {a['direction']}d {a['variance_pct']:.1f}% "
|
||||
f"(${a['variance_amount']:+,.2f})")
|
||||
|
||||
return {
|
||||
'summary': ' '.join(summary_parts),
|
||||
'highlights': highlights,
|
||||
'concerns': concerns,
|
||||
'next_actions': ['Review the flagged anomalies above.'] if concerns else [],
|
||||
}
|
||||
|
||||
|
||||
def _get_provider(env):
|
||||
"""Look up provider for 'reports_commentary' feature; return None if not configured."""
|
||||
param = env['ir.config_parameter'].sudo()
|
||||
provider_name = param.get_param('fusion_accounting.provider.reports_commentary')
|
||||
if not provider_name:
|
||||
provider_name = param.get_param('fusion_accounting.provider.default')
|
||||
if not provider_name:
|
||||
return None
|
||||
try:
|
||||
from odoo.addons.fusion_accounting_ai.services.adapters.openai_adapter import OpenAIAdapter
|
||||
from odoo.addons.fusion_accounting_ai.services.adapters.claude import ClaudeAdapter
|
||||
except ImportError:
|
||||
return None
|
||||
if provider_name.startswith('openai'):
|
||||
return OpenAIAdapter(env)
|
||||
elif provider_name.startswith('claude'):
|
||||
return ClaudeAdapter(env)
|
||||
return None
|
||||
@@ -0,0 +1,67 @@
|
||||
"""LLM prompt for AI report commentary.
|
||||
|
||||
Provider-agnostic system + user prompt builder. Output contract:
|
||||
JSON with keys summary, highlights, concerns, next_actions."""
|
||||
|
||||
|
||||
SYSTEM_PROMPT = """You are an experienced CFO providing executive-level commentary
|
||||
on a financial report. Your output MUST be valid JSON of this exact shape:
|
||||
|
||||
{
|
||||
"summary": "<2-3 sentence executive summary of the report period>",
|
||||
"highlights": ["<observation 1>", "<observation 2>", ...],
|
||||
"concerns": ["<thing to investigate 1>", ...],
|
||||
"next_actions": ["<suggested action 1>", ...]
|
||||
}
|
||||
|
||||
Rules:
|
||||
- Use the data provided. Do not invent numbers.
|
||||
- Tone: professional, concise, factual.
|
||||
- Currency formatting: always include the $ symbol and 2 decimal places.
|
||||
- For anomalies: explicitly mention the variance percentage AND the dollar amount.
|
||||
- Do NOT include markdown code fences. Do NOT include any prose outside the JSON.
|
||||
"""
|
||||
|
||||
|
||||
def build_prompt(report_result: dict, anomalies: list) -> tuple[str, str]:
|
||||
"""Build (system_prompt, user_prompt) tuple."""
|
||||
parts = []
|
||||
|
||||
# Report context
|
||||
parts.append(f"REPORT: {report_result.get('report_name', 'Untitled')}")
|
||||
period = report_result.get('period', {})
|
||||
parts.append(f"PERIOD: {period.get('label', '')} "
|
||||
f"({period.get('date_from', '')} to {period.get('date_to', '')})")
|
||||
comp_period = report_result.get('comparison_period')
|
||||
if comp_period:
|
||||
parts.append(f"COMPARED TO: {comp_period.get('label', '')} "
|
||||
f"({comp_period.get('date_from', '')} to {comp_period.get('date_to', '')})")
|
||||
parts.append("")
|
||||
|
||||
# Rows (the actual numbers)
|
||||
parts.append("REPORT LINES:")
|
||||
for row in report_result.get('rows', []):
|
||||
line = f" - {row.get('label', '?')}: ${row.get('amount', 0):,.2f}"
|
||||
if row.get('amount_comparison') is not None:
|
||||
line += f" (comparison: ${row['amount_comparison']:,.2f}"
|
||||
if row.get('variance_pct') is not None:
|
||||
line += f", {row['variance_pct']:+.1f}%"
|
||||
line += ")"
|
||||
if row.get('is_subtotal'):
|
||||
line += " [SUBTOTAL]"
|
||||
parts.append(line)
|
||||
parts.append("")
|
||||
|
||||
# Anomalies
|
||||
if anomalies:
|
||||
parts.append("ANOMALIES (variances exceeding threshold):")
|
||||
for a in anomalies[:10]:
|
||||
parts.append(
|
||||
f" - {a['label']}: {a['direction']}d {a['variance_pct']:.1f}% "
|
||||
f"(${a['variance_amount']:+,.2f}, severity: {a['severity']})"
|
||||
)
|
||||
parts.append("")
|
||||
|
||||
parts.append("Generate the JSON commentary per the system prompt.")
|
||||
|
||||
return (SYSTEM_PROMPT, "\n".join(parts))
|
||||
@@ -0,0 +1,66 @@
|
||||
"""Multi-currency conversion for financial reports.
|
||||
|
||||
Converts move-line amounts to the report's display currency at the
|
||||
report end-date. Pure-Python - caller provides exchange rates as a
|
||||
dict {(source_code, target_code, date): rate}."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConversionRate:
|
||||
source: str
|
||||
target: str
|
||||
rate: float
|
||||
rate_date: date
|
||||
|
||||
|
||||
def convert_amount(amount: float, *, source_currency: str, target_currency: str,
|
||||
rate_date: date, rates: dict) -> float:
|
||||
"""Convert `amount` from source to target at the given date.
|
||||
|
||||
`rates` is a dict keyed by (source, target, date) -> rate.
|
||||
If source == target, returns amount unchanged."""
|
||||
if source_currency == target_currency:
|
||||
return amount
|
||||
key = (source_currency, target_currency, rate_date)
|
||||
if key in rates:
|
||||
return amount * rates[key]
|
||||
inv_key = (target_currency, source_currency, rate_date)
|
||||
if inv_key in rates:
|
||||
inv = rates[inv_key]
|
||||
if inv != 0:
|
||||
return amount / inv
|
||||
candidates = [
|
||||
(d, r) for (s, t, d), r in rates.items()
|
||||
if s == source_currency and t == target_currency and d <= rate_date
|
||||
]
|
||||
if candidates:
|
||||
candidates.sort(key=lambda x: x[0], reverse=True)
|
||||
return amount * candidates[0][1]
|
||||
raise ValueError(
|
||||
f"No exchange rate available for {source_currency}->{target_currency} on or before {rate_date}"
|
||||
)
|
||||
|
||||
|
||||
def fetch_rates(env, *, target_currency_id: int, as_of: date,
|
||||
source_currency_ids: list[int] | None = None) -> dict:
|
||||
"""Fetch all relevant rates from res.currency.rate as of a given date.
|
||||
|
||||
Returns the dict-of-rates structure consumed by convert_amount.
|
||||
Pulls only rates where source != target and date <= as_of."""
|
||||
Rate = env['res.currency.rate'].sudo()
|
||||
target = env['res.currency'].browse(target_currency_id)
|
||||
domain = [
|
||||
('name', '<=', as_of),
|
||||
('currency_id', '!=', target.id),
|
||||
]
|
||||
if source_currency_ids:
|
||||
domain.append(('currency_id', 'in', source_currency_ids))
|
||||
rates_recs = Rate.search(domain)
|
||||
|
||||
out = {}
|
||||
for r in rates_recs:
|
||||
out[(r.currency_id.name, target.name, r.name)] = (1.0 / r.rate) if r.rate else 0.0
|
||||
return out
|
||||
@@ -0,0 +1,103 @@
|
||||
"""Date period math for financial reports.
|
||||
|
||||
Pure-Python helpers that compute:
|
||||
- Fiscal year start/end given any reference date + company fiscal year settings
|
||||
- Comparison periods (prior year same period, prior period, etc.)
|
||||
- Period boundaries for monthly / quarterly / yearly reporting
|
||||
|
||||
NO Odoo imports - all callers pass in primitive types so the same module
|
||||
is unit-testable without an Odoo registry."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, timedelta
|
||||
from typing import Literal
|
||||
|
||||
|
||||
PeriodGranularity = Literal['month', 'quarter', 'year', 'custom']
|
||||
ComparisonMode = Literal['none', 'previous_period', 'previous_year']
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Period:
|
||||
date_from: date
|
||||
date_to: date
|
||||
label: str
|
||||
|
||||
def __post_init__(self):
|
||||
if self.date_from > self.date_to:
|
||||
raise ValueError(f"date_from ({self.date_from}) > date_to ({self.date_to})")
|
||||
|
||||
@property
|
||||
def days(self) -> int:
|
||||
return (self.date_to - self.date_from).days + 1
|
||||
|
||||
|
||||
def fiscal_year_bounds(reference_date: date, *, fy_start_month: int = 1,
|
||||
fy_start_day: int = 1) -> Period:
|
||||
"""Return the fiscal year period containing `reference_date`.
|
||||
|
||||
Default: calendar year (Jan 1 - Dec 31). Pass fy_start_month=4, fy_start_day=1
|
||||
for an April-March fiscal year."""
|
||||
if reference_date.month < fy_start_month or (
|
||||
reference_date.month == fy_start_month and reference_date.day < fy_start_day
|
||||
):
|
||||
start_year = reference_date.year - 1
|
||||
else:
|
||||
start_year = reference_date.year
|
||||
start = date(start_year, fy_start_month, fy_start_day)
|
||||
next_start = date(start_year + 1, fy_start_month, fy_start_day)
|
||||
end = next_start - timedelta(days=1)
|
||||
return Period(date_from=start, date_to=end, label=f"FY {start_year}")
|
||||
|
||||
|
||||
def month_bounds(reference_date: date) -> Period:
|
||||
"""Return the calendar month containing `reference_date`."""
|
||||
start = reference_date.replace(day=1)
|
||||
if reference_date.month == 12:
|
||||
next_start = date(reference_date.year + 1, 1, 1)
|
||||
else:
|
||||
next_start = date(reference_date.year, reference_date.month + 1, 1)
|
||||
return Period(
|
||||
date_from=start,
|
||||
date_to=next_start - timedelta(days=1),
|
||||
label=start.strftime('%B %Y'),
|
||||
)
|
||||
|
||||
|
||||
def quarter_bounds(reference_date: date) -> Period:
|
||||
"""Return the calendar quarter containing `reference_date`."""
|
||||
quarter = (reference_date.month - 1) // 3 + 1
|
||||
start_month = (quarter - 1) * 3 + 1
|
||||
start = date(reference_date.year, start_month, 1)
|
||||
end_month = start_month + 2
|
||||
if end_month == 12:
|
||||
end = date(reference_date.year, 12, 31)
|
||||
else:
|
||||
end = date(reference_date.year, end_month + 1, 1) - timedelta(days=1)
|
||||
return Period(date_from=start, date_to=end, label=f"Q{quarter} {reference_date.year}")
|
||||
|
||||
|
||||
def comparison_period(period: Period, mode: ComparisonMode) -> Period | None:
|
||||
"""Derive the comparison period for `period` per `mode`.
|
||||
|
||||
`previous_period`: same length, immediately before
|
||||
`previous_year`: same calendar dates, one year earlier
|
||||
`none`: returns None"""
|
||||
if mode == 'none':
|
||||
return None
|
||||
if mode == 'previous_period':
|
||||
days = period.days
|
||||
new_to = period.date_from - timedelta(days=1)
|
||||
new_from = new_to - timedelta(days=days - 1)
|
||||
return Period(date_from=new_from, date_to=new_to,
|
||||
label=f"{period.label} (previous)")
|
||||
if mode == 'previous_year':
|
||||
try:
|
||||
new_from = period.date_from.replace(year=period.date_from.year - 1)
|
||||
new_to = period.date_to.replace(year=period.date_to.year - 1)
|
||||
except ValueError:
|
||||
new_from = period.date_from.replace(year=period.date_from.year - 1, day=28)
|
||||
new_to = period.date_to.replace(year=period.date_to.year - 1, day=28)
|
||||
return Period(date_from=new_from, date_to=new_to,
|
||||
label=f"{period.label} (prev year)")
|
||||
raise ValueError(f"Unknown comparison mode: {mode}")
|
||||
@@ -0,0 +1,81 @@
|
||||
"""Drill-down: from a report line to its underlying journal items.
|
||||
|
||||
Given an account_id and a Period, fetches the matching account.move.line
|
||||
records and returns them in a flat list. Used by the OWL drill-down
|
||||
dialog and the engine's drill_down() public API."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
|
||||
|
||||
@dataclass
|
||||
class DrillDownRow:
|
||||
move_line_id: int
|
||||
move_id: int
|
||||
move_name: str
|
||||
date: date
|
||||
account_code: str
|
||||
account_name: str
|
||||
partner_name: str | None
|
||||
label: str
|
||||
debit: float
|
||||
credit: float
|
||||
balance: float
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'move_line_id': self.move_line_id,
|
||||
'move_id': self.move_id,
|
||||
'move_name': self.move_name,
|
||||
'date': str(self.date),
|
||||
'account_code': self.account_code,
|
||||
'account_name': self.account_name,
|
||||
'partner_name': self.partner_name or '',
|
||||
'label': self.label,
|
||||
'debit': self.debit,
|
||||
'credit': self.credit,
|
||||
'balance': self.balance,
|
||||
}
|
||||
|
||||
|
||||
def fetch_drill_down(
|
||||
env,
|
||||
*,
|
||||
account_id: int,
|
||||
date_from: date,
|
||||
date_to: date,
|
||||
company_id: int | None = None,
|
||||
limit: int = 500,
|
||||
) -> list[dict]:
|
||||
"""Fetch journal items for an account within a date range.
|
||||
|
||||
Returns flat list of dicts ready for the drill-down OWL table."""
|
||||
Line = env['account.move.line'].sudo()
|
||||
domain = [
|
||||
('account_id', '=', account_id),
|
||||
('date', '>=', date_from),
|
||||
('date', '<=', date_to),
|
||||
('parent_state', '=', 'posted'),
|
||||
]
|
||||
if company_id:
|
||||
domain.append(('company_id', '=', company_id))
|
||||
|
||||
move_lines = Line.search(domain, limit=limit, order='date asc, id asc')
|
||||
rows = []
|
||||
for ml in move_lines:
|
||||
rows.append(
|
||||
DrillDownRow(
|
||||
move_line_id=ml.id,
|
||||
move_id=ml.move_id.id,
|
||||
move_name=ml.move_id.name or '',
|
||||
date=ml.date,
|
||||
account_code=ml.account_id.code,
|
||||
account_name=ml.account_id.name,
|
||||
partner_name=ml.partner_id.name if ml.partner_id else None,
|
||||
label=ml.name or '',
|
||||
debit=ml.debit,
|
||||
credit=ml.credit,
|
||||
balance=ml.balance,
|
||||
).to_dict()
|
||||
)
|
||||
return rows
|
||||
@@ -0,0 +1,143 @@
|
||||
"""Resolve a fusion.report definition into report rows.
|
||||
|
||||
Pure-Python: takes line_specs (list of dicts), a period, and aggregated
|
||||
move-line data (per-account totals) - returns ordered list of report row
|
||||
dicts ready for the OWL frontend or PDF rendering.
|
||||
|
||||
Row shape:
|
||||
{
|
||||
'id': 'line_<index>',
|
||||
'label': str,
|
||||
'level': int, # indentation depth
|
||||
'is_subtotal': bool,
|
||||
'amount': float,
|
||||
'amount_comparison': float | None,
|
||||
'variance_pct': float | None,
|
||||
'account_id': int | None, # for drill-down (None for subtotals)
|
||||
'children': list[dict], # populated when expanded
|
||||
}"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .totaling import TotalLine
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReportRow:
|
||||
id: str
|
||||
label: str
|
||||
level: int = 0
|
||||
is_subtotal: bool = False
|
||||
amount: float = 0.0
|
||||
amount_comparison: float | None = None
|
||||
variance_pct: float | None = None
|
||||
account_id: int | None = None
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'label': self.label,
|
||||
'level': self.level,
|
||||
'is_subtotal': self.is_subtotal,
|
||||
'amount': self.amount,
|
||||
'amount_comparison': self.amount_comparison,
|
||||
'variance_pct': self.variance_pct,
|
||||
'account_id': self.account_id,
|
||||
}
|
||||
|
||||
|
||||
def resolve(
|
||||
line_specs: list[dict],
|
||||
*,
|
||||
account_totals: dict[int, TotalLine],
|
||||
accounts_by_id: dict[int, dict],
|
||||
comparison_totals: dict[int, TotalLine] | None = None,
|
||||
) -> list[dict]:
|
||||
"""Resolve line_specs against actual account totals -> list of row dicts.
|
||||
|
||||
Args:
|
||||
line_specs: report definition line specs (from fusion.report.line_specs).
|
||||
account_totals: {account_id: TotalLine} for the period.
|
||||
accounts_by_id: {account_id: {code, name, account_type, ...}}.
|
||||
comparison_totals: optional {account_id: TotalLine} for comparison period.
|
||||
|
||||
Returns: list of row dicts."""
|
||||
rows: list[ReportRow] = []
|
||||
|
||||
for idx, spec in enumerate(line_specs):
|
||||
if spec.get('compute') == 'subtotal':
|
||||
n = spec.get('above', 1)
|
||||
sign = spec.get('sign', 1)
|
||||
recent = [r.amount for r in rows[-n:] if not r.is_subtotal]
|
||||
row = ReportRow(
|
||||
id=f'line_{idx}',
|
||||
label=spec.get('label', 'Subtotal'),
|
||||
level=spec.get('level', 0),
|
||||
is_subtotal=True,
|
||||
amount=sum(recent) * sign,
|
||||
)
|
||||
if comparison_totals is not None:
|
||||
comp_recent = [
|
||||
r.amount_comparison
|
||||
for r in rows[-n:]
|
||||
if not r.is_subtotal and r.amount_comparison is not None
|
||||
]
|
||||
row.amount_comparison = (
|
||||
sum(comp_recent) * sign if comp_recent else None
|
||||
)
|
||||
rows.append(row)
|
||||
|
||||
elif spec.get('account_type_prefix'):
|
||||
prefix = spec['account_type_prefix']
|
||||
sign = spec.get('sign', 1)
|
||||
matched_ids = [
|
||||
aid for aid, info in accounts_by_id.items()
|
||||
if info.get('account_type', '').startswith(prefix)
|
||||
]
|
||||
amount = sum(
|
||||
account_totals.get(aid, TotalLine()).balance * sign
|
||||
for aid in matched_ids
|
||||
)
|
||||
row = ReportRow(
|
||||
id=f'line_{idx}',
|
||||
label=spec.get('label', prefix),
|
||||
level=spec.get('level', 0),
|
||||
amount=amount,
|
||||
)
|
||||
if comparison_totals is not None:
|
||||
comp_amount = sum(
|
||||
comparison_totals.get(aid, TotalLine()).balance * sign
|
||||
for aid in matched_ids
|
||||
)
|
||||
row.amount_comparison = comp_amount
|
||||
if comp_amount != 0:
|
||||
row.variance_pct = (
|
||||
(amount - comp_amount) / abs(comp_amount)
|
||||
) * 100
|
||||
rows.append(row)
|
||||
|
||||
elif spec.get('account_id'):
|
||||
aid = spec['account_id']
|
||||
sign = spec.get('sign', 1)
|
||||
tot = account_totals.get(aid, TotalLine())
|
||||
label = spec.get('label') or accounts_by_id.get(aid, {}).get(
|
||||
'name', f'Account {aid}'
|
||||
)
|
||||
row = ReportRow(
|
||||
id=f'line_{idx}',
|
||||
label=label,
|
||||
level=spec.get('level', 0),
|
||||
amount=tot.balance * sign,
|
||||
account_id=aid,
|
||||
)
|
||||
if comparison_totals is not None:
|
||||
comp = comparison_totals.get(aid, TotalLine())
|
||||
row.amount_comparison = comp.balance * sign
|
||||
if row.amount_comparison and row.amount_comparison != 0:
|
||||
row.variance_pct = (
|
||||
(row.amount - row.amount_comparison)
|
||||
/ abs(row.amount_comparison)
|
||||
) * 100
|
||||
rows.append(row)
|
||||
|
||||
return [r.to_dict() for r in rows]
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Move-line aggregation primitives for report totaling.
|
||||
|
||||
Pure-Python helpers - callers pass dicts with debit/credit/balance/currency keys,
|
||||
no Odoo recordsets needed. Keeps the math testable without an ORM."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class TotalLine:
|
||||
debit: float = 0.0
|
||||
credit: float = 0.0
|
||||
balance: float = 0.0
|
||||
debit_currency: float = 0.0
|
||||
credit_currency: float = 0.0
|
||||
balance_currency: float = 0.0
|
||||
line_count: int = 0
|
||||
|
||||
|
||||
def aggregate(move_lines: list[dict]) -> TotalLine:
|
||||
"""Aggregate a list of move-line dicts into a TotalLine.
|
||||
|
||||
Each dict must have: debit, credit, balance (signed). Optional:
|
||||
debit_currency, credit_currency, balance_currency."""
|
||||
out = TotalLine()
|
||||
for ml in move_lines:
|
||||
out.debit += ml.get('debit', 0.0)
|
||||
out.credit += ml.get('credit', 0.0)
|
||||
out.balance += ml.get('balance', 0.0)
|
||||
out.debit_currency += ml.get('debit_currency', 0.0)
|
||||
out.credit_currency += ml.get('credit_currency', 0.0)
|
||||
out.balance_currency += ml.get('balance_currency', 0.0)
|
||||
out.line_count += 1
|
||||
return out
|
||||
|
||||
|
||||
def aggregate_per_account(move_lines: list[dict]) -> dict[int, TotalLine]:
|
||||
"""Group + aggregate by account_id. Returns {account_id: TotalLine}."""
|
||||
grouped: dict[int, list[dict]] = {}
|
||||
for ml in move_lines:
|
||||
acct = ml['account_id']
|
||||
grouped.setdefault(acct, []).append(ml)
|
||||
return {acct: aggregate(lines) for acct, lines in grouped.items()}
|
||||
|
||||
|
||||
def is_balanced(move_lines: list[dict], *, tolerance: float = 0.005) -> bool:
|
||||
"""True if total debits == total credits (within tolerance for rounding)."""
|
||||
agg = aggregate(move_lines)
|
||||
return abs(agg.debit - agg.credit) <= tolerance
|
||||
Reference in New Issue
Block a user