Mail template + helpers (_fc_alert_*, _fc_recent_failure_count, _fc_send_failure_alert) wired into _check_credentials so that crossing the consecutive-failure threshold within the window queues exactly one mail.mail per attempted login per 60-minute cooldown. Master switch x_fc_login_audit_alert_enabled honoured. Recipients are members of base.group_system with a non-empty email and share=False; the __system__ superuser is excluded by Odoo's default user filter. Tests (3 new, 22 total green): test_failure_burst_queues_one_email, test_cooldown_suppresses_second_alert, test_alert_disabled_master_switch. setUp ensures base.user_admin has an email (fusion-dev's admin user ships without one; the only user with an email is __system__, which is filtered out of standard res.users searches). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
395 lines
18 KiB
Python
395 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
from odoo.tests.common import TransactionCase, tagged
|
|
|
|
|
|
@tagged('post_install', '-at_install')
|
|
class TestFusionLoginAuditModel(TransactionCase):
|
|
|
|
def setUp(self):
    """Switch the registry to test mode and guarantee an alert recipient.

    `_fc_record_login_event` writes through `registry.cursor()` so that
    the audit row outlives the rollback following AccessDenied (the
    rationale is documented in res_users.py). Within a TransactionCase
    that rolls back per test, a fresh cursor on a new connection cannot
    see uncommitted records such as the freshly-created test user the
    audit row FKs into. Test mode replaces `registry.cursor()` with a
    TestCursor wrapping the test cursor, which makes them visible.
    """
    super().setUp()
    self.registry_enter_test_mode()

    # The alert tests need at least one admin with an email; without it
    # the recipient filter comes back empty and no mail is queued. On a
    # fresh fusion-dev DB base.user_admin has a NULL email, and the
    # superuser (__system__), which does have one, is filtered out of
    # normal res.users searches.
    admin_user = self.env.ref('base.user_admin')
    if admin_user.email:
        return
    admin_user.sudo().write({'email': 'admin@test.example.com'})
|
|
|
|
def test_model_exists_and_creates(self):
    """A fully populated audit row can be created and read back."""
    dbname = self.env.cr.dbname
    values = {
        'attempted_login': 'demo@example.com',
        'result': 'success',
        'ip_address': '203.0.113.5',
        'user_agent_raw': 'Mozilla/5.0 Test',
        'browser': 'Test 1.0',
        'os': 'TestOS',
        'device_type': 'desktop',
        'database': dbname,
        'geo_lookup_state': 'pending',
    }
    record = self.env['fusion.login.audit'].sudo().create(values)

    self.assertTrue(record.id)
    self.assertEqual(record.result, 'success')
    self.assertEqual(record.geo_lookup_state, 'pending')
    self.assertEqual(record.database, dbname)
    # event_time was not supplied above, so a truthy value here means the
    # field default fired.
    self.assertTrue(record.event_time)
|
|
|
|
def test_failure_reason_optional(self):
    """A success row created without failure_reason leaves it falsy."""
    Audit = self.env['fusion.login.audit'].sudo()
    success_row = Audit.create({
        'attempted_login': 'demo@example.com',
        'result': 'success',
    })
    self.assertFalse(success_row.failure_reason)
|
|
|
|
def test_geo_state_internal_value(self):
    """geo_lookup_state accepts `internal` (a value distinct from private_ip)."""
    Audit = self.env['fusion.login.audit'].sudo()
    internal_row = Audit.create({
        'attempted_login': 'demo@example.com',
        'result': 'success',
        'geo_lookup_state': 'internal',
    })
    self.assertEqual(internal_row.geo_lookup_state, 'internal')
|
|
|
|
def test_build_event_vals_with_no_request(self):
    """With no live request the builder falls back to 'internal' markers."""
    vals = self.env['res.users']._fc_build_event_vals(
        result='success',
        attempted_login='cron@example.com',
    )
    # Checked key by key, in this order, so a failure points at the first
    # value that diverges.
    expected = {
        'result': 'success',
        'attempted_login': 'cron@example.com',
        'ip_address': 'internal',
        'user_agent_raw': '<no-request>',
        'geo_lookup_state': 'internal',
        'database': self.env.cr.dbname,
    }
    for key, wanted in expected.items():
        self.assertEqual(vals[key], wanted)
