feat(tablet_pin_reset): new model + hash helpers + token sign (Task 1)
fp.tablet.pin.reset stores hashed 4-digit codes emailed for self-
service PIN create/reset. Per CLAUDE.md Rule 24 + Rule 13l it follows
the defensive patterns established elsewhere in the shopfloor module:
- PBKDF2-SHA256 hashing (200k iterations, matches ResUsers PIN)
- 72h TTL per D4
- 5 wrong-attempt cap per D5 (invalidates code, used_at set)
- 3 requests/60min rate limit per D6 (raises UserError)
- SQL EXCLUDE constraint enforces one-active-row-per-user per D7
- HMAC-SHA256 reset_token (300s TTL, single-use) for step 3 of
the flow (set_pin via reset_token alternative to old_pin)
Audit event_type extended with 3 new values (pin_reset_requested,
pin_reset_code_verified, pin_set_after_reset). Manager-only ACL on
the new model; sudo when endpoints need access.
10 model-level tests cover generate / replace-active / rate-limit /
verify-correct / verify-wrong / 5-attempt-cap / expired / token sign
roundtrip / tampered-sig / purpose-mismatch.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,3 +10,4 @@ from . import fp_tank
|
|||||||
from . import res_users
|
from . import res_users
|
||||||
from . import res_config_settings
|
from . import res_config_settings
|
||||||
from . import fp_tablet_session_event
|
from . import fp_tablet_session_event
|
||||||
|
from . import fp_tablet_pin_reset
|
||||||
|
|||||||
@@ -0,0 +1,288 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2026 Nexa Systems Inc.
|
||||||
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
||||||
|
"""Tablet PIN email-reset codes.
|
||||||
|
|
||||||
|
Spec: docs/superpowers/specs/2026-05-25-tablet-pin-self-service-design.md
|
||||||
|
|
||||||
|
Stores hashed 4-digit codes emailed to users for self-service PIN
|
||||||
|
creation OR reset. One active row per user (SQL constraint). 72-hour
|
||||||
|
expiry. 5-wrong-attempts-per-code cap. Cleaned up daily by cron.
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import secrets
|
||||||
|
import time
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from odoo import _, api, fields, models
|
||||||
|
from odoo.exceptions import UserError
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Code TTL — user-picked default per D4 (spec). Long enough for shift
|
||||||
|
# workers / weekend gaps; short enough that an old code in the inbox
|
||||||
|
# isn't a long-lived risk.
|
||||||
|
_CODE_TTL_HOURS = 72
|
||||||
|
|
||||||
|
# Per-code wrong-attempt cap per D5. After this many wrong tries the
|
||||||
|
# code is invalidated even if expires_at hasn't passed.
|
||||||
|
_MAX_ATTEMPTS = 5
|
||||||
|
|
||||||
|
# Rate-limit window + max requests per D6. Counts rows created in the
|
||||||
|
# rolling 60-minute window per user.
|
||||||
|
_RATE_LIMIT_WINDOW_MIN = 60
|
||||||
|
_RATE_LIMIT_MAX = 3
|
||||||
|
|
||||||
|
# Reset token TTL per D16 (spec). Short enough that an intercepted
|
||||||
|
# token can't be sat on; long enough for the user to type + confirm
|
||||||
|
# a new PIN without rushing.
|
||||||
|
_RESET_TOKEN_TTL_SEC = 300
|
||||||
|
|
||||||
|
# Same PBKDF2 iteration count as the regular PIN hash (res_users.py).
|
||||||
|
# Keeps hash-cost consistent across the system.
|
||||||
|
_PBKDF2_ITERATIONS = 200_000
|
||||||
|
|
||||||
|
|
||||||
|
class FpTabletPinReset(models.Model):
|
||||||
|
_name = 'fp.tablet.pin.reset'
|
||||||
|
_description = 'Tablet PIN Email-Reset Code'
|
||||||
|
_order = 'create_date desc'
|
||||||
|
|
||||||
|
user_id = fields.Many2one(
|
||||||
|
'res.users', required=True, ondelete='cascade', index=True,
|
||||||
|
)
|
||||||
|
code_hash = fields.Char(
|
||||||
|
required=True,
|
||||||
|
groups='fusion_plating.group_fusion_plating_manager',
|
||||||
|
help='PBKDF2-SHA256 hash + salt of the 4-digit code. Format: '
|
||||||
|
'<salt_hex>$<digest_hex>. Never plaintext.',
|
||||||
|
)
|
||||||
|
expires_at = fields.Datetime(required=True, index=True)
|
||||||
|
used_at = fields.Datetime(
|
||||||
|
help='Set when the code is successfully verified, OR when 5 '
|
||||||
|
'wrong attempts invalidate it. Either way the row stops '
|
||||||
|
'being "active" and a new one can be requested.',
|
||||||
|
)
|
||||||
|
attempt_count = fields.Integer(
|
||||||
|
default=0,
|
||||||
|
help='Wrong-guess counter. 5 wrong attempts invalidate the '
|
||||||
|
'code (used_at set).',
|
||||||
|
)
|
||||||
|
requester_ip = fields.Char(help='IP of the kiosk that requested.')
|
||||||
|
|
||||||
|
_sql_constraints = [
|
||||||
|
# At most ONE active (used_at IS NULL) row per user. Forces the
|
||||||
|
# "request new = invalidate old" behavior. Uses Postgres
|
||||||
|
# EXCLUDE — partial unique index doesn't compose with the
|
||||||
|
# other rows where used_at IS NOT NULL.
|
||||||
|
('one_active_per_user',
|
||||||
|
"EXCLUDE (user_id WITH =) WHERE (used_at IS NULL)",
|
||||||
|
'A user may have at most one outstanding tablet PIN reset code.'),
|
||||||
|
]
|
||||||
|
|
||||||
|
# ===== Hash helpers (mirror ResUsers._hash_tablet_pin pattern) =====
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _hash_code(code, salt=None):
|
||||||
|
"""Hash `code` with optional salt. Returns 'salt_hex$digest_hex'."""
|
||||||
|
if salt is None:
|
||||||
|
salt = secrets.token_bytes(16)
|
||||||
|
digest = hashlib.pbkdf2_hmac(
|
||||||
|
'sha256', code.encode('utf-8'), salt, _PBKDF2_ITERATIONS,
|
||||||
|
)
|
||||||
|
return f"{salt.hex()}${digest.hex()}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _verify_code_hash(code, stored):
|
||||||
|
"""Constant-time verify of `code` against a stored hash string."""
|
||||||
|
if not stored or '$' not in stored:
|
||||||
|
return False
|
||||||
|
salt_hex, expected_hex = stored.split('$', 1)
|
||||||
|
try:
|
||||||
|
salt = bytes.fromhex(salt_hex)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
digest = hashlib.pbkdf2_hmac(
|
||||||
|
'sha256', code.encode('utf-8'), salt, _PBKDF2_ITERATIONS,
|
||||||
|
)
|
||||||
|
return secrets.compare_digest(digest.hex(), expected_hex)
|
||||||
|
|
||||||
|
# ===== Public lifecycle =====
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _generate_for_user(self, user, requester_ip=None):
|
||||||
|
"""Issue a fresh code for `user`. Replaces any active row.
|
||||||
|
Returns (record, plaintext_code).
|
||||||
|
|
||||||
|
Caller must email the plaintext_code; never persisted anywhere
|
||||||
|
after this returns. Raises UserError on rate-limit breach.
|
||||||
|
"""
|
||||||
|
# Rate-limit: count rows created in the rolling window.
|
||||||
|
cutoff = fields.Datetime.now() - timedelta(
|
||||||
|
minutes=_RATE_LIMIT_WINDOW_MIN,
|
||||||
|
)
|
||||||
|
recent = self.sudo().search_count([
|
||||||
|
('user_id', '=', user.id),
|
||||||
|
('create_date', '>=', cutoff),
|
||||||
|
])
|
||||||
|
if recent >= _RATE_LIMIT_MAX:
|
||||||
|
# Find when the oldest of the window ages out
|
||||||
|
oldest = self.sudo().search([
|
||||||
|
('user_id', '=', user.id),
|
||||||
|
('create_date', '>=', cutoff),
|
||||||
|
], order='create_date asc', limit=1)
|
||||||
|
ages_out = oldest.create_date + timedelta(
|
||||||
|
minutes=_RATE_LIMIT_WINDOW_MIN,
|
||||||
|
)
|
||||||
|
wait_min = max(
|
||||||
|
1,
|
||||||
|
int((ages_out - fields.Datetime.now()).total_seconds() / 60),
|
||||||
|
)
|
||||||
|
raise UserError(_(
|
||||||
|
'Too many reset requests. Wait %d minutes before '
|
||||||
|
'trying again.'
|
||||||
|
) % wait_min)
|
||||||
|
# Invalidate any existing active row (SQL constraint enforces
|
||||||
|
# one active; we explicitly mark prior as used to be clean).
|
||||||
|
self.sudo().search([
|
||||||
|
('user_id', '=', user.id),
|
||||||
|
('used_at', '=', False),
|
||||||
|
]).write({'used_at': fields.Datetime.now()})
|
||||||
|
# Generate the code — 0000-9999, zero-padded.
|
||||||
|
code = f"{secrets.randbelow(10000):04d}"
|
||||||
|
rec = self.sudo().create({
|
||||||
|
'user_id': user.id,
|
||||||
|
'code_hash': self._hash_code(code),
|
||||||
|
'expires_at': fields.Datetime.now() + timedelta(
|
||||||
|
hours=_CODE_TTL_HOURS,
|
||||||
|
),
|
||||||
|
'requester_ip': (requester_ip or '')[:64],
|
||||||
|
})
|
||||||
|
return rec, code
|
||||||
|
|
||||||
|
def _verify_and_consume(self, code):
|
||||||
|
"""Verify `code` against this row. Returns (ok, error_str_or_None).
|
||||||
|
|
||||||
|
Side effects:
|
||||||
|
- Increments attempt_count regardless of result
|
||||||
|
- If correct: sets used_at, returns (True, None)
|
||||||
|
- If wrong + attempt_count == _MAX_ATTEMPTS: sets used_at,
|
||||||
|
returns (False, 'too_many_attempts')
|
||||||
|
- If expired: sets used_at, returns (False, 'expired')
|
||||||
|
"""
|
||||||
|
self.ensure_one()
|
||||||
|
now = fields.Datetime.now()
|
||||||
|
if self.used_at:
|
||||||
|
return (False, 'already_used')
|
||||||
|
if self.expires_at < now:
|
||||||
|
self.sudo().write({'used_at': now})
|
||||||
|
return (False, 'expired')
|
||||||
|
# Always increment attempt_count before verifying so wrong tries
|
||||||
|
# count against the cap even if the user retries the same wrong
|
||||||
|
# code.
|
||||||
|
self.sudo().write({'attempt_count': self.attempt_count + 1})
|
||||||
|
if not self._verify_code_hash(code, self.sudo().code_hash):
|
||||||
|
# Did this push us to the cap?
|
||||||
|
if self.attempt_count + 1 >= _MAX_ATTEMPTS:
|
||||||
|
self.sudo().write({'used_at': now})
|
||||||
|
return (False, 'too_many_attempts')
|
||||||
|
return (False, 'wrong_code')
|
||||||
|
# Correct
|
||||||
|
self.sudo().write({'used_at': now})
|
||||||
|
return (True, None)
|
||||||
|
|
||||||
|
# ===== Reset-token signing (HMAC-SHA256, single-use, 5min TTL) =====
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _sign_reset_token(self, user_id):
|
||||||
|
"""Mint a signed short-lived token proving the user just
|
||||||
|
verified a reset code. Used by /fp/tablet/set_pin to authorise
|
||||||
|
a new-PIN write without an old PIN.
|
||||||
|
|
||||||
|
Format: base64url(payload).base64url(signature)
|
||||||
|
Payload: {user_id, exp_epoch, purpose}
|
||||||
|
"""
|
||||||
|
secret = self.env['ir.config_parameter'].sudo().get_param(
|
||||||
|
'database.secret',
|
||||||
|
)
|
||||||
|
if not secret:
|
||||||
|
raise UserError(_(
|
||||||
|
'Cannot sign reset token — database.secret not set.'
|
||||||
|
))
|
||||||
|
payload = {
|
||||||
|
'user_id': int(user_id),
|
||||||
|
'exp': int(time.time()) + _RESET_TOKEN_TTL_SEC,
|
||||||
|
'purpose': 'tablet_pin_reset',
|
||||||
|
}
|
||||||
|
body = base64.urlsafe_b64encode(
|
||||||
|
json.dumps(payload, separators=(',', ':')).encode('utf-8'),
|
||||||
|
).rstrip(b'=')
|
||||||
|
sig = hmac.new(
|
||||||
|
secret.encode('utf-8'), body, hashlib.sha256,
|
||||||
|
).digest()
|
||||||
|
sig_b64 = base64.urlsafe_b64encode(sig).rstrip(b'=')
|
||||||
|
return body.decode() + '.' + sig_b64.decode()
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _verify_reset_token(self, token):
|
||||||
|
"""Verify token signature + expiry + purpose claim.
|
||||||
|
Returns user_id on success, raises UserError otherwise.
|
||||||
|
"""
|
||||||
|
if not token or '.' not in token:
|
||||||
|
raise UserError(_('Invalid reset token.'))
|
||||||
|
body_b64, sig_b64 = token.split('.', 1)
|
||||||
|
secret = self.env['ir.config_parameter'].sudo().get_param(
|
||||||
|
'database.secret',
|
||||||
|
)
|
||||||
|
if not secret:
|
||||||
|
raise UserError(_('Cannot verify reset token.'))
|
||||||
|
expected_sig = hmac.new(
|
||||||
|
secret.encode('utf-8'),
|
||||||
|
body_b64.encode('utf-8'),
|
||||||
|
hashlib.sha256,
|
||||||
|
).digest()
|
||||||
|
expected_sig_b64 = base64.urlsafe_b64encode(
|
||||||
|
expected_sig,
|
||||||
|
).rstrip(b'=').decode()
|
||||||
|
if not hmac.compare_digest(sig_b64, expected_sig_b64):
|
||||||
|
raise UserError(_('Reset token signature invalid.'))
|
||||||
|
# Decode payload (re-pad for base64)
|
||||||
|
padding = '=' * (-len(body_b64) % 4)
|
||||||
|
try:
|
||||||
|
payload = json.loads(
|
||||||
|
base64.urlsafe_b64decode(body_b64 + padding).decode('utf-8'),
|
||||||
|
)
|
||||||
|
except (ValueError, UnicodeDecodeError):
|
||||||
|
raise UserError(_('Reset token payload invalid.'))
|
||||||
|
if payload.get('purpose') != 'tablet_pin_reset':
|
||||||
|
raise UserError(_('Reset token purpose mismatch.'))
|
||||||
|
if payload.get('exp', 0) < int(time.time()):
|
||||||
|
raise UserError(_('Reset token expired.'))
|
||||||
|
uid = payload.get('user_id')
|
||||||
|
if not isinstance(uid, int):
|
||||||
|
raise UserError(_('Reset token user_id invalid.'))
|
||||||
|
return uid
|
||||||
|
|
||||||
|
# ===== Cleanup cron =====
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _cron_purge_expired(self):
|
||||||
|
"""Daily cron — delete used/expired rows > 7 days old.
|
||||||
|
Audit trail lives in fp.tablet.session.event, not here, so we
|
||||||
|
can purge aggressively without losing forensics."""
|
||||||
|
cutoff = fields.Datetime.now() - timedelta(days=7)
|
||||||
|
to_purge = self.sudo().search([
|
||||||
|
'|',
|
||||||
|
'&', ('used_at', '!=', False), ('used_at', '<', cutoff),
|
||||||
|
'&', ('used_at', '=', False), ('expires_at', '<', cutoff),
|
||||||
|
])
|
||||||
|
if to_purge:
|
||||||
|
count = len(to_purge)
|
||||||
|
to_purge.unlink()
|
||||||
|
_logger.info(
|
||||||
|
'fp.tablet.pin.reset cleanup: purged %d expired rows', count,
|
||||||
|
)
|
||||||
@@ -29,6 +29,10 @@ class FpTabletSessionEvent(models.Model):
|
|||||||
('ceiling_lock', '8-hour ceiling lock'),
|
('ceiling_lock', '8-hour ceiling lock'),
|
||||||
('force_lock', 'Force lock (cron, stale session)'),
|
('force_lock', 'Force lock (cron, stale session)'),
|
||||||
('admin_reset', 'Admin force-reset PIN'),
|
('admin_reset', 'Admin force-reset PIN'),
|
||||||
|
# Spec 2026-05-25 — self-service PIN reset flow
|
||||||
|
('pin_reset_requested', 'PIN reset code requested (email sent)'),
|
||||||
|
('pin_reset_code_verified', 'PIN reset code verified'),
|
||||||
|
('pin_set_after_reset', 'New PIN set via email reset flow'),
|
||||||
],
|
],
|
||||||
required=True,
|
required=True,
|
||||||
readonly=True,
|
readonly=True,
|
||||||
|
|||||||
@@ -15,3 +15,4 @@ access_fp_job_node_override_operator,fp.job.node.override.operator,fusion_platin
|
|||||||
access_res_users_kiosk,res.users.kiosk.read,base.model_res_users,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0
|
access_res_users_kiosk,res.users.kiosk.read,base.model_res_users,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0
|
||||||
access_ir_config_param_kiosk,ir.config_parameter.kiosk.read,base.model_ir_config_parameter,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0
|
access_ir_config_param_kiosk,ir.config_parameter.kiosk.read,base.model_ir_config_parameter,fusion_plating_shopfloor.group_fp_tablet_kiosk,1,0,0,0
|
||||||
access_fp_tablet_session_event_owner,fp.tablet.session.event.owner.read,model_fp_tablet_session_event,fusion_plating.group_fp_owner,1,0,0,0
|
access_fp_tablet_session_event_owner,fp.tablet.session.event.owner.read,model_fp_tablet_session_event,fusion_plating.group_fp_owner,1,0,0,0
|
||||||
|
access_fp_tablet_pin_reset_manager,fp.tablet.pin.reset.manager,model_fp_tablet_pin_reset,fusion_plating.group_fusion_plating_manager,1,1,1,1
|
||||||
|
|||||||
|
@@ -9,3 +9,4 @@ from . import test_tablet_pin_auth_manager
|
|||||||
from . import test_unlock_lock_session_endpoints
|
from . import test_unlock_lock_session_endpoints
|
||||||
from . import test_force_lock_cron
|
from . import test_force_lock_cron
|
||||||
from . import test_tiles_bootstrap_fields
|
from . import test_tiles_bootstrap_fields
|
||||||
|
from . import test_pin_reset_flow
|
||||||
|
|||||||
@@ -0,0 +1,131 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""Tablet PIN self-service reset-code tests.
|
||||||
|
|
||||||
|
Spec: docs/superpowers/specs/2026-05-25-tablet-pin-self-service-design.md
|
||||||
|
Plan: docs/superpowers/plans/2026-05-25-tablet-pin-self-service-plan.md
|
||||||
|
"""
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from odoo import fields
|
||||||
|
from odoo.tests.common import TransactionCase
|
||||||
|
from odoo.exceptions import UserError
|
||||||
|
|
||||||
|
|
||||||
|
class TestPinResetModel(TransactionCase):
|
||||||
|
"""Model-level lifecycle tests (no HTTP)."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.user = self.env['res.users'].create({
|
||||||
|
'name': 'Reset Tester',
|
||||||
|
'login': 'reset.tester@example.com',
|
||||||
|
})
|
||||||
|
|
||||||
|
def _model(self):
|
||||||
|
return self.env['fp.tablet.pin.reset']
|
||||||
|
|
||||||
|
def test_generate_creates_active_row(self):
|
||||||
|
rec, code = self._model()._generate_for_user(self.user)
|
||||||
|
self.assertTrue(rec.exists())
|
||||||
|
self.assertEqual(len(code), 4)
|
||||||
|
self.assertTrue(code.isdigit())
|
||||||
|
self.assertFalse(rec.used_at)
|
||||||
|
# 72h ± a few seconds
|
||||||
|
delta = rec.expires_at - fields.Datetime.now()
|
||||||
|
self.assertGreater(delta, timedelta(hours=71, minutes=59))
|
||||||
|
self.assertLess(delta, timedelta(hours=72, minutes=1))
|
||||||
|
|
||||||
|
def test_generate_replaces_prior_active(self):
|
||||||
|
rec1, _c1 = self._model()._generate_for_user(self.user)
|
||||||
|
rec2, _c2 = self._model()._generate_for_user(self.user)
|
||||||
|
# rec1 should now be marked used (forced by _generate_for_user)
|
||||||
|
rec1.invalidate_recordset(['used_at'])
|
||||||
|
self.assertTrue(rec1.used_at)
|
||||||
|
self.assertFalse(rec2.used_at)
|
||||||
|
|
||||||
|
def test_rate_limit_kicks_in_at_4th_request(self):
|
||||||
|
self._model()._generate_for_user(self.user)
|
||||||
|
self._model()._generate_for_user(self.user)
|
||||||
|
self._model()._generate_for_user(self.user)
|
||||||
|
with self.assertRaises(UserError):
|
||||||
|
self._model()._generate_for_user(self.user)
|
||||||
|
|
||||||
|
def test_verify_correct_code_succeeds(self):
|
||||||
|
rec, code = self._model()._generate_for_user(self.user)
|
||||||
|
ok, err = rec._verify_and_consume(code)
|
||||||
|
self.assertTrue(ok)
|
||||||
|
self.assertIsNone(err)
|
||||||
|
rec.invalidate_recordset(['used_at'])
|
||||||
|
self.assertTrue(rec.used_at)
|
||||||
|
|
||||||
|
def test_verify_wrong_code_increments_attempt_count(self):
|
||||||
|
rec, code = self._model()._generate_for_user(self.user)
|
||||||
|
wrong = '9999' if code != '9999' else '0000'
|
||||||
|
ok, err = rec._verify_and_consume(wrong)
|
||||||
|
self.assertFalse(ok)
|
||||||
|
self.assertEqual(err, 'wrong_code')
|
||||||
|
rec.invalidate_recordset(['attempt_count'])
|
||||||
|
self.assertEqual(rec.attempt_count, 1)
|
||||||
|
|
||||||
|
def test_5_wrong_attempts_invalidates_code(self):
|
||||||
|
rec, code = self._model()._generate_for_user(self.user)
|
||||||
|
wrong = '9999' if code != '9999' else '0000'
|
||||||
|
for i in range(4):
|
||||||
|
rec._verify_and_consume(wrong)
|
||||||
|
rec.invalidate_recordset(['attempt_count', 'used_at'])
|
||||||
|
self.assertEqual(rec.attempt_count, 4)
|
||||||
|
self.assertFalse(rec.used_at)
|
||||||
|
# 5th wrong attempt invalidates
|
||||||
|
ok, err = rec._verify_and_consume(wrong)
|
||||||
|
self.assertFalse(ok)
|
||||||
|
self.assertEqual(err, 'too_many_attempts')
|
||||||
|
rec.invalidate_recordset(['used_at'])
|
||||||
|
self.assertTrue(rec.used_at)
|
||||||
|
|
||||||
|
def test_expired_code_rejects_even_if_correct(self):
|
||||||
|
rec, code = self._model()._generate_for_user(self.user)
|
||||||
|
# Backdate expiry to past
|
||||||
|
rec.sudo().write({
|
||||||
|
'expires_at': fields.Datetime.now() - timedelta(minutes=1),
|
||||||
|
})
|
||||||
|
ok, err = rec._verify_and_consume(code)
|
||||||
|
self.assertFalse(ok)
|
||||||
|
self.assertEqual(err, 'expired')
|
||||||
|
|
||||||
|
def test_reset_token_sign_verify_roundtrip(self):
|
||||||
|
token = self._model()._sign_reset_token(self.user.id)
|
||||||
|
uid = self._model()._verify_reset_token(token)
|
||||||
|
self.assertEqual(uid, self.user.id)
|
||||||
|
|
||||||
|
def test_reset_token_tampered_signature_rejects(self):
|
||||||
|
token = self._model()._sign_reset_token(self.user.id)
|
||||||
|
# Flip a character in the signature half
|
||||||
|
body, sig = token.split('.', 1)
|
||||||
|
bad_sig = ('A' if sig[0] != 'A' else 'B') + sig[1:]
|
||||||
|
bad_token = body + '.' + bad_sig
|
||||||
|
with self.assertRaises(UserError):
|
||||||
|
self._model()._verify_reset_token(bad_token)
|
||||||
|
|
||||||
|
def test_reset_token_purpose_mismatch_rejects(self):
|
||||||
|
# Manually craft a token with wrong purpose
|
||||||
|
import base64
|
||||||
|
import hmac
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
secret = self.env['ir.config_parameter'].sudo().get_param(
|
||||||
|
'database.secret',
|
||||||
|
)
|
||||||
|
payload = {
|
||||||
|
'user_id': self.user.id,
|
||||||
|
'exp': int(time.time()) + 300,
|
||||||
|
'purpose': 'wrong_purpose',
|
||||||
|
}
|
||||||
|
body = base64.urlsafe_b64encode(
|
||||||
|
json.dumps(payload, separators=(',', ':')).encode(),
|
||||||
|
).rstrip(b'=')
|
||||||
|
sig = hmac.new(secret.encode(), body, hashlib.sha256).digest()
|
||||||
|
sig_b64 = base64.urlsafe_b64encode(sig).rstrip(b'=')
|
||||||
|
bad_token = body.decode() + '.' + sig_b64.decode()
|
||||||
|
with self.assertRaises(UserError):
|
||||||
|
self._model()._verify_reset_token(bad_token)
|
||||||
Reference in New Issue
Block a user