When the login string resolves to an existing user and the password is
wrong, BOTH overrides used to write a failure row:
- _check_credentials wrapper: result=failure, reason=bad_password
- _login wrapper (catching the propagating AccessDenied): result=
failure, reason=unknown_user
Discovered in production smoke on westin-v19 after the deploy: a
single failed login for info@gsafinancialconsulting.com produced two
audit rows (one bad_password, one unknown_user). The unknown_user
label was wrong — the user IS in the system.
Fix: _login now checks whether the login string resolves to any user
BEFORE writing the unknown_user row. If yes, _check_credentials
already logged the attempt and _login skips. If no, the user lookup
in super() failed and _login is the only chance to log.
Regression test test_login_known_user_bad_password_single_row asserts
exactly one row per attempt and that the row carries bad_password
(not unknown_user) when the user exists.
30 tests green locally; production smoke on westin-v19 confirms:
one row per failed login, bad_password, IP 172.18.0.1 captured.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
393 lines
16 KiB
Python
393 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
import logging
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import AccessDenied
|
|
|
|
# Top-level import (vs lazy inside the method): if the dep is missing — most
|
|
# likely because the dev container got recreated and dropped its pip install
|
|
# (see CLAUDE.md Workflow) — Odoo crashes at registry load with a clear
|
|
# `ModuleNotFoundError`, not deep in the auth path after the first login.
|
|
from user_agents import parse as ua_parse
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ResUsers(models.Model):
|
|
_inherit = 'res.users'
|
|
|
|
@api.model
|
|
def _fc_build_event_vals(
|
|
self,
|
|
result,
|
|
attempted_login,
|
|
failure_reason=None,
|
|
user_id=None,
|
|
_override_ip=None,
|
|
_override_ua=None,
|
|
_credential=None,
|
|
):
|
|
"""Build the dict of values for a fusion.login.audit row.
|
|
|
|
Pulls IP / User-Agent from the live HTTP request when available.
|
|
Falls back to ('internal', '<no-request>') for XML-RPC / cron-initiated
|
|
auth, with geo_lookup_state='internal' so the geo cron skips them.
|
|
|
|
An empty IP from an otherwise-live request (rare; misconfigured
|
|
reverse proxy) also routes to the 'internal' fallback — an empty
|
|
string isn't useful audit data and is arguably suspicious.
|
|
|
|
The _override_* kwargs exist for tests so we don't have to fake a
|
|
full request. They are NOT a public API.
|
|
|
|
Password safety: `_credential` MAY contain a 'password' key from the
|
|
Odoo auth flow. We never read that key, never log it, never put it
|
|
in vals. The test `test_build_event_vals_strips_password` locks
|
|
this property in via `assertNotIn(secret, repr(vals))`.
|
|
"""
|
|
vals = {
|
|
'attempted_login': (attempted_login or '')[:255],
|
|
'result': result,
|
|
'failure_reason': failure_reason,
|
|
'event_time': fields.Datetime.now(),
|
|
'database': self.env.cr.dbname,
|
|
'user_id': user_id,
|
|
}
|
|
|
|
ip = _override_ip
|
|
ua_str = _override_ua
|
|
|
|
if ip is None or ua_str is None:
|
|
try:
|
|
from odoo.http import request
|
|
if request and getattr(request, 'httprequest', None):
|
|
if ip is None:
|
|
ip = request.httprequest.remote_addr
|
|
if ua_str is None:
|
|
ua_str = request.httprequest.user_agent.string or ''
|
|
except Exception:
|
|
_logger.debug("fusion_login_audit: no request context", exc_info=True)
|
|
|
|
if ip and ua_str is not None:
|
|
ua_text = ua_str or ''
|
|
vals['ip_address'] = ip[:45]
|
|
vals['user_agent_raw'] = ua_text[:512]
|
|
ua = ua_parse(ua_text)
|
|
vals['browser'] = (f"{ua.browser.family} {ua.browser.version_string}".strip())[:64]
|
|
vals['os'] = (f"{ua.os.family} {ua.os.version_string}".strip())[:64]
|
|
if ua.is_bot:
|
|
vals['device_type'] = 'bot'
|
|
elif ua.is_mobile:
|
|
vals['device_type'] = 'mobile'
|
|
elif ua.is_tablet:
|
|
vals['device_type'] = 'tablet'
|
|
elif ua.is_pc:
|
|
vals['device_type'] = 'desktop'
|
|
else:
|
|
vals['device_type'] = 'unknown'
|
|
vals['geo_lookup_state'] = 'pending'
|
|
else:
|
|
vals['ip_address'] = 'internal'
|
|
vals['user_agent_raw'] = '<no-request>'
|
|
vals['device_type'] = 'unknown'
|
|
vals['geo_lookup_state'] = 'internal'
|
|
|
|
# _credential is accepted in the signature so callers (T6 _check_credentials,
|
|
# T7 _login) can hand the dict in without filtering. The helper deliberately
|
|
# touches NO keys from it — see the password-safety note in the docstring.
|
|
# `_credential` is intentionally unread here; the parameter exists so future
|
|
# work can read `credential.get('type')` for `2fa_failed` discrimination
|
|
# only via the explicit failure_reason kwarg, never from the dict directly.
|
|
del _credential # explicit no-op — locks down the read surface
|
|
|
|
return vals
|
|
|
|
def _fc_record_login_event(self, result, failure_reason=None,
|
|
user_id=None, attempted_login=None,
|
|
_credential=None):
|
|
"""Build vals + create the audit row via sudo. Never raises.
|
|
|
|
The row is written through an INDEPENDENT cursor
|
|
(``registry.cursor()``) so that:
|
|
|
|
* A failure-path call from ``_check_credentials`` survives the
|
|
outer transaction rollback that follows ``AccessDenied``
|
|
(the HTTP layer closes the cursor without committing, see
|
|
``odoo/service/model.py:retrying``).
|
|
* A broken audit table can never block a real user from logging
|
|
in: the cursor block is wrapped in try/except; exceptions are
|
|
logged and swallowed.
|
|
|
|
The independent cursor commits on context exit. Note that this
|
|
means the row is durable even if the caller's transaction later
|
|
rolls back — intentional for audit semantics: a recorded bad
|
|
password should NOT disappear because some unrelated downstream
|
|
op blew up.
|
|
"""
|
|
try:
|
|
vals = self._fc_build_event_vals(
|
|
result=result,
|
|
attempted_login=attempted_login
|
|
or (self.login if self else None)
|
|
or 'unknown',
|
|
failure_reason=failure_reason,
|
|
user_id=user_id or (self.id if self else None),
|
|
_credential=_credential,
|
|
)
|
|
with self.env.registry.cursor() as audit_cr:
|
|
audit_env = api.Environment(audit_cr, self.env.uid, self.env.context)
|
|
audit_env['fusion.login.audit'].sudo().with_context(
|
|
mail_create_nolog=True
|
|
).create(vals)
|
|
except Exception:
|
|
_logger.exception(
|
|
"fusion_login_audit: failed to record %s row for %s",
|
|
result, attempted_login or (self.login if self else 'unknown'),
|
|
)
|
|
|
|
def _update_last_login(self):
|
|
result = super()._update_last_login()
|
|
# Self is the singleton recordset of the user that just logged in.
|
|
self._fc_record_login_event(result='success')
|
|
return result
|
|
|
|
def _fc_alert_threshold(self):
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
try:
|
|
return max(1, int(ICP.get_param(
|
|
'fusion_login_audit.alert_threshold', 5)))
|
|
except (TypeError, ValueError):
|
|
return 5
|
|
|
|
def _fc_alert_window_min(self):
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
try:
|
|
return max(1, int(ICP.get_param(
|
|
'fusion_login_audit.alert_window_min', 15)))
|
|
except (TypeError, ValueError):
|
|
return 15
|
|
|
|
def _fc_alert_enabled(self):
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
# CLAUDE.md rule #5: Boolean config_parameter deletes on False.
|
|
# An absent key means True (the default). Explicit 'False' or 'false'
|
|
# means disabled.
|
|
raw = ICP.get_param('fusion_login_audit.alert_enabled', 'True')
|
|
return str(raw).strip().lower() != 'false'
|
|
|
|
def _fc_recent_failure_count(self, attempted_login):
|
|
"""Failures for this attempted_login within the alert window."""
|
|
from datetime import timedelta
|
|
if not attempted_login:
|
|
return 0
|
|
cutoff = fields.Datetime.now() - timedelta(
|
|
minutes=self._fc_alert_window_min())
|
|
return self.env['fusion.login.audit'].sudo().search_count([
|
|
('attempted_login', '=', attempted_login),
|
|
('result', '=', 'failure'),
|
|
('event_time', '>=', cutoff),
|
|
])
|
|
|
|
def _fc_send_failure_alert(self, attempted_login):
|
|
"""Queue one alert mail unless cooldown is active. Cooldown is
|
|
60 minutes, keyed by attempted_login, stored in ir.config_parameter."""
|
|
from datetime import timedelta
|
|
if not self._fc_alert_enabled():
|
|
return
|
|
if not attempted_login:
|
|
return
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
cd_key = f'fusion_login_audit.last_alert:{attempted_login}'
|
|
cd_raw = ICP.get_param(cd_key)
|
|
now = fields.Datetime.now()
|
|
if cd_raw:
|
|
try:
|
|
last = fields.Datetime.from_string(cd_raw)
|
|
if last and (now - last) < timedelta(minutes=60):
|
|
return # cooldown active
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
window = self._fc_alert_window_min()
|
|
cutoff = now - timedelta(minutes=window)
|
|
Audit = self.env['fusion.login.audit'].sudo()
|
|
rows = Audit.search([
|
|
('attempted_login', '=', attempted_login),
|
|
('result', '=', 'failure'),
|
|
('event_time', '>=', cutoff),
|
|
], order='event_time desc', limit=20)
|
|
|
|
# Admin recipients: members of base.group_system (the Settings group)
|
|
# who have an email set and are not portal/share users. Note:
|
|
# CLAUDE.md rule #6 — res.groups has no `users` field in Odoo 19, so
|
|
# search res.users by group_ids directly. The __system__ superuser
|
|
# (uid=1) is excluded automatically by Odoo's default user filter.
|
|
admins = self.env['res.users'].sudo().search([
|
|
('group_ids', 'in', self.env.ref('base.group_system').id),
|
|
('email', '!=', False),
|
|
('share', '=', False),
|
|
])
|
|
if not admins:
|
|
return
|
|
|
|
tmpl = self.env.ref(
|
|
'fusion_login_audit.mail_template_failure_burst',
|
|
raise_if_not_found=False)
|
|
if not tmpl:
|
|
return
|
|
|
|
# CLAUDE.md rule #12: in mail.template QWeb, `ctx` IS env.context.
|
|
# So `ctx.get('foo')` resolves to env.context.get('foo'). Pass data
|
|
# by SPREADING keys into the context, not wrapping in a dict.
|
|
# `with_context(ctx=ctx_data)` would silently render an empty subject.
|
|
ctx_data = {
|
|
'attempted_login': attempted_login,
|
|
'failure_count': len(rows),
|
|
'window_min': window,
|
|
'rows': [{
|
|
'event_time': fields.Datetime.to_string(r.event_time),
|
|
'ip_address': r.ip_address or '',
|
|
'country_code': r.country_code or '',
|
|
'browser': r.browser or '',
|
|
'os': r.os or '',
|
|
} for r in rows],
|
|
}
|
|
for admin in admins:
|
|
tmpl.with_context(**ctx_data).send_mail(
|
|
admin.id,
|
|
email_values={'email_to': admin.email,
|
|
'auto_delete': True},
|
|
force_send=False,
|
|
)
|
|
ICP.set_param(cd_key, fields.Datetime.to_string(now))
|
|
|
|
def _check_credentials(self, credential, env):
|
|
try:
|
|
return super()._check_credentials(credential, env)
|
|
except AccessDenied:
|
|
cred_type = (credential or {}).get('type', 'password')
|
|
reason = '2fa_failed' if cred_type == 'totp' else 'bad_password'
|
|
attempted_login = (credential or {}).get('login') or self.login
|
|
self._fc_record_login_event(
|
|
result='failure',
|
|
failure_reason=reason,
|
|
user_id=self.id,
|
|
attempted_login=attempted_login,
|
|
_credential=credential,
|
|
)
|
|
try:
|
|
if self._fc_recent_failure_count(attempted_login) \
|
|
>= self._fc_alert_threshold():
|
|
self._fc_send_failure_alert(attempted_login)
|
|
except Exception:
|
|
_logger.exception(
|
|
"fusion_login_audit: failed to send failure alert")
|
|
raise
|
|
|
|
def _login(self, credential, user_agent_env):
|
|
"""Catch the unknown-user branch of upstream _login.
|
|
|
|
In Odoo 19 ``_login`` is an *instance* method (not a classmethod as in
|
|
earlier versions). Upstream raises ``AccessDenied`` in three cases:
|
|
|
|
1. Unknown login string — ``_assert_can_auth`` or the user-lookup
|
|
``search()`` returns empty → ``_check_credentials`` never fires →
|
|
THIS override is the only chance to record the attempt.
|
|
2. Wrong password — user exists, ``_check_credentials`` raises →
|
|
our ``_check_credentials`` override already wrote a ``bad_password``
|
|
row → re-raise propagates up to here. We MUST NOT write a second
|
|
row.
|
|
3. 2FA failure — same as #2 but ``failure_reason='2fa_failed'``.
|
|
|
|
We distinguish #1 from #2/#3 by checking whether the login string
|
|
resolves to any user. If it does, ``_check_credentials`` ran (and
|
|
already logged); if it doesn't, the user lookup failed and we log
|
|
``unknown_user`` here.
|
|
|
|
``_fc_record_login_event`` writes through an INDEPENDENT cursor
|
|
(``self.env.registry.cursor()``), so the audit row survives the
|
|
outer transaction rollback that follows the re-raised
|
|
``AccessDenied``. Audit-side exceptions never block the re-raise.
|
|
"""
|
|
try:
|
|
return super()._login(credential, user_agent_env)
|
|
except AccessDenied:
|
|
login = (credential or {}).get('login') or ''
|
|
try:
|
|
user_exists = bool(self.sudo().search(
|
|
[('login', '=', login)], limit=1))
|
|
except Exception:
|
|
user_exists = False # be permissive — log the row anyway
|
|
if not user_exists:
|
|
self._fc_record_login_event(
|
|
result='failure',
|
|
failure_reason='unknown_user',
|
|
user_id=False,
|
|
attempted_login=login or 'unknown',
|
|
_credential=credential,
|
|
)
|
|
raise
|
|
|
|
# ──────────────────────────────────────────────────────────────────
|
|
# Per-user surface — fields + action method backing the smart button
|
|
# and "Login Activity" tab on the user form view.
|
|
# ──────────────────────────────────────────────────────────────────
|
|
|
|
x_fc_login_audit_ids = fields.One2many(
|
|
'fusion.login.audit', 'user_id',
|
|
string='Login Activity',
|
|
)
|
|
x_fc_login_audit_count = fields.Integer(
|
|
string='Login Audit Count',
|
|
compute='_compute_x_fc_login_audit_count',
|
|
)
|
|
x_fc_last_successful_login = fields.Datetime(
|
|
string='Last Successful Login',
|
|
compute='_compute_x_fc_last_successful_login',
|
|
store=True,
|
|
)
|
|
x_fc_last_login_ip = fields.Char(
|
|
string='Last Login IP', size=45,
|
|
compute='_compute_x_fc_last_successful_login',
|
|
store=True,
|
|
)
|
|
|
|
@api.depends('x_fc_login_audit_ids')
|
|
def _compute_x_fc_login_audit_count(self):
|
|
# Odoo 19: read_group → _read_group, returns list of tuples
|
|
# (group_key, aggregate_value) when given groupby + aggregates.
|
|
Audit = self.env['fusion.login.audit'].sudo()
|
|
rows = Audit._read_group(
|
|
domain=[('user_id', 'in', self.ids)],
|
|
groupby=['user_id'],
|
|
aggregates=['__count'],
|
|
)
|
|
counts = {user.id: count for user, count in rows}
|
|
for user in self:
|
|
user.x_fc_login_audit_count = counts.get(user.id, 0)
|
|
|
|
@api.depends('x_fc_login_audit_ids.event_time',
|
|
'x_fc_login_audit_ids.result',
|
|
'x_fc_login_audit_ids.ip_address')
|
|
def _compute_x_fc_last_successful_login(self):
|
|
Audit = self.env['fusion.login.audit'].sudo()
|
|
for user in self:
|
|
row = Audit.search(
|
|
[('user_id', '=', user.id), ('result', '=', 'success')],
|
|
order='event_time desc', limit=1,
|
|
)
|
|
user.x_fc_last_successful_login = row.event_time or False
|
|
user.x_fc_last_login_ip = row.ip_address or False
|
|
|
|
def action_fc_view_login_audit(self):
|
|
self.ensure_one()
|
|
return {
|
|
'name': _('Login Activity'),
|
|
'type': 'ir.actions.act_window',
|
|
'res_model': 'fusion.login.audit',
|
|
'view_mode': 'list,form',
|
|
'domain': [('user_id', '=', self.id)],
|
|
'context': {'create': False, 'edit': False, 'delete': False,
|
|
'default_user_id': self.id},
|
|
}
|