|
|
|
|
def test_build_event_vals_parses_user_agent(self):
    """browser/os/device_type are derived from an overridden UA string."""
    chrome_on_windows = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 Chrome/140.0 Safari/537.36'
    )
    vals = self.env['res.users']._fc_build_event_vals(
        result='success',
        attempted_login='ua@example.com',
        _override_ip='203.0.113.5',
        _override_ua=chrome_on_windows,
    )

    self.assertEqual(vals['ip_address'], '203.0.113.5')
    self.assertIn('Chrome', vals['browser'])
    self.assertIn('Windows', vals['os'])
    self.assertEqual(vals['device_type'], 'desktop')
    # With an IP override in place the geo lookup is reported as pending.
    self.assertEqual(vals['geo_lookup_state'], 'pending')
|
|
|
|
def test_build_event_vals_strips_password(self):
    """A credential dict handed to the builder must not leak its password."""
    credential = {
        'login': 'leak@example.com',
        'password': 'super-secret-pw',
        'type': 'password',
    }
    vals = self.env['res.users']._fc_build_event_vals(
        result='failure',
        attempted_login='leak@example.com',
        failure_reason='bad_password',
        _credential=credential,
    )

    # repr() covers every key and value of the dict in a single check.
    self.assertNotIn('super-secret-pw', repr(vals))
    self.assertEqual(vals['failure_reason'], 'bad_password')
|
|
|
|
def test_update_last_login_writes_audit_row(self):
    """_update_last_login adds exactly one success row for the user."""
    tester = self.env['res.users'].sudo().create({
        'name': 'Audit Tester',
        'login': 'audit-tester@example.com',
        'password': 'audit-tester-pw-1',
    })
    Audit = self.env['fusion.login.audit'].sudo()
    own_rows = [('user_id', '=', tester.id)]

    count_before = Audit.search_count(own_rows)
    tester._update_last_login()
    count_after = Audit.search_count(own_rows)
    self.assertEqual(count_after, count_before + 1)

    # Inspect the most recent row for this user.
    latest = Audit.search(own_rows, order='event_time desc', limit=1)
    self.assertEqual(latest.result, 'success')
    self.assertEqual(latest.attempted_login, tester.login)
    self.assertFalse(latest.failure_reason)
    self.assertEqual(latest.database, self.env.cr.dbname)
|
|
|
|
def test_audit_write_failure_does_not_block_login(self):
    """An exception raised by the audit `create` must not escape _update_last_login."""
    from unittest.mock import patch

    tester = self.env['res.users'].sudo().create({
        'name': 'Resilient Tester',
        'login': 'resilient-tester@example.com',
        'password': 'resilient-tester-pw-1',
    })

    def _failing_create(self_, vals):
        raise RuntimeError('simulated audit DB failure')

    audit_cls = type(self.env['fusion.login.audit'])
    with patch.object(audit_cls, 'create', _failing_create):
        # No assertion needed: the test passes only if this call returns
        # without raising.
        # NOTE(review): nothing here verifies that the patched create was
        # actually reached — confirm the audit path goes through `create`.
        tester._update_last_login()
