118 lines
4.5 KiB
Python
118 lines
4.5 KiB
Python
"""Cron handlers for fusion_accounting_reports.
|
|
|
|
Two scheduled jobs:
|
|
- _cron_anomaly_scan: daily P&L variance scan -> persist anomalies
|
|
- _cron_mv_refresh: every 15 min CONCURRENTLY refresh the MV"""
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
|
|
import odoo
|
|
from odoo import api, fields, models
|
|
|
|
from ..services.anomaly_detection import detect
|
|
from ..services.date_periods import month_bounds
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FusionReportsCron(models.AbstractModel):
    """Abstract model holding the scheduled-job entry points of the module.

    It carries no fields; it only exposes two ``@api.model`` methods:

    - ``_cron_anomaly_scan``: P&L variance scan that upserts anomaly records.
    - ``_cron_mv_refresh``: refresh of the ``fusion_account_balance_mv``
      materialized view.
    """

    _name = "fusion.reports.cron"
    _description = "Fusion Reports Cron Handlers"

    @api.model
    def _cron_anomaly_scan(self):
        """Run last-month P&L vs prior-year-same-month and persist anomalies.

        Every company is scanned independently. The work for one company
        runs inside its own savepoint so that a failure (including one that
        only surfaces when the ORM flushes pending writes) is rolled back
        and logged without leaving the cron transaction in an aborted state
        for the companies that follow.
        """
        today = fields.Date.today()
        # Walk back into the previous full calendar month.
        last_month = today.replace(day=1) - timedelta(days=1)
        period = month_bounds(last_month)

        for company in self.env['res.company'].search([]):
            try:
                # The savepoint flushes on exit, so deferred ORM errors are
                # raised here, inside the try, and rolled back per company.
                with self.env.cr.savepoint():
                    flagged = self._scan_company_anomalies(company, period)
            except Exception as e:
                # Best-effort job: log and move on to the next company.
                _logger.exception(
                    "Anomaly scan failed for company %s: %s", company.id, e,
                )
                continue
            if flagged is None:
                # No P&L definition applies to this company; nothing to log.
                continue
            _logger.info(
                "Anomaly scan for company %s: %d flagged",
                company.id, flagged,
            )

    @api.model
    def _scan_company_anomalies(self, company, period):
        """Compute the P&L for one company and upsert its anomalies.

        :param company: ``res.company`` record to scan.
        :param period: object returned by ``month_bounds``; its
            ``date_from`` / ``date_to`` attributes are read here.
        :return: number of anomalies returned by ``detect``, or ``None``
            when no P&L report definition is found for the company.
        """
        Report = self.env['fusion.report'].sudo()
        Anomaly = self.env['fusion.report.anomaly'].sudo()
        engine = self.env['fusion.report.engine']

        # Accept either a company-specific definition or a shared one
        # (company_id unset).
        # NOTE(review): with limit=1 and no explicit order, which of the two
        # wins when both exist depends on the model's default ordering.
        pnl_def = Report.search(
            [
                ('report_type', '=', 'pnl'),
                '|', ('company_id', '=', company.id),
                ('company_id', '=', False),
            ],
            limit=1,
        )
        if not pnl_def:
            return None

        result = engine.compute_pnl(
            period,
            comparison='previous_year',
            company_id=company.id,
        )
        anomalies = detect(result)

        for a in anomalies:
            # Fields identifying one anomaly row; used both as the lookup
            # domain and as part of the written values so they cannot drift.
            identity = {
                'report_id': pnl_def.id,
                'company_id': company.id,
                'period_from': period.date_from,
                'period_to': period.date_to,
                'row_id': a['row_id'],
            }
            vals = dict(
                identity,
                label=a['label'],
                current_amount=a['current_amount'],
                comparison_amount=a['comparison_amount'],
                variance_amount=a['variance_amount'],
                variance_pct=a['variance_pct'],
                severity=a['severity'],
                direction=a['direction'],
            )
            existing = Anomaly.search(
                [(field, '=', value) for field, value in identity.items()],
                limit=1,
            )
            # Upsert: refresh the stored figures when the row was already
            # flagged for this period, otherwise record it.
            if existing:
                existing.write(vals)
            else:
                Anomaly.create(vals)
        return len(anomalies)

    @api.model
    def _cron_mv_refresh(self):
        """Refresh ``fusion_account_balance_mv``, preferring CONCURRENTLY.

        The concurrent refresh is issued on a separate cursor whose
        connection is switched to autocommit, so it runs outside the cron's
        own transaction. If anything in that path raises, a warning is
        logged and a blocking refresh is attempted through
        ``fusion.account.balance.mv``'s ``_refresh(concurrently=False)``.
        A failure of the fallback is logged and not re-raised.

        NOTE(review): the original rationale for the autocommit cursor was
        that REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a
        transaction block; the PostgreSQL reference does not appear to list
        that restriction -- confirm. It does state that CONCURRENTLY needs
        an already-populated view with a unique index, which is the likely
        reason for the fallback on a cold MV.
        """
        try:
            db_name = self.env.cr.dbname
            db = odoo.sql_db.db_connect(db_name)
            with db.cursor() as cron_cr:
                # `_cnx` is the cursor's underlying connection (private
                # attribute); autocommit is enabled before any statement.
                cron_cr._cnx.set_session(autocommit=True)
                cron_cr.execute(
                    "REFRESH MATERIALIZED VIEW CONCURRENTLY "
                    "fusion_account_balance_mv"
                )
            _logger.debug("MV refresh CONCURRENTLY succeeded")
        except Exception as e:
            _logger.warning(
                "CONCURRENTLY refresh failed (%s); blocking fallback", e)
            try:
                self.env['fusion.account.balance.mv']._refresh(
                    concurrently=False)
            except Exception as e2:
                _logger.exception(
                    "Blocking MV refresh also failed: %s", e2)
|