Odoo-Modules/fusion_repairs/controllers/portal_client_repair.py
gsinghpal 5c8768c556 feat(fusion_repairs): Bundle 2 - weekend self-service (CL6/CL7 + CL15 + CL17)
CL6/CL7 AI self-check engine
- New fusion.repair.ai.service AbstractModel with single guardrailed
  suggest_self_check(category_id, symptoms, urgency) entry point.
- Hard-escalation FIRST (before any AI call): stairlift / porch lift +
  safety symptoms (smoke / burning / spark / stuck / motor), OR any
  mention of fire / injury / hurt / bleeding / trapped, OR urgency=safety
  -> escalate immediately regardless of AI availability.
- AI call via fusion.api.service.call_openai() (consumer='fusion_repairs',
  feature='client_self_triage') with try/fallback per project rule -
  no hard fusion_api dep, no install error if it's missing.
- Strict response validation: JSON schema check, max 3 steps, max 200
  chars per field, forbidden-phrase regex (diagnose, you have, medical
  condition, stop using, consult doctor, price patterns) - on any
  failure falls back to deterministic rules.
- 24h in-memory cache keyed by (category, symptom_hash) so repeat calls
  during AI cost-cap incidents come from cache.
- System prompt + JSON schema published as ir.config_parameter so office
  can refine without code changes (default prompt + schema in spec
  Appendix A).
- New fusion.repair.self.check.rule model + 17 seeded rules across all
  7 product categories (data/self_check_data.xml) - these are the
  deterministic fallback AND the canonical seed if AI is disabled.
- New /repair/self_check jsonrpc route (auth=public) gated by the
  per-IP rate-limit; defensive input bounds (max 5 symptoms, 500 chars
  each) defend against prompt-injection bloat.
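
The escalate-first ordering and the strict reply validation described in the list above can be sketched in plain Python. This is a minimal illustration, not the module's code: the constant names, the `must_escalate` / `validate_ai_reply` helpers, and the `{"steps": [...]}` reply shape are assumptions (the real schema lives in spec Appendix A, which is not shown here), and the price pattern is a guess at what "price patterns" covers.

```python
import json
import re

# Hypothetical constants mirroring the rules listed in the commit message.
LIFT_CATEGORIES = {"stairlift", "porch lift"}
LIFT_SAFETY_WORDS = ("smoke", "burning", "spark", "stuck", "motor")
ALWAYS_ESCALATE_WORDS = ("fire", "injury", "hurt", "bleeding", "trapped")
FORBIDDEN = re.compile(
    r"diagnose|you have|medical condition|stop using|consult doctor|\$\s?\d",
    re.IGNORECASE,
)
MAX_STEPS = 3
MAX_FIELD_CHARS = 200


def must_escalate(category_name, symptoms, urgency):
    """Return True when the request must skip the AI call and escalate."""
    # Substring matching is deliberately over-sensitive for a safety check.
    text = " ".join(symptoms).lower()
    if urgency == "safety":
        return True
    if any(word in text for word in ALWAYS_ESCALATE_WORDS):
        return True
    is_lift = (category_name or "").lower() in LIFT_CATEGORIES
    return is_lift and any(word in text for word in LIFT_SAFETY_WORDS)


def validate_ai_reply(raw):
    """Return the list of steps, or None to signal 'use the fallback rules'."""
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        return None
    steps = data.get("steps") if isinstance(data, dict) else None
    if not isinstance(steps, list) or not 1 <= len(steps) <= MAX_STEPS:
        return None
    for step in steps:
        if not isinstance(step, str) or len(step) > MAX_FIELD_CHARS:
            return None
        if FORBIDDEN.search(step):
            return None
    return steps
```

A caller would run `must_escalate` before any AI request and treat a `None` from `validate_ai_reply` as the trigger for the deterministic rules, which keeps the unsafe or malformed paths from ever reaching the client.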

CL15 weekend safety escalation + on-call paging
- New fusion.repair.on.call.service AbstractModel with:
  * find_next_on_call(exclude=...) -> lowest x_fc_on_call_priority
  * page_on_call(repair) -> sends mail to next available + writes
    x_fc_on_call_token / x_fc_on_call_paged_user_id / paged_at on the
    repair, posts chatter
  * acknowledge(repair, user) -> records ack, posts chatter
  * cron_escalate_unacknowledged() -> every 5 min, re-pages the next
    priority for repairs paged >15 min ago without ack
- Auto-fires from intake service whenever x_fc_urgency='safety' is
  submitted. _is_business_hours() defaults to "page" when no calendar
  is set or after working hours.
- New email_template_on_call_page with 4px red accent + acknowledge
  CTA button linking to /repair/on-call/ack/<token>.
- /repair/on-call/ack/<token> http route (auth=user, must be the paged
  manager OR any internal user) records the ack and renders confirmation.
- 5-minute cron 'Fusion Repairs: Escalate unacknowledged on-call pages'
  with configurable window via fusion_repairs.on_call_escalate_minutes
  (default 15).
- New repair.order fields x_fc_on_call_token, x_fc_on_call_paged_user_id,
  x_fc_on_call_paged_at, x_fc_on_call_acknowledged_user_ids,
  x_fc_on_call_acknowledged_at - all copy=False so duplicates start fresh.
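
The paging and escalation rules above can be illustrated without Odoo. The sketch below is an assumption-laden model, not the service itself: `OnCallUser`, `PagedRepair`, and `repairs_to_escalate` are hypothetical stand-ins for the real records and cron method, and `priority` stands in for `x_fc_on_call_priority`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class OnCallUser:
    name: str
    priority: int  # stand-in for x_fc_on_call_priority; lower is paged first


@dataclass
class PagedRepair:
    ref: str
    paged_user: str
    paged_at: datetime
    acknowledged_at: Optional[datetime] = None


def find_next_on_call(roster, exclude=()):
    """Return the user with the lowest priority number not in `exclude`."""
    candidates = [u for u in roster if u.name not in exclude]
    return min(candidates, key=lambda u: u.priority, default=None)


def repairs_to_escalate(repairs, now, window_minutes=15):
    """Return repairs paged more than `window_minutes` ago with no ack."""
    cutoff = now - timedelta(minutes=window_minutes)
    return [
        r for r in repairs
        if r.acknowledged_at is None and r.paged_at < cutoff
    ]
```

Each cron run would call `repairs_to_escalate` and, for every hit, page `find_next_on_call(roster, exclude={already paged users})`; when that returns `None` the roster is exhausted and the caller has to decide what happens next.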

CL17 QR sticker generator
- New fusion.repair.qr.sticker.wizard TransientModel takes a Many2many
  of stock.lot records (optionally filtered by product).
- QWeb PDF report fusion_repairs.report_qr_stickers prints a 4-up
  sticker sheet on letter paper: 80mm x 50mm per sticker with the
  QR code (38mm), product name, serial number, and the canonical
  portal URL (from web.base.url + fusion_repairs.client_portal_url).
- QR encodes /repair?sn=<serial> which the public client portal
  already pre-fills via the ?sn= query param.
- Uses the qrcode library if available; renders 'QR lib missing'
  placeholder otherwise so the PDF still prints.
- New menu Configuration > Generate QR Stickers + standalone wizard.
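
The sticker URL and the "QR lib missing" fallback described above might look roughly like this. It is a sketch under assumptions: `sticker_url` and `qr_data_uri` are hypothetical helper names, the `/repair` default stands in for whatever `fusion_repairs.client_portal_url` holds, and `qrcode` is the optional third-party python-qrcode package.

```python
import base64
import io
from urllib.parse import quote

try:
    import qrcode  # optional third-party dependency
except ImportError:
    qrcode = None


def sticker_url(base_url, serial, portal_path="/repair"):
    """Build the portal URL that the QR code encodes (?sn=<serial>)."""
    return f"{base_url.rstrip('/')}{portal_path}?sn={quote(serial, safe='')}"


def qr_data_uri(url):
    """Return a PNG data URI, or None when the qrcode library is missing."""
    if qrcode is None:
        return None  # caller renders a text placeholder instead
    buf = io.BytesIO()
    qrcode.make(url).save(buf)  # the default image factory writes PNG
    encoded = base64.b64encode(buf.getvalue()).decode("ascii")
    return "data:image/png;base64," + encoded
```

Percent-encoding the serial keeps lot names that contain spaces or slashes from breaking the query string, and returning `None` rather than raising lets the report template print the sheet either way.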

Verified end-to-end on local westin-v19:
  CL6 stairlift+smoke -> escalate=True source=escalated reason=safety
  CL6 bed (no AI) -> fallback returned escalate=True (safe default)
  CL15 admin paged for RO-202605-10 with 27-char token
  CL17 sticker URL: /repair?sn=001124032521528404
       QR data URI: data:image/png;base64,iVBORw... (PNG OK)

Bumped to 19.0.1.2.0 (minor bump - new public-facing capabilities).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 23:40:52 -04:00


# -*- coding: utf-8 -*-
# Copyright 2024-2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
"""Public client self-service portal at /repair.

Phase 1 scope:
- /repair               Landing page with "Start" CTA
- /repair/new           Multi-step form
- /repair/submit        POST -> creates repair.order via shared intake service
- /repair/thanks        Confirmation with reference
- /repair/lookup_phone  jsonrpc safe partner match (masked PII)

Bundle 2 additions:
- /repair/self_check           jsonrpc AI self-check (CL6/CL7)
- /repair/on-call/ack/<token>  On-call page acknowledgement (CL15)

Security:
- Public auth (no login) - the voicemail prompts mention this URL
- Per-IP rate limit on submit, phone lookup and self-check (configurable)
- Honeypot + CSRF
- Phone lookup returns ONLY masked name + city (never other PII)
- Records are created via sudo() in the controller, so access rights and
  record rules are bypassed for the anonymous (public) user

Still planned: upsell engine, smart SMS verify, reCAPTCHA v3.
"""
import base64
import hashlib
import logging
import re
import time

from odoo import SUPERUSER_ID, http, fields
from odoo.http import request
from odoo.tools import email_normalize

_logger = logging.getLogger(__name__)

# In-memory rate-limit window per worker. Good enough for Phase 1
# and matches the project's "no extra infra" goal. Resets on restart.
_RATE_LIMIT_BUCKET = {}


def _now_hour_bucket():
    return int(time.time() // 3600)


def _mask_partner_for_lookup(partner):
    """Return ONLY safe summary fields - never the full partner record."""
    name = partner.name or ""
    # First name + last initial; never reveal full surname.
    if " " in name:
        first, last = name.split(" ", 1)
        safe_name = f"{first} {(last or ' ')[:1]}."
    else:
        safe_name = name
    return {
        "matched": True,
        "name": safe_name,
        "city": partner.city or "",
    }


def _e164_clean(phone):
    if not phone:
        return ""
    return re.sub(r"[^\d+]", "", phone)[-12:]
class ClientRepairPortal(http.Controller):

    # ------------------------------------------------------------------
    # RATE LIMIT
    # ------------------------------------------------------------------
    def _check_rate_limit(self):
        ICP = request.env["ir.config_parameter"].sudo()
        try:
            limit = int(ICP.get_param(
                "fusion_repairs.client_portal_rate_limit_per_hour", "10"
            ))
        except (ValueError, TypeError):
            limit = 10
        # Prefer the first X-Forwarded-For entry, else the socket address.
        # NOTE: X-Forwarded-For is client-supplied unless a trusted reverse
        # proxy overwrites it, so this key can be spoofed on direct access.
        ip = (
            request.httprequest.headers.get("X-Forwarded-For")
            or request.httprequest.remote_addr
            or "unknown"
        )
        ip = ip.split(",")[0].strip()
        bucket = _now_hour_bucket()
        key = f"{ip}:{bucket}"
        # Prune old buckets (cheap - dict is small).
        for k in list(_RATE_LIMIT_BUCKET.keys()):
            if not k.endswith(f":{bucket}"):
                _RATE_LIMIT_BUCKET.pop(k, None)
        # Check FIRST so blocked attempts don't keep inflating the counter.
        if _RATE_LIMIT_BUCKET.get(key, 0) >= limit:
            return True  # blocked
        _RATE_LIMIT_BUCKET[key] = _RATE_LIMIT_BUCKET.get(key, 0) + 1
        return False
    # ------------------------------------------------------------------
    # LANDING
    # ------------------------------------------------------------------
    @http.route("/repair", type="http", auth="public", website=True, sitemap=True)
    def repair_landing(self, **kw):
        return request.render("fusion_repairs.portal_client_repair_landing", {
            "page_name": "client_repair_landing",
        })

    @http.route("/repair/new", type="http", auth="public", website=True,
                sitemap=False)
    def repair_new(self, sn=None, **kw):
        categories = request.env["fusion.repair.product.category"].sudo().search([
            ("active", "=", True),
        ], order="sequence, name")
        prefilled_serial = (sn or "").strip()
        return request.render("fusion_repairs.portal_client_repair_form", {
            "page_name": "client_repair_new",
            "categories": categories,
            "prefilled_serial": prefilled_serial,
            "error": kw.get("error"),
        })
    # ------------------------------------------------------------------
    # SAFE PARTNER LOOKUP (anti-leak)
    # ------------------------------------------------------------------
    @http.route("/repair/lookup_phone", type="jsonrpc", auth="public",
                website=True)
    def repair_lookup_phone(self, phone=None, **kw):
        if self._check_rate_limit():
            return {"error": "rate_limited"}
        cleaned = _e164_clean(phone)
        if len(cleaned) < 7:
            return {"matched": False}
        matches = request.env["res.partner"].sudo().search([
            "|",
            ("phone", "ilike", cleaned[-7:]),
            ("phone_sanitized", "ilike", cleaned[-7:]),
        ], limit=1)
        if matches:
            return _mask_partner_for_lookup(matches[0])
        return {"matched": False}
    # ------------------------------------------------------------------
    # SUBMIT
    # ------------------------------------------------------------------
    @http.route("/repair/submit", type="http", auth="public", methods=["POST"],
                csrf=True, website=True)
    def repair_submit(self, **post):
        # Honeypot - bots tend to fill every visible field.
        if (post.get("hp_company") or "").strip():
            _logger.info("Client portal submit blocked by honeypot from IP=%s",
                         request.httprequest.remote_addr)
            return request.redirect("/repair/new?error=spam")
        if self._check_rate_limit():
            return request.redirect("/repair/new?error=rate_limited")
        # Required fields.
        partner_name = (post.get("client_name") or "").strip()
        phone = (post.get("client_phone") or "").strip()
        issue_summary = (post.get("issue_summary") or "").strip()
        # Parse defensively: a non-numeric category_id is treated as missing
        # instead of raising an unhandled ValueError.
        try:
            category_id = int(post.get("category_id") or 0)
        except (ValueError, TypeError):
            category_id = 0
        if not (partner_name and phone and issue_summary and category_id):
            return request.redirect("/repair/new?error=missing")
        # Validate email if provided. Empty is allowed; malformed redirects back.
        raw_email = (post.get("client_email") or "").strip()
        clean_email = email_normalize(raw_email) if raw_email else False
        if raw_email and not clean_email:
            return request.redirect("/repair/new?error=email")
        # Find or create partner. Match by phone if known (safe - we already
        # have their consent to contact via this form).
        cleaned_phone = _e164_clean(phone)
        partner = False
        if len(cleaned_phone) >= 7:
            partner = request.env["res.partner"].sudo().search([
                "|",
                ("phone", "ilike", cleaned_phone[-7:]),
                ("phone_sanitized", "ilike", cleaned_phone[-7:]),
            ], limit=1)
        partner_vals = None
        if not partner:
            partner_vals = {
                "name": partner_name,
                "phone": phone,
                "email": clean_email or False,
                "street": (post.get("client_street") or "").strip(),
                "city": (post.get("client_city") or "").strip(),
            }
        # Stage uploaded photos.
        files = request.httprequest.files.getlist("photos")
        attachment_ids = []
        for f in files or []:
            if not getattr(f, "filename", None):
                continue
            data = f.read()
            if not data:
                continue
            attachment_ids.append(request.env["ir.attachment"].sudo().create({
                "name": f.filename,
                "datas": base64.b64encode(data),
                "res_model": "fusion.repair.intake.session",
                "res_id": 0,
            }).id)
        equipment = {
            "repair_category_id": category_id,
            "third_party": post.get("third_party") in ("on", "true", "1"),
            "urgency": post.get("urgency") or "normal",
            "issue_summary": issue_summary,
            "internal_notes": (post.get("internal_notes") or "").strip(),
            "photo_attachment_ids": attachment_ids,
        }
        # Pick a real human owner for the repair so emails go from a person:
        # admin if present, else the lowest-id non-share user, else SUPERUSER_ID.
        admin = request.env.ref("base.user_admin", raise_if_not_found=False)
        if admin:
            intake_uid = admin.id
        else:
            internal = request.env["res.users"].sudo().search(
                [("share", "=", False)], order="id asc", limit=1,
            )
            intake_uid = internal.id if internal else SUPERUSER_ID
        payload = {
            "partner_id": partner.id if partner else None,
            "partner_vals": partner_vals,
            "intake_user_id": intake_uid,
            "equipment_items": [equipment],
        }
        try:
            repairs = request.env["fusion.repair.intake.service"].sudo() \
                .create_repair_orders(payload, source="client_portal")
        except Exception:
            _logger.exception("Client portal repair submit failed")
            return request.redirect("/repair/new?error=server")
        token = hashlib.sha256(
            f"{repairs[0].id}:{repairs[0].create_date}".encode()
        ).hexdigest()[:16]
        return request.redirect(f"/repair/thanks?ref={repairs[0].name}&t={token}")
    @http.route("/repair/thanks", type="http", auth="public", website=True,
                sitemap=False)
    def repair_thanks(self, ref=None, t=None, **kw):
        return request.render("fusion_repairs.portal_client_repair_thanks", {
            "page_name": "client_repair_thanks",
            "ref": ref or "",
        })
    # ------------------------------------------------------------------
    # CL6 / CL7: AI self-check JSONRPC endpoint
    # ------------------------------------------------------------------
    @http.route("/repair/self_check", type="jsonrpc", auth="public",
                website=True)
    def repair_self_check(self, category_id=None, symptoms=None,
                          urgency=None, **kw):
        if self._check_rate_limit():
            return {"error": "rate_limited"}
        if not symptoms:
            symptoms = []
        if isinstance(symptoms, str):
            symptoms = [symptoms]
        # The JSON payload is untrusted: anything that is not a list is dropped.
        if not isinstance(symptoms, (list, tuple)):
            symptoms = []
        # Defensive: cap input size to defend against prompt-injection bloat
        symptoms = [str(s)[:500] for s in symptoms[:5]]
        # A non-numeric category_id is treated as "no category" rather than
        # raising an unhandled ValueError.
        try:
            product_category_id = int(category_id or 0) or None
        except (ValueError, TypeError):
            product_category_id = None
        Service = request.env["fusion.repair.ai.service"].sudo()
        return Service.suggest_self_check(
            product_category_id=product_category_id,
            symptoms=symptoms,
            urgency=urgency or None,
        )
    # ------------------------------------------------------------------
    # CL15: on-call acknowledgement endpoint
    # ------------------------------------------------------------------
    @http.route("/repair/on-call/ack/<string:token>", type="http",
                auth="user", website=True, sitemap=False)
    def repair_on_call_ack(self, token, **kw):
        # Stated intent: only internal users may acknowledge. auth="user"
        # alone also admits portal users, so check the group explicitly.
        if not request.env.user.has_group("base.group_user"):
            return request.render(
                "fusion_repairs.portal_on_call_ack_invalid", {},
            )
        Repair = request.env["repair.order"].sudo()
        repair = Repair.search([("x_fc_on_call_token", "=", token)], limit=1)
        if not repair:
            return request.render(
                "fusion_repairs.portal_on_call_ack_invalid", {},
            )
        Service = request.env["fusion.repair.on.call.service"].sudo()
        Service.acknowledge(repair, request.env.user)
        return request.render("fusion_repairs.portal_on_call_ack_ok", {
            "repair_name": repair.name,
        })
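
The fixed-window counter used by `_check_rate_limit` can be exercised outside Odoo with an injectable clock. The class below is a hypothetical stand-alone sketch of the same idea (check first, then increment, prune earlier windows); `FixedWindowLimiter` and its parameters are illustrative names, not part of the module.

```python
import time


class FixedWindowLimiter:
    """Per-key counter that resets at each window boundary (in-memory only)."""

    def __init__(self, limit, window_seconds=3600, clock=time.time):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self.counts = {}

    def is_blocked(self, client_key):
        bucket = int(self.clock() // self.window_seconds)
        key = (client_key, bucket)
        # Drop counters that belong to earlier windows.
        self.counts = {k: v for k, v in self.counts.items() if k[1] == bucket}
        # Check before incrementing so blocked attempts do not inflate the count.
        if self.counts.get(key, 0) >= self.limit:
            return True
        self.counts[key] = self.counts.get(key, 0) + 1
        return False
```

As with the controller's version, the state lives in one process, so each worker counts separately, and a client can make up to twice the limit in quick succession by straddling a window boundary.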