|
|
|
|
def test_bad_password_writes_failure_row(self):
    """A wrong password adds one failure row with failure_reason='bad_password'."""
    from odoo.exceptions import AccessDenied

    tester = self.env['res.users'].sudo().create({
        'name': 'Wrongpw Tester',
        'login': 'wrongpw-tester@example.com',
        'password': 'wrongpw-tester-pw-1',
    })
    Audit = self.env['fusion.login.audit'].sudo()
    failures = [('attempted_login', '=', tester.login),
                ('result', '=', 'failure')]
    count_before = Audit.search_count(failures)

    # `self.assertRaises(AccessDenied)` is deliberately avoided: it opens
    # an extra savepoint (odoo/tests/common.py::_assertRaises) whose
    # rollback would discard the audit row written inside the override.
    wrong_credential = {'login': tester.login,
                        'password': 'definitely-wrong',
                        'type': 'password'}
    try:
        tester._check_credentials(wrong_credential, {'interactive': False})
    except AccessDenied:
        raised = True
    else:
        raised = False
    self.assertTrue(raised, "AccessDenied not raised on wrong password")

    count_after = Audit.search_count(failures)
    self.assertEqual(count_after, count_before + 1)

    latest = Audit.search(failures, order='event_time desc', limit=1)
    self.assertEqual(latest.failure_reason, 'bad_password')
    self.assertEqual(latest.user_id, tester)
|
|
|
|
def test_bad_password_never_appears_in_row(self):
    """The attempted password must not show up in any text field of the row."""
    from odoo.exceptions import AccessDenied

    secret = 'NeverInTheRow-9f3a82'
    tester = self.env['res.users'].sudo().create({
        'name': 'Leak Test',
        'login': 'leak-test-2@example.com',
        'password': 'leak-test-pw-1',
    })

    # Manual try/except rather than assertRaises: assertRaises opens an
    # extra savepoint whose rollback would discard the audit row written
    # from inside the override.
    try:
        tester._check_credentials(
            {'login': tester.login, 'password': secret, 'type': 'password'},
            {'interactive': False},
        )
    except AccessDenied:
        raised = True
    else:
        raised = False
    self.assertTrue(raised, "AccessDenied not raised on wrong password")

    Audit = self.env['fusion.login.audit'].sudo()
    row = Audit.search(
        [('attempted_login', '=', tester.login),
         ('result', '=', 'failure')],
        order='event_time desc', limit=1)
    self.assertTrue(row, "Audit row not created for bad-password attempt")

    text_fields = (
        'attempted_login', 'failure_reason', 'user_agent_raw',
        'browser', 'os', 'ip_address', 'ip_hostname',
        'city', 'country_name', 'country_code', 'geo_state',
        'database',
    )
    for fname in text_fields:
        # Unset fields read as falsy; normalise them to '' so the
        # substring check always runs against a string.
        self.assertNotIn(secret, (row[fname] or ''),
                         f"Password leaked into field {fname}")
|
|
|
|
def test_unknown_user_writes_failure_row(self):
    """An attempt for a non-existent login is logged with no user.

    The row must carry user_id=NULL, result='failure' and
    failure_reason='unknown_user'.
    """
    from odoo.exceptions import AccessDenied

    bogus = 'this-user-does-not-exist@example.com'
    Audit = self.env['fusion.login.audit'].sudo()
    bogus_rows = [('attempted_login', '=', bogus)]
    count_before = Audit.search_count(bogus_rows)

    # Manual try/except rather than assertRaises: assertRaises opens an
    # extra savepoint whose rollback would discard the audit row.
    # _login is an instance method in Odoo 19 (a classmethod in earlier
    # versions), so it is called on the empty res.users recordset, which
    # matches what authenticate() does internally.
    try:
        self.env['res.users']._login(
            {'login': bogus, 'password': 'whatever', 'type': 'password'},
            {'interactive': False},
        )
    except AccessDenied:
        raised = True
    else:
        raised = False
    self.assertTrue(raised, "AccessDenied must propagate after the audit write")

    count_after = Audit.search_count(bogus_rows)
    self.assertEqual(count_after, count_before + 1)

    latest = Audit.search(bogus_rows, order='event_time desc', limit=1)
    self.assertFalse(latest.user_id)
    self.assertEqual(latest.failure_reason, 'unknown_user')
    self.assertEqual(latest.result, 'failure')
