feat(fusion_accounting_reports): pure-Python services for date+account+totaling

Three service modules with no Odoo dependencies:
- date_periods: fiscal year/month/quarter bounds + comparison derivation
- account_hierarchy: parent-child tree walker with type filtering
- totaling: move-line aggregation primitives

18 unit tests covering edge cases (December rollover, Feb 29, fiscal-
year-before-start, balance check tolerance).

Made-with: Cursor
This commit is contained in:
gsinghpal
2026-04-19 15:07:05 -04:00
parent a93162cb70
commit 0a9ed635e8
7 changed files with 361 additions and 0 deletions

View File

@@ -0,0 +1 @@
from . import services

View File

@@ -0,0 +1,3 @@
from . import date_periods
from . import account_hierarchy
from . import totaling

View File

@@ -0,0 +1,62 @@
"""Account hierarchy walker.
Given a flat list of accounts with parent_id pointers, build a tree and
provide a recursive walker that yields (account, depth, ancestors) tuples.
Used by report line resolvers to render group sub-totals."""
from dataclasses import dataclass, field
from typing import Iterator
@dataclass
class AccountNode:
id: int
code: str
name: str
account_type: str
parent_id: int | None
children: list['AccountNode'] = field(default_factory=list)
def build_tree(accounts: list[dict]) -> list[AccountNode]:
"""Build a forest from a flat list of account dicts.
Each dict must have keys: id, code, name, account_type, parent_id (nullable)."""
nodes: dict[int, AccountNode] = {}
for acc in accounts:
nodes[acc['id']] = AccountNode(
id=acc['id'], code=acc['code'], name=acc['name'],
account_type=acc['account_type'],
parent_id=acc.get('parent_id'),
)
roots: list[AccountNode] = []
for node in nodes.values():
if node.parent_id and node.parent_id in nodes:
nodes[node.parent_id].children.append(node)
else:
roots.append(node)
for node in nodes.values():
node.children.sort(key=lambda n: n.code)
roots.sort(key=lambda n: n.code)
return roots
def walk(roots: list[AccountNode], *, max_depth: int = 10) -> Iterator[tuple[AccountNode, int, list[AccountNode]]]:
"""Depth-first walk yielding (node, depth, ancestors)."""
def _walk(node: AccountNode, depth: int, ancestors: list[AccountNode]):
yield (node, depth, ancestors)
if depth < max_depth:
for child in node.children:
yield from _walk(child, depth + 1, ancestors + [node])
for root in roots:
yield from _walk(root, 0, [])
def filter_by_account_type(roots: list[AccountNode], type_prefix: str) -> list[AccountNode]:
"""Return all nodes whose account_type starts with type_prefix
(e.g. 'asset_' returns asset_receivable, asset_cash, etc.)."""
matches: list[AccountNode] = []
for node, _depth, _ancestors in walk(roots):
if node.account_type.startswith(type_prefix):
matches.append(node)
return matches

View File

@@ -0,0 +1,103 @@
"""Date period math for financial reports.
Pure-Python helpers that compute:
- Fiscal year start/end given any reference date + company fiscal year settings
- Comparison periods (prior year same period, prior period, etc.)
- Period boundaries for monthly / quarterly / yearly reporting
NO Odoo imports - all callers pass in primitive types so the same module
is unit-testable without an Odoo registry."""
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Literal
PeriodGranularity = Literal['month', 'quarter', 'year', 'custom']
ComparisonMode = Literal['none', 'previous_period', 'previous_year']
@dataclass(frozen=True)
class Period:
date_from: date
date_to: date
label: str
def __post_init__(self):
if self.date_from > self.date_to:
raise ValueError(f"date_from ({self.date_from}) > date_to ({self.date_to})")
@property
def days(self) -> int:
return (self.date_to - self.date_from).days + 1
def fiscal_year_bounds(reference_date: date, *, fy_start_month: int = 1,
fy_start_day: int = 1) -> Period:
"""Return the fiscal year period containing `reference_date`.
Default: calendar year (Jan 1 - Dec 31). Pass fy_start_month=4, fy_start_day=1
for an April-March fiscal year."""
if reference_date.month < fy_start_month or (
reference_date.month == fy_start_month and reference_date.day < fy_start_day
):
start_year = reference_date.year - 1
else:
start_year = reference_date.year
start = date(start_year, fy_start_month, fy_start_day)
next_start = date(start_year + 1, fy_start_month, fy_start_day)
end = next_start - timedelta(days=1)
return Period(date_from=start, date_to=end, label=f"FY {start_year}")
def month_bounds(reference_date: date) -> Period:
"""Return the calendar month containing `reference_date`."""
start = reference_date.replace(day=1)
if reference_date.month == 12:
next_start = date(reference_date.year + 1, 1, 1)
else:
next_start = date(reference_date.year, reference_date.month + 1, 1)
return Period(
date_from=start,
date_to=next_start - timedelta(days=1),
label=start.strftime('%B %Y'),
)
def quarter_bounds(reference_date: date) -> Period:
"""Return the calendar quarter containing `reference_date`."""
quarter = (reference_date.month - 1) // 3 + 1
start_month = (quarter - 1) * 3 + 1
start = date(reference_date.year, start_month, 1)
end_month = start_month + 2
if end_month == 12:
end = date(reference_date.year, 12, 31)
else:
end = date(reference_date.year, end_month + 1, 1) - timedelta(days=1)
return Period(date_from=start, date_to=end, label=f"Q{quarter} {reference_date.year}")
def comparison_period(period: Period, mode: ComparisonMode) -> Period | None:
"""Derive the comparison period for `period` per `mode`.
`previous_period`: same length, immediately before
`previous_year`: same calendar dates, one year earlier
`none`: returns None"""
if mode == 'none':
return None
if mode == 'previous_period':
days = period.days
new_to = period.date_from - timedelta(days=1)
new_from = new_to - timedelta(days=days - 1)
return Period(date_from=new_from, date_to=new_to,
label=f"{period.label} (previous)")
if mode == 'previous_year':
try:
new_from = period.date_from.replace(year=period.date_from.year - 1)
new_to = period.date_to.replace(year=period.date_to.year - 1)
except ValueError:
new_from = period.date_from.replace(year=period.date_from.year - 1, day=28)
new_to = period.date_to.replace(year=period.date_to.year - 1, day=28)
return Period(date_from=new_from, date_to=new_to,
label=f"{period.label} (prev year)")
raise ValueError(f"Unknown comparison mode: {mode}")

