Compare commits
19 Commits
fusion_acc
...
de6d8fda3e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de6d8fda3e | ||
|
|
9092a78be2 | ||
|
|
79cd0216ff | ||
|
|
3e8b7b1e82 | ||
|
|
345c971d59 | ||
|
|
54922a0b32 | ||
|
|
38a6e375e6 | ||
|
|
8659f51935 | ||
|
|
5c89763191 | ||
|
|
b68d1b1c66 | ||
|
|
0439d81675 | ||
|
|
70e4404d9b | ||
|
|
bc7ba27d77 | ||
|
|
19cbed5b37 | ||
|
|
b7c171f983 | ||
|
|
bece120ee3 | ||
|
|
3e73ca0eb7 | ||
|
|
99b6990dd6 | ||
|
|
fdfaf7e779 |
165
fusion_accounting/PHASE_3_PLAN.md
Normal file
165
fusion_accounting/PHASE_3_PLAN.md
Normal file
@@ -0,0 +1,165 @@
|
||||
# Phase 3 — Fusion Accounting Assets Implementation Plan
|
||||
|
||||
**Module:** `fusion_accounting_assets`
|
||||
**Branch:** `fusion_accounting/phase-3-assets`
|
||||
**Pre-phase tag:** `fusion_accounting/pre-phase-3`
|
||||
**Estimated tasks:** ~50
|
||||
**Reference:** `/Users/gurpreet/Github/RePackaged-Odoo/accounting/account_asset/` (~2258 LOC Python)
|
||||
|
||||
## Goal
|
||||
|
||||
Replace Odoo Enterprise's `account_asset` module — asset management with depreciation schedules, disposal, partial sale, and reporting. CORE scope: 3 depreciation methods (straight-line, declining balance, units of production), full asset lifecycle, depreciation board, disposal/sale wizards. AI augmentation: utilization anomaly detection + AI-suggested useful life from invoice context. Coexists with Enterprise.
|
||||
|
||||
## Architecture (HYBRID engine, Phase 1+2 pattern)
|
||||
|
||||
```
|
||||
fusion.asset.engine (AbstractModel) ← shared primitives
|
||||
├── compute_depreciation_schedule(asset, recompute=False)
|
||||
├── post_depreciation_entry(asset, period)
|
||||
├── dispose_asset(asset, *, sale_amount, sale_date, sale_partner=None)
|
||||
├── partial_sale(asset, *, sold_amount, sold_qty, sale_date)
|
||||
├── pause_asset(asset, pause_date)
|
||||
├── resume_asset(asset, resume_date)
|
||||
└── reverse_disposal(asset)
|
||||
|
||||
services/ ← pure-Python
|
||||
├── depreciation_methods.py → straight_line, declining_balance, units_of_production
|
||||
├── prorate.py → first/last period prorating (calendar/365/etc.)
|
||||
├── salvage_value.py → end-of-life value math
|
||||
├── anomaly_detection.py → utilization variance vs expected
|
||||
├── useful_life_predictor.py → LLM-suggested useful life from invoice description
|
||||
└── useful_life_prompt.py → provider-agnostic LLM prompt
|
||||
|
||||
models/
|
||||
├── fusion_asset.py → main fusion.asset model
|
||||
├── fusion_asset_depreciation_line.py → depreciation board lines
|
||||
├── fusion_asset_category.py → categories with default settings
|
||||
├── fusion_asset_disposal.py → disposal records
|
||||
├── fusion_asset_anomaly.py → flagged utilization issues
|
||||
├── fusion_asset_engine.py → AbstractModel orchestrator
|
||||
└── account_move.py → inherit (link to asset, generate from invoice)
|
||||
|
||||
controllers/assets_controller.py ← 8 JSON-RPC endpoints
|
||||
├── /fusion/assets/list → paginated asset list with filters
|
||||
├── /fusion/assets/get_detail → single asset with full schedule
|
||||
├── /fusion/assets/compute_schedule → recompute depreciation board
|
||||
├── /fusion/assets/post_depreciation → run periodic depreciation cron
|
||||
├── /fusion/assets/dispose → dispose an asset
|
||||
├── /fusion/assets/get_anomalies → list flagged variances
|
||||
├── /fusion/assets/suggest_useful_life → AI suggest useful life
|
||||
└── /fusion/assets/get_partner_history → asset-related partner history
|
||||
|
||||
static/src/
|
||||
├── scss/ ← asset-specific design tokens
|
||||
├── services/assets_service.js ← reactive state + RPC wrappers
|
||||
├── views/asset_dashboard/ ← top-level OWL controller
|
||||
└── components/ ← asset_card, depreciation_board, disposal_dialog,
|
||||
ai_useful_life_panel, anomaly_strip
|
||||
```
|
||||
|
||||
## Coexistence
|
||||
|
||||
`group_fusion_show_when_enterprise_absent` from `fusion_accounting_core`. Asset menu only visible when `account_asset` NOT installed. Engine + AI tools always available.
|
||||
|
||||
## Tasks (50 total)
|
||||
|
||||
### Group 1: Foundation (1-2)
|
||||
1. Safety net (DONE)
|
||||
2. Plan doc + module skeleton
|
||||
|
||||
### Group 2: Pure-Python services TDD (3-7)
|
||||
3. `services/depreciation_methods.py` — straight_line + declining_balance + units_of_production (TDD)
|
||||
4. `services/prorate.py` — first/last period prorating
|
||||
5. `services/salvage_value.py` — end-of-life math
|
||||
6. `services/anomaly_detection.py` — utilization variance
|
||||
7. `services/useful_life_predictor.py` + `useful_life_prompt.py` — LLM integration
|
||||
|
||||
### Group 3: Persisted models (8-13)
|
||||
8. `models/fusion_asset.py` — main asset model with state machine
|
||||
9. `models/fusion_asset_depreciation_line.py` — depreciation board lines
|
||||
10. `models/fusion_asset_category.py` — categories with defaults
|
||||
11. `models/fusion_asset_disposal.py` — disposal records
|
||||
12. `models/fusion_asset_anomaly.py` — flagged anomalies
|
||||
13. `models/account_move.py` (inherit) — link asset to invoice
|
||||
|
||||
### Group 4: Engine (14-15)
|
||||
14. `models/fusion_asset_engine.py` — 7-method API
|
||||
15. Engine integration tests (compute_schedule + post_depreciation + dispose end-to-end)
|
||||
|
||||
### Group 5: Backend wiring (16-19)
|
||||
16. JSON-RPC controller (8 endpoints)
|
||||
17. AssetsAdapter wiring `_via_fusion` paths
|
||||
18. 5 new AI tools
|
||||
19. Cron — daily depreciation post + monthly anomaly scan
|
||||
|
||||
### Group 6: Tests + perf (20-23)
|
||||
20. Property-based tests (Hypothesis: schedule sums == cost - salvage)
|
||||
21. Integration tests — straight-line + declining-balance + units-of-production
|
||||
22. Materialized view for asset book values (perf)
|
||||
23. Performance benchmarks
|
||||
|
||||
### Group 7: Frontend OWL (24-31)
|
||||
24. SCSS tokens + main asset stylesheet (light + dark)
|
||||
25. `assets_service.js` (reactive state + RPC wrappers)
|
||||
26. `asset_dashboard` (top-level kanban + summary)
|
||||
27. `asset_card` (one asset summary card)
|
||||
28. `asset_detail_panel` (right-side: schedule, history, AI suggestions)
|
||||
29. `depreciation_board` (table view of schedule with edit chevrons)
|
||||
30. `disposal_dialog` (sale/scrap wizard)
|
||||
31. Fusion-only: `ai_useful_life_panel` + `anomaly_strip`
|
||||
|
||||
### Group 8: Wizards (32-35)
|
||||
32. Asset creation wizard (from invoice line)
|
||||
33. Disposal wizard (sale, scrap, donation)
|
||||
34. Partial sale wizard
|
||||
35. Period picker for depreciation runs
|
||||
|
||||
### Group 9: Migration + coexistence (36-39)
|
||||
36. Migration wizard inheritance — backfill from account.asset rows
|
||||
37. Audit report PDF (per-company asset count, total NBV, etc.)
|
||||
38. Menu + window action with coexistence group filter
|
||||
39. Coexistence test
|
||||
|
||||
### Group 10: Final tests + polish (40-50)
|
||||
40. 5 OWL tour tests
|
||||
41. Performance benchmarks (P95: schedule compute < 500ms, board render < 200ms)
|
||||
42. Optimize if benchmarks fail (conditional)
|
||||
43. Local LLM compat test for useful_life_predictor
|
||||
44. Update meta-module manifest
|
||||
45. CLAUDE.md, UPGRADE_NOTES.md, README.md
|
||||
46. End-to-end smoke + tag phase-3-complete + push
|
||||
47-50. Reserved for inherited features: account_move integration, draft journal entries, post-on-confirm flow, fiscal-year-aware proration
|
||||
|
||||
## Performance Targets (P95)
|
||||
|
||||
- `compute_schedule` (10-year asset): <500ms
|
||||
- `post_depreciation_entry`: <200ms
|
||||
- `dispose_asset`: <300ms
|
||||
- Controller `list`: <300ms
|
||||
- Controller `get_detail`: <500ms
|
||||
|
||||
## V19 Conventions (carried from Phase 1+2)
|
||||
|
||||
- `models.Constraint` not `_sql_constraints`
|
||||
- No `@api.depends('id')` on stored compute fields
|
||||
- `@route(type='jsonrpc')` not `type='json'`
|
||||
- `ir.cron` has no `numbercall` field
|
||||
- `res.groups.user_ids` not `users`
|
||||
- `ir.ui.menu.group_ids` not `groups_id`
|
||||
- `models.Constraint` for unique-keys
|
||||
- `env.flush_all()` before MV REFRESH
|
||||
- REFRESH MATERIALIZED VIEW CONCURRENTLY needs autocommit cursor
|
||||
|
||||
## Test Targets
|
||||
|
||||
Match Phase 1+2 test pyramid:
|
||||
- Unit (pure-Python services)
|
||||
- Integration (engine end-to-end)
|
||||
- Property-based (Hypothesis: schedule total invariants)
|
||||
- Controller (HttpCase JSON-RPC)
|
||||
- MV correctness
|
||||
- Performance benchmarks (tagged 'benchmark')
|
||||
- OWL tours (tagged 'tour')
|
||||
- Local LLM smoke (tagged 'local_llm')
|
||||
|
||||
Phase 1+2 final: 287 tests. Phase 3 target: ~140-180 additional → ~430-470 total.
|
||||
@@ -1,42 +1,98 @@
|
||||
"""Assets data adapter."""
|
||||
"""Assets data adapter — routes asset queries through fusion engine if installed."""
|
||||
|
||||
from .base import DataAdapter
|
||||
from ._registry import register_adapter
|
||||
|
||||
|
||||
class AssetsAdapter(DataAdapter):
|
||||
FUSION_MODEL = 'fusion.asset'
|
||||
FUSION_MODEL = 'fusion.asset.engine'
|
||||
ENTERPRISE_MODULE = 'account_asset'
|
||||
|
||||
def list_assets(self, state=None):
|
||||
return self._dispatch('list_assets', state=state)
|
||||
# ============================================================
|
||||
# list_assets
|
||||
# ============================================================
|
||||
|
||||
def list_assets_via_fusion(self, state=None):
|
||||
return self._read_fusion('fusion.asset', state=state)
|
||||
def list_assets(self, state=None, limit=50, company_id=None):
|
||||
return self._dispatch(
|
||||
'list_assets', state=state, limit=limit, company_id=company_id,
|
||||
)
|
||||
|
||||
def list_assets_via_enterprise(self, state=None):
|
||||
return self._read_fusion('account.asset', state=state)
|
||||
def list_assets_via_fusion(self, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'assets': [], 'count': 0, 'total': 0}
|
||||
Asset = self.env['fusion.asset'].sudo()
|
||||
domain = [('company_id', '=', kwargs.get('company_id') or self.env.company.id)]
|
||||
if kwargs.get('state'):
|
||||
domain.append(('state', '=', kwargs['state']))
|
||||
total = Asset.search_count(domain)
|
||||
assets = Asset.search(
|
||||
domain, limit=int(kwargs.get('limit', 50)),
|
||||
order='acquisition_date desc',
|
||||
)
|
||||
return {
|
||||
'count': len(assets), 'total': total,
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name, 'state': a.state,
|
||||
'cost': a.cost, 'book_value': a.book_value,
|
||||
'method': a.method,
|
||||
'category_name': a.category_id.name if a.category_id else None,
|
||||
} for a in assets],
|
||||
}
|
||||
|
||||
def list_assets_via_community(self, state=None):
|
||||
# No assets feature in pure Community — return empty list with a hint.
|
||||
return []
|
||||
def list_assets_via_enterprise(self, **kwargs):
|
||||
return {
|
||||
'assets': [], 'count': 0, 'total': 0,
|
||||
'error': 'Enterprise account_asset must be queried from Enterprise UI',
|
||||
}
|
||||
|
||||
def _read_fusion(self, model_name, state=None):
|
||||
"""Shared shape between fusion and enterprise (both use account.asset-like API)."""
|
||||
Model = self.env[model_name].sudo()
|
||||
domain = []
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
records = Model.search(domain, limit=200)
|
||||
out = []
|
||||
for r in records:
|
||||
out.append({
|
||||
'id': r.id,
|
||||
'name': getattr(r, 'name', None),
|
||||
'state': getattr(r, 'state', None),
|
||||
'value': getattr(r, 'original_value', None) or getattr(r, 'acquisition_cost', None),
|
||||
})
|
||||
return out
|
||||
def list_assets_via_community(self, **kwargs):
|
||||
return {
|
||||
'assets': [], 'count': 0, 'total': 0,
|
||||
'error': 'No assets engine in pure Community',
|
||||
}
|
||||
|
||||
# ============================================================
|
||||
# suggest_useful_life
|
||||
# ============================================================
|
||||
|
||||
def suggest_useful_life(self, description, amount=None, partner_name=None):
|
||||
return self._dispatch(
|
||||
'suggest_useful_life',
|
||||
description=description, amount=amount, partner_name=partner_name,
|
||||
)
|
||||
|
||||
def suggest_useful_life_via_fusion(self, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
|
||||
predict_useful_life,
|
||||
)
|
||||
return predict_useful_life(self.env, **kwargs)
|
||||
|
||||
def suggest_useful_life_via_enterprise(self, **kwargs):
|
||||
return {'error': 'AI useful-life suggestion is fusion-only'}
|
||||
|
||||
def suggest_useful_life_via_community(self, **kwargs):
|
||||
return {'error': 'AI useful-life suggestion is fusion-only'}
|
||||
|
||||
# ============================================================
|
||||
# dispose_asset
|
||||
# ============================================================
|
||||
|
||||
def dispose_asset(self, asset_id, **kwargs):
|
||||
return self._dispatch('dispose_asset', asset_id=asset_id, **kwargs)
|
||||
|
||||
def dispose_asset_via_fusion(self, asset_id, **kwargs):
|
||||
if 'fusion.asset.engine' not in self.env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
asset = self.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
return self.env['fusion.asset.engine'].sudo().dispose_asset(asset, **kwargs)
|
||||
|
||||
def dispose_asset_via_enterprise(self, asset_id, **kwargs):
|
||||
return {'error': 'Enterprise asset disposal must use Enterprise UI'}
|
||||
|
||||
def dispose_asset_via_community(self, asset_id, **kwargs):
|
||||
return {'error': 'Community has no asset disposal flow'}
|
||||
|
||||
|
||||
register_adapter('assets', AssetsAdapter)
|
||||
|
||||
@@ -10,11 +10,13 @@ from .adp import TOOLS as ADP_TOOLS
|
||||
from .reporting import TOOLS as REPORTING_TOOLS
|
||||
from .audit import TOOLS as AUDIT_TOOLS
|
||||
from .financial_reports import TOOLS as FINANCIAL_REPORTS_TOOLS
|
||||
from .asset_management import TOOLS as ASSET_MANAGEMENT_TOOLS
|
||||
|
||||
TOOL_DISPATCH = {}
|
||||
for tools_dict in [
|
||||
BANK_RECON_TOOLS, HST_TOOLS, AR_TOOLS, AP_TOOLS, JOURNAL_TOOLS,
|
||||
MONTH_END_TOOLS, PAYROLL_TOOLS, INVENTORY_TOOLS, ADP_TOOLS,
|
||||
REPORTING_TOOLS, AUDIT_TOOLS, FINANCIAL_REPORTS_TOOLS,
|
||||
ASSET_MANAGEMENT_TOOLS,
|
||||
]:
|
||||
TOOL_DISPATCH.update(tools_dict)
|
||||
|
||||
77
fusion_accounting_ai/services/tools/asset_management.py
Normal file
77
fusion_accounting_ai/services/tools/asset_management.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""Fusion-engine-routed AI tools for asset management."""
|
||||
|
||||
import logging
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fusion_list_assets(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.list_assets(
|
||||
state=params.get('state'),
|
||||
limit=int(params.get('limit', 50)),
|
||||
company_id=int(params['company_id']) if params.get('company_id') else env.company.id,
|
||||
)
|
||||
|
||||
|
||||
def fusion_get_asset_detail(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
Asset = env['fusion.asset']
|
||||
asset = Asset.browse(int(params['asset_id']))
|
||||
if not asset.exists():
|
||||
return {'error': 'Asset not found'}
|
||||
return {
|
||||
'asset': {
|
||||
'id': asset.id, 'name': asset.name, 'state': asset.state,
|
||||
'cost': asset.cost, 'book_value': asset.book_value,
|
||||
'total_depreciated': asset.total_depreciated,
|
||||
'method': asset.method, 'useful_life_years': asset.useful_life_years,
|
||||
},
|
||||
'depreciation_count': len(asset.depreciation_line_ids),
|
||||
}
|
||||
|
||||
|
||||
def fusion_compute_asset_schedule(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
asset = env['fusion.asset'].browse(int(params['asset_id']))
|
||||
return env['fusion.asset.engine'].compute_depreciation_schedule(
|
||||
asset, recompute=bool(params.get('recompute', False)),
|
||||
)
|
||||
|
||||
|
||||
def fusion_dispose_asset(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.dispose_asset(
|
||||
asset_id=int(params['asset_id']),
|
||||
sale_amount=float(params.get('sale_amount', 0)),
|
||||
disposal_type=params.get('disposal_type', 'sale'),
|
||||
)
|
||||
|
||||
|
||||
def fusion_suggest_asset_useful_life(env, params):
|
||||
if 'fusion.asset.engine' not in env.registry:
|
||||
return {'error': 'fusion_accounting_assets not installed'}
|
||||
from ..data_adapters import get_adapter
|
||||
adapter = get_adapter(env, 'assets')
|
||||
return adapter.suggest_useful_life(
|
||||
description=params.get('description', ''),
|
||||
amount=float(params['amount']) if params.get('amount') else None,
|
||||
partner_name=params.get('partner_name'),
|
||||
)
|
||||
|
||||
|
||||
TOOLS = {
|
||||
'fusion_list_assets': fusion_list_assets,
|
||||
'fusion_get_asset_detail': fusion_get_asset_detail,
|
||||
'fusion_compute_asset_schedule': fusion_compute_asset_schedule,
|
||||
'fusion_dispose_asset': fusion_dispose_asset,
|
||||
'fusion_suggest_asset_useful_life': fusion_suggest_asset_useful_life,
|
||||
}
|
||||
3
fusion_accounting_assets/__init__.py
Normal file
3
fusion_accounting_assets/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from . import models
|
||||
from . import services
|
||||
from . import controllers
|
||||
46
fusion_accounting_assets/__manifest__.py
Normal file
46
fusion_accounting_assets/__manifest__.py
Normal file
@@ -0,0 +1,46 @@
|
||||
{
|
||||
'name': 'Fusion Accounting Assets',
|
||||
'version': '19.0.1.0.17',
|
||||
'category': 'Accounting/Accounting',
|
||||
'summary': 'AI-augmented asset management with depreciation schedules.',
|
||||
'description': """
|
||||
Fusion Accounting Assets
|
||||
========================
|
||||
|
||||
A Fusion-native replacement for Odoo Enterprise's account_asset module.
|
||||
|
||||
CORE scope (Phase 3):
|
||||
- 3 depreciation methods: straight-line, declining balance, units of production
|
||||
- Asset lifecycle: draft -> running -> paused -> disposed
|
||||
- Depreciation board with editable schedule
|
||||
- Disposal (sale, scrap, donation) + partial sale wizards
|
||||
- Daily cron for posting periodic depreciation
|
||||
|
||||
AI augmentation:
|
||||
- Anomaly detection on utilization vs expected
|
||||
- AI-suggested useful life from invoice context (LLM)
|
||||
|
||||
Coexists with Enterprise: when account_asset is installed, the Fusion
|
||||
menu hides; the engine + AI tools remain available for the chat.
|
||||
""",
|
||||
'author': 'Fusion Accounting',
|
||||
'license': 'LGPL-3',
|
||||
'depends': [
|
||||
'fusion_accounting_core',
|
||||
'fusion_accounting_ai',
|
||||
'account',
|
||||
'mail',
|
||||
],
|
||||
'data': [
|
||||
'security/ir.model.access.csv',
|
||||
'data/cron.xml',
|
||||
],
|
||||
'assets': {
|
||||
'web.assets_backend': [
|
||||
],
|
||||
},
|
||||
'installable': True,
|
||||
'auto_install': False,
|
||||
'application': False,
|
||||
'icon': '/fusion_accounting_assets/static/description/icon.png',
|
||||
}
|
||||
1
fusion_accounting_assets/controllers/__init__.py
Normal file
1
fusion_accounting_assets/controllers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from . import assets_controller
|
||||
175
fusion_accounting_assets/controllers/assets_controller.py
Normal file
175
fusion_accounting_assets/controllers/assets_controller.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""HTTP controller: 8 JSON-RPC endpoints for the OWL asset dashboard.
|
||||
|
||||
All endpoints route through fusion.asset.engine. V19 type='jsonrpc'.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import date, datetime
|
||||
|
||||
from odoo import _, http
|
||||
from odoo.exceptions import ValidationError
|
||||
from odoo.http import request
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _parse_date(value):
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if not value:
|
||||
return None
|
||||
return datetime.strptime(value, '%Y-%m-%d').date()
|
||||
|
||||
|
||||
class FusionAssetsController(http.Controller):
|
||||
|
||||
@http.route('/fusion/assets/list', type='jsonrpc', auth='user')
|
||||
def list_assets(self, state=None, category_id=None, limit=50, offset=0,
|
||||
company_id=None):
|
||||
company_id = int(company_id) if company_id else request.env.company.id
|
||||
Asset = request.env['fusion.asset'].sudo()
|
||||
domain = [('company_id', '=', company_id)]
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
if category_id:
|
||||
domain.append(('category_id', '=', int(category_id)))
|
||||
total = Asset.search_count(domain)
|
||||
assets = Asset.search(domain, limit=int(limit), offset=int(offset),
|
||||
order='acquisition_date desc')
|
||||
return {
|
||||
'count': len(assets),
|
||||
'total': total,
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name, 'code': a.code or '',
|
||||
'state': a.state, 'cost': a.cost, 'salvage_value': a.salvage_value,
|
||||
'book_value': a.book_value, 'total_depreciated': a.total_depreciated,
|
||||
'method': a.method, 'useful_life_years': a.useful_life_years,
|
||||
'acquisition_date': str(a.acquisition_date),
|
||||
'in_service_date': str(a.in_service_date) if a.in_service_date else None,
|
||||
'category_id': a.category_id.id if a.category_id else None,
|
||||
'category_name': a.category_id.name if a.category_id else None,
|
||||
'currency_code': a.currency_id.name,
|
||||
} for a in assets],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/get_detail', type='jsonrpc', auth='user')
|
||||
def get_detail(self, asset_id):
|
||||
asset = request.env['fusion.asset'].browse(int(asset_id))
|
||||
if not asset.exists():
|
||||
raise ValidationError(_("Asset %s not found") % asset_id)
|
||||
return {
|
||||
'asset': {
|
||||
'id': asset.id, 'name': asset.name, 'code': asset.code or '',
|
||||
'state': asset.state, 'cost': asset.cost,
|
||||
'salvage_value': asset.salvage_value,
|
||||
'book_value': asset.book_value,
|
||||
'total_depreciated': asset.total_depreciated,
|
||||
'method': asset.method,
|
||||
'useful_life_years': asset.useful_life_years,
|
||||
'declining_rate_pct': asset.declining_rate_pct,
|
||||
'total_units_expected': asset.total_units_expected,
|
||||
'units_used_to_date': asset.units_used_to_date,
|
||||
'prorate_convention': asset.prorate_convention,
|
||||
'acquisition_date': str(asset.acquisition_date),
|
||||
'in_service_date': str(asset.in_service_date) if asset.in_service_date else None,
|
||||
'disposed_date': str(asset.disposed_date) if asset.disposed_date else None,
|
||||
'category_id': asset.category_id.id if asset.category_id else None,
|
||||
'category_name': asset.category_id.name if asset.category_id else None,
|
||||
'currency_id': asset.currency_id.id,
|
||||
'currency_code': asset.currency_id.name,
|
||||
},
|
||||
'depreciation_lines': [{
|
||||
'id': l.id, 'period_index': l.period_index,
|
||||
'scheduled_date': str(l.scheduled_date),
|
||||
'amount': l.amount, 'accumulated': l.accumulated,
|
||||
'book_value_at_end': l.book_value_at_end,
|
||||
'is_posted': l.is_posted,
|
||||
'posted_date': str(l.posted_date) if l.posted_date else None,
|
||||
} for l in asset.depreciation_line_ids.sorted('period_index')],
|
||||
'anomalies': [{
|
||||
'id': a.id, 'anomaly_type': a.anomaly_type,
|
||||
'severity': a.severity, 'detail': a.detail or '',
|
||||
'state': a.state,
|
||||
} for a in request.env['fusion.asset.anomaly'].search([
|
||||
('asset_id', '=', asset.id), ('state', 'in', ('new', 'acknowledged'))
|
||||
])],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/compute_schedule', type='jsonrpc', auth='user')
|
||||
def compute_schedule(self, asset_id, recompute=False):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
return engine.compute_depreciation_schedule(asset, recompute=bool(recompute))
|
||||
|
||||
@http.route('/fusion/assets/post_depreciation', type='jsonrpc', auth='user')
|
||||
def post_depreciation(self, asset_id, period_date=None):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
return engine.post_depreciation_entry(asset, period_date=_parse_date(period_date))
|
||||
|
||||
@http.route('/fusion/assets/dispose', type='jsonrpc', auth='user')
|
||||
def dispose(self, asset_id, sale_amount=0, sale_date=None,
|
||||
sale_partner_id=None, disposal_type='sale'):
|
||||
asset = request.env['fusion.asset'].sudo().browse(int(asset_id))
|
||||
engine = request.env['fusion.asset.engine'].sudo()
|
||||
partner = None
|
||||
if sale_partner_id:
|
||||
partner = request.env['res.partner'].sudo().browse(int(sale_partner_id))
|
||||
return engine.dispose_asset(
|
||||
asset, sale_amount=float(sale_amount),
|
||||
sale_date=_parse_date(sale_date),
|
||||
sale_partner=partner, disposal_type=disposal_type,
|
||||
)
|
||||
|
||||
@http.route('/fusion/assets/get_anomalies', type='jsonrpc', auth='user')
|
||||
def get_anomalies(self, asset_id=None, severity=None, state='new', limit=50,
|
||||
company_id=None):
|
||||
company_id = int(company_id) if company_id else request.env.company.id
|
||||
Anomaly = request.env['fusion.asset.anomaly'].sudo()
|
||||
domain = [('company_id', '=', company_id)]
|
||||
if asset_id:
|
||||
domain.append(('asset_id', '=', int(asset_id)))
|
||||
if severity:
|
||||
domain.append(('severity', '=', severity))
|
||||
if state:
|
||||
domain.append(('state', '=', state))
|
||||
anomalies = Anomaly.search(domain, limit=int(limit), order='detected_at desc')
|
||||
return {
|
||||
'count': len(anomalies),
|
||||
'anomalies': [{
|
||||
'id': a.id, 'asset_id': a.asset_id.id, 'asset_name': a.asset_id.name,
|
||||
'anomaly_type': a.anomaly_type, 'severity': a.severity,
|
||||
'expected': a.expected, 'actual': a.actual,
|
||||
'variance_pct': a.variance_pct, 'detail': a.detail or '',
|
||||
'state': a.state,
|
||||
'detected_at': str(a.detected_at),
|
||||
} for a in anomalies],
|
||||
}
|
||||
|
||||
@http.route('/fusion/assets/suggest_useful_life', type='jsonrpc', auth='user')
|
||||
def suggest_useful_life(self, description, amount=None, partner_name=None):
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
|
||||
predict_useful_life,
|
||||
)
|
||||
return predict_useful_life(
|
||||
request.env, description=description,
|
||||
amount=float(amount) if amount is not None else None,
|
||||
partner_name=partner_name,
|
||||
)
|
||||
|
||||
@http.route('/fusion/assets/get_partner_history', type='jsonrpc', auth='user')
|
||||
def get_partner_history(self, partner_id, limit=20):
|
||||
Asset = request.env['fusion.asset'].sudo()
|
||||
assets = Asset.search([
|
||||
('source_invoice_line_id.partner_id', '=', int(partner_id)),
|
||||
], limit=int(limit), order='acquisition_date desc')
|
||||
return {
|
||||
'partner_id': int(partner_id),
|
||||
'count': len(assets),
|
||||
'assets': [{
|
||||
'id': a.id, 'name': a.name,
|
||||
'cost': a.cost, 'book_value': a.book_value,
|
||||
'state': a.state,
|
||||
'acquisition_date': str(a.acquisition_date),
|
||||
} for a in assets],
|
||||
}
|
||||
24
fusion_accounting_assets/data/cron.xml
Normal file
24
fusion_accounting_assets/data/cron.xml
Normal file
@@ -0,0 +1,24 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<odoo noupdate="1">
|
||||
|
||||
<record id="cron_fusion_assets_post_depreciation" model="ir.cron">
|
||||
<field name="name">Fusion Assets — Post Due Depreciation</field>
|
||||
<field name="model_id" ref="model_fusion_assets_cron"/>
|
||||
<field name="state">code</field>
|
||||
<field name="code">model._cron_post_due_depreciation()</field>
|
||||
<field name="interval_number">1</field>
|
||||
<field name="interval_type">days</field>
|
||||
<field name="active" eval="True"/>
|
||||
</record>
|
||||
|
||||
<record id="cron_fusion_assets_anomaly_scan" model="ir.cron">
|
||||
<field name="name">Fusion Assets — Monthly Anomaly Scan</field>
|
||||
<field name="model_id" ref="model_fusion_assets_cron"/>
|
||||
<field name="state">code</field>
|
||||
<field name="code">model._cron_anomaly_scan()</field>
|
||||
<field name="interval_number">30</field>
|
||||
<field name="interval_type">days</field>
|
||||
<field name="active" eval="True"/>
|
||||
</record>
|
||||
|
||||
</odoo>
|
||||
8
fusion_accounting_assets/models/__init__.py
Normal file
8
fusion_accounting_assets/models/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from . import fusion_asset_category
|
||||
from . import fusion_asset
|
||||
from . import fusion_asset_depreciation_line
|
||||
from . import fusion_asset_disposal
|
||||
from . import fusion_asset_anomaly
|
||||
from . import account_move
|
||||
from . import fusion_asset_engine
|
||||
from . import fusion_assets_cron
|
||||
34
fusion_accounting_assets/models/account_move.py
Normal file
34
fusion_accounting_assets/models/account_move.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""Inherit account.move.line to link to fusion.asset records.
|
||||
|
||||
Lets us trace assets back to their source invoice line.
|
||||
"""
|
||||
|
||||
from odoo import fields, models
|
||||
|
||||
|
||||
class AccountMoveLine(models.Model):
|
||||
_inherit = "account.move.line"
|
||||
|
||||
fusion_asset_id = fields.Many2one(
|
||||
'fusion.asset', string='Created Asset',
|
||||
copy=False, ondelete='set null',
|
||||
help="Fusion asset record created from this invoice line.",
|
||||
)
|
||||
|
||||
fusion_asset_count = fields.Integer(compute='_compute_fusion_asset_count')
|
||||
|
||||
def _compute_fusion_asset_count(self):
|
||||
for line in self:
|
||||
line.fusion_asset_count = 1 if line.fusion_asset_id else 0
|
||||
|
||||
def action_open_fusion_asset(self):
|
||||
self.ensure_one()
|
||||
if not self.fusion_asset_id:
|
||||
return
|
||||
return {
|
||||
'type': 'ir.actions.act_window',
|
||||
'res_model': 'fusion.asset',
|
||||
'res_id': self.fusion_asset_id.id,
|
||||
'view_mode': 'form',
|
||||
'target': 'current',
|
||||
}
|
||||
164
fusion_accounting_assets/models/fusion_asset.py
Normal file
164
fusion_accounting_assets/models/fusion_asset.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""Fusion Asset model.
|
||||
|
||||
Lifecycle: draft -> running -> (paused -> running)* -> disposed.
|
||||
- draft: created, not yet running depreciation
|
||||
- running: depreciation board active, periodic posts happen
|
||||
- paused: depreciation suspended (e.g. asset out for repair)
|
||||
- disposed: sold/scrapped/donated; no further depreciation
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
METHOD_SELECTION = [
|
||||
('straight_line', 'Straight Line'),
|
||||
('declining_balance', 'Declining Balance'),
|
||||
('units_of_production', 'Units of Production'),
|
||||
]
|
||||
|
||||
PRORATE_SELECTION = [
|
||||
('full_month', 'Full Month'),
|
||||
('days_365', 'Days / 365'),
|
||||
('days_period', 'Days in Period'),
|
||||
]
|
||||
|
||||
STATE_SELECTION = [
|
||||
('draft', 'Draft'),
|
||||
('running', 'Running'),
|
||||
('paused', 'Paused'),
|
||||
('disposed', 'Disposed'),
|
||||
]
|
||||
|
||||
|
||||
class FusionAsset(models.Model):
|
||||
_name = "fusion.asset"
|
||||
_description = "Fusion Fixed Asset"
|
||||
_order = "acquisition_date desc, id desc"
|
||||
_inherit = ['mail.thread', 'mail.activity.mixin']
|
||||
|
||||
name = fields.Char(required=True, tracking=True)
|
||||
code = fields.Char(help="Internal asset code (e.g. tag number).")
|
||||
company_id = fields.Many2one(
|
||||
'res.company', required=True,
|
||||
default=lambda self: self.env.company,
|
||||
)
|
||||
category_id = fields.Many2one('fusion.asset.category', tracking=True)
|
||||
state = fields.Selection(
|
||||
STATE_SELECTION, default='draft', required=True, tracking=True,
|
||||
)
|
||||
|
||||
cost = fields.Monetary(
|
||||
required=True, tracking=True,
|
||||
help="Original acquisition cost.",
|
||||
)
|
||||
salvage_value = fields.Monetary(
|
||||
default=0.0, tracking=True,
|
||||
help="Estimated end-of-life value.",
|
||||
)
|
||||
acquisition_date = fields.Date(
|
||||
required=True, default=fields.Date.today, tracking=True,
|
||||
)
|
||||
in_service_date = fields.Date(
|
||||
tracking=True,
|
||||
help="Date depreciation actually begins.",
|
||||
)
|
||||
disposed_date = fields.Date(readonly=True, tracking=True)
|
||||
currency_id = fields.Many2one(
|
||||
'res.currency', required=True,
|
||||
default=lambda self: self.env.company.currency_id,
|
||||
)
|
||||
|
||||
method = fields.Selection(
|
||||
METHOD_SELECTION, required=True, default='straight_line', tracking=True,
|
||||
)
|
||||
useful_life_years = fields.Integer(
|
||||
default=5, tracking=True,
|
||||
help="For straight_line / declining_balance.",
|
||||
)
|
||||
declining_rate_pct = fields.Float(
|
||||
default=20.0,
|
||||
help="For declining_balance method, e.g. 20.0 = 20%/year.",
|
||||
)
|
||||
total_units_expected = fields.Float(
|
||||
help="For units_of_production method.",
|
||||
)
|
||||
units_used_to_date = fields.Float(
|
||||
default=0.0,
|
||||
help="For units_of_production: track usage.",
|
||||
)
|
||||
prorate_convention = fields.Selection(
|
||||
PRORATE_SELECTION, default='days_period', required=True,
|
||||
)
|
||||
|
||||
source_invoice_line_id = fields.Many2one(
|
||||
'account.move.line', string='Source Invoice Line',
|
||||
help="The invoice line that originated this asset.",
|
||||
)
|
||||
parent_id = fields.Many2one(
|
||||
'fusion.asset', help='For partial-sale child assets.',
|
||||
)
|
||||
|
||||
depreciation_line_ids = fields.One2many(
|
||||
'fusion.asset.depreciation.line', 'asset_id',
|
||||
string='Depreciation Lines',
|
||||
)
|
||||
book_value = fields.Monetary(compute='_compute_book_value', store=True)
|
||||
total_depreciated = fields.Monetary(compute='_compute_book_value', store=True)
|
||||
last_posted_date = fields.Date(compute='_compute_last_posted_date', store=True)
|
||||
|
||||
@api.depends('cost', 'depreciation_line_ids.amount', 'depreciation_line_ids.is_posted')
|
||||
def _compute_book_value(self):
|
||||
for asset in self:
|
||||
posted = sum(l.amount for l in asset.depreciation_line_ids if l.is_posted)
|
||||
asset.total_depreciated = posted
|
||||
asset.book_value = asset.cost - posted
|
||||
|
||||
@api.depends('depreciation_line_ids.is_posted', 'depreciation_line_ids.scheduled_date')
|
||||
def _compute_last_posted_date(self):
|
||||
for asset in self:
|
||||
posted_dates = [
|
||||
l.scheduled_date for l in asset.depreciation_line_ids if l.is_posted
|
||||
]
|
||||
asset.last_posted_date = max(posted_dates) if posted_dates else False
|
||||
|
||||
def action_set_running(self):
|
||||
for asset in self:
|
||||
if asset.state != 'draft':
|
||||
raise ValidationError(_("Only draft assets can be set running."))
|
||||
if not asset.in_service_date:
|
||||
asset.in_service_date = fields.Date.today()
|
||||
asset.state = 'running'
|
||||
|
||||
def action_pause(self):
|
||||
for asset in self:
|
||||
if asset.state != 'running':
|
||||
raise ValidationError(_("Only running assets can be paused."))
|
||||
asset.state = 'paused'
|
||||
|
||||
def action_resume(self):
|
||||
for asset in self:
|
||||
if asset.state != 'paused':
|
||||
raise ValidationError(_("Only paused assets can be resumed."))
|
||||
asset.state = 'running'
|
||||
|
||||
def action_set_draft(self):
|
||||
for asset in self:
|
||||
if asset.state not in ('draft', 'paused'):
|
||||
raise ValidationError(
|
||||
_("Cannot reset to draft from %s.") % asset.state,
|
||||
)
|
||||
asset.state = 'draft'
|
||||
|
||||
_check_cost_positive = models.Constraint(
|
||||
'CHECK(cost >= 0)',
|
||||
'Asset cost must be non-negative.',
|
||||
)
|
||||
_check_salvage_lte_cost = models.Constraint(
|
||||
'CHECK(salvage_value >= 0 AND salvage_value <= cost)',
|
||||
'Salvage value must be between 0 and cost.',
|
||||
)
|
||||
42
fusion_accounting_assets/models/fusion_asset_anomaly.py
Normal file
42
fusion_accounting_assets/models/fusion_asset_anomaly.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Persisted asset anomaly flags from the engine's variance detection."""
|
||||
|
||||
from odoo import fields, models
|
||||
|
||||
|
||||
SEVERITY = [('low', 'Low'), ('medium', 'Medium'), ('high', 'High')]
|
||||
ANOMALY_TYPES = [
|
||||
('behind_schedule', 'Behind Schedule'),
|
||||
('ahead_of_schedule', 'Ahead of Schedule'),
|
||||
('low_utilization', 'Low Utilization'),
|
||||
]
|
||||
|
||||
|
||||
class FusionAssetAnomaly(models.Model):
|
||||
_name = "fusion.asset.anomaly"
|
||||
_description = "Flagged Asset Anomaly"
|
||||
_order = "detected_at desc, severity desc"
|
||||
|
||||
asset_id = fields.Many2one('fusion.asset', required=True, ondelete='cascade')
|
||||
company_id = fields.Many2one(related='asset_id.company_id', store=True)
|
||||
anomaly_type = fields.Selection(ANOMALY_TYPES, required=True)
|
||||
severity = fields.Selection(SEVERITY, required=True)
|
||||
expected = fields.Float()
|
||||
actual = fields.Float()
|
||||
variance_pct = fields.Float()
|
||||
detail = fields.Text()
|
||||
detected_at = fields.Datetime(default=fields.Datetime.now, required=True)
|
||||
state = fields.Selection([
|
||||
('new', 'New'),
|
||||
('acknowledged', 'Acknowledged'),
|
||||
('resolved', 'Resolved'),
|
||||
('dismissed', 'Dismissed'),
|
||||
], default='new', required=True)
|
||||
|
||||
def action_acknowledge(self):
|
||||
self.write({'state': 'acknowledged'})
|
||||
|
||||
def action_dismiss(self):
|
||||
self.write({'state': 'dismissed'})
|
||||
|
||||
def action_resolve(self):
|
||||
self.write({'state': 'resolved'})
|
||||
53
fusion_accounting_assets/models/fusion_asset_category.py
Normal file
53
fusion_accounting_assets/models/fusion_asset_category.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Asset categories with default settings (used as templates)."""
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
|
||||
class FusionAssetCategory(models.Model):
|
||||
_name = "fusion.asset.category"
|
||||
_description = "Fusion Asset Category"
|
||||
_order = "sequence, name"
|
||||
|
||||
name = fields.Char(required=True, translate=True)
|
||||
sequence = fields.Integer(default=10)
|
||||
company_id = fields.Many2one(
|
||||
'res.company', default=lambda self: self.env.company,
|
||||
)
|
||||
|
||||
method = fields.Selection([
|
||||
('straight_line', 'Straight Line'),
|
||||
('declining_balance', 'Declining Balance'),
|
||||
('units_of_production', 'Units of Production'),
|
||||
], default='straight_line', required=True)
|
||||
useful_life_years = fields.Integer(default=5)
|
||||
declining_rate_pct = fields.Float(default=20.0)
|
||||
salvage_value_pct = fields.Float(
|
||||
default=0.0,
|
||||
help="% of cost (used for new assets in this category).",
|
||||
)
|
||||
prorate_convention = fields.Selection([
|
||||
('full_month', 'Full Month'),
|
||||
('days_365', 'Days / 365'),
|
||||
('days_period', 'Days in Period'),
|
||||
], default='days_period', required=True)
|
||||
|
||||
asset_account_id = fields.Many2one(
|
||||
'account.account', string='Asset Account',
|
||||
domain="[('account_type', 'in', ('asset_fixed', 'asset_non_current'))]",
|
||||
)
|
||||
depreciation_account_id = fields.Many2one(
|
||||
'account.account', string='Depreciation Account',
|
||||
domain="[('account_type', '=', 'asset_fixed')]",
|
||||
)
|
||||
expense_account_id = fields.Many2one(
|
||||
'account.account', string='Expense Account',
|
||||
domain="[('account_type', '=', 'expense_depreciation')]",
|
||||
)
|
||||
|
||||
asset_count = fields.Integer(compute='_compute_asset_count')
|
||||
|
||||
def _compute_asset_count(self):
|
||||
for cat in self:
|
||||
cat.asset_count = self.env['fusion.asset'].search_count([
|
||||
('category_id', '=', cat.id),
|
||||
])
|
||||
@@ -0,0 +1,42 @@
|
||||
"""Per-period depreciation board lines for an asset."""
|
||||
|
||||
from odoo import fields, models
|
||||
|
||||
|
||||
class FusionAssetDepreciationLine(models.Model):
|
||||
_name = "fusion.asset.depreciation.line"
|
||||
_description = "Asset Depreciation Board Line"
|
||||
_order = "asset_id, scheduled_date"
|
||||
|
||||
asset_id = fields.Many2one('fusion.asset', required=True, ondelete='cascade')
|
||||
company_id = fields.Many2one(related='asset_id.company_id', store=True)
|
||||
currency_id = fields.Many2one(related='asset_id.currency_id', store=True)
|
||||
|
||||
period_index = fields.Integer(required=True)
|
||||
scheduled_date = fields.Date(required=True)
|
||||
amount = fields.Monetary(required=True)
|
||||
accumulated = fields.Monetary()
|
||||
book_value_at_end = fields.Monetary()
|
||||
|
||||
is_posted = fields.Boolean(default=False, copy=False)
|
||||
posted_date = fields.Date(copy=False)
|
||||
move_id = fields.Many2one(
|
||||
'account.move', copy=False,
|
||||
help="Journal entry created when this line was posted.",
|
||||
)
|
||||
|
||||
def action_post(self):
|
||||
"""Mark this line as posted (without creating the journal entry yet —
|
||||
engine method post_depreciation_entry handles the actual entry creation)."""
|
||||
for line in self:
|
||||
if line.is_posted:
|
||||
continue
|
||||
line.write({
|
||||
'is_posted': True,
|
||||
'posted_date': fields.Date.today(),
|
||||
})
|
||||
|
||||
_unique_period_per_asset = models.Constraint(
|
||||
'UNIQUE(asset_id, period_index)',
|
||||
'A depreciation line for that period already exists.',
|
||||
)
|
||||
56
fusion_accounting_assets/models/fusion_asset_disposal.py
Normal file
56
fusion_accounting_assets/models/fusion_asset_disposal.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Asset disposal records (sale, scrap, donation)."""
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
|
||||
DISPOSAL_TYPES = [
|
||||
('sale', 'Sale'),
|
||||
('scrap', 'Scrap'),
|
||||
('donation', 'Donation'),
|
||||
('lost', 'Lost / Stolen'),
|
||||
]
|
||||
|
||||
|
||||
class FusionAssetDisposal(models.Model):
|
||||
_name = "fusion.asset.disposal"
|
||||
_description = "Asset Disposal Record"
|
||||
_order = "disposal_date desc, id desc"
|
||||
_inherit = ['mail.thread']
|
||||
|
||||
asset_id = fields.Many2one(
|
||||
'fusion.asset', required=True, ondelete='restrict', tracking=True,
|
||||
)
|
||||
company_id = fields.Many2one(related='asset_id.company_id', store=True)
|
||||
currency_id = fields.Many2one(related='asset_id.currency_id', store=True)
|
||||
|
||||
disposal_type = fields.Selection(
|
||||
DISPOSAL_TYPES, required=True, default='sale', tracking=True,
|
||||
)
|
||||
disposal_date = fields.Date(
|
||||
required=True, default=fields.Date.today, tracking=True,
|
||||
)
|
||||
sale_amount = fields.Monetary(
|
||||
default=0.0, tracking=True,
|
||||
help="Cash received (for sale disposal type).",
|
||||
)
|
||||
sale_partner_id = fields.Many2one('res.partner', tracking=True)
|
||||
|
||||
book_value_at_disposal = fields.Monetary(
|
||||
readonly=True,
|
||||
help="Asset book value at disposal date.",
|
||||
)
|
||||
gain_loss_amount = fields.Monetary(compute='_compute_gain_loss', store=True)
|
||||
notes = fields.Text()
|
||||
|
||||
move_id = fields.Many2one(
|
||||
'account.move', readonly=True, copy=False,
|
||||
help="Journal entry created for this disposal.",
|
||||
)
|
||||
|
||||
@api.depends('sale_amount', 'book_value_at_disposal', 'disposal_type')
|
||||
def _compute_gain_loss(self):
|
||||
for d in self:
|
||||
if d.disposal_type == 'sale':
|
||||
d.gain_loss_amount = d.sale_amount - d.book_value_at_disposal
|
||||
else:
|
||||
d.gain_loss_amount = -d.book_value_at_disposal
|
||||
398
fusion_accounting_assets/models/fusion_asset_engine.py
Normal file
398
fusion_accounting_assets/models/fusion_asset_engine.py
Normal file
@@ -0,0 +1,398 @@
|
||||
"""The asset engine — orchestrator for all asset depreciation + lifecycle.
|
||||
|
||||
7-method public API. No direct ORM writes to fusion.asset.depreciation.line
|
||||
or account.move from anywhere else; everything routes through here for
|
||||
consistent validation, audit, and side-effect handling.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
from ..services.depreciation_methods import (
|
||||
straight_line,
|
||||
declining_balance,
|
||||
units_of_production,
|
||||
)
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FusionAssetEngine(models.AbstractModel):
|
||||
_name = "fusion.asset.engine"
|
||||
_description = "Fusion Asset Engine"
|
||||
|
||||
# ============================================================
|
||||
# PUBLIC API (7 methods)
|
||||
# ============================================================
|
||||
|
||||
@api.model
|
||||
def compute_depreciation_schedule(self, asset, *, recompute: bool = False) -> dict:
|
||||
"""Compute (or re-compute) the depreciation board for an asset.
|
||||
|
||||
If recompute=False and posted lines exist, ONLY un-posted future lines
|
||||
are regenerated. If recompute=True, all unposted lines are wiped and
|
||||
regenerated from scratch using current asset config.
|
||||
"""
|
||||
if not asset:
|
||||
raise ValidationError(_("asset is required"))
|
||||
asset.ensure_one()
|
||||
|
||||
self._validate_asset_for_schedule(asset)
|
||||
|
||||
Line = self.env['fusion.asset.depreciation.line'].sudo()
|
||||
if recompute:
|
||||
Line.search([
|
||||
('asset_id', '=', asset.id),
|
||||
('is_posted', '=', False),
|
||||
]).unlink()
|
||||
|
||||
existing_posted = Line.search([
|
||||
('asset_id', '=', asset.id),
|
||||
('is_posted', '=', True),
|
||||
], order='period_index')
|
||||
start_period = max([l.period_index for l in existing_posted], default=-1) + 1
|
||||
accumulated_so_far = sum(l.amount for l in existing_posted)
|
||||
|
||||
steps = self._compute_steps(asset)
|
||||
new_steps = steps[start_period:]
|
||||
|
||||
base_date = asset.in_service_date or asset.acquisition_date
|
||||
|
||||
# Accumulated baseline at the boundary between posted and to-be-created
|
||||
# lines: subtract the accumulated value the algorithm itself reports at
|
||||
# that boundary, then re-add the actually-posted total. This keeps the
|
||||
# board's accumulated column monotonic when picking up mid-life.
|
||||
baseline_offset = 0.0
|
||||
if start_period > 0 and start_period <= len(steps):
|
||||
baseline_offset = steps[start_period - 1].accumulated_depreciation
|
||||
|
||||
line_vals = []
|
||||
for s in new_steps:
|
||||
scheduled_date = self._add_periods(base_date, s.period_index)
|
||||
running_accumulated = round(
|
||||
accumulated_so_far + s.accumulated_depreciation - baseline_offset, 2
|
||||
)
|
||||
line_vals.append({
|
||||
'asset_id': asset.id,
|
||||
'period_index': s.period_index,
|
||||
'scheduled_date': scheduled_date,
|
||||
'amount': s.period_amount,
|
||||
'accumulated': running_accumulated,
|
||||
'book_value_at_end': s.book_value_at_end,
|
||||
'is_posted': False,
|
||||
})
|
||||
if line_vals:
|
||||
Line.create(line_vals)
|
||||
|
||||
return {
|
||||
'asset_id': asset.id,
|
||||
'lines_created': len(line_vals),
|
||||
'total_lines': len(asset.depreciation_line_ids),
|
||||
'method': asset.method,
|
||||
}
|
||||
|
||||
@api.model
|
||||
def post_depreciation_entry(self, asset, *, period_date: date = None) -> dict:
|
||||
"""Post the next-due un-posted depreciation line.
|
||||
|
||||
If period_date provided, post all lines whose scheduled_date <= period_date.
|
||||
Otherwise, post the single next un-posted line (the earliest one).
|
||||
"""
|
||||
asset.ensure_one()
|
||||
if asset.state != 'running':
|
||||
raise ValidationError(
|
||||
_("Cannot post depreciation for asset in state %s") % asset.state
|
||||
)
|
||||
|
||||
Line = self.env['fusion.asset.depreciation.line'].sudo()
|
||||
domain = [('asset_id', '=', asset.id), ('is_posted', '=', False)]
|
||||
if period_date:
|
||||
domain.append(('scheduled_date', '<=', period_date))
|
||||
unposted = Line.search(domain, order='scheduled_date, period_index')
|
||||
if not unposted:
|
||||
return {'posted_count': 0, 'reason': 'no unposted lines due'}
|
||||
|
||||
if not period_date:
|
||||
unposted = unposted[:1]
|
||||
|
||||
posted_ids = []
|
||||
for line in unposted:
|
||||
self._create_journal_entry(asset, line)
|
||||
line.action_post()
|
||||
posted_ids.append(line.id)
|
||||
|
||||
return {'posted_count': len(posted_ids), 'posted_line_ids': posted_ids}
|
||||
|
||||
@api.model
|
||||
def dispose_asset(self, asset, *, sale_amount: float = 0.0,
|
||||
sale_date: date = None, sale_partner=None,
|
||||
disposal_type: str = 'sale') -> dict:
|
||||
"""Dispose an asset (sale, scrap, donation, lost)."""
|
||||
asset.ensure_one()
|
||||
if asset.state == 'disposed':
|
||||
raise ValidationError(_("Asset already disposed."))
|
||||
sale_date = sale_date or fields.Date.today()
|
||||
|
||||
Line = self.env['fusion.asset.depreciation.line'].sudo()
|
||||
future_unposted = Line.search([
|
||||
('asset_id', '=', asset.id),
|
||||
('is_posted', '=', False),
|
||||
('scheduled_date', '>', sale_date),
|
||||
])
|
||||
future_unposted.unlink()
|
||||
|
||||
asset.invalidate_recordset(['book_value', 'total_depreciated'])
|
||||
book_value = asset.book_value
|
||||
|
||||
Disposal = self.env['fusion.asset.disposal'].sudo()
|
||||
partner_id = False
|
||||
if sale_partner:
|
||||
partner_id = sale_partner.id if hasattr(sale_partner, 'id') else sale_partner
|
||||
disposal = Disposal.create({
|
||||
'asset_id': asset.id,
|
||||
'disposal_type': disposal_type,
|
||||
'disposal_date': sale_date,
|
||||
'sale_amount': sale_amount,
|
||||
'sale_partner_id': partner_id,
|
||||
'book_value_at_disposal': book_value,
|
||||
})
|
||||
|
||||
asset.write({
|
||||
'state': 'disposed',
|
||||
'disposed_date': sale_date,
|
||||
})
|
||||
|
||||
return {
|
||||
'asset_id': asset.id,
|
||||
'disposal_id': disposal.id,
|
||||
'gain_loss_amount': disposal.gain_loss_amount,
|
||||
'book_value_at_disposal': book_value,
|
||||
}
|
||||
|
||||
@api.model
|
||||
def partial_sale(self, asset, *, sold_amount: float, sold_qty: float = None,
|
||||
sale_date: date = None, sale_partner=None) -> dict:
|
||||
"""Partially dispose: split asset into two — sold child + remaining parent.
|
||||
|
||||
sold_amount is cash received for the sold portion.
|
||||
sold_qty is the ratio of original cost to attribute to the sold portion (0..1).
|
||||
If sold_qty is None, defaults to sold_amount / cost.
|
||||
"""
|
||||
asset.ensure_one()
|
||||
if asset.state == 'disposed':
|
||||
raise ValidationError(_("Cannot partially sell a disposed asset."))
|
||||
if sold_qty is None:
|
||||
sold_qty = sold_amount / asset.cost if asset.cost else 0
|
||||
if not (0 < sold_qty < 1):
|
||||
raise ValidationError(
|
||||
_("sold_qty must be strictly between 0 and 1; got %s") % sold_qty
|
||||
)
|
||||
|
||||
sale_date = sale_date or fields.Date.today()
|
||||
|
||||
Asset = self.env['fusion.asset'].sudo()
|
||||
sold_cost = round(asset.cost * sold_qty, 2)
|
||||
sold_salvage = round(asset.salvage_value * sold_qty, 2)
|
||||
child_vals = {
|
||||
'name': f"{asset.name} (sold portion)",
|
||||
'parent_id': asset.id,
|
||||
'cost': sold_cost,
|
||||
'salvage_value': sold_salvage,
|
||||
'acquisition_date': asset.acquisition_date,
|
||||
'in_service_date': asset.in_service_date,
|
||||
'method': asset.method,
|
||||
'useful_life_years': asset.useful_life_years,
|
||||
'declining_rate_pct': asset.declining_rate_pct,
|
||||
'prorate_convention': asset.prorate_convention,
|
||||
'company_id': asset.company_id.id,
|
||||
'state': 'running',
|
||||
}
|
||||
if asset.category_id:
|
||||
child_vals['category_id'] = asset.category_id.id
|
||||
child = Asset.create(child_vals)
|
||||
|
||||
new_cost = round(asset.cost - sold_cost, 2)
|
||||
new_salvage = round(asset.salvage_value - sold_salvage, 2)
|
||||
asset.write({
|
||||
'cost': new_cost,
|
||||
'salvage_value': new_salvage,
|
||||
})
|
||||
self.compute_depreciation_schedule(asset, recompute=True)
|
||||
|
||||
result = self.dispose_asset(
|
||||
child, sale_amount=sold_amount, sale_date=sale_date,
|
||||
sale_partner=sale_partner, disposal_type='sale',
|
||||
)
|
||||
return {
|
||||
'parent_asset_id': asset.id,
|
||||
'child_asset_id': child.id,
|
||||
'disposal_id': result['disposal_id'],
|
||||
'gain_loss_amount': result['gain_loss_amount'],
|
||||
}
|
||||
|
||||
@api.model
|
||||
def pause_asset(self, asset, pause_date: date = None) -> dict:
|
||||
"""Pause depreciation. Wraps asset.action_pause for API symmetry and
|
||||
to log the pause date for downstream auditing."""
|
||||
asset.ensure_one()
|
||||
asset.action_pause()
|
||||
return {
|
||||
'asset_id': asset.id,
|
||||
'pause_date': pause_date or fields.Date.today(),
|
||||
'state': 'paused',
|
||||
}
|
||||
|
||||
@api.model
|
||||
def resume_asset(self, asset, resume_date: date = None) -> dict:
|
||||
"""Resume a paused asset."""
|
||||
asset.ensure_one()
|
||||
asset.action_resume()
|
||||
return {
|
||||
'asset_id': asset.id,
|
||||
'resume_date': resume_date or fields.Date.today(),
|
||||
'state': 'running',
|
||||
}
|
||||
|
||||
@api.model
|
||||
def reverse_disposal(self, asset) -> dict:
|
||||
"""Reverse a disposal (rare — recovery from accidental sale entry)."""
|
||||
asset.ensure_one()
|
||||
if asset.state != 'disposed':
|
||||
raise ValidationError(_("Asset is not disposed."))
|
||||
|
||||
Disposal = self.env['fusion.asset.disposal'].sudo()
|
||||
last_disposal = Disposal.search(
|
||||
[('asset_id', '=', asset.id)],
|
||||
order='disposal_date desc, id desc', limit=1,
|
||||
)
|
||||
if last_disposal and last_disposal.move_id:
|
||||
try:
|
||||
last_disposal.move_id.button_cancel()
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning("Could not cancel disposal move: %s", e)
|
||||
if last_disposal:
|
||||
last_disposal.unlink()
|
||||
asset.write({'state': 'running', 'disposed_date': False})
|
||||
return {'asset_id': asset.id, 'state': 'running'}
|
||||
|
||||
# ============================================================
|
||||
# PRIVATE HELPERS
|
||||
# ============================================================
|
||||
|
||||
def _validate_asset_for_schedule(self, asset):
|
||||
if asset.cost <= 0:
|
||||
raise ValidationError(_("Asset cost must be > 0 to compute schedule."))
|
||||
if asset.method == 'units_of_production' and not asset.total_units_expected:
|
||||
raise ValidationError(_(
|
||||
"Units of Production assets need total_units_expected set."
|
||||
))
|
||||
if asset.method in ('straight_line', 'declining_balance'):
|
||||
if asset.useful_life_years < 1:
|
||||
raise ValidationError(_("useful_life_years must be >= 1."))
|
||||
if asset.salvage_value > asset.cost:
|
||||
raise ValidationError(_("Salvage value cannot exceed cost."))
|
||||
|
||||
def _compute_steps(self, asset) -> list:
|
||||
"""Dispatch to the appropriate depreciation method service."""
|
||||
if asset.method == 'straight_line':
|
||||
return straight_line(
|
||||
cost=asset.cost,
|
||||
salvage_value=asset.salvage_value,
|
||||
n_periods=asset.useful_life_years,
|
||||
)
|
||||
if asset.method == 'declining_balance':
|
||||
return declining_balance(
|
||||
cost=asset.cost,
|
||||
salvage_value=asset.salvage_value,
|
||||
n_periods=asset.useful_life_years,
|
||||
rate=asset.declining_rate_pct / 100.0,
|
||||
)
|
||||
if asset.method == 'units_of_production':
|
||||
# Phase 3 simple: assume even per-period units. Phase 3.5 can read
|
||||
# from a per-period usage table populated by maintenance/IoT data.
|
||||
if asset.useful_life_years:
|
||||
per_period = asset.total_units_expected / asset.useful_life_years
|
||||
periods = asset.useful_life_years
|
||||
else:
|
||||
per_period = asset.total_units_expected
|
||||
periods = 1
|
||||
return units_of_production(
|
||||
cost=asset.cost,
|
||||
salvage_value=asset.salvage_value,
|
||||
total_units_expected=asset.total_units_expected,
|
||||
units_per_period=[per_period] * periods,
|
||||
)
|
||||
return []
|
||||
|
||||
def _add_periods(self, base_date: date, n_periods: int) -> date:
|
||||
"""Add (n_periods + 1) yearly increments to base_date and step back one
|
||||
day, giving the period-end date.
|
||||
|
||||
Phase 3.5 can split this into monthly/quarterly variants when the asset
|
||||
carries a sub-annual frequency.
|
||||
"""
|
||||
try:
|
||||
return base_date.replace(year=base_date.year + n_periods + 1) - timedelta(days=1)
|
||||
except ValueError:
|
||||
return base_date.replace(
|
||||
year=base_date.year + n_periods + 1, day=28,
|
||||
) - timedelta(days=1)
|
||||
|
||||
def _create_journal_entry(self, asset, line):
|
||||
"""Create the journal entry for a depreciation line.
|
||||
|
||||
Phase 3 keeps this minimal: requires the category to have both
|
||||
depreciation_account_id and expense_account_id wired up. Without that,
|
||||
the line is still posted (is_posted flag) but no move is created.
|
||||
Phase 3.5 will add multi-currency, allocation rules, and analytic tags.
|
||||
"""
|
||||
category = asset.category_id
|
||||
if not category or not (category.depreciation_account_id and category.expense_account_id):
|
||||
_logger.debug(
|
||||
"No accounts on category for asset %s; skipping journal entry",
|
||||
asset.id,
|
||||
)
|
||||
return None
|
||||
Move = self.env['account.move'].sudo()
|
||||
journal = self.env['account.journal'].search([
|
||||
('type', '=', 'general'),
|
||||
('company_id', '=', asset.company_id.id),
|
||||
], limit=1)
|
||||
if not journal:
|
||||
_logger.warning(
|
||||
"No general journal for company %s; skipping move creation",
|
||||
asset.company_id.name,
|
||||
)
|
||||
return None
|
||||
try:
|
||||
move = Move.create({
|
||||
'date': line.scheduled_date,
|
||||
'journal_id': journal.id,
|
||||
'ref': f"Depreciation: {asset.name} (P{line.period_index + 1})",
|
||||
'line_ids': [
|
||||
(0, 0, {
|
||||
'name': f"Depreciation expense - {asset.name}",
|
||||
'account_id': category.expense_account_id.id,
|
||||
'debit': line.amount,
|
||||
'credit': 0,
|
||||
}),
|
||||
(0, 0, {
|
||||
'name': f"Accumulated depreciation - {asset.name}",
|
||||
'account_id': category.depreciation_account_id.id,
|
||||
'debit': 0,
|
||||
'credit': line.amount,
|
||||
}),
|
||||
],
|
||||
})
|
||||
move.action_post()
|
||||
line.write({'move_id': move.id})
|
||||
return move
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning(
|
||||
"Failed to create depreciation move for asset %s line %s: %s",
|
||||
asset.id, line.id, e,
|
||||
)
|
||||
return None
|
||||
85
fusion_accounting_assets/models/fusion_assets_cron.py
Normal file
85
fusion_accounting_assets/models/fusion_assets_cron.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Cron handlers for fusion_accounting_assets.
|
||||
|
||||
- _cron_post_due_depreciation: daily, post due depreciation lines for running assets
|
||||
- _cron_anomaly_scan: monthly, scan for schedule variance and create anomaly records
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
from ..services.anomaly_detection import detect_schedule_variance
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FusionAssetsCron(models.AbstractModel):
|
||||
_name = "fusion.assets.cron"
|
||||
_description = "Fusion Assets Cron Handlers"
|
||||
|
||||
@api.model
|
||||
def _cron_post_due_depreciation(self):
|
||||
"""For each running asset, post any due un-posted depreciation lines."""
|
||||
today = fields.Date.today()
|
||||
engine = self.env['fusion.asset.engine']
|
||||
Asset = self.env['fusion.asset']
|
||||
running_assets = Asset.search([('state', '=', 'running')])
|
||||
posted_total = 0
|
||||
for asset in running_assets:
|
||||
try:
|
||||
with self.env.cr.savepoint():
|
||||
result = engine.post_depreciation_entry(asset, period_date=today)
|
||||
posted_total += result.get('posted_count', 0)
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning("Cron post failed for asset %s: %s", asset.id, e)
|
||||
_logger.info(
|
||||
"Cron: posted depreciation on %d lines across %d running assets",
|
||||
posted_total, len(running_assets),
|
||||
)
|
||||
|
||||
@api.model
|
||||
def _cron_anomaly_scan(self):
|
||||
"""For each running asset, compare expected accumulated depreciation
|
||||
vs posted, and persist any variance flags."""
|
||||
Asset = self.env['fusion.asset']
|
||||
Anomaly = self.env['fusion.asset.anomaly']
|
||||
running_assets = Asset.search([('state', '=', 'running')])
|
||||
flagged = 0
|
||||
today = fields.Date.today()
|
||||
for asset in running_assets:
|
||||
try:
|
||||
expected = sum(
|
||||
l.amount for l in asset.depreciation_line_ids
|
||||
if l.scheduled_date and l.scheduled_date <= today
|
||||
)
|
||||
actual = asset.total_depreciated
|
||||
anomaly = detect_schedule_variance(
|
||||
asset_id=asset.id, asset_name=asset.name,
|
||||
expected_accumulated=expected, actual_accumulated=actual,
|
||||
)
|
||||
if anomaly is None:
|
||||
continue
|
||||
anomaly_dict = anomaly.to_dict()
|
||||
existing = Anomaly.search([
|
||||
('asset_id', '=', asset.id),
|
||||
('anomaly_type', '=', anomaly_dict['anomaly_type']),
|
||||
('state', 'in', ('new', 'acknowledged')),
|
||||
], limit=1)
|
||||
if existing:
|
||||
continue
|
||||
Anomaly.create({
|
||||
'asset_id': asset.id,
|
||||
'anomaly_type': anomaly_dict['anomaly_type'],
|
||||
'severity': anomaly_dict['severity'],
|
||||
'expected': anomaly_dict['expected'],
|
||||
'actual': anomaly_dict['actual'],
|
||||
'variance_pct': anomaly_dict['variance_pct'],
|
||||
'detail': anomaly_dict['detail'],
|
||||
})
|
||||
flagged += 1
|
||||
except Exception as e: # noqa: BLE001
|
||||
_logger.warning("Cron anomaly scan failed for asset %s: %s", asset.id, e)
|
||||
_logger.info(
|
||||
"Cron: scanned %d assets, flagged %d anomalies",
|
||||
len(running_assets), flagged,
|
||||
)
|
||||
0
fusion_accounting_assets/reports/__init__.py
Normal file
0
fusion_accounting_assets/reports/__init__.py
Normal file
11
fusion_accounting_assets/security/ir.model.access.csv
Normal file
11
fusion_accounting_assets/security/ir.model.access.csv
Normal file
@@ -0,0 +1,11 @@
|
||||
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
|
||||
access_fusion_asset_user,fusion.asset.user,model_fusion_asset,base.group_user,1,0,0,0
|
||||
access_fusion_asset_admin,fusion.asset.admin,model_fusion_asset,fusion_accounting_core.group_fusion_accounting_admin,1,1,1,1
|
||||
access_fusion_asset_depreciation_line_user,fusion.asset.depreciation.line.user,model_fusion_asset_depreciation_line,base.group_user,1,0,0,0
|
||||
access_fusion_asset_depreciation_line_admin,fusion.asset.depreciation.line.admin,model_fusion_asset_depreciation_line,fusion_accounting_core.group_fusion_accounting_admin,1,1,1,1
|
||||
access_fusion_asset_category_user,fusion.asset.category.user,model_fusion_asset_category,base.group_user,1,0,0,0
|
||||
access_fusion_asset_category_admin,fusion.asset.category.admin,model_fusion_asset_category,fusion_accounting_core.group_fusion_accounting_admin,1,1,1,1
|
||||
access_fusion_asset_disposal_user,fusion.asset.disposal.user,model_fusion_asset_disposal,base.group_user,1,0,0,0
|
||||
access_fusion_asset_disposal_admin,fusion.asset.disposal.admin,model_fusion_asset_disposal,fusion_accounting_core.group_fusion_accounting_admin,1,1,1,1
|
||||
access_fusion_asset_anomaly_user,fusion.asset.anomaly.user,model_fusion_asset_anomaly,base.group_user,1,0,0,0
|
||||
access_fusion_asset_anomaly_admin,fusion.asset.anomaly.admin,model_fusion_asset_anomaly,fusion_accounting_core.group_fusion_accounting_admin,1,1,1,1
|
||||
|
6
fusion_accounting_assets/services/__init__.py
Normal file
6
fusion_accounting_assets/services/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from . import depreciation_methods
|
||||
from . import prorate
|
||||
from . import salvage_value
|
||||
from . import anomaly_detection
|
||||
from . import useful_life_prompt
|
||||
from . import useful_life_predictor
|
||||
96
fusion_accounting_assets/services/anomaly_detection.py
Normal file
96
fusion_accounting_assets/services/anomaly_detection.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Asset utilization anomaly detection.
|
||||
|
||||
Flags assets where actual usage / posted depreciation deviates significantly
|
||||
from the expected schedule. Three signal types:
|
||||
- behind_schedule: actual depreciation < expected by > threshold pct
|
||||
- ahead_of_schedule: actual > expected (over-depreciated; scrap or recompute)
|
||||
- low_utilization: units_used < expected_units_per_period (waste alert)
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class AssetAnomaly:
|
||||
asset_id: int
|
||||
asset_name: str
|
||||
anomaly_type: str
|
||||
severity: str
|
||||
expected: float
|
||||
actual: float
|
||||
variance_pct: float
|
||||
detail: str
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'asset_id': self.asset_id,
|
||||
'asset_name': self.asset_name,
|
||||
'anomaly_type': self.anomaly_type,
|
||||
'severity': self.severity,
|
||||
'expected': self.expected,
|
||||
'actual': self.actual,
|
||||
'variance_pct': self.variance_pct,
|
||||
'detail': self.detail,
|
||||
}
|
||||
|
||||
|
||||
DEFAULT_LOW_THRESHOLD_PCT = 10.0
|
||||
DEFAULT_MEDIUM_THRESHOLD_PCT = 25.0
|
||||
DEFAULT_HIGH_THRESHOLD_PCT = 50.0
|
||||
|
||||
|
||||
def detect_schedule_variance(*, asset_id: int, asset_name: str,
|
||||
expected_accumulated: float,
|
||||
actual_accumulated: float) -> AssetAnomaly | None:
|
||||
"""Compare expected accumulated depreciation vs actual posted."""
|
||||
if expected_accumulated <= 0:
|
||||
return None
|
||||
variance_amt = actual_accumulated - expected_accumulated
|
||||
variance_pct = abs(variance_amt) / expected_accumulated * 100
|
||||
if variance_pct < DEFAULT_LOW_THRESHOLD_PCT:
|
||||
return None
|
||||
direction = 'ahead_of_schedule' if variance_amt > 0 else 'behind_schedule'
|
||||
if variance_pct >= DEFAULT_HIGH_THRESHOLD_PCT:
|
||||
severity = 'high'
|
||||
elif variance_pct >= DEFAULT_MEDIUM_THRESHOLD_PCT:
|
||||
severity = 'medium'
|
||||
else:
|
||||
severity = 'low'
|
||||
detail = f"Posted ${actual_accumulated:,.2f} vs expected ${expected_accumulated:,.2f}"
|
||||
return AssetAnomaly(
|
||||
asset_id=asset_id,
|
||||
asset_name=asset_name,
|
||||
anomaly_type=direction,
|
||||
severity=severity,
|
||||
expected=expected_accumulated,
|
||||
actual=actual_accumulated,
|
||||
variance_pct=round(variance_pct, 1),
|
||||
detail=detail,
|
||||
)
|
||||
|
||||
|
||||
def detect_low_utilization(*, asset_id: int, asset_name: str,
|
||||
expected_units: float,
|
||||
actual_units: float) -> AssetAnomaly | None:
|
||||
"""For units-of-production assets: flag low actual usage."""
|
||||
if expected_units <= 0:
|
||||
return None
|
||||
if actual_units >= expected_units * 0.9:
|
||||
return None
|
||||
deficit_pct = (expected_units - actual_units) / expected_units * 100
|
||||
if deficit_pct >= 50:
|
||||
severity = 'high'
|
||||
elif deficit_pct >= 25:
|
||||
severity = 'medium'
|
||||
else:
|
||||
severity = 'low'
|
||||
return AssetAnomaly(
|
||||
asset_id=asset_id,
|
||||
asset_name=asset_name,
|
||||
anomaly_type='low_utilization',
|
||||
severity=severity,
|
||||
expected=expected_units,
|
||||
actual=actual_units,
|
||||
variance_pct=round(deficit_pct, 1),
|
||||
detail=f"Used {actual_units:.0f} of expected {expected_units:.0f} units",
|
||||
)
|
||||
116
fusion_accounting_assets/services/depreciation_methods.py
Normal file
116
fusion_accounting_assets/services/depreciation_methods.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Depreciation method primitives.
|
||||
|
||||
Three methods supported:
|
||||
- straight_line: equal periodic charge over useful_life
|
||||
- declining_balance: % per period of remaining book value
|
||||
- units_of_production: charge proportional to units used / total units expected
|
||||
|
||||
All return a list of DepreciationStep dataclasses (period_index, period_amount,
|
||||
accumulated_depreciation, book_value_at_end). Total depreciation always
|
||||
sums to (cost - salvage_value), within 1-cent rounding tolerance.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
|
||||
Method = Literal['straight_line', 'declining_balance', 'units_of_production']
|
||||
|
||||
|
||||
@dataclass
|
||||
class DepreciationStep:
|
||||
period_index: int
|
||||
period_amount: float
|
||||
accumulated_depreciation: float
|
||||
book_value_at_end: float
|
||||
|
||||
|
||||
def straight_line(*, cost: float, salvage_value: float = 0.0,
|
||||
n_periods: int) -> list[DepreciationStep]:
|
||||
"""Equal charge per period: (cost - salvage) / n_periods.
|
||||
|
||||
Last period absorbs rounding so total == cost - salvage exactly.
|
||||
"""
|
||||
if n_periods < 1:
|
||||
return []
|
||||
depreciable = cost - salvage_value
|
||||
per_period = round(depreciable / n_periods, 2)
|
||||
steps = []
|
||||
accumulated = 0.0
|
||||
for i in range(n_periods):
|
||||
if i == n_periods - 1:
|
||||
amount = round(depreciable - accumulated, 2)
|
||||
else:
|
||||
amount = per_period
|
||||
accumulated = round(accumulated + amount, 2)
|
||||
book = round(cost - accumulated, 2)
|
||||
steps.append(DepreciationStep(
|
||||
period_index=i,
|
||||
period_amount=amount,
|
||||
accumulated_depreciation=accumulated,
|
||||
book_value_at_end=book,
|
||||
))
|
||||
return steps
|
||||
|
||||
|
||||
def declining_balance(*, cost: float, salvage_value: float = 0.0,
|
||||
n_periods: int, rate: float) -> list[DepreciationStep]:
|
||||
"""Apply `rate` (e.g. 0.20 = 20%) to remaining book each period.
|
||||
|
||||
Switches to straight-line when straight-line would deplete remaining book
|
||||
faster (typical Odoo behavior). Last step caps at salvage_value.
|
||||
"""
|
||||
if n_periods < 1 or rate <= 0:
|
||||
return []
|
||||
if rate >= 1:
|
||||
# Pathological: 100%+ rate. Charge full depreciable amount in period 0.
|
||||
depreciable = round(cost - salvage_value, 2)
|
||||
return [DepreciationStep(0, depreciable, depreciable, round(salvage_value, 2))]
|
||||
steps = []
|
||||
book = cost
|
||||
accumulated = 0.0
|
||||
for i in range(n_periods):
|
||||
remaining_periods = n_periods - i
|
||||
db_amount = round(book * rate, 2)
|
||||
sl_amount = round((book - salvage_value) / remaining_periods, 2) if remaining_periods else 0.0
|
||||
amount = max(db_amount, sl_amount)
|
||||
if book - amount < salvage_value:
|
||||
amount = round(book - salvage_value, 2)
|
||||
accumulated = round(accumulated + amount, 2)
|
||||
book = round(book - amount, 2)
|
||||
steps.append(DepreciationStep(
|
||||
period_index=i,
|
||||
period_amount=amount,
|
||||
accumulated_depreciation=accumulated,
|
||||
book_value_at_end=book,
|
||||
))
|
||||
if book <= salvage_value:
|
||||
break
|
||||
return steps
|
||||
|
||||
|
||||
def units_of_production(*, cost: float, salvage_value: float = 0.0,
|
||||
total_units_expected: float,
|
||||
units_per_period: list[float]) -> list[DepreciationStep]:
|
||||
"""Charge per period = (units_used / total_expected) * (cost - salvage)."""
|
||||
if total_units_expected <= 0:
|
||||
return []
|
||||
depreciable = cost - salvage_value
|
||||
per_unit = depreciable / total_units_expected
|
||||
steps = []
|
||||
accumulated = 0.0
|
||||
for i, units in enumerate(units_per_period):
|
||||
amount = round(units * per_unit, 2)
|
||||
if accumulated + amount > depreciable:
|
||||
amount = round(depreciable - accumulated, 2)
|
||||
accumulated = round(accumulated + amount, 2)
|
||||
book = round(cost - accumulated, 2)
|
||||
steps.append(DepreciationStep(
|
||||
period_index=i,
|
||||
period_amount=amount,
|
||||
accumulated_depreciation=accumulated,
|
||||
book_value_at_end=book,
|
||||
))
|
||||
if accumulated >= depreciable:
|
||||
break
|
||||
return steps
|
||||
34
fusion_accounting_assets/services/prorate.py
Normal file
34
fusion_accounting_assets/services/prorate.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""Prorating helpers for first-period and last-period depreciation.
|
||||
|
||||
When an asset starts mid-month, the first period charges only a fraction
|
||||
of the full period_amount. Three conventions:
|
||||
- 'full_month': always charge full month (no proration)
|
||||
- 'days_365': pro-rate by actual days / 365
|
||||
- 'days_period': pro-rate by actual days in period / total days in period
|
||||
"""
|
||||
|
||||
from datetime import date
|
||||
from typing import Literal
|
||||
|
||||
|
||||
ProrateConvention = Literal['full_month', 'days_365', 'days_period']
|
||||
|
||||
|
||||
def prorate_factor(*, period_start: date, period_end: date,
|
||||
asset_start: date,
|
||||
convention: ProrateConvention = 'days_period') -> float:
|
||||
"""Return a 0..1 factor for how much of `period`'s depreciation
|
||||
applies to an asset that started on `asset_start`."""
|
||||
if convention == 'full_month':
|
||||
return 1.0
|
||||
if asset_start <= period_start:
|
||||
return 1.0
|
||||
if asset_start > period_end:
|
||||
return 0.0
|
||||
actual_days = (period_end - asset_start).days + 1
|
||||
if convention == 'days_365':
|
||||
return actual_days / 365.0
|
||||
if convention == 'days_period':
|
||||
period_days = (period_end - period_start).days + 1
|
||||
return actual_days / period_days
|
||||
raise ValueError(f"Unknown convention: {convention}")
|
||||
38
fusion_accounting_assets/services/salvage_value.py
Normal file
38
fusion_accounting_assets/services/salvage_value.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Salvage value (scrap value) calculation helpers.
|
||||
|
||||
Most clients use straight % of cost; some use fixed dollar amounts.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
|
||||
SalvageMethod = Literal['percentage', 'fixed', 'zero']
|
||||
|
||||
|
||||
@dataclass
|
||||
class SalvageConfig:
|
||||
method: SalvageMethod
|
||||
value: float = 0.0
|
||||
|
||||
|
||||
def compute_salvage_value(*, cost: float, config: SalvageConfig) -> float:
|
||||
"""Compute end-of-life salvage value."""
|
||||
if config.method == 'zero':
|
||||
return 0.0
|
||||
if config.method == 'percentage':
|
||||
return round(cost * config.value / 100, 2)
|
||||
if config.method == 'fixed':
|
||||
return round(config.value, 2)
|
||||
raise ValueError(f"Unknown salvage method: {config.method}")
|
||||
|
||||
|
||||
def remaining_useful_life_value(*, current_book: float, salvage: float,
|
||||
periods_used: int, total_periods: int) -> float:
|
||||
"""Estimate remaining value if asset is sold/scrapped now."""
|
||||
if total_periods <= 0:
|
||||
return current_book
|
||||
if periods_used >= total_periods:
|
||||
return salvage
|
||||
remaining_pct = (total_periods - periods_used) / total_periods
|
||||
return round(salvage + (current_book - salvage) * remaining_pct, 2)
|
||||
94
fusion_accounting_assets/services/useful_life_predictor.py
Normal file
94
fusion_accounting_assets/services/useful_life_predictor.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""AI-suggested useful life from invoice context.
|
||||
|
||||
Wraps useful_life_prompt + an LLMProvider. Returns a dict per the prompt's
|
||||
output contract. Templated fallback when no provider configured.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Templated fallback rules: (regex, years, method, rationale)
|
||||
FALLBACK_RULES = [
|
||||
(r'\b(computer|laptop|monitor|server|workstation)\b', 4, 'straight_line', 'Computer hardware'),
|
||||
(r'\b(furniture|desk|chair|cabinet)\b', 7, 'straight_line', 'Furniture'),
|
||||
(r'\b(vehicle|truck|car|van)\b', 5, 'declining_balance', 'Vehicle (CRA Class 10)'),
|
||||
(r'\b(building|warehouse)\b', 30, 'straight_line', 'Building'),
|
||||
(r'\b(software|license)\b', 4, 'straight_line', 'Software license'),
|
||||
(r'\b(equipment|machinery|machine)\b', 10, 'straight_line', 'Manufacturing equipment'),
|
||||
(r'\b(leasehold improvement)\b', 5, 'straight_line', 'Leasehold improvements'),
|
||||
]
|
||||
FALLBACK_DEFAULT = (5, 'straight_line', 'Generic fixed asset (default)')
|
||||
|
||||
|
||||
def predict_useful_life(env, *, description: str, amount: float = None,
|
||||
partner_name: str = None, provider=None) -> dict:
|
||||
"""Suggest useful life + method via LLM, with templated fallback."""
|
||||
if provider is None:
|
||||
provider = _get_provider(env)
|
||||
if provider is None:
|
||||
return _templated_fallback(description)
|
||||
|
||||
try:
|
||||
from .useful_life_prompt import build_prompt
|
||||
system, user = build_prompt(
|
||||
description=description, amount=amount, partner_name=partner_name,
|
||||
)
|
||||
response = provider.complete(
|
||||
system=system,
|
||||
messages=[{'role': 'user', 'content': user}],
|
||||
max_tokens=400, temperature=0.1,
|
||||
)
|
||||
content = response.get('content') if isinstance(response, dict) else response
|
||||
parsed = json.loads(content)
|
||||
for key in ('useful_life_years', 'depreciation_method', 'rationale'):
|
||||
if key not in parsed:
|
||||
raise ValueError(f"Missing key: {key}")
|
||||
parsed.setdefault('confidence', 0.7)
|
||||
return parsed
|
||||
except Exception as e:
|
||||
_logger.warning("Useful life LLM prediction failed (%s); falling back", e)
|
||||
return _templated_fallback(description)
|
||||
|
||||
|
||||
def _templated_fallback(description: str) -> dict:
|
||||
"""Pattern-match keyword rules. Always returns a usable dict."""
|
||||
desc_lower = description.lower() if description else ''
|
||||
for pattern, years, method, rationale in FALLBACK_RULES:
|
||||
if re.search(pattern, desc_lower):
|
||||
return {
|
||||
'useful_life_years': years,
|
||||
'depreciation_method': method,
|
||||
'rationale': rationale,
|
||||
'confidence': 0.5,
|
||||
}
|
||||
years, method, rationale = FALLBACK_DEFAULT
|
||||
return {
|
||||
'useful_life_years': years,
|
||||
'depreciation_method': method,
|
||||
'rationale': rationale,
|
||||
'confidence': 0.3,
|
||||
}
|
||||
|
||||
|
||||
def _get_provider(env):
|
||||
"""Look up provider for 'asset_useful_life' feature."""
|
||||
param = env['ir.config_parameter'].sudo()
|
||||
name = param.get_param('fusion_accounting.provider.asset_useful_life')
|
||||
if not name:
|
||||
name = param.get_param('fusion_accounting.provider.default')
|
||||
if not name:
|
||||
return None
|
||||
try:
|
||||
from odoo.addons.fusion_accounting_ai.services.adapters.openai_adapter import OpenAIAdapter
|
||||
from odoo.addons.fusion_accounting_ai.services.adapters.claude import ClaudeAdapter
|
||||
except ImportError:
|
||||
return None
|
||||
if name.startswith('openai'):
|
||||
return OpenAIAdapter(env)
|
||||
elif name.startswith('claude'):
|
||||
return ClaudeAdapter(env)
|
||||
return None
|
||||
48
fusion_accounting_assets/services/useful_life_prompt.py
Normal file
48
fusion_accounting_assets/services/useful_life_prompt.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""LLM prompt builder for AI-suggested useful life from invoice description.
|
||||
|
||||
Output contract:
|
||||
{
|
||||
"useful_life_years": <int>,
|
||||
"depreciation_method": "straight_line" | "declining_balance" | "units_of_production",
|
||||
"rationale": "<short explanation>",
|
||||
"confidence": <float 0-1>
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
SYSTEM_PROMPT = """You are an experienced accountant. Given an invoice line
|
||||
description for a fixed asset, suggest the appropriate useful life in years
|
||||
and depreciation method based on common accounting standards (IFRS / GAAP / CRA).
|
||||
|
||||
Respond ONLY with valid JSON of this exact shape:
|
||||
{
|
||||
"useful_life_years": <integer>,
|
||||
"depreciation_method": "straight_line" | "declining_balance" | "units_of_production",
|
||||
"rationale": "<one or two sentence explanation>",
|
||||
"confidence": <float between 0 and 1>
|
||||
}
|
||||
|
||||
Common useful-life conventions:
|
||||
- Furniture: 7 years, straight-line
|
||||
- Office equipment: 5 years, straight-line
|
||||
- Computers: 3-4 years, straight-line or declining
|
||||
- Vehicles: 5 years, declining-balance (CRA Class 10 30%)
|
||||
- Buildings: 25-40 years, straight-line
|
||||
- Manufacturing equipment: 10-15 years, units of production if measurable
|
||||
- Software (licenses): 3-5 years, straight-line
|
||||
- Leasehold improvements: lesser of lease term or useful life
|
||||
|
||||
Do NOT include markdown code fences. Do NOT include any prose outside the JSON."""
|
||||
|
||||
|
||||
def build_prompt(*, description: str, amount: float = None,
|
||||
partner_name: str = None) -> tuple[str, str]:
|
||||
"""Return (system, user) prompt tuple."""
|
||||
parts = [f"INVOICE LINE: {description}"]
|
||||
if amount is not None:
|
||||
parts.append(f"AMOUNT: ${amount:,.2f}")
|
||||
if partner_name:
|
||||
parts.append(f"VENDOR: {partner_name}")
|
||||
parts.append("")
|
||||
parts.append("Suggest the useful life and depreciation method per the system prompt.")
|
||||
return (SYSTEM_PROMPT, "\n".join(parts))
|
||||
BIN
fusion_accounting_assets/static/description/icon.png
Normal file
BIN
fusion_accounting_assets/static/description/icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 72 KiB |
17
fusion_accounting_assets/tests/__init__.py
Normal file
17
fusion_accounting_assets/tests/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from . import test_depreciation_methods
|
||||
from . import test_prorate
|
||||
from . import test_salvage_value
|
||||
from . import test_asset_anomaly_detection
|
||||
from . import test_useful_life_predictor
|
||||
from . import test_fusion_asset
|
||||
from . import test_fusion_asset_depreciation_line
|
||||
from . import test_fusion_asset_category
|
||||
from . import test_fusion_asset_disposal
|
||||
from . import test_fusion_asset_anomaly
|
||||
from . import test_account_move_inherit
|
||||
from . import test_fusion_asset_engine
|
||||
from . import test_engine_integration
|
||||
from . import test_assets_controller
|
||||
from . import test_assets_adapter
|
||||
from . import test_asset_tools
|
||||
from . import test_assets_cron
|
||||
47
fusion_accounting_assets/tests/test_account_move_inherit.py
Normal file
47
fusion_accounting_assets/tests/test_account_move_inherit.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAccountMoveLineFusionAsset(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Asset From Invoice',
|
||||
'cost': 8000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
})
|
||||
self.partner = self.env['res.partner'].create({'name': 'Vendor X'})
|
||||
product = self.env['product.product'].create({'name': 'Test Asset Item'})
|
||||
bill = self.env['account.move'].create({
|
||||
'move_type': 'in_invoice',
|
||||
'partner_id': self.partner.id,
|
||||
'invoice_date': date(2026, 1, 1),
|
||||
'invoice_line_ids': [(0, 0, {
|
||||
'product_id': product.id,
|
||||
'name': 'Test asset purchase',
|
||||
'quantity': 1,
|
||||
'price_unit': 8000,
|
||||
})],
|
||||
})
|
||||
self.invoice_line = bill.invoice_line_ids[0]
|
||||
|
||||
def test_line_starts_without_asset_link(self):
|
||||
self.assertFalse(self.invoice_line.fusion_asset_id)
|
||||
self.assertEqual(self.invoice_line.fusion_asset_count, 0)
|
||||
|
||||
def test_link_invoice_line_to_asset(self):
|
||||
self.invoice_line.fusion_asset_id = self.asset
|
||||
self.assertEqual(self.invoice_line.fusion_asset_id, self.asset)
|
||||
self.invoice_line.invalidate_recordset(['fusion_asset_count'])
|
||||
self.assertEqual(self.invoice_line.fusion_asset_count, 1)
|
||||
|
||||
def test_action_open_fusion_asset_returns_window_action(self):
|
||||
self.invoice_line.fusion_asset_id = self.asset
|
||||
action = self.invoice_line.action_open_fusion_asset()
|
||||
self.assertEqual(action['res_model'], 'fusion.asset')
|
||||
self.assertEqual(action['res_id'], self.asset.id)
|
||||
self.assertEqual(action['view_mode'], 'form')
|
||||
@@ -0,0 +1,71 @@
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.addons.fusion_accounting_assets.services.anomaly_detection import (
|
||||
detect_schedule_variance, detect_low_utilization, AssetAnomaly,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAssetAnomalyDetection(TransactionCase):
|
||||
|
||||
def test_schedule_variance_within_threshold_returns_none(self):
|
||||
# 5% variance < 10% threshold
|
||||
result = detect_schedule_variance(
|
||||
asset_id=1, asset_name='Truck', expected_accumulated=10000,
|
||||
actual_accumulated=10500,
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_schedule_variance_behind_schedule_low_severity(self):
|
||||
# 15% behind: low severity, behind_schedule
|
||||
result = detect_schedule_variance(
|
||||
asset_id=1, asset_name='Truck', expected_accumulated=10000,
|
||||
actual_accumulated=8500,
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.anomaly_type, 'behind_schedule')
|
||||
self.assertEqual(result.severity, 'low')
|
||||
|
||||
def test_schedule_variance_ahead_high_severity(self):
|
||||
# 60% ahead: high severity
|
||||
result = detect_schedule_variance(
|
||||
asset_id=2, asset_name='Server', expected_accumulated=10000,
|
||||
actual_accumulated=16000,
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.anomaly_type, 'ahead_of_schedule')
|
||||
self.assertEqual(result.severity, 'high')
|
||||
|
||||
def test_schedule_variance_zero_expected_returns_none(self):
|
||||
result = detect_schedule_variance(
|
||||
asset_id=1, asset_name='Truck', expected_accumulated=0,
|
||||
actual_accumulated=500,
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_low_utilization_flags_when_underused(self):
|
||||
# 60% deficit -> high severity
|
||||
result = detect_low_utilization(
|
||||
asset_id=3, asset_name='Mill', expected_units=1000, actual_units=400,
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.anomaly_type, 'low_utilization')
|
||||
self.assertEqual(result.severity, 'high')
|
||||
|
||||
def test_low_utilization_within_tolerance_returns_none(self):
|
||||
# 95% used: within 10% tolerance
|
||||
result = detect_low_utilization(
|
||||
asset_id=3, asset_name='Mill', expected_units=1000, actual_units=950,
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_anomaly_to_dict_round_trip(self):
|
||||
anomaly = AssetAnomaly(
|
||||
asset_id=1, asset_name='X', anomaly_type='behind_schedule',
|
||||
severity='medium', expected=100.0, actual=70.0, variance_pct=30.0,
|
||||
detail='example',
|
||||
)
|
||||
d = anomaly.to_dict()
|
||||
self.assertEqual(d['asset_id'], 1)
|
||||
self.assertEqual(d['anomaly_type'], 'behind_schedule')
|
||||
self.assertEqual(d['severity'], 'medium')
|
||||
56
fusion_accounting_assets/tests/test_asset_tools.py
Normal file
56
fusion_accounting_assets/tests/test_asset_tools.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Tests for the 5 fusion-asset AI tools."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
from odoo.addons.fusion_accounting_ai.services.tools import asset_management as tools
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetTools(TransactionCase):
|
||||
|
||||
def test_fusion_list_assets(self):
|
||||
self.env['fusion.asset'].create({
|
||||
'name': 'Tool Test', 'cost': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_list_assets(self.env, {'company_id': self.env.company.id})
|
||||
self.assertGreaterEqual(result.get('count', 0), 1)
|
||||
|
||||
def test_fusion_get_asset_detail(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Detail Test', 'cost': 1500,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_get_asset_detail(self.env, {'asset_id': asset.id})
|
||||
self.assertEqual(result['asset']['name'], 'Detail Test')
|
||||
|
||||
def test_fusion_compute_schedule(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Schedule Test', 'cost': 2000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = tools.fusion_compute_asset_schedule(self.env, {'asset_id': asset.id})
|
||||
self.assertEqual(result['lines_created'], 4)
|
||||
|
||||
def test_fusion_suggest_useful_life(self):
|
||||
self.env['ir.config_parameter'].sudo().search([
|
||||
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
|
||||
'fusion_accounting.provider.default'])
|
||||
]).unlink()
|
||||
result = tools.fusion_suggest_asset_useful_life(self.env, {
|
||||
'description': 'desk',
|
||||
})
|
||||
self.assertEqual(result['useful_life_years'], 7)
|
||||
|
||||
def test_tools_registered_in_dispatch(self):
|
||||
from odoo.addons.fusion_accounting_ai.services.tools import TOOL_DISPATCH
|
||||
for tool_name in ['fusion_list_assets', 'fusion_get_asset_detail',
|
||||
'fusion_compute_asset_schedule', 'fusion_dispose_asset',
|
||||
'fusion_suggest_asset_useful_life']:
|
||||
self.assertIn(tool_name, TOOL_DISPATCH)
|
||||
40
fusion_accounting_assets/tests/test_assets_adapter.py
Normal file
40
fusion_accounting_assets/tests/test_assets_adapter.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""AssetsAdapter wiring tests — fusion-mode dispatch."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
from odoo.addons.fusion_accounting_ai.services.data_adapters.assets import (
|
||||
AssetsAdapter,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAssetsAdapter(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.adapter = AssetsAdapter(self.env)
|
||||
|
||||
def test_list_assets_via_fusion(self):
|
||||
self.env['fusion.asset'].create({
|
||||
'name': 'Adapter Test', 'cost': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = self.adapter.list_assets_via_fusion(company_id=self.env.company.id)
|
||||
self.assertGreaterEqual(result['count'], 1)
|
||||
|
||||
def test_suggest_useful_life_via_fusion_uses_templated_fallback(self):
|
||||
self.env['ir.config_parameter'].sudo().search([
|
||||
('key', 'in', ['fusion_accounting.provider.asset_useful_life',
|
||||
'fusion_accounting.provider.default'])
|
||||
]).unlink()
|
||||
result = self.adapter.suggest_useful_life_via_fusion(description='laptop')
|
||||
self.assertEqual(result['useful_life_years'], 4)
|
||||
self.assertEqual(result['depreciation_method'], 'straight_line')
|
||||
|
||||
def test_dispose_asset_via_community_returns_error(self):
|
||||
result = self.adapter.dispose_asset_via_community(asset_id=1, sale_amount=100)
|
||||
self.assertIn('error', result)
|
||||
103
fusion_accounting_assets/tests/test_assets_controller.py
Normal file
103
fusion_accounting_assets/tests/test_assets_controller.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Controller tests using HttpCase."""
|
||||
|
||||
import json
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import HttpCase, new_test_user
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestAssetsController(HttpCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.user = new_test_user(
|
||||
self.env, login='assets_test_user',
|
||||
groups='base.group_user,account.group_account_invoice',
|
||||
)
|
||||
|
||||
def _jsonrpc(self, endpoint, params):
|
||||
self.authenticate('assets_test_user', 'assets_test_user')
|
||||
url = f'/fusion/assets/{endpoint}'
|
||||
body = {'jsonrpc': '2.0', 'method': 'call', 'params': params, 'id': 1}
|
||||
response = self.url_open(
|
||||
url, data=json.dumps(body),
|
||||
headers={'Content-Type': 'application/json'},
|
||||
)
|
||||
self.assertEqual(
|
||||
response.status_code, 200,
|
||||
f"{endpoint} returned {response.status_code}: {response.text[:300]}",
|
||||
)
|
||||
result = response.json()
|
||||
if 'error' in result:
|
||||
self.fail(f"{endpoint} errored: {result['error']}")
|
||||
return result.get('result', {})
|
||||
|
||||
def test_list_returns_dict(self):
|
||||
result = self._jsonrpc('list', {'company_id': self.env.company.id})
|
||||
self.assertIn('assets', result)
|
||||
self.assertIn('total', result)
|
||||
|
||||
def test_get_detail_returns_asset(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'Ctrl Test Asset', 'cost': 5000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 5,
|
||||
})
|
||||
result = self._jsonrpc('get_detail', {'asset_id': asset.id})
|
||||
self.assertEqual(result['asset']['id'], asset.id)
|
||||
self.assertIn('depreciation_lines', result)
|
||||
|
||||
def test_compute_schedule_creates_lines(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'CompTest', 'cost': 4000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
result = self._jsonrpc('compute_schedule', {'asset_id': asset.id})
|
||||
self.assertEqual(result['lines_created'], 4)
|
||||
|
||||
def test_post_depreciation_after_running(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'PostTest', 'cost': 3000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 3,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
result = self._jsonrpc('post_depreciation', {'asset_id': asset.id})
|
||||
self.assertEqual(result['posted_count'], 1)
|
||||
|
||||
def test_dispose_marks_asset_disposed(self):
|
||||
asset = self.env['fusion.asset'].create({
|
||||
'name': 'DispTest', 'cost': 6000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 3,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
result = self._jsonrpc('dispose', {
|
||||
'asset_id': asset.id, 'sale_amount': 4000,
|
||||
'sale_date': '2027-06-01', 'disposal_type': 'sale',
|
||||
})
|
||||
self.assertIn('disposal_id', result)
|
||||
asset.invalidate_recordset(['state'])
|
||||
self.assertEqual(asset.state, 'disposed')
|
||||
|
||||
def test_get_anomalies_returns_list(self):
|
||||
result = self._jsonrpc('get_anomalies', {'company_id': self.env.company.id})
|
||||
self.assertIn('anomalies', result)
|
||||
|
||||
def test_suggest_useful_life_returns_dict(self):
|
||||
result = self._jsonrpc('suggest_useful_life', {'description': 'Dell laptop'})
|
||||
self.assertIn('useful_life_years', result)
|
||||
self.assertIn('depreciation_method', result)
|
||||
self.assertEqual(result['useful_life_years'], 4)
|
||||
|
||||
def test_get_partner_history(self):
|
||||
partner = self.env['res.partner'].create({'name': 'History Test Partner'})
|
||||
result = self._jsonrpc('get_partner_history', {'partner_id': partner.id})
|
||||
self.assertEqual(result['partner_id'], partner.id)
|
||||
28
fusion_accounting_assets/tests/test_assets_cron.py
Normal file
28
fusion_accounting_assets/tests/test_assets_cron.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""Cron handler smoke tests."""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import TransactionCase
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetsCron(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cron = self.env['fusion.assets.cron']
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Cron Test', 'cost': 4000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line', 'useful_life_years': 4,
|
||||
})
|
||||
self.env['fusion.asset.engine'].compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
|
||||
def test_cron_post_due_depreciation_runs(self):
|
||||
self.cron._cron_post_due_depreciation()
|
||||
|
||||
def test_cron_anomaly_scan_runs(self):
|
||||
self.cron._cron_anomaly_scan()
|
||||
88
fusion_accounting_assets/tests/test_depreciation_methods.py
Normal file
88
fusion_accounting_assets/tests/test_depreciation_methods.py
Normal file
@@ -0,0 +1,88 @@
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.addons.fusion_accounting_assets.services.depreciation_methods import (
|
||||
straight_line, declining_balance, units_of_production,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestStraightLine(TransactionCase):
|
||||
|
||||
def test_total_equals_cost_minus_salvage(self):
|
||||
steps = straight_line(cost=10000, salvage_value=1000, n_periods=5)
|
||||
total = sum(s.period_amount for s in steps)
|
||||
self.assertAlmostEqual(total, 9000, places=2)
|
||||
|
||||
def test_per_period_equal_except_last(self):
|
||||
steps = straight_line(cost=10000, salvage_value=0, n_periods=4)
|
||||
self.assertEqual([s.period_amount for s in steps], [2500.0] * 4)
|
||||
|
||||
def test_last_period_absorbs_rounding(self):
|
||||
steps = straight_line(cost=10000, salvage_value=0, n_periods=3)
|
||||
total = sum(s.period_amount for s in steps)
|
||||
self.assertAlmostEqual(total, 10000, places=2)
|
||||
|
||||
def test_zero_periods_returns_empty(self):
|
||||
self.assertEqual(straight_line(cost=10000, n_periods=0), [])
|
||||
|
||||
def test_book_value_decreasing(self):
|
||||
steps = straight_line(cost=10000, salvage_value=1000, n_periods=5)
|
||||
for i in range(1, len(steps)):
|
||||
self.assertLess(steps[i].book_value_at_end, steps[i - 1].book_value_at_end)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestDecliningBalance(TransactionCase):
|
||||
|
||||
def test_total_does_not_exceed_depreciable(self):
|
||||
steps = declining_balance(cost=10000, salvage_value=1000, n_periods=10, rate=0.20)
|
||||
total = sum(s.period_amount for s in steps)
|
||||
self.assertLessEqual(total, 9000.01)
|
||||
|
||||
def test_does_not_go_below_salvage(self):
|
||||
steps = declining_balance(cost=10000, salvage_value=1000, n_periods=10, rate=0.50)
|
||||
for s in steps:
|
||||
self.assertGreaterEqual(s.book_value_at_end, 999.99)
|
||||
|
||||
def test_zero_rate_returns_empty(self):
|
||||
self.assertEqual(declining_balance(cost=10000, n_periods=5, rate=0), [])
|
||||
|
||||
def test_pathological_100pct_rate_one_period(self):
|
||||
steps = declining_balance(cost=10000, salvage_value=500, n_periods=10, rate=1.0)
|
||||
self.assertEqual(len(steps), 1)
|
||||
self.assertAlmostEqual(steps[0].period_amount, 9500, places=2)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestUnitsOfProduction(TransactionCase):
|
||||
|
||||
def test_total_proportional_to_units_used(self):
|
||||
steps = units_of_production(
|
||||
cost=20000, salvage_value=2000,
|
||||
total_units_expected=10000,
|
||||
units_per_period=[1000, 2000, 3000, 4000],
|
||||
)
|
||||
total = sum(s.period_amount for s in steps)
|
||||
self.assertAlmostEqual(total, 18000, places=1)
|
||||
|
||||
def test_partial_use_partial_depreciation(self):
|
||||
steps = units_of_production(
|
||||
cost=10000, salvage_value=0,
|
||||
total_units_expected=1000,
|
||||
units_per_period=[200],
|
||||
)
|
||||
self.assertAlmostEqual(steps[0].period_amount, 2000, places=2)
|
||||
|
||||
def test_zero_total_units_returns_empty(self):
|
||||
self.assertEqual(
|
||||
units_of_production(cost=10000, total_units_expected=0, units_per_period=[100]),
|
||||
[],
|
||||
)
|
||||
|
||||
def test_does_not_overshoot_salvage(self):
|
||||
steps = units_of_production(
|
||||
cost=10000, salvage_value=1000,
|
||||
total_units_expected=1000,
|
||||
units_per_period=[2000],
|
||||
)
|
||||
self.assertAlmostEqual(steps[0].period_amount, 9000, places=2)
|
||||
151
fusion_accounting_assets/tests/test_engine_integration.py
Normal file
151
fusion_accounting_assets/tests/test_engine_integration.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""End-to-end engine integration tests.
|
||||
|
||||
Each test creates a complete realistic asset (with category and accounts),
|
||||
runs the engine through a full lifecycle, and asserts both the model state
|
||||
and the journal entries (where category accounts are configured).
|
||||
"""
|
||||
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install', 'integration')
|
||||
class TestAssetEngineIntegration(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.engine = self.env['fusion.asset.engine']
|
||||
Account = self.env['account.account']
|
||||
company_id = self.env.company.id
|
||||
self.expense_account = Account.search([
|
||||
('account_type', '=', 'expense_depreciation'),
|
||||
('company_ids', 'in', company_id),
|
||||
], limit=1)
|
||||
if not self.expense_account:
|
||||
self.expense_account = Account.create({
|
||||
'name': 'Test Depreciation Expense',
|
||||
'code': '7180',
|
||||
'account_type': 'expense_depreciation',
|
||||
'company_ids': [(6, 0, [company_id])],
|
||||
})
|
||||
self.dep_account = Account.search([
|
||||
('account_type', '=', 'asset_fixed'),
|
||||
('company_ids', 'in', company_id),
|
||||
], limit=1)
|
||||
if not self.dep_account:
|
||||
self.dep_account = Account.create({
|
||||
'name': 'Test Accumulated Depreciation',
|
||||
'code': '1690',
|
||||
'account_type': 'asset_fixed',
|
||||
'company_ids': [(6, 0, [company_id])],
|
||||
})
|
||||
self.category = self.env['fusion.asset.category'].create({
|
||||
'name': 'Test Category',
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 5,
|
||||
'asset_account_id': self.dep_account.id,
|
||||
'depreciation_account_id': self.dep_account.id,
|
||||
'expense_account_id': self.expense_account.id,
|
||||
})
|
||||
|
||||
def _make_asset(self, **kwargs):
|
||||
defaults = {
|
||||
'name': 'Integration Asset',
|
||||
'cost': 12000,
|
||||
'salvage_value': 0,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 4,
|
||||
'category_id': self.category.id,
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
return self.env['fusion.asset'].create(defaults)
|
||||
|
||||
def test_full_lifecycle_straight_line(self):
|
||||
asset = self._make_asset()
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
self.assertEqual(len(asset.depreciation_line_ids), 4)
|
||||
self.assertAlmostEqual(
|
||||
sum(asset.depreciation_line_ids.mapped('amount')), 12000, places=2,
|
||||
)
|
||||
|
||||
asset.action_set_running()
|
||||
for _i in range(2):
|
||||
result = self.engine.post_depreciation_entry(asset)
|
||||
self.assertEqual(result['posted_count'], 1)
|
||||
asset.invalidate_recordset(['book_value', 'total_depreciated'])
|
||||
self.assertAlmostEqual(asset.total_depreciated, 6000, places=2)
|
||||
|
||||
def test_post_creates_journal_entry_when_accounts_configured(self):
|
||||
asset = self._make_asset()
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
self.engine.post_depreciation_entry(asset)
|
||||
first = asset.depreciation_line_ids.sorted('period_index')[0]
|
||||
self.assertTrue(first.move_id, "Expected journal entry on posted line")
|
||||
moves = first.move_id
|
||||
self.assertAlmostEqual(
|
||||
sum(moves.line_ids.mapped('debit')),
|
||||
sum(moves.line_ids.mapped('credit')),
|
||||
places=2,
|
||||
)
|
||||
|
||||
def test_dispose_caps_future_lines(self):
|
||||
asset = self._make_asset()
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
self.engine.post_depreciation_entry(asset)
|
||||
self.engine.dispose_asset(
|
||||
asset, sale_amount=5000, sale_date=date(2027, 6, 1),
|
||||
)
|
||||
self.assertEqual(asset.state, 'disposed')
|
||||
unposted = asset.depreciation_line_ids.filtered(lambda l: not l.is_posted)
|
||||
for line in unposted:
|
||||
self.assertLessEqual(line.scheduled_date, date(2027, 6, 1))
|
||||
|
||||
def test_dispose_records_correct_book_value(self):
|
||||
asset = self._make_asset()
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
for _i in range(2):
|
||||
self.engine.post_depreciation_entry(asset)
|
||||
result = self.engine.dispose_asset(
|
||||
asset, sale_amount=8000, sale_date=date(2028, 6, 1),
|
||||
)
|
||||
# Book value at disposal = cost - accumulated = 12000 - 6000 = 6000.
|
||||
self.assertAlmostEqual(result['book_value_at_disposal'], 6000, places=2)
|
||||
# Gain = 8000 - 6000 = 2000.
|
||||
self.assertAlmostEqual(result['gain_loss_amount'], 2000, places=2)
|
||||
|
||||
def test_partial_sale_30pct(self):
|
||||
asset = self._make_asset(cost=10000, salvage_value=0)
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
result = self.engine.partial_sale(
|
||||
asset, sold_amount=3500, sold_qty=0.3,
|
||||
sale_date=date(2027, 1, 1),
|
||||
)
|
||||
asset.invalidate_recordset(['cost'])
|
||||
self.assertAlmostEqual(asset.cost, 7000, places=2)
|
||||
child = self.env['fusion.asset'].browse(result['child_asset_id'])
|
||||
self.assertAlmostEqual(child.cost, 3000, places=2)
|
||||
self.assertEqual(child.state, 'disposed')
|
||||
# Child has no posted depreciation; book_value at disposal = 3000.
|
||||
# Gain = 3500 - 3000 = 500.
|
||||
self.assertAlmostEqual(result['gain_loss_amount'], 500, places=0)
|
||||
|
||||
def test_pause_then_resume_lifecycle(self):
|
||||
asset = self._make_asset()
|
||||
self.engine.compute_depreciation_schedule(asset)
|
||||
asset.action_set_running()
|
||||
self.engine.post_depreciation_entry(asset)
|
||||
self.engine.pause_asset(asset)
|
||||
with self.assertRaises(ValidationError):
|
||||
self.engine.post_depreciation_entry(asset)
|
||||
self.engine.resume_asset(asset)
|
||||
result = self.engine.post_depreciation_entry(asset)
|
||||
self.assertEqual(result['posted_count'], 1)
|
||||
59
fusion_accounting_assets/tests/test_fusion_asset.py
Normal file
59
fusion_accounting_assets/tests/test_fusion_asset.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAsset(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.asset_vals = {
|
||||
'name': 'Test Asset',
|
||||
'cost': 10000,
|
||||
'salvage_value': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 5,
|
||||
}
|
||||
|
||||
def test_create_minimal(self):
|
||||
a = self.env['fusion.asset'].create(self.asset_vals)
|
||||
self.assertEqual(a.state, 'draft')
|
||||
self.assertEqual(a.book_value, 10000)
|
||||
|
||||
def test_state_transitions_draft_to_running(self):
|
||||
a = self.env['fusion.asset'].create(self.asset_vals)
|
||||
a.action_set_running()
|
||||
self.assertEqual(a.state, 'running')
|
||||
self.assertTrue(a.in_service_date)
|
||||
|
||||
def test_pause_resume(self):
|
||||
a = self.env['fusion.asset'].create(self.asset_vals)
|
||||
a.action_set_running()
|
||||
a.action_pause()
|
||||
self.assertEqual(a.state, 'paused')
|
||||
a.action_resume()
|
||||
self.assertEqual(a.state, 'running')
|
||||
|
||||
def test_cannot_pause_from_draft(self):
|
||||
a = self.env['fusion.asset'].create(self.asset_vals)
|
||||
with self.assertRaises(ValidationError):
|
||||
a.action_pause()
|
||||
|
||||
def test_negative_cost_rejected(self):
|
||||
with self.assertRaises(Exception):
|
||||
self.env['fusion.asset'].create({**self.asset_vals, 'cost': -100})
|
||||
|
||||
def test_salvage_exceeds_cost_rejected(self):
|
||||
with self.assertRaises(Exception):
|
||||
self.env['fusion.asset'].create(
|
||||
{**self.asset_vals, 'cost': 1000, 'salvage_value': 5000},
|
||||
)
|
||||
|
||||
def test_book_value_starts_at_cost(self):
|
||||
a = self.env['fusion.asset'].create(self.asset_vals)
|
||||
self.assertEqual(a.book_value, a.cost)
|
||||
self.assertEqual(a.total_depreciated, 0)
|
||||
49
fusion_accounting_assets/tests/test_fusion_asset_anomaly.py
Normal file
49
fusion_accounting_assets/tests/test_fusion_asset_anomaly.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetAnomaly(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Watched Asset',
|
||||
'cost': 5000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
})
|
||||
|
||||
def _make_anomaly(self, **kw):
|
||||
vals = {
|
||||
'asset_id': self.asset.id,
|
||||
'anomaly_type': 'behind_schedule',
|
||||
'severity': 'medium',
|
||||
'expected': 1000.0,
|
||||
'actual': 700.0,
|
||||
'variance_pct': -30.0,
|
||||
}
|
||||
vals.update(kw)
|
||||
return self.env['fusion.asset.anomaly'].create(vals)
|
||||
|
||||
def test_create_defaults_state_new(self):
|
||||
a = self._make_anomaly()
|
||||
self.assertEqual(a.state, 'new')
|
||||
self.assertTrue(a.detected_at)
|
||||
self.assertEqual(a.company_id, self.asset.company_id)
|
||||
|
||||
def test_acknowledge_transitions(self):
|
||||
a = self._make_anomaly()
|
||||
a.action_acknowledge()
|
||||
self.assertEqual(a.state, 'acknowledged')
|
||||
|
||||
def test_dismiss_transitions(self):
|
||||
a = self._make_anomaly()
|
||||
a.action_dismiss()
|
||||
self.assertEqual(a.state, 'dismissed')
|
||||
|
||||
def test_resolve_transitions(self):
|
||||
a = self._make_anomaly(anomaly_type='low_utilization', severity='high')
|
||||
a.action_resolve()
|
||||
self.assertEqual(a.state, 'resolved')
|
||||
35
fusion_accounting_assets/tests/test_fusion_asset_category.py
Normal file
35
fusion_accounting_assets/tests/test_fusion_asset_category.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetCategory(TransactionCase):
|
||||
|
||||
def test_create_with_defaults(self):
|
||||
cat = self.env['fusion.asset.category'].create({'name': 'Computers'})
|
||||
self.assertEqual(cat.method, 'straight_line')
|
||||
self.assertEqual(cat.useful_life_years, 5)
|
||||
self.assertEqual(cat.prorate_convention, 'days_period')
|
||||
self.assertEqual(cat.asset_count, 0)
|
||||
|
||||
def test_asset_count_reflects_linked_assets(self):
|
||||
cat = self.env['fusion.asset.category'].create({'name': 'Vehicles'})
|
||||
for i in range(3):
|
||||
self.env['fusion.asset'].create({
|
||||
'name': f'Truck {i}',
|
||||
'cost': 50000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'declining_balance',
|
||||
'category_id': cat.id,
|
||||
})
|
||||
cat.invalidate_recordset(['asset_count'])
|
||||
self.assertEqual(cat.asset_count, 3)
|
||||
|
||||
def test_method_must_be_in_selection(self):
|
||||
with self.assertRaises(Exception):
|
||||
self.env['fusion.asset.category'].create({
|
||||
'name': 'Bogus',
|
||||
'method': 'not_a_method',
|
||||
})
|
||||
@@ -0,0 +1,62 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetDepreciationLine(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Asset for Lines',
|
||||
'cost': 12000,
|
||||
'salvage_value': 0,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 1,
|
||||
})
|
||||
|
||||
def _make_line(self, period_index, amount=1000.0, scheduled_date=None):
|
||||
return self.env['fusion.asset.depreciation.line'].create({
|
||||
'asset_id': self.asset.id,
|
||||
'period_index': period_index,
|
||||
'scheduled_date': scheduled_date or date(2026, period_index, 28),
|
||||
'amount': amount,
|
||||
})
|
||||
|
||||
def test_create_line_defaults_unposted(self):
|
||||
line = self._make_line(1)
|
||||
self.assertFalse(line.is_posted)
|
||||
self.assertFalse(line.posted_date)
|
||||
self.assertFalse(line.move_id)
|
||||
self.assertEqual(line.company_id, self.asset.company_id)
|
||||
self.assertEqual(line.currency_id, self.asset.currency_id)
|
||||
|
||||
def test_action_post_marks_line_posted(self):
|
||||
line = self._make_line(2)
|
||||
line.action_post()
|
||||
self.assertTrue(line.is_posted)
|
||||
self.assertTrue(line.posted_date)
|
||||
|
||||
def test_action_post_idempotent_keeps_first_date(self):
|
||||
line = self._make_line(3)
|
||||
line.action_post()
|
||||
first_date = line.posted_date
|
||||
line.action_post()
|
||||
self.assertEqual(line.posted_date, first_date)
|
||||
|
||||
def test_unique_period_per_asset(self):
|
||||
self._make_line(4)
|
||||
with self.assertRaises(Exception):
|
||||
self._make_line(4)
|
||||
|
||||
def test_book_value_reflects_posted_lines_only(self):
|
||||
l1 = self._make_line(5, amount=1000)
|
||||
self._make_line(6, amount=1500)
|
||||
self.assertEqual(self.asset.book_value, 12000)
|
||||
l1.action_post()
|
||||
self.asset.invalidate_recordset(['book_value', 'total_depreciated'])
|
||||
self.assertEqual(self.asset.total_depreciated, 1000)
|
||||
self.assertEqual(self.asset.book_value, 11000)
|
||||
56
fusion_accounting_assets/tests/test_fusion_asset_disposal.py
Normal file
56
fusion_accounting_assets/tests/test_fusion_asset_disposal.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetDisposal(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Disposable Asset',
|
||||
'cost': 10000,
|
||||
'salvage_value': 0,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 5,
|
||||
})
|
||||
|
||||
def test_create_minimal_sale(self):
|
||||
d = self.env['fusion.asset.disposal'].create({
|
||||
'asset_id': self.asset.id,
|
||||
'disposal_type': 'sale',
|
||||
'sale_amount': 7000,
|
||||
'book_value_at_disposal': 6000,
|
||||
})
|
||||
self.assertEqual(d.gain_loss_amount, 1000)
|
||||
self.assertEqual(d.company_id, self.asset.company_id)
|
||||
|
||||
def test_sale_at_loss(self):
|
||||
d = self.env['fusion.asset.disposal'].create({
|
||||
'asset_id': self.asset.id,
|
||||
'disposal_type': 'sale',
|
||||
'sale_amount': 4000,
|
||||
'book_value_at_disposal': 6000,
|
||||
})
|
||||
self.assertEqual(d.gain_loss_amount, -2000)
|
||||
|
||||
def test_scrap_full_loss(self):
|
||||
d = self.env['fusion.asset.disposal'].create({
|
||||
'asset_id': self.asset.id,
|
||||
'disposal_type': 'scrap',
|
||||
'sale_amount': 0,
|
||||
'book_value_at_disposal': 6000,
|
||||
})
|
||||
self.assertEqual(d.gain_loss_amount, -6000)
|
||||
|
||||
def test_donation_ignores_sale_amount(self):
|
||||
d = self.env['fusion.asset.disposal'].create({
|
||||
'asset_id': self.asset.id,
|
||||
'disposal_type': 'donation',
|
||||
'sale_amount': 999,
|
||||
'book_value_at_disposal': 6000,
|
||||
})
|
||||
self.assertEqual(d.gain_loss_amount, -6000)
|
||||
115
fusion_accounting_assets/tests/test_fusion_asset_engine.py
Normal file
115
fusion_accounting_assets/tests/test_fusion_asset_engine.py
Normal file
@@ -0,0 +1,115 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionAssetEngine(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.engine = self.env['fusion.asset.engine']
|
||||
self.asset = self.env['fusion.asset'].create({
|
||||
'name': 'Test Engine Asset',
|
||||
'cost': 10000,
|
||||
'salvage_value': 1000,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'in_service_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 5,
|
||||
})
|
||||
|
||||
def test_engine_model_exists(self):
|
||||
self.assertIn('fusion.asset.engine', self.env.registry)
|
||||
|
||||
def test_compute_schedule_straight_line(self):
|
||||
result = self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.assertEqual(result['lines_created'], 5)
|
||||
lines = self.asset.depreciation_line_ids
|
||||
self.assertEqual(len(lines), 5)
|
||||
# Total depreciation should equal cost - salvage = 9000
|
||||
total = sum(lines.mapped('amount'))
|
||||
self.assertAlmostEqual(total, 9000, places=2)
|
||||
|
||||
def test_compute_schedule_declining_balance(self):
|
||||
self.asset.write({'method': 'declining_balance', 'declining_rate_pct': 30.0})
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
lines = self.asset.depreciation_line_ids
|
||||
self.assertGreater(len(lines), 0)
|
||||
# First-period amount should be cost * rate = 10000 * 0.3 = 3000
|
||||
first = lines.sorted('period_index')[0]
|
||||
self.assertAlmostEqual(first.amount, 3000, places=2)
|
||||
|
||||
def test_compute_schedule_recompute_wipes_unposted(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.asset.write({'useful_life_years': 8})
|
||||
self.engine.compute_depreciation_schedule(self.asset, recompute=True)
|
||||
self.assertEqual(len(self.asset.depreciation_line_ids), 8)
|
||||
|
||||
def test_compute_schedule_validates_zero_cost(self):
|
||||
# Bypass DB constraint with sudo + the constraint allows cost >= 0,
|
||||
# but engine validation requires cost > 0.
|
||||
bad = self.env['fusion.asset'].create({
|
||||
'name': 'Zero',
|
||||
'cost': 0,
|
||||
'acquisition_date': date(2026, 1, 1),
|
||||
'method': 'straight_line',
|
||||
'useful_life_years': 5,
|
||||
})
|
||||
with self.assertRaises(ValidationError):
|
||||
self.engine.compute_depreciation_schedule(bad)
|
||||
|
||||
def test_post_depreciation_entry_marks_line_posted(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
result = self.engine.post_depreciation_entry(self.asset)
|
||||
self.assertEqual(result['posted_count'], 1)
|
||||
first_line = self.asset.depreciation_line_ids.sorted('period_index')[0]
|
||||
self.assertTrue(first_line.is_posted)
|
||||
|
||||
def test_post_depreciation_only_after_running(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
# asset is still in 'draft' state
|
||||
with self.assertRaises(ValidationError):
|
||||
self.engine.post_depreciation_entry(self.asset)
|
||||
|
||||
def test_dispose_asset_creates_disposal_record(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
result = self.engine.dispose_asset(
|
||||
self.asset, sale_amount=5000, sale_date=date(2027, 6, 1),
|
||||
)
|
||||
self.assertEqual(self.asset.state, 'disposed')
|
||||
self.assertIn('disposal_id', result)
|
||||
self.assertEqual(result['book_value_at_disposal'], self.asset.book_value)
|
||||
|
||||
def test_partial_sale_creates_child_and_disposes(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
original_cost = self.asset.cost
|
||||
result = self.engine.partial_sale(
|
||||
self.asset, sold_amount=3000, sold_qty=0.3,
|
||||
sale_date=date(2027, 6, 1),
|
||||
)
|
||||
self.assertIn('parent_asset_id', result)
|
||||
self.assertIn('child_asset_id', result)
|
||||
self.asset.invalidate_recordset(['cost'])
|
||||
expected_remaining = round(original_cost * 0.7, 2)
|
||||
self.assertAlmostEqual(self.asset.cost, expected_remaining, places=2)
|
||||
|
||||
def test_pause_resume_round_trip(self):
|
||||
self.asset.action_set_running()
|
||||
self.engine.pause_asset(self.asset)
|
||||
self.assertEqual(self.asset.state, 'paused')
|
||||
self.engine.resume_asset(self.asset)
|
||||
self.assertEqual(self.asset.state, 'running')
|
||||
|
||||
def test_reverse_disposal_restores_running_state(self):
|
||||
self.engine.compute_depreciation_schedule(self.asset)
|
||||
self.asset.action_set_running()
|
||||
self.engine.dispose_asset(self.asset, sale_amount=5000)
|
||||
self.assertEqual(self.asset.state, 'disposed')
|
||||
self.engine.reverse_disposal(self.asset)
|
||||
self.assertEqual(self.asset.state, 'running')
|
||||
65
fusion_accounting_assets/tests/test_prorate.py
Normal file
65
fusion_accounting_assets/tests/test_prorate.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from datetime import date
|
||||
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.addons.fusion_accounting_assets.services.prorate import prorate_factor
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestProrate(TransactionCase):
|
||||
|
||||
def test_full_month_convention_always_one(self):
|
||||
f = prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2026, 1, 15),
|
||||
convention='full_month',
|
||||
)
|
||||
self.assertEqual(f, 1.0)
|
||||
|
||||
def test_asset_starts_before_period_full_factor(self):
|
||||
f = prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2025, 12, 1),
|
||||
convention='days_period',
|
||||
)
|
||||
self.assertEqual(f, 1.0)
|
||||
|
||||
def test_asset_starts_after_period_zero_factor(self):
|
||||
f = prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2026, 2, 5),
|
||||
convention='days_period',
|
||||
)
|
||||
self.assertEqual(f, 0.0)
|
||||
|
||||
def test_days_period_mid_month(self):
|
||||
# Jan 16 -> Jan 31 inclusive = 16 days; period = 31 days
|
||||
f = prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2026, 1, 16),
|
||||
convention='days_period',
|
||||
)
|
||||
self.assertAlmostEqual(f, 16 / 31, places=5)
|
||||
|
||||
def test_days_365_mid_month(self):
|
||||
# 16 days / 365
|
||||
f = prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2026, 1, 16),
|
||||
convention='days_365',
|
||||
)
|
||||
self.assertAlmostEqual(f, 16 / 365.0, places=5)
|
||||
|
||||
def test_unknown_convention_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
prorate_factor(
|
||||
period_start=date(2026, 1, 1),
|
||||
period_end=date(2026, 1, 31),
|
||||
asset_start=date(2026, 1, 15),
|
||||
convention='bogus', # type: ignore[arg-type]
|
||||
)
|
||||
45
fusion_accounting_assets/tests/test_salvage_value.py
Normal file
45
fusion_accounting_assets/tests/test_salvage_value.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.addons.fusion_accounting_assets.services.salvage_value import (
|
||||
SalvageConfig, compute_salvage_value, remaining_useful_life_value,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestSalvageValue(TransactionCase):
|
||||
|
||||
def test_zero_method_returns_zero(self):
|
||||
v = compute_salvage_value(cost=10000, config=SalvageConfig(method='zero'))
|
||||
self.assertEqual(v, 0.0)
|
||||
|
||||
def test_percentage_method(self):
|
||||
v = compute_salvage_value(
|
||||
cost=10000, config=SalvageConfig(method='percentage', value=10),
|
||||
)
|
||||
self.assertAlmostEqual(v, 1000.0, places=2)
|
||||
|
||||
def test_fixed_method(self):
|
||||
v = compute_salvage_value(
|
||||
cost=10000, config=SalvageConfig(method='fixed', value=750),
|
||||
)
|
||||
self.assertAlmostEqual(v, 750.0, places=2)
|
||||
|
||||
def test_unknown_method_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
compute_salvage_value(
|
||||
cost=10000,
|
||||
config=SalvageConfig(method='bogus', value=0), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
def test_remaining_useful_life_value_midway(self):
|
||||
# Halfway through life; current book 6000, salvage 1000 -> 1000 + 5000*0.5 = 3500
|
||||
v = remaining_useful_life_value(
|
||||
current_book=6000, salvage=1000, periods_used=5, total_periods=10,
|
||||
)
|
||||
self.assertAlmostEqual(v, 3500.0, places=2)
|
||||
|
||||
def test_remaining_useful_life_value_at_end_returns_salvage(self):
|
||||
v = remaining_useful_life_value(
|
||||
current_book=1200, salvage=1000, periods_used=10, total_periods=10,
|
||||
)
|
||||
self.assertEqual(v, 1000.0)
|
||||
61
fusion_accounting_assets/tests/test_useful_life_predictor.py
Normal file
61
fusion_accounting_assets/tests/test_useful_life_predictor.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from odoo.tests.common import TransactionCase
|
||||
from odoo.tests import tagged
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_predictor import (
|
||||
predict_useful_life,
|
||||
)
|
||||
from odoo.addons.fusion_accounting_assets.services.useful_life_prompt import (
|
||||
SYSTEM_PROMPT, build_prompt,
|
||||
)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestUsefulLifePredictor(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
# Ensure no provider configured for these fallback tests.
|
||||
self.env['ir.config_parameter'].sudo().search([
|
||||
('key', 'in', [
|
||||
'fusion_accounting.provider.asset_useful_life',
|
||||
'fusion_accounting.provider.default',
|
||||
])
|
||||
]).unlink()
|
||||
|
||||
def test_fallback_computer(self):
|
||||
result = predict_useful_life(self.env, description="Dell laptop")
|
||||
self.assertEqual(result['useful_life_years'], 4)
|
||||
self.assertEqual(result['depreciation_method'], 'straight_line')
|
||||
|
||||
def test_fallback_furniture(self):
|
||||
result = predict_useful_life(self.env, description="office desk")
|
||||
self.assertEqual(result['useful_life_years'], 7)
|
||||
|
||||
def test_fallback_vehicle_uses_declining(self):
|
||||
result = predict_useful_life(self.env, description="Ford F-150 truck")
|
||||
self.assertEqual(result['useful_life_years'], 5)
|
||||
self.assertEqual(result['depreciation_method'], 'declining_balance')
|
||||
|
||||
def test_fallback_default_for_unknown(self):
|
||||
result = predict_useful_life(self.env, description="mystery widget")
|
||||
self.assertEqual(result['useful_life_years'], 5)
|
||||
self.assertEqual(result['confidence'], 0.3)
|
||||
|
||||
def test_returns_dict_with_required_keys(self):
|
||||
result = predict_useful_life(self.env, description="server")
|
||||
for key in ('useful_life_years', 'depreciation_method', 'rationale', 'confidence'):
|
||||
self.assertIn(key, result)
|
||||
|
||||
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestUsefulLifePrompt(TransactionCase):
|
||||
|
||||
def test_system_prompt_requires_json(self):
|
||||
self.assertIn('JSON', SYSTEM_PROMPT)
|
||||
|
||||
def test_build_prompt_returns_tuple(self):
|
||||
result = build_prompt(description='test')
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
def test_user_prompt_includes_amount(self):
|
||||
_, user = build_prompt(description='laptop', amount=2000)
|
||||
self.assertIn('2,000', user)
|
||||
0
fusion_accounting_assets/wizards/__init__.py
Normal file
0
fusion_accounting_assets/wizards/__init__.py
Normal file
Reference in New Issue
Block a user