diff --git a/fusion_login_audit/__manifest__.py b/fusion_login_audit/__manifest__.py index c6f6453b..b92415b6 100644 --- a/fusion_login_audit/__manifest__.py +++ b/fusion_login_audit/__manifest__.py @@ -20,6 +20,9 @@ bursts. Daily retention cron honours a configurable horizon. 'website': 'https://nexasystems.ca', 'license': 'OPL-1', 'depends': ['base', 'mail'], + 'external_dependencies': { + 'python': ['user_agents'], + }, 'data': [ 'security/ir.model.access.csv', 'security/security.xml', diff --git a/fusion_login_audit/models/__init__.py b/fusion_login_audit/models/__init__.py index e966d1fc..8dbc95dc 100644 --- a/fusion_login_audit/models/__init__.py +++ b/fusion_login_audit/models/__init__.py @@ -1,2 +1,3 @@ # -*- coding: utf-8 -*- from . import fusion_login_audit +from . import res_users diff --git a/fusion_login_audit/models/res_users.py b/fusion_login_audit/models/res_users.py new file mode 100644 index 00000000..c312a6a4 --- /dev/null +++ b/fusion_login_audit/models/res_users.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +import logging + +from odoo import api, fields, models + +# Top-level import (vs lazy inside the method): if the dep is missing — most +# likely because the dev container got recreated and dropped its pip install +# (see CLAUDE.md Workflow) — Odoo crashes at registry load with a clear +# `ModuleNotFoundError`, not deep in the auth path after the first login. +from user_agents import parse as ua_parse + +_logger = logging.getLogger(__name__) + + +class ResUsers(models.Model): + _inherit = 'res.users' + + @api.model + def _fc_build_event_vals( + self, + result, + attempted_login, + failure_reason=None, + user_id=None, + _override_ip=None, + _override_ua=None, + _credential=None, + ): + """Build the dict of values for a fusion.login.audit row. + + Pulls IP / User-Agent from the live HTTP request when available. + Falls back to ('internal', '') for XML-RPC / cron-initiated + auth, with geo_lookup_state='internal' so the geo cron skips them. + + An empty IP from an otherwise-live request (rare; misconfigured + reverse proxy) also routes to the 'internal' fallback — an empty + string isn't useful audit data and is arguably suspicious. + + The _override_* kwargs exist for tests so we don't have to fake a + full request. They are NOT a public API. + + Password safety: `_credential` MAY contain a 'password' key from the + Odoo auth flow. We never read that key, never log it, never put it + in vals. The test `test_build_event_vals_strips_password` locks + this property in via `assertNotIn(secret, repr(vals))`. + """ + vals = { + 'attempted_login': (attempted_login or '')[:255], + 'result': result, + 'failure_reason': failure_reason, + 'event_time': fields.Datetime.now(), + 'database': self.env.cr.dbname, + 'user_id': user_id, + } + + ip = _override_ip + ua_str = _override_ua + + if ip is None or ua_str is None: + try: + from odoo.http import request + if request and getattr(request, 'httprequest', None): + if ip is None: + ip = request.httprequest.remote_addr + if ua_str is None: + ua_str = request.httprequest.user_agent.string or '' + except Exception: + _logger.debug("fusion_login_audit: no request context", exc_info=True) + + if ip and ua_str is not None: + ua_text = ua_str or '' + vals['ip_address'] = ip[:45] + vals['user_agent_raw'] = ua_text[:512] + ua = ua_parse(ua_text) + vals['browser'] = (f"{ua.browser.family} {ua.browser.version_string}".strip())[:64] + vals['os'] = (f"{ua.os.family} {ua.os.version_string}".strip())[:64] + if ua.is_bot: + vals['device_type'] = 'bot' + elif ua.is_mobile: + vals['device_type'] = 'mobile' + elif ua.is_tablet: + vals['device_type'] = 'tablet' + elif ua.is_pc: + vals['device_type'] = 'desktop' + else: + vals['device_type'] = 'unknown' + vals['geo_lookup_state'] = 'pending' + else: + vals['ip_address'] = 'internal' + vals['user_agent_raw'] = '' + vals['device_type'] = 'unknown' + vals['geo_lookup_state'] = 'internal' + + # _credential is accepted in the signature so callers (T6 _check_credentials, + # T7 _login) can hand the dict in without filtering. The helper deliberately + # touches NO keys from it — see the password-safety note in the docstring. + # `_credential` is intentionally unread here; the parameter exists so future + # work can read `credential.get('type')` for `2fa_failed` discrimination + # only via the explicit failure_reason kwarg, never from the dict directly. + del _credential # explicit no-op — locks down the read surface + + return vals diff --git a/fusion_login_audit/tests/test_login_audit.py b/fusion_login_audit/tests/test_login_audit.py index 73abc737..863c3d8e 100644 --- a/fusion_login_audit/tests/test_login_audit.py +++ b/fusion_login_audit/tests/test_login_audit.py @@ -41,3 +41,48 @@ class TestFusionLoginAuditModel(TransactionCase): 'geo_lookup_state': 'internal', }) self.assertEqual(rec.geo_lookup_state, 'internal') + + def test_build_event_vals_with_no_request(self): + """Without a live request, geo_lookup_state is 'internal'.""" + ResUsers = self.env['res.users'] + vals = ResUsers._fc_build_event_vals( + result='success', + attempted_login='cron@example.com', + ) + self.assertEqual(vals['result'], 'success') + self.assertEqual(vals['attempted_login'], 'cron@example.com') + self.assertEqual(vals['ip_address'], 'internal') + self.assertEqual(vals['user_agent_raw'], '') + self.assertEqual(vals['geo_lookup_state'], 'internal') + self.assertEqual(vals['database'], self.env.cr.dbname) + + def test_build_event_vals_parses_user_agent(self): + """Parser fills browser/os/device_type from a stub UA dict.""" + ResUsers = self.env['res.users'] + vals = ResUsers._fc_build_event_vals( + result='success', + attempted_login='ua@example.com', + _override_ip='203.0.113.5', + _override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 Chrome/140.0 Safari/537.36', + ) + self.assertEqual(vals['ip_address'], '203.0.113.5') + self.assertIn('Chrome', vals['browser']) + self.assertIn('Windows', vals['os']) + self.assertEqual(vals['device_type'], 'desktop') + self.assertEqual(vals['geo_lookup_state'], 'pending') + + def test_build_event_vals_strips_password(self): + """If a credential dict sneaks in, no password leaks into vals.""" + ResUsers = self.env['res.users'] + vals = ResUsers._fc_build_event_vals( + result='failure', + attempted_login='leak@example.com', + failure_reason='bad_password', + _credential={'login': 'leak@example.com', + 'password': 'super-secret-pw', + 'type': 'password'}, + ) + serialized = repr(vals) + self.assertNotIn('super-secret-pw', serialized) + self.assertEqual(vals['failure_reason'], 'bad_password')