|
|
|
|
def test_computed_last_successful_login(self):
    """The x_fc_* login fields on the user reflect a freshly written success row."""
    from odoo import api

    tester = self.env['res.users'].sudo().create({
        'name': 'Compute Tester',
        'login': 'compute-tester@example.com',
        'password': 'compute-tester-pw-1',
    })

    # Write the row through a registry cursor, the same route the
    # auth-time path takes, so it crosses the transactional boundary
    # the same way.
    with self.env.registry.cursor() as audit_cr:
        side_env = api.Environment(audit_cr, self.env.uid, self.env.context)
        side_env['fusion.login.audit'].sudo().create({
            'user_id': tester.id,
            'attempted_login': tester.login,
            'result': 'success',
            'database': self.env.cr.dbname,
            'ip_address': '198.51.100.42',
        })

    # Drop any cached values so the three fields are read afresh.
    tester.invalidate_recordset([
        'x_fc_last_successful_login',
        'x_fc_login_audit_count',
        'x_fc_last_login_ip',
    ])
    self.assertTrue(tester.x_fc_last_successful_login)
    self.assertGreaterEqual(tester.x_fc_login_audit_count, 1)
    self.assertEqual(tester.x_fc_last_login_ip, '198.51.100.42')
|
|
|
|
def test_action_view_login_audit_returns_window_action(self):
    """action_fc_view_login_audit yields an act_window filtered to the user."""
    tester = self.env['res.users'].sudo().create({
        'name': 'Action Tester',
        'login': 'action-tester@example.com',
        'password': 'action-tester-pw-1',
    })
    action = tester.action_fc_view_login_audit()

    self.assertEqual(action['res_model'], 'fusion.login.audit')
    self.assertEqual(action['type'], 'ir.actions.act_window')
    # The domain has to restrict the list to this user's rows.
    user_leaf = ('user_id', '=', tester.id)
    self.assertIn(user_leaf, action['domain'])
|
|
|
|
def test_settings_round_trip(self):
    """Values saved via res.config.settings land in ir.config_parameter."""
    wizard = self.env['res.config.settings'].sudo().create({
        'x_fc_login_audit_retention_days': 90,
        'x_fc_login_audit_alert_threshold': 3,
        'x_fc_login_audit_alert_window_min': 5,
        'x_fc_login_audit_alert_enabled': False,
    })
    wizard.execute()

    ICP = self.env['ir.config_parameter'].sudo()
    stored_as_text = (
        ('fusion_login_audit.retention_days', '90'),
        ('fusion_login_audit.alert_threshold', '3'),
        ('fusion_login_audit.alert_window_min', '5'),
    )
    for key, text in stored_as_text:
        self.assertEqual(ICP.get_param(key), text)

    # set_param deletes the row for a falsy value, so a Boolean saved as
    # False reads back as get_param()'s default (Python False), not as
    # the string 'False'.
    self.assertFalse(ICP.get_param('fusion_login_audit.alert_enabled'))
|
|
|
|
def test_failure_burst_queues_one_email(self):
    """Three failures in a row inside the window queue exactly one mail.mail."""
    from odoo.exceptions import AccessDenied

    ICP = self.env['ir.config_parameter'].sudo()
    settings = (
        ('fusion_login_audit.alert_threshold', '3'),
        ('fusion_login_audit.alert_window_min', '15'),
        ('fusion_login_audit.alert_enabled', 'True'),
        # Reset any cooldown marker left behind for this login.
        ('fusion_login_audit.last_alert:burst@example.com', ''),
    )
    for key, value in settings:
        ICP.set_param(key, value)

    tester = self.env['res.users'].sudo().create({
        'name': 'Burst Tester',
        'login': 'burst@example.com',
        'password': 'burst-tester-pw-1',
    })
    Mail = self.env['mail.mail'].sudo()
    alert_mails = [('subject', 'ilike', 'burst@example.com')]
    queued_before = Mail.search_count(alert_mails)

    for _attempt in range(3):
        # Manual try/except: the attempt must raise, and the flag is
        # asserted right after each call.
        try:
            tester._check_credentials(
                {'login': tester.login, 'password': 'wrong',
                 'type': 'password'},
                {'interactive': False},
            )
        except AccessDenied:
            raised = True
        else:
            raised = False
        self.assertTrue(raised)

    queued_after = Mail.search_count(alert_mails)
    self.assertEqual(queued_after, queued_before + 1,
                     "Exactly one alert mail should be queued")
