Files
Odoo-Modules/fusion_accounting_ai/services/data_adapters/assets.py
2026-04-19 17:15:24 -04:00

99 lines
3.8 KiB
Python

"""Assets data adapter — routes asset queries through fusion engine if installed."""
from .base import DataAdapter
from ._registry import register_adapter
class AssetsAdapter(DataAdapter):
FUSION_MODEL = 'fusion.asset.engine'
ENTERPRISE_MODULE = 'account_asset'
# ============================================================
# list_assets
# ============================================================
def list_assets(self, state=None, limit=50, company_id=None):
return self._dispatch(
'list_assets', state=state, limit=limit, company_id=company_id,
)
def list_assets_via_fusion(self, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'assets': [], 'count': 0, 'total': 0}
Asset = self.env['fusion.asset'].sudo()
domain = [('company_id', '=', kwargs.get('company_id') or self.env.company.id)]
if kwargs.get('state'):
domain.append(('state', '=', kwargs['state']))
total = Asset.search_count(domain)
assets = Asset.search(
domain, limit=int(kwargs.get('limit', 50)),
order='acquisition_date desc',
)
return {
'count': len(assets), 'total': total,
'assets': [{
'id': a.id, 'name': a.name, 'state': a.state,
'cost': a.cost, 'book_value': a.book_value,
'method': a.method,
'category_name': a.category_id.name if a.category_id else None,
} for a in assets],
}
def list_assets_via_enterprise(self, **kwargs):
return {
'assets': [], 'count': 0, 'total': 0,
'error': 'Enterprise account_asset must be queried from Enterprise UI',
}
def list_assets_via_community(self, **kwargs):
return {
'assets': [], 'count': 0, 'total': 0,
'error': 'No assets engine in pure Community',
}
# ============================================================
# suggest_useful_life
# ============================================================
def suggest_useful_life(self, description, amount=None, partner_name=None):
return self._dispatch(
'suggest_useful_life',
description=description, amount=amount, partner_name=partner_name,
)
def suggest_useful_life_via_fusion(self, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'error': 'fusion_accounting_assets not installed'}
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
predict_useful_life,
)
return predict_useful_life(self.env, **kwargs)
def suggest_useful_life_via_enterprise(self, **kwargs):
return {'error': 'AI useful-life suggestion is fusion-only'}
def suggest_useful_life_via_community(self, **kwargs):
return {'error': 'AI useful-life suggestion is fusion-only'}
# ============================================================
# dispose_asset
# ============================================================
def dispose_asset(self, asset_id, **kwargs):
return self._dispatch('dispose_asset', asset_id=asset_id, **kwargs)
def dispose_asset_via_fusion(self, asset_id, **kwargs):
if 'fusion.asset.engine' not in self.env.registry:
return {'error': 'fusion_accounting_assets not installed'}
asset = self.env['fusion.asset'].sudo().browse(int(asset_id))
return self.env['fusion.asset.engine'].sudo().dispose_asset(asset, **kwargs)
def dispose_asset_via_enterprise(self, asset_id, **kwargs):
return {'error': 'Enterprise asset disposal must use Enterprise UI'}
def dispose_asset_via_community(self, asset_id, **kwargs):
return {'error': 'Community has no asset disposal flow'}
register_adapter('assets', AssetsAdapter)