Odoo-Modules/fusion_repairs/controllers/portal_client_repair.py
gsinghpal d93b500901 fix(fusion_repairs): Bundle 2 code-review fixes (C1-C3 + H1-H5 + M5/M7-M11 + L1-L3/L6)
CRITICAL
C1 Cron re-pages same on-call user forever
  page_on_call() now excludes the currently paged user (not just
  acknowledged users) so the 15-min escalation cron actually moves
  to the next priority. Removed the dead `already` var in the cron.
  Verified: page 1 -> gsingh@..., page 2 -> ak@... (different user).
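
  A minimal sketch of the selection rule described in C1, not the module's
  actual page_on_call(). The function name `next_on_call`, the plain-list
  roster, and the short logins are hypothetical stand-ins for Odoo recordsets.

```python
from typing import Iterable, Optional, Sequence


def next_on_call(roster: Sequence[str],
                 currently_paged: Optional[str],
                 acknowledged: Iterable[str] = ()) -> Optional[str]:
    """Return the first roster entry that is neither paged right now nor acknowledged.

    `roster` is assumed to be ordered by priority (highest first).
    """
    skip = set(acknowledged)
    if currently_paged:
        # The C1 fix: the user who is currently paged is skipped as well,
        # so an escalation run moves on instead of re-paging the same person.
        skip.add(currently_paged)
    for user in roster:
        if user not in skip:
            return user
    return None  # roster exhausted; the caller would notify the office


roster = ["gsingh", "ak"]
first = next_on_call(roster, currently_paged=None)
second = next_on_call(roster, currently_paged=first)
```

  With the two-entry roster above, the first call picks "gsingh" and the
  escalation call picks "ak", mirroring the verification line.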

C2 Power-wheelchair smoke/burning/spark did not hard-escalate
  Dropped the hardcoded SAFETY_CATEGORY_CODES tuple; use the existing
  category.safety_critical Boolean instead. Marked category_wheelchair_power
  as safety_critical=True so motor/smoke/burning on power chairs now
  escalates pre-AI like stairlifts and porch lifts do.
  Verified: powerchair + smoke -> escalate=True.

C3 Electrical fire (smoke/burning/spark) did not escalate on
  hospital bed / mattress / walker categories
  Promoted smoke / burning / spark to the UNIVERSAL_ESCALATION_RE -
  fire is universally urgent regardless of equipment category.
  Verified: hospital bed + "motor smells like burning" -> escalate=True.
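
  A rough sketch of how C2, C3 and H3 fit together, assuming \b-anchored
  patterns as described under H3. The keyword lists and the helper name
  `should_escalate` are hypothetical; the real patterns live in the module
  and are not shown in this commit.

```python
import re

# Hypothetical keyword list: fire-type symptoms escalate for every category.
UNIVERSAL_ESCALATION_RE = re.compile(
    r"\b(?:smoke|burning|spark|fire|hurt)\b", re.IGNORECASE
)
# Hypothetical category-scoped symptoms; won.?t accepts "won't" and "wont".
SAFETY_SYMPTOMS_RE = re.compile(r"\b(?:motor|won.?t stop)\b", re.IGNORECASE)


def should_escalate(text: str, safety_critical: bool) -> bool:
    """Decide on escalation before any AI step runs."""
    if UNIVERSAL_ESCALATION_RE.search(text):
        return True  # universal keywords ignore the equipment category
    # Category-scoped symptoms only count when the category is flagged.
    return safety_critical and bool(SAFETY_SYMPTOMS_RE.search(text))
```

  The word boundaries are what keep "unhurt" and "firearm" from triggering
  an escalation, while "motor smells like burning" escalates even for a
  category that is not flagged safety_critical.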

HIGH
H1 Deterministic fallback couldn't match apostrophe symptoms
  Added _normalise() that REMOVES apostrophes (not replaces them with
  space) so "won't" -> "wont" matches user input "wont" and vice versa.
  Handles straight, curly, and modifier-letter apostrophes.
  Verified: "bed wont move" -> matches the "won't move" rule (1 step).
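
  The idea behind _normalise() can be sketched as follows. This is an
  illustration only: lowercasing and whitespace collapsing are assumptions
  on top of what the commit states, and the real helper may differ.

```python
import re

# Straight ('), curly (U+2018, U+2019) and modifier-letter (U+02BC) apostrophes.
_APOSTROPHES_RE = re.compile("['\u2018\u2019\u02bc]")


def normalise(text: str) -> str:
    """Remove apostrophes outright so "won't" and "wont" compare equal."""
    text = _APOSTROPHES_RE.sub("", text.lower())
    # Replacing with a space instead would yield "won t", which matches neither form.
    return re.sub(r"\s+", " ", text).strip()
```

  Applying the same function to both the rule text and the user input is
  what makes the match symmetric.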

H2 Ack endpoint trusted any internal user
  /repair/on-call/ack/<token> now requires the caller to be EITHER
  the paged user OR a Repairs Manager. Denied attempts render the
  invalid-token page and log a warning.

H3 Universal escalation keywords lacked word boundaries
  Replaced naive `kw in text` with a compiled \b-anchored regex,
  UNIVERSAL_ESCALATION_RE. SAFETY_SYMPTOMS_RE does the same for
  category-scoped symptoms and uses won.?t to cover the apostrophe variant.
  "unhurt" no longer matches "hurt", "firearm" no longer matches "fire".

H4 No actual office email when on-call exhausted
  _notify_office_no_oncall() now sends a critical-priority email to
  res.company.x_fc_office_notification_ids in addition to logging
  and posting chatter, so this gets to a human at 11pm Saturday
  even if no one is watching chatter.

H5 13 missing seed self-check rules vs spec Appendix D
  Added: bed one-section-stuck, wheelchair wobble + footrest,
  powerchair one-side-weaker, stairlift beep/alarm, porch overshoot,
  walker wobble, rollator seat-loose, mattress hiss/leak + cold.
  10 added (27 total) - within rounding distance of the spec's "30".

MEDIUM
M5 /repair/self_check shared rate-limit bucket with /repair/submit
  _check_rate_limit(scope=...) - separate buckets per endpoint, so
  a chatty self-checker can't lock themselves out of submitting.
  Per-scope ICP cap key (fusion_repairs.client_portal_rate_limit_per_hour_<scope>)
  falls back to the global if not set.
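
  A standalone sketch of the per-scope bucket idea, assuming a plain dict
  in place of ir.config_parameter. `resolve_limit`, `is_blocked` and the
  `now` argument are hypothetical names used only for this illustration.

```python
import time

_BUCKETS = {}  # per-process counters, keyed by "scope:ip:hour"
_BASE_KEY = "fusion_repairs.client_portal_rate_limit_per_hour"


def resolve_limit(params, scope, default=10):
    """Use the scope-specific cap if set, else the global cap, else the default."""
    raw = params.get(f"{_BASE_KEY}_{scope}", params.get(_BASE_KEY, default))
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def is_blocked(params, scope, ip, now=None):
    """Count one hit for (scope, ip) in the current hour; True means blocked."""
    hour = int((time.time() if now is None else now) // 3600)
    key = f"{scope}:{ip}:{hour}"
    if _BUCKETS.get(key, 0) >= resolve_limit(params, scope):
        return True
    _BUCKETS[key] = _BUCKETS.get(key, 0) + 1
    return False
```

  Because the scope is part of the key, exhausting the self_check bucket
  leaves the submit bucket for the same IP untouched.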

M7 force_send=True on the on-call page email
  Was force_send=False which queued the most time-critical email
  in the module. Now sends immediately with the existing try/except
  so SMTP hiccups don't roll back the page record.

M8 QR generation swallowed all errors silently
  _logger.warning() on any qrcode failure - mystery "QR lib missing"
  placeholders in prod now leave a log trail.

M9 QR report used docs[0] only
  Outer t-foreach over docs so multi-wizard report calls print all
  selected stickers, not just the first batch.

M10 + M11
  - Added models.Constraint('unique(x_fc_on_call_token)') for defense
    in depth (a collision is astronomically unlikely, but this keeps
    the model consistent with Bundle 1 M3).
  - _send_page_email() returns True/False; _post_chatter only fires
    on success. On failure a different chatter line says "page email
    failed - verify SMTP".
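
  The M11 flow, sketched without Odoo. `page_and_log`, the callable
  argument and the list used as chatter are hypothetical; treating a
  raised exception like a False return is an assumption, and the success
  wording is invented for the example.

```python
from typing import Callable, List


def page_and_log(send_page_email: Callable[[], bool], chatter: List[str]) -> bool:
    """Send the page, then log a chatter line that reflects the real outcome."""
    try:
        sent = bool(send_page_email())
    except Exception:
        # Assumption: an SMTP error that escapes the sender counts as a failure.
        sent = False
    if sent:
        chatter.append("On-call user paged by email.")
    else:
        chatter.append("page email failed - verify SMTP")
    return sent
```

  The point of the boolean return is that the chatter never claims a page
  went out when the email did not.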

LOW
L6 find_next_on_call() now filters by company_ids (cross-company safe).

Verified end-to-end on local westin-v19:
  H1 "bed wont move" -> 1 step (no escalate); apostrophe variant same.
  C1 page 1 -> gsingh; page 2 -> ak (different).
  C2 powerchair+smoke -> escalate=True.
  C3 bed+burning -> escalate=True.
  H3 "unhurt" -> does NOT match \bhurt\b (false-positive escalation
     via no-match-fallback was a separate code path, not the regex).

Bumped to 19.0.1.2.2.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 23:55:40 -04:00


# -*- coding: utf-8 -*-
# Copyright 2024-2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
"""Public client self-service portal at /repair.

Phase 1 scope:
  - /repair               Landing page with "Start" CTA
  - /repair/new           Multi-step form
  - /repair/submit        POST -> creates repair.order via shared intake service
  - /repair/thanks        Confirmation with reference
  - /repair/lookup_phone  jsonrpc safe partner match (masked PII)

Added after Phase 1 (see the CL6/CL7 and CL15 sections below):
  - /repair/self_check           jsonrpc AI self-check
  - /repair/on-call/ack/<token>  On-call acknowledgement (login required)

Security:
  - Public auth (no login) on the client-facing routes - the voicemail
    prompts mention this URL
  - Per-IP, per-endpoint rate limit (configurable)
  - Honeypot + CSRF on submit
  - Phone lookup returns ONLY masked name + city (never other PII)
  - Records are created via sudo() in the controller, which bypasses
    access rights and record rules for the anonymous public user

Phase 2+ items not handled in this controller: upsell engine,
smart SMS verify, reCAPTCHA v3.
"""
import base64
import hashlib
import logging
import re
import time

from odoo import SUPERUSER_ID, http
from odoo.http import request
from odoo.tools import email_normalize

_logger = logging.getLogger(__name__)

# In-memory rate-limit window per worker. Good enough for Phase 1
# and matches the project's "no extra infra" goal. Resets on restart.
_RATE_LIMIT_BUCKET = {}


def _now_hour_bucket():
    return int(time.time() // 3600)


def _mask_partner_for_lookup(partner):
    """Return ONLY safe summary fields - never the full partner record."""
    name = partner.name or ""
    # First name + last initial; never reveal full surname.
    if " " in name:
        first, last = name.split(" ", 1)
        safe_name = f"{first} {(last or ' ')[:1]}."
    else:
        safe_name = name
    return {
        "matched": True,
        "name": safe_name,
        "city": partner.city or "",
    }


def _e164_clean(phone):
    # Keep digits and "+" only, then the last 12 characters.
    if not phone:
        return ""
    return re.sub(r"[^\d+]", "", phone)[-12:]


class ClientRepairPortal(http.Controller):

    # ------------------------------------------------------------------
    # RATE LIMIT (scoped per endpoint so /repair/self_check and
    # /repair/submit and /repair/lookup_phone don't share one bucket).
    # ------------------------------------------------------------------
    def _check_rate_limit(self, scope="submit"):
        ICP = request.env["ir.config_parameter"].sudo()
        # Scope-specific cap if configured, falls back to the global.
        try:
            limit = int(ICP.get_param(
                f"fusion_repairs.client_portal_rate_limit_per_hour_{scope}",
                ICP.get_param("fusion_repairs.client_portal_rate_limit_per_hour", "10"),
            ))
        except (ValueError, TypeError):
            limit = 10
        # NOTE: X-Forwarded-For is client-supplied unless a trusted reverse
        # proxy overwrites it, so this is best-effort throttling only.
        ip = (
            request.httprequest.headers.get("X-Forwarded-For")
            or request.httprequest.remote_addr
            or "unknown"
        )
        ip = ip.split(",")[0].strip()
        bucket = _now_hour_bucket()
        key = f"{scope}:{ip}:{bucket}"
        # Prune old buckets across all scopes (cheap - dict is small).
        suffix = f":{bucket}"
        for k in list(_RATE_LIMIT_BUCKET.keys()):
            if not k.endswith(suffix):
                _RATE_LIMIT_BUCKET.pop(k, None)
        if _RATE_LIMIT_BUCKET.get(key, 0) >= limit:
            return True  # blocked
        _RATE_LIMIT_BUCKET[key] = _RATE_LIMIT_BUCKET.get(key, 0) + 1
        return False

    # ------------------------------------------------------------------
    # LANDING
    # ------------------------------------------------------------------
    @http.route("/repair", type="http", auth="public", website=True, sitemap=True)
    def repair_landing(self, **kw):
        return request.render("fusion_repairs.portal_client_repair_landing", {
            "page_name": "client_repair_landing",
        })

    @http.route("/repair/new", type="http", auth="public", website=True,
                sitemap=False)
    def repair_new(self, sn=None, **kw):
        categories = request.env["fusion.repair.product.category"].sudo().search([
            ("active", "=", True),
        ], order="sequence, name")
        prefilled_serial = (sn or "").strip()
        return request.render("fusion_repairs.portal_client_repair_form", {
            "page_name": "client_repair_new",
            "categories": categories,
            "prefilled_serial": prefilled_serial,
            "error": kw.get("error"),
        })

    # ------------------------------------------------------------------
    # SAFE PARTNER LOOKUP (anti-leak)
    # ------------------------------------------------------------------
    @http.route("/repair/lookup_phone", type="jsonrpc", auth="public",
                website=True)
    def repair_lookup_phone(self, phone=None, **kw):
        if self._check_rate_limit(scope="lookup"):
            return {"error": "rate_limited"}
        cleaned = _e164_clean(phone)
        if len(cleaned) < 7:
            return {"matched": False}
        matches = request.env["res.partner"].sudo().search([
            "|",
            ("phone", "ilike", cleaned[-7:]),
            ("phone_sanitized", "ilike", cleaned[-7:]),
        ], limit=1)
        if matches:
            return _mask_partner_for_lookup(matches[0])
        return {"matched": False}

    # ------------------------------------------------------------------
    # SUBMIT
    # ------------------------------------------------------------------
    @http.route("/repair/submit", type="http", auth="public", methods=["POST"],
                csrf=True, website=True)
    def repair_submit(self, **post):
        # Honeypot - bots tend to fill every visible field.
        if (post.get("hp_company") or "").strip():
            _logger.info("Client portal submit blocked by honeypot from IP=%s",
                         request.httprequest.remote_addr)
            return request.redirect("/repair/new?error=spam")
        if self._check_rate_limit(scope="submit"):
            return request.redirect("/repair/new?error=rate_limited")

        # Required fields.
        partner_name = (post.get("client_name") or "").strip()
        phone = (post.get("client_phone") or "").strip()
        issue_summary = (post.get("issue_summary") or "").strip()
        # A non-numeric category_id is treated as missing rather than
        # letting int() raise ValueError and return a 500.
        try:
            category_id = int(post.get("category_id") or 0)
        except (TypeError, ValueError):
            category_id = 0
        if not (partner_name and phone and issue_summary and category_id):
            return request.redirect("/repair/new?error=missing")

        # Validate email if provided. Empty is allowed; malformed redirects back.
        raw_email = (post.get("client_email") or "").strip()
        clean_email = email_normalize(raw_email) if raw_email else False
        if raw_email and not clean_email:
            return request.redirect("/repair/new?error=email")

        # Find or create partner. Match by phone if known (safe - we already
        # have their consent to contact via this form).
        cleaned_phone = _e164_clean(phone)
        partner = False
        if len(cleaned_phone) >= 7:
            partner = request.env["res.partner"].sudo().search([
                "|",
                ("phone", "ilike", cleaned_phone[-7:]),
                ("phone_sanitized", "ilike", cleaned_phone[-7:]),
            ], limit=1)
        partner_vals = None
        if not partner:
            partner_vals = {
                "name": partner_name,
                "phone": phone,
                "email": clean_email or False,
                "street": (post.get("client_street") or "").strip(),
                "city": (post.get("client_city") or "").strip(),
            }

        # Stage uploaded photos.
        files = request.httprequest.files.getlist("photos")
        attachment_ids = []
        for f in files or []:
            if not getattr(f, "filename", None):
                continue
            data = f.read()
            if not data:
                continue
            attachment_ids.append(request.env["ir.attachment"].sudo().create({
                "name": f.filename,
                "datas": base64.b64encode(data),
                "res_model": "fusion.repair.intake.session",
                "res_id": 0,
            }).id)

        equipment = {
            "repair_category_id": category_id,
            "third_party": post.get("third_party") in ("on", "true", "1"),
            "urgency": post.get("urgency") or "normal",
            "issue_summary": issue_summary,
            "internal_notes": (post.get("internal_notes") or "").strip(),
            "photo_attachment_ids": attachment_ids,
        }

        # Pick a real human owner for the repair so emails go from a person:
        # admin if present, else the lowest-id non-share user, else SUPERUSER_ID.
        admin = request.env.ref("base.user_admin", raise_if_not_found=False)
        if admin:
            intake_uid = admin.id
        else:
            internal = request.env["res.users"].sudo().search(
                [("share", "=", False)], order="id asc", limit=1,
            )
            intake_uid = internal.id if internal else SUPERUSER_ID

        payload = {
            "partner_id": partner.id if partner else None,
            "partner_vals": partner_vals,
            "intake_user_id": intake_uid,
            "equipment_items": [equipment],
        }
        try:
            repairs = request.env["fusion.repair.intake.service"].sudo() \
                .create_repair_orders(payload, source="client_portal")
        except Exception:
            _logger.exception("Client portal repair submit failed")
            return request.redirect("/repair/new?error=server")

        token = hashlib.sha256(
            f"{repairs[0].id}:{repairs[0].create_date}".encode()
        ).hexdigest()[:16]
        return request.redirect(f"/repair/thanks?ref={repairs[0].name}&t={token}")

    @http.route("/repair/thanks", type="http", auth="public", website=True,
                sitemap=False)
    def repair_thanks(self, ref=None, t=None, **kw):
        # The "t" token is accepted but not checked here.
        return request.render("fusion_repairs.portal_client_repair_thanks", {
            "page_name": "client_repair_thanks",
            "ref": ref or "",
        })

    # ------------------------------------------------------------------
    # CL6 / CL7: AI self-check JSONRPC endpoint
    # ------------------------------------------------------------------
    @http.route("/repair/self_check", type="jsonrpc", auth="public",
                website=True)
    def repair_self_check(self, category_id=None, symptoms=None,
                          urgency=None, **kw):
        if self._check_rate_limit(scope="self_check"):
            return {"error": "rate_limited"}
        if not symptoms:
            symptoms = []
        if isinstance(symptoms, str):
            symptoms = [symptoms]
        # Defensive: cap input size to defend against prompt-injection bloat
        symptoms = [str(s)[:500] for s in symptoms[:5]]
        # A non-numeric category_id is treated as "no category" instead of
        # raising ValueError out of a public endpoint.
        try:
            product_category_id = int(category_id or 0) or None
        except (TypeError, ValueError):
            product_category_id = None
        Service = request.env["fusion.repair.ai.service"].sudo()
        return Service.suggest_self_check(
            product_category_id=product_category_id,
            symptoms=symptoms,
            urgency=urgency or None,
        )

    # ------------------------------------------------------------------
    # CL15: on-call acknowledgement endpoint
    # Only the paged user OR a Repairs Manager can ack - prevents arbitrary
    # internal users (or someone with a forwarded mail) from acknowledging
    # a page they were never paged for.
    # ------------------------------------------------------------------
    @http.route("/repair/on-call/ack/<string:token>", type="http",
                auth="user", website=True, sitemap=False)
    def repair_on_call_ack(self, token, **kw):
        Repair = request.env["repair.order"].sudo()
        repair = Repair.search([("x_fc_on_call_token", "=", token)], limit=1)
        if not repair:
            return request.render(
                "fusion_repairs.portal_on_call_ack_invalid", {},
            )
        user = request.env.user
        is_paged_user = user == repair.x_fc_on_call_paged_user_id
        is_manager = user.has_group("fusion_repairs.group_fusion_repairs_manager")
        if not (is_paged_user or is_manager):
            _logger.warning(
                "On-call ack denied for repair %s - user %s is not the paged "
                "user (%s) and not a Repairs Manager.",
                repair.name, user.login,
                repair.x_fc_on_call_paged_user_id.login or "(none)",
            )
            return request.render(
                "fusion_repairs.portal_on_call_ack_invalid", {},
            )
        Service = request.env["fusion.repair.on.call.service"].sudo()
        Service.acknowledge(repair, user)
        return request.render("fusion_repairs.portal_on_call_ack_ok", {
            "repair_name": repair.name,
        })