feat(fusion_login_audit): add _fc_build_event_vals context helper
Single helper builds vals for fusion.login.audit rows from the live HTTP request, or falls back to ip=''internal'' + geo_lookup_state=''internal'' when there is no request. Parses UA into browser/os/device_type via the bundled user_agents library. Never reads credential[''password'']. Tests cover: no-request fallback, UA parsing on a Chrome/Windows UA, and the regression that no password value leaks into the vals dict. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -20,6 +20,9 @@ bursts. Daily retention cron honours a configurable horizon.
|
||||
'website': 'https://nexasystems.ca',
|
||||
'license': 'OPL-1',
|
||||
'depends': ['base', 'mail'],
|
||||
'external_dependencies': {
|
||||
'python': ['user_agents'],
|
||||
},
|
||||
'data': [
|
||||
'security/ir.model.access.csv',
|
||||
'security/security.xml',
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from . import fusion_login_audit
|
||||
from . import res_users
|
||||
|
||||
102
fusion_login_audit/models/res_users.py
Normal file
102
fusion_login_audit/models/res_users.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import logging
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
# Top-level import (vs lazy inside the method): if the dep is missing — most
|
||||
# likely because the dev container got recreated and dropped its pip install
|
||||
# (see CLAUDE.md Workflow) — Odoo crashes at registry load with a clear
|
||||
# `ModuleNotFoundError`, not deep in the auth path after the first login.
|
||||
from user_agents import parse as ua_parse
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResUsers(models.Model):
|
||||
_inherit = 'res.users'
|
||||
|
||||
@api.model
|
||||
def _fc_build_event_vals(
|
||||
self,
|
||||
result,
|
||||
attempted_login,
|
||||
failure_reason=None,
|
||||
user_id=None,
|
||||
_override_ip=None,
|
||||
_override_ua=None,
|
||||
_credential=None,
|
||||
):
|
||||
"""Build the dict of values for a fusion.login.audit row.
|
||||
|
||||
Pulls IP / User-Agent from the live HTTP request when available.
|
||||
Falls back to ('internal', '<no-request>') for XML-RPC / cron-initiated
|
||||
auth, with geo_lookup_state='internal' so the geo cron skips them.
|
||||
|
||||
An empty IP from an otherwise-live request (rare; misconfigured
|
||||
reverse proxy) also routes to the 'internal' fallback — an empty
|
||||
string isn't useful audit data and is arguably suspicious.
|
||||
|
||||
The _override_* kwargs exist for tests so we don't have to fake a
|
||||
full request. They are NOT a public API.
|
||||
|
||||
Password safety: `_credential` MAY contain a 'password' key from the
|
||||
Odoo auth flow. We never read that key, never log it, never put it
|
||||
in vals. The test `test_build_event_vals_strips_password` locks
|
||||
this property in via `assertNotIn(secret, repr(vals))`.
|
||||
"""
|
||||
vals = {
|
||||
'attempted_login': (attempted_login or '')[:255],
|
||||
'result': result,
|
||||
'failure_reason': failure_reason,
|
||||
'event_time': fields.Datetime.now(),
|
||||
'database': self.env.cr.dbname,
|
||||
'user_id': user_id,
|
||||
}
|
||||
|
||||
ip = _override_ip
|
||||
ua_str = _override_ua
|
||||
|
||||
if ip is None or ua_str is None:
|
||||
try:
|
||||
from odoo.http import request
|
||||
if request and getattr(request, 'httprequest', None):
|
||||
if ip is None:
|
||||
ip = request.httprequest.remote_addr
|
||||
if ua_str is None:
|
||||
ua_str = request.httprequest.user_agent.string or ''
|
||||
except Exception:
|
||||
_logger.debug("fusion_login_audit: no request context", exc_info=True)
|
||||
|
||||
if ip and ua_str is not None:
|
||||
ua_text = ua_str or ''
|
||||
vals['ip_address'] = ip[:45]
|
||||
vals['user_agent_raw'] = ua_text[:512]
|
||||
ua = ua_parse(ua_text)
|
||||
vals['browser'] = (f"{ua.browser.family} {ua.browser.version_string}".strip())[:64]
|
||||
vals['os'] = (f"{ua.os.family} {ua.os.version_string}".strip())[:64]
|
||||
if ua.is_bot:
|
||||
vals['device_type'] = 'bot'
|
||||
elif ua.is_mobile:
|
||||
vals['device_type'] = 'mobile'
|
||||
elif ua.is_tablet:
|
||||
vals['device_type'] = 'tablet'
|
||||
elif ua.is_pc:
|
||||
vals['device_type'] = 'desktop'
|
||||
else:
|
||||
vals['device_type'] = 'unknown'
|
||||
vals['geo_lookup_state'] = 'pending'
|
||||
else:
|
||||
vals['ip_address'] = 'internal'
|
||||
vals['user_agent_raw'] = '<no-request>'
|
||||
vals['device_type'] = 'unknown'
|
||||
vals['geo_lookup_state'] = 'internal'
|
||||
|
||||
# _credential is accepted in the signature so callers (T6 _check_credentials,
|
||||
# T7 _login) can hand the dict in without filtering. The helper deliberately
|
||||
# touches NO keys from it — see the password-safety note in the docstring.
|
||||
# `_credential` is intentionally unread here; the parameter exists so future
|
||||
# work can read `credential.get('type')` for `2fa_failed` discrimination
|
||||
# only via the explicit failure_reason kwarg, never from the dict directly.
|
||||
del _credential # explicit no-op — locks down the read surface
|
||||
|
||||
return vals
|
||||
@@ -41,3 +41,48 @@ class TestFusionLoginAuditModel(TransactionCase):
|
||||
'geo_lookup_state': 'internal',
|
||||
})
|
||||
self.assertEqual(rec.geo_lookup_state, 'internal')
|
||||
|
||||
def test_build_event_vals_with_no_request(self):
|
||||
"""Without a live request, geo_lookup_state is 'internal'."""
|
||||
ResUsers = self.env['res.users']
|
||||
vals = ResUsers._fc_build_event_vals(
|
||||
result='success',
|
||||
attempted_login='cron@example.com',
|
||||
)
|
||||
self.assertEqual(vals['result'], 'success')
|
||||
self.assertEqual(vals['attempted_login'], 'cron@example.com')
|
||||
self.assertEqual(vals['ip_address'], 'internal')
|
||||
self.assertEqual(vals['user_agent_raw'], '<no-request>')
|
||||
self.assertEqual(vals['geo_lookup_state'], 'internal')
|
||||
self.assertEqual(vals['database'], self.env.cr.dbname)
|
||||
|
||||
def test_build_event_vals_parses_user_agent(self):
|
||||
"""Parser fills browser/os/device_type from a stub UA dict."""
|
||||
ResUsers = self.env['res.users']
|
||||
vals = ResUsers._fc_build_event_vals(
|
||||
result='success',
|
||||
attempted_login='ua@example.com',
|
||||
_override_ip='203.0.113.5',
|
||||
_override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
|
||||
'AppleWebKit/537.36 Chrome/140.0 Safari/537.36',
|
||||
)
|
||||
self.assertEqual(vals['ip_address'], '203.0.113.5')
|
||||
self.assertIn('Chrome', vals['browser'])
|
||||
self.assertIn('Windows', vals['os'])
|
||||
self.assertEqual(vals['device_type'], 'desktop')
|
||||
self.assertEqual(vals['geo_lookup_state'], 'pending')
|
||||
|
||||
def test_build_event_vals_strips_password(self):
|
||||
"""If a credential dict sneaks in, no password leaks into vals."""
|
||||
ResUsers = self.env['res.users']
|
||||
vals = ResUsers._fc_build_event_vals(
|
||||
result='failure',
|
||||
attempted_login='leak@example.com',
|
||||
failure_reason='bad_password',
|
||||
_credential={'login': 'leak@example.com',
|
||||
'password': 'super-secret-pw',
|
||||
'type': 'password'},
|
||||
)
|
||||
serialized = repr(vals)
|
||||
self.assertNotIn('super-secret-pw', serialized)
|
||||
self.assertEqual(vals['failure_reason'], 'bad_password')
|
||||
|
||||
Reference in New Issue
Block a user