View File

@@ -0,0 +1,49 @@
"""Move-line aggregation primitives for report totaling.
Pure-Python helpers - callers pass dicts with debit/credit/balance/currency keys,
no Odoo recordsets needed. Keeps the math testable without an ORM."""
from dataclasses import dataclass
@dataclass
class TotalLine:
debit: float = 0.0
credit: float = 0.0
balance: float = 0.0
debit_currency: float = 0.0
credit_currency: float = 0.0
balance_currency: float = 0.0
line_count: int = 0
def aggregate(move_lines: list[dict]) -> TotalLine:
"""Aggregate a list of move-line dicts into a TotalLine.
Each dict must have: debit, credit, balance (signed). Optional:
debit_currency, credit_currency, balance_currency."""
out = TotalLine()
for ml in move_lines:
out.debit += ml.get('debit', 0.0)
out.credit += ml.get('credit', 0.0)
out.balance += ml.get('balance', 0.0)
out.debit_currency += ml.get('debit_currency', 0.0)
out.credit_currency += ml.get('credit_currency', 0.0)
out.balance_currency += ml.get('balance_currency', 0.0)
out.line_count += 1
return out
def aggregate_per_account(move_lines: list[dict]) -> dict[int, TotalLine]:
"""Group + aggregate by account_id. Returns {account_id: TotalLine}."""
grouped: dict[int, list[dict]] = {}
for ml in move_lines:
acct = ml['account_id']
grouped.setdefault(acct, []).append(ml)
return {acct: aggregate(lines) for acct, lines in grouped.items()}
def is_balanced(move_lines: list[dict], *, tolerance: float = 0.005) -> bool:
"""True if total debits == total credits (within tolerance for rounding)."""
agg = aggregate(move_lines)
return abs(agg.debit - agg.credit) <= tolerance

View File

@@ -0,0 +1 @@
from . import test_services_unit

View File

