# -*- coding: utf-8 -*- from odoo import fields from odoo.tests.common import TransactionCase, tagged @tagged('post_install', '-at_install') class TestFusionLoginAuditModel(TransactionCase): def setUp(self): # `_fc_record_login_event` uses `registry.cursor()` so that the audit # row survives the outer rollback that follows AccessDenied (see # res_users.py for the rationale). Inside a TransactionCase that # rolls back per test, a fresh cursor on a new connection cannot # see uncommitted records (the freshly-created test user FKs into # the audit row), so we put the registry in test mode — that swaps # `registry.cursor()` for a TestCursor that wraps the test cursor. super().setUp() self.registry_enter_test_mode() # The alert tests below assume at least one admin has an email # (otherwise the recipient filter empties and no mail is queued). # In a fresh fusion-dev DB, base.user_admin's email is NULL; the # superuser (__system__) has an email but is filtered out of normal # res.users searches. Ensure admin has a usable email. admin = self.env.ref('base.user_admin') if not admin.email: admin.sudo().write({'email': 'admin@test.example.com'}) def test_model_exists_and_creates(self): """Audit row can be created with all expected fields.""" Audit = self.env['fusion.login.audit'].sudo() rec = Audit.create({ 'attempted_login': 'demo@example.com', 'result': 'success', 'ip_address': '203.0.113.5', 'user_agent_raw': 'Mozilla/5.0 Test', 'browser': 'Test 1.0', 'os': 'TestOS', 'device_type': 'desktop', 'database': self.env.cr.dbname, 'geo_lookup_state': 'pending', }) self.assertTrue(rec.id) self.assertEqual(rec.result, 'success') self.assertEqual(rec.geo_lookup_state, 'pending') self.assertEqual(rec.database, self.env.cr.dbname) self.assertTrue(rec.event_time) # default fires def test_failure_reason_optional(self): """failure_reason is null on success rows.""" rec = self.env['fusion.login.audit'].sudo().create({ 'attempted_login': 'demo@example.com', 'result': 'success', }) self.assertFalse(rec.failure_reason) def test_geo_state_internal_value(self): """`internal` is an accepted geo_lookup_state value (distinct from private_ip).""" rec = self.env['fusion.login.audit'].sudo().create({ 'attempted_login': 'demo@example.com', 'result': 'success', 'geo_lookup_state': 'internal', }) self.assertEqual(rec.geo_lookup_state, 'internal') def test_build_event_vals_with_no_request(self): """Without a live request, geo_lookup_state is 'internal'.""" ResUsers = self.env['res.users'] vals = ResUsers._fc_build_event_vals( result='success', attempted_login='cron@example.com', ) self.assertEqual(vals['result'], 'success') self.assertEqual(vals['attempted_login'], 'cron@example.com') self.assertEqual(vals['ip_address'], 'internal') self.assertEqual(vals['user_agent_raw'], '') self.assertEqual(vals['geo_lookup_state'], 'internal') self.assertEqual(vals['login_kind'], 'service') self.assertEqual(vals['database'], self.env.cr.dbname) def test_build_event_vals_parses_user_agent(self): """Parser fills browser/os/device_type from a stub UA dict.""" ResUsers = self.env['res.users'] vals = ResUsers._fc_build_event_vals( result='success', attempted_login='ua@example.com', _override_ip='203.0.113.5', _override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 Chrome/140.0 Safari/537.36', ) self.assertEqual(vals['ip_address'], '203.0.113.5') self.assertIn('Chrome', vals['browser']) self.assertIn('Windows', vals['os']) self.assertEqual(vals['device_type'], 'desktop') self.assertEqual(vals['geo_lookup_state'], 'pending') self.assertEqual(vals['login_kind'], 'interactive') def test_build_event_vals_strips_password(self): """If a credential dict sneaks in, no password leaks into vals.""" ResUsers = self.env['res.users'] vals = ResUsers._fc_build_event_vals( result='failure', attempted_login='leak@example.com', failure_reason='bad_password', _credential={'login': 'leak@example.com', 'password': 'super-secret-pw', 'type': 'password'}, ) serialized = repr(vals) self.assertNotIn('super-secret-pw', serialized) self.assertEqual(vals['failure_reason'], 'bad_password') def test_update_last_login_writes_audit_row(self): """Calling _update_last_login on a user creates a success row.""" user = self.env['res.users'].sudo().create({ 'name': 'Audit Tester', 'login': 'audit-tester@example.com', 'password': 'audit-tester-pw-1', }) Audit = self.env['fusion.login.audit'].sudo() before = Audit.search_count([('user_id', '=', user.id)]) user._update_last_login() after = Audit.search_count([('user_id', '=', user.id)]) self.assertEqual(after, before + 1) row = Audit.search([('user_id', '=', user.id)], order='event_time desc', limit=1) self.assertEqual(row.result, 'success') self.assertEqual(row.attempted_login, user.login) self.assertFalse(row.failure_reason) self.assertEqual(row.database, self.env.cr.dbname) def test_audit_write_failure_does_not_block_login(self): """An exception inside the audit write must not propagate.""" from unittest.mock import patch user = self.env['res.users'].sudo().create({ 'name': 'Resilient Tester', 'login': 'resilient-tester@example.com', 'password': 'resilient-tester-pw-1', }) def boom(self_, vals): raise RuntimeError('simulated audit DB failure') with patch.object(type(self.env['fusion.login.audit']), 'create', boom): # Must not raise. user._update_last_login() def test_bad_password_writes_failure_row(self): """A wrong password creates a result=failure row with failure_reason='bad_password'.""" from odoo.exceptions import AccessDenied user = self.env['res.users'].sudo().create({ 'name': 'Wrongpw Tester', 'login': 'wrongpw-tester@example.com', 'password': 'wrongpw-tester-pw-1', }) Audit = self.env['fusion.login.audit'].sudo() before = Audit.search_count([('attempted_login', '=', user.login), ('result', '=', 'failure')]) # NB: cannot use `self.assertRaises(AccessDenied)` — it opens an extra # savepoint (see odoo/tests/common.py::_assertRaises) that rolls back # the audit row written from inside the override. raised = False try: user._check_credentials( {'login': user.login, 'password': 'definitely-wrong', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: raised = True self.assertTrue(raised, "AccessDenied not raised on wrong password") after = Audit.search_count([('attempted_login', '=', user.login), ('result', '=', 'failure')]) self.assertEqual(after, before + 1) row = Audit.search([('attempted_login', '=', user.login), ('result', '=', 'failure')], order='event_time desc', limit=1) self.assertEqual(row.failure_reason, 'bad_password') self.assertEqual(row.user_id, user) def test_bad_password_never_appears_in_row(self): """The attempted password string never lands in any field.""" from odoo.exceptions import AccessDenied secret = 'NeverInTheRow-9f3a82' user = self.env['res.users'].sudo().create({ 'name': 'Leak Test', 'login': 'leak-test-2@example.com', 'password': 'leak-test-pw-1', }) # NB: manual try/except instead of assertRaises — see note above. raised = False try: user._check_credentials( {'login': user.login, 'password': secret, 'type': 'password'}, {'interactive': False}, ) except AccessDenied: raised = True self.assertTrue(raised, "AccessDenied not raised on wrong password") row = self.env['fusion.login.audit'].sudo().search( [('attempted_login', '=', user.login), ('result', '=', 'failure')], order='event_time desc', limit=1) self.assertTrue(row, "Audit row not created for bad-password attempt") for fname in ('attempted_login', 'failure_reason', 'user_agent_raw', 'browser', 'os', 'ip_address', 'ip_hostname', 'city', 'country_name', 'country_code', 'geo_state', 'database'): self.assertNotIn(secret, (row[fname] or ''), f"Password leaked into field {fname}") def test_unknown_user_writes_failure_row(self): """A login attempt for a username that does not exist gets logged with user_id=NULL and failure_reason='unknown_user'.""" from odoo.exceptions import AccessDenied bogus = 'this-user-does-not-exist@example.com' Audit = self.env['fusion.login.audit'].sudo() before = Audit.search_count([('attempted_login', '=', bogus)]) # NB: manual try/except instead of assertRaises — see comment in # test_bad_password_writes_failure_row. _login is an instance method # in Odoo 19 (not a classmethod as in earlier versions); we call it # on the empty recordset of res.users, which matches what # authenticate() does internally. raised = False try: self.env['res.users']._login( {'login': bogus, 'password': 'whatever', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: raised = True self.assertTrue(raised, "AccessDenied must propagate after the audit write") after = Audit.search_count([('attempted_login', '=', bogus)]) self.assertEqual(after, before + 1) row = Audit.search([('attempted_login', '=', bogus)], order='event_time desc', limit=1) self.assertFalse(row.user_id) self.assertEqual(row.failure_reason, 'unknown_user') self.assertEqual(row.result, 'failure') def test_login_known_user_bad_password_single_row(self): """When _login is the entry point for an existing user with the wrong password, only ONE failure row is written (bad_password from _check_credentials) — NOT two (bad_password + unknown_user). The unknown_user branch must only fire when the login string does not resolve to any user. Regression test for the duplicate-row bug discovered during the production deploy smoke on westin-v19: a single failed login for an existing user was creating two audit rows. """ from odoo.exceptions import AccessDenied user = self.env['res.users'].sudo().create({ 'name': 'NoDupTester', 'login': 'nodup-tester@example.com', 'password': 'nodup-tester-pw-1', }) Audit = self.env['fusion.login.audit'].sudo() before = Audit.search_count([('attempted_login', '=', user.login)]) raised = False try: self.env['res.users']._login( {'login': user.login, 'password': 'wrong-not-the-real-one', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: raised = True self.assertTrue(raised) after = Audit.search_count([('attempted_login', '=', user.login)]) self.assertEqual(after - before, 1, "Exactly one row per failed login attempt — not two") row = Audit.search([('attempted_login', '=', user.login)], order='event_time desc', limit=1) self.assertEqual(row.failure_reason, 'bad_password', "Existing-user failure must record bad_password, " "not unknown_user (the user IS in the system)") def test_computed_last_successful_login(self): """x_fc_last_successful_login reflects the latest success row.""" user = self.env['res.users'].sudo().create({ 'name': 'Compute Tester', 'login': 'compute-tester@example.com', 'password': 'compute-tester-pw-1', }) # Use registry cursor so the audit row survives the transactional # boundary the way the auth-time path does. with self.env.registry.cursor() as audit_cr: from odoo import api audit_env = api.Environment(audit_cr, self.env.uid, self.env.context) audit_env['fusion.login.audit'].sudo().create({ 'user_id': user.id, 'attempted_login': user.login, 'result': 'success', 'database': self.env.cr.dbname, 'ip_address': '198.51.100.42', }) user.invalidate_recordset(['x_fc_last_successful_login', 'x_fc_login_audit_count', 'x_fc_last_login_ip']) self.assertTrue(user.x_fc_last_successful_login) self.assertGreaterEqual(user.x_fc_login_audit_count, 1) self.assertEqual(user.x_fc_last_login_ip, '198.51.100.42') def test_last_login_fields_not_stored(self): """Regression guard for the 2026-06-03 invitation-acceptance hang. x_fc_last_successful_login / x_fc_last_login_ip MUST stay non-stored. When they were store=True (computed from the audit One2many), creating the success audit row through the independent registry cursor forced a write-back onto the very res_users row the request had already locked (auth_signup had just set the password) -> a self-deadlock Postgres cannot see (the holder shows 'idle in transaction'). Workers wedged for up to limit_time_real and odoo-westin became unresponsive whenever an invitation was accepted. Non-stored means audit-row creation never touches res_users, so the deadlock cannot form. """ fields_ = self.env['res.users']._fields self.assertFalse( fields_['x_fc_last_successful_login'].store, "x_fc_last_successful_login must be non-stored (see docstring)") self.assertFalse( fields_['x_fc_last_login_ip'].store, "x_fc_last_login_ip must be non-stored (see docstring)") def test_audit_row_create_does_not_write_res_users(self): """Creating a login-audit row must not write the linked res_users row. This is the behavioural half of the deadlock guard: with the fields non-stored, inserting an audit row for a user leaves that user's write_date untouched (no recompute -> no res_users UPDATE -> nothing to contend with the request's own row lock). """ user = self.env['res.users'].sudo().create({ 'name': 'NoWriteback Tester', 'login': 'nowriteback-tester@example.com', 'password': 'nowriteback-tester-pw-1', }) user.flush_recordset() before = user.write_date self.env['fusion.login.audit'].sudo().create({ 'user_id': user.id, 'attempted_login': user.login, 'result': 'success', 'database': self.env.cr.dbname, 'ip_address': '198.51.100.7', }) user.invalidate_recordset() self.assertEqual( user.write_date, before, "Audit-row create must not write back to res_users") def test_action_view_login_audit_returns_window_action(self): """The smart-button action returns an act_window scoped to this user.""" user = self.env['res.users'].sudo().create({ 'name': 'Action Tester', 'login': 'action-tester@example.com', 'password': 'action-tester-pw-1', }) action = user.action_fc_view_login_audit() self.assertEqual(action['res_model'], 'fusion.login.audit') self.assertEqual(action['type'], 'ir.actions.act_window') # Domain must filter to this user self.assertIn(('user_id', '=', user.id), action['domain']) def test_settings_round_trip(self): """Writing settings persists them via ir.config_parameter.""" Settings = self.env['res.config.settings'].sudo() Settings.create({ 'x_fc_login_audit_retention_days': 90, 'x_fc_login_audit_alert_threshold': 3, 'x_fc_login_audit_alert_window_min': 5, 'x_fc_login_audit_alert_enabled': False, }).execute() ICP = self.env['ir.config_parameter'].sudo() self.assertEqual(ICP.get_param('fusion_login_audit.retention_days'), '90') self.assertEqual(ICP.get_param('fusion_login_audit.alert_threshold'), '3') self.assertEqual(ICP.get_param('fusion_login_audit.alert_window_min'), '5') # Odoo's set_param deletes the row when the value is falsy, so a # Boolean field set to False yields get_param() == False (Python # bool, the default), not the string 'False'. self.assertFalse(ICP.get_param('fusion_login_audit.alert_enabled')) def test_failure_burst_queues_one_email(self): """N consecutive failures (within window) queue exactly one mail.mail.""" from odoo.exceptions import AccessDenied ICP = self.env['ir.config_parameter'].sudo() ICP.set_param('fusion_login_audit.alert_threshold', '3') ICP.set_param('fusion_login_audit.alert_window_min', '15') ICP.set_param('fusion_login_audit.alert_enabled', 'True') # Clear any cooldown leftover from earlier tests. ICP.set_param('fusion_login_audit.last_alert:burst@example.com', '') user = self.env['res.users'].sudo().create({ 'name': 'Burst Tester', 'login': 'burst@example.com', 'password': 'burst-tester-pw-1', }) Mail = self.env['mail.mail'].sudo() before = Mail.search_count([('subject', 'ilike', 'burst@example.com')]) for _i in range(3): raised = False try: user._check_credentials( {'login': user.login, 'password': 'wrong', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: raised = True self.assertTrue(raised) after = Mail.search_count([('subject', 'ilike', 'burst@example.com')]) self.assertEqual(after, before + 1, "Exactly one alert mail should be queued") def test_cooldown_suppresses_second_alert(self): """Failures beyond the threshold within the cooldown queue zero more emails.""" from odoo.exceptions import AccessDenied ICP = self.env['ir.config_parameter'].sudo() ICP.set_param('fusion_login_audit.alert_threshold', '3') ICP.set_param('fusion_login_audit.alert_window_min', '15') ICP.set_param('fusion_login_audit.alert_enabled', 'True') ICP.set_param('fusion_login_audit.last_alert:cool@example.com', '') user = self.env['res.users'].sudo().create({ 'name': 'Cooldown Tester', 'login': 'cool@example.com', 'password': 'cooldown-tester-pw-1', }) Mail = self.env['mail.mail'].sudo() for _i in range(3): try: user._check_credentials( {'login': user.login, 'password': 'wrong', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: pass count_after_3 = Mail.search_count([('subject', 'ilike', 'cool@example.com')]) for _i in range(2): try: user._check_credentials( {'login': user.login, 'password': 'wrong', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: pass count_after_5 = Mail.search_count([('subject', 'ilike', 'cool@example.com')]) self.assertEqual(count_after_5, count_after_3, "Cooldown should suppress additional emails") def test_alert_disabled_master_switch(self): """alert_enabled=False suppresses all alerts regardless of threshold.""" from odoo.exceptions import AccessDenied ICP = self.env['ir.config_parameter'].sudo() ICP.set_param('fusion_login_audit.alert_threshold', '1') ICP.set_param('fusion_login_audit.alert_window_min', '15') # Use the actual boolean field's storage semantics — see CLAUDE.md rule #5. # Writing False through the settings form deletes the param; here we # set the string 'False' explicitly to simulate "disabled". ICP.set_param('fusion_login_audit.alert_enabled', 'False') ICP.set_param('fusion_login_audit.last_alert:disabled@example.com', '') user = self.env['res.users'].sudo().create({ 'name': 'Disabled Tester', 'login': 'disabled@example.com', 'password': 'disabled-tester-pw-1', }) Mail = self.env['mail.mail'].sudo() before = Mail.search_count([('subject', 'ilike', 'disabled@example.com')]) try: user._check_credentials( {'login': user.login, 'password': 'wrong', 'type': 'password'}, {'interactive': False}, ) except AccessDenied: pass after = Mail.search_count([('subject', 'ilike', 'disabled@example.com')]) self.assertEqual(after, before, "Disabled alerts should queue nothing") def test_retention_gc_deletes_old_rows(self): """The GC method deletes rows older than retention_days.""" from datetime import timedelta ICP = self.env['ir.config_parameter'].sudo() ICP.set_param('fusion_login_audit.retention_days', '30') now = fields.Datetime.now() Audit = self.env['fusion.login.audit'].sudo() old = Audit.create({ 'attempted_login': 'gc-old@example.com', 'result': 'success', 'event_time': now - timedelta(days=45), }) recent = Audit.create({ 'attempted_login': 'gc-recent@example.com', 'result': 'success', 'event_time': now - timedelta(days=5), }) old_id, recent_id = old.id, recent.id Audit._fc_retention_gc() self.assertFalse(Audit.browse(old_id).exists(), "Row older than retention_days should be gone") self.assertTrue(Audit.browse(recent_id).exists(), "Row inside retention_days should survive") def test_retention_zero_keeps_forever(self): """retention_days=0 keeps all rows.""" from datetime import timedelta ICP = self.env['ir.config_parameter'].sudo() ICP.set_param('fusion_login_audit.retention_days', '0') now = fields.Datetime.now() Audit = self.env['fusion.login.audit'].sudo() ancient = Audit.create({ 'attempted_login': 'forever@example.com', 'result': 'success', 'event_time': now - timedelta(days=3650), }) ancient_id = ancient.id Audit._fc_retention_gc() self.assertTrue(Audit.browse(ancient_id).exists(), "retention_days=0 must keep everything") def test_geo_private_ip_shortcut(self): """Private IPs short-circuit to state='private_ip' without HTTP.""" Audit = self.env['fusion.login.audit'].sudo() rec = Audit.create({ 'attempted_login': 'lan@example.com', 'result': 'success', 'ip_address': '192.168.1.40', 'geo_lookup_state': 'pending', }) Audit._fc_geo_enrich_pending(limit=10) rec.invalidate_recordset() self.assertEqual(rec.geo_lookup_state, 'private_ip') self.assertEqual(rec.country_code, '--') def test_geo_cache_hit_avoids_http(self): """A second row with the same recent IP copies from cache.""" from unittest.mock import patch Audit = self.env['fusion.login.audit'].sudo() # Seed a "done" row from the same IP. Audit.create({ 'attempted_login': 'seed@example.com', 'result': 'success', 'ip_address': '203.0.113.99', 'geo_lookup_state': 'done', 'country_code': 'CA', 'country_name': 'Canada', 'city': 'Toronto', 'geo_state': 'Ontario', }) target = Audit.create({ 'attempted_login': 'hit@example.com', 'result': 'success', 'ip_address': '203.0.113.99', 'geo_lookup_state': 'pending', }) with patch( 'odoo.addons.fusion_login_audit.models.fusion_login_audit.requests.get' ) as mock_get: Audit._fc_geo_enrich_pending(limit=10) mock_get.assert_not_called() target.invalidate_recordset() self.assertEqual(target.geo_lookup_state, 'done') self.assertEqual(target.country_code, 'CA') self.assertEqual(target.city, 'Toronto') def test_geo_internal_skipped(self): """Rows with geo_lookup_state='internal' are not picked up.""" Audit = self.env['fusion.login.audit'].sudo() rec = Audit.create({ 'attempted_login': 'cron@example.com', 'result': 'success', 'ip_address': 'internal', 'geo_lookup_state': 'internal', }) # Should be a no-op for 'internal' state (cron only picks 'pending'). Audit._fc_geo_enrich_pending(limit=10) rec.invalidate_recordset() self.assertEqual(rec.geo_lookup_state, 'internal')