4 Commits

Author SHA1 Message Date
gsinghpal
de6d8fda3e feat(fusion_accounting_assets): 2 cron jobs (depreciation post + anomaly scan)
Some checks failed
fusion_accounting CI / test (fusion_accounting_migration) (push) Has been cancelled
fusion_accounting CI / test (fusion_accounting_ai) (push) Has been cancelled
fusion_accounting CI / test (fusion_accounting_core) (push) Has been cancelled
Made-with: Cursor
2026-04-19 17:17:21 -04:00
gsinghpal
9092a78be2 feat(fusion_accounting_ai): 5 new asset management AI tools
Made-with: Cursor
2026-04-19 17:16:22 -04:00
gsinghpal
79cd0216ff feat(fusion_accounting_ai): wire AssetsAdapter fusion paths to engine
Made-with: Cursor
2026-04-19 17:15:24 -04:00
gsinghpal
3e8b7b1e82 feat(fusion_accounting_assets): 8 JSON-RPC endpoints for OWL widget
Made-with: Cursor
2026-04-19 17:14:22 -04:00
15 changed files with 682 additions and 28 deletions

View File

@@ -1,42 +1,98 @@
"""Assets data adapter."""
"""Assets data adapter — routes asset queries through fusion engine if installed."""
from .base import DataAdapter
from ._registry import register_adapter
class AssetsAdapter(DataAdapter):
FUSION_MODEL = 'fusion.asset'
FUSION_MODEL = 'fusion.asset.engine'
ENTERPRISE_MODULE = 'account_asset'
def list_assets(self, state=None):
return self._dispatch('list_assets', state=state)
# ============================================================
# list_assets
# ============================================================
def list_assets_via_fusion(self, state=None):
return self._read_fusion('fusion.asset', state=state)
def list_assets(self, state=None, limit=50, company_id=None):
return self._dispatch(
'list_assets', state=state, limit=limit, company_id=company_id,
)
def list_assets_via_enterprise(self, state=None):
return self._read_fusion('account.asset', state=state)
def list_assets_via_fusion(self, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'assets': [], 'count': 0, 'total': 0}
Asset = self.env['fusion.asset'].sudo()
domain = [('company_id', '=', kwargs.get('company_id') or self.env.company.id)]
if kwargs.get('state'):
domain.append(('state', '=', kwargs['state']))
total = Asset.search_count(domain)
assets = Asset.search(
domain, limit=int(kwargs.get('limit', 50)),
order='acquisition_date desc',
)
return {
'count': len(assets), 'total': total,
'assets': [{
'id': a.id, 'name': a.name, 'state': a.state,
'cost': a.cost, 'book_value': a.book_value,
'method': a.method,
'category_name': a.category_id.name if a.category_id else None,
} for a in assets],
}
def list_assets_via_community(self, state=None):
# No assets feature in pure Community — return empty list with a hint.
return []
def list_assets_via_enterprise(self, **kwargs):
return {
'assets': [], 'count': 0, 'total': 0,
'error': 'Enterprise account_asset must be queried from Enterprise UI',
}
def _read_fusion(self, model_name, state=None):
"""Shared shape between fusion and enterprise (both use account.asset-like API)."""
Model = self.env[model_name].sudo()
domain = []
if state:
domain.append(('state', '=', state))
records = Model.search(domain, limit=200)
out = []
for r in records:
out.append({
'id': r.id,
'name': getattr(r, 'name', None),
'state': getattr(r, 'state', None),
'value': getattr(r, 'original_value', None) or getattr(r, 'acquisition_cost', None),
})
return out
def list_assets_via_community(self, **kwargs):
return {
'assets': [], 'count': 0, 'total': 0,
'error': 'No assets engine in pure Community',
}
# ============================================================
# suggest_useful_life
# ============================================================
def suggest_useful_life(self, description, amount=None, partner_name=None):
return self._dispatch(
'suggest_useful_life',
description=description, amount=amount, partner_name=partner_name,
)
def suggest_useful_life_via_fusion(self, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'error': 'fusion_accounting_assets not installed'}
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
predict_useful_life,
)
return predict_useful_life(self.env, **kwargs)
def suggest_useful_life_via_enterprise(self, **kwargs):
return {'error': 'AI useful-life suggestion is fusion-only'}
def suggest_useful_life_via_community(self, **kwargs):
return {'error': 'AI useful-life suggestion is fusion-only'}
# ============================================================
# dispose_asset
# ============================================================
def dispose_asset(self, asset_id, **kwargs):
return self._dispatch('dispose_asset', asset_id=asset_id, **kwargs)
def dispose_asset_via_fusion(self, asset_id, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'error': 'fusion_accounting_assets not installed'}
asset = self.env['fusion.asset'].sudo().browse(int(asset_id))
return self.env['fusion.asset.engine'].sudo().dispose_asset(asset, **kwargs)
def dispose_asset_via_enterprise(self, asset_id, **kwargs):
return {'error': 'Enterprise asset disposal must use Enterprise UI'}
def dispose_asset_via_community(self, asset_id, **kwargs):
return {'error': 'Community has no asset disposal flow'}
register_adapter('assets', AssetsAdapter)

