feat(fusion_login_audit): hook bad-password failures via _check_credentials
Wraps res.users._check_credentials. On AccessDenied, records a row with result=failure and failure_reason='bad_password' (or '2fa_failed' when credential['type'] == 'totp'), then re-raises. Regression test asserts the attempted password value never lands in any audit field. The audit row is written through registry.cursor() (independent cursor) so it survives the rollback that follows AccessDenied — in production odoo/service/model.py::retrying resets the transaction and http.py closes the cursor without committing, in tests assertRaises opens its own savepoint. Either way an inline write would vanish. Tests enter registry_test_mode and use manual try/except to keep the audit row visible across the savepoint hierarchy. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
import logging
|
||||
|
||||
from odoo import api, fields, models
|
||||
from odoo.exceptions import AccessDenied
|
||||
|
||||
# Top-level import (vs lazy inside the method): if the dep is missing — most
|
||||
# likely because the dev container got recreated and dropped its pip install
|
||||
@@ -106,9 +107,22 @@ class ResUsers(models.Model):
|
||||
_credential=None):
|
||||
"""Build vals + create the audit row via sudo. Never raises.
|
||||
|
||||
Audit writes are wrapped so that a broken audit table can never
|
||||
block a real user from logging in. The exception is logged and
|
||||
swallowed; auth proceeds normally.
|
||||
The row is written through an INDEPENDENT cursor
|
||||
(``registry.cursor()``) so that:
|
||||
|
||||
* A failure-path call from ``_check_credentials`` survives the
|
||||
outer transaction rollback that follows ``AccessDenied``
|
||||
(the HTTP layer closes the cursor without committing, see
|
||||
``odoo/service/model.py:retrying``).
|
||||
* A broken audit table can never block a real user from logging
|
||||
in: the cursor block is wrapped in try/except; exceptions are
|
||||
logged and swallowed.
|
||||
|
||||
The independent cursor commits on context exit. Note that this
|
||||
means the row is durable even if the caller's transaction later
|
||||
rolls back — intentional for audit semantics: a recorded bad
|
||||
password should NOT disappear because some unrelated downstream
|
||||
op blew up.
|
||||
"""
|
||||
try:
|
||||
vals = self._fc_build_event_vals(
|
||||
@@ -120,9 +134,11 @@ class ResUsers(models.Model):
|
||||
user_id=user_id or (self.id if self else None),
|
||||
_credential=_credential,
|
||||
)
|
||||
self.env['fusion.login.audit'].sudo().with_context(
|
||||
mail_create_nolog=True
|
||||
).create(vals)
|
||||
with self.env.registry.cursor() as audit_cr:
|
||||
audit_env = api.Environment(audit_cr, self.env.uid, self.env.context)
|
||||
audit_env['fusion.login.audit'].sudo().with_context(
|
||||
mail_create_nolog=True
|
||||
).create(vals)
|
||||
except Exception:
|
||||
_logger.exception(
|
||||
"fusion_login_audit: failed to record %s row for %s",
|
||||
@@ -134,3 +150,18 @@ class ResUsers(models.Model):
|
||||
# Self is the singleton recordset of the user that just logged in.
|
||||
self._fc_record_login_event(result='success')
|
||||
return result
|
||||
|
||||
def _check_credentials(self, credential, env):
|
||||
try:
|
||||
return super()._check_credentials(credential, env)
|
||||
except AccessDenied:
|
||||
cred_type = (credential or {}).get('type', 'password')
|
||||
reason = '2fa_failed' if cred_type == 'totp' else 'bad_password'
|
||||
self._fc_record_login_event(
|
||||
result='failure',
|
||||
failure_reason=reason,
|
||||
user_id=self.id,
|
||||
attempted_login=(credential or {}).get('login') or self.login,
|
||||
_credential=credential,
|
||||
)
|
||||
raise
|
||||
|
||||
@@ -5,6 +5,17 @@ from odoo.tests.common import TransactionCase, tagged
|
||||
@tagged('post_install', '-at_install')
|
||||
class TestFusionLoginAuditModel(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
# `_fc_record_login_event` uses `registry.cursor()` so that the audit
|
||||
# row survives the outer rollback that follows AccessDenied (see
|
||||
# res_users.py for the rationale). Inside a TransactionCase that
|
||||
# rolls back per test, a fresh cursor on a new connection cannot
|
||||
# see uncommitted records (the freshly-created test user FKs into
|
||||
# the audit row), so we put the registry in test mode — that swaps
|
||||
# `registry.cursor()` for a TestCursor that wraps the test cursor.
|
||||
super().setUp()
|
||||
self.registry_enter_test_mode()
|
||||
|
||||
def test_model_exists_and_creates(self):
|
||||
"""Audit row can be created with all expected fields."""
|
||||
Audit = self.env['fusion.login.audit'].sudo()
|
||||
@@ -122,3 +133,67 @@ class TestFusionLoginAuditModel(TransactionCase):
|
||||
'create', boom):
|
||||
# Must not raise.
|
||||
user._update_last_login()
|
||||
|
||||
def test_bad_password_writes_failure_row(self):
|
||||
"""A wrong password creates a result=failure row with failure_reason='bad_password'."""
|
||||
from odoo.exceptions import AccessDenied
|
||||
user = self.env['res.users'].sudo().create({
|
||||
'name': 'Wrongpw Tester',
|
||||
'login': 'wrongpw-tester@example.com',
|
||||
'password': 'wrongpw-tester-pw-1',
|
||||
})
|
||||
Audit = self.env['fusion.login.audit'].sudo()
|
||||
before = Audit.search_count([('attempted_login', '=', user.login),
|
||||
('result', '=', 'failure')])
|
||||
# NB: cannot use `self.assertRaises(AccessDenied)` — it opens an extra
|
||||
# savepoint (see odoo/tests/common.py::_assertRaises) that rolls back
|
||||
# the audit row written from inside the override.
|
||||
raised = False
|
||||
try:
|
||||
user._check_credentials(
|
||||
{'login': user.login, 'password': 'definitely-wrong',
|
||||
'type': 'password'},
|
||||
{'interactive': False},
|
||||
)
|
||||
except AccessDenied:
|
||||
raised = True
|
||||
self.assertTrue(raised, "AccessDenied not raised on wrong password")
|
||||
after = Audit.search_count([('attempted_login', '=', user.login),
|
||||
('result', '=', 'failure')])
|
||||
self.assertEqual(after, before + 1)
|
||||
row = Audit.search([('attempted_login', '=', user.login),
|
||||
('result', '=', 'failure')],
|
||||
order='event_time desc', limit=1)
|
||||
self.assertEqual(row.failure_reason, 'bad_password')
|
||||
self.assertEqual(row.user_id, user)
|
||||
|
||||
def test_bad_password_never_appears_in_row(self):
|
||||
"""The attempted password string never lands in any field."""
|
||||
from odoo.exceptions import AccessDenied
|
||||
secret = 'NeverInTheRow-9f3a82'
|
||||
user = self.env['res.users'].sudo().create({
|
||||
'name': 'Leak Test',
|
||||
'login': 'leak-test-2@example.com',
|
||||
'password': 'leak-test-pw-1',
|
||||
})
|
||||
# NB: manual try/except instead of assertRaises — see note above.
|
||||
raised = False
|
||||
try:
|
||||
user._check_credentials(
|
||||
{'login': user.login, 'password': secret, 'type': 'password'},
|
||||
{'interactive': False},
|
||||
)
|
||||
except AccessDenied:
|
||||
raised = True
|
||||
self.assertTrue(raised, "AccessDenied not raised on wrong password")
|
||||
row = self.env['fusion.login.audit'].sudo().search(
|
||||
[('attempted_login', '=', user.login),
|
||||
('result', '=', 'failure')],
|
||||
order='event_time desc', limit=1)
|
||||
self.assertTrue(row, "Audit row not created for bad-password attempt")
|
||||
for fname in ('attempted_login', 'failure_reason', 'user_agent_raw',
|
||||
'browser', 'os', 'ip_address', 'ip_hostname',
|
||||
'city', 'country_name', 'country_code', 'geo_state',
|
||||
'database'):
|
||||
self.assertNotIn(secret, (row[fname] or ''),
|
||||
f"Password leaked into field {fname}")
|
||||
|
||||
Reference in New Issue
Block a user