diff --git a/fusion_plating/fusion_plating_shopfloor/models/__init__.py b/fusion_plating/fusion_plating_shopfloor/models/__init__.py index 4cd178ed..a0d1ff50 100644 --- a/fusion_plating/fusion_plating_shopfloor/models/__init__.py +++ b/fusion_plating/fusion_plating_shopfloor/models/__init__.py @@ -10,3 +10,4 @@ from . import fp_tank from . import res_users from . import res_config_settings from . import fp_tablet_session_event +from . import fp_tablet_pin_reset diff --git a/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_pin_reset.py b/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_pin_reset.py new file mode 100644 index 00000000..31b34501 --- /dev/null +++ b/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_pin_reset.py @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 Nexa Systems Inc. +# License OPL-1 (Odoo Proprietary License v1.0) +"""Tablet PIN email-reset codes. + +Spec: docs/superpowers/specs/2026-05-25-tablet-pin-self-service-design.md + +Stores hashed 4-digit codes emailed to users for self-service PIN +creation OR reset. One active row per user (SQL constraint). 72-hour +expiry. 5-wrong-attempts-per-code cap. Cleaned up daily by cron. +""" +import base64 +import hashlib +import hmac +import json +import logging +import secrets +import time +from datetime import timedelta + +from odoo import _, api, fields, models +from odoo.exceptions import UserError + +_logger = logging.getLogger(__name__) + +# Code TTL — user-picked default per D4 (spec). Long enough for shift +# workers / weekend gaps; short enough that an old code in the inbox +# isn't a long-lived risk. +_CODE_TTL_HOURS = 72 + +# Per-code wrong-attempt cap per D5. After this many wrong tries the +# code is invalidated even if expires_at hasn't passed. +_MAX_ATTEMPTS = 5 + +# Rate-limit window + max requests per D6. Counts rows created in the +# rolling 60-minute window per user. +_RATE_LIMIT_WINDOW_MIN = 60 +_RATE_LIMIT_MAX = 3 + +# Reset token TTL per D16 (spec). Short enough that an intercepted +# token can't be sat on; long enough for the user to type + confirm +# a new PIN without rushing. +_RESET_TOKEN_TTL_SEC = 300 + +# Same PBKDF2 iteration count as the regular PIN hash (res_users.py). +# Keeps hash-cost consistent across the system. +_PBKDF2_ITERATIONS = 200_000 + + +class FpTabletPinReset(models.Model): + _name = 'fp.tablet.pin.reset' + _description = 'Tablet PIN Email-Reset Code' + _order = 'create_date desc' + + user_id = fields.Many2one( + 'res.users', required=True, ondelete='cascade', index=True, + ) + code_hash = fields.Char( + required=True, + groups='fusion_plating.group_fusion_plating_manager', + help='PBKDF2-SHA256 hash + salt of the 4-digit code. Format: ' + '$. Never plaintext.', + ) + expires_at = fields.Datetime(required=True, index=True) + used_at = fields.Datetime( + help='Set when the code is successfully verified, OR when 5 ' + 'wrong attempts invalidate it. Either way the row stops ' + 'being "active" and a new one can be requested.', + ) + attempt_count = fields.Integer( + default=0, + help='Wrong-guess counter. 5 wrong attempts invalidate the ' + 'code (used_at set).', + ) + requester_ip = fields.Char(help='IP of the kiosk that requested.') + + _sql_constraints = [ + # At most ONE active (used_at IS NULL) row per user. Forces the + # "request new = invalidate old" behavior. Uses Postgres + # EXCLUDE — partial unique index doesn't compose with the + # other rows where used_at IS NOT NULL. + ('one_active_per_user', + "EXCLUDE (user_id WITH =) WHERE (used_at IS NULL)", + 'A user may have at most one outstanding tablet PIN reset code.'), + ] + + # ===== Hash helpers (mirror ResUsers._hash_tablet_pin pattern) ===== + + @staticmethod + def _hash_code(code, salt=None): + """Hash `code` with optional salt. Returns 'salt_hex$digest_hex'.""" + if salt is None: + salt = secrets.token_bytes(16) + digest = hashlib.pbkdf2_hmac( + 'sha256', code.encode('utf-8'), salt, _PBKDF2_ITERATIONS, + ) + return f"{salt.hex()}${digest.hex()}" + + @staticmethod + def _verify_code_hash(code, stored): + """Constant-time verify of `code` against a stored hash string.""" + if not stored or '$' not in stored: + return False + salt_hex, expected_hex = stored.split('$', 1) + try: + salt = bytes.fromhex(salt_hex) + except ValueError: + return False + digest = hashlib.pbkdf2_hmac( + 'sha256', code.encode('utf-8'), salt, _PBKDF2_ITERATIONS, + ) + return secrets.compare_digest(digest.hex(), expected_hex) + + # ===== Public lifecycle ===== + + @api.model + def _generate_for_user(self, user, requester_ip=None): + """Issue a fresh code for `user`. Replaces any active row. + Returns (record, plaintext_code). + + Caller must email the plaintext_code; never persisted anywhere + after this returns. Raises UserError on rate-limit breach. + """ + # Rate-limit: count rows created in the rolling window. + cutoff = fields.Datetime.now() - timedelta( + minutes=_RATE_LIMIT_WINDOW_MIN, + ) + recent = self.sudo().search_count([ + ('user_id', '=', user.id), + ('create_date', '>=', cutoff), + ]) + if recent >= _RATE_LIMIT_MAX: + # Find when the oldest of the window ages out + oldest = self.sudo().search([ + ('user_id', '=', user.id), + ('create_date', '>=', cutoff), + ], order='create_date asc', limit=1) + ages_out = oldest.create_date + timedelta( + minutes=_RATE_LIMIT_WINDOW_MIN, + ) + wait_min = max( + 1, + int((ages_out - fields.Datetime.now()).total_seconds() / 60), + ) + raise UserError(_( + 'Too many reset requests. Wait %d minutes before ' + 'trying again.' + ) % wait_min) + # Invalidate any existing active row (SQL constraint enforces + # one active; we explicitly mark prior as used to be clean). + self.sudo().search([ + ('user_id', '=', user.id), + ('used_at', '=', False), + ]).write({'used_at': fields.Datetime.now()}) + # Generate the code — 0000-9999, zero-padded. + code = f"{secrets.randbelow(10000):04d}" + rec = self.sudo().create({ + 'user_id': user.id, + 'code_hash': self._hash_code(code), + 'expires_at': fields.Datetime.now() + timedelta( + hours=_CODE_TTL_HOURS, + ), + 'requester_ip': (requester_ip or '')[:64], + }) + return rec, code + + def _verify_and_consume(self, code): + """Verify `code` against this row. Returns (ok, error_str_or_None). + + Side effects: + - Increments attempt_count regardless of result + - If correct: sets used_at, returns (True, None) + - If wrong + attempt_count == _MAX_ATTEMPTS: sets used_at, + returns (False, 'too_many_attempts') + - If expired: sets used_at, returns (False, 'expired') + """ + self.ensure_one() + now = fields.Datetime.now() + if self.used_at: + return (False, 'already_used') + if self.expires_at < now: + self.sudo().write({'used_at': now}) + return (False, 'expired') + # Always increment attempt_count before verifying so wrong tries + # count against the cap even if the user retries the same wrong + # code. + self.sudo().write({'attempt_count': self.attempt_count + 1}) + if not self._verify_code_hash(code, self.sudo().code_hash): + # Did this push us to the cap? + if self.attempt_count + 1 >= _MAX_ATTEMPTS: + self.sudo().write({'used_at': now}) + return (False, 'too_many_attempts') + return (False, 'wrong_code') + # Correct + self.sudo().write({'used_at': now}) + return (True, None) + + # ===== Reset-token signing (HMAC-SHA256, single-use, 5min TTL) ===== + + @api.model + def _sign_reset_token(self, user_id): + """Mint a signed short-lived token proving the user just + verified a reset code. Used by /fp/tablet/set_pin to authorise + a new-PIN write without an old PIN. + + Format: base64url(payload).base64url(signature) + Payload: {user_id, exp_epoch, purpose} + """ + secret = self.env['ir.config_parameter'].sudo().get_param( + 'database.secret', + ) + if not secret: + raise UserError(_( + 'Cannot sign reset token — database.secret not set.' + )) + payload = { + 'user_id': int(user_id), + 'exp': int(time.time()) + _RESET_TOKEN_TTL_SEC, + 'purpose': 'tablet_pin_reset', + } + body = base64.urlsafe_b64encode( + json.dumps(payload, separators=(',', ':')).encode('utf-8'), + ).rstrip(b'=') + sig = hmac.new( + secret.encode('utf-8'), body, hashlib.sha256, + ).digest() + sig_b64 = base64.urlsafe_b64encode(sig).rstrip(b'=') + return body.decode() + '.' + sig_b64.decode() + + @api.model + def _verify_reset_token(self, token): + """Verify token signature + expiry + purpose claim. + Returns user_id on success, raises UserError otherwise. + """ + if not token or '.' not in token: + raise UserError(_('Invalid reset token.')) + body_b64, sig_b64 = token.split('.', 1) + secret = self.env['ir.config_parameter'].sudo().get_param( + 'database.secret', + ) + if not secret: + raise UserError(_('Cannot verify reset token.')) + expected_sig = hmac.new( + secret.encode('utf-8'), + body_b64.encode('utf-8'), + hashlib.sha256, + ).digest() + expected_sig_b64 = base64.urlsafe_b64encode( + expected_sig, + ).rstrip(b'=').decode() + if not hmac.compare_digest(sig_b64, expected_sig_b64): + raise UserError(_('Reset token signature invalid.')) + # Decode payload (re-pad for base64) + padding = '=' * (-len(body_b64) % 4) + try: + payload = json.loads( + base64.urlsafe_b64decode(body_b64 + padding).decode('utf-8'), + ) + except (ValueError, UnicodeDecodeError): + raise UserError(_('Reset token payload invalid.')) + if payload.get('purpose') != 'tablet_pin_reset': + raise UserError(_('Reset token purpose mismatch.')) + if payload.get('exp', 0) < int(time.time()): + raise UserError(_('Reset token expired.')) + uid = payload.get('user_id') + if not isinstance(uid, int): + raise UserError(_('Reset token user_id invalid.')) + return uid + + # ===== Cleanup cron ===== + + @api.model + def _cron_purge_expired(self): + """Daily cron — delete used/expired rows > 7 days old. + Audit trail lives in fp.tablet.session.event, not here, so we + can purge aggressively without losing forensics.""" + cutoff = fields.Datetime.now() - timedelta(days=7) + to_purge = self.sudo().search([ + '|', + '&', ('used_at', '!=', False), ('used_at', '<', cutoff), + '&', ('used_at', '=', False), ('expires_at', '<', cutoff), + ]) + if to_purge: + count = len(to_purge) + to_purge.unlink() + _logger.info( + 'fp.tablet.pin.reset cleanup: purged %d expired rows', count, + ) diff --git a/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_session_event.py b/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_session_event.py index 5143f410..c6a37ad1 100644 --- a/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_session_event.py +++ b/fusion_plating/fusion_plating_shopfloor/models/fp_tablet_session_event.py @@ -29,6 +29,10 @@ class FpTabletSessionEvent(models.Model): ('ceiling_lock', '8-hour ceiling lock'), ('force_lock', 'Force lock (cron, stale session)'), ('admin_reset', 'Admin force-reset PIN'), + # Spec 2026-05-25 — self-service PIN reset flow + ('pin_reset_requested', 'PIN reset code requested (email sent)'), + ('pin_reset_code_verified', 'PIN reset code verified'), + ('pin_set_after_reset', 'New PIN set via email reset flow'), ], required=True, readonly=True, diff --git a/fusion_plating/fusion_plating_shopfloor/security/ir.model.access.csv b/fusion_plating/fusion_plating_shopfloor/security/ir.model.access.csv index c3c849f6..dd7a8739 100644 --- a/fusion_plating/fusion_plating_shopfloor/security/ir.model.access.csv +++ b/fusion_plating/fusion_plating_shopfloor/security/ir.model.access.csv @@ -15,3 +15,4 @@ access_fp_job_node_override_operator,fp.job.node.override.operator,fusion_platin access_res_users_kiosk,res.users.kiosk.read,base.model_res_users,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0 access_ir_config_param_kiosk,ir.config_parameter.kiosk.read,base.model_ir_config_parameter,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0 access_fp_tablet_session_event_owner,fp.tablet.session.event.owner.read,model_fp_tablet_session_event,fusion_plating.group_fp_owner,1,0,0,0 +access_fp_tablet_pin_reset_manager,fp.tablet.pin.reset.manager,model_fp_tablet_pin_reset,fusion_plating.group_fusion_plating_manager,1,1,1,1 diff --git a/fusion_plating/fusion_plating_shopfloor/tests/__init__.py b/fusion_plating/fusion_plating_shopfloor/tests/__init__.py index cd1d0e2e..871dafed 100644 --- a/fusion_plating/fusion_plating_shopfloor/tests/__init__.py +++ b/fusion_plating/fusion_plating_shopfloor/tests/__init__.py @@ -9,3 +9,4 @@ from . import test_tablet_pin_auth_manager from . import test_unlock_lock_session_endpoints from . import test_force_lock_cron from . import test_tiles_bootstrap_fields +from . import test_pin_reset_flow diff --git a/fusion_plating/fusion_plating_shopfloor/tests/test_pin_reset_flow.py b/fusion_plating/fusion_plating_shopfloor/tests/test_pin_reset_flow.py new file mode 100644 index 00000000..f052c078 --- /dev/null +++ b/fusion_plating/fusion_plating_shopfloor/tests/test_pin_reset_flow.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +"""Tablet PIN self-service reset-code tests. + +Spec: docs/superpowers/specs/2026-05-25-tablet-pin-self-service-design.md +Plan: docs/superpowers/plans/2026-05-25-tablet-pin-self-service-plan.md +""" +from datetime import timedelta + +from odoo import fields +from odoo.tests.common import TransactionCase +from odoo.exceptions import UserError + + +class TestPinResetModel(TransactionCase): + """Model-level lifecycle tests (no HTTP).""" + + def setUp(self): + super().setUp() + self.user = self.env['res.users'].create({ + 'name': 'Reset Tester', + 'login': 'reset.tester@example.com', + }) + + def _model(self): + return self.env['fp.tablet.pin.reset'] + + def test_generate_creates_active_row(self): + rec, code = self._model()._generate_for_user(self.user) + self.assertTrue(rec.exists()) + self.assertEqual(len(code), 4) + self.assertTrue(code.isdigit()) + self.assertFalse(rec.used_at) + # 72h ± a few seconds + delta = rec.expires_at - fields.Datetime.now() + self.assertGreater(delta, timedelta(hours=71, minutes=59)) + self.assertLess(delta, timedelta(hours=72, minutes=1)) + + def test_generate_replaces_prior_active(self): + rec1, _c1 = self._model()._generate_for_user(self.user) + rec2, _c2 = self._model()._generate_for_user(self.user) + # rec1 should now be marked used (forced by _generate_for_user) + rec1.invalidate_recordset(['used_at']) + self.assertTrue(rec1.used_at) + self.assertFalse(rec2.used_at) + + def test_rate_limit_kicks_in_at_4th_request(self): + self._model()._generate_for_user(self.user) + self._model()._generate_for_user(self.user) + self._model()._generate_for_user(self.user) + with self.assertRaises(UserError): + self._model()._generate_for_user(self.user) + + def test_verify_correct_code_succeeds(self): + rec, code = self._model()._generate_for_user(self.user) + ok, err = rec._verify_and_consume(code) + self.assertTrue(ok) + self.assertIsNone(err) + rec.invalidate_recordset(['used_at']) + self.assertTrue(rec.used_at) + + def test_verify_wrong_code_increments_attempt_count(self): + rec, code = self._model()._generate_for_user(self.user) + wrong = '9999' if code != '9999' else '0000' + ok, err = rec._verify_and_consume(wrong) + self.assertFalse(ok) + self.assertEqual(err, 'wrong_code') + rec.invalidate_recordset(['attempt_count']) + self.assertEqual(rec.attempt_count, 1) + + def test_5_wrong_attempts_invalidates_code(self): + rec, code = self._model()._generate_for_user(self.user) + wrong = '9999' if code != '9999' else '0000' + for i in range(4): + rec._verify_and_consume(wrong) + rec.invalidate_recordset(['attempt_count', 'used_at']) + self.assertEqual(rec.attempt_count, 4) + self.assertFalse(rec.used_at) + # 5th wrong attempt invalidates + ok, err = rec._verify_and_consume(wrong) + self.assertFalse(ok) + self.assertEqual(err, 'too_many_attempts') + rec.invalidate_recordset(['used_at']) + self.assertTrue(rec.used_at) + + def test_expired_code_rejects_even_if_correct(self): + rec, code = self._model()._generate_for_user(self.user) + # Backdate expiry to past + rec.sudo().write({ + 'expires_at': fields.Datetime.now() - timedelta(minutes=1), + }) + ok, err = rec._verify_and_consume(code) + self.assertFalse(ok) + self.assertEqual(err, 'expired') + + def test_reset_token_sign_verify_roundtrip(self): + token = self._model()._sign_reset_token(self.user.id) + uid = self._model()._verify_reset_token(token) + self.assertEqual(uid, self.user.id) + + def test_reset_token_tampered_signature_rejects(self): + token = self._model()._sign_reset_token(self.user.id) + # Flip a character in the signature half + body, sig = token.split('.', 1) + bad_sig = ('A' if sig[0] != 'A' else 'B') + sig[1:] + bad_token = body + '.' + bad_sig + with self.assertRaises(UserError): + self._model()._verify_reset_token(bad_token) + + def test_reset_token_purpose_mismatch_rejects(self): + # Manually craft a token with wrong purpose + import base64 + import hmac + import hashlib + import json + import time + secret = self.env['ir.config_parameter'].sudo().get_param( + 'database.secret', + ) + payload = { + 'user_id': self.user.id, + 'exp': int(time.time()) + 300, + 'purpose': 'wrong_purpose', + } + body = base64.urlsafe_b64encode( + json.dumps(payload, separators=(',', ':')).encode(), + ).rstrip(b'=') + sig = hmac.new(secret.encode(), body, hashlib.sha256).digest() + sig_b64 = base64.urlsafe_b64encode(sig).rstrip(b'=') + bad_token = body.decode() + '.' + sig_b64.decode() + with self.assertRaises(UserError): + self._model()._verify_reset_token(bad_token)