Files
Odoo-Modules/fusion_login_audit/models/fusion_login_audit.py
gsinghpal 1b8038d8e8 feat(fusion_login_audit): nightly retention GC cron
Adds _fc_retention_gc() that deletes rows older than the configured
horizon (default 365 days; 0 = keep forever). Registered as a daily
ir.cron. Tests verify both the delete path and the "keep forever"
short-circuit.

Also documents the Odoo 19 gotcha that ir.cron dropped the numbercall
field (the legacy "-1 = run forever" pattern now raises ValueError at
install time; just omit the field).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 09:03:59 -04:00

106 lines
4.1 KiB
Python

# -*- coding: utf-8 -*-
from odoo import api, fields, models
class FusionLoginAudit(models.Model):
    """Login audit event.

    Each record stores the attempted login, whether it succeeded, and the
    request metadata captured with it (IP / geo data, user agent, database).
    """

    _name = 'fusion.login.audit'
    _description = 'Login Audit Event'
    _order = 'event_time desc, id desc'
    _rec_name = 'attempted_login'

    # --- Who / what -------------------------------------------------------
    user_id = fields.Many2one(
        'res.users',
        string='User',
        ondelete='set null',
        index=True,
        help='Null when the attempted login did not match any user.',
    )
    attempted_login = fields.Char(
        string='Attempted Login',
        size=255,
        required=True,
        index=True,
    )
    result = fields.Selection(
        [
            ('success', 'Success'),
            ('failure', 'Failure'),
        ],
        string='Result',
        required=True,
        index=True,
    )
    # Must be set for failures and empty for successes; enforced by the
    # CHECK constraint declared further down.
    failure_reason = fields.Selection(
        [
            ('bad_password', 'Bad password'),
            ('unknown_user', 'Unknown user'),
            ('disabled_user', 'Disabled user'),
            ('2fa_failed', '2FA failed'),
            ('other', 'Other'),
        ],
        string='Failure Reason',
    )
    event_time = fields.Datetime(
        string='Event Time',
        required=True,
        index=True,
        default=fields.Datetime.now,
    )

    # --- Network / geolocation --------------------------------------------
    ip_address = fields.Char(string='IP Address', size=45)
    ip_hostname = fields.Char(string='Reverse DNS', size=255)
    country_code = fields.Char(string='Country Code', size=2, index=True)
    country_name = fields.Char(string='Country', size=64)
    city = fields.Char(string='City', size=128)
    geo_state = fields.Char(string='Region', size=64)
    # Progress marker for the geo lookup; new rows start as 'pending'.
    geo_lookup_state = fields.Selection(
        [
            ('pending', 'Pending'),
            ('done', 'Done'),
            ('private_ip', 'Private IP'),
            ('internal', 'Internal (no request)'),
            ('failed', 'Lookup failed'),
        ],
        string='Geo Lookup State',
        default='pending',
        index=True,
    )

    # --- Client fingerprint -----------------------------------------------
    user_agent_raw = fields.Char(string='User Agent', size=512)
    browser = fields.Char(string='Browser', size=64)
    os = fields.Char(string='OS', size=64)
    device_type = fields.Selection(
        [
            ('desktop', 'Desktop'),
            ('mobile', 'Mobile'),
            ('tablet', 'Tablet'),
            ('bot', 'Bot'),
            ('unknown', 'Unknown'),
        ],
        string='Device Type',
        default='unknown',
    )
    database = fields.Char(string='Database', size=64)

    # --- SQL constraints --------------------------------------------------
    # Odoo 19 expects declarative `models.Constraint` attributes here. The
    # legacy `_sql_constraints = [...]` list (which the plan template used)
    # now only emits a warning and is silently dropped.
    _result_failure_reason_consistent = models.Constraint(
        "CHECK ((result = 'success' AND failure_reason IS NULL) "
        "OR (result = 'failure' AND failure_reason IS NOT NULL))",
        'A failure row must have a failure_reason; a success row must not.',
    )

    # --- Composite indexes ------------------------------------------------
    # One index per hot query:
    #   - per-user history        (user_id, event_time DESC)
    #   - failure-burst by login  (attempted_login, event_time DESC)
    #   - geo cron worklist       (geo_lookup_state, event_time)
    # `models.Index` is Odoo 19's declarative replacement for the
    # init()/raw-SQL pattern. The attribute name becomes the index suffix,
    # e.g. `_user_time_idx` -> `fusion_login_audit_user_time_idx`.
    _user_time_idx = models.Index('(user_id, event_time DESC)')
    _login_time_idx = models.Index('(attempted_login, event_time DESC)')
    _geo_state_idx = models.Index('(geo_lookup_state, event_time)')

    @api.model
    def _fc_retention_gc(self):
        """Purge audit rows that have aged past the retention horizon.

        The horizon, in days, is read from the
        ``fusion_login_audit.retention_days`` system parameter. It defaults
        to 365 when the parameter is missing or cannot be parsed as an
        integer. A value of 0 (or less) means "keep forever" and nothing is
        deleted. Called daily by ir.cron.

        :return: number of rows deleted (0 when retention is disabled or
            nothing is old enough)
        :rtype: int
        """
        from datetime import timedelta

        params = self.env['ir.config_parameter'].sudo()
        try:
            retention_days = int(params.get_param(
                'fusion_login_audit.retention_days', 365))
        except (TypeError, ValueError):
            # Unparseable setting: fall back to the default horizon.
            retention_days = 365

        # Keep-forever short-circuit.
        if retention_days <= 0:
            return 0

        threshold = fields.Datetime.now() - timedelta(days=retention_days)
        expired = self.sudo().search([('event_time', '<', threshold)])
        if not expired:
            return 0

        # Take the count before unlink() removes the records.
        deleted = len(expired)
        expired.unlink()
        return deleted