diff --git a/fusion_login_audit/models/res_users.py b/fusion_login_audit/models/res_users.py index e14b3ee6..bd8f1293 100644 --- a/fusion_login_audit/models/res_users.py +++ b/fusion_login_audit/models/res_users.py @@ -2,6 +2,7 @@ import logging from odoo import api, fields, models +from odoo.exceptions import AccessDenied # Top-level import (vs lazy inside the method): if the dep is missing — most # likely because the dev container got recreated and dropped its pip install @@ -106,9 +107,22 @@ class ResUsers(models.Model): _credential=None): """Build vals + create the audit row via sudo. Never raises. - Audit writes are wrapped so that a broken audit table can never - block a real user from logging in. The exception is logged and - swallowed; auth proceeds normally. + The row is written through an INDEPENDENT cursor + (``registry.cursor()``) so that: + + * A failure-path call from ``_check_credentials`` survives the + outer transaction rollback that follows ``AccessDenied`` + (the HTTP layer closes the cursor without committing, see + ``odoo/service/model.py:retrying``). + * A broken audit table can never block a real user from logging + in: the cursor block is wrapped in try/except; exceptions are + logged and swallowed. + + The independent cursor commits on context exit. Note that this + means the row is durable even if the caller's transaction later + rolls back — intentional for audit semantics: a recorded bad + password should NOT disappear because some unrelated downstream + op blew up. """ try: vals = self._fc_build_event_vals( @@ -120,9 +134,11 @@ class ResUsers(models.Model): user_id=user_id or (self.id if self else None), _credential=_credential, ) - self.env['fusion.login.audit'].sudo().with_context( - mail_create_nolog=True - ).create(vals) + with self.env.registry.cursor() as audit_cr: + audit_env = api.Environment(audit_cr, self.env.uid, self.env.context) + audit_env['fusion.login.audit'].sudo().with_context( + mail_create_nolog=True + ).create(vals) except Exception: _logger.exception( "fusion_login_audit: failed to record %s row for %s", @@ -134,3 +150,18 @@ class ResUsers(models.Model): # Self is the singleton recordset of the user that just logged in. self._fc_record_login_event(result='success') return result + + def _check_credentials(self, credential, env): + try: + return super()._check_credentials(credential, env) + except AccessDenied: + cred_type = (credential or {}).get('type', 'password') + reason = '2fa_failed' if cred_type == 'totp' else 'bad_password' + self._fc_record_login_event( + result='failure', + failure_reason=reason, + user_id=self.id, + attempted_login=(credential or {}).get('login') or self.login, + _credential=credential, + ) + raise diff --git a/fusion_login_audit/tests/test_login_audit.py b/fusion_login_audit/tests/test_login_audit.py index 7a23fa90..0dcfc3ae 100644 --- a/fusion_login_audit/tests/test_login_audit.py +++ b/fusion_login_audit/tests/test_login_audit.py @@ -5,6 +5,17 @@ from odoo.tests.common import TransactionCase, tagged @tagged('post_install', '-at_install') class TestFusionLoginAuditModel(TransactionCase): + def setUp(self): + # `_fc_record_login_event` uses `registry.cursor()` so that the audit + # row survives the outer rollback that follows AccessDenied (see + # res_users.py for the rationale). Inside a TransactionCase that + # rolls back per test, a fresh cursor on a new connection cannot + # see uncommitted records (the freshly-created test user FKs into + # the audit row), so we put the registry in test mode — that swaps + # `registry.cursor()` for a TestCursor that wraps the test cursor. + super().setUp() + self.registry_enter_test_mode() + def test_model_exists_and_creates(self): """Audit row can be created with all expected fields.""" Audit = self.env['fusion.login.audit'].sudo() @@ -122,3 +133,67 @@ class TestFusionLoginAuditModel(TransactionCase): 'create', boom): # Must not raise. user._update_last_login() + + def test_bad_password_writes_failure_row(self): + """A wrong password creates a result=failure row with failure_reason='bad_password'.""" + from odoo.exceptions import AccessDenied + user = self.env['res.users'].sudo().create({ + 'name': 'Wrongpw Tester', + 'login': 'wrongpw-tester@example.com', + 'password': 'wrongpw-tester-pw-1', + }) + Audit = self.env['fusion.login.audit'].sudo() + before = Audit.search_count([('attempted_login', '=', user.login), + ('result', '=', 'failure')]) + # NB: cannot use `self.assertRaises(AccessDenied)` — it opens an extra + # savepoint (see odoo/tests/common.py::_assertRaises) that rolls back + # the audit row written from inside the override. + raised = False + try: + user._check_credentials( + {'login': user.login, 'password': 'definitely-wrong', + 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + raised = True + self.assertTrue(raised, "AccessDenied not raised on wrong password") + after = Audit.search_count([('attempted_login', '=', user.login), + ('result', '=', 'failure')]) + self.assertEqual(after, before + 1) + row = Audit.search([('attempted_login', '=', user.login), + ('result', '=', 'failure')], + order='event_time desc', limit=1) + self.assertEqual(row.failure_reason, 'bad_password') + self.assertEqual(row.user_id, user) + + def test_bad_password_never_appears_in_row(self): + """The attempted password string never lands in any field.""" + from odoo.exceptions import AccessDenied + secret = 'NeverInTheRow-9f3a82' + user = self.env['res.users'].sudo().create({ + 'name': 'Leak Test', + 'login': 'leak-test-2@example.com', + 'password': 'leak-test-pw-1', + }) + # NB: manual try/except instead of assertRaises — see note above. + raised = False + try: + user._check_credentials( + {'login': user.login, 'password': secret, 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + raised = True + self.assertTrue(raised, "AccessDenied not raised on wrong password") + row = self.env['fusion.login.audit'].sudo().search( + [('attempted_login', '=', user.login), + ('result', '=', 'failure')], + order='event_time desc', limit=1) + self.assertTrue(row, "Audit row not created for bad-password attempt") + for fname in ('attempted_login', 'failure_reason', 'user_agent_raw', + 'browser', 'os', 'ip_address', 'ip_hostname', + 'city', 'country_name', 'country_code', 'geo_state', + 'database'): + self.assertNotIn(secret, (row[fname] or ''), + f"Password leaked into field {fname}")