View File

@@ -10,11 +10,13 @@ from .adp import TOOLS as ADP_TOOLS
from .reporting import TOOLS as REPORTING_TOOLS
from .audit import TOOLS as AUDIT_TOOLS
from .financial_reports import TOOLS as FINANCIAL_REPORTS_TOOLS
from .asset_management import TOOLS as ASSET_MANAGEMENT_TOOLS
TOOL_DISPATCH = {}
for tools_dict in [
BANK_RECON_TOOLS, HST_TOOLS, AR_TOOLS, AP_TOOLS, JOURNAL_TOOLS,
MONTH_END_TOOLS, PAYROLL_TOOLS, INVENTORY_TOOLS, ADP_TOOLS,
REPORTING_TOOLS, AUDIT_TOOLS, FINANCIAL_REPORTS_TOOLS,
ASSET_MANAGEMENT_TOOLS,
]:
TOOL_DISPATCH.update(tools_dict)

View File

@@ -0,0 +1,77 @@
"""Fusion-engine-routed AI tools for asset management."""
import logging
_logger = logging.getLogger(__name__)
def fusion_list_assets(env, params):
if 'fusion.asset.engine' not in env.registry:
return {'error': 'fusion_accounting_assets not installed'}
from ..data_adapters import get_adapter
adapter = get_adapter(env, 'assets')
return adapter.list_assets(
state=params.get('state'),
limit=int(params.get('limit', 50)),
company_id=int(params['company_id']) if params.get('company_id') else env.company.id,
)
def fusion_get_asset_detail(env, params):
if 'fusion.asset.engine' not in env.registry:
return {'error': 'fusion_accounting_assets not installed'}
Asset = env['fusion.asset']
asset = Asset.browse(int(params['asset_id']))
if not asset.exists():
return {'error': 'Asset not found'}
return {
'asset': {
'id': asset.id, 'name': asset.name, 'state': asset.state,
'cost': asset.cost, 'book_value': asset.book_value,
'total_depreciated': asset.total_depreciated,
'method': asset.method, 'useful_life_years': asset.useful_life_years,
},
'depreciation_count': len(asset.depreciation_line_ids),
}
def fusion_compute_asset_schedule(env, params):
if 'fusion.asset.engine' not in env.registry:
return {'error': 'fusion_accounting_assets not installed'}
asset = env['fusion.asset'].browse(int(params['asset_id']))
return env['fusion.asset.engine'].compute_depreciation_schedule(
asset, recompute=bool(params.get('recompute', False)),
)
def fusion_dispose_asset(env, params):
if 'fusion.asset.engine' not in env.registry:
return {'error': 'fusion_accounting_assets not installed'}
from ..data_adapters import get_adapter
adapter = get_adapter(env, 'assets')
return adapter.dispose_asset(
asset_id=int(params['asset_id']),
sale_amount=float(params.get('sale_amount', 0)),
disposal_type=params.get('disposal_type', 'sale'),
)
def fusion_suggest_asset_useful_life(env, params):
if 'fusion.asset.engine' not in env.registry:
return {'error': 'fusion_accounting_assets not installed'}
from ..data_adapters import get_adapter
adapter = get_adapter(env, 'assets')
return adapter.suggest_useful_life(
description=params.get('description', ''),
amount=float(params['amount']) if params.get('amount') else None,
partner_name=params.get('partner_name'),
)
TOOLS = {
'fusion_list_assets': fusion_list_assets,
'fusion_get_asset_detail': fusion_get_asset_detail,
'fusion_compute_asset_schedule': fusion_compute_asset_schedule,
'fusion_dispose_asset': fusion_dispose_asset,
'fusion_suggest_asset_useful_life': fusion_suggest_asset_useful_life,
}