|
|
|
|
def test_cooldown_suppresses_second_alert(self):
    """Failures past the threshold, inside the cooldown, queue no further mail.

    The first burst of three failures must queue exactly one alert;
    two more failures afterwards must leave the mail count unchanged.
    Asserting the first alert is what keeps the cooldown check from
    passing vacuously when no mail is ever sent.
    """
    from odoo.exceptions import AccessDenied

    ICP = self.env['ir.config_parameter'].sudo()
    ICP.set_param('fusion_login_audit.alert_threshold', '3')
    ICP.set_param('fusion_login_audit.alert_window_min', '15')
    ICP.set_param('fusion_login_audit.alert_enabled', 'True')
    # Reset any cooldown marker left behind for this login.
    ICP.set_param('fusion_login_audit.last_alert:cool@example.com', '')

    user = self.env['res.users'].sudo().create({
        'name': 'Cooldown Tester',
        'login': 'cool@example.com',
        'password': 'cooldown-tester-pw-1',
    })
    Mail = self.env['mail.mail'].sudo()
    alert_domain = [('subject', 'ilike', 'cool@example.com')]

    def fail_login(times):
        """Attempt `times` wrong-password logins; each must raise AccessDenied."""
        for _i in range(times):
            # Manual flag rather than assertRaises, so the attempt is not
            # wrapped in an extra assertion context.
            raised = False
            try:
                user._check_credentials(
                    {'login': user.login, 'password': 'wrong',
                     'type': 'password'},
                    {'interactive': False},
                )
            except AccessDenied:
                raised = True
            self.assertTrue(raised,
                            "AccessDenied not raised on wrong password")

    before = Mail.search_count(alert_domain)
    fail_login(3)
    count_after_3 = Mail.search_count(alert_domain)
    self.assertEqual(count_after_3, before + 1,
                     "Threshold-crossing burst should queue exactly one alert")

    fail_login(2)
    count_after_5 = Mail.search_count(alert_domain)
    self.assertEqual(count_after_5, count_after_3,
                     "Cooldown should suppress additional emails")
|
|
|
|
def test_alert_disabled_master_switch(self):
    """With alert_enabled off, a threshold-crossing failure queues no mail.

    The threshold is set to 1 so that a single failed attempt would
    trigger an alert if the master switch were ignored. The attempt is
    also asserted to raise AccessDenied, so the "no mail" result cannot
    come from the failure path never running.
    """
    from odoo.exceptions import AccessDenied

    ICP = self.env['ir.config_parameter'].sudo()
    ICP.set_param('fusion_login_audit.alert_threshold', '1')
    ICP.set_param('fusion_login_audit.alert_window_min', '15')
    # Boolean storage semantics (see CLAUDE.md rule #5): writing False
    # through the settings form deletes the param. Here the string
    # 'False' is stored explicitly to simulate "disabled".
    ICP.set_param('fusion_login_audit.alert_enabled', 'False')
    ICP.set_param('fusion_login_audit.last_alert:disabled@example.com', '')

    user = self.env['res.users'].sudo().create({
        'name': 'Disabled Tester',
        'login': 'disabled@example.com',
        'password': 'disabled-tester-pw-1',
    })
    Mail = self.env['mail.mail'].sudo()
    alert_domain = [('subject', 'ilike', 'disabled@example.com')]
    before = Mail.search_count(alert_domain)

    # Manual flag rather than assertRaises, so the attempt is not wrapped
    # in an extra assertion context.
    raised = False
    try:
        user._check_credentials(
            {'login': user.login, 'password': 'wrong',
             'type': 'password'},
            {'interactive': False},
        )
    except AccessDenied:
        raised = True
    self.assertTrue(raised, "AccessDenied not raised on wrong password")

    after = Mail.search_count(alert_domain)
    self.assertEqual(after, before, "Disabled alerts should queue nothing")
|