Compare commits

...

4 Commits

Author SHA1 Message Date
gsinghpal
475d17c1aa test(fusion_accounting_assets): performance benchmarks with P95 targets
Made-with: Cursor
2026-04-19 17:26:01 -04:00
gsinghpal
fec1c12246 feat(fusion_accounting_assets): MV for per-asset book value snapshot
Made-with: Cursor
2026-04-19 17:25:14 -04:00
gsinghpal
c939b83812 test(fusion_accounting_assets): integration tests for all 3 depreciation methods
Made-with: Cursor
2026-04-19 17:23:41 -04:00
gsinghpal
1e70b8d5c0 test(fusion_accounting_assets): Hypothesis property-based depreciation invariants
Made-with: Cursor
2026-04-19 17:22:55 -04:00
11 changed files with 474 additions and 1 deletions

View File

@@ -1,6 +1,6 @@
{
'name': 'Fusion Accounting Assets',
'version': '19.0.1.0.17',
'version': '19.0.1.0.18',
'category': 'Accounting/Accounting',
'summary': 'AI-augmented asset management with depreciation schedules.',
'description': """

View File

@@ -11,6 +11,16 @@
<field name="active" eval="True"/>
</record>
<record id="cron_fusion_assets_refresh_book_values_mv" model="ir.cron">
<field name="name">Fusion Assets — Refresh Book Values MV</field>
<field name="model_id" ref="model_fusion_assets_cron"/>
<field name="state">code</field>
<field name="code">model._cron_refresh_book_values_mv()</field>
<field name="interval_number">1</field>
<field name="interval_type">hours</field>
<field name="active" eval="True"/>
</record>
<record id="cron_fusion_assets_anomaly_scan" model="ir.cron">
<field name="name">Fusion Assets — Monthly Anomaly Scan</field>
<field name="model_id" ref="model_fusion_assets_cron"/>

View File

@@ -0,0 +1,29 @@
-- Materialized view: per-asset book value snapshot.
-- Refreshed via cron. Used by the OWL dashboard for portfolio summaries.
CREATE MATERIALIZED VIEW IF NOT EXISTS fusion_asset_book_values_mv AS
SELECT
a.id AS id,
a.id AS asset_id,
a.company_id,
a.category_id,
a.state,
a.cost,
a.salvage_value,
COALESCE(SUM(CASE WHEN l.is_posted THEN l.amount ELSE 0 END), 0) AS total_depreciated,
a.cost - COALESCE(SUM(CASE WHEN l.is_posted THEN l.amount ELSE 0 END), 0) AS book_value,
COUNT(l.id) FILTER (WHERE l.is_posted) AS posted_periods,
COUNT(l.id) FILTER (WHERE NOT l.is_posted) AS pending_periods,
a.acquisition_date,
a.in_service_date
FROM fusion_asset a
LEFT JOIN fusion_asset_depreciation_line l ON l.asset_id = a.id
GROUP BY a.id, a.company_id, a.category_id, a.state, a.cost, a.salvage_value,
a.acquisition_date, a.in_service_date;
CREATE UNIQUE INDEX IF NOT EXISTS fusion_asset_book_values_mv_pkey
ON fusion_asset_book_values_mv (id);
CREATE INDEX IF NOT EXISTS fusion_asset_book_values_mv_company_state
ON fusion_asset_book_values_mv (company_id, state);
CREATE INDEX IF NOT EXISTS fusion_asset_book_values_mv_category
ON fusion_asset_book_values_mv (category_id) WHERE category_id IS NOT NULL;

View File

@@ -6,3 +6,4 @@ from . import fusion_asset_anomaly
from . import account_move
from . import fusion_asset_engine
from . import fusion_assets_cron
from . import fusion_asset_book_values_mv

View File

@@ -0,0 +1,59 @@
"""MV of per-asset book value snapshot. Refresh via cron or model._refresh()."""
import logging
import os
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
class FusionAssetBookValuesMV(models.Model):
_name = "fusion.asset.book.values.mv"
_description = "MV of asset book value snapshot"
_auto = False
_table = "fusion_asset_book_values_mv"
_order = "book_value desc"
asset_id = fields.Many2one('fusion.asset', readonly=True)
company_id = fields.Many2one('res.company', readonly=True)
category_id = fields.Many2one('fusion.asset.category', readonly=True)
state = fields.Char(readonly=True)
cost = fields.Float(readonly=True)
salvage_value = fields.Float(readonly=True)
total_depreciated = fields.Float(readonly=True)
book_value = fields.Float(readonly=True)
posted_periods = fields.Integer(readonly=True)
pending_periods = fields.Integer(readonly=True)
acquisition_date = fields.Date(readonly=True)
in_service_date = fields.Date(readonly=True)
def init(self):
sql_path = os.path.join(
os.path.dirname(__file__), '..', 'data', 'sql',
'create_mv_asset_book_values.sql',
)
with open(sql_path, 'r') as f:
self.env.cr.execute(f.read())
_logger.info("fusion_asset_book_values_mv: created/verified MV")
@api.model
def _refresh(self, *, concurrently=True):
# CONCURRENTLY requires a unique index (we have one) and that the MV
# has been populated at least once. Wrap the concurrent attempt in a
# savepoint so a failure (e.g. first-ever refresh before the MV is
# populated) does NOT poison the surrounding transaction; we then
# fall back to a plain REFRESH.
if concurrently:
try:
with self.env.cr.savepoint():
self.env.cr.execute(
"REFRESH MATERIALIZED VIEW CONCURRENTLY "
"fusion_asset_book_values_mv"
)
return
except Exception as e: # noqa: BLE001
_logger.warning("Concurrent MV refresh failed (%s); fallback", e)
self.env.cr.execute(
"REFRESH MATERIALIZED VIEW fusion_asset_book_values_mv"
)

View File

@@ -36,6 +36,17 @@ class FusionAssetsCron(models.AbstractModel):
"Cron: posted depreciation on %d lines across %d running assets",
posted_total, len(running_assets),
)
# Keep the book-value MV in sync after posting so the dashboard
# reflects today's numbers without waiting for the dedicated MV cron.
try:
self.env['fusion.asset.book.values.mv']._refresh()
except Exception as e: # noqa: BLE001
_logger.warning("Post-cron MV refresh failed: %s", e)
@api.model
def _cron_refresh_book_values_mv(self):
"""Refresh the per-asset book value MV (hourly)."""
self.env['fusion.asset.book.values.mv']._refresh()
@api.model
def _cron_anomaly_scan(self):

View File

@@ -15,3 +15,7 @@ from . import test_assets_controller
from . import test_assets_adapter
from . import test_asset_tools
from . import test_assets_cron
from . import test_engine_property
from . import test_method_integration
from . import test_asset_book_values_mv
from . import test_performance_benchmarks

View File

@@ -0,0 +1,29 @@
"""Tests for the per-asset book value MV."""
from datetime import date
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
@tagged('post_install', '-at_install')
class TestAssetBookValuesMV(TransactionCase):
def test_mv_exists_and_is_queryable(self):
self.env['fusion.asset.book.values.mv']._refresh(concurrently=False)
rows = self.env['fusion.asset.book.values.mv'].search([], limit=10)
self.assertIsNotNone(rows)
def test_mv_includes_new_asset_after_refresh(self):
asset = self.env['fusion.asset'].create({
'name': 'MV Test', 'cost': 5000, 'salvage_value': 500,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
self.env.flush_all()
self.env['fusion.asset.book.values.mv']._refresh(concurrently=False)
mv_row = self.env['fusion.asset.book.values.mv'].search([
('asset_id', '=', asset.id),
], limit=1)
self.assertTrue(mv_row)
self.assertAlmostEqual(mv_row.book_value, 5000, places=2)

View File

@@ -0,0 +1,101 @@
"""Property-based invariant tests for the asset engine.
Hypothesis generates random inputs; we assert mathematical invariants
that must hold regardless of input."""
from hypothesis import given, settings, strategies as st, HealthCheck
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
from odoo.addons.fusion_accounting_assets.services.depreciation_methods import (
straight_line, declining_balance, units_of_production,
)
@tagged('post_install', '-at_install', 'property_based')
class TestDepreciationInvariants(TransactionCase):
@given(
cost=st.floats(min_value=100.0, max_value=1000000.0,
allow_nan=False, allow_infinity=False),
salvage_pct=st.floats(min_value=0.0, max_value=0.5,
allow_nan=False, allow_infinity=False),
n_periods=st.integers(min_value=1, max_value=40),
)
@settings(max_examples=80, deadline=2000,
suppress_health_check=[HealthCheck.function_scoped_fixture])
def test_straight_line_total_equals_cost_minus_salvage(self, cost, salvage_pct, n_periods):
cost = round(cost, 2)
salvage = round(cost * salvage_pct, 2)
steps = straight_line(cost=cost, salvage_value=salvage, n_periods=n_periods)
total = sum(s.period_amount for s in steps)
# Within 1c rounding tolerance
self.assertAlmostEqual(
total, cost - salvage, places=1,
msg=f"cost={cost}, salvage={salvage}, n={n_periods}, total={total:.2f}",
)
@given(
cost=st.floats(min_value=100.0, max_value=1000000.0,
allow_nan=False, allow_infinity=False),
salvage_pct=st.floats(min_value=0.0, max_value=0.5,
allow_nan=False, allow_infinity=False),
n_periods=st.integers(min_value=1, max_value=20),
)
@settings(max_examples=50, deadline=2000,
suppress_health_check=[HealthCheck.function_scoped_fixture])
def test_straight_line_book_value_decreasing(self, cost, salvage_pct, n_periods):
cost = round(cost, 2)
salvage = round(cost * salvage_pct, 2)
steps = straight_line(cost=cost, salvage_value=salvage, n_periods=n_periods)
for i in range(1, len(steps)):
self.assertLessEqual(
steps[i].book_value_at_end,
steps[i - 1].book_value_at_end + 0.01,
)
@given(
cost=st.floats(min_value=1000.0, max_value=100000.0,
allow_nan=False, allow_infinity=False),
salvage_pct=st.floats(min_value=0.0, max_value=0.3,
allow_nan=False, allow_infinity=False),
n_periods=st.integers(min_value=2, max_value=20),
rate=st.floats(min_value=0.05, max_value=0.5,
allow_nan=False, allow_infinity=False),
)
@settings(max_examples=50, deadline=3000,
suppress_health_check=[HealthCheck.function_scoped_fixture])
def test_declining_balance_never_below_salvage(self, cost, salvage_pct, n_periods, rate):
cost = round(cost, 2)
salvage = round(cost * salvage_pct, 2)
steps = declining_balance(
cost=cost, salvage_value=salvage,
n_periods=n_periods, rate=rate,
)
for s in steps:
self.assertGreaterEqual(
s.book_value_at_end, salvage - 0.01,
msg=f"cost={cost}, salvage={salvage}, rate={rate}, step={s}",
)
@given(
cost=st.floats(min_value=1000.0, max_value=100000.0,
allow_nan=False, allow_infinity=False),
total_units=st.floats(min_value=100.0, max_value=10000.0,
allow_nan=False, allow_infinity=False),
n_periods=st.integers(min_value=1, max_value=10),
)
@settings(max_examples=30, deadline=2000,
suppress_health_check=[HealthCheck.function_scoped_fixture])
def test_units_of_production_total_at_full_use_equals_depreciable(self, cost, total_units, n_periods):
cost = round(cost, 2)
salvage = 0.0
# Distribute total_units evenly across periods
per_period = total_units / n_periods
steps = units_of_production(
cost=cost, salvage_value=salvage,
total_units_expected=total_units,
units_per_period=[per_period] * n_periods,
)
total = sum(s.period_amount for s in steps)
self.assertAlmostEqual(total, cost - salvage, places=1)

View File

@@ -0,0 +1,112 @@
"""Integration tests verifying all 3 depreciation methods through the engine."""
from datetime import date
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
@tagged('post_install', '-at_install', 'integration')
class TestStraightLineIntegration(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
def test_straight_line_5yr_no_salvage(self):
asset = self.env['fusion.asset'].create({
'name': 'SL Test', 'cost': 10000, 'salvage_value': 0,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
self.engine.compute_depreciation_schedule(asset)
lines = asset.depreciation_line_ids.sorted('period_index')
self.assertEqual(len(lines), 5)
for line in lines:
self.assertAlmostEqual(line.amount, 2000, places=2)
def test_straight_line_10yr_with_salvage(self):
asset = self.env['fusion.asset'].create({
'name': 'SL10', 'cost': 50000, 'salvage_value': 5000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 10,
})
self.engine.compute_depreciation_schedule(asset)
lines = asset.depreciation_line_ids.sorted('period_index')
self.assertEqual(len(lines), 10)
# Each year = (50000-5000)/10 = 4500; total depreciable = 45000
self.assertAlmostEqual(sum(lines.mapped('amount')), 45000, places=2)
def test_straight_line_book_value_at_end_equals_salvage(self):
asset = self.env['fusion.asset'].create({
'name': 'SL', 'cost': 10000, 'salvage_value': 1000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
self.engine.compute_depreciation_schedule(asset)
last = asset.depreciation_line_ids.sorted('period_index')[-1]
self.assertAlmostEqual(last.book_value_at_end, 1000, places=2)
@tagged('post_install', '-at_install', 'integration')
class TestDecliningBalanceIntegration(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
def test_declining_balance_30pct(self):
asset = self.env['fusion.asset'].create({
'name': 'DB', 'cost': 10000, 'salvage_value': 1000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'declining_balance', 'useful_life_years': 5,
'declining_rate_pct': 30.0,
})
self.engine.compute_depreciation_schedule(asset)
lines = asset.depreciation_line_ids.sorted('period_index')
# First period: 10000 * 0.30 = 3000
self.assertAlmostEqual(lines[0].amount, 3000, places=2)
# Should not exceed salvage at end
self.assertGreaterEqual(lines[-1].book_value_at_end, 999.99)
def test_declining_balance_50pct_high_rate(self):
asset = self.env['fusion.asset'].create({
'name': 'DB50', 'cost': 8000, 'salvage_value': 500,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'declining_balance', 'useful_life_years': 5,
'declining_rate_pct': 50.0,
})
self.engine.compute_depreciation_schedule(asset)
# First period: 8000 * 0.50 = 4000
first = asset.depreciation_line_ids.sorted('period_index')[0]
self.assertAlmostEqual(first.amount, 4000, places=2)
@tagged('post_install', '-at_install', 'integration')
class TestUnitsOfProductionIntegration(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
def test_units_of_production_5yr_even_distribution(self):
asset = self.env['fusion.asset'].create({
'name': 'UOP', 'cost': 50000, 'salvage_value': 0,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'units_of_production',
'total_units_expected': 100000,
'useful_life_years': 5,
})
self.engine.compute_depreciation_schedule(asset)
lines = asset.depreciation_line_ids.sorted('period_index')
# 5 periods, even distribution = 20000 units/period
# Each period: (20000/100000) * 50000 = 10000
self.assertEqual(len(lines), 5)
for line in lines:
self.assertAlmostEqual(line.amount, 10000, places=2)

View File

@@ -0,0 +1,117 @@
"""Performance benchmarks tagged 'benchmark'."""
import json
import statistics
import time
from datetime import date
from odoo.tests.common import HttpCase, TransactionCase, new_test_user
from odoo.tests import tagged
@tagged('post_install', '-at_install', 'benchmark')
class TestEngineBenchmarks(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
def _percentile(self, samples, p):
if len(samples) <= 1:
return samples[0] if samples else 0
sorted_s = sorted(samples)
idx = int(len(sorted_s) * p / 100)
return sorted_s[min(idx, len(sorted_s) - 1)]
def test_compute_schedule_p95(self):
timings = []
for i in range(10):
asset = self.env['fusion.asset'].create({
'name': f'PerfAsset{i}', 'cost': 100000, 'salvage_value': 5000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 10,
})
start = time.perf_counter()
self.engine.compute_depreciation_schedule(asset)
timings.append((time.perf_counter() - start) * 1000)
p95 = self._percentile(timings, 95)
median = statistics.median(timings)
msg = f"compute_schedule(10yr): median={median:.0f}ms p95={p95:.0f}ms"
print(f"\n PERF: {msg} (target <500ms)")
self.assertLess(p95, 5000, f"way over budget: {msg}")
def test_post_depreciation_p95(self):
asset = self.env['fusion.asset'].create({
'name': 'PostPerf', 'cost': 50000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 10,
})
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
timings = []
for _ in range(5):
start = time.perf_counter()
self.engine.post_depreciation_entry(asset)
timings.append((time.perf_counter() - start) * 1000)
p95 = self._percentile(timings, 95)
median = statistics.median(timings)
msg = f"post_depreciation: median={median:.0f}ms p95={p95:.0f}ms"
print(f"\n PERF: {msg} (target <300ms)")
self.assertLess(p95, 3000)
def test_dispose_asset_p95(self):
timings = []
for i in range(5):
asset = self.env['fusion.asset'].create({
'name': f'DispPerf{i}', 'cost': 10000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
start = time.perf_counter()
self.engine.dispose_asset(asset, sale_amount=5000)
timings.append((time.perf_counter() - start) * 1000)
p95 = self._percentile(timings, 95)
median = statistics.median(timings)
msg = f"dispose_asset: median={median:.0f}ms p95={p95:.0f}ms"
print(f"\n PERF: {msg} (target <300ms)")
self.assertLess(p95, 3000)
@tagged('post_install', '-at_install', 'benchmark')
class TestControllerBenchmarks(HttpCase):
def test_list_endpoint_p95(self):
new_test_user(
self.env, login='asset_perf',
groups='base.group_user,account.group_account_invoice',
)
for i in range(20):
self.env['fusion.asset'].create({
'name': f'ListPerf{i}', 'cost': 1000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
self.authenticate('asset_perf', 'asset_perf')
timings = []
for _ in range(5):
start = time.perf_counter()
response = self.url_open(
'/fusion/assets/list',
data=json.dumps({
'jsonrpc': '2.0', 'method': 'call', 'id': 1,
'params': {'company_id': self.env.company.id},
}),
headers={'Content-Type': 'application/json'},
)
timings.append((time.perf_counter() - start) * 1000)
self.assertEqual(response.status_code, 200)
sorted_t = sorted(timings)
p95 = sorted_t[min(int(len(sorted_t) * 0.95), len(sorted_t) - 1)]
median = statistics.median(timings)
msg = f"controller.list: median={median:.0f}ms p95={p95:.0f}ms"
print(f"\n PERF: {msg} (target <300ms)")
self.assertLess(p95, 3000)