feat(fusion_accounting_assets): 8 JSON-RPC endpoints for OWL widget

Made-with: Cursor
This commit is contained in:
gsinghpal
2026-04-19 17:14:22 -04:00
parent 345c971d59
commit 3e8b7b1e82
6 changed files with 282 additions and 1 deletions

View File

@@ -1,2 +1,3 @@
from . import models
from . import services
from . import controllers

View File

@@ -1,6 +1,6 @@
{
'name': 'Fusion Accounting Assets',
'version': '19.0.1.0.13',
'version': '19.0.1.0.14',
'category': 'Accounting/Accounting',
'summary': 'AI-augmented asset management with depreciation schedules.',
'description': """

View File

@@ -0,0 +1 @@
from . import assets_controller

View File

@@ -0,0 +1,175 @@
"""HTTP controller: 8 JSON-RPC endpoints for the OWL asset dashboard.
All endpoints route through fusion.asset.engine. V19 type='jsonrpc'.
"""
import logging
from datetime import date, datetime
from odoo import _, http
from odoo.exceptions import ValidationError
from odoo.http import request
_logger = logging.getLogger(__name__)
def _parse_date(value):
if isinstance(value, date):
return value
if not value:
return None
return datetime.strptime(value, '%Y-%m-%d').date()
class FusionAssetsController(http.Controller):
@http.route('/fusion/assets/list', type='jsonrpc', auth='user')
def list_assets(self, state=None, category_id=None, limit=50, offset=0,
company_id=None):
company_id = int(company_id) if company_id else request.env.company.id
Asset = request.env['fusion.asset'].sudo()
domain = [('company_id', '=', company_id)]
if state:
domain.append(('state', '=', state))
if category_id:
domain.append(('category_id', '=', int(category_id)))
total = Asset.search_count(domain)
assets = Asset.search(domain, limit=int(limit), offset=int(offset),
order='acquisition_date desc')
return {
'count': len(assets),
'total': total,
'assets': [{
'id': a.id, 'name': a.name, 'code': a.code or '',
'state': a.state, 'cost': a.cost, 'salvage_value': a.salvage_value,
'book_value': a.book_value, 'total_depreciated': a.total_depreciated,
'method': a.method, 'useful_life_years': a.useful_life_years,
'acquisition_date': str(a.acquisition_date),
'in_service_date': str(a.in_service_date) if a.in_service_date else None,
'category_id': a.category_id.id if a.category_id else None,
'category_name': a.category_id.name if a.category_id else None,
'currency_code': a.currency_id.name,
} for a in assets],
}
@http.route('/fusion/assets/get_detail', type='jsonrpc', auth='user')
def get_detail(self, asset_id):
asset = request.env['fusion.asset'].browse(int(asset_id))
if not asset.exists():
raise ValidationError(_("Asset %s not found") % asset_id)
return {
'asset': {
'id': asset.id, 'name': asset.name, 'code': asset.code or '',
'state': asset.state, 'cost': asset.cost,
'salvage_value': asset.salvage_value,
'book_value': asset.book_value,
'total_depreciated': asset.total_depreciated,
'method': asset.method,
'useful_life_years': asset.useful_life_years,
'declining_rate_pct': asset.declining_rate_pct,
'total_units_expected': asset.total_units_expected,
'units_used_to_date': asset.units_used_to_date,
'prorate_convention': asset.prorate_convention,
'acquisition_date': str(asset.acquisition_date),
'in_service_date': str(asset.in_service_date) if asset.in_service_date else None,
'disposed_date': str(asset.disposed_date) if asset.disposed_date else None,
'category_id': asset.category_id.id if asset.category_id else None,
'category_name': asset.category_id.name if asset.category_id else None,
'currency_id': asset.currency_id.id,
'currency_code': asset.currency_id.name,
},
'depreciation_lines': [{
'id': l.id, 'period_index': l.period_index,
'scheduled_date': str(l.scheduled_date),
'amount': l.amount, 'accumulated': l.accumulated,
'book_value_at_end': l.book_value_at_end,
'is_posted': l.is_posted,
'posted_date': str(l.posted_date) if l.posted_date else None,
} for l in asset.depreciation_line_ids.sorted('period_index')],
'anomalies': [{
'id': a.id, 'anomaly_type': a.anomaly_type,
'severity': a.severity, 'detail': a.detail or '',
'state': a.state,
} for a in request.env['fusion.asset.anomaly'].search([
('asset_id', '=', asset.id), ('state', 'in', ('new', 'acknowledged'))
])],
}
@http.route('/fusion/assets/compute_schedule', type='jsonrpc', auth='user')
def compute_schedule(self, asset_id, recompute=False):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
return engine.compute_depreciation_schedule(asset, recompute=bool(recompute))
@http.route('/fusion/assets/post_depreciation', type='jsonrpc', auth='user')
def post_depreciation(self, asset_id, period_date=None):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
return engine.post_depreciation_entry(asset, period_date=_parse_date(period_date))
@http.route('/fusion/assets/dispose', type='jsonrpc', auth='user')
def dispose(self, asset_id, sale_amount=0, sale_date=None,
sale_partner_id=None, disposal_type='sale'):
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
engine = request.env['fusion.asset.engine'].sudo()
partner = None
if sale_partner_id:
partner = request.env['res.partner'].sudo().browse(int(sale_partner_id))
return engine.dispose_asset(
asset, sale_amount=float(sale_amount),
sale_date=_parse_date(sale_date),
sale_partner=partner, disposal_type=disposal_type,
)
@http.route('/fusion/assets/get_anomalies', type='jsonrpc', auth='user')
def get_anomalies(self, asset_id=None, severity=None, state='new', limit=50,
company_id=None):
company_id = int(company_id) if company_id else request.env.company.id
Anomaly = request.env['fusion.asset.anomaly'].sudo()
domain = [('company_id', '=', company_id)]
if asset_id:
domain.append(('asset_id', '=', int(asset_id)))
if severity:
domain.append(('severity', '=', severity))
if state:
domain.append(('state', '=', state))
anomalies = Anomaly.search(domain, limit=int(limit), order='detected_at desc')
return {
'count': len(anomalies),
'anomalies': [{
'id': a.id, 'asset_id': a.asset_id.id, 'asset_name': a.asset_id.name,
'anomaly_type': a.anomaly_type, 'severity': a.severity,
'expected': a.expected, 'actual': a.actual,
'variance_pct': a.variance_pct, 'detail': a.detail or '',
'state': a.state,
'detected_at': str(a.detected_at),
} for a in anomalies],
}
@http.route('/fusion/assets/suggest_useful_life', type='jsonrpc', auth='user')
def suggest_useful_life(self, description, amount=None, partner_name=None):
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
predict_useful_life,
)
return predict_useful_life(
request.env, description=description,
amount=float(amount) if amount is not None else None,
partner_name=partner_name,
)
@http.route('/fusion/assets/get_partner_history', type='jsonrpc', auth='user')
def get_partner_history(self, partner_id, limit=20):
Asset = request.env['fusion.asset'].sudo()
assets = Asset.search([
('source_invoice_line_id.partner_id', '=', int(partner_id)),
], limit=int(limit), order='acquisition_date desc')
return {
'partner_id': int(partner_id),
'count': len(assets),
'assets': [{
'id': a.id, 'name': a.name,
'cost': a.cost, 'book_value': a.book_value,
'state': a.state,
'acquisition_date': str(a.acquisition_date),
} for a in assets],
}

View File

@@ -11,3 +11,4 @@ from . import test_fusion_asset_anomaly
from . import test_account_move_inherit
from . import test_fusion_asset_engine
from . import test_engine_integration
from . import test_assets_controller

View File

@@ -0,0 +1,103 @@
"""Controller tests using HttpCase."""
import json
from datetime import date
from odoo.tests import tagged
from odoo.tests.common import HttpCase, new_test_user
@tagged('post_install', '-at_install')
class TestAssetsController(HttpCase):
def setUp(self):
super().setUp()
self.user = new_test_user(
self.env, login='assets_test_user',
groups='base.group_user,account.group_account_invoice',
)
def _jsonrpc(self, endpoint, params):
self.authenticate('assets_test_user', 'assets_test_user')
url = f'/fusion/assets/{endpoint}'
body = {'jsonrpc': '2.0', 'method': 'call', 'params': params, 'id': 1}
response = self.url_open(
url, data=json.dumps(body),
headers={'Content-Type': 'application/json'},
)
self.assertEqual(
response.status_code, 200,
f"{endpoint} returned {response.status_code}: {response.text[:300]}",
)
result = response.json()
if 'error' in result:
self.fail(f"{endpoint} errored: {result['error']}")
return result.get('result', {})
def test_list_returns_dict(self):
result = self._jsonrpc('list', {'company_id': self.env.company.id})
self.assertIn('assets', result)
self.assertIn('total', result)
def test_get_detail_returns_asset(self):
asset = self.env['fusion.asset'].create({
'name': 'Ctrl Test Asset', 'cost': 5000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 5,
})
result = self._jsonrpc('get_detail', {'asset_id': asset.id})
self.assertEqual(result['asset']['id'], asset.id)
self.assertIn('depreciation_lines', result)
def test_compute_schedule_creates_lines(self):
asset = self.env['fusion.asset'].create({
'name': 'CompTest', 'cost': 4000,
'acquisition_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 4,
})
result = self._jsonrpc('compute_schedule', {'asset_id': asset.id})
self.assertEqual(result['lines_created'], 4)
def test_post_depreciation_after_running(self):
asset = self.env['fusion.asset'].create({
'name': 'PostTest', 'cost': 3000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 3,
})
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
asset.action_set_running()
result = self._jsonrpc('post_depreciation', {'asset_id': asset.id})
self.assertEqual(result['posted_count'], 1)
def test_dispose_marks_asset_disposed(self):
asset = self.env['fusion.asset'].create({
'name': 'DispTest', 'cost': 6000,
'acquisition_date': date(2026, 1, 1),
'in_service_date': date(2026, 1, 1),
'method': 'straight_line', 'useful_life_years': 3,
})
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
asset.action_set_running()
result = self._jsonrpc('dispose', {
'asset_id': asset.id, 'sale_amount': 4000,
'sale_date': '2027-06-01', 'disposal_type': 'sale',
})
self.assertIn('disposal_id', result)
asset.invalidate_recordset(['state'])
self.assertEqual(asset.state, 'disposed')
def test_get_anomalies_returns_list(self):
result = self._jsonrpc('get_anomalies', {'company_id': self.env.company.id})
self.assertIn('anomalies', result)
def test_suggest_useful_life_returns_dict(self):
result = self._jsonrpc('suggest_useful_life', {'description': 'Dell laptop'})
self.assertIn('useful_life_years', result)
self.assertIn('depreciation_method', result)
self.assertEqual(result['useful_life_years'], 4)
def test_get_partner_history(self):
partner = self.env['res.partner'].create({'name': 'History Test Partner'})
result = self._jsonrpc('get_partner_history', {'partner_id': partner.id})
self.assertEqual(result['partner_id'], partner.id)