"""Cron handlers for fusion_accounting_assets. - _cron_post_due_depreciation: daily, post due depreciation lines for running assets - _cron_anomaly_scan: monthly, scan for schedule variance and create anomaly records """ import logging from odoo import api, fields, models from ..services.anomaly_detection import detect_schedule_variance _logger = logging.getLogger(__name__) class FusionAssetsCron(models.AbstractModel): _name = "fusion.assets.cron" _description = "Fusion Assets Cron Handlers" @api.model def _cron_post_due_depreciation(self): """For each running asset, post any due un-posted depreciation lines.""" today = fields.Date.today() engine = self.env['fusion.asset.engine'] Asset = self.env['fusion.asset'] running_assets = Asset.search([('state', '=', 'running')]) posted_total = 0 for asset in running_assets: try: with self.env.cr.savepoint(): result = engine.post_depreciation_entry(asset, period_date=today) posted_total += result.get('posted_count', 0) except Exception as e: # noqa: BLE001 _logger.warning("Cron post failed for asset %s: %s", asset.id, e) _logger.info( "Cron: posted depreciation on %d lines across %d running assets", posted_total, len(running_assets), ) # Keep the book-value MV in sync after posting so the dashboard # reflects today's numbers without waiting for the dedicated MV cron. try: self.env['fusion.asset.book.values.mv']._refresh() except Exception as e: # noqa: BLE001 _logger.warning("Post-cron MV refresh failed: %s", e) @api.model def _cron_refresh_book_values_mv(self): """Refresh the per-asset book value MV (hourly).""" self.env['fusion.asset.book.values.mv']._refresh() @api.model def _cron_anomaly_scan(self): """For each running asset, compare expected accumulated depreciation vs posted, and persist any variance flags.""" Asset = self.env['fusion.asset'] Anomaly = self.env['fusion.asset.anomaly'] running_assets = Asset.search([('state', '=', 'running')]) flagged = 0 today = fields.Date.today() for asset in running_assets: try: expected = sum( l.amount for l in asset.depreciation_line_ids if l.scheduled_date and l.scheduled_date <= today ) actual = asset.total_depreciated anomaly = detect_schedule_variance( asset_id=asset.id, asset_name=asset.name, expected_accumulated=expected, actual_accumulated=actual, ) if anomaly is None: continue anomaly_dict = anomaly.to_dict() existing = Anomaly.search([ ('asset_id', '=', asset.id), ('anomaly_type', '=', anomaly_dict['anomaly_type']), ('state', 'in', ('new', 'acknowledged')), ], limit=1) if existing: continue Anomaly.create({ 'asset_id': asset.id, 'anomaly_type': anomaly_dict['anomaly_type'], 'severity': anomaly_dict['severity'], 'expected': anomaly_dict['expected'], 'actual': anomaly_dict['actual'], 'variance_pct': anomaly_dict['variance_pct'], 'detail': anomaly_dict['detail'], }) flagged += 1 except Exception as e: # noqa: BLE001 _logger.warning("Cron anomaly scan failed for asset %s: %s", asset.id, e) _logger.info( "Cron: scanned %d assets, flagged %d anomalies", len(running_assets), flagged, )