@@ -0,0 +1,142 @@
"""Unit tests for date_periods, account_hierarchy, totaling services."""
from datetime import date
from odoo.tests.common import TransactionCase, tagged
from odoo.addons.fusion_accounting_reports.services.date_periods import (
Period, fiscal_year_bounds, month_bounds, quarter_bounds, comparison_period,
)
from odoo.addons.fusion_accounting_reports.services.account_hierarchy import (
build_tree, walk, filter_by_account_type,
)
from odoo.addons.fusion_accounting_reports.services.totaling import (
aggregate, aggregate_per_account, is_balanced,
)
@tagged('post_install', '-at_install')
class TestDatePeriods(TransactionCase):
def test_fiscal_year_calendar_default(self):
period = fiscal_year_bounds(date(2026, 6, 15))
self.assertEqual(period.date_from, date(2026, 1, 1))
self.assertEqual(period.date_to, date(2026, 12, 31))
def test_fiscal_year_april_start(self):
period = fiscal_year_bounds(date(2026, 6, 15), fy_start_month=4)
self.assertEqual(period.date_from, date(2026, 4, 1))
self.assertEqual(period.date_to, date(2027, 3, 31))
def test_fiscal_year_before_start_returns_prior(self):
period = fiscal_year_bounds(date(2026, 2, 15), fy_start_month=4)
self.assertEqual(period.date_from, date(2025, 4, 1))
self.assertEqual(period.date_to, date(2026, 3, 31))
def test_month_bounds(self):
period = month_bounds(date(2026, 4, 19))
self.assertEqual(period.date_from, date(2026, 4, 1))
self.assertEqual(period.date_to, date(2026, 4, 30))
def test_month_bounds_december(self):
period = month_bounds(date(2026, 12, 19))
self.assertEqual(period.date_from, date(2026, 12, 1))
self.assertEqual(period.date_to, date(2026, 12, 31))
def test_quarter_bounds_q2(self):
period = quarter_bounds(date(2026, 5, 15))
self.assertEqual(period.date_from, date(2026, 4, 1))
self.assertEqual(period.date_to, date(2026, 6, 30))
def test_comparison_previous_year(self):
period = Period(date(2026, 1, 1), date(2026, 12, 31), 'FY 2026')
comp = comparison_period(period, 'previous_year')
self.assertEqual(comp.date_from, date(2025, 1, 1))
self.assertEqual(comp.date_to, date(2025, 12, 31))
def test_comparison_previous_period_same_length(self):
period = Period(date(2026, 4, 1), date(2026, 4, 30), 'Apr 2026')
comp = comparison_period(period, 'previous_period')
self.assertEqual(comp.date_to, date(2026, 3, 31))
self.assertEqual(comp.days, period.days)
def test_period_validates_bounds(self):
with self.assertRaises(ValueError):
Period(date(2026, 12, 31), date(2026, 1, 1), 'invalid')
@tagged('post_install', '-at_install')
class TestAccountHierarchy(TransactionCase):
def setUp(self):
super().setUp()
self.flat = [
{'id': 1, 'code': '1', 'name': 'Assets', 'account_type': 'asset_root', 'parent_id': None},
{'id': 2, 'code': '11', 'name': 'Cash', 'account_type': 'asset_cash', 'parent_id': 1},
{'id': 3, 'code': '12', 'name': 'AR', 'account_type': 'asset_receivable', 'parent_id': 1},
{'id': 4, 'code': '2', 'name': 'Liabilities', 'account_type': 'liability_root', 'parent_id': None},
{'id': 5, 'code': '21', 'name': 'AP', 'account_type': 'liability_payable', 'parent_id': 4},
]
def test_build_tree_returns_two_roots(self):
roots = build_tree(self.flat)
self.assertEqual(len(roots), 2)
def test_walk_yields_all_nodes(self):
roots = build_tree(self.flat)
ids = [n.id for n, _, _ in walk(roots)]
self.assertEqual(set(ids), {1, 2, 3, 4, 5})
def test_walk_depth_correct(self):
roots = build_tree(self.flat)
depths = {n.id: depth for n, depth, _ in walk(roots)}
self.assertEqual(depths[1], 0)
self.assertEqual(depths[2], 1)
self.assertEqual(depths[3], 1)
def test_filter_by_type_prefix(self):
roots = build_tree(self.flat)
assets = filter_by_account_type(roots, 'asset_')
self.assertEqual(len(assets), 3)
@tagged('post_install', '-at_install')
class TestTotaling(TransactionCase):
def test_aggregate_empty(self):
result = aggregate([])
self.assertEqual(result.debit, 0.0)
self.assertEqual(result.line_count, 0)
def test_aggregate_simple(self):
lines = [
{'debit': 100, 'credit': 0, 'balance': 100, 'account_id': 1},
{'debit': 0, 'credit': 50, 'balance': -50, 'account_id': 1},
]
result = aggregate(lines)
self.assertEqual(result.debit, 100)
self.assertEqual(result.credit, 50)
self.assertEqual(result.balance, 50)
def test_aggregate_per_account_groups_correctly(self):
lines = [
{'debit': 100, 'credit': 0, 'balance': 100, 'account_id': 1},
{'debit': 50, 'credit': 0, 'balance': 50, 'account_id': 1},
{'debit': 0, 'credit': 25, 'balance': -25, 'account_id': 2},
]
result = aggregate_per_account(lines)
self.assertEqual(result[1].debit, 150)
self.assertEqual(result[2].credit, 25)
def test_is_balanced_true(self):
lines = [
{'debit': 100, 'credit': 0, 'balance': 100},
{'debit': 0, 'credit': 100, 'balance': -100},
]
self.assertTrue(is_balanced(lines))
def test_is_balanced_false(self):
lines = [
{'debit': 100, 'credit': 0, 'balance': 100},
{'debit': 0, 'credit': 50, 'balance': -50},
]
self.assertFalse(is_balanced(lines))