From a2d13cf83b48a417ca63344882fdf45d811fee25 Mon Sep 17 00:00:00 2001 From: gsinghpal Date: Tue, 26 May 2026 21:43:18 -0400 Subject: [PATCH] feat(fusion_login_audit): failure-burst alert email + cooldown Mail template + helpers (_fc_alert_*, _fc_recent_failure_count, _fc_send_failure_alert) wired into _check_credentials so that crossing the consecutive-failure threshold within the window queues exactly one mail.mail per attempted login per 60-minute cooldown. Master switch x_fc_login_audit_alert_enabled honoured. Recipients are members of base.group_system with a non-empty email and share=False; the __system__ superuser is excluded by Odoo''s default user filter. Tests (3 new, 22 total green): test_failure_burst_queues_one_email test_cooldown_suppresses_second_alert test_alert_disabled_master_switch setUp ensures base.user_admin has an email (fusion-dev''s admin user ships without one; the only user with an email is __system__, which is filtered out of standard res.users searches). Co-Authored-By: Claude Opus 4.7 (1M context) --- fusion_login_audit/__manifest__.py | 1 + .../data/mail_template_data.xml | 46 +++++++ fusion_login_audit/models/res_users.py | 120 +++++++++++++++++- fusion_login_audit/tests/test_login_audit.py | 108 ++++++++++++++++ 4 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 fusion_login_audit/data/mail_template_data.xml diff --git a/fusion_login_audit/__manifest__.py b/fusion_login_audit/__manifest__.py index d6f5cc06..122256cf 100644 --- a/fusion_login_audit/__manifest__.py +++ b/fusion_login_audit/__manifest__.py @@ -26,6 +26,7 @@ bursts. Daily retention cron honours a configurable horizon. 'data': [ 'security/ir.model.access.csv', 'security/security.xml', + 'data/mail_template_data.xml', 'views/fusion_login_audit_views.xml', 'views/res_users_views.xml', 'views/res_config_settings_views.xml', diff --git a/fusion_login_audit/data/mail_template_data.xml b/fusion_login_audit/data/mail_template_data.xml new file mode 100644 index 00000000..a252fd69 --- /dev/null +++ b/fusion_login_audit/data/mail_template_data.xml @@ -0,0 +1,46 @@ + + + + + + Fusion Login Audit — Failure Burst Alert + + [Login Audit] Failed login attempts for {{ ctx.get('attempted_login') }} + +
+

The login audit detected + failed login attempt(s) + in the last minute(s) for + .

+

Most recent attempts:

+ + + + + + + + + + + + + + +
TimeIPCountryBrowserOS
+ + + + +
+

+ Sent by Fusion Login Audit. Tune the threshold and window in + Settings → General Settings → Login Audit. +

+
+
+
+ +
+
diff --git a/fusion_login_audit/models/res_users.py b/fusion_login_audit/models/res_users.py index b7abca7a..65034bca 100644 --- a/fusion_login_audit/models/res_users.py +++ b/fusion_login_audit/models/res_users.py @@ -151,19 +151,137 @@ class ResUsers(models.Model): self._fc_record_login_event(result='success') return result + def _fc_alert_threshold(self): + ICP = self.env['ir.config_parameter'].sudo() + try: + return max(1, int(ICP.get_param( + 'fusion_login_audit.alert_threshold', 5))) + except (TypeError, ValueError): + return 5 + + def _fc_alert_window_min(self): + ICP = self.env['ir.config_parameter'].sudo() + try: + return max(1, int(ICP.get_param( + 'fusion_login_audit.alert_window_min', 15))) + except (TypeError, ValueError): + return 15 + + def _fc_alert_enabled(self): + ICP = self.env['ir.config_parameter'].sudo() + # CLAUDE.md rule #5: Boolean config_parameter deletes on False. + # An absent key means True (the default). Explicit 'False' or 'false' + # means disabled. + raw = ICP.get_param('fusion_login_audit.alert_enabled', 'True') + return str(raw).strip().lower() != 'false' + + def _fc_recent_failure_count(self, attempted_login): + """Failures for this attempted_login within the alert window.""" + from datetime import timedelta + if not attempted_login: + return 0 + cutoff = fields.Datetime.now() - timedelta( + minutes=self._fc_alert_window_min()) + return self.env['fusion.login.audit'].sudo().search_count([ + ('attempted_login', '=', attempted_login), + ('result', '=', 'failure'), + ('event_time', '>=', cutoff), + ]) + + def _fc_send_failure_alert(self, attempted_login): + """Queue one alert mail unless cooldown is active. Cooldown is + 60 minutes, keyed by attempted_login, stored in ir.config_parameter.""" + from datetime import timedelta + if not self._fc_alert_enabled(): + return + if not attempted_login: + return + ICP = self.env['ir.config_parameter'].sudo() + cd_key = f'fusion_login_audit.last_alert:{attempted_login}' + cd_raw = ICP.get_param(cd_key) + now = fields.Datetime.now() + if cd_raw: + try: + last = fields.Datetime.from_string(cd_raw) + if last and (now - last) < timedelta(minutes=60): + return # cooldown active + except (TypeError, ValueError): + pass + + window = self._fc_alert_window_min() + cutoff = now - timedelta(minutes=window) + Audit = self.env['fusion.login.audit'].sudo() + rows = Audit.search([ + ('attempted_login', '=', attempted_login), + ('result', '=', 'failure'), + ('event_time', '>=', cutoff), + ], order='event_time desc', limit=20) + + # Admin recipients: members of base.group_system (the Settings group) + # who have an email set and are not portal/share users. Note: + # CLAUDE.md rule #6 — res.groups has no `users` field in Odoo 19, so + # search res.users by group_ids directly. The __system__ superuser + # (uid=1) is excluded automatically by Odoo's default user filter. + admins = self.env['res.users'].sudo().search([ + ('group_ids', 'in', self.env.ref('base.group_system').id), + ('email', '!=', False), + ('share', '=', False), + ]) + if not admins: + return + + tmpl = self.env.ref( + 'fusion_login_audit.mail_template_failure_burst', + raise_if_not_found=False) + if not tmpl: + return + + # CLAUDE.md rule #12: in mail.template QWeb, `ctx` IS env.context. + # So `ctx.get('foo')` resolves to env.context.get('foo'). Pass data + # by SPREADING keys into the context, not wrapping in a dict. + # `with_context(ctx=ctx_data)` would silently render an empty subject. + ctx_data = { + 'attempted_login': attempted_login, + 'failure_count': len(rows), + 'window_min': window, + 'rows': [{ + 'event_time': fields.Datetime.to_string(r.event_time), + 'ip_address': r.ip_address or '', + 'country_code': r.country_code or '', + 'browser': r.browser or '', + 'os': r.os or '', + } for r in rows], + } + for admin in admins: + tmpl.with_context(**ctx_data).send_mail( + admin.id, + email_values={'email_to': admin.email, + 'auto_delete': True}, + force_send=False, + ) + ICP.set_param(cd_key, fields.Datetime.to_string(now)) + def _check_credentials(self, credential, env): try: return super()._check_credentials(credential, env) except AccessDenied: cred_type = (credential or {}).get('type', 'password') reason = '2fa_failed' if cred_type == 'totp' else 'bad_password' + attempted_login = (credential or {}).get('login') or self.login self._fc_record_login_event( result='failure', failure_reason=reason, user_id=self.id, - attempted_login=(credential or {}).get('login') or self.login, + attempted_login=attempted_login, _credential=credential, ) + try: + if self._fc_recent_failure_count(attempted_login) \ + >= self._fc_alert_threshold(): + self._fc_send_failure_alert(attempted_login) + except Exception: + _logger.exception( + "fusion_login_audit: failed to send failure alert") raise def _login(self, credential, user_agent_env): diff --git a/fusion_login_audit/tests/test_login_audit.py b/fusion_login_audit/tests/test_login_audit.py index ff5ed1ea..34412a7e 100644 --- a/fusion_login_audit/tests/test_login_audit.py +++ b/fusion_login_audit/tests/test_login_audit.py @@ -15,6 +15,14 @@ class TestFusionLoginAuditModel(TransactionCase): # `registry.cursor()` for a TestCursor that wraps the test cursor. super().setUp() self.registry_enter_test_mode() + # The alert tests below assume at least one admin has an email + # (otherwise the recipient filter empties and no mail is queued). + # In a fresh fusion-dev DB, base.user_admin's email is NULL; the + # superuser (__system__) has an email but is filtered out of normal + # res.users searches. Ensure admin has a usable email. + admin = self.env.ref('base.user_admin') + if not admin.email: + admin.sudo().write({'email': 'admin@test.example.com'}) def test_model_exists_and_creates(self): """Audit row can be created with all expected fields.""" @@ -284,3 +292,103 @@ class TestFusionLoginAuditModel(TransactionCase): # Boolean field set to False yields get_param() == False (Python # bool, the default), not the string 'False'. self.assertFalse(ICP.get_param('fusion_login_audit.alert_enabled')) + + def test_failure_burst_queues_one_email(self): + """N consecutive failures (within window) queue exactly one mail.mail.""" + from odoo.exceptions import AccessDenied + ICP = self.env['ir.config_parameter'].sudo() + ICP.set_param('fusion_login_audit.alert_threshold', '3') + ICP.set_param('fusion_login_audit.alert_window_min', '15') + ICP.set_param('fusion_login_audit.alert_enabled', 'True') + # Clear any cooldown leftover from earlier tests. + ICP.set_param('fusion_login_audit.last_alert:burst@example.com', '') + + user = self.env['res.users'].sudo().create({ + 'name': 'Burst Tester', + 'login': 'burst@example.com', + 'password': 'burst-tester-pw-1', + }) + Mail = self.env['mail.mail'].sudo() + before = Mail.search_count([('subject', 'ilike', 'burst@example.com')]) + for _i in range(3): + raised = False + try: + user._check_credentials( + {'login': user.login, 'password': 'wrong', + 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + raised = True + self.assertTrue(raised) + after = Mail.search_count([('subject', 'ilike', 'burst@example.com')]) + self.assertEqual(after, before + 1, + "Exactly one alert mail should be queued") + + def test_cooldown_suppresses_second_alert(self): + """Failures beyond the threshold within the cooldown queue zero more emails.""" + from odoo.exceptions import AccessDenied + ICP = self.env['ir.config_parameter'].sudo() + ICP.set_param('fusion_login_audit.alert_threshold', '3') + ICP.set_param('fusion_login_audit.alert_window_min', '15') + ICP.set_param('fusion_login_audit.alert_enabled', 'True') + ICP.set_param('fusion_login_audit.last_alert:cool@example.com', '') + + user = self.env['res.users'].sudo().create({ + 'name': 'Cooldown Tester', + 'login': 'cool@example.com', + 'password': 'cooldown-tester-pw-1', + }) + Mail = self.env['mail.mail'].sudo() + for _i in range(3): + try: + user._check_credentials( + {'login': user.login, 'password': 'wrong', + 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + pass + count_after_3 = Mail.search_count([('subject', 'ilike', 'cool@example.com')]) + for _i in range(2): + try: + user._check_credentials( + {'login': user.login, 'password': 'wrong', + 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + pass + count_after_5 = Mail.search_count([('subject', 'ilike', 'cool@example.com')]) + self.assertEqual(count_after_5, count_after_3, + "Cooldown should suppress additional emails") + + def test_alert_disabled_master_switch(self): + """alert_enabled=False suppresses all alerts regardless of threshold.""" + from odoo.exceptions import AccessDenied + ICP = self.env['ir.config_parameter'].sudo() + ICP.set_param('fusion_login_audit.alert_threshold', '1') + ICP.set_param('fusion_login_audit.alert_window_min', '15') + # Use the actual boolean field's storage semantics — see CLAUDE.md rule #5. + # Writing False through the settings form deletes the param; here we + # set the string 'False' explicitly to simulate "disabled". + ICP.set_param('fusion_login_audit.alert_enabled', 'False') + ICP.set_param('fusion_login_audit.last_alert:disabled@example.com', '') + + user = self.env['res.users'].sudo().create({ + 'name': 'Disabled Tester', + 'login': 'disabled@example.com', + 'password': 'disabled-tester-pw-1', + }) + Mail = self.env['mail.mail'].sudo() + before = Mail.search_count([('subject', 'ilike', 'disabled@example.com')]) + try: + user._check_credentials( + {'login': user.login, 'password': 'wrong', + 'type': 'password'}, + {'interactive': False}, + ) + except AccessDenied: + pass + after = Mail.search_count([('subject', 'ilike', 'disabled@example.com')]) + self.assertEqual(after, before, "Disabled alerts should queue nothing")