feat(fusion_login_audit): async geo enrichment cron

5-min cron processes up to 100 pending rows per pass: private IPs
short-circuit to state=private_ip; same-IP cache (30 days) avoids
duplicate ip-api.com calls; reverse DNS via socket with 1.5s timeout;
HTTP lookup respects ip-api''s X-Rl rate-limit header. Tests cover
private-IP shortcut, cache hit (no HTTP), and internal-state skip --
no network calls needed.

Per-row isolation uses cr.savepoint() instead of cr.commit() because
Odoo 19 TestCursor raises AssertionError on commit/rollback. Recorded
the gotcha as CLAUDE.md rule #14.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-05-26 21:50:24 -04:00
parent 1b8038d8e8
commit 86b8e59c95
4 changed files with 225 additions and 1 deletions

View File

@@ -12,5 +12,16 @@
<field name="active" eval="True"/>
</record>
<record id="cron_geo_enrich" model="ir.cron">
<field name="name">Fusion Login Audit: Geo Enrichment</field>
<field name="model_id" ref="model_fusion_login_audit"/>
<field name="state">code</field>
<field name="code">model._fc_geo_enrich_pending(limit=100)</field>
<field name="interval_number">5</field>
<field name="interval_type">minutes</field>
<field name="active" eval="True"/>
<field name="priority">10</field>
</record>
</data>
</odoo>

View File

