Files
Odoo-Modules/fusion_login_audit/tests/test_login_audit.py
gsinghpal a7cf44249d feat(fusion_login_audit): hook unknown-user failures via _login
Overrides res.users._login. When the login string does not resolve to
any user, super() raises AccessDenied; we record a row with user_id=NULL
and failure_reason="unknown_user", then re-raise. Closes the gap where
typo'd or scanned logins would otherwise vanish from the audit trail.

The existing _fc_record_login_event helper writes through an independent
registry.cursor(), so the audit row survives the rollback that follows
the re-raised AccessDenied.

Note: in Odoo 19 _login is a plain instance method (not the classmethod
it was in earlier versions) and takes (credential, user_agent_env). The
original plan was written for the classmethod signature; corrected here
and recorded in CLAUDE.md rule #10 so future-Claude does not waste time
re-discovering it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 09:03:58 -04:00

230 lines
10 KiB
Python

# -*- coding: utf-8 -*-
from odoo.tests.common import TransactionCase, tagged
@tagged('post_install', '-at_install')
class TestFusionLoginAuditModel(TransactionCase):
    """Tests for the ``fusion.login.audit`` model and the ``res.users`` hooks.

    Covers direct creation of audit rows, the ``_fc_build_event_vals``
    helper, and the rows written on a successful login, on a wrong password
    and on a login string that matches no user.
    """

    def setUp(self):
        """Switch the registry to test mode before every test."""
        # `_fc_record_login_event` uses `registry.cursor()` so that the audit
        # row survives the outer rollback that follows AccessDenied (see
        # res_users.py for the rationale). Inside a TransactionCase that
        # rolls back per test, a fresh cursor on a new connection cannot
        # see uncommitted records (the freshly-created test user FKs into
        # the audit row), so we put the registry in test mode — that swaps
        # `registry.cursor()` for a TestCursor that wraps the test cursor.
        super().setUp()
        self.registry_enter_test_mode()

    def _assert_access_denied(self, failure_message, func, *args):
        """Call ``func(*args)`` and fail unless it raises ``AccessDenied``.

        ``self.assertRaises(AccessDenied)`` cannot be used by the tests in
        this class: it opens an extra savepoint (see
        odoo/tests/common.py::_assertRaises) that rolls back the audit row
        written from inside the override under test. This helper catches the
        exception by hand so no savepoint is involved.

        :param failure_message: message reported when nothing is raised.
        :param func: callable expected to raise ``AccessDenied``.
        :param args: positional arguments forwarded to ``func``.
        """
        from odoo.exceptions import AccessDenied
        try:
            func(*args)
        except AccessDenied:
            return
        self.fail(failure_message)

    def test_model_exists_and_creates(self):
        """Audit row can be created with all expected fields."""
        Audit = self.env['fusion.login.audit'].sudo()
        rec = Audit.create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
            'ip_address': '203.0.113.5',
            'user_agent_raw': 'Mozilla/5.0 Test',
            'browser': 'Test 1.0',
            'os': 'TestOS',
            'device_type': 'desktop',
            'database': self.env.cr.dbname,
            'geo_lookup_state': 'pending',
        })
        self.assertTrue(rec.id)
        self.assertEqual(rec.result, 'success')
        self.assertEqual(rec.geo_lookup_state, 'pending')
        self.assertEqual(rec.database, self.env.cr.dbname)
        self.assertTrue(rec.event_time)  # default fires

    def test_failure_reason_optional(self):
        """failure_reason is null on success rows."""
        rec = self.env['fusion.login.audit'].sudo().create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
        })
        self.assertFalse(rec.failure_reason)

    def test_geo_state_internal_value(self):
        """`internal` is an accepted geo_lookup_state value (distinct from private_ip)."""
        rec = self.env['fusion.login.audit'].sudo().create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
            'geo_lookup_state': 'internal',
        })
        self.assertEqual(rec.geo_lookup_state, 'internal')

    def test_build_event_vals_with_no_request(self):
        """Without a live request, the vals carry the 'internal' placeholders."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='success',
            attempted_login='cron@example.com',
        )
        self.assertEqual(vals['result'], 'success')
        self.assertEqual(vals['attempted_login'], 'cron@example.com')
        self.assertEqual(vals['ip_address'], 'internal')
        self.assertEqual(vals['user_agent_raw'], '<no-request>')
        self.assertEqual(vals['geo_lookup_state'], 'internal')
        self.assertEqual(vals['database'], self.env.cr.dbname)

    def test_build_event_vals_parses_user_agent(self):
        """Parser fills browser/os/device_type from an overridden UA string."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='success',
            attempted_login='ua@example.com',
            _override_ip='203.0.113.5',
            _override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                         'AppleWebKit/537.36 Chrome/140.0 Safari/537.36',
        )
        self.assertEqual(vals['ip_address'], '203.0.113.5')
        self.assertIn('Chrome', vals['browser'])
        self.assertIn('Windows', vals['os'])
        self.assertEqual(vals['device_type'], 'desktop')
        self.assertEqual(vals['geo_lookup_state'], 'pending')

    def test_build_event_vals_strips_password(self):
        """If a credential dict sneaks in, no password leaks into vals."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='failure',
            attempted_login='leak@example.com',
            failure_reason='bad_password',
            _credential={'login': 'leak@example.com',
                         'password': 'super-secret-pw',
                         'type': 'password'},
        )
        # repr() covers every key and value of the dict in one string.
        serialized = repr(vals)
        self.assertNotIn('super-secret-pw', serialized)
        self.assertEqual(vals['failure_reason'], 'bad_password')

    def test_update_last_login_writes_audit_row(self):
        """Calling _update_last_login on a user creates a success row."""
        user = self.env['res.users'].sudo().create({
            'name': 'Audit Tester',
            'login': 'audit-tester@example.com',
            'password': 'audit-tester-pw-1',
        })
        Audit = self.env['fusion.login.audit'].sudo()
        before = Audit.search_count([('user_id', '=', user.id)])
        user._update_last_login()
        after = Audit.search_count([('user_id', '=', user.id)])
        self.assertEqual(after, before + 1)
        row = Audit.search([('user_id', '=', user.id)],
                           order='event_time desc', limit=1)
        self.assertEqual(row.result, 'success')
        self.assertEqual(row.attempted_login, user.login)
        self.assertFalse(row.failure_reason)
        self.assertEqual(row.database, self.env.cr.dbname)

    def test_audit_write_failure_does_not_block_login(self):
        """An exception inside the audit write must not propagate."""
        from unittest.mock import patch
        user = self.env['res.users'].sudo().create({
            'name': 'Resilient Tester',
            'login': 'resilient-tester@example.com',
            'password': 'resilient-tester-pw-1',
        })

        def boom(self_, vals):
            raise RuntimeError('simulated audit DB failure')

        # NOTE(review): this only proves that nothing propagates; it does not
        # check that the patched `create` was actually reached.
        with patch.object(type(self.env['fusion.login.audit']),
                          'create', boom):
            # Must not raise.
            user._update_last_login()

    def test_bad_password_writes_failure_row(self):
        """A wrong password creates a result=failure row with failure_reason='bad_password'."""
        user = self.env['res.users'].sudo().create({
            'name': 'Wrongpw Tester',
            'login': 'wrongpw-tester@example.com',
            'password': 'wrongpw-tester-pw-1',
        })
        Audit = self.env['fusion.login.audit'].sudo()
        failure_domain = [('attempted_login', '=', user.login),
                          ('result', '=', 'failure')]
        before = Audit.search_count(failure_domain)
        self._assert_access_denied(
            "AccessDenied not raised on wrong password",
            user._check_credentials,
            {'login': user.login, 'password': 'definitely-wrong',
             'type': 'password'},
            {'interactive': False},
        )
        after = Audit.search_count(failure_domain)
        self.assertEqual(after, before + 1)
        row = Audit.search(failure_domain, order='event_time desc', limit=1)
        self.assertEqual(row.failure_reason, 'bad_password')
        self.assertEqual(row.user_id, user)

    def test_bad_password_never_appears_in_row(self):
        """The attempted password string never lands in any field."""
        secret = 'NeverInTheRow-9f3a82'
        user = self.env['res.users'].sudo().create({
            'name': 'Leak Test',
            'login': 'leak-test-2@example.com',
            'password': 'leak-test-pw-1',
        })
        self._assert_access_denied(
            "AccessDenied not raised on wrong password",
            user._check_credentials,
            {'login': user.login, 'password': secret, 'type': 'password'},
            {'interactive': False},
        )
        row = self.env['fusion.login.audit'].sudo().search(
            [('attempted_login', '=', user.login),
             ('result', '=', 'failure')],
            order='event_time desc', limit=1)
        self.assertTrue(row, "Audit row not created for bad-password attempt")
        # Unset fields read as False, hence the `or ''` before the substring check.
        for fname in ('attempted_login', 'failure_reason', 'user_agent_raw',
                      'browser', 'os', 'ip_address', 'ip_hostname',
                      'city', 'country_name', 'country_code', 'geo_state',
                      'database'):
            self.assertNotIn(secret, (row[fname] or ''),
                             f"Password leaked into field {fname}")

    def test_unknown_user_writes_failure_row(self):
        """A login attempt for a username that does not exist gets logged
        with user_id=NULL and failure_reason='unknown_user'."""
        bogus = 'this-user-does-not-exist@example.com'
        Audit = self.env['fusion.login.audit'].sudo()
        bogus_domain = [('attempted_login', '=', bogus)]
        before = Audit.search_count(bogus_domain)
        # _login is an instance method in Odoo 19 (not a classmethod as in
        # earlier versions); we call it on the empty recordset of res.users,
        # which matches what authenticate() does internally.
        self._assert_access_denied(
            "AccessDenied must propagate after the audit write",
            self.env['res.users']._login,
            {'login': bogus, 'password': 'whatever',
             'type': 'password'},
            {'interactive': False},
        )
        after = Audit.search_count(bogus_domain)
        self.assertEqual(after, before + 1)
        row = Audit.search(bogus_domain, order='event_time desc', limit=1)
        self.assertFalse(row.user_id)
        self.assertEqual(row.failure_reason, 'unknown_user')
        self.assertEqual(row.result, 'failure')