View File

@@ -1,2 +1,3 @@
from . import models
from . import services
from . import controllers

View File

@@ -1,6 +1,6 @@
{
'name': 'Fusion Accounting Assets',
'version': '19.0.1.0.13',
'version': '19.0.1.0.17',
'category': 'Accounting/Accounting',
'summary': 'AI-augmented asset management with depreciation schedules.',
'description': """
@@ -33,6 +33,7 @@ menu hides; the engine + AI tools remain available for the chat.
],
'data': [
'security/ir.model.access.csv',
'data/cron.xml',
],
'assets': {
'web.assets_backend': [

View File

@@ -0,0 +1 @@
from . import assets_controller

View File

@@ -0,0 +1,175 @@
"""HTTP controller: 8 JSON-RPC endpoints for the OWL asset dashboard.
All endpoints route through fusion.asset.engine. V19 type='jsonrpc'.
"""
import logging
from datetime import date, datetime
from odoo import _, http
from odoo.exceptions import ValidationError
from odoo.http import request
_logger = logging.getLogger(__name__)
def _parse_date(value):
if isinstance(value, date):
return value
if not value:
return None
return datetime.strptime(value, '%Y-%m-%d').date()
class FusionAssetsController(http.Controller):
@http.route('/fusion/assets/list', type='jsonrpc', auth='user')
def list_assets(self, state=None, category_id=None, limit=50, offset=0,
company_id=None):
company_id = int(company_id) if company_id else request.env.company.id
Asset = request.env['fusion.asset'].sudo()
domain = [('company_id', '=', company_id)]
if state:
domain.append(('state', '=', state))
if category_id:
domain.append(('category_id', '=', int(category_id)))
total = Asset.search_count(domain)
assets = Asset.search(domain, limit=int(limit), offset=int(offset),
order='acquisition_date desc')
return {
'count': len(assets),
'total': total,
'assets': [{
'id': a.id, 'name': a.name, 'code': a.code or '',
'state': a.state, 'cost': a.cost, 'salvage_value': a.salvage_value,
'book_value': a.book_value, 'total_depreciated': a.total_depreciated,
'method': a.method, 'useful_life_years': a.useful_life_years,
'acquisition_date': str(a.acquisition_date),
'in_service_date': str(a.in_service_date) if a.in_service_date else None,
'category_id': a.category_id.id if a.category_id else None,
'category_name': a.category_id.name if a.category_id else None,
'currency_code': a.currency_id.name,
} for a in assets],
}
@http.route('/fusion/assets/get_detail', type='jsonrpc', auth='user')
def get_detail(self, asset_id):
asset = request.env['fusion.asset'].browse(int(asset_id))
if not asset.exists():
raise ValidationError(_("Asset %s not found") % asset_id)
return {
'asset': {
'id': asset.id, 'name': asset.name, 'code': asset.code or '',
'state': asset.state, 'cost': asset.cost,
'salvage_value': asset.salvage_value,
'book_value': asset.book_value,
'total_depreciated': asset.total_depreciated,
'method': asset.method,
'useful_life_years': asset.useful_life_years,
'declining_rate_pct': asset.declining_rate_pct,
'total_units_expected': asset.total_units_expected,
'units_used_to_date': asset.units_used_to_date,
'prorate_convention': asset.prorate_convention,
'acquisition_date': str(asset.acquisition_date),
'in_service_date': str(asset.in_service_date) if asset.in_service_date else None,
'disposed_date': str(asset.disposed_date) if asset.disposed_date else None,
'category_id': asset.category_id.id if asset.category_id else None,
'category_name': asset.category_id.name if asset.category_id else None,
'currency_id': asset.currency_id.id,
'currency_code': asset.currency_id.name,
},
'depreciation_lines': [{
'id': l.id, 'period_index': l.period_index,
'scheduled_date': str(l.scheduled_date),
'amount': l.amount, 'accumulated': l.accumulated,
'book_value_at_end': l.book_value_at_end,
'is_posted': l.is_posted,
'posted_date': str(l.posted_date) if l.posted_date else None,
} for l in asset.depreciation_line_ids.sorted('period_index')],
'anomalies': [{
'id': a.id, 'anomaly_type': a.anomaly_type,
'severity': a.severity, 'detail': a.detail or '',
'state': a.state,
} for a in request.env['fusion.asset.anomaly'].search([
('asset_id', '=', asset.id), ('state', 'in', ('new', 'acknowledged'))
])],
}
@http.route('/fusion/assets/compute_schedule', type='jsonrpc', auth='user')
def compute_schedule(self, asset_id, recompute=False):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
return engine.compute_depreciation_schedule(asset, recompute=bool(recompute))
@http.route('/fusion/assets/post_depreciation', type='jsonrpc', auth='user')
def post_depreciation(self, asset_id, period_date=None):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
return engine.post_depreciation_entry(asset, period_date=_parse_date(period_date))
@http.route('/fusion/assets/dispose', type='jsonrpc', auth='user')
def dispose(self, asset_id, sale_amount=0, sale_date=None,
sale_partner_id=None, disposal_type='sale'):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
partner = None
if sale_partner_id:
partner = request.env['res.partner'].sudo().browse(int(sale_partner_id))
return engine.dispose_asset(
asset, sale_amount=float(sale_amount),
sale_date=_parse_date(sale_date),
sale_partner=partner, disposal_type=disposal_type,
)
@http.route('/fusion/assets/get_anomalies', type='jsonrpc', auth='user')
def get_anomalies(self, asset_id=None, severity=None, state='new', limit=50,
company_id=None):
company_id = int(company_id) if company_id else request.env.company.id
Anomaly = request.env['fusion.asset.anomaly'].sudo()
domain = [('company_id', '=', company_id)]
if asset_id:
domain.append(('asset_id', '=', int(asset_id)))
if severity:
domain.append(('severity', '=', severity))
if state:
domain.append(('state', '=', state))
anomalies = Anomaly.search(domain, limit=int(limit), order='detected_at desc')
return {
'count': len(anomalies),
'anomalies': [{
'id': a.id, 'asset_id': a.asset_id.id, 'asset_name': a.asset_id.name,
'anomaly_type': a.anomaly_type, 'severity': a.severity,
'expected': a.expected, 'actual': a.actual,
'variance_pct': a.variance_pct, 'detail': a.detail or '',
'state': a.state,
'detected_at': str(a.detected_at),
} for a in anomalies],
}
@http.route('/fusion/assets/suggest_useful_life', type='jsonrpc', auth='user')
def suggest_useful_life(self, description, amount=None, partner_name=None):
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
predict_useful_life,
)
return predict_useful_life(
request.env, description=description,
amount=float(amount) if amount is not None else None,
partner_name=partner_name,
)
@http.route('/fusion/assets/get_partner_history', type='jsonrpc', auth='user')
def get_partner_history(self, partner_id, limit=20):
Asset = request.env['fusion.asset'].sudo()
assets = Asset.search([
('source_invoice_line_id.partner_id', '=', int(partner_id)),
], limit=int(limit), order='acquisition_date desc')
return {
'partner_id': int(partner_id),
'count': len(assets),
'assets': [{
'id': a.id, 'name': a.name,
'cost': a.cost, 'book_value': a.book_value,
'state': a.state,
'acquisition_date': str(a.acquisition_date),
} for a in assets],
}

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo noupdate="1">
<record id="cron_fusion_assets_post_depreciation" model="ir.cron">
<field name="name">Fusion Assets — Post Due Depreciation</field>
<field name="model_id" ref="model_fusion_assets_cron"/>
<field name="state">code</field>
<field name="code">model._cron_post_due_depreciation()</field>
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="active" eval="True"/>
</record>
<record id="cron_fusion_assets_anomaly_scan" model="ir.cron">
<field name="name">Fusion Assets — Monthly Anomaly Scan</field>
<field name="model_id" ref="model_fusion_assets_cron"/>
<field name="state">code</field>
<field name="code">model._cron_anomaly_scan()</field>
<field name="interval_number">30</field>
<field name="interval_type">days</field>
<field name="active" eval="True"/>
</record>
</odoo>

View File

@@ -5,3 +5,4 @@ from . import fusion_asset_disposal
from . import fusion_asset_anomaly
from . import account_move
from . import fusion_asset_engine
from . import fusion_assets_cron

View File

@@ -0,0 +1,85 @@
"""Cron handlers for fusion_accounting_assets.
- _cron_post_due_depreciation: daily, post due depreciation lines for running assets
- _cron_anomaly_scan: monthly, scan for schedule variance and create anomaly records
"""
import logging
from odoo import api, fields, models
from ..services.anomaly_detection import detect_schedule_variance
_logger = logging.getLogger(__name__)
class FusionAssetsCron(models.AbstractModel):
_name = "fusion.assets.cron"
_description = "Fusion Assets Cron Handlers"
@api.model
def _cron_post_due_depreciation(self):
"""For each running asset, post any due un-posted depreciation lines."""
today = fields.Date.today()
engine = self.env['fusion.asset.engine']
Asset = self.env['fusion.asset']
running_assets = Asset.search([('state', '=', 'running')])
posted_total = 0
for asset in running_assets:
try:
with self.env.cr.savepoint():
result = engine.post_depreciation_entry(asset, period_date=today)
posted_total += result.get('posted_count', 0)
except Exception as e: # noqa: BLE001
_logger.warning("Cron post failed for asset %s: %s", asset.id, e)
_logger.info(
"Cron: posted depreciation on %d lines across %d running assets",
posted_total, len(running_assets),
)
@api.model
def _cron_anomaly_scan(self):
"""For each running asset, compare expected accumulated depreciation
vs posted, and persist any variance flags."""
Asset = self.env['fusion.asset']
Anomaly = self.env['fusion.asset.anomaly']
running_assets = Asset.search([('state', '=', 'running')])
flagged = 0
today = fields.Date.today()
for asset in running_assets:
try:
expected = sum(
l.amount for l in asset.depreciation_line_ids
if l.scheduled_date and l.scheduled_date <= today
)
actual = asset.total_depreciated
anomaly = detect_schedule_variance(
asset_id=asset.id, asset_name=asset.name,
expected_accumulated=expected, actual_accumulated=actual,
)
if anomaly is None:
continue
anomaly_dict = anomaly.to_dict()
existing = Anomaly.search([
('asset_id', '=', asset.id),
('anomaly_type', '=', anomaly_dict['anomaly_type']),
('state', 'in', ('new', 'acknowledged')),
], limit=1)
if existing:
continue
Anomaly.create({
'asset_id': asset.id,
'anomaly_type': anomaly_dict['anomaly_type'],
'severity': anomaly_dict['severity'],
'expected': anomaly_dict['expected'],
'actual': anomaly_dict['actual'],
'variance_pct': anomaly_dict['variance_pct'],
'detail': anomaly_dict['detail'],
})
flagged += 1
except Exception as e: # noqa: BLE001
_logger.warning("Cron anomaly scan failed for asset %s: %s", asset.id, e)
_logger.info(
"Cron: scanned %d assets, flagged %d anomalies",
len(running_assets), flagged,
)

View File

@@ -11,3 +11,7 @@ from . import test_fusion_asset_anomaly
from . import test_account_move_inherit
from . import test_fusion_asset_engine
from . import test_engine_integration
from . import test_assets_controller
from . import test_assets_adapter
from . import test_asset_tools
from . import test_assets_cron

View File

@@ -0,0 +1,56 @@
"""Tests for the 5 fusion-asset AI tools."""
from datetime import date
from odoo.tests import tagged
from odoo.tests.common import TransactionCase
from odoo.addons.fusion_accounting_ai.services.tools import asset_management as tools
@tagged('post_install', '-at_install')
class TestFusionAssetTools(TransactionCase):
def test_fusion_list_assets(self):
self.env['fusion.asset'].create({
'name': 'Tool Test', 'cost': 1000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = tools.fusion_list_assets(self.env, {'company_id': self.env.company.id})
self.assertGreaterEqual(result.get('count', 0), 1)
def test_fusion_get_asset_detail(self):
asset = self.env['fusion.asset'].create({
'name': 'Detail Test', 'cost': 1500,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = tools.fusion_get_asset_detail(self.env, {'asset_id': asset.id})
self.assertEqual(result['asset']['name'], 'Detail Test')
def test_fusion_compute_schedule(self):
asset = self.env['fusion.asset'].create({
'name': 'Schedule Test', 'cost': 2000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = tools.fusion_compute_asset_schedule(self.env, {'asset_id': asset.id})
self.assertEqual(result['lines_created'], 4)
def test_fusion_suggest_useful_life(self):
self.env['ir.config_parameter'].sudo().search([
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
'fusion_accounting.provider.default'])
]).unlink()
result = tools.fusion_suggest_asset_useful_life(self.env, {
'description': 'desk',
})
self.assertEqual(result['useful_life_years'], 7)
def test_tools_registered_in_dispatch(self):
from odoo.addons.fusion_accounting_ai.services.tools import TOOL_DISPATCH
for tool_name in ['fusion_list_assets', 'fusion_get_asset_detail',
'fusion_compute_asset_schedule', 'fusion_dispose_asset',
'fusion_suggest_asset_useful_life']:
self.assertIn(tool_name, TOOL_DISPATCH)

View File

@@ -0,0 +1,40 @@
"""AssetsAdapter wiring tests — fusion-mode dispatch."""
from datetime import date
from odoo.tests import tagged
from odoo.tests.common import TransactionCase
from odoo.addons.fusion_accounting_ai.services.data_adapters.assets import (
AssetsAdapter,
)
@tagged('post_install', '-at_install')
class TestAssetsAdapter(TransactionCase):
def setUp(self):
super().setUp()
self.adapter = AssetsAdapter(self.env)
def test_list_assets_via_fusion(self):
self.env['fusion.asset'].create({
'name': 'Adapter Test', 'cost': 1000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = self.adapter.list_assets_via_fusion(company_id=self.env.company.id)
self.assertGreaterEqual(result['count'], 1)
def test_suggest_useful_life_via_fusion_uses_templated_fallback(self):
self.env['ir.config_parameter'].sudo().search([
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
'fusion_accounting.provider.default'])
]).unlink()
result = self.adapter.suggest_useful_life_via_fusion(description='laptop')
self.assertEqual(result['useful_life_years'], 4)
self.assertEqual(result['depreciation_method'], 'straight_line')
def test_dispose_asset_via_community_returns_error(self):
result = self.adapter.dispose_asset_via_community(asset_id=1, sale_amount=100)
self.assertIn('error', result)

View File

@@ -0,0 +1,103 @@
"""Controller tests using HttpCase."""
import json
from datetime import date
from odoo.tests import tagged
from odoo.tests.common import HttpCase, new_test_user
@tagged('post_install', '-at_install')
class TestAssetsController(HttpCase):
def setUp(self):
super().setUp()
self.user = new_test_user(
self.env, login='assets_test_user',
groups='base.group_user,account.group_account_invoice',
)
def _jsonrpc(self, endpoint, params):
self.authenticate('assets_test_user', 'assets_test_user')
url = f'/fusion/assets/{endpoint}'
body = {'jsonrpc': '2.0', 'method': 'call', 'params': params, 'id': 1}
response = self.url_open(
url, data=json.dumps(body),
headers={'Content-Type': 'application/json'},
)
self.assertEqual(
response.status_code, 200,
f"{endpoint} returned {response.status_code}: {response.text[:300]}",
)
result = response.json()
if 'error' in result:
self.fail(f"{endpoint} errored: {result['error']}")
return result.get('result', {})
def test_list_returns_dict(self):
result = self._jsonrpc('list', {'company_id': self.env.company.id})
self.assertIn('assets', result)
self.assertIn('total', result)
def test_get_detail_returns_asset(self):
asset = self.env['fusion.asset'].create({
'name': 'Ctrl Test Asset', 'cost': 5000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
result = self._jsonrpc('get_detail', {'asset_id': asset.id})
self.assertEqual(result['asset']['id'], asset.id)
self.assertIn('depreciation_lines', result)
def test_compute_schedule_creates_lines(self):
asset = self.env['fusion.asset'].create({
'name': 'CompTest', 'cost': 4000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = self._jsonrpc('compute_schedule', {'asset_id': asset.id})
self.assertEqual(result['lines_created'], 4)
def test_post_depreciation_after_running(self):
asset = self.env['fusion.asset'].create({
'name': 'PostTest', 'cost': 3000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 3,
})
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
asset.action_set_running()
result = self._jsonrpc('post_depreciation', {'asset_id': asset.id})
self.assertEqual(result['posted_count'], 1)
def test_dispose_marks_asset_disposed(self):
asset = self.env['fusion.asset'].create({
'name': 'DispTest', 'cost': 6000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 3,
})
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
asset.action_set_running()
result = self._jsonrpc('dispose', {
'asset_id': asset.id, 'sale_amount': 4000,
'sale_date': '2027-06-01', 'disposal_type': 'sale',
})
self.assertIn('disposal_id', result)
asset.invalidate_recordset(['state'])
self.assertEqual(asset.state, 'disposed')
def test_get_anomalies_returns_list(self):
result = self._jsonrpc('get_anomalies', {'company_id': self.env.company.id})
self.assertIn('anomalies', result)
def test_suggest_useful_life_returns_dict(self):
result = self._jsonrpc('suggest_useful_life', {'description': 'Dell laptop'})
self.assertIn('useful_life_years', result)
self.assertIn('depreciation_method', result)
self.assertEqual(result['useful_life_years'], 4)
def test_get_partner_history(self):
partner = self.env['res.partner'].create({'name': 'History Test Partner'})
result = self._jsonrpc('get_partner_history', {'partner_id': partner.id})
self.assertEqual(result['partner_id'], partner.id)

View File

@@ -0,0 +1,28 @@
"""Cron handler smoke tests."""
from datetime import date
from odoo.tests import tagged
from odoo.tests.common import TransactionCase
@tagged('post_install', '-at_install')
class TestFusionAssetsCron(TransactionCase):
def setUp(self):
super().setUp()
self.cron = self.env['fusion.assets.cron']
self.asset = self.env['fusion.asset'].create({
'name': 'Cron Test', 'cost': 4000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
self.env['fusion.asset.engine'].compute_depreciation_schedule(self.asset)
self.asset.action_set_running()
def test_cron_post_due_depreciation_runs(self):
self.cron._cron_post_due_depreciation()
def test_cron_anomaly_scan_runs(self):
self.cron._cron_anomaly_scan()