Overrides res.users._update_last_login to create a fusion.login.audit row with result=success after the parent runs. The write goes through sudo() + mail_create_nolog=True. Any exception in the audit path is caught and logged but never propagates — a broken audit table must never block a real user from logging in. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
125 lines
5.2 KiB
Python
125 lines
5.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
from odoo.tests.common import TransactionCase, tagged
|
|
|
|
|
|
@tagged('post_install', '-at_install')
class TestFusionLoginAuditModel(TransactionCase):
    """Tests for the ``fusion.login.audit`` model and the ``res.users`` hooks.

    Covers three areas:

    * direct creation of ``fusion.login.audit`` rows,
    * the ``res.users._fc_build_event_vals`` value builder,
    * the ``res.users._update_last_login`` override, including the rule
      that a failing audit write must not propagate to the caller.
    """

    def _create_user(self, name, login, password):
        """Create and return a ``res.users`` record through ``sudo()``."""
        return self.env['res.users'].sudo().create({
            'name': name,
            'login': login,
            'password': password,
        })

    def test_model_exists_and_creates(self):
        """Audit row can be created with all expected fields."""
        Audit = self.env['fusion.login.audit'].sudo()
        rec = Audit.create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
            'ip_address': '203.0.113.5',
            'user_agent_raw': 'Mozilla/5.0 Test',
            'browser': 'Test 1.0',
            'os': 'TestOS',
            'device_type': 'desktop',
            'database': self.env.cr.dbname,
            'geo_lookup_state': 'pending',
        })
        self.assertTrue(rec.id)
        self.assertEqual(rec.result, 'success')
        self.assertEqual(rec.geo_lookup_state, 'pending')
        self.assertEqual(rec.database, self.env.cr.dbname)
        self.assertTrue(rec.event_time)  # default fires

    def test_failure_reason_optional(self):
        """failure_reason is null on success rows."""
        rec = self.env['fusion.login.audit'].sudo().create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
        })
        self.assertFalse(rec.failure_reason)

    def test_geo_state_internal_value(self):
        """`internal` is an accepted geo_lookup_state value (distinct from private_ip)."""
        rec = self.env['fusion.login.audit'].sudo().create({
            'attempted_login': 'demo@example.com',
            'result': 'success',
            'geo_lookup_state': 'internal',
        })
        self.assertEqual(rec.geo_lookup_state, 'internal')

    def test_build_event_vals_with_no_request(self):
        """Without a live request, geo_lookup_state is 'internal'."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='success',
            attempted_login='cron@example.com',
        )
        self.assertEqual(vals['result'], 'success')
        self.assertEqual(vals['attempted_login'], 'cron@example.com')
        self.assertEqual(vals['ip_address'], 'internal')
        self.assertEqual(vals['user_agent_raw'], '<no-request>')
        self.assertEqual(vals['geo_lookup_state'], 'internal')
        self.assertEqual(vals['database'], self.env.cr.dbname)

    def test_build_event_vals_parses_user_agent(self):
        """Parser fills browser/os/device_type from a stub UA string."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='success',
            attempted_login='ua@example.com',
            _override_ip='203.0.113.5',
            _override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                         'AppleWebKit/537.36 Chrome/140.0 Safari/537.36',
        )
        self.assertEqual(vals['ip_address'], '203.0.113.5')
        self.assertIn('Chrome', vals['browser'])
        self.assertIn('Windows', vals['os'])
        self.assertEqual(vals['device_type'], 'desktop')
        self.assertEqual(vals['geo_lookup_state'], 'pending')

    def test_build_event_vals_strips_password(self):
        """If a credential dict sneaks in, no password leaks into vals."""
        ResUsers = self.env['res.users']
        vals = ResUsers._fc_build_event_vals(
            result='failure',
            attempted_login='leak@example.com',
            failure_reason='bad_password',
            _credential={'login': 'leak@example.com',
                         'password': 'super-secret-pw',
                         'type': 'password'},
        )
        # Check the whole structure, not just top-level keys, so a password
        # nested anywhere inside vals is caught as well.
        serialized = repr(vals)
        self.assertNotIn('super-secret-pw', serialized)
        self.assertEqual(vals['failure_reason'], 'bad_password')

    def test_update_last_login_writes_audit_row(self):
        """Calling _update_last_login on a user creates a success row."""
        user = self._create_user(
            'Audit Tester', 'audit-tester@example.com', 'audit-tester-pw-1')
        Audit = self.env['fusion.login.audit'].sudo()
        before = Audit.search_count([('user_id', '=', user.id)])
        user._update_last_login()
        after = Audit.search_count([('user_id', '=', user.id)])
        self.assertEqual(after, before + 1)
        row = Audit.search([('user_id', '=', user.id)],
                           order='event_time desc', limit=1)
        self.assertEqual(row.result, 'success')
        self.assertEqual(row.attempted_login, user.login)
        self.assertFalse(row.failure_reason)
        self.assertEqual(row.database, self.env.cr.dbname)

    def test_audit_write_failure_does_not_block_login(self):
        """An exception inside the audit write must not propagate."""
        from unittest.mock import patch

        user = self._create_user(
            'Resilient Tester', 'resilient-tester@example.com',
            'resilient-tester-pw-1')

        # Record every call so the test can prove the failing code path was
        # actually reached; otherwise "did not raise" would also hold if
        # create() were never called at all.
        create_calls = []

        def boom(self_, vals):
            create_calls.append(vals)
            raise RuntimeError('simulated audit DB failure')

        # Patch on the registry class so every recordset of the model
        # (including sudo()/with_context() copies) sees the failing create.
        audit_cls = type(self.env['fusion.login.audit'])
        with patch.object(audit_cls, 'create', boom):
            # Must not raise.
            user._update_last_login()

        self.assertTrue(
            create_calls,
            'patched fusion.login.audit.create() was never called; '
            'the audit failure path was not exercised',
        )
|