feat(fusion_accounting_reports): 2 cron jobs (anomaly scan + MV refresh)
Made-with: Cursor
This commit is contained in:
@@ -3,3 +3,4 @@ from . import fusion_report_engine
|
||||
from . import fusion_report_commentary
|
||||
from . import fusion_report_anomaly
|
||||
from . import fusion_account_balance_mv
|
||||
from . import fusion_reports_cron
|
||||
|
||||
117
fusion_accounting_reports/models/fusion_reports_cron.py
Normal file
117
fusion_accounting_reports/models/fusion_reports_cron.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""Cron handlers for fusion_accounting_reports.
|
||||
|
||||
Two scheduled jobs:
|
||||
- _cron_anomaly_scan: daily P&L variance scan -> persist anomalies
|
||||
- _cron_mv_refresh: every 15 min CONCURRENTLY refresh the MV"""
|
||||
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
|
||||
import odoo
|
||||
from odoo import api, fields, models
|
||||
|
||||
from ..services.anomaly_detection import detect
|
||||
from ..services.date_periods import month_bounds
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FusionReportsCron(models.AbstractModel):
|
||||
_name = "fusion.reports.cron"
|
||||
_description = "Fusion Reports Cron Handlers"
|
||||
|
||||
@api.model
|
||||
def _cron_anomaly_scan(self):
|
||||
"""Run last-month P&L vs prior-year-same-month and persist anomalies."""
|
||||
today = fields.Date.today()
|
||||
# Walk back into the previous full calendar month.
|
||||
last_month = today.replace(day=1) - timedelta(days=1)
|
||||
period = month_bounds(last_month)
|
||||
|
||||
Report = self.env['fusion.report'].sudo()
|
||||
Anomaly = self.env['fusion.report.anomaly'].sudo()
|
||||
engine = self.env['fusion.report.engine']
|
||||
|
||||
for company in self.env['res.company'].search([]):
|
||||
try:
|
||||
pnl_def = Report.search(
|
||||
[
|
||||
('report_type', '=', 'pnl'),
|
||||
'|', ('company_id', '=', company.id),
|
||||
('company_id', '=', False),
|
||||
],
|
||||
limit=1,
|
||||
)
|
||||
if not pnl_def:
|
||||
continue
|
||||
result = engine.compute_pnl(
|
||||
period,
|
||||
comparison='previous_year',
|
||||
company_id=company.id,
|
||||
)
|
||||
anomalies = detect(result)
|
||||
for a in anomalies:
|
||||
existing = Anomaly.search(
|
||||
[
|
||||
('report_id', '=', pnl_def.id),
|
||||
('company_id', '=', company.id),
|
||||
('period_from', '=', period.date_from),
|
||||
('period_to', '=', period.date_to),
|
||||
('row_id', '=', a['row_id']),
|
||||
],
|
||||
limit=1,
|
||||
)
|
||||
vals = {
|
||||
'report_id': pnl_def.id,
|
||||
'company_id': company.id,
|
||||
'period_from': period.date_from,
|
||||
'period_to': period.date_to,
|
||||
'row_id': a['row_id'],
|
||||
'label': a['label'],
|
||||
'current_amount': a['current_amount'],
|
||||
'comparison_amount': a['comparison_amount'],
|
||||
'variance_amount': a['variance_amount'],
|
||||
'variance_pct': a['variance_pct'],
|
||||
'severity': a['severity'],
|
||||
'direction': a['direction'],
|
||||
}
|
||||
if existing:
|
||||
existing.write(vals)
|
||||
else:
|
||||
Anomaly.create(vals)
|
||||
_logger.info(
|
||||
"Anomaly scan for company %s: %d flagged",
|
||||
company.id, len(anomalies),
|
||||
)
|
||||
except Exception as e:
|
||||
_logger.exception(
|
||||
"Anomaly scan failed for company %s: %s", company.id, e,
|
||||
)
|
||||
|
||||
@api.model
|
||||
def _cron_mv_refresh(self):
|
||||
"""REFRESH CONCURRENTLY via dedicated autocommit cursor.
|
||||
|
||||
REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a
|
||||
transaction block, so we open a separate connection with autocommit
|
||||
enabled. The blocking REFRESH is used as a fallback if the
|
||||
concurrent path fails (e.g. on a cold MV with no rows yet)."""
|
||||
try:
|
||||
db_name = self.env.cr.dbname
|
||||
db = odoo.sql_db.db_connect(db_name)
|
||||
with db.cursor() as cron_cr:
|
||||
cron_cr._cnx.set_session(autocommit=True)
|
||||
cron_cr.execute(
|
||||
"REFRESH MATERIALIZED VIEW CONCURRENTLY "
|
||||
"fusion_account_balance_mv"
|
||||
)
|
||||
_logger.debug("MV refresh CONCURRENTLY succeeded")
|
||||
except Exception as e:
|
||||
_logger.warning(
|
||||
"CONCURRENTLY refresh failed (%s); blocking fallback", e)
|
||||
try:
|
||||
self.env['fusion.account.balance.mv']._refresh(
|
||||
concurrently=False)
|
||||
except Exception as e2:
|
||||
_logger.exception(
|
||||
"Blocking MV refresh also failed: %s", e2)
|
||||
Reference in New Issue
Block a user