@@ -1,6 +1,15 @@
# -*- coding: utf-8 -*-
import ipaddress
import logging
import socket
from datetime import timedelta
import requests
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
class FusionLoginAudit(models.Model):
_name = 'fusion.login.audit'
@@ -88,7 +97,6 @@ class FusionLoginAudit(models.Model):
def _fc_retention_gc(self):
"""Delete audit rows older than `fusion_login_audit.retention_days`.
Called daily by ir.cron. retention_days=0 means keep forever."""
from datetime import timedelta
ICP = self.env['ir.config_parameter'].sudo()
try:
days = int(ICP.get_param(
@@ -103,3 +111,146 @@ class FusionLoginAudit(models.Model):
if old:
old.unlink()
return count
_FC_PRIVATE_NETWORKS = (
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('::1/128'),
ipaddress.ip_network('fe80::/10'),
)
@api.model
def _fc_is_private_ip(self, ip):
if not ip or ip == 'internal':
return False # 'internal' uses its own state
try:
addr = ipaddress.ip_address(ip)
except ValueError:
return False
return any(addr in net for net in self._FC_PRIVATE_NETWORKS)
@api.model
def _fc_geo_cache_hit(self, ip):
"""Return a dict of geo fields if we've resolved this IP in the last
30 days, else None."""
if not ip:
return None
cutoff = fields.Datetime.now() - timedelta(days=30)
cached = self.sudo().search([
('ip_address', '=', ip),
('geo_lookup_state', '=', 'done'),
('event_time', '>=', cutoff),
], limit=1, order='event_time desc')
if not cached:
return None
return {
'country_code': cached.country_code,
'country_name': cached.country_name,
'city': cached.city,
'geo_state': cached.geo_state,
'ip_hostname': cached.ip_hostname,
}
@api.model
def _fc_geo_reverse_dns(self, ip):
try:
socket.setdefaulttimeout(1.5)
host, _aliases, _ips = socket.gethostbyaddr(ip)
return (host or '')[:255]
except (socket.herror, socket.gaierror, OSError):
return ''
finally:
socket.setdefaulttimeout(None)
@api.model
def _fc_geo_http_lookup(self, ip):
"""Call ip-api.com. Returns (vals_dict, rate_limited_bool).
Falls back to ({}, False) on any error."""
try:
resp = requests.get(
'http://ip-api.com/json/' + ip,
params={'fields': 'status,country,countryCode,regionName,city'},
timeout=3,
headers={'User-Agent': 'Odoo-FusionLoginAudit/19.0'},
)
rate_limited = resp.headers.get('X-Rl', '') == '0'
if resp.status_code != 200:
return ({}, rate_limited)
data = resp.json() or {}
if data.get('status') != 'success':
return ({}, rate_limited)
return ({
'country_code': (data.get('countryCode') or '')[:2],
'country_name': (data.get('country') or '')[:64],
'geo_state': (data.get('regionName') or '')[:64],
'city': (data.get('city') or '')[:128],
}, rate_limited)
except (requests.RequestException, ValueError):
_logger.warning("fusion_login_audit: geo lookup failed for %s",
ip, exc_info=True)
return ({}, False)
@api.model
def _fc_geo_enrich_pending(self, limit=100):
"""Cron worker: process up to `limit` pending rows.
Per-row isolation is provided by `cr.savepoint()` rather than
`cr.commit()`/`cr.rollback()` — the latter raises an AssertionError
inside a TransactionCase (Odoo's test cursor refuses commit/rollback).
Savepoints work in both prod and tests; the outer cron transaction
commits the lot once the method returns. One bad IP rolls back only
its own savepoint, so the rest of the batch still lands.
"""
pending = self.sudo().search(
[('geo_lookup_state', '=', 'pending')],
order='event_time asc', limit=limit,
)
if not pending:
return 0
processed = 0
stop_after_this = False
for row in pending:
ip = row.ip_address
try:
with self.env.cr.savepoint():
if self._fc_is_private_ip(ip):
row.write({
'geo_lookup_state': 'private_ip',
'country_code': '--',
'country_name': 'Private network',
'city': 'Private network',
})
processed += 1
continue
cached = self._fc_geo_cache_hit(ip)
if cached:
cached['geo_lookup_state'] = 'done'
row.write(cached)
processed += 1
continue
hostname = self._fc_geo_reverse_dns(ip) if ip and ip != 'internal' else ''
vals, rate_limited = self._fc_geo_http_lookup(ip) if ip and ip != 'internal' else ({}, False)
if vals:
vals['ip_hostname'] = hostname
vals['geo_lookup_state'] = 'done'
row.write(vals)
else:
row.write({
'geo_lookup_state': 'failed',
'ip_hostname': hostname,
})
processed += 1
if rate_limited:
_logger.info("fusion_login_audit: ip-api rate limit "
"hit, stopping batch early")
stop_after_this = True
except Exception:
_logger.exception(
"fusion_login_audit: geo enrich failed for row %s", row.id)
if stop_after_this:
break
return processed

View File

@@ -440,3 +440,64 @@ class TestFusionLoginAuditModel(TransactionCase):
self.assertTrue(Audit.browse(ancient_id).exists(),
"retention_days=0 must keep everything")
def test_geo_private_ip_shortcut(self):
"""Private IPs short-circuit to state='private_ip' without HTTP."""
Audit = self.env['fusion.login.audit'].sudo()
rec = Audit.create({
'attempted_login': 'lan@example.com',
'result': 'success',
'ip_address': '192.168.1.40',
'geo_lookup_state': 'pending',
})
Audit._fc_geo_enrich_pending(limit=10)
rec.invalidate_recordset()
self.assertEqual(rec.geo_lookup_state, 'private_ip')
self.assertEqual(rec.country_code, '--')
def test_geo_cache_hit_avoids_http(self):
"""A second row with the same recent IP copies from cache."""
from unittest.mock import patch
Audit = self.env['fusion.login.audit'].sudo()
# Seed a "done" row from the same IP.
Audit.create({
'attempted_login': 'seed@example.com',
'result': 'success',
'ip_address': '203.0.113.99',
'geo_lookup_state': 'done',
'country_code': 'CA',
'country_name': 'Canada',
'city': 'Toronto',
'geo_state': 'Ontario',
})
target = Audit.create({
'attempted_login': 'hit@example.com',
'result': 'success',
'ip_address': '203.0.113.99',
'geo_lookup_state': 'pending',
})
with patch(
'odoo.addons.fusion_login_audit.models.fusion_login_audit.requests.get'
) as mock_get:
Audit._fc_geo_enrich_pending(limit=10)
mock_get.assert_not_called()
target.invalidate_recordset()
self.assertEqual(target.geo_lookup_state, 'done')
self.assertEqual(target.country_code, 'CA')
self.assertEqual(target.city, 'Toronto')
def test_geo_internal_skipped(self):
"""Rows with geo_lookup_state='internal' are not picked up."""
Audit = self.env['fusion.login.audit'].sudo()
rec = Audit.create({
'attempted_login': 'cron@example.com',
'result': 'success',
'ip_address': 'internal',
'geo_lookup_state': 'internal',
})
# Should be a no-op for 'internal' state (cron only picks 'pending').
Audit._fc_geo_enrich_pending(limit=10)
rec.invalidate_recordset()
self.assertEqual(rec.geo_lookup_state, 'internal')