diff --git a/fusion_login_audit/models/res_users.py b/fusion_login_audit/models/res_users.py index c312a6a4..e14b3ee6 100644 --- a/fusion_login_audit/models/res_users.py +++ b/fusion_login_audit/models/res_users.py @@ -100,3 +100,37 @@ class ResUsers(models.Model): del _credential # explicit no-op — locks down the read surface return vals + + def _fc_record_login_event(self, result, failure_reason=None, + user_id=None, attempted_login=None, + _credential=None): + """Build vals + create the audit row via sudo. Never raises. + + Audit writes are wrapped so that a broken audit table can never + block a real user from logging in. The exception is logged and + swallowed; auth proceeds normally. + """ + try: + vals = self._fc_build_event_vals( + result=result, + attempted_login=attempted_login + or (self.login if self else None) + or 'unknown', + failure_reason=failure_reason, + user_id=user_id or (self.id if self else None), + _credential=_credential, + ) + self.env['fusion.login.audit'].sudo().with_context( + mail_create_nolog=True + ).create(vals) + except Exception: + _logger.exception( + "fusion_login_audit: failed to record %s row for %s", + result, attempted_login or (self.login if self else 'unknown'), + ) + + def _update_last_login(self): + result = super()._update_last_login() + # Self is the singleton recordset of the user that just logged in. + self._fc_record_login_event(result='success') + return result diff --git a/fusion_login_audit/tests/test_login_audit.py b/fusion_login_audit/tests/test_login_audit.py index 863c3d8e..7a23fa90 100644 --- a/fusion_login_audit/tests/test_login_audit.py +++ b/fusion_login_audit/tests/test_login_audit.py @@ -86,3 +86,39 @@ class TestFusionLoginAuditModel(TransactionCase): serialized = repr(vals) self.assertNotIn('super-secret-pw', serialized) self.assertEqual(vals['failure_reason'], 'bad_password') + + def test_update_last_login_writes_audit_row(self): + """Calling _update_last_login on a user creates a success row.""" + user = self.env['res.users'].sudo().create({ + 'name': 'Audit Tester', + 'login': 'audit-tester@example.com', + 'password': 'audit-tester-pw-1', + }) + Audit = self.env['fusion.login.audit'].sudo() + before = Audit.search_count([('user_id', '=', user.id)]) + user._update_last_login() + after = Audit.search_count([('user_id', '=', user.id)]) + self.assertEqual(after, before + 1) + row = Audit.search([('user_id', '=', user.id)], + order='event_time desc', limit=1) + self.assertEqual(row.result, 'success') + self.assertEqual(row.attempted_login, user.login) + self.assertFalse(row.failure_reason) + self.assertEqual(row.database, self.env.cr.dbname) + + def test_audit_write_failure_does_not_block_login(self): + """An exception inside the audit write must not propagate.""" + from unittest.mock import patch + user = self.env['res.users'].sudo().create({ + 'name': 'Resilient Tester', + 'login': 'resilient-tester@example.com', + 'password': 'resilient-tester-pw-1', + }) + + def boom(self_, vals): + raise RuntimeError('simulated audit DB failure') + + with patch.object(type(self.env['fusion.login.audit']), + 'create', boom): + # Must not raise. + user._update_last_login()