Compare commits
4 Commits
345c971d59
...
de6d8fda3e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de6d8fda3e | ||
|
|
9092a78be2 | ||
|
|
79cd0216ff | ||
|
|
3e8b7b1e82 |
@@ -1,42 +1,98 @@
|
||||
"""Assets data adapter."""
|
||||
"""Assets data adapter — routes asset queries through fusion engine if installed."""
|
||||
|
||||
from .base import DataAdapter
|
||||
from ._registry import register_adapter
|
||||
|
||||
|
||||
class AssetsAdapter(DataAdapter):
|
||||
FUSION_MODEL = 'fusion.asset'
|
||||
FUSION_MODEL = 'fusion.asset.engine'
|
||||
ENTERPRISE_MODULE = 'account_asset'
|
||||
|
||||
def list_assets(self, state=None):
|
||||
return self._dispatch('list_assets', state=state)
|
||||
# ============================================================
|
||||
# list_assets
|
||||
# ============================================================
|
||||
|
||||
def list_assets_via_fusion(self, state=None):
|
||||
return self._read_fusion('fusion.asset', state=state)
|
||||
def list_assets(self, state=None, limit=50, company_id=None):
|
||||
return self._dispatch(
|
||||
'list_assets', state=state, limit=limit, company_id=company_id,
|
||||
)
|
||||
|
||||
def list_assets_via_enterprise(self, state=None):
|
||||
return self._read_fusion('account.asset', state=state)
|
||||
def list_assets_via_fusion(self, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'assets': [], 'count': 0, 'total': 0}
|
||||
Asset = self.env['fusion.asset'].sudo()
|
||||
domain = [('company_id', '=', kwargs.get('company_id') or self.env.company.id)]
|
||||
if kwargs.get('state'):
|
||||
domain.append(('state', '=', kwargs['state']))
|
||||
total = Asset.search_count(domain)
|
||||
assets = Asset.search(
|
||||
domain, limit=int(kwargs.get('limit', 50)),
|
||||
order='acquisition_date desc',
|
||||
)
|
||||
return {
|
||||
'count': len(assets), 'total': total,
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name, 'state': a.state,
|
||||
'cost': a.cost, 'book_value': a.book_value,
|
||||
'method': a.method,
|
||||
'category_name': a.category_id.name if a.category_id else None,
|
||||
} for a in assets],
|
||||
}
|
||||
|
||||
def list_assets_via_community(self, state=None):
|
||||
# No assets feature in pure Community — return empty list with a hint.
|
||||
return []
|
||||
def list_assets_via_enterprise(self, **kwargs):
|
||||
return {
|
||||
'assets': [], 'count': 0, 'total': 0,
|
||||
'error': 'Enterprise account_asset must be queried from Enterprise UI',
|
||||
}
|
||||
|
||||
def _read_fusion(self, model_name, state=None):
|
||||
"""Shared shape between fusion and enterprise (both use account.asset-like API)."""
|
||||
Model = self.env[model_name].sudo()
|
||||
domain = []
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
records = Model.search(domain, limit=200)
|
||||
out = []
|
||||
for r in records:
|
||||
out.append({
|
||||
'id': r.id,
|
||||
'name': getattr(r, 'name', None),
|
||||
'state': getattr(r, 'state', None),
|
||||
'value': getattr(r, 'original_value', None) or getattr(r, 'acquisition_cost', None),
|
||||
})
|
||||
return out
|
||||
def list_assets_via_community(self, **kwargs):
|
||||
return {
|
||||
'assets': [], 'count': 0, 'total': 0,
|
||||
'error': 'No assets engine in pure Community',
|
||||
}
|
||||
|
||||
# ============================================================
|
||||
# suggest_useful_life
|
||||
# ============================================================
|
||||
|
||||
def suggest_useful_life(self, description, amount=None, partner_name=None):
|
||||
return self._dispatch(
|
||||
'suggest_useful_life',
|
||||
description=description, amount=amount, partner_name=partner_name,
|
||||
)
|
||||
|
||||
def suggest_useful_life_via_fusion(self, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
|
||||
predict_useful_life,
|
||||
)
|
||||
return predict_useful_life(self.env, **kwargs)
|
||||
|
||||
def suggest_useful_life_via_enterprise(self, **kwargs):
|
||||
return {'error': 'AI useful-life suggestion is fusion-only'}
|
||||
|
||||
def suggest_useful_life_via_community(self, **kwargs):
|
||||
return {'error': 'AI useful-life suggestion is fusion-only'}
|
||||
|
||||
# ============================================================
|
||||
# dispose_asset
|
||||
# ============================================================
|
||||
|
||||
def dispose_asset(self, asset_id, **kwargs):
|
||||
return self._dispatch('dispose_asset', asset_id=asset_id, **kwargs)
|
||||
|
||||
def dispose_asset_via_fusion(self, asset_id, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
asset = self.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
return self.env['fusion.asset.engine'].sudo().dispose_asset(asset, **kwargs)
|
||||
|
||||
def dispose_asset_via_enterprise(self, asset_id, **kwargs):
|
||||
return {'error': 'Enterprise asset disposal must use Enterprise UI'}
|
||||
|
||||
def dispose_asset_via_community(self, asset_id, **kwargs):
|
||||
return {'error': 'Community has no asset disposal flow'}
|
||||
|
||||
|
||||
register_adapter('assets', AssetsAdapter)
|
||||
|
||||
@@ -10,11 +10,13 @@ from .adp import TOOLS as ADP_TOOLS
|
||||
from .reporting import TOOLS as REPORTING_TOOLS
|
||||
from .audit import TOOLS as AUDIT_TOOLS
|
||||
from .financial_reports import TOOLS as FINANCIAL_REPORTS_TOOLS
|
||||
from .asset_management import TOOLS as ASSET_MANAGEMENT_TOOLS
|
||||
|
||||
TOOL_DISPATCH = {}
|
||||
for tools_dict in [
|
||||
BANK_RECON_TOOLS, HST_TOOLS, AR_TOOLS, AP_TOOLS, JOURNAL_TOOLS,
|
||||
MONTH_END_TOOLS, PAYROLL_TOOLS, INVENTORY_TOOLS, ADP_TOOLS,
|
||||
REPORTING_TOOLS, AUDIT_TOOLS, FINANCIAL_REPORTS_TOOLS,
|
||||
ASSET_MANAGEMENT_TOOLS,
|
||||
]:
|
||||
TOOL_DISPATCH.update(tools_dict)
|
||||
|
||||
77
fusion_accounting_ai/services/tools/asset_management.py
Normal file
77
fusion_accounting_ai/services/tools/asset_management.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""Fusion-engine-routed AI tools for asset management."""
|
||||
|
||||
import logging
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fusion_list_assets(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.list_assets(
|
||||
state=params.get('state'),
|
||||
limit=int(params.get('limit', 50)),
|
||||
company_id=int(params['company_id']) if params.get('company_id') else env.company.id,
|
||||
)
|
||||
|
||||
|
||||
def fusion_get_asset_detail(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
Asset = env['fusion.asset']
|
||||
asset = Asset.browse(int(params['asset_id']))
|
||||
if not asset.exists():
|
||||
return {'error': 'Asset not found'}
|
||||
return {
|
||||
'asset': {
|
||||
'id': asset.id, 'name': asset.name, 'state': asset.state,
|
||||
'cost': asset.cost, 'book_value': asset.book_value,
|
||||
'total_depreciated': asset.total_depreciated,
|
||||
'method': asset.method, 'useful_life_years': asset.useful_life_years,
|
||||
},
|
||||
'depreciation_count': len(asset.depreciation_line_ids),
|
||||
}
|
||||
|
||||
|
||||
def fusion_compute_asset_schedule(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
asset = env['fusion.asset'].browse(int(params['asset_id']))
|
||||
return env['fusion.asset.engine'].compute_depreciation_schedule(
|
||||
asset, recompute=bool(params.get('recompute', False)),
|
||||
)
|
||||
|
||||
|
||||
def fusion_dispose_asset(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.dispose_asset(
|
||||
asset_id=int(params['asset_id']),
|
||||
sale_amount=float(params.get('sale_amount', 0)),
|
||||
disposal_type=params.get('disposal_type', 'sale'),
|
||||
)
|
||||
|
||||
|
||||
def fusion_suggest_asset_useful_life(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.suggest_useful_life(
|
||||
description=params.get('description', ''),
|
||||
amount=float(params['amount']) if params.get('amount') else None,
|
||||
partner_name=params.get('partner_name'),
|
||||
)
|
||||
|
||||
|
||||
TOOLS = {
|
||||
'fusion_list_assets': fusion_list_assets,
|
||||
'fusion_get_asset_detail': fusion_get_asset_detail,
|
||||
'fusion_compute_asset_schedule': fusion_compute_asset_schedule,
|
||||
'fusion_dispose_asset': fusion_dispose_asset,
|
||||
'fusion_suggest_asset_useful_life': fusion_suggest_asset_useful_life,
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
from . import models
|
||||
from . import services
|
||||
from . import controllers
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
'name': 'Fusion Accounting Assets',
|
||||
'version': '19.0.1.0.13',
|
||||
'version': '19.0.1.0.17',
|
||||
'category': 'Accounting/Accounting',
|
||||
'summary': 'AI-augmented asset management with depreciation schedules.',
|
||||
'description': """
|
||||
@@ -33,6 +33,7 @@ menu hides; the engine + AI tools remain available for the chat.
|
||||
],
|
||||
'data': [
|
||||
'security/ir.model.access.csv',
|
||||
'data/cron.xml',
|
||||
],
|
||||
'assets': {
|
||||
'web.assets_backend': [
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from . import assets_controller
|
||||
|
||||
175
fusion_accounting_assets/controllers/assets_controller.py
Normal file
175
fusion_accounting_assets/controllers/assets_controller.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""HTTP controller: 8 JSON-RPC endpoints for the OWL asset dashboard.
|
||||
|
||||
All endpoints route through fusion.asset.engine. V19 type='jsonrpc'.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import date, datetime
|
||||
|
||||
from odoo import _, http
|
||||
from odoo.exceptions import ValidationError
|
||||
from odoo.http import request
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _parse_date(value):
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if not value:
|
||||
return None
|
||||
return datetime.strptime(value, '%Y-%m-%d').date()
|
||||
|
||||
|
||||
class FusionAssetsController(http.Controller):
|
||||
|
||||
@http.route('/fusion/assets/list', type='jsonrpc', auth='user')
|
||||
def list_assets(self, state=None, category_id=None, limit=50, offset=0,
|
||||
company_id=None):
|
||||
company_id = int(company_id) if company_id else request.env.company.id
|
||||
Asset = request.env['fusion.asset'].sudo()
|
||||
domain = [('company_id', '=', company_id)]
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
if category_id:
|
||||
domain.append(('category_id', '=', int(category_id)))
|
||||
total = Asset.search_count(domain)
|
||||
assets = Asset.search(domain, limit=int(limit), offset=int(offset),
|
||||
order='acquisition_date desc')
|
||||
return {
|
||||
'count': len(assets),
|
||||
'total': total,
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name, 'code': a.code or '',
|
||||
'state': a.state, 'cost': a.cost, 'salvage_value': a.salvage_value,
|
||||
'book_value': a.book_value, 'total_depreciated': a.total_depreciated,
|
||||
'method': a.method, 'useful_life_years': a.useful_life_years,
|
||||
'acquisition_date': str(a.acquisition_date),
|
||||
'in_service_date': str(a.in_service_date) if a.in_service_date else None,
|
||||
'category_id': a.category_id.id if a.category_id else None,
|
||||
'category_name': a.category_id.name if a.category_id else None,
|
||||
'currency_code': a.currency_id.name,
|
||||
} for a in assets],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/get_detail', type='jsonrpc', auth='user')
|
||||
def get_detail(self, asset_id):
|
||||
asset = request.env['fusion.asset'].browse(int(asset_id))
|
||||
if not asset.exists():
|
||||
raise ValidationError(_("Asset %s not found") % asset_id)
|
||||
return {
|
||||
'asset': {
|
||||
'id': asset.id, 'name': asset.name, 'code': asset.code or '',
|
||||
'state': asset.state, 'cost': asset.cost,
|
||||
'salvage_value': asset.salvage_value,
|
||||
'book_value': asset.book_value,
|
||||
'total_depreciated': asset.total_depreciated,
|
||||
'method': asset.method,
|
||||
'useful_life_years': asset.useful_life_years,
|
||||
'declining_rate_pct': asset.declining_rate_pct,
|
||||
'total_units_expected': asset.total_units_expected,
|
||||
'units_used_to_date': asset.units_used_to_date,
|
||||
'prorate_convention': asset.prorate_convention,
|
||||
'acquisition_date': str(asset.acquisition_date),
|
||||
'in_service_date': str(asset.in_service_date) if asset.in_service_date else None,
|
||||
'disposed_date': str(asset.disposed_date) if asset.disposed_date else None,
|
||||
'category_id': asset.category_id.id if asset.category_id else None,
|
||||
'category_name': asset.category_id.name if asset.category_id else None,
|
||||
'currency_id': asset.currency_id.id,
|
||||
'currency_code': asset.currency_id.name,
|
||||
},
|
||||
'depreciation_lines': [{
|
||||
'id': l.id, 'period_index': l.period_index,
|
||||
'scheduled_date': str(l.scheduled_date),
|
||||
'amount': l.amount, 'accumulated': l.accumulated,
|
||||
'book_value_at_end': l.book_value_at_end,
|
||||
'is_posted': l.is_posted,
|
||||
'posted_date': str(l.posted_date) if l.posted_date else None,
|
||||
} for l in asset.depreciation_line_ids.sorted('period_index')],
|
||||
'anomalies': [{
|
||||
'id': a.id, 'anomaly_type': a.anomaly_type,
|
||||
'severity': a.severity, 'detail': a.detail or '',
|
||||
'state': a.state,
|
||||
} for a in request.env['fusion.asset.anomaly'].search([
|
||||
('asset_id', '=', asset.id), ('state', 'in', ('new', 'acknowledged'))
|
||||
])],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/compute_schedule', type='jsonrpc', auth='user')
|
||||
def compute_schedule(self, asset_id, recompute=False):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
return engine.compute_depreciation_schedule(asset, recompute=bool(recompute))
|
||||
|
||||
@http.route('/fusion/assets/post_depreciation', type='jsonrpc', auth='user')
|
||||
def post_depreciation(self, asset_id, period_date=None):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
return engine.post_depreciation_entry(asset, period_date=_parse_date(period_date))
|
||||
|
||||
@http.route('/fusion/assets/dispose', type='jsonrpc', auth='user')
|
||||
def dispose(self, asset_id, sale_amount=0, sale_date=None,
|
||||
sale_partner_id=None, disposal_type='sale'):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
partner = None
|
||||
if sale_partner_id:
|
||||
partner = request.env['res.partner'].sudo().browse(int(sale_partner_id))
|
||||
return engine.dispose_asset(
|
||||
asset, sale_amount=float(sale_amount),
|
||||
sale_date=_parse_date(sale_date),
|
||||
sale_partner=partner, disposal_type=disposal_type,
|
||||
)
|
||||
|
||||
@http.route('/fusion/assets/get_anomalies', type='jsonrpc', auth='user')
|
||||
def get_anomalies(self, asset_id=None, severity=None, state='new', limit=50,
|
||||
company_id=None):
|
||||
company_id = int(company_id) if company_id else request.env.company.id
|
||||
Anomaly = request.env['fusion.asset.anomaly'].sudo()
|
||||
domain = [('company_id', '=', company_id)]
|
||||
if asset_id:
|
||||
domain.append(('asset_id', '=', int(asset_id)))
|
||||
if severity:
|
||||
domain.append(('severity', '=', severity))
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
anomalies = Anomaly.search(domain, limit=int(limit), order='detected_at desc')
|
||||
return {
|
||||
'count': len(anomalies),
|
||||
'anomalies': [{
|
||||
'id': a.id, 'asset_id': a.asset_id.id, 'asset_name': a.asset_id.name,
|
||||
'anomaly_type': a.anomaly_type, 'severity': a.severity,
|
||||
'expected': a.expected, 'actual': a.actual,
|
||||
'variance_pct': a.variance_pct, 'detail': a.detail or '',
|
||||
'state': a.state,
|
||||
'detected_at': str(a.detected_at),
|
||||
} for a in anomalies],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/suggest_useful_life', type='jsonrpc', auth='user')
|
||||
def suggest_useful_life(self, description, amount=None, partner_name=None):
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
|
||||
predict_useful_life,
|
||||
)
|
||||
return predict_useful_life(
|
||||
request.env, description=description,
|
||||
amount=float(amount) if amount is not None else None,
|
||||
partner_name=partner_name,
|
||||
)
|
||||
|
||||
@http.route('/fusion/assets/get_partner_history', type='jsonrpc', auth='user')
|
||||
def get_partner_history(self, partner_id, limit=20):
|
||||
Asset = request.env['fusion.asset'].sudo()
|
||||
assets = Asset.search([
|
||||
('source_invoice_line_id.partner_id', '=', int(partner_id)),
|
||||
], limit=int(limit), order='acquisition_date desc')
|
||||
return {
|
||||
'partner_id': int(partner_id),
|
||||
'count': len(assets),
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name,
|
||||
'cost': a.cost, 'book_value': a.book_value,
|
||||
'state': a.state,
|
||||
'acquisition_date': str(a.acquisition_date),
|
||||
} for a in assets],
|
||||
}
|
||||
24
fusion_accounting_assets/data/cron.xml
Normal file
24
fusion_accounting_assets/data/cron.xml
Normal file
@@ -0,0 +1,24 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<odoo noupdate="1">
|
||||
|
||||
<record id="cron_fusion_assets_post_depreciation" model="ir.cron">
|
||||
<field name="name">Fusion Assets — Post Due Depreciation</field>
|
||||
<field name="model_id" ref="model_fusion_assets_cron"/>
|
||||
<field name="state">code</field>
|
||||
<field name="code">model._cron_post_due_depreciation()</field>
|
||||
<field name="interval_number">1</field>
|
||||
<field name="interval_type">days</field>
|
||||
<field name="active" eval="True"/>
|
||||
</record>
|
||||
|
||||
<record id="cron_fusion_assets_anomaly_scan" model="ir.cron">
|
||||
<field name="name">Fusion Assets — Monthly Anomaly Scan</field>
|
||||
<field name="model_id" ref="model_fusion_assets_cron"/>
|
||||
<field name="state">code</field>
|
||||
<field name="code">model._cron_anomaly_scan()</field>
|
||||
<field name="interval_number">30</field>
|
||||
<field name="interval_type">days</field>
|
||||
<field name="active" eval="True"/>
|
||||
</record>
|
||||
|
||||
</odoo>
|
||||
@@ -5,3 +5,4 @@ from . import fusion_asset_disposal
|
||||
from . import fusion_asset_anomaly
|
||||
from . import account_move
|
||||
from . import fusion_asset_engine
|
||||
from . import fusion_assets_cron
|
||||
|
||||
85
fusion_accounting_assets/models/fusion_assets_cron.py
Normal file
85
fusion_accounting_assets/models/fusion_assets_cron.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Cron handlers for fusion_accounting_assets.
|
||||
|
||||
- _cron_post_due_depreciation: daily, post due depreciation lines for running assets
|
||||
- _cron_anomaly_scan: monthly, scan for schedule variance and create anomaly records
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
from ..services.anomaly_detection import detect_schedule_variance
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FusionAssetsCron(models.AbstractModel):
|
||||
_name = "fusion.assets.cron"
|
||||
_description = "Fusion Assets Cron Handlers"
|
||||
|
||||
@api.model
|
||||
def _cron_post_due_depreciation(self):
|
||||
"""For each running asset, post any due un-posted depreciation lines."""
|
||||
today = fields.Date.today()
|
||||
engine = self.env['fusion.asset.engine']
|
||||
Asset = self.env['fusion.asset']
|
||||
running_assets = Asset.search([('state', '=', 'running')])
|
||||
posted_total = 0
|
||||
for asset in running_assets:
|
||||
try:
|
||||
with self.env.cr.savepoint():
|
||||
result = engine.post_depreciation_entry(asset, period_date=today)
|
||||
posted_total += result.get('posted_count', 0)
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning("Cron post failed for asset %s: %s", asset.id, e)
|
||||
_logger.info(
|
||||
"Cron: posted depreciation on %d lines across %d running assets",
|
||||
posted_total, len(running_assets),
|
||||
)
|
||||
|
||||
@api.model
|
||||
def _cron_anomaly_scan(self):
|
||||
"""For each running asset, compare expected accumulated depreciation
|
||||
vs posted, and persist any variance flags."""
|
||||
Asset = self.env['fusion.asset']
|
||||
Anomaly = self.env['fusion.asset.anomaly']
|
||||
running_assets = Asset.search([('state', '=', 'running')])
|
||||
flagged = 0
|
||||
today = fields.Date.today()
|
||||
for asset in running_assets:
|
||||
try:
|
||||
expected = sum(
|
||||
l.amount for l in asset.depreciation_line_ids
|
||||
if l.scheduled_date and l.scheduled_date <= today
|
||||
)
|
||||
actual = asset.total_depreciated
|
||||
anomaly = detect_schedule_variance(
|
||||
asset_id=asset.id, asset_name=asset.name,
|
||||
expected_accumulated=expected, actual_accumulated=actual,
|
||||
)
|
||||
if anomaly is None:
|
||||
continue
|
||||
anomaly_dict = anomaly.to_dict()
|
||||
existing = Anomaly.search([
|
||||
('asset_id', '=', asset.id),
|
||||
('anomaly_type', '=', anomaly_dict['anomaly_type']),
|
||||
('state', 'in', ('new', 'acknowledged')),
|
||||
], limit=1)
|
||||
if existing:
|
||||
continue
|
||||
Anomaly.create({
|
||||
'asset_id': asset.id,
|
||||
'anomaly_type': anomaly_dict['anomaly_type'],
|
||||
'severity': anomaly_dict['severity'],
|
||||
'expected': anomaly_dict['expected'],
|
||||
'actual': anomaly_dict['actual'],
|
||||
'variance_pct': anomaly_dict['variance_pct'],
|
||||
'detail': anomaly_dict['detail'],
|
||||
})
|
||||
flagged += 1
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning("Cron anomaly scan failed for asset %s: %s", asset.id, e)
|
||||
_logger.info(
|
||||
"Cron: scanned %d assets, flagged %d anomalies",
|
||||
len(running_assets), flagged,
|
||||
)
|
||||
@@ -11,3 +11,7 @@ from . import test_fusion_asset_anomaly
|
||||
from . import test_account_move_inherit
|
||||
from . import test_fusion_asset_engine
|
||||
from . import test_engine_integration
|
||||
from . import test_assets_controller
|
||||
from . import test_assets_adapter
|
||||
from . import test_asset_tools
|
||||
from . import test_assets_cron
|
||||
|
||||
56
fusion_accounting_assets/tests/test_asset_tools.py
Normal file
56
fusion_accounting_assets/tests/test_asset_tools.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Tests for the 5 fusion-asset AI tools."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
from odoo.addons.fusion_accounting_ai.services.tools import asset_management as tools
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetTools(TransactionCase):
|
||||
|
||||
def test_fusion_list_assets(self):
|
||||
self.env['fusion.asset'].create({
|
||||
'name': 'Tool Test', 'cost': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_list_assets(self.env, {'company_id': self.env.company.id})
|
||||
self.assertGreaterEqual(result.get('count', 0), 1)
|
||||
|
||||
def test_fusion_get_asset_detail(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Detail Test', 'cost': 1500,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_get_asset_detail(self.env, {'asset_id': asset.id})
|
||||
self.assertEqual(result['asset']['name'], 'Detail Test')
|
||||
|
||||
def test_fusion_compute_schedule(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Schedule Test', 'cost': 2000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_compute_asset_schedule(self.env, {'asset_id': asset.id})
|
||||
self.assertEqual(result['lines_created'], 4)
|
||||
|
||||
def test_fusion_suggest_useful_life(self):
|
||||
self.env['ir.config_parameter'].sudo().search([
|
||||
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
|
||||
'fusion_accounting.provider.default'])
|
||||
]).unlink()
|
||||
result = tools.fusion_suggest_asset_useful_life(self.env, {
|
||||
'description': 'desk',
|
||||
})
|
||||
self.assertEqual(result['useful_life_years'], 7)
|
||||
|
||||
def test_tools_registered_in_dispatch(self):
|
||||
from odoo.addons.fusion_accounting_ai.services.tools import TOOL_DISPATCH
|
||||
for tool_name in ['fusion_list_assets', 'fusion_get_asset_detail',
|
||||
'fusion_compute_asset_schedule', 'fusion_dispose_asset',
|
||||
'fusion_suggest_asset_useful_life']:
|
||||
self.assertIn(tool_name, TOOL_DISPATCH)
|
||||
40
fusion_accounting_assets/tests/test_assets_adapter.py
Normal file
40
fusion_accounting_assets/tests/test_assets_adapter.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""AssetsAdapter wiring tests — fusion-mode dispatch."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
from odoo.addons.fusion_accounting_ai.services.data_adapters.assets import (
|
||||
AssetsAdapter,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAssetsAdapter(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.adapter = AssetsAdapter(self.env)
|
||||
|
||||
def test_list_assets_via_fusion(self):
|
||||
self.env['fusion.asset'].create({
|
||||
'name': 'Adapter Test', 'cost': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = self.adapter.list_assets_via_fusion(company_id=self.env.company.id)
|
||||
self.assertGreaterEqual(result['count'], 1)
|
||||
|
||||
def test_suggest_useful_life_via_fusion_uses_templated_fallback(self):
|
||||
self.env['ir.config_parameter'].sudo().search([
|
||||
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
|
||||
'fusion_accounting.provider.default'])
|
||||
]).unlink()
|
||||
result = self.adapter.suggest_useful_life_via_fusion(description='laptop')
|
||||
self.assertEqual(result['useful_life_years'], 4)
|
||||
self.assertEqual(result['depreciation_method'], 'straight_line')
|
||||
|
||||
def test_dispose_asset_via_community_returns_error(self):
|
||||
result = self.adapter.dispose_asset_via_community(asset_id=1, sale_amount=100)
|
||||
self.assertIn('error', result)
|
||||
103
fusion_accounting_assets/tests/test_assets_controller.py
Normal file
103
fusion_accounting_assets/tests/test_assets_controller.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Controller tests using HttpCase."""
|
||||
|
||||
import json
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import HttpCase, new_test_user
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAssetsController(HttpCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.user = new_test_user(
|
||||
self.env, login='assets_test_user',
|
||||
groups='base.group_user,account.group_account_invoice',
|
||||
)
|
||||
|
||||
def _jsonrpc(self, endpoint, params):
|
||||
self.authenticate('assets_test_user', 'assets_test_user')
|
||||
url = f'/fusion/assets/{endpoint}'
|
||||
body = {'jsonrpc': '2.0', 'method': 'call', 'params': params, 'id': 1}
|
||||
response = self.url_open(
|
||||
url, data=json.dumps(body),
|
||||
headers={'Content-Type': 'application/json'},
|
||||
)
|
||||
self.assertEqual(
|
||||
response.status_code, 200,
|
||||
f"{endpoint} returned {response.status_code}: {response.text[:300]}",
|
||||
)
|
||||
result = response.json()
|
||||
if 'error' in result:
|
||||
self.fail(f"{endpoint} errored: {result['error']}")
|
||||
return result.get('result', {})
|
||||
|
||||
def test_list_returns_dict(self):
|
||||
result = self._jsonrpc('list', {'company_id': self.env.company.id})
|
||||
self.assertIn('assets', result)
|
||||
self.assertIn('total', result)
|
||||
|
||||
def test_get_detail_returns_asset(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Ctrl Test Asset', 'cost': 5000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 5,
|
||||
})
|
||||
result = self._jsonrpc('get_detail', {'asset_id': asset.id})
|
||||
self.assertEqual(result['asset']['id'], asset.id)
|
||||
self.assertIn('depreciation_lines', result)
|
||||
|
||||
def test_compute_schedule_creates_lines(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'CompTest', 'cost': 4000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = self._jsonrpc('compute_schedule', {'asset_id': asset.id})
|
||||
self.assertEqual(result['lines_created'], 4)
|
||||
|
||||
def test_post_depreciation_after_running(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'PostTest', 'cost': 3000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 3,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
result = self._jsonrpc('post_depreciation', {'asset_id': asset.id})
|
||||
self.assertEqual(result['posted_count'], 1)
|
||||
|
||||
def test_dispose_marks_asset_disposed(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'DispTest', 'cost': 6000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 3,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
result = self._jsonrpc('dispose', {
|
||||
'asset_id': asset.id, 'sale_amount': 4000,
|
||||
'sale_date': '2027-06-01', 'disposal_type': 'sale',
|
||||
})
|
||||
self.assertIn('disposal_id', result)
|
||||
asset.invalidate_recordset(['state'])
|
||||
self.assertEqual(asset.state, 'disposed')
|
||||
|
||||
def test_get_anomalies_returns_list(self):
|
||||
result = self._jsonrpc('get_anomalies', {'company_id': self.env.company.id})
|
||||
self.assertIn('anomalies', result)
|
||||
|
||||
def test_suggest_useful_life_returns_dict(self):
|
||||
result = self._jsonrpc('suggest_useful_life', {'description': 'Dell laptop'})
|
||||
self.assertIn('useful_life_years', result)
|
||||
self.assertIn('depreciation_method', result)
|
||||
self.assertEqual(result['useful_life_years'], 4)
|
||||
|
||||
def test_get_partner_history(self):
|
||||
partner = self.env['res.partner'].create({'name': 'History Test Partner'})
|
||||
result = self._jsonrpc('get_partner_history', {'partner_id': partner.id})
|
||||
self.assertEqual(result['partner_id'], partner.id)
|
||||
28
fusion_accounting_assets/tests/test_assets_cron.py
Normal file
28
fusion_accounting_assets/tests/test_assets_cron.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""Cron handler smoke tests."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetsCron(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cron = self.env['fusion.assets.cron']
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Cron Test', 'cost': 4000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
|
||||
def test_cron_post_due_depreciation_runs(self):
|
||||
self.cron._cron_post_due_depreciation()
|
||||
|
||||
def test_cron_anomaly_scan_runs(self):
|
||||
self.cron._cron_anomaly_scan()
|
||||
Reference in New Issue
Block a user