Compare commits

...

2 Commits

Author SHA1 Message Date
gsinghpal
345c971d59 test(fusion_accounting_assets): engine integration tests for full lifecycle
Made-with: Cursor
2026-04-19 17:06:55 -04:00
gsinghpal
54922a0b32 feat(fusion_accounting_assets): fusion.asset.engine 7-method API
The orchestrator AbstractModel for asset depreciation lifecycle.
compute_depreciation_schedule, post_depreciation_entry, dispose_asset,
partial_sale, pause_asset, resume_asset, reverse_disposal.

All controllers, AI tools, wizards, and cron must route through these
methods; no direct ORM writes to fusion.asset.depreciation.line or
account.move from anywhere else.

Made-with: Cursor
2026-04-19 17:06:12 -04:00
6 changed files with 668 additions and 1 deletions

View File

@@ -1,6 +1,6 @@
{
'name': 'Fusion Accounting Assets',
'version': '19.0.1.0.11',
'version': '19.0.1.0.13',
'category': 'Accounting/Accounting',
'summary': 'AI-augmented asset management with depreciation schedules.',
'description': """

View File

@@ -4,3 +4,4 @@ from . import fusion_asset_depreciation_line
from . import fusion_asset_disposal
from . import fusion_asset_anomaly
from . import account_move
from . import fusion_asset_engine

View File

@@ -0,0 +1,398 @@
"""The asset engine — orchestrator for all asset depreciation + lifecycle.
7-method public API. No direct ORM writes to fusion.asset.depreciation.line
or account.move from anywhere else; everything routes through here for
consistent validation, audit, and side-effect handling.
"""
import logging
from datetime import date, timedelta
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from ..services.depreciation_methods import (
straight_line,
declining_balance,
units_of_production,
)
_logger = logging.getLogger(__name__)
class FusionAssetEngine(models.AbstractModel):
_name = "fusion.asset.engine"
_description = "Fusion Asset Engine"
# ============================================================
# PUBLIC API (7 methods)
# ============================================================
@api.model
def compute_depreciation_schedule(self, asset, *, recompute: bool = False) -> dict:
"""Compute (or re-compute) the depreciation board for an asset.
If recompute=False and posted lines exist, ONLY un-posted future lines
are regenerated. If recompute=True, all unposted lines are wiped and
regenerated from scratch using current asset config.
"""
if not asset:
raise ValidationError(_("asset is required"))
asset.ensure_one()
self._validate_asset_for_schedule(asset)
Line = self.env['fusion.asset.depreciation.line'].sudo()
if recompute:
Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', False),
]).unlink()
existing_posted = Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', True),
], order='period_index')
start_period = max([l.period_index for l in existing_posted], default=-1) + 1
accumulated_so_far = sum(l.amount for l in existing_posted)
steps = self._compute_steps(asset)
new_steps = steps[start_period:]
base_date = asset.in_service_date or asset.acquisition_date
# Accumulated baseline at the boundary between posted and to-be-created
# lines: subtract the accumulated value the algorithm itself reports at
# that boundary, then re-add the actually-posted total. This keeps the
# board's accumulated column monotonic when picking up mid-life.
baseline_offset = 0.0
if start_period > 0 and start_period <= len(steps):
baseline_offset = steps[start_period - 1].accumulated_depreciation
line_vals = []
for s in new_steps:
scheduled_date = self._add_periods(base_date, s.period_index)
running_accumulated = round(
accumulated_so_far + s.accumulated_depreciation - baseline_offset, 2
)
line_vals.append({
'asset_id': asset.id,
'period_index': s.period_index,
'scheduled_date': scheduled_date,
'amount': s.period_amount,
'accumulated': running_accumulated,
'book_value_at_end': s.book_value_at_end,
'is_posted': False,
})
if line_vals:
Line.create(line_vals)
return {
'asset_id': asset.id,
'lines_created': len(line_vals),
'total_lines': len(asset.depreciation_line_ids),
'method': asset.method,
}
@api.model
def post_depreciation_entry(self, asset, *, period_date: date = None) -> dict:
"""Post the next-due un-posted depreciation line.
If period_date provided, post all lines whose scheduled_date <= period_date.
Otherwise, post the single next un-posted line (the earliest one).
"""
asset.ensure_one()
if asset.state != 'running':
raise ValidationError(
_("Cannot post depreciation for asset in state %s") % asset.state
)
Line = self.env['fusion.asset.depreciation.line'].sudo()
domain = [('asset_id', '=', asset.id), ('is_posted', '=', False)]
if period_date:
domain.append(('scheduled_date', '<=', period_date))
unposted = Line.search(domain, order='scheduled_date, period_index')
if not unposted:
return {'posted_count': 0, 'reason': 'no unposted lines due'}
if not period_date:
unposted = unposted[:1]
posted_ids = []
for line in unposted:
self._create_journal_entry(asset, line)
line.action_post()
posted_ids.append(line.id)
return {'posted_count': len(posted_ids), 'posted_line_ids': posted_ids}
@api.model
def dispose_asset(self, asset, *, sale_amount: float = 0.0,
sale_date: date = None, sale_partner=None,
disposal_type: str = 'sale') -> dict:
"""Dispose an asset (sale, scrap, donation, lost)."""
asset.ensure_one()
if asset.state == 'disposed':
raise ValidationError(_("Asset already disposed."))
sale_date = sale_date or fields.Date.today()
Line = self.env['fusion.asset.depreciation.line'].sudo()
future_unposted = Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', False),
('scheduled_date', '>', sale_date),
])
future_unposted.unlink()
asset.invalidate_recordset(['book_value', 'total_depreciated'])
book_value = asset.book_value
Disposal = self.env['fusion.asset.disposal'].sudo()
partner_id = False
if sale_partner:
partner_id = sale_partner.id if hasattr(sale_partner, 'id') else sale_partner
disposal = Disposal.create({
'asset_id': asset.id,
'disposal_type': disposal_type,
'disposal_date': sale_date,
'sale_amount': sale_amount,
'sale_partner_id': partner_id,
'book_value_at_disposal': book_value,
})
asset.write({
'state': 'disposed',
'disposed_date': sale_date,
})
return {
'asset_id': asset.id,
'disposal_id': disposal.id,
'gain_loss_amount': disposal.gain_loss_amount,
'book_value_at_disposal': book_value,
}
@api.model
def partial_sale(self, asset, *, sold_amount: float, sold_qty: float = None,
sale_date: date = None, sale_partner=None) -> dict:
"""Partially dispose: split asset into two — sold child + remaining parent.
sold_amount is cash received for the sold portion.
sold_qty is the ratio of original cost to attribute to the sold portion (0..1).
If sold_qty is None, defaults to sold_amount / cost.
"""
asset.ensure_one()
if asset.state == 'disposed':
raise ValidationError(_("Cannot partially sell a disposed asset."))
if sold_qty is None:
sold_qty = sold_amount / asset.cost if asset.cost else 0
if not (0 < sold_qty < 1):
raise ValidationError(
_("sold_qty must be strictly between 0 and 1; got %s") % sold_qty
)
sale_date = sale_date or fields.Date.today()
Asset = self.env['fusion.asset'].sudo()
sold_cost = round(asset.cost * sold_qty, 2)
sold_salvage = round(asset.salvage_value * sold_qty, 2)
child_vals = {
'name': f"{asset.name} (sold portion)",
'parent_id': asset.id,
'cost': sold_cost,
'salvage_value': sold_salvage,
'acquisition_date': asset.acquisition_date,
'in_service_date': asset.in_service_date,
'method': asset.method,
'useful_life_years': asset.useful_life_years,
'declining_rate_pct': asset.declining_rate_pct,
'prorate_convention': asset.prorate_convention,
'company_id': asset.company_id.id,
'state': 'running',
}
if asset.category_id:
child_vals['category_id'] = asset.category_id.id
child = Asset.create(child_vals)
new_cost = round(asset.cost - sold_cost, 2)
new_salvage = round(asset.salvage_value - sold_salvage, 2)
asset.write({
'cost': new_cost,
'salvage_value': new_salvage,
})
self.compute_depreciation_schedule(asset, recompute=True)
result = self.dispose_asset(
child, sale_amount=sold_amount, sale_date=sale_date,
sale_partner=sale_partner, disposal_type='sale',
)
return {
'parent_asset_id': asset.id,
'child_asset_id': child.id,
'disposal_id': result['disposal_id'],
'gain_loss_amount': result['gain_loss_amount'],
}
@api.model
def pause_asset(self, asset, pause_date: date = None) -> dict:
"""Pause depreciation. Wraps asset.action_pause for API symmetry and
to log the pause date for downstream auditing."""
asset.ensure_one()
asset.action_pause()
return {
'asset_id': asset.id,
'pause_date': pause_date or fields.Date.today(),
'state': 'paused',
}
@api.model
def resume_asset(self, asset, resume_date: date = None) -> dict:
"""Resume a paused asset."""
asset.ensure_one()
asset.action_resume()
return {
'asset_id': asset.id,
'resume_date': resume_date or fields.Date.today(),
'state': 'running',
}
@api.model
def reverse_disposal(self, asset) -> dict:
"""Reverse a disposal (rare — recovery from accidental sale entry)."""
asset.ensure_one()
if asset.state != 'disposed':
raise ValidationError(_("Asset is not disposed."))
Disposal = self.env['fusion.asset.disposal'].sudo()
last_disposal = Disposal.search(
[('asset_id', '=', asset.id)],
order='disposal_date desc, id desc', limit=1,
)
if last_disposal and last_disposal.move_id:
try:
last_disposal.move_id.button_cancel()
except Exception as e: # noqa: BLE001
_logger.warning("Could not cancel disposal move: %s", e)
if last_disposal:
last_disposal.unlink()
asset.write({'state': 'running', 'disposed_date': False})
return {'asset_id': asset.id, 'state': 'running'}
# ============================================================
# PRIVATE HELPERS
# ============================================================
def _validate_asset_for_schedule(self, asset):
if asset.cost <= 0:
raise ValidationError(_("Asset cost must be > 0 to compute schedule."))
if asset.method == 'units_of_production' and not asset.total_units_expected:
raise ValidationError(_(
"Units of Production assets need total_units_expected set."
))
if asset.method in ('straight_line', 'declining_balance'):
if asset.useful_life_years < 1:
raise ValidationError(_("useful_life_years must be >= 1."))
if asset.salvage_value > asset.cost:
raise ValidationError(_("Salvage value cannot exceed cost."))
def _compute_steps(self, asset) -> list:
"""Dispatch to the appropriate depreciation method service."""
if asset.method == 'straight_line':
return straight_line(
cost=asset.cost,
salvage_value=asset.salvage_value,
n_periods=asset.useful_life_years,
)
if asset.method == 'declining_balance':
return declining_balance(
cost=asset.cost,
salvage_value=asset.salvage_value,
n_periods=asset.useful_life_years,
rate=asset.declining_rate_pct / 100.0,
)
if asset.method == 'units_of_production':
# Phase 3 simple: assume even per-period units. Phase 3.5 can read
# from a per-period usage table populated by maintenance/IoT data.
if asset.useful_life_years:
per_period = asset.total_units_expected / asset.useful_life_years
periods = asset.useful_life_years
else:
per_period = asset.total_units_expected
periods = 1
return units_of_production(
cost=asset.cost,
salvage_value=asset.salvage_value,
total_units_expected=asset.total_units_expected,
units_per_period=[per_period] * periods,
)
return []
def _add_periods(self, base_date: date, n_periods: int) -> date:
"""Add (n_periods + 1) yearly increments to base_date and step back one
day, giving the period-end date.
Phase 3.5 can split this into monthly/quarterly variants when the asset
carries a sub-annual frequency.
"""
try:
return base_date.replace(year=base_date.year + n_periods + 1) - timedelta(days=1)
except ValueError:
return base_date.replace(
year=base_date.year + n_periods + 1, day=28,
) - timedelta(days=1)
def _create_journal_entry(self, asset, line):
"""Create the journal entry for a depreciation line.
Phase 3 keeps this minimal: requires the category to have both
depreciation_account_id and expense_account_id wired up. Without that,
the line is still posted (is_posted flag) but no move is created.
Phase 3.5 will add multi-currency, allocation rules, and analytic tags.
"""
category = asset.category_id
if not category or not (category.depreciation_account_id and category.expense_account_id):
_logger.debug(
"No accounts on category for asset %s; skipping journal entry",
asset.id,
)
return None
Move = self.env['account.move'].sudo()
journal = self.env['account.journal'].search([
('type', '=', 'general'),
('company_id', '=', asset.company_id.id),
], limit=1)
if not journal:
_logger.warning(
"No general journal for company %s; skipping move creation",
asset.company_id.name,
)
return None
try:
move = Move.create({
'date': line.scheduled_date,
'journal_id': journal.id,
'ref': f"Depreciation: {asset.name} (P{line.period_index + 1})",
'line_ids': [
(0, 0, {
'name': f"Depreciation expense - {asset.name}",
'account_id': category.expense_account_id.id,
'debit': line.amount,
'credit': 0,
}),
(0, 0, {
'name': f"Accumulated depreciation - {asset.name}",
'account_id': category.depreciation_account_id.id,
'debit': 0,
'credit': line.amount,
}),
],
})
move.action_post()
line.write({'move_id': move.id})
return move
except Exception as e: # noqa: BLE001
_logger.warning(
"Failed to create depreciation move for asset %s line %s: %s",
asset.id, line.id, e,
)
return None

View File

@@ -9,3 +9,5 @@ from . import test_fusion_asset_category
from . import test_fusion_asset_disposal
from . import test_fusion_asset_anomaly
from . import test_account_move_inherit
from . import test_fusion_asset_engine
from . import test_engine_integration

View File

@@ -0,0 +1,151 @@
"""End-to-end engine integration tests.
Each test creates a complete realistic asset (with category and accounts),
runs the engine through a full lifecycle, and asserts both the model state
and the journal entries (where category accounts are configured).
"""
from datetime import date
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
from odoo.exceptions import ValidationError
@tagged('post_install', '-at_install', 'integration')
class TestAssetEngineIntegration(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
Account = self.env['account.account']
company_id = self.env.company.id
self.expense_account = Account.search([
('account_type', '=', 'expense_depreciation'),
('company_ids', 'in', company_id),
], limit=1)
if not self.expense_account:
self.expense_account = Account.create({
'name': 'Test Depreciation Expense',
'code': '7180',
'account_type': 'expense_depreciation',
'company_ids': [(6, 0, [company_id])],
})
self.dep_account = Account.search([
('account_type', '=', 'asset_fixed'),
('company_ids', 'in', company_id),
], limit=1)
if not self.dep_account:
self.dep_account = Account.create({
'name': 'Test Accumulated Depreciation',
'code': '1690',
'account_type': 'asset_fixed',
'company_ids': [(6, 0, [company_id])],
})
self.category = self.env['fusion.asset.category'].create({
'name': 'Test Category',
'method': 'straight_line',
'useful_life_years': 5,
'asset_account_id': self.dep_account.id,
'depreciation_account_id': self.dep_account.id,
'expense_account_id': self.expense_account.id,
})
def _make_asset(self, **kwargs):
defaults = {
'name': 'Integration Asset',
'cost': 12000,
'salvage_value': 0,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line',
'useful_life_years': 4,
'category_id': self.category.id,
}
defaults.update(kwargs)
return self.env['fusion.asset'].create(defaults)
def test_full_lifecycle_straight_line(self):
asset = self._make_asset()
self.engine.compute_depreciation_schedule(asset)
self.assertEqual(len(asset.depreciation_line_ids), 4)
self.assertAlmostEqual(
sum(asset.depreciation_line_ids.mapped('amount')), 12000, places=2,
)
asset.action_set_running()
for _i in range(2):
result = self.engine.post_depreciation_entry(asset)
self.assertEqual(result['posted_count'], 1)
asset.invalidate_recordset(['book_value', 'total_depreciated'])
self.assertAlmostEqual(asset.total_depreciated, 6000, places=2)
def test_post_creates_journal_entry_when_accounts_configured(self):
asset = self._make_asset()
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
self.engine.post_depreciation_entry(asset)
first = asset.depreciation_line_ids.sorted('period_index')[0]
self.assertTrue(first.move_id, "Expected journal entry on posted line")
moves = first.move_id
self.assertAlmostEqual(
sum(moves.line_ids.mapped('debit')),
sum(moves.line_ids.mapped('credit')),
places=2,
)
def test_dispose_caps_future_lines(self):
asset = self._make_asset()
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
self.engine.post_depreciation_entry(asset)
self.engine.dispose_asset(
asset, sale_amount=5000, sale_date=date(2027, 6, 1),
)
self.assertEqual(asset.state, 'disposed')
unposted = asset.depreciation_line_ids.filtered(lambda l: not l.is_posted)
for line in unposted:
self.assertLessEqual(line.scheduled_date, date(2027, 6, 1))
def test_dispose_records_correct_book_value(self):
asset = self._make_asset()
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
for _i in range(2):
self.engine.post_depreciation_entry(asset)
result = self.engine.dispose_asset(
asset, sale_amount=8000, sale_date=date(2028, 6, 1),
)
# Book value at disposal = cost - accumulated = 12000 - 6000 = 6000.
self.assertAlmostEqual(result['book_value_at_disposal'], 6000, places=2)
# Gain = 8000 - 6000 = 2000.
self.assertAlmostEqual(result['gain_loss_amount'], 2000, places=2)
def test_partial_sale_30pct(self):
asset = self._make_asset(cost=10000, salvage_value=0)
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
result = self.engine.partial_sale(
asset, sold_amount=3500, sold_qty=0.3,
sale_date=date(2027, 1, 1),
)
asset.invalidate_recordset(['cost'])
self.assertAlmostEqual(asset.cost, 7000, places=2)
child = self.env['fusion.asset'].browse(result['child_asset_id'])
self.assertAlmostEqual(child.cost, 3000, places=2)
self.assertEqual(child.state, 'disposed')
# Child has no posted depreciation; book_value at disposal = 3000.
# Gain = 3500 - 3000 = 500.
self.assertAlmostEqual(result['gain_loss_amount'], 500, places=0)
def test_pause_then_resume_lifecycle(self):
asset = self._make_asset()
self.engine.compute_depreciation_schedule(asset)
asset.action_set_running()
self.engine.post_depreciation_entry(asset)
self.engine.pause_asset(asset)
with self.assertRaises(ValidationError):
self.engine.post_depreciation_entry(asset)
self.engine.resume_asset(asset)
result = self.engine.post_depreciation_entry(asset)
self.assertEqual(result['posted_count'], 1)

View File

@@ -0,0 +1,115 @@
from datetime import date
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
from odoo.exceptions import ValidationError
@tagged('post_install', '-at_install')
class TestFusionAssetEngine(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
self.asset = self.env['fusion.asset'].create({
'name': 'Test Engine Asset',
'cost': 10000,
'salvage_value': 1000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line',
'useful_life_years': 5,
})
def test_engine_model_exists(self):
self.assertIn('fusion.asset.engine', self.env.registry)
def test_compute_schedule_straight_line(self):
result = self.engine.compute_depreciation_schedule(self.asset)
self.assertEqual(result['lines_created'], 5)
lines = self.asset.depreciation_line_ids
self.assertEqual(len(lines), 5)
# Total depreciation should equal cost - salvage = 9000
total = sum(lines.mapped('amount'))
self.assertAlmostEqual(total, 9000, places=2)
def test_compute_schedule_declining_balance(self):
self.asset.write({'method': 'declining_balance', 'declining_rate_pct': 30.0})
self.engine.compute_depreciation_schedule(self.asset)
lines = self.asset.depreciation_line_ids
self.assertGreater(len(lines), 0)
# First-period amount should be cost * rate = 10000 * 0.3 = 3000
first = lines.sorted('period_index')[0]
self.assertAlmostEqual(first.amount, 3000, places=2)
def test_compute_schedule_recompute_wipes_unposted(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.write({'useful_life_years': 8})
self.engine.compute_depreciation_schedule(self.asset, recompute=True)
self.assertEqual(len(self.asset.depreciation_line_ids), 8)
def test_compute_schedule_validates_zero_cost(self):
# Bypass DB constraint with sudo + the constraint allows cost >= 0,
# but engine validation requires cost > 0.
bad = self.env['fusion.asset'].create({
'name': 'Zero',
'cost': 0,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line',
'useful_life_years': 5,
})
with self.assertRaises(ValidationError):
self.engine.compute_depreciation_schedule(bad)
def test_post_depreciation_entry_marks_line_posted(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
result = self.engine.post_depreciation_entry(self.asset)
self.assertEqual(result['posted_count'], 1)
first_line = self.asset.depreciation_line_ids.sorted('period_index')[0]
self.assertTrue(first_line.is_posted)
def test_post_depreciation_only_after_running(self):
self.engine.compute_depreciation_schedule(self.asset)
# asset is still in 'draft' state
with self.assertRaises(ValidationError):
self.engine.post_depreciation_entry(self.asset)
def test_dispose_asset_creates_disposal_record(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
result = self.engine.dispose_asset(
self.asset, sale_amount=5000, sale_date=date(2027, 6, 1),
)
self.assertEqual(self.asset.state, 'disposed')
self.assertIn('disposal_id', result)
self.assertEqual(result['book_value_at_disposal'], self.asset.book_value)
def test_partial_sale_creates_child_and_disposes(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
original_cost = self.asset.cost
result = self.engine.partial_sale(
self.asset, sold_amount=3000, sold_qty=0.3,
sale_date=date(2027, 6, 1),
)
self.assertIn('parent_asset_id', result)
self.assertIn('child_asset_id', result)
self.asset.invalidate_recordset(['cost'])
expected_remaining = round(original_cost * 0.7, 2)
self.assertAlmostEqual(self.asset.cost, expected_remaining, places=2)
def test_pause_resume_round_trip(self):
self.asset.action_set_running()
self.engine.pause_asset(self.asset)
self.assertEqual(self.asset.state, 'paused')
self.engine.resume_asset(self.asset)
self.assertEqual(self.asset.state, 'running')
def test_reverse_disposal_restores_running_state(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
self.engine.dispose_asset(self.asset, sale_amount=5000)
self.assertEqual(self.asset.state, 'disposed')
self.engine.reverse_disposal(self.asset)
self.assertEqual(self.asset.state, 'running')