feat(fusion_accounting_assets): fusion.asset.engine 7-method API

The orchestrator AbstractModel for asset depreciation lifecycle.
compute_depreciation_schedule, post_depreciation_entry, dispose_asset,
partial_sale, pause_asset, resume_asset, reverse_disposal.

All controllers, AI tools, wizards, and cron must route through these
methods; no direct ORM writes to fusion.asset.depreciation.line or
account.move from anywhere else.

Made-with: Cursor
This commit is contained in:
gsinghpal
2026-04-19 17:06:12 -04:00
parent 38a6e375e6
commit 54922a0b32
5 changed files with 516 additions and 1 deletions

View File

@@ -1,6 +1,6 @@
{
'name': 'Fusion Accounting Assets',
'version': '19.0.1.0.11',
'version': '19.0.1.0.12',
'category': 'Accounting/Accounting',
'summary': 'AI-augmented asset management with depreciation schedules.',
'description': """

View File

@@ -4,3 +4,4 @@ from . import fusion_asset_depreciation_line
from . import fusion_asset_disposal
from . import fusion_asset_anomaly
from . import account_move
from . import fusion_asset_engine

View File

@@ -0,0 +1,398 @@
"""The asset engine — orchestrator for all asset depreciation + lifecycle.
7-method public API. No direct ORM writes to fusion.asset.depreciation.line
or account.move from anywhere else; everything routes through here for
consistent validation, audit, and side-effect handling.
"""
import logging
from datetime import date, timedelta
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from ..services.depreciation_methods import (
straight_line,
declining_balance,
units_of_production,
)
_logger = logging.getLogger(__name__)
class FusionAssetEngine(models.AbstractModel):
_name = "fusion.asset.engine"
_description = "Fusion Asset Engine"
# ============================================================
# PUBLIC API (7 methods)
# ============================================================
@api.model
def compute_depreciation_schedule(self, asset, *, recompute: bool = False) -> dict:
"""Compute (or re-compute) the depreciation board for an asset.
If recompute=False and posted lines exist, ONLY un-posted future lines
are regenerated. If recompute=True, all unposted lines are wiped and
regenerated from scratch using current asset config.
"""
if not asset:
raise ValidationError(_("asset is required"))
asset.ensure_one()
self._validate_asset_for_schedule(asset)
Line = self.env['fusion.asset.depreciation.line'].sudo()
if recompute:
Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', False),
]).unlink()
existing_posted = Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', True),
], order='period_index')
start_period = max([l.period_index for l in existing_posted], default=-1) + 1
accumulated_so_far = sum(l.amount for l in existing_posted)
steps = self._compute_steps(asset)
new_steps = steps[start_period:]
base_date = asset.in_service_date or asset.acquisition_date
# Accumulated baseline at the boundary between posted and to-be-created
# lines: subtract the accumulated value the algorithm itself reports at
# that boundary, then re-add the actually-posted total. This keeps the
# board's accumulated column monotonic when picking up mid-life.
baseline_offset = 0.0
if start_period > 0 and start_period <= len(steps):
baseline_offset = steps[start_period - 1].accumulated_depreciation
line_vals = []
for s in new_steps:
scheduled_date = self._add_periods(base_date, s.period_index)
running_accumulated = round(
accumulated_so_far + s.accumulated_depreciation - baseline_offset, 2
)
line_vals.append({
'asset_id': asset.id,
'period_index': s.period_index,
'scheduled_date': scheduled_date,
'amount': s.period_amount,
'accumulated': running_accumulated,
'book_value_at_end': s.book_value_at_end,
'is_posted': False,
})
if line_vals:
Line.create(line_vals)
return {
'asset_id': asset.id,
'lines_created': len(line_vals),
'total_lines': len(asset.depreciation_line_ids),
'method': asset.method,
}
@api.model
def post_depreciation_entry(self, asset, *, period_date: date = None) -> dict:
"""Post the next-due un-posted depreciation line.
If period_date provided, post all lines whose scheduled_date <= period_date.
Otherwise, post the single next un-posted line (the earliest one).
"""
asset.ensure_one()
if asset.state != 'running':
raise ValidationError(
_("Cannot post depreciation for asset in state %s") % asset.state
)
Line = self.env['fusion.asset.depreciation.line'].sudo()
domain = [('asset_id', '=', asset.id), ('is_posted', '=', False)]
if period_date:
domain.append(('scheduled_date', '<=', period_date))
unposted = Line.search(domain, order='scheduled_date, period_index')
if not unposted:
return {'posted_count': 0, 'reason': 'no unposted lines due'}
if not period_date:
unposted = unposted[:1]
posted_ids = []
for line in unposted:
self._create_journal_entry(asset, line)
line.action_post()
posted_ids.append(line.id)
return {'posted_count': len(posted_ids), 'posted_line_ids': posted_ids}
@api.model
def dispose_asset(self, asset, *, sale_amount: float = 0.0,
sale_date: date = None, sale_partner=None,
disposal_type: str = 'sale') -> dict:
"""Dispose an asset (sale, scrap, donation, lost)."""
asset.ensure_one()
if asset.state == 'disposed':
raise ValidationError(_("Asset already disposed."))
sale_date = sale_date or fields.Date.today()
Line = self.env['fusion.asset.depreciation.line'].sudo()
future_unposted = Line.search([
('asset_id', '=', asset.id),
('is_posted', '=', False),
('scheduled_date', '>', sale_date),
])
future_unposted.unlink()
asset.invalidate_recordset(['book_value', 'total_depreciated'])
book_value = asset.book_value
Disposal = self.env['fusion.asset.disposal'].sudo()
partner_id = False
if sale_partner:
partner_id = sale_partner.id if hasattr(sale_partner, 'id') else sale_partner
disposal = Disposal.create({
'asset_id': asset.id,
'disposal_type': disposal_type,
'disposal_date': sale_date,
'sale_amount': sale_amount,
'sale_partner_id': partner_id,
'book_value_at_disposal': book_value,
})
asset.write({
'state': 'disposed',
'disposed_date': sale_date,
})
return {
'asset_id': asset.id,
'disposal_id': disposal.id,
'gain_loss_amount': disposal.gain_loss_amount,
'book_value_at_disposal': book_value,
}
@api.model
def partial_sale(self, asset, *, sold_amount: float, sold_qty: float = None,
sale_date: date = None, sale_partner=None) -> dict:
"""Partially dispose: split asset into two — sold child + remaining parent.
sold_amount is cash received for the sold portion.
sold_qty is the ratio of original cost to attribute to the sold portion (0..1).
If sold_qty is None, defaults to sold_amount / cost.
"""
asset.ensure_one()
if asset.state == 'disposed':
raise ValidationError(_("Cannot partially sell a disposed asset."))
if sold_qty is None:
sold_qty = sold_amount / asset.cost if asset.cost else 0
if not (0 < sold_qty < 1):
raise ValidationError(
_("sold_qty must be strictly between 0 and 1; got %s") % sold_qty
)
sale_date = sale_date or fields.Date.today()
Asset = self.env['fusion.asset'].sudo()
sold_cost = round(asset.cost * sold_qty, 2)
sold_salvage = round(asset.salvage_value * sold_qty, 2)
child_vals = {
'name': f"{asset.name} (sold portion)",
'parent_id': asset.id,
'cost': sold_cost,
'salvage_value': sold_salvage,
'acquisition_date': asset.acquisition_date,
'in_service_date': asset.in_service_date,
'method': asset.method,
'useful_life_years': asset.useful_life_years,
'declining_rate_pct': asset.declining_rate_pct,
'prorate_convention': asset.prorate_convention,
'company_id': asset.company_id.id,
'state': 'running',
}
if asset.category_id:
child_vals['category_id'] = asset.category_id.id
child = Asset.create(child_vals)
new_cost = round(asset.cost - sold_cost, 2)
new_salvage = round(asset.salvage_value - sold_salvage, 2)
asset.write({
'cost': new_cost,
'salvage_value': new_salvage,
})
self.compute_depreciation_schedule(asset, recompute=True)
result = self.dispose_asset(
child, sale_amount=sold_amount, sale_date=sale_date,
sale_partner=sale_partner, disposal_type='sale',
)
return {
'parent_asset_id': asset.id,
'child_asset_id': child.id,
'disposal_id': result['disposal_id'],
'gain_loss_amount': result['gain_loss_amount'],
}
@api.model
def pause_asset(self, asset, pause_date: date = None) -> dict:
"""Pause depreciation. Wraps asset.action_pause for API symmetry and
to log the pause date for downstream auditing."""
asset.ensure_one()
asset.action_pause()
return {
'asset_id': asset.id,
'pause_date': pause_date or fields.Date.today(),
'state': 'paused',
}
@api.model
def resume_asset(self, asset, resume_date: date = None) -> dict:
"""Resume a paused asset."""
asset.ensure_one()
asset.action_resume()
return {
'asset_id': asset.id,
'resume_date': resume_date or fields.Date.today(),
'state': 'running',
}
@api.model
def reverse_disposal(self, asset) -> dict:
"""Reverse a disposal (rare — recovery from accidental sale entry)."""
asset.ensure_one()
if asset.state != 'disposed':
raise ValidationError(_("Asset is not disposed."))
Disposal = self.env['fusion.asset.disposal'].sudo()
last_disposal = Disposal.search(
[('asset_id', '=', asset.id)],
order='disposal_date desc, id desc', limit=1,
)
if last_disposal and last_disposal.move_id:
try:
last_disposal.move_id.button_cancel()
except Exception as e: # noqa: BLE001
_logger.warning("Could not cancel disposal move: %s", e)
if last_disposal:
last_disposal.unlink()
asset.write({'state': 'running', 'disposed_date': False})
return {'asset_id': asset.id, 'state': 'running'}
# ============================================================
# PRIVATE HELPERS
# ============================================================
def _validate_asset_for_schedule(self, asset):
if asset.cost <= 0:
raise ValidationError(_("Asset cost must be > 0 to compute schedule."))
if asset.method == 'units_of_production' and not asset.total_units_expected:
raise ValidationError(_(
"Units of Production assets need total_units_expected set."
))
if asset.method in ('straight_line', 'declining_balance'):
if asset.useful_life_years < 1:
raise ValidationError(_("useful_life_years must be >= 1."))
if asset.salvage_value > asset.cost:
raise ValidationError(_("Salvage value cannot exceed cost."))
def _compute_steps(self, asset) -> list:
"""Dispatch to the appropriate depreciation method service."""
if asset.method == 'straight_line':
return straight_line(
cost=asset.cost,
salvage_value=asset.salvage_value,
n_periods=asset.useful_life_years,
)
if asset.method == 'declining_balance':
return declining_balance(
cost=asset.cost,
salvage_value=asset.salvage_value,
n_periods=asset.useful_life_years,
rate=asset.declining_rate_pct / 100.0,
)
if asset.method == 'units_of_production':
# Phase 3 simple: assume even per-period units. Phase 3.5 can read
# from a per-period usage table populated by maintenance/IoT data.
if asset.useful_life_years:
per_period = asset.total_units_expected / asset.useful_life_years
periods = asset.useful_life_years
else:
per_period = asset.total_units_expected
periods = 1
return units_of_production(
cost=asset.cost,
salvage_value=asset.salvage_value,
total_units_expected=asset.total_units_expected,
units_per_period=[per_period] * periods,
)
return []
def _add_periods(self, base_date: date, n_periods: int) -> date:
"""Add (n_periods + 1) yearly increments to base_date and step back one
day, giving the period-end date.
Phase 3.5 can split this into monthly/quarterly variants when the asset
carries a sub-annual frequency.
"""
try:
return base_date.replace(year=base_date.year + n_periods + 1) - timedelta(days=1)
except ValueError:
return base_date.replace(
year=base_date.year + n_periods + 1, day=28,
) - timedelta(days=1)
def _create_journal_entry(self, asset, line):
"""Create the journal entry for a depreciation line.
Phase 3 keeps this minimal: requires the category to have both
depreciation_account_id and expense_account_id wired up. Without that,
the line is still posted (is_posted flag) but no move is created.
Phase 3.5 will add multi-currency, allocation rules, and analytic tags.
"""
category = asset.category_id
if not category or not (category.depreciation_account_id and category.expense_account_id):
_logger.debug(
"No accounts on category for asset %s; skipping journal entry",
asset.id,
)
return None
Move = self.env['account.move'].sudo()
journal = self.env['account.journal'].search([
('type', '=', 'general'),
('company_id', '=', asset.company_id.id),
], limit=1)
if not journal:
_logger.warning(
"No general journal for company %s; skipping move creation",
asset.company_id.name,
)
return None
try:
move = Move.create({
'date': line.scheduled_date,
'journal_id': journal.id,
'ref': f"Depreciation: {asset.name} (P{line.period_index + 1})",
'line_ids': [
(0, 0, {
'name': f"Depreciation expense - {asset.name}",
'account_id': category.expense_account_id.id,
'debit': line.amount,
'credit': 0,
}),
(0, 0, {
'name': f"Accumulated depreciation - {asset.name}",
'account_id': category.depreciation_account_id.id,
'debit': 0,
'credit': line.amount,
}),
],
})
move.action_post()
line.write({'move_id': move.id})
return move
except Exception as e: # noqa: BLE001
_logger.warning(
"Failed to create depreciation move for asset %s line %s: %s",
asset.id, line.id, e,
)
return None

View File

@@ -9,3 +9,4 @@ from . import test_fusion_asset_category
from . import test_fusion_asset_disposal
from . import test_fusion_asset_anomaly
from . import test_account_move_inherit
from . import test_fusion_asset_engine

View File

@@ -0,0 +1,115 @@
from datetime import date
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
from odoo.exceptions import ValidationError
@tagged('post_install', '-at_install')
class TestFusionAssetEngine(TransactionCase):
def setUp(self):
super().setUp()
self.engine = self.env['fusion.asset.engine']
self.asset = self.env['fusion.asset'].create({
'name': 'Test Engine Asset',
'cost': 10000,
'salvage_value': 1000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line',
'useful_life_years': 5,
})
def test_engine_model_exists(self):
self.assertIn('fusion.asset.engine', self.env.registry)
def test_compute_schedule_straight_line(self):
result = self.engine.compute_depreciation_schedule(self.asset)
self.assertEqual(result['lines_created'], 5)
lines = self.asset.depreciation_line_ids
self.assertEqual(len(lines), 5)
# Total depreciation should equal cost - salvage = 9000
total = sum(lines.mapped('amount'))
self.assertAlmostEqual(total, 9000, places=2)
def test_compute_schedule_declining_balance(self):
self.asset.write({'method': 'declining_balance', 'declining_rate_pct': 30.0})
self.engine.compute_depreciation_schedule(self.asset)
lines = self.asset.depreciation_line_ids
self.assertGreater(len(lines), 0)
# First-period amount should be cost * rate = 10000 * 0.3 = 3000
first = lines.sorted('period_index')[0]
self.assertAlmostEqual(first.amount, 3000, places=2)
def test_compute_schedule_recompute_wipes_unposted(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.write({'useful_life_years': 8})
self.engine.compute_depreciation_schedule(self.asset, recompute=True)
self.assertEqual(len(self.asset.depreciation_line_ids), 8)
def test_compute_schedule_validates_zero_cost(self):
# Bypass DB constraint with sudo + the constraint allows cost >= 0,
# but engine validation requires cost > 0.
bad = self.env['fusion.asset'].create({
'name': 'Zero',
'cost': 0,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line',
'useful_life_years': 5,
})
with self.assertRaises(ValidationError):
self.engine.compute_depreciation_schedule(bad)
def test_post_depreciation_entry_marks_line_posted(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
result = self.engine.post_depreciation_entry(self.asset)
self.assertEqual(result['posted_count'], 1)
first_line = self.asset.depreciation_line_ids.sorted('period_index')[0]
self.assertTrue(first_line.is_posted)
def test_post_depreciation_only_after_running(self):
self.engine.compute_depreciation_schedule(self.asset)
# asset is still in 'draft' state
with self.assertRaises(ValidationError):
self.engine.post_depreciation_entry(self.asset)
def test_dispose_asset_creates_disposal_record(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
result = self.engine.dispose_asset(
self.asset, sale_amount=5000, sale_date=date(2027, 6, 1),
)
self.assertEqual(self.asset.state, 'disposed')
self.assertIn('disposal_id', result)
self.assertEqual(result['book_value_at_disposal'], self.asset.book_value)
def test_partial_sale_creates_child_and_disposes(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
original_cost = self.asset.cost
result = self.engine.partial_sale(
self.asset, sold_amount=3000, sold_qty=0.3,
sale_date=date(2027, 6, 1),
)
self.assertIn('parent_asset_id', result)
self.assertIn('child_asset_id', result)
self.asset.invalidate_recordset(['cost'])
expected_remaining = round(original_cost * 0.7, 2)
self.assertAlmostEqual(self.asset.cost, expected_remaining, places=2)
def test_pause_resume_round_trip(self):
self.asset.action_set_running()
self.engine.pause_asset(self.asset)
self.assertEqual(self.asset.state, 'paused')
self.engine.resume_asset(self.asset)
self.assertEqual(self.asset.state, 'running')
def test_reverse_disposal_restores_running_state(self):
self.engine.compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
self.engine.dispose_asset(self.asset, sale_amount=5000)
self.assertEqual(self.asset.state, 'disposed')
self.engine.reverse_disposal(self.asset)
self.assertEqual(self.asset.state, 'running')