# -*- coding: utf-8 -*-
import ipaddress
import logging
import socket
from datetime import timedelta
from urllib.parse import quote

import requests

from odoo import api, fields, models

_logger = logging.getLogger(__name__)


def _fc_parse_ip(ip):
    """Return `ip` parsed as an `ipaddress` address object, or None.

    None is returned for falsy values, non-string values and any string that
    is not a valid IPv4/IPv6 literal (including the 'internal' marker).
    """
    if not ip or not isinstance(ip, str):
        return None
    try:
        return ipaddress.ip_address(ip)
    except ValueError:
        return None


class FusionLoginAudit(models.Model):
    """One row per login attempt, with optional geo / user-agent enrichment."""

    _name = 'fusion.login.audit'
    _description = 'Login Audit Event'
    _order = 'event_time desc, id desc'
    _rec_name = 'attempted_login'

    user_id = fields.Many2one(
        'res.users',
        string='User',
        ondelete='set null',
        index=True,
        help='Null when the attempted login did not match any user.',
    )
    attempted_login = fields.Char(
        string='Attempted Login',
        size=255,
        required=True,
        index=True,
    )
    result = fields.Selection(
        [('success', 'Success'), ('failure', 'Failure')],
        string='Result',
        required=True,
        index=True,
    )
    failure_reason = fields.Selection(
        [
            ('bad_password', 'Bad password'),
            ('unknown_user', 'Unknown user'),
            ('disabled_user', 'Disabled user'),
            ('2fa_failed', '2FA failed'),
            ('other', 'Other'),
        ],
        string='Failure Reason',
    )
    event_time = fields.Datetime(
        string='Event Time',
        required=True,
        index=True,
        default=fields.Datetime.now,
    )
    ip_address = fields.Char(string='IP Address', size=45)
    ip_hostname = fields.Char(string='Reverse DNS', size=255)
    country_code = fields.Char(string='Country Code', size=2, index=True)
    country_name = fields.Char(string='Country', size=64)
    city = fields.Char(string='City', size=128)
    geo_state = fields.Char(string='Region', size=64)
    geo_lookup_state = fields.Selection(
        [
            ('pending', 'Pending'),
            ('done', 'Done'),
            ('private_ip', 'Private IP'),
            ('internal', 'Internal (no request)'),
            ('failed', 'Lookup failed'),
        ],
        string='Geo Lookup State',
        default='pending',
        index=True,
    )
    user_agent_raw = fields.Char(string='User Agent', size=512)
    browser = fields.Char(string='Browser', size=64)
    os = fields.Char(string='OS', size=64)
    device_type = fields.Selection(
        [
            ('desktop', 'Desktop'),
            ('mobile', 'Mobile'),
            ('tablet', 'Tablet'),
            ('bot', 'Bot'),
            ('unknown', 'Unknown'),
        ],
        string='Device Type',
        default='unknown',
    )
    database = fields.Char(string='Database', size=64)
    login_kind = fields.Selection(
        [
            ('interactive', 'Interactive'),
            ('service', 'Service / Automation'),
        ],
        string='Login Kind',
        default='interactive',
        index=True,
        help="Interactive = a real browser/HTTP login. Service = server-to-server "
             "auth (XML-RPC/JSON-RPC, cron, inter-instance sync) with no HTTP "
             "request. Service logins are hidden from the default Login Events "
             "view to keep it focused on real user activity.",
    )

    # Odoo 19 replaces the legacy `_sql_constraints = [...]` list with
    # declarative `models.Constraint` attributes. The plan template used the
    # legacy form, which now only emits a warning and is silently dropped.
    _result_failure_reason_consistent = models.Constraint(
        "CHECK ((result = 'success' AND failure_reason IS NULL) "
        "OR (result = 'failure' AND failure_reason IS NOT NULL))",
        'A failure row must have a failure_reason; a success row must not.',
    )

    # Composite indexes supporting the three hot queries:
    #   - per-user history        (user_id, event_time DESC)
    #   - failure-burst by login  (attempted_login, event_time DESC)
    #   - geo cron worklist       (geo_lookup_state, event_time)
    # Odoo 19 ships `models.Index` as the declarative replacement for the
    # init()/raw-SQL pattern; the attribute name becomes the index suffix
    # (e.g. `_user_time_idx` -> `fusion_login_audit_user_time_idx`).
    _user_time_idx = models.Index('(user_id, event_time DESC)')
    _login_time_idx = models.Index('(attempted_login, event_time DESC)')
    _geo_state_idx = models.Index('(geo_lookup_state, event_time)')

    @api.model
    def _fc_retention_gc(self):
        """Delete audit rows older than `fusion_login_audit.retention_days`.

        Called daily by ir.cron. The parameter defaults to 365 when it is
        missing or not an integer; a value <= 0 means keep forever.

        :return: number of rows deleted (0 when retention is disabled)
        """
        ICP = self.env['ir.config_parameter'].sudo()
        try:
            days = int(ICP.get_param(
                'fusion_login_audit.retention_days', 365))
        except (TypeError, ValueError):
            days = 365
        if days <= 0:
            return 0
        cutoff = fields.Datetime.now() - timedelta(days=days)
        old = self.sudo().search([('event_time', '<', cutoff)])
        count = len(old)
        if old:
            old.unlink()
        return count

    # Networks for which no external geo lookup is attempted. Each IPv4
    # range has its IPv6 counterpart listed: loopback, link-local and
    # private/unique-local space.
    _FC_PRIVATE_NETWORKS = (
        ipaddress.ip_network('10.0.0.0/8'),
        ipaddress.ip_network('172.16.0.0/12'),
        ipaddress.ip_network('192.168.0.0/16'),
        ipaddress.ip_network('127.0.0.0/8'),
        ipaddress.ip_network('169.254.0.0/16'),
        ipaddress.ip_network('::1/128'),
        ipaddress.ip_network('fe80::/10'),
        ipaddress.ip_network('fc00::/7'),
    )

    @api.model
    def _fc_is_private_ip(self, ip):
        """Return True when `ip` falls inside one of `_FC_PRIVATE_NETWORKS`.

        Empty values, the 'internal' marker (which uses its own state) and
        strings that are not IP literals all return False. IPv4-mapped IPv6
        addresses (``::ffff:a.b.c.d``) are tested as their embedded IPv4
        address so they cannot slip past the IPv4 ranges.
        """
        addr = _fc_parse_ip(ip)
        if addr is None:
            return False
        mapped = getattr(addr, 'ipv4_mapped', None)
        if mapped is not None:
            addr = mapped
        return any(addr in net for net in self._FC_PRIVATE_NETWORKS)

    @api.model
    def _fc_geo_cache_hit(self, ip):
        """Return a dict of geo fields if we've resolved this IP in the
        last 30 days, else None.

        The most recent row with the same `ip_address` and
        `geo_lookup_state == 'done'` is used as the cache entry.
        """
        if not ip:
            return None
        cutoff = fields.Datetime.now() - timedelta(days=30)
        cached = self.sudo().search([
            ('ip_address', '=', ip),
            ('geo_lookup_state', '=', 'done'),
            ('event_time', '>=', cutoff),
        ], limit=1, order='event_time desc')
        if not cached:
            return None
        return {
            'country_code': cached.country_code,
            'country_name': cached.country_name,
            'city': cached.city,
            'geo_state': cached.geo_state,
            'ip_hostname': cached.ip_hostname,
        }

    @api.model
    def _fc_geo_reverse_dns(self, ip):
        """Return the reverse-DNS host name for `ip` (max 255 chars), or ''.

        Values that are not IP literals return '' without touching the
        resolver: `socket.gethostbyaddr` also accepts host names, so passing
        it an arbitrary stored string would trigger a forward lookup.
        """
        if _fc_parse_ip(ip) is None:
            return ''
        # The default timeout is process-wide state: remember whatever was
        # configured and put it back, instead of forcing it to None.
        # NOTE(review): whether this default bounds the resolver call made by
        # gethostbyaddr is platform-dependent — confirm.
        previous_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(1.5)
            host, _aliases, _ips = socket.gethostbyaddr(ip)
            return (host or '')[:255]
        except (socket.herror, socket.gaierror, OSError):
            return ''
        finally:
            socket.setdefaulttimeout(previous_timeout)

    @api.model
    def _fc_geo_http_lookup(self, ip):
        """Call ip-api.com. Returns (vals_dict, rate_limited_bool).

        `vals_dict` holds `country_code`, `country_name`, `geo_state` and
        `city`, truncated to the field sizes, and is empty when the lookup
        did not succeed. `rate_limited_bool` is True when the response
        carries `X-Rl: 0` or has HTTP status 429.

        Falls back to ({}, False) on any request/JSON error, and when `ip`
        is not a valid IP literal (no request is sent in that case).
        """
        addr = _fc_parse_ip(ip)
        if addr is None:
            return ({}, False)
        try:
            resp = requests.get(
                # Quote the path component: an IPv6 zone id may carry
                # characters that would otherwise alter the URL.
                'http://ip-api.com/json/' + quote(str(addr), safe=':'),
                params={'fields': 'status,country,countryCode,regionName,city'},
                timeout=3,
                headers={'User-Agent': 'Odoo-FusionLoginAudit/19.0'},
            )
            # NOTE(review): 429 is assumed to be the throttling status used
            # by ip-api — confirm against the current API documentation.
            rate_limited = (
                resp.headers.get('X-Rl', '') == '0'
                or resp.status_code == 429
            )
            if resp.status_code != 200:
                return ({}, rate_limited)
            data = resp.json() or {}
            if data.get('status') != 'success':
                return ({}, rate_limited)
            return ({
                'country_code': (data.get('countryCode') or '')[:2],
                'country_name': (data.get('country') or '')[:64],
                'geo_state': (data.get('regionName') or '')[:64],
                'city': (data.get('city') or '')[:128],
            }, rate_limited)
        except (requests.RequestException, ValueError):
            _logger.warning("fusion_login_audit: geo lookup failed for %s",
                            ip, exc_info=True)
            return ({}, False)

    @api.model
    def _fc_geo_enrich_row(self, row):
        """Enrich a single pending `row`; returns (written, rate_limited).

        Resolution order: private-network check, 30-day cache, then reverse
        DNS plus the HTTP lookup. `written` is False only when the lookup
        was throttled and returned no data: the row is then left 'pending'
        so a later run retries it instead of recording a permanent failure.
        """
        ip = row.ip_address
        if self._fc_is_private_ip(ip):
            row.write({
                'geo_lookup_state': 'private_ip',
                'country_code': '--',
                'country_name': 'Private network',
                'city': 'Private network',
            })
            return (True, False)

        cached = self._fc_geo_cache_hit(ip)
        if cached:
            row.write(dict(cached, geo_lookup_state='done'))
            return (True, False)

        if ip and ip != 'internal':
            hostname = self._fc_geo_reverse_dns(ip)
            vals, rate_limited = self._fc_geo_http_lookup(ip)
        else:
            hostname, vals, rate_limited = '', {}, False

        if vals:
            row.write(dict(vals, ip_hostname=hostname,
                           geo_lookup_state='done'))
        elif rate_limited:
            return (False, True)
        else:
            row.write({
                'geo_lookup_state': 'failed',
                'ip_hostname': hostname,
            })
        return (True, rate_limited)

    @api.model
    def _fc_geo_enrich_pending(self, limit=100):
        """Cron worker: process up to `limit` pending rows, oldest first.

        Per-row isolation is provided by `cr.savepoint()` rather than
        `cr.commit()`/`cr.rollback()` — the latter raises an AssertionError
        inside a TransactionCase (Odoo's test cursor refuses commit/rollback).
        Savepoints work in both prod and tests; the outer cron transaction
        commits the lot once the method returns. One bad IP rolls back only
        its own savepoint, so the rest of the batch still lands.

        The batch stops early as soon as a lookup reports the ip-api rate
        limit.

        :return: number of rows whose state was written
        """
        pending = self.sudo().search(
            [('geo_lookup_state', '=', 'pending')],
            order='event_time asc',
            limit=limit,
        )
        if not pending:
            return 0

        processed = 0
        for row in pending:
            try:
                with self.env.cr.savepoint():
                    written, rate_limited = self._fc_geo_enrich_row(row)
            except Exception:
                # Best-effort batch: log and move on to the next row.
                _logger.exception(
                    "fusion_login_audit: geo enrich failed for row %s", row.id)
                continue
            # Count only after the savepoint was released successfully.
            if written:
                processed += 1
            if rate_limited:
                _logger.info("fusion_login_audit: ip-api rate limit "
                             "hit, stopping batch early")
                break
        return processed