-
-
-
-
-
×
+
+
+ Subject *
+
+
+
+
+
+ Your email
+ we'll reply here — edit if you'd like replies elsewhere
+
+
+
+
+
+
+
+
+
+
+
+ Error code / traceback
+ paste any error message or stack trace
+
+
+
+
+
+
Attachments
+
+
+ Attach files
+
+
+
+
+ Capturing…
+ Capture screenshot
+
+
+
+
+
+
+
+
+
+
+ Thanks — ticket
+
#
+ created
with attachment(s) .
+ You'll get replies by email, and can follow up under
My Tickets .
+
+
+
+ attachment(s) could not be uploaded.
+ Open the ticket from My Tickets and add them there.
+
+
+
+
+
+
+ Mine
+ All (deployment)
+
+
+
+ Loading your tickets…
+
+
+
+
+
+
+ No tickets yet. Use the New tab to report a bug or request a feature.
+
+
-
-
-
-
-
-
- Thanks — ticket
-
- #
- created
with attachment(s) .
+
+
+
+ Loading…
+
+
+
+
+
+
+ No messages yet.
+
+
+
+
+
+
+
+
+
+ attachment(s) —
+ open the full ticket to download.
+
+
+
+
+
+
+
+
+
+ Your reply
+
+
+
+
-
-
-
- Submit
-
-
- Close
-
+
+
+
+
+ Submit
+
+ Close
+
+
+
+
+
+ Send reply
+
+
+ Back
+
+
+
+ Close
+
diff --git a/fusion_helpdesk/static/src/xml/fusion_helpdesk_systray.xml b/fusion_helpdesk/static/src/xml/fusion_helpdesk_systray.xml
index 4aefb028..9d9bbdfe 100644
--- a/fusion_helpdesk/static/src/xml/fusion_helpdesk_systray.xml
+++ b/fusion_helpdesk/static/src/xml/fusion_helpdesk_systray.xml
@@ -5,11 +5,14 @@
+
diff --git a/fusion_helpdesk/tests/__init__.py b/fusion_helpdesk/tests/__init__.py
new file mode 100644
index 00000000..ef0e8181
--- /dev/null
+++ b/fusion_helpdesk/tests/__init__.py
@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+from . import test_utils
+from . import test_seen
diff --git a/fusion_helpdesk/tests/test_seen.py b/fusion_helpdesk/tests/test_seen.py
new file mode 100644
index 00000000..e3862232
--- /dev/null
+++ b/fusion_helpdesk/tests/test_seen.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1
+"""Tests for fusion.helpdesk.ticket.seen read-tracking."""
+from odoo.tests import TransactionCase, tagged
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk')
+class TestSeen(TransactionCase):
+
+ def test_mark_seen_upserts_and_is_monotonic(self):
+ Seen = self.env['fusion.helpdesk.ticket.seen']
+ Seen._mark_seen(central_ticket_id=42, last_message_id=100)
+ Seen._mark_seen(central_ticket_id=42, last_message_id=120)
+ Seen._mark_seen(central_ticket_id=42, last_message_id=90) # stale, ignored
+ rec = Seen.search([
+ ('user_id', '=', self.env.uid),
+ ('central_ticket_id', '=', 42),
+ ])
+ self.assertEqual(len(rec), 1, "should upsert, not duplicate")
+ self.assertEqual(rec.last_seen_message_id, 120, "monotonic — never moves back")
+
+ def test_seen_map(self):
+ Seen = self.env['fusion.helpdesk.ticket.seen']
+ Seen._mark_seen(1, 10)
+ Seen._mark_seen(2, 20)
+ self.assertEqual(Seen._seen_map([1, 2, 3]), {1: 10, 2: 20})
diff --git a/fusion_helpdesk/tests/test_utils.py b/fusion_helpdesk/tests/test_utils.py
new file mode 100644
index 00000000..3943b68e
--- /dev/null
+++ b/fusion_helpdesk/tests/test_utils.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1
+"""Unit tests for the pure helpers in fusion_helpdesk.utils.
+
+These need no live central Odoo — they pin the identity keystone, the
+scoping security boundary, the public-message filter and the unread
+maths as plain data transformations.
+"""
+from odoo.tests import TransactionCase, tagged
+
+from odoo.addons.fusion_helpdesk.utils import (
+ build_ticket_vals,
+ build_scope_domain,
+ is_public_message,
+ compute_unread_count,
+ _norm_email,
+)
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk')
+class TestBuildTicketVals(TransactionCase):
+
+ def test_identity_fields_present(self):
+ vals = build_ticket_vals(
+ kind='bug', subject='X', body_html='
b
',
+ team_id=1, client_label='ENTECH',
+ reporter_name='John Doe', reporter_email='john@entech.com',
+ company_name='ENTECH Inc',
+ )
+ self.assertEqual(vals['partner_email'], 'john@entech.com')
+ self.assertEqual(vals['partner_name'], 'John Doe')
+ self.assertEqual(vals['x_fc_client_label'], 'ENTECH')
+ self.assertEqual(vals['partner_company_name'], 'ENTECH Inc')
+ self.assertEqual(vals['team_id'], 1)
+ self.assertIn('X', vals['name'])
+ self.assertIn('[ENTECH]', vals['name'])
+
+ def test_no_email_omits_partner_email(self):
+ vals = build_ticket_vals(
+ kind='feature', subject='Y', body_html='
b
',
+ team_id=False, client_label='', reporter_name='Jane',
+ reporter_email='', company_name='',
+ )
+ self.assertNotIn('partner_email', vals) # never send an empty email
+ self.assertNotIn('team_id', vals) # omit falsy team
+ self.assertNotIn('x_fc_client_label', vals) # omit empty label
+ self.assertEqual(vals['partner_name'], 'Jane')
+ self.assertIn('Feature Request', vals['name'])
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk')
+class TestScopeDomain(TransactionCase):
+
+ def test_regular_scope_binds_email_and_label(self):
+ dom = build_scope_domain(label='ENTECH', email='john@entech.com', is_admin=False)
+ self.assertIn(('x_fc_client_label', '=', 'ENTECH'), dom)
+ self.assertIn(('partner_email', '=ilike', 'john@entech.com'), dom)
+
+ def test_admin_scope_binds_label_only(self):
+ dom = build_scope_domain(label='ENTECH', email='a@entech.com', is_admin=True)
+ self.assertIn(('x_fc_client_label', '=', 'ENTECH'), dom)
+ self.assertFalse(any(t[0] == 'partner_email' for t in dom))
+
+ def test_empty_label_never_matches_everything(self):
+ dom = build_scope_domain(label='', email='', is_admin=True)
+ # label term must be present and must NOT be an empty string
+ label_terms = [t for t in dom if t[0] == 'x_fc_client_label']
+ self.assertEqual(len(label_terms), 1)
+ self.assertNotEqual(label_terms[0][2], '')
+
+ def test_wildcard_email_cannot_widen_scope(self):
+ # IDOR guard: a self-set email of '%' must NOT become a match-all
+ # =ilike term — the wildcard has to be escaped to a literal.
+ dom = build_scope_domain(label='ENTECH', email='%', is_admin=False)
+ email_terms = [t for t in dom if t[0] == 'partner_email']
+ self.assertEqual(len(email_terms), 1)
+ self.assertEqual(email_terms[0][2], '\\%',
+ "'%' must be escaped so ILIKE matches it literally")
+
+ def test_underscore_in_real_email_is_escaped_but_preserved(self):
+ dom = build_scope_domain(label='ENTECH', email='john_doe@x.com', is_admin=False)
+ email_terms = [t for t in dom if t[0] == 'partner_email']
+ self.assertEqual(email_terms[0][2], 'john\\_doe@x.com')
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk')
+class TestMessageFilterAndUnread(TransactionCase):
+
+ def test_internal_note_is_not_public(self):
+ self.assertFalse(is_public_message({'subtype_is_internal': True}))
+ self.assertTrue(is_public_message({'subtype_is_internal': False}))
+ self.assertTrue(is_public_message({})) # default visible
+
+ def test_unread_count(self):
+ tickets = [
+ {'id': 1, 'last_support_msg_id': 10}, # seen 10 -> read
+ {'id': 2, 'last_support_msg_id': 5}, # seen 3 -> unread
+ {'id': 3, 'last_support_msg_id': 0}, # no support msg
+ ]
+ seen = {1: 10, 2: 3}
+ self.assertEqual(compute_unread_count(tickets, seen), 1)
+
+ def test_unread_count_unseen_ticket_counts(self):
+ tickets = [{'id': 9, 'last_support_msg_id': 4}]
+ self.assertEqual(compute_unread_count(tickets, {}), 1)
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk')
+class TestNormEmail(TransactionCase):
+
+ def test_valid_email_is_normalised_lowercase(self):
+ self.assertEqual(_norm_email('John@Entech.COM'), 'john@entech.com')
+
+ def test_first_valid_candidate_wins(self):
+ # confirmed reply email empty -> fall back to the next valid one
+ self.assertEqual(_norm_email('', 'not an email', 'jane@x.com'), 'jane@x.com')
+
+ def test_wildcard_is_rejected(self):
+ # IDOR guard: a self-set '%' must not survive as a scope key
+ self.assertEqual(_norm_email('%'), '')
+
+ def test_non_email_login_falls_through_to_empty(self):
+ self.assertEqual(_norm_email('admin', 'also-not-email', ''), '')
+
+ def test_controller_namespace_resolves_norm_email(self):
+ # Regression: _norm_email was called in controllers/main.py
+ # (submit + _identity) but never imported/defined -> NameError on
+ # every inbox endpoint. Guard that the name is resolvable there.
+ from odoo.addons.fusion_helpdesk.controllers import main
+ self.assertTrue(hasattr(main, '_norm_email'))
diff --git a/fusion_helpdesk/utils.py b/fusion_helpdesk/utils.py
new file mode 100644
index 00000000..1c0bcc02
--- /dev/null
+++ b/fusion_helpdesk/utils.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1
+"""Pure helpers for fusion_helpdesk.
+
+No Odoo environment, no `request` — just data in, data out. Everything
+here is unit-testable in isolation, which is what lets us validate the
+identity keystone, the server-side scoping boundary, the public-message
+filter and the unread maths without a live central Odoo to talk to.
+"""
+from odoo.tools import email_normalize
+
+# Sentinel used so a missing label/email can never widen a domain to
+# "match everything". An empty string in `=`/`=ilike` would match rows
+# whose field is also empty; '__none__' will simply match nothing.
+_NO_MATCH = '__none__'
+
+
+def escape_like(value):
+ """Escape SQL LIKE/ILIKE wildcards so a user-supplied value can never
+ widen an `=ilike` match to other rows.
+
+ `res.users.email` is self-writeable and unvalidated, so without this a
+ user could set their email to ``%`` and have ``partner_email =ilike '%'``
+ match EVERY ticket in their deployment (a cross-user IDOR). Escaping the
+ backslash first, then ``%`` and ``_``, makes those characters match
+ literally. Real emails containing ``_`` (e.g. ``john_doe@x.com``) keep
+ working — the underscore is matched as a literal, which is what we want.
+ """
+ return (value or '').replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
+
+
+def _norm_email(*candidates):
+ """Return the first candidate that normalises to a valid email, else ''.
+
+ Used to derive the inbox scope key from a chain of fallbacks (the
+ confirmed reply email -> ``user.email`` -> ``user.login``).
+ ``email_normalize`` lowercases the address and returns a falsy value for
+ anything that is not exactly one valid email — including a self-set
+ wildcard like ``%`` — so the value fed into ``build_scope_domain`` can
+ never widen the scope. Pairs with :func:`escape_like` as defense in depth
+ against the ``partner_email =ilike`` IDOR.
+ """
+ for candidate in candidates:
+ normalized = email_normalize(candidate or '')
+ if normalized:
+ return normalized
+ return ''
+
+
+def build_ticket_vals(kind, subject, body_html, team_id, client_label,
+ reporter_name, reporter_email, company_name):
+ """Construct the `helpdesk.ticket` create vals for a forwarded report.
+
+ The identity fields (`partner_email`, `partner_name`,
+ `partner_company_name`) drive native helpdesk find-or-create of the
+ customer partner + follower subscription on the central Odoo, and
+ `x_fc_client_label` tags the deployment for the scoped inbox.
+ """
+ kind_label = 'Bug Report' if kind == 'bug' else 'Feature Request'
+ prefix = ('[%s] ' % client_label) if client_label else ''
+ vals = {
+ 'name': '%s%s: %s' % (prefix, kind_label, subject or '(untitled)'),
+ 'description': body_html,
+ 'partner_name': reporter_name or '',
+ }
+ if team_id:
+ vals['team_id'] = team_id
+ if reporter_email:
+ vals['partner_email'] = reporter_email
+ if company_name:
+ vals['partner_company_name'] = company_name
+ if client_label:
+ vals['x_fc_client_label'] = client_label
+ return vals
+
+
+def build_scope_domain(label, email, is_admin):
+ """Server-side ticket scope for the embedded inbox.
+
+ `x_fc_client_label` is ALWAYS bound (defense in depth) so neither a
+ regular user nor a deployment admin can ever read another
+ deployment's tickets — even though the shared bot can technically see
+ every ticket on the central Odoo. Regular users are additionally
+ bound to their own `partner_email`.
+ """
+ domain = [('x_fc_client_label', '=', label or _NO_MATCH)]
+ if not is_admin:
+ safe_email = escape_like(email)
+ domain.append(('partner_email', '=ilike', safe_email or _NO_MATCH))
+ return domain
+
+
+def is_public_message(msg):
+ """True when a message is customer-visible (not an internal note).
+
+ `msg` is a plain dict carrying a `subtype_is_internal` flag resolved
+ from the central `mail.message.subtype`. Internal notes must never be
+ shown to a client in the embedded inbox.
+ """
+ return not msg.get('subtype_is_internal', False)
+
+
+def compute_unread_count(tickets, seen_by_id):
+ """Number of tickets with a support reply the user hasn't seen.
+
+ `tickets` is a list of dicts each carrying `id` and
+ `last_support_msg_id` (id of the latest customer-visible support
+ message, 0 if none). `seen_by_id` maps central ticket id -> last
+ message id the user has seen (absent => 0 baseline).
+ """
+ count = 0
+ for ticket in tickets:
+ last = ticket.get('last_support_msg_id') or 0
+ if last and last > (seen_by_id.get(ticket['id']) or 0):
+ count += 1
+ return count
diff --git a/fusion_helpdesk_central/__manifest__.py b/fusion_helpdesk_central/__manifest__.py
index e1199021..8f2f61e7 100644
--- a/fusion_helpdesk_central/__manifest__.py
+++ b/fusion_helpdesk_central/__manifest__.py
@@ -3,7 +3,7 @@
# License OPL-1
{
'name': 'Fusion Helpdesk Central — Client API Keys',
- 'version': '19.0.1.0.2',
+ 'version': '19.0.1.1.0',
'category': 'Productivity',
'summary': 'Admin UI on the central Odoo for issuing per-client API '
'keys used by fusion_helpdesk client deployments.',
@@ -28,7 +28,9 @@ Depends only on `helpdesk`. No client-side install needed.
'data': [
'security/ir.model.access.csv',
'data/ir_config_parameter_data.xml',
+ 'data/mail_template_ack.xml',
'views/fusion_helpdesk_client_key_views.xml',
+ 'views/helpdesk_ticket_views.xml',
],
'installable': True,
'auto_install': False,
diff --git a/fusion_helpdesk_central/data/mail_template_ack.xml b/fusion_helpdesk_central/data/mail_template_ack.xml
new file mode 100644
index 00000000..7efdfa6a
--- /dev/null
+++ b/fusion_helpdesk_central/data/mail_template_ack.xml
@@ -0,0 +1,55 @@
+
+
+
+
+
+ Helpdesk: Ticket Acknowledgement (Fusion)
+
+ We received your request [{{ object.ticket_ref or object.id }}]
+ {{ object.partner_id.id }}
+ {{ object.partner_id.lang }}
+
+
+
+
Hello ,
+
+ Thanks for reaching out — we've received your request and our
+ support team will be in touch. Here are the details:
+
+
+
+ Reference
+
+
+
+ Subject
+
+
+
+
+
+ View & track your ticket
+
+
+
+ You can reply directly to this email to add information, follow up
+ from the link above, or sign up for an account from that page to
+ manage all of your requests in one place.
+
+
—
+
+
+
+
+
diff --git a/fusion_helpdesk_central/models/__init__.py b/fusion_helpdesk_central/models/__init__.py
index 91073a9b..61b5123a 100644
--- a/fusion_helpdesk_central/models/__init__.py
+++ b/fusion_helpdesk_central/models/__init__.py
@@ -1,2 +1,3 @@
# -*- coding: utf-8 -*-
from . import fusion_helpdesk_client_key
+from . import helpdesk_ticket
diff --git a/fusion_helpdesk_central/models/helpdesk_ticket.py b/fusion_helpdesk_central/models/helpdesk_ticket.py
new file mode 100644
index 00000000..fd9c872b
--- /dev/null
+++ b/fusion_helpdesk_central/models/helpdesk_ticket.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1
+"""Central-side helpdesk.ticket extensions for the customer follow-up flow.
+
+Adds the `x_fc_client_label` deployment tag (set by the in-app reporter so
+the embedded inbox can scope per client) and sends a branded acknowledgement
+email — carrying the portal magic link — when an in-app ticket is created.
+"""
+import logging
+
+from odoo import api, fields, models
+
+_logger = logging.getLogger(__name__)
+
+
+class HelpdeskTicket(models.Model):
+ _inherit = 'helpdesk.ticket'
+
+ x_fc_client_label = fields.Char(
+ string='Client Deployment', index=True, copy=False,
+ help='Deployment tag (e.g. ENTECH) set by the fusion_helpdesk in-app '
+ 'reporter. Scopes the embedded "My Tickets" inbox per client and '
+ 'lets support filter tickets by originating deployment.',
+ )
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ tickets = super().create(vals_list)
+ tickets._fc_send_ack_email()
+ return tickets
+
+ def _fc_send_ack_email(self):
+ """Send the branded acknowledgement (with magic link) to the customer.
+
+ Only fires for in-app-channel tickets (those tagged with a client
+ label) that have a customer email — external web-form submissions
+ rely on the native website confirmation, so this won't double-send.
+ The whole thing is best-effort: a template/mail failure must never
+ block ticket creation, so we log and move on.
+ """
+ template = self.env.ref(
+ 'fusion_helpdesk_central.mail_template_ticket_ack',
+ raise_if_not_found=False,
+ )
+ if not template:
+ return
+ for ticket in self:
+ if not (ticket.x_fc_client_label and ticket.partner_email):
+ continue
+ try:
+ template.send_mail(ticket.id, force_send=False)
+ except Exception: # noqa: BLE001 — ack must never block create
+ _logger.exception(
+ 'fusion_helpdesk_central: acknowledgement email failed '
+ 'for ticket %s (%s)', ticket.id, ticket.x_fc_client_label,
+ )
diff --git a/fusion_helpdesk_central/tests/__init__.py b/fusion_helpdesk_central/tests/__init__.py
new file mode 100644
index 00000000..2bded2f8
--- /dev/null
+++ b/fusion_helpdesk_central/tests/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+from . import test_identity
diff --git a/fusion_helpdesk_central/tests/test_identity.py b/fusion_helpdesk_central/tests/test_identity.py
new file mode 100644
index 00000000..f886efad
--- /dev/null
+++ b/fusion_helpdesk_central/tests/test_identity.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1
+"""Identity-keystone tests for the central helpdesk extensions.
+
+Runs on an Enterprise environment (helpdesk installed) — e.g. odoo-nexa or
+odoo-trial. Validates that passing partner_email resolves the customer +
+follower (native), that the client label is stored, and that the branded
+acknowledgement only fires for in-app-channel tickets.
+"""
+from odoo.tests import TransactionCase, tagged
+
+
+@tagged('post_install', '-at_install', 'fusion_helpdesk_central')
+class TestTicketIdentity(TransactionCase):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.team = cls.env['helpdesk.team'].search([], limit=1)
+
+ def _ack_mails(self, ticket):
+ return self.env['mail.mail'].search([
+ ('model', '=', 'helpdesk.ticket'),
+ ('res_id', '=', ticket.id),
+ ]).filtered(lambda m: 'received your request' in (m.subject or ''))
+
+ def test_partner_resolution_follower_and_label(self):
+ ticket = self.env['helpdesk.ticket'].create({
+ 'name': 'Keystone test',
+ 'team_id': self.team.id,
+ 'partner_email': 'keystone.newperson@example.com',
+ 'partner_name': 'Key Stone',
+ 'x_fc_client_label': 'ENTECH',
+ })
+ self.assertEqual(ticket.x_fc_client_label, 'ENTECH')
+ self.assertTrue(
+ ticket.partner_id,
+ "native create() should find-or-create a partner from partner_email")
+ self.assertEqual(ticket.partner_id.email, 'keystone.newperson@example.com')
+ self.assertIn(
+ ticket.partner_id, ticket.message_partner_ids,
+ "the customer must be subscribed as a follower")
+
+ def test_ack_email_for_inapp_ticket(self):
+ ticket = self.env['helpdesk.ticket'].create({
+ 'name': 'Ack test',
+ 'team_id': self.team.id,
+ 'partner_email': 'ack.person@example.com',
+ 'partner_name': 'Ack Person',
+ 'x_fc_client_label': 'ENTECH',
+ })
+ self.assertTrue(
+ self._ack_mails(ticket),
+ "an in-app ticket with a customer email should get our ack email")
+
+ def test_no_ack_without_client_label(self):
+ # Simulates an external web-form ticket — no client label, so our
+ # acknowledgement must NOT fire (avoids double-acknowledgement).
+ ticket = self.env['helpdesk.ticket'].create({
+ 'name': 'External web ticket',
+ 'team_id': self.team.id,
+ 'partner_email': 'external.web@example.com',
+ 'partner_name': 'External Web',
+ })
+ self.assertFalse(
+ self._ack_mails(ticket),
+ "no client label => our acknowledgement should not send")
diff --git a/fusion_helpdesk_central/views/helpdesk_ticket_views.xml b/fusion_helpdesk_central/views/helpdesk_ticket_views.xml
new file mode 100644
index 00000000..61f73229
--- /dev/null
+++ b/fusion_helpdesk_central/views/helpdesk_ticket_views.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+ fhc.helpdesk.ticket.list.client_label
+ helpdesk.ticket
+
+
+
+
+
+
+
+
+
+ fhc.helpdesk.ticket.search.client_label
+ helpdesk.ticket
+
+
+
+
+
+
+
+
+
+
diff --git a/fusion_login_audit/__init__.py b/fusion_login_audit/__init__.py
new file mode 100644
index 00000000..a0fdc10f
--- /dev/null
+++ b/fusion_login_audit/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+from . import models
diff --git a/fusion_login_audit/__manifest__.py b/fusion_login_audit/__manifest__.py
new file mode 100644
index 00000000..01c2ae6d
--- /dev/null
+++ b/fusion_login_audit/__manifest__.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2026 Nexa Systems Inc.
+# License OPL-1 (Odoo Proprietary License v1.0)
+{
+ 'name': 'Fusion Login Audit',
+ 'version': '19.0.1.0.0',
+ 'category': 'Tools',
+ 'summary': 'Durable login audit log with geo-enrichment, retention, and failure alerts.',
+ 'description': """
+Fusion Login Audit
+==================
+
+Captures every password authentication event (success + failure) in a
+dedicated, append-only audit table. Surfaces history on the user form
+as a smart button + tab (admins only). Async-enriches IPs with country,
+city, and reverse DNS. Emails Settings admins on consecutive-failure
+bursts. Daily retention cron honours a configurable horizon.
+ """,
+ 'author': 'Nexa Systems Inc.',
+ 'website': 'https://nexasystems.ca',
+ 'license': 'OPL-1',
+ 'depends': ['base', 'mail', 'base_setup'],
+ 'external_dependencies': {
+ 'python': ['user_agents'],
+ },
+ 'data': [
+ 'security/ir.model.access.csv',
+ 'security/security.xml',
+ 'data/mail_template_data.xml',
+ 'data/ir_cron_data.xml',
+ 'views/fusion_login_audit_views.xml',
+ 'views/res_users_views.xml',
+ 'views/res_config_settings_views.xml',
+ 'views/menus.xml',
+ ],
+ 'installable': True,
+ 'application': False,
+ 'auto_install': False,
+}
diff --git a/fusion_login_audit/data/ir_cron_data.xml b/fusion_login_audit/data/ir_cron_data.xml
new file mode 100644
index 00000000..aaeaf103
--- /dev/null
+++ b/fusion_login_audit/data/ir_cron_data.xml
@@ -0,0 +1,27 @@
+
+
+
+
+
+ Fusion Login Audit: Retention GC
+
+ code
+ model._fc_retention_gc()
+ 1
+ days
+
+
+
+
+ Fusion Login Audit: Geo Enrichment
+
+ code
+ model._fc_geo_enrich_pending(limit=100)
+ 5
+ minutes
+
+ 10
+
+
+
+
diff --git a/fusion_login_audit/data/mail_template_data.xml b/fusion_login_audit/data/mail_template_data.xml
new file mode 100644
index 00000000..a252fd69
--- /dev/null
+++ b/fusion_login_audit/data/mail_template_data.xml
@@ -0,0 +1,46 @@
+
+
+
+
+
+ Fusion Login Audit — Failure Burst Alert
+
+ [Login Audit] Failed login attempts for {{ ctx.get('attempted_login') }}
+
+
+
The login audit detected
+ failed login attempt(s)
+ in the last minute(s) for
+ .
+
Most recent attempts:
+
+
+
+ Time
+ IP
+ Country
+ Browser
+ OS
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Sent by Fusion Login Audit. Tune the threshold and window in
+ Settings → General Settings → Login Audit.
+
+
+
+
+
+
+
diff --git a/fusion_login_audit/models/__init__.py b/fusion_login_audit/models/__init__.py
new file mode 100644
index 00000000..535855d5
--- /dev/null
+++ b/fusion_login_audit/models/__init__.py
@@ -0,0 +1,4 @@
+# -*- coding: utf-8 -*-
+from . import fusion_login_audit
+from . import res_users
+from . import res_config_settings
diff --git a/fusion_login_audit/models/fusion_login_audit.py b/fusion_login_audit/models/fusion_login_audit.py
new file mode 100644
index 00000000..3cc5ebc2
--- /dev/null
+++ b/fusion_login_audit/models/fusion_login_audit.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+import ipaddress
+import logging
+import socket
+from datetime import timedelta
+
+import requests
+
+from odoo import api, fields, models
+
+_logger = logging.getLogger(__name__)
+
+
+class FusionLoginAudit(models.Model):
+ _name = 'fusion.login.audit'
+ _description = 'Login Audit Event'
+ _order = 'event_time desc, id desc'
+ _rec_name = 'attempted_login'
+
+ user_id = fields.Many2one(
+ 'res.users', string='User', ondelete='set null', index=True,
+ help='Null when the attempted login did not match any user.',
+ )
+ attempted_login = fields.Char(
+ string='Attempted Login', size=255, required=True, index=True,
+ )
+ result = fields.Selection(
+ [('success', 'Success'), ('failure', 'Failure')],
+ string='Result', required=True, index=True,
+ )
+ failure_reason = fields.Selection(
+ [
+ ('bad_password', 'Bad password'),
+ ('unknown_user', 'Unknown user'),
+ ('disabled_user', 'Disabled user'),
+ ('2fa_failed', '2FA failed'),
+ ('other', 'Other'),
+ ],
+ string='Failure Reason',
+ )
+ event_time = fields.Datetime(
+ string='Event Time', required=True, index=True,
+ default=fields.Datetime.now,
+ )
+ ip_address = fields.Char(string='IP Address', size=45)
+ ip_hostname = fields.Char(string='Reverse DNS', size=255)
+ country_code = fields.Char(string='Country Code', size=2, index=True)
+ country_name = fields.Char(string='Country', size=64)
+ city = fields.Char(string='City', size=128)
+ geo_state = fields.Char(string='Region', size=64)
+ geo_lookup_state = fields.Selection(
+ [
+ ('pending', 'Pending'),
+ ('done', 'Done'),
+ ('private_ip', 'Private IP'),
+ ('internal', 'Internal (no request)'),
+ ('failed', 'Lookup failed'),
+ ],
+ string='Geo Lookup State', default='pending', index=True,
+ )
+ user_agent_raw = fields.Char(string='User Agent', size=512)
+ browser = fields.Char(string='Browser', size=64)
+ os = fields.Char(string='OS', size=64)
+ device_type = fields.Selection(
+ [
+ ('desktop', 'Desktop'),
+ ('mobile', 'Mobile'),
+ ('tablet', 'Tablet'),
+ ('bot', 'Bot'),
+ ('unknown', 'Unknown'),
+ ],
+ string='Device Type', default='unknown',
+ )
+ database = fields.Char(string='Database', size=64)
+
+ # Odoo 19 replaces the legacy `_sql_constraints = [...]` list with
+ # declarative `models.Constraint` attributes. The plan template used the
+ # legacy form, which now only emits a warning and is silently dropped.
+ _result_failure_reason_consistent = models.Constraint(
+ "CHECK ((result = 'success' AND failure_reason IS NULL) "
+ "OR (result = 'failure' AND failure_reason IS NOT NULL))",
+ 'A failure row must have a failure_reason; a success row must not.',
+ )
+
+ # Composite indexes supporting the three hot queries:
+ # - per-user history (user_id, event_time DESC)
+ # - failure-burst by login (attempted_login, event_time DESC)
+ # - geo cron worklist (geo_lookup_state, event_time)
+ # Odoo 19 ships `models.Index` as the declarative replacement for the
+ # init()/raw-SQL pattern; the attribute name becomes the index suffix
+ # (e.g. `_user_time_idx` -> `fusion_login_audit_user_time_idx`).
+ _user_time_idx = models.Index('(user_id, event_time DESC)')
+ _login_time_idx = models.Index('(attempted_login, event_time DESC)')
+ _geo_state_idx = models.Index('(geo_lookup_state, event_time)')
+
+ @api.model
+ def _fc_retention_gc(self):
+ """Delete audit rows older than `fusion_login_audit.retention_days`.
+ Called daily by ir.cron. retention_days=0 means keep forever."""
+ ICP = self.env['ir.config_parameter'].sudo()
+ try:
+ days = int(ICP.get_param(
+ 'fusion_login_audit.retention_days', 365))
+ except (TypeError, ValueError):
+ days = 365
+ if days <= 0:
+ return 0
+ cutoff = fields.Datetime.now() - timedelta(days=days)
+ old = self.sudo().search([('event_time', '<', cutoff)])
+ count = len(old)
+ if old:
+ old.unlink()
+ return count
+
+ _FC_PRIVATE_NETWORKS = (
+ ipaddress.ip_network('10.0.0.0/8'),
+ ipaddress.ip_network('172.16.0.0/12'),
+ ipaddress.ip_network('192.168.0.0/16'),
+ ipaddress.ip_network('127.0.0.0/8'),
+ ipaddress.ip_network('::1/128'),
+ ipaddress.ip_network('fe80::/10'),
+ )
+
+ @api.model
+ def _fc_is_private_ip(self, ip):
+ if not ip or ip == 'internal':
+ return False # 'internal' uses its own state
+ try:
+ addr = ipaddress.ip_address(ip)
+ except ValueError:
+ return False
+ return any(addr in net for net in self._FC_PRIVATE_NETWORKS)
+
+ @api.model
+ def _fc_geo_cache_hit(self, ip):
+ """Return a dict of geo fields if we've resolved this IP in the last
+ 30 days, else None."""
+ if not ip:
+ return None
+ cutoff = fields.Datetime.now() - timedelta(days=30)
+ cached = self.sudo().search([
+ ('ip_address', '=', ip),
+ ('geo_lookup_state', '=', 'done'),
+ ('event_time', '>=', cutoff),
+ ], limit=1, order='event_time desc')
+ if not cached:
+ return None
+ return {
+ 'country_code': cached.country_code,
+ 'country_name': cached.country_name,
+ 'city': cached.city,
+ 'geo_state': cached.geo_state,
+ 'ip_hostname': cached.ip_hostname,
+ }
+
+ @api.model
+ def _fc_geo_reverse_dns(self, ip):
+ try:
+ socket.setdefaulttimeout(1.5)
+ host, _aliases, _ips = socket.gethostbyaddr(ip)
+ return (host or '')[:255]
+ except (socket.herror, socket.gaierror, OSError):
+ return ''
+ finally:
+ socket.setdefaulttimeout(None)
+
+ @api.model
+ def _fc_geo_http_lookup(self, ip):
+ """Call ip-api.com. Returns (vals_dict, rate_limited_bool).
+ Falls back to ({}, False) on any error."""
+ try:
+ resp = requests.get(
+ 'http://ip-api.com/json/' + ip,
+ params={'fields': 'status,country,countryCode,regionName,city'},
+ timeout=3,
+ headers={'User-Agent': 'Odoo-FusionLoginAudit/19.0'},
+ )
+ rate_limited = resp.headers.get('X-Rl', '') == '0'
+ if resp.status_code != 200:
+ return ({}, rate_limited)
+ data = resp.json() or {}
+ if data.get('status') != 'success':
+ return ({}, rate_limited)
+ return ({
+ 'country_code': (data.get('countryCode') or '')[:2],
+ 'country_name': (data.get('country') or '')[:64],
+ 'geo_state': (data.get('regionName') or '')[:64],
+ 'city': (data.get('city') or '')[:128],
+ }, rate_limited)
+ except (requests.RequestException, ValueError):
+ _logger.warning("fusion_login_audit: geo lookup failed for %s",
+ ip, exc_info=True)
+ return ({}, False)
+
+ @api.model
+ def _fc_geo_enrich_pending(self, limit=100):
+ """Cron worker: process up to `limit` pending rows.
+
+ Per-row isolation is provided by `cr.savepoint()` rather than
+ `cr.commit()`/`cr.rollback()` — the latter raises an AssertionError
+ inside a TransactionCase (Odoo's test cursor refuses commit/rollback).
+ Savepoints work in both prod and tests; the outer cron transaction
+ commits the lot once the method returns. One bad IP rolls back only
+ its own savepoint, so the rest of the batch still lands.
+ """
+ pending = self.sudo().search(
+ [('geo_lookup_state', '=', 'pending')],
+ order='event_time asc', limit=limit,
+ )
+ if not pending:
+ return 0
+ processed = 0
+ stop_after_this = False
+ for row in pending:
+ ip = row.ip_address
+ try:
+ with self.env.cr.savepoint():
+ if self._fc_is_private_ip(ip):
+ row.write({
+ 'geo_lookup_state': 'private_ip',
+ 'country_code': '--',
+ 'country_name': 'Private network',
+ 'city': 'Private network',
+ })
+ processed += 1
+ continue
+
+ cached = self._fc_geo_cache_hit(ip)
+ if cached:
+ cached['geo_lookup_state'] = 'done'
+ row.write(cached)
+ processed += 1
+ continue
+
+ hostname = self._fc_geo_reverse_dns(ip) if ip and ip != 'internal' else ''
+ vals, rate_limited = self._fc_geo_http_lookup(ip) if ip and ip != 'internal' else ({}, False)
+ if vals:
+ vals['ip_hostname'] = hostname
+ vals['geo_lookup_state'] = 'done'
+ row.write(vals)
+ else:
+ row.write({
+ 'geo_lookup_state': 'failed',
+ 'ip_hostname': hostname,
+ })
+ processed += 1
+ if rate_limited:
+ _logger.info("fusion_login_audit: ip-api rate limit "
+ "hit, stopping batch early")
+ stop_after_this = True
+ except Exception:
+ _logger.exception(
+ "fusion_login_audit: geo enrich failed for row %s", row.id)
+ if stop_after_this:
+ break
+ return processed
diff --git a/fusion_login_audit/models/res_config_settings.py b/fusion_login_audit/models/res_config_settings.py
new file mode 100644
index 00000000..2a3a8328
--- /dev/null
+++ b/fusion_login_audit/models/res_config_settings.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+from odoo import fields, models
+
+
+class ResConfigSettings(models.TransientModel):
+ _inherit = 'res.config.settings'
+
+ x_fc_login_audit_retention_days = fields.Integer(
+ string='Login Audit Retention (days)',
+ default=365,
+ config_parameter='fusion_login_audit.retention_days',
+ help='Login audit rows older than this are deleted by the nightly '
+ 'cron. Set to 0 to keep forever.',
+ )
+ x_fc_login_audit_alert_threshold = fields.Integer(
+ string='Alert After N Consecutive Failures',
+ default=5,
+ config_parameter='fusion_login_audit.alert_threshold',
+ help='When this many failures for the same attempted login occur '
+ 'within the alert window, Settings admins receive one email.',
+ )
+ x_fc_login_audit_alert_window_min = fields.Integer(
+ string='Alert Window (minutes)',
+ default=15,
+ config_parameter='fusion_login_audit.alert_window_min',
+ )
+ x_fc_login_audit_alert_enabled = fields.Boolean(
+ string='Send Failed-Login Alerts',
+ default=True,
+ config_parameter='fusion_login_audit.alert_enabled',
+ )
diff --git a/fusion_login_audit/models/res_users.py b/fusion_login_audit/models/res_users.py
new file mode 100644
index 00000000..c776b244
--- /dev/null
+++ b/fusion_login_audit/models/res_users.py
@@ -0,0 +1,392 @@
+# -*- coding: utf-8 -*-
+import logging
+
+from odoo import _, api, fields, models
+from odoo.exceptions import AccessDenied
+
+# Top-level import (vs lazy inside the method): if the dep is missing — most
+# likely because the dev container got recreated and dropped its pip install
+# (see CLAUDE.md Workflow) — Odoo crashes at registry load with a clear
+# `ModuleNotFoundError`, not deep in the auth path after the first login.
+from user_agents import parse as ua_parse
+
+_logger = logging.getLogger(__name__)
+
+
+class ResUsers(models.Model):
+ _inherit = 'res.users'
+
+ @api.model
+ def _fc_build_event_vals(
+ self,
+ result,
+ attempted_login,
+ failure_reason=None,
+ user_id=None,
+ _override_ip=None,
+ _override_ua=None,
+ _credential=None,
+ ):
+ """Build the dict of values for a fusion.login.audit row.
+
+ Pulls IP / User-Agent from the live HTTP request when available.
+ Falls back to ('internal', '
') for XML-RPC / cron-initiated
+ auth, with geo_lookup_state='internal' so the geo cron skips them.
+
+ An empty IP from an otherwise-live request (rare; misconfigured
+ reverse proxy) also routes to the 'internal' fallback — an empty
+ string isn't useful audit data and is arguably suspicious.
+
+ The _override_* kwargs exist for tests so we don't have to fake a
+ full request. They are NOT a public API.
+
+ Password safety: `_credential` MAY contain a 'password' key from the
+ Odoo auth flow. We never read that key, never log it, never put it
+ in vals. The test `test_build_event_vals_strips_password` locks
+ this property in via `assertNotIn(secret, repr(vals))`.
+ """
+ vals = {
+ 'attempted_login': (attempted_login or '')[:255],
+ 'result': result,
+ 'failure_reason': failure_reason,
+ 'event_time': fields.Datetime.now(),
+ 'database': self.env.cr.dbname,
+ 'user_id': user_id,
+ }
+
+ ip = _override_ip
+ ua_str = _override_ua
+
+ if ip is None or ua_str is None:
+ try:
+ from odoo.http import request
+ if request and getattr(request, 'httprequest', None):
+ if ip is None:
+ ip = request.httprequest.remote_addr
+ if ua_str is None:
+ ua_str = request.httprequest.user_agent.string or ''
+ except Exception:
+ _logger.debug("fusion_login_audit: no request context", exc_info=True)
+
+ if ip and ua_str is not None:
+ ua_text = ua_str or ''
+ vals['ip_address'] = ip[:45]
+ vals['user_agent_raw'] = ua_text[:512]
+ ua = ua_parse(ua_text)
+ vals['browser'] = (f"{ua.browser.family} {ua.browser.version_string}".strip())[:64]
+ vals['os'] = (f"{ua.os.family} {ua.os.version_string}".strip())[:64]
+ if ua.is_bot:
+ vals['device_type'] = 'bot'
+ elif ua.is_mobile:
+ vals['device_type'] = 'mobile'
+ elif ua.is_tablet:
+ vals['device_type'] = 'tablet'
+ elif ua.is_pc:
+ vals['device_type'] = 'desktop'
+ else:
+ vals['device_type'] = 'unknown'
+ vals['geo_lookup_state'] = 'pending'
+ else:
+ vals['ip_address'] = 'internal'
+ vals['user_agent_raw'] = ''
+ vals['device_type'] = 'unknown'
+ vals['geo_lookup_state'] = 'internal'
+
+ # _credential is accepted in the signature so callers (T6 _check_credentials,
+ # T7 _login) can hand the dict in without filtering. The helper deliberately
+ # touches NO keys from it — see the password-safety note in the docstring.
+ # `_credential` is intentionally unread here; the parameter exists so future
+ # work can read `credential.get('type')` for `2fa_failed` discrimination
+ # only via the explicit failure_reason kwarg, never from the dict directly.
+ del _credential # explicit no-op — locks down the read surface
+
+ return vals
+
+ def _fc_record_login_event(self, result, failure_reason=None,
+ user_id=None, attempted_login=None,
+ _credential=None):
+ """Build vals + create the audit row via sudo. Never raises.
+
+ The row is written through an INDEPENDENT cursor
+ (``registry.cursor()``) so that:
+
+ * A failure-path call from ``_check_credentials`` survives the
+ outer transaction rollback that follows ``AccessDenied``
+ (the HTTP layer closes the cursor without committing, see
+ ``odoo/service/model.py:retrying``).
+ * A broken audit table can never block a real user from logging
+ in: the cursor block is wrapped in try/except; exceptions are
+ logged and swallowed.
+
+ The independent cursor commits on context exit. Note that this
+ means the row is durable even if the caller's transaction later
+ rolls back — intentional for audit semantics: a recorded bad
+ password should NOT disappear because some unrelated downstream
+ op blew up.
+ """
+ try:
+ vals = self._fc_build_event_vals(
+ result=result,
+ attempted_login=attempted_login
+ or (self.login if self else None)
+ or 'unknown',
+ failure_reason=failure_reason,
+ user_id=user_id or (self.id if self else None),
+ _credential=_credential,
+ )
+ with self.env.registry.cursor() as audit_cr:
+ audit_env = api.Environment(audit_cr, self.env.uid, self.env.context)
+ audit_env['fusion.login.audit'].sudo().with_context(
+ mail_create_nolog=True
+ ).create(vals)
+ except Exception:
+ _logger.exception(
+ "fusion_login_audit: failed to record %s row for %s",
+ result, attempted_login or (self.login if self else 'unknown'),
+ )
+
+ def _update_last_login(self):
+ result = super()._update_last_login()
+ # Self is the singleton recordset of the user that just logged in.
+ self._fc_record_login_event(result='success')
+ return result
+
+ def _fc_alert_threshold(self):
+ ICP = self.env['ir.config_parameter'].sudo()
+ try:
+ return max(1, int(ICP.get_param(
+ 'fusion_login_audit.alert_threshold', 5)))
+ except (TypeError, ValueError):
+ return 5
+
+ def _fc_alert_window_min(self):
+ ICP = self.env['ir.config_parameter'].sudo()
+ try:
+ return max(1, int(ICP.get_param(
+ 'fusion_login_audit.alert_window_min', 15)))
+ except (TypeError, ValueError):
+ return 15
+
+ def _fc_alert_enabled(self):
+ ICP = self.env['ir.config_parameter'].sudo()
+ # CLAUDE.md rule #5: Boolean config_parameter deletes on False.
+ # An absent key means True (the default). Explicit 'False' or 'false'
+ # means disabled.
+ raw = ICP.get_param('fusion_login_audit.alert_enabled', 'True')
+ return str(raw).strip().lower() != 'false'
+
+ def _fc_recent_failure_count(self, attempted_login):
+ """Failures for this attempted_login within the alert window."""
+ from datetime import timedelta
+ if not attempted_login:
+ return 0
+ cutoff = fields.Datetime.now() - timedelta(
+ minutes=self._fc_alert_window_min())
+ return self.env['fusion.login.audit'].sudo().search_count([
+ ('attempted_login', '=', attempted_login),
+ ('result', '=', 'failure'),
+ ('event_time', '>=', cutoff),
+ ])
+
+ def _fc_send_failure_alert(self, attempted_login):
+ """Queue one alert mail unless cooldown is active. Cooldown is
+ 60 minutes, keyed by attempted_login, stored in ir.config_parameter."""
+ from datetime import timedelta
+ if not self._fc_alert_enabled():
+ return
+ if not attempted_login:
+ return
+ ICP = self.env['ir.config_parameter'].sudo()
+ cd_key = f'fusion_login_audit.last_alert:{attempted_login}'
+ cd_raw = ICP.get_param(cd_key)
+ now = fields.Datetime.now()
+ if cd_raw:
+ try:
+ last = fields.Datetime.from_string(cd_raw)
+ if last and (now - last) < timedelta(minutes=60):
+ return # cooldown active
+ except (TypeError, ValueError):
+ pass
+
+ window = self._fc_alert_window_min()
+ cutoff = now - timedelta(minutes=window)
+ Audit = self.env['fusion.login.audit'].sudo()
+ rows = Audit.search([
+ ('attempted_login', '=', attempted_login),
+ ('result', '=', 'failure'),
+ ('event_time', '>=', cutoff),
+ ], order='event_time desc', limit=20)
+
+ # Admin recipients: members of base.group_system (the Settings group)
+ # who have an email set and are not portal/share users. Note:
+ # CLAUDE.md rule #6 — res.groups has no `users` field in Odoo 19, so
+ # search res.users by group_ids directly. The __system__ superuser
+ # (uid=1) is excluded automatically by Odoo's default user filter.
+ admins = self.env['res.users'].sudo().search([
+ ('group_ids', 'in', self.env.ref('base.group_system').id),
+ ('email', '!=', False),
+ ('share', '=', False),
+ ])
+ if not admins:
+ return
+
+ tmpl = self.env.ref(
+ 'fusion_login_audit.mail_template_failure_burst',
+ raise_if_not_found=False)
+ if not tmpl:
+ return
+
+ # CLAUDE.md rule #12: in mail.template QWeb, `ctx` IS env.context.
+ # So `ctx.get('foo')` resolves to env.context.get('foo'). Pass data
+ # by SPREADING keys into the context, not wrapping in a dict.
+ # `with_context(ctx=ctx_data)` would silently render an empty subject.
+ ctx_data = {
+ 'attempted_login': attempted_login,
+ 'failure_count': len(rows),
+ 'window_min': window,
+ 'rows': [{
+ 'event_time': fields.Datetime.to_string(r.event_time),
+ 'ip_address': r.ip_address or '',
+ 'country_code': r.country_code or '',
+ 'browser': r.browser or '',
+ 'os': r.os or '',
+ } for r in rows],
+ }
+ for admin in admins:
+ tmpl.with_context(**ctx_data).send_mail(
+ admin.id,
+ email_values={'email_to': admin.email,
+ 'auto_delete': True},
+ force_send=False,
+ )
+ ICP.set_param(cd_key, fields.Datetime.to_string(now))
+
+ def _check_credentials(self, credential, env):
+ try:
+ return super()._check_credentials(credential, env)
+ except AccessDenied:
+ cred_type = (credential or {}).get('type', 'password')
+ reason = '2fa_failed' if cred_type == 'totp' else 'bad_password'
+ attempted_login = (credential or {}).get('login') or self.login
+ self._fc_record_login_event(
+ result='failure',
+ failure_reason=reason,
+ user_id=self.id,
+ attempted_login=attempted_login,
+ _credential=credential,
+ )
+ try:
+ if self._fc_recent_failure_count(attempted_login) \
+ >= self._fc_alert_threshold():
+ self._fc_send_failure_alert(attempted_login)
+ except Exception:
+ _logger.exception(
+ "fusion_login_audit: failed to send failure alert")
+ raise
+
+ def _login(self, credential, user_agent_env):
+ """Catch the unknown-user branch of upstream _login.
+
+ In Odoo 19 ``_login`` is an *instance* method (not a classmethod as in
+ earlier versions). Upstream raises ``AccessDenied`` in three cases:
+
+ 1. Unknown login string — ``_assert_can_auth`` or the user-lookup
+ ``search()`` returns empty → ``_check_credentials`` never fires →
+ THIS override is the only chance to record the attempt.
+ 2. Wrong password — user exists, ``_check_credentials`` raises →
+ our ``_check_credentials`` override already wrote a ``bad_password``
+ row → re-raise propagates up to here. We MUST NOT write a second
+ row.
+ 3. 2FA failure — same as #2 but ``failure_reason='2fa_failed'``.
+
+ We distinguish #1 from #2/#3 by checking whether the login string
+ resolves to any user. If it does, ``_check_credentials`` ran (and
+ already logged); if it doesn't, the user lookup failed and we log
+ ``unknown_user`` here.
+
+ ``_fc_record_login_event`` writes through an INDEPENDENT cursor
+ (``self.env.registry.cursor()``), so the audit row survives the
+ outer transaction rollback that follows the re-raised
+ ``AccessDenied``. Audit-side exceptions never block the re-raise.
+ """
+ try:
+ return super()._login(credential, user_agent_env)
+ except AccessDenied:
+ login = (credential or {}).get('login') or ''
+ try:
+ user_exists = bool(self.sudo().search(
+ [('login', '=', login)], limit=1))
+ except Exception:
+ user_exists = False # be permissive — log the row anyway
+ if not user_exists:
+ self._fc_record_login_event(
+ result='failure',
+ failure_reason='unknown_user',
+ user_id=False,
+ attempted_login=login or 'unknown',
+ _credential=credential,
+ )
+ raise
+
+ # ──────────────────────────────────────────────────────────────────
+ # Per-user surface — fields + action method backing the smart button
+ # and "Login Activity" tab on the user form view.
+ # ──────────────────────────────────────────────────────────────────
+
+ x_fc_login_audit_ids = fields.One2many(
+ 'fusion.login.audit', 'user_id',
+ string='Login Activity',
+ )
+ x_fc_login_audit_count = fields.Integer(
+ string='Login Audit Count',
+ compute='_compute_x_fc_login_audit_count',
+ )
+ x_fc_last_successful_login = fields.Datetime(
+ string='Last Successful Login',
+ compute='_compute_x_fc_last_successful_login',
+ store=True,
+ )
+ x_fc_last_login_ip = fields.Char(
+ string='Last Login IP', size=45,
+ compute='_compute_x_fc_last_successful_login',
+ store=True,
+ )
+
+ @api.depends('x_fc_login_audit_ids')
+ def _compute_x_fc_login_audit_count(self):
+ # Odoo 19: read_group → _read_group, returns list of tuples
+ # (group_key, aggregate_value) when given groupby + aggregates.
+ Audit = self.env['fusion.login.audit'].sudo()
+ rows = Audit._read_group(
+ domain=[('user_id', 'in', self.ids)],
+ groupby=['user_id'],
+ aggregates=['__count'],
+ )
+ counts = {user.id: count for user, count in rows}
+ for user in self:
+ user.x_fc_login_audit_count = counts.get(user.id, 0)
+
+ @api.depends('x_fc_login_audit_ids.event_time',
+ 'x_fc_login_audit_ids.result',
+ 'x_fc_login_audit_ids.ip_address')
+ def _compute_x_fc_last_successful_login(self):
+ Audit = self.env['fusion.login.audit'].sudo()
+ for user in self:
+ row = Audit.search(
+ [('user_id', '=', user.id), ('result', '=', 'success')],
+ order='event_time desc', limit=1,
+ )
+ user.x_fc_last_successful_login = row.event_time or False
+ user.x_fc_last_login_ip = row.ip_address or False
+
+ def action_fc_view_login_audit(self):
+ self.ensure_one()
+ return {
+ 'name': _('Login Activity'),
+ 'type': 'ir.actions.act_window',
+ 'res_model': 'fusion.login.audit',
+ 'view_mode': 'list,form',
+ 'domain': [('user_id', '=', self.id)],
+ 'context': {'create': False, 'edit': False, 'delete': False,
+ 'default_user_id': self.id},
+ }
diff --git a/fusion_login_audit/security/ir.model.access.csv b/fusion_login_audit/security/ir.model.access.csv
new file mode 100644
index 00000000..11e712e0
--- /dev/null
+++ b/fusion_login_audit/security/ir.model.access.csv
@@ -0,0 +1,2 @@
+id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
+access_fusion_login_audit_system,fusion.login.audit system,model_fusion_login_audit,base.group_system,1,0,0,0
diff --git a/fusion_login_audit/security/security.xml b/fusion_login_audit/security/security.xml
new file mode 100644
index 00000000..09572660
--- /dev/null
+++ b/fusion_login_audit/security/security.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+ fusion.login.audit: admin read only
+
+ [(1, '=', 1)]
+
+
+
+
+
+
+
+
+
diff --git a/fusion_login_audit/static/description/icon.png b/fusion_login_audit/static/description/icon.png
new file mode 100644
index 00000000..de8e1758
Binary files /dev/null and b/fusion_login_audit/static/description/icon.png differ
diff --git a/fusion_login_audit/tests/__init__.py b/fusion_login_audit/tests/__init__.py
new file mode 100644
index 00000000..e3d2310f
--- /dev/null
+++ b/fusion_login_audit/tests/__init__.py
@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+from . import test_login_audit
+from . import test_security
diff --git a/fusion_login_audit/tests/test_login_audit.py b/fusion_login_audit/tests/test_login_audit.py
new file mode 100644
index 00000000..e6926e64
--- /dev/null
+++ b/fusion_login_audit/tests/test_login_audit.py
@@ -0,0 +1,541 @@
+# -*- coding: utf-8 -*-
+from odoo import fields
+from odoo.tests.common import TransactionCase, tagged
+
+
+@tagged('post_install', '-at_install')
+class TestFusionLoginAuditModel(TransactionCase):
+
+ def setUp(self):
+ # `_fc_record_login_event` uses `registry.cursor()` so that the audit
+ # row survives the outer rollback that follows AccessDenied (see
+ # res_users.py for the rationale). Inside a TransactionCase that
+ # rolls back per test, a fresh cursor on a new connection cannot
+ # see uncommitted records (the freshly-created test user FKs into
+ # the audit row), so we put the registry in test mode — that swaps
+ # `registry.cursor()` for a TestCursor that wraps the test cursor.
+ super().setUp()
+ self.registry_enter_test_mode()
+ # The alert tests below assume at least one admin has an email
+ # (otherwise the recipient filter empties and no mail is queued).
+ # In a fresh fusion-dev DB, base.user_admin's email is NULL; the
+ # superuser (__system__) has an email but is filtered out of normal
+ # res.users searches. Ensure admin has a usable email.
+ admin = self.env.ref('base.user_admin')
+ if not admin.email:
+ admin.sudo().write({'email': 'admin@test.example.com'})
+
+ def test_model_exists_and_creates(self):
+ """Audit row can be created with all expected fields."""
+ Audit = self.env['fusion.login.audit'].sudo()
+ rec = Audit.create({
+ 'attempted_login': 'demo@example.com',
+ 'result': 'success',
+ 'ip_address': '203.0.113.5',
+ 'user_agent_raw': 'Mozilla/5.0 Test',
+ 'browser': 'Test 1.0',
+ 'os': 'TestOS',
+ 'device_type': 'desktop',
+ 'database': self.env.cr.dbname,
+ 'geo_lookup_state': 'pending',
+ })
+ self.assertTrue(rec.id)
+ self.assertEqual(rec.result, 'success')
+ self.assertEqual(rec.geo_lookup_state, 'pending')
+ self.assertEqual(rec.database, self.env.cr.dbname)
+ self.assertTrue(rec.event_time) # default fires
+
+ def test_failure_reason_optional(self):
+ """failure_reason is null on success rows."""
+ rec = self.env['fusion.login.audit'].sudo().create({
+ 'attempted_login': 'demo@example.com',
+ 'result': 'success',
+ })
+ self.assertFalse(rec.failure_reason)
+
+ def test_geo_state_internal_value(self):
+ """`internal` is an accepted geo_lookup_state value (distinct from private_ip)."""
+ rec = self.env['fusion.login.audit'].sudo().create({
+ 'attempted_login': 'demo@example.com',
+ 'result': 'success',
+ 'geo_lookup_state': 'internal',
+ })
+ self.assertEqual(rec.geo_lookup_state, 'internal')
+
+ def test_build_event_vals_with_no_request(self):
+ """Without a live request, geo_lookup_state is 'internal'."""
+ ResUsers = self.env['res.users']
+ vals = ResUsers._fc_build_event_vals(
+ result='success',
+ attempted_login='cron@example.com',
+ )
+ self.assertEqual(vals['result'], 'success')
+ self.assertEqual(vals['attempted_login'], 'cron@example.com')
+ self.assertEqual(vals['ip_address'], 'internal')
+ self.assertEqual(vals['user_agent_raw'], '')
+ self.assertEqual(vals['geo_lookup_state'], 'internal')
+ self.assertEqual(vals['database'], self.env.cr.dbname)
+
+ def test_build_event_vals_parses_user_agent(self):
+ """Parser fills browser/os/device_type from a stub UA dict."""
+ ResUsers = self.env['res.users']
+ vals = ResUsers._fc_build_event_vals(
+ result='success',
+ attempted_login='ua@example.com',
+ _override_ip='203.0.113.5',
+ _override_ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
+ 'AppleWebKit/537.36 Chrome/140.0 Safari/537.36',
+ )
+ self.assertEqual(vals['ip_address'], '203.0.113.5')
+ self.assertIn('Chrome', vals['browser'])
+ self.assertIn('Windows', vals['os'])
+ self.assertEqual(vals['device_type'], 'desktop')
+ self.assertEqual(vals['geo_lookup_state'], 'pending')
+
+ def test_build_event_vals_strips_password(self):
+ """If a credential dict sneaks in, no password leaks into vals."""
+ ResUsers = self.env['res.users']
+ vals = ResUsers._fc_build_event_vals(
+ result='failure',
+ attempted_login='leak@example.com',
+ failure_reason='bad_password',
+ _credential={'login': 'leak@example.com',
+ 'password': 'super-secret-pw',
+ 'type': 'password'},
+ )
+ serialized = repr(vals)
+ self.assertNotIn('super-secret-pw', serialized)
+ self.assertEqual(vals['failure_reason'], 'bad_password')
+
+ def test_update_last_login_writes_audit_row(self):
+ """Calling _update_last_login on a user creates a success row."""
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Audit Tester',
+ 'login': 'audit-tester@example.com',
+ 'password': 'audit-tester-pw-1',
+ })
+ Audit = self.env['fusion.login.audit'].sudo()
+ before = Audit.search_count([('user_id', '=', user.id)])
+ user._update_last_login()
+ after = Audit.search_count([('user_id', '=', user.id)])
+ self.assertEqual(after, before + 1)
+ row = Audit.search([('user_id', '=', user.id)],
+ order='event_time desc', limit=1)
+ self.assertEqual(row.result, 'success')
+ self.assertEqual(row.attempted_login, user.login)
+ self.assertFalse(row.failure_reason)
+ self.assertEqual(row.database, self.env.cr.dbname)
+
+ def test_audit_write_failure_does_not_block_login(self):
+ """An exception inside the audit write must not propagate."""
+ from unittest.mock import patch
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Resilient Tester',
+ 'login': 'resilient-tester@example.com',
+ 'password': 'resilient-tester-pw-1',
+ })
+
+ def boom(self_, vals):
+ raise RuntimeError('simulated audit DB failure')
+
+ with patch.object(type(self.env['fusion.login.audit']),
+ 'create', boom):
+ # Must not raise.
+ user._update_last_login()
+
+ def test_bad_password_writes_failure_row(self):
+ """A wrong password creates a result=failure row with failure_reason='bad_password'."""
+ from odoo.exceptions import AccessDenied
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Wrongpw Tester',
+ 'login': 'wrongpw-tester@example.com',
+ 'password': 'wrongpw-tester-pw-1',
+ })
+ Audit = self.env['fusion.login.audit'].sudo()
+ before = Audit.search_count([('attempted_login', '=', user.login),
+ ('result', '=', 'failure')])
+ # NB: cannot use `self.assertRaises(AccessDenied)` — it opens an extra
+ # savepoint (see odoo/tests/common.py::_assertRaises) that rolls back
+ # the audit row written from inside the override.
+ raised = False
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': 'definitely-wrong',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ raised = True
+ self.assertTrue(raised, "AccessDenied not raised on wrong password")
+ after = Audit.search_count([('attempted_login', '=', user.login),
+ ('result', '=', 'failure')])
+ self.assertEqual(after, before + 1)
+ row = Audit.search([('attempted_login', '=', user.login),
+ ('result', '=', 'failure')],
+ order='event_time desc', limit=1)
+ self.assertEqual(row.failure_reason, 'bad_password')
+ self.assertEqual(row.user_id, user)
+
+ def test_bad_password_never_appears_in_row(self):
+ """The attempted password string never lands in any field."""
+ from odoo.exceptions import AccessDenied
+ secret = 'NeverInTheRow-9f3a82'
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Leak Test',
+ 'login': 'leak-test-2@example.com',
+ 'password': 'leak-test-pw-1',
+ })
+ # NB: manual try/except instead of assertRaises — see note above.
+ raised = False
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': secret, 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ raised = True
+ self.assertTrue(raised, "AccessDenied not raised on wrong password")
+ row = self.env['fusion.login.audit'].sudo().search(
+ [('attempted_login', '=', user.login),
+ ('result', '=', 'failure')],
+ order='event_time desc', limit=1)
+ self.assertTrue(row, "Audit row not created for bad-password attempt")
+ for fname in ('attempted_login', 'failure_reason', 'user_agent_raw',
+ 'browser', 'os', 'ip_address', 'ip_hostname',
+ 'city', 'country_name', 'country_code', 'geo_state',
+ 'database'):
+ self.assertNotIn(secret, (row[fname] or ''),
+ f"Password leaked into field {fname}")
+
+ def test_unknown_user_writes_failure_row(self):
+ """A login attempt for a username that does not exist gets logged
+ with user_id=NULL and failure_reason='unknown_user'."""
+ from odoo.exceptions import AccessDenied
+ bogus = 'this-user-does-not-exist@example.com'
+ Audit = self.env['fusion.login.audit'].sudo()
+ before = Audit.search_count([('attempted_login', '=', bogus)])
+ # NB: manual try/except instead of assertRaises — see comment in
+ # test_bad_password_writes_failure_row. _login is an instance method
+ # in Odoo 19 (not a classmethod as in earlier versions); we call it
+ # on the empty recordset of res.users, which matches what
+ # authenticate() does internally.
+ raised = False
+ try:
+ self.env['res.users']._login(
+ {'login': bogus, 'password': 'whatever',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ raised = True
+ self.assertTrue(raised, "AccessDenied must propagate after the audit write")
+ after = Audit.search_count([('attempted_login', '=', bogus)])
+ self.assertEqual(after, before + 1)
+ row = Audit.search([('attempted_login', '=', bogus)],
+ order='event_time desc', limit=1)
+ self.assertFalse(row.user_id)
+ self.assertEqual(row.failure_reason, 'unknown_user')
+ self.assertEqual(row.result, 'failure')
+
+ def test_login_known_user_bad_password_single_row(self):
+ """When _login is the entry point for an existing user with the
+ wrong password, only ONE failure row is written (bad_password from
+ _check_credentials) — NOT two (bad_password + unknown_user). The
+ unknown_user branch must only fire when the login string does not
+ resolve to any user.
+
+ Regression test for the duplicate-row bug discovered during the
+ production deploy smoke on westin-v19: a single failed login for
+ an existing user was creating two audit rows.
+ """
+ from odoo.exceptions import AccessDenied
+ user = self.env['res.users'].sudo().create({
+ 'name': 'NoDupTester',
+ 'login': 'nodup-tester@example.com',
+ 'password': 'nodup-tester-pw-1',
+ })
+ Audit = self.env['fusion.login.audit'].sudo()
+ before = Audit.search_count([('attempted_login', '=', user.login)])
+ raised = False
+ try:
+ self.env['res.users']._login(
+ {'login': user.login, 'password': 'wrong-not-the-real-one',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ raised = True
+ self.assertTrue(raised)
+ after = Audit.search_count([('attempted_login', '=', user.login)])
+ self.assertEqual(after - before, 1,
+ "Exactly one row per failed login attempt — not two")
+ row = Audit.search([('attempted_login', '=', user.login)],
+ order='event_time desc', limit=1)
+ self.assertEqual(row.failure_reason, 'bad_password',
+ "Existing-user failure must record bad_password, "
+ "not unknown_user (the user IS in the system)")
+
+ def test_computed_last_successful_login(self):
+ """x_fc_last_successful_login reflects the latest success row."""
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Compute Tester',
+ 'login': 'compute-tester@example.com',
+ 'password': 'compute-tester-pw-1',
+ })
+ # Use registry cursor so the audit row survives the transactional
+ # boundary the way the auth-time path does.
+ with self.env.registry.cursor() as audit_cr:
+ from odoo import api
+ audit_env = api.Environment(audit_cr, self.env.uid, self.env.context)
+ audit_env['fusion.login.audit'].sudo().create({
+ 'user_id': user.id,
+ 'attempted_login': user.login,
+ 'result': 'success',
+ 'database': self.env.cr.dbname,
+ 'ip_address': '198.51.100.42',
+ })
+ user.invalidate_recordset(['x_fc_last_successful_login',
+ 'x_fc_login_audit_count',
+ 'x_fc_last_login_ip'])
+ self.assertTrue(user.x_fc_last_successful_login)
+ self.assertGreaterEqual(user.x_fc_login_audit_count, 1)
+ self.assertEqual(user.x_fc_last_login_ip, '198.51.100.42')
+
+ def test_action_view_login_audit_returns_window_action(self):
+ """The smart-button action returns an act_window scoped to this user."""
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Action Tester',
+ 'login': 'action-tester@example.com',
+ 'password': 'action-tester-pw-1',
+ })
+ action = user.action_fc_view_login_audit()
+ self.assertEqual(action['res_model'], 'fusion.login.audit')
+ self.assertEqual(action['type'], 'ir.actions.act_window')
+ # Domain must filter to this user
+ self.assertIn(('user_id', '=', user.id), action['domain'])
+
+ def test_settings_round_trip(self):
+ """Writing settings persists them via ir.config_parameter."""
+ Settings = self.env['res.config.settings'].sudo()
+ Settings.create({
+ 'x_fc_login_audit_retention_days': 90,
+ 'x_fc_login_audit_alert_threshold': 3,
+ 'x_fc_login_audit_alert_window_min': 5,
+ 'x_fc_login_audit_alert_enabled': False,
+ }).execute()
+ ICP = self.env['ir.config_parameter'].sudo()
+ self.assertEqual(ICP.get_param('fusion_login_audit.retention_days'), '90')
+ self.assertEqual(ICP.get_param('fusion_login_audit.alert_threshold'), '3')
+ self.assertEqual(ICP.get_param('fusion_login_audit.alert_window_min'), '5')
+ # Odoo's set_param deletes the row when the value is falsy, so a
+ # Boolean field set to False yields get_param() == False (Python
+ # bool, the default), not the string 'False'.
+ self.assertFalse(ICP.get_param('fusion_login_audit.alert_enabled'))
+
+ def test_failure_burst_queues_one_email(self):
+ """N consecutive failures (within window) queue exactly one mail.mail."""
+ from odoo.exceptions import AccessDenied
+ ICP = self.env['ir.config_parameter'].sudo()
+ ICP.set_param('fusion_login_audit.alert_threshold', '3')
+ ICP.set_param('fusion_login_audit.alert_window_min', '15')
+ ICP.set_param('fusion_login_audit.alert_enabled', 'True')
+ # Clear any cooldown leftover from earlier tests.
+ ICP.set_param('fusion_login_audit.last_alert:burst@example.com', '')
+
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Burst Tester',
+ 'login': 'burst@example.com',
+ 'password': 'burst-tester-pw-1',
+ })
+ Mail = self.env['mail.mail'].sudo()
+ before = Mail.search_count([('subject', 'ilike', 'burst@example.com')])
+ for _i in range(3):
+ raised = False
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': 'wrong',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ raised = True
+ self.assertTrue(raised)
+ after = Mail.search_count([('subject', 'ilike', 'burst@example.com')])
+ self.assertEqual(after, before + 1,
+ "Exactly one alert mail should be queued")
+
+ def test_cooldown_suppresses_second_alert(self):
+ """Failures beyond the threshold within the cooldown queue zero more emails."""
+ from odoo.exceptions import AccessDenied
+ ICP = self.env['ir.config_parameter'].sudo()
+ ICP.set_param('fusion_login_audit.alert_threshold', '3')
+ ICP.set_param('fusion_login_audit.alert_window_min', '15')
+ ICP.set_param('fusion_login_audit.alert_enabled', 'True')
+ ICP.set_param('fusion_login_audit.last_alert:cool@example.com', '')
+
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Cooldown Tester',
+ 'login': 'cool@example.com',
+ 'password': 'cooldown-tester-pw-1',
+ })
+ Mail = self.env['mail.mail'].sudo()
+ for _i in range(3):
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': 'wrong',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ pass
+ count_after_3 = Mail.search_count([('subject', 'ilike', 'cool@example.com')])
+ for _i in range(2):
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': 'wrong',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ pass
+ count_after_5 = Mail.search_count([('subject', 'ilike', 'cool@example.com')])
+ self.assertEqual(count_after_5, count_after_3,
+ "Cooldown should suppress additional emails")
+
+ def test_alert_disabled_master_switch(self):
+ """alert_enabled=False suppresses all alerts regardless of threshold."""
+ from odoo.exceptions import AccessDenied
+ ICP = self.env['ir.config_parameter'].sudo()
+ ICP.set_param('fusion_login_audit.alert_threshold', '1')
+ ICP.set_param('fusion_login_audit.alert_window_min', '15')
+ # Use the actual boolean field's storage semantics — see CLAUDE.md rule #5.
+ # Writing False through the settings form deletes the param; here we
+ # set the string 'False' explicitly to simulate "disabled".
+ ICP.set_param('fusion_login_audit.alert_enabled', 'False')
+ ICP.set_param('fusion_login_audit.last_alert:disabled@example.com', '')
+
+ user = self.env['res.users'].sudo().create({
+ 'name': 'Disabled Tester',
+ 'login': 'disabled@example.com',
+ 'password': 'disabled-tester-pw-1',
+ })
+ Mail = self.env['mail.mail'].sudo()
+ before = Mail.search_count([('subject', 'ilike', 'disabled@example.com')])
+ try:
+ user._check_credentials(
+ {'login': user.login, 'password': 'wrong',
+ 'type': 'password'},
+ {'interactive': False},
+ )
+ except AccessDenied:
+ pass
+ after = Mail.search_count([('subject', 'ilike', 'disabled@example.com')])
+ self.assertEqual(after, before, "Disabled alerts should queue nothing")
+
+ def test_retention_gc_deletes_old_rows(self):
+ """The GC method deletes rows older than retention_days."""
+ from datetime import timedelta
+ ICP = self.env['ir.config_parameter'].sudo()
+ ICP.set_param('fusion_login_audit.retention_days', '30')
+
+ now = fields.Datetime.now()
+ Audit = self.env['fusion.login.audit'].sudo()
+ old = Audit.create({
+ 'attempted_login': 'gc-old@example.com',
+ 'result': 'success',
+ 'event_time': now - timedelta(days=45),
+ })
+ recent = Audit.create({
+ 'attempted_login': 'gc-recent@example.com',
+ 'result': 'success',
+ 'event_time': now - timedelta(days=5),
+ })
+ old_id, recent_id = old.id, recent.id
+
+ Audit._fc_retention_gc()
+
+ self.assertFalse(Audit.browse(old_id).exists(),
+ "Row older than retention_days should be gone")
+ self.assertTrue(Audit.browse(recent_id).exists(),
+ "Row inside retention_days should survive")
+
+ def test_retention_zero_keeps_forever(self):
+ """retention_days=0 keeps all rows."""
+ from datetime import timedelta
+ ICP = self.env['ir.config_parameter'].sudo()
+ ICP.set_param('fusion_login_audit.retention_days', '0')
+
+ now = fields.Datetime.now()
+ Audit = self.env['fusion.login.audit'].sudo()
+ ancient = Audit.create({
+ 'attempted_login': 'forever@example.com',
+ 'result': 'success',
+ 'event_time': now - timedelta(days=3650),
+ })
+ ancient_id = ancient.id
+
+ Audit._fc_retention_gc()
+
+ self.assertTrue(Audit.browse(ancient_id).exists(),
+ "retention_days=0 must keep everything")
+
+ def test_geo_private_ip_shortcut(self):
+ """Private IPs short-circuit to state='private_ip' without HTTP."""
+ Audit = self.env['fusion.login.audit'].sudo()
+ rec = Audit.create({
+ 'attempted_login': 'lan@example.com',
+ 'result': 'success',
+ 'ip_address': '192.168.1.40',
+ 'geo_lookup_state': 'pending',
+ })
+ Audit._fc_geo_enrich_pending(limit=10)
+ rec.invalidate_recordset()
+ self.assertEqual(rec.geo_lookup_state, 'private_ip')
+ self.assertEqual(rec.country_code, '--')
+
+ def test_geo_cache_hit_avoids_http(self):
+ """A second row with the same recent IP copies from cache."""
+ from unittest.mock import patch
+ Audit = self.env['fusion.login.audit'].sudo()
+ # Seed a "done" row from the same IP.
+ Audit.create({
+ 'attempted_login': 'seed@example.com',
+ 'result': 'success',
+ 'ip_address': '203.0.113.99',
+ 'geo_lookup_state': 'done',
+ 'country_code': 'CA',
+ 'country_name': 'Canada',
+ 'city': 'Toronto',
+ 'geo_state': 'Ontario',
+ })
+ target = Audit.create({
+ 'attempted_login': 'hit@example.com',
+ 'result': 'success',
+ 'ip_address': '203.0.113.99',
+ 'geo_lookup_state': 'pending',
+ })
+
+ with patch(
+ 'odoo.addons.fusion_login_audit.models.fusion_login_audit.requests.get'
+ ) as mock_get:
+ Audit._fc_geo_enrich_pending(limit=10)
+ mock_get.assert_not_called()
+
+ target.invalidate_recordset()
+ self.assertEqual(target.geo_lookup_state, 'done')
+ self.assertEqual(target.country_code, 'CA')
+ self.assertEqual(target.city, 'Toronto')
+
+ def test_geo_internal_skipped(self):
+ """Rows with geo_lookup_state='internal' are not picked up."""
+ Audit = self.env['fusion.login.audit'].sudo()
+ rec = Audit.create({
+ 'attempted_login': 'cron@example.com',
+ 'result': 'success',
+ 'ip_address': 'internal',
+ 'geo_lookup_state': 'internal',
+ })
+ # Should be a no-op for 'internal' state (cron only picks 'pending').
+ Audit._fc_geo_enrich_pending(limit=10)
+ rec.invalidate_recordset()
+ self.assertEqual(rec.geo_lookup_state, 'internal')
diff --git a/fusion_login_audit/tests/test_security.py b/fusion_login_audit/tests/test_security.py
new file mode 100644
index 00000000..9f9c74cd
--- /dev/null
+++ b/fusion_login_audit/tests/test_security.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+from odoo.exceptions import AccessError
+from odoo.tests.common import TransactionCase, tagged
+
+
+@tagged('post_install', '-at_install')
+class TestFusionLoginAuditSecurity(TransactionCase):
+ """Tests for the layered protection on `fusion.login.audit`:
+
+ Layer 1 — ACL (security/ir.model.access.csv): grants read-only access to
+ `base.group_system` and nothing to any other group. Blocks write/create/
+ unlink for everyone via the ORM regardless of `sudo()`.
+
+ Layer 2 — Record rule (security/security.xml): group-specific rule that
+ grants admins an unrestricted domain (`[(1,'=',1)]`). The rule does NOT
+ actively restrict non-admins — Odoo's semantics for a group-scoped rule
+ is "the rule only applies to users in that group". Non-admins are gated
+ purely by the ACL, which denies them everything. The rule's value is
+ documentation + future-proofing (it keeps admin access explicit if the
+ ACL is ever loosened with a per-group read row; the admin path remains
+ explicit and self-documenting). It is NOT a security gate the ACL relies on.
+
+ Test naming reflects which layer actually does the work:
+ - test_acl_blocks_* — exercises Layer 1 (ACL alone is sufficient).
+ - test_admin_can_read_through_acl_and_rule — exercises both layers in
+ the positive path (admin must satisfy ACL grant
+ AND the admin-scoped rule's domain).
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.audit_row = self.env['fusion.login.audit'].sudo().create({
+ 'attempted_login': 'sec-test@example.com',
+ 'result': 'success',
+ 'database': self.env.cr.dbname,
+ })
+ # Internal non-admin user (active employee, not a Settings admin).
+ self.regular_user = self.env['res.users'].sudo().create({
+ 'name': 'Regular Tester',
+ 'login': 'regular-tester@example.com',
+ 'password': 'regular-tester-pw-1',
+ 'group_ids': [(6, 0, [self.env.ref('base.group_user').id])],
+ })
+ # Portal user (share=True) — must not see audit data either.
+ self.portal_user = self.env['res.users'].sudo().create({
+ 'name': 'Portal Tester',
+ 'login': 'portal-tester@example.com',
+ 'password': 'portal-tester-pw-1',
+ 'group_ids': [(6, 0, [self.env.ref('base.group_portal').id])],
+ })
+
+ def test_admin_can_read_through_acl_and_rule(self):
+ """A Settings admin satisfies both the ACL (grants read) and the
+ record rule (admin-only domain), so the read succeeds."""
+ admin = self.env.ref('base.user_admin')
+ rec = self.audit_row.with_user(admin).read(['attempted_login'])
+ self.assertEqual(rec[0]['attempted_login'], 'sec-test@example.com')
+
+ def test_acl_blocks_read_for_regular_user(self):
+ """A `base.group_user` member has no ACL grant on the model. The
+ ACL alone denies the read; the record rule never gets consulted."""
+ with self.assertRaises(AccessError):
+ self.audit_row.with_user(self.regular_user).read(['attempted_login'])
+
+ def test_acl_blocks_read_for_portal_user(self):
+ """A `base.group_portal` (share=True) user has no ACL grant either.
+ Audit data must never leak to a portal user — IP and attempted_login
+ are sensitive."""
+ with self.assertRaises(AccessError):
+ self.audit_row.with_user(self.portal_user).read(['attempted_login'])
+
+ def test_acl_blocks_write_for_admin(self):
+ """Even Settings admins cannot write — the ACL grants no group any
+ write permission on this model (audit log is append-only). The rule's
+ `perm_write=False` means 'rule does not constrain this op', so this
+ denial is the ACL's work alone."""
+ admin = self.env.ref('base.user_admin')
+ with self.assertRaises(AccessError):
+ self.audit_row.with_user(admin).write({'attempted_login': 'tampered'})
+
+ def test_acl_blocks_unlink_for_admin(self):
+ """Append-only also at the unlink boundary. ACL grants no group
+ delete permission; the record rule's `perm_unlink=False` exempts
+ it from gating this op."""
+ admin = self.env.ref('base.user_admin')
+ with self.assertRaises(AccessError):
+ self.audit_row.with_user(admin).unlink()
+
+ # Note: a "rule actively blocks non-admins" test was attempted but
+ # removed once the actual Odoo semantics were verified. A group-scoped
+ # rule (groups=[base.group_system]) only applies to users in that group.
+ # Granting a base.group_user member an ACL read row would let them read
+ # rows — the rule does not filter them. To make the rule truly restrictive
+ # we would need a global rule (groups=[]) with domain [(0,'=',1)] paired
+ # with the admin grant. That is a security-model redesign and out of
+ # scope for T3. The ACL already provides the actual gate.
+
+ # ─────────────────────────────────────────────────────────────────────
+ # T14: view-level visibility checks. The smart button and the "Login
+ # Activity" tab on res.users are gated by groups="base.group_system"
+ # on the inner XML nodes (the inherited view record itself cannot
+ # carry groups — CLAUDE.md rule #11). Verify the gate works by asking
+ # for the form view as a non-admin and confirming the x_fc_* fields
+ # are stripped from the arch.
+ # ─────────────────────────────────────────────────────────────────────
+
+ def test_view_hides_button_and_tab_for_non_admin(self):
+ """A regular user's get_view() does not contain the x_fc_login_audit_*
+ fields — they live behind groups="base.group_system" XML attributes."""
+ view = self.env['res.users'].with_user(self.regular_user).get_view(
+ view_id=self.env.ref('base.view_users_form').id,
+ view_type='form',
+ )
+ arch = view['arch']
+ self.assertNotIn('x_fc_login_audit_count', arch,
+ "Smart-button field must not leak into non-admin view")
+ self.assertNotIn('x_fc_login_audit_ids', arch,
+ "Login Activity tab must not leak into non-admin view")
+
+ def test_view_shows_button_and_tab_for_admin(self):
+ """A Settings admin DOES see both nodes."""
+ admin = self.env.ref('base.user_admin')
+ view = self.env['res.users'].with_user(admin).get_view(
+ view_id=self.env.ref('base.view_users_form').id,
+ view_type='form',
+ )
+ arch = view['arch']
+ self.assertIn('x_fc_login_audit_count', arch)
+ self.assertIn('x_fc_login_audit_ids', arch)
diff --git a/fusion_login_audit/views/fusion_login_audit_views.xml b/fusion_login_audit/views/fusion_login_audit_views.xml
new file mode 100644
index 00000000..526e5b1a
--- /dev/null
+++ b/fusion_login_audit/views/fusion_login_audit_views.xml
@@ -0,0 +1,118 @@
+
+
+
+
+
+ fusion.login.audit.list
+ fusion.login.audit
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ fusion.login.audit.form
+ fusion.login.audit
+
+
+
+
+
+
+
+ fusion.login.audit.search
+ fusion.login.audit
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Login Events
+ fusion.login.audit
+ list,form
+
+ {}
+
+
+
+ Failed Logins (24h)
+ fusion.login.audit
+ list,form
+
+ {'search_default_filter_failure': 1, 'search_default_filter_24h': 1}
+
+
+
diff --git a/fusion_login_audit/views/menus.xml b/fusion_login_audit/views/menus.xml
new file mode 100644
index 00000000..3c3a78fa
--- /dev/null
+++ b/fusion_login_audit/views/menus.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/fusion_login_audit/views/res_config_settings_views.xml b/fusion_login_audit/views/res_config_settings_views.xml
new file mode 100644
index 00000000..71b4d426
--- /dev/null
+++ b/fusion_login_audit/views/res_config_settings_views.xml
@@ -0,0 +1,36 @@
+
+
+
+
+ res.config.settings.form.login.audit
+ res.config.settings
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/fusion_login_audit/views/res_users_views.xml b/fusion_login_audit/views/res_users_views.xml
new file mode 100644
index 00000000..5cb0f641
--- /dev/null
+++ b/fusion_login_audit/views/res_users_views.xml
@@ -0,0 +1,56 @@
+
+
+
+
+ res.users.form.inherit.fusion_login_audit
+ res.users
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/scripts/fcb_test_on_trial.sh b/scripts/fcb_test_on_trial.sh
new file mode 100644
index 00000000..a8805503
--- /dev/null
+++ b/scripts/fcb_test_on_trial.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+# Sync fusion_centralize_billing to the odoo-trial Enterprise sandbox (Proxmox VM 316)
+# and run its test suite there. The local dev Odoo (odoo-modsdev) is Community and
+# CANNOT install this module (needs sale_subscription + account_accountant), so tests
+# run on odoo-trial (Odoo 19.0 Enterprise, db=trial), reached via Proxmox guest-exec
+# (VM 316 has no direct SSH; only `qm guest exec` through the pve-worker1 host).
+#
+# Usage: bash scripts/fcb_test_on_trial.sh
+# Pass condition: the output ends with `FCB_EXIT=0` (Odoo exits non-zero on test failure).
+set -uo pipefail
+
+MODULE=fusion_centralize_billing
+REPO_DIR="$(cd "$(dirname "$0")/.." && pwd)"
+PVE=pve-worker1 # Proxmox host that runs VM 316 (ssh config alias)
+VMID=316
+
+echo ">> packing ${MODULE}"
+B64=$(tar czf - --exclude='__pycache__' --exclude='*.pyc' -C "${REPO_DIR}" "${MODULE}" | base64 -w0)
+echo " payload: ${#B64} b64 bytes"
+
+echo ">> syncing to odoo-trial:/opt/odoo/custom-addons (guest-exec)"
+ssh -o ConnectTimeout=40 "${PVE}" "qm guest exec ${VMID} --timeout 90 -- bash -lc 'rm -rf /opt/odoo/custom-addons/${MODULE}; echo ${B64} | base64 -d | tar xzf - -C /opt/odoo/custom-addons/ && echo SYNCED'" \
+ 2>&1 | sed -n 's/.*"out-data" : "\(.*\)",/\1/p' | sed 's/\\n/\n/g'
+
+echo ">> upgrade + test on Enterprise 19 (db=trial, --no-http)"
+ssh -o ConnectTimeout=40 "${PVE}" "qm guest exec ${VMID} --timeout 600 -- bash -lc 'docker exec odoo-trial-app odoo -d trial -u ${MODULE} --no-http --http-port 8070 --workers 0 --test-enable --test-tags /${MODULE} --stop-after-init >/tmp/fcb_test.log 2>&1; echo FCB_EXIT=\$?; grep -iE \"FAIL|ERROR|tested in|Ran |assert\" /tmp/fcb_test.log | grep -viE \"fusion_plating|fusion_tasks|not installable|not loaded\" | tail -30'" \
+ 2>&1 | sed -n 's/.*"out-data" : "\(.*\)",/\1/p' | sed 's/\\n/\n/g'