# -*- coding: utf-8 -*-
# Copyright 2024-2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
"""Public client self-service portal at /repair.

Phase 1 scope (no AI yet):
  - /repair               Landing page with "Start" CTA
  - /repair/new           Multi-step form
  - /repair/submit        POST -> creates repair.order via shared intake service
  - /repair/thanks        Confirmation with reference
  - /repair/lookup_phone  jsonrpc safe partner match (masked PII)

Security:
  - Public auth (no login) - the voicemail prompts mention this URL
  - Per-IP rate limit on submit (configurable)
  - Honeypot + CSRF
  - Phone lookup returns ONLY masked name + address slice (never other PII)
  - Records created via sudo in the controller; record rules don't apply
    because anonymous users don't have a session

Phase 2+ will add: AI self-check, upsell engine, smart SMS verify, safety
on-call paging, reCAPTCHA v3.
"""

import base64
import hashlib
import logging
import re
import time
from urllib.parse import urlencode

from odoo import SUPERUSER_ID, http, fields
from odoo.http import request
from odoo.tools import email_normalize

_logger = logging.getLogger(__name__)

# In-memory rate-limit window per worker. Good enough for Phase 1
# and matches the project's "no extra infra" goal. Resets on restart.
_RATE_LIMIT_BUCKET = {}

# Number of trailing digits used when matching a phone number to a partner.
_PHONE_MATCH_DIGITS = 7


def _now_hour_bucket():
    """Return the current hour index (whole hours since the Unix epoch)."""
    return int(time.time() // 3600)


def _mask_partner_for_lookup(partner):
    """Return ONLY safe summary fields - never the full partner record.

    The name is reduced to the first word plus the initial of the second
    word (e.g. "Mary Ann Smith" -> "Mary A."). A single-word name is
    returned as is.

    :param partner: record exposing ``name`` and ``city`` attributes.
    :return: dict with keys ``matched`` (always True), ``name`` and ``city``.
    """
    # Tokenise on any whitespace so leading / repeated spaces cannot yield
    # an empty first name or a blank "initial" (" John Smith" -> "John S.").
    tokens = (partner.name or "").split()
    if len(tokens) > 1:
        # First name + next initial; never reveal the full surname.
        safe_name = f"{tokens[0]} {tokens[1][:1]}."
    else:
        safe_name = tokens[0] if tokens else ""
    return {
        "matched": True,
        "name": safe_name,
        "city": partner.city or "",
    }


def _e164_clean(phone):
    """Strip everything but digits and ``+`` and keep the last 12 characters.

    :param phone: raw phone value; falsy values yield ``""``. Non-string
        values (possible on the JSON-RPC route) are converted with ``str``
        instead of raising ``TypeError``.
    :return: the cleaned string, at most 12 characters long.
    """
    if not phone:
        return ""
    if not isinstance(phone, str):
        phone = str(phone)
    return re.sub(r"[^\d+]", "", phone)[-12:]


class ClientRepairPortal(http.Controller):
    """Public (no-login) website controller for the /repair self-service flow."""

    # ------------------------------------------------------------------
    # RATE LIMIT
    # ------------------------------------------------------------------
    def _check_rate_limit(self):
        """Count this request against the caller's hourly quota.

        The quota is read from the ``fusion_repairs.client_portal_rate_limit_per_hour``
        config parameter (default 10; also 10 when the stored value is not
        an integer). Counters live in the module-level ``_RATE_LIMIT_BUCKET``
        dict, keyed by ``"<ip>:<hour bucket>"``.

        :return: True when the caller is blocked, False when the request is
            allowed (and has been counted).
        """
        ICP = request.env["ir.config_parameter"].sudo()
        try:
            limit = int(ICP.get_param(
                "fusion_repairs.client_portal_rate_limit_per_hour", "10"
            ))
        except (ValueError, TypeError):
            limit = 10

        # Use remote_addr from the proxy header if present.
        # NOTE(review): the first X-Forwarded-For entry is supplied by the
        # client, so this key can be spoofed to dodge the limit (and to grow
        # the dict within the hour). Consider relying on remote_addr alone
        # when the deployment runs with proxy_mode - confirm before changing.
        ip = (
            request.httprequest.headers.get("X-Forwarded-For")
            or request.httprequest.remote_addr
            or "unknown"
        )
        ip = ip.split(",")[0].strip()
        bucket = _now_hour_bucket()
        key = f"{ip}:{bucket}"

        # Prune old buckets (cheap - dict is small).
        suffix = f":{bucket}"
        for stale in [k for k in _RATE_LIMIT_BUCKET if not k.endswith(suffix)]:
            _RATE_LIMIT_BUCKET.pop(stale, None)

        # Check FIRST so blocked attempts don't keep inflating the counter.
        used = _RATE_LIMIT_BUCKET.get(key, 0)
        if used >= limit:
            return True  # blocked
        _RATE_LIMIT_BUCKET[key] = used + 1
        return False

    # ------------------------------------------------------------------
    # HELPERS
    # ------------------------------------------------------------------
    def _find_partner_by_phone(self, cleaned_phone):
        """Return the first partner whose phone contains the trailing digits.

        :param cleaned_phone: output of ``_e164_clean``.
        :return: a ``res.partner`` recordset (sudo) of at most one record,
            or False when ``cleaned_phone`` has fewer than 7 characters.
        """
        if len(cleaned_phone) < _PHONE_MATCH_DIGITS:
            return False
        tail = cleaned_phone[-_PHONE_MATCH_DIGITS:]
        return request.env["res.partner"].sudo().search([
            "|",
            ("phone", "ilike", tail),
            ("phone_sanitized", "ilike", tail),
        ], limit=1)

    # ------------------------------------------------------------------
    # LANDING
    # ------------------------------------------------------------------
    @http.route("/repair", type="http", auth="public", website=True, sitemap=True)
    def repair_landing(self, **kw):
        """Render the landing page template."""
        return request.render("fusion_repairs.portal_client_repair_landing", {
            "page_name": "client_repair_landing",
        })

    @http.route("/repair/new", type="http", auth="public", website=True, sitemap=False)
    def repair_new(self, sn=None, **kw):
        """Render the intake form.

        :param sn: optional serial number used to prefill the form.
        :param kw: remaining query parameters; ``error`` is forwarded to the
            template.
        """
        categories = request.env["fusion.repair.product.category"].sudo().search([
            ("active", "=", True),
        ], order="sequence, name")
        prefilled_serial = (sn or "").strip()
        return request.render("fusion_repairs.portal_client_repair_form", {
            "page_name": "client_repair_new",
            "categories": categories,
            "prefilled_serial": prefilled_serial,
            "error": kw.get("error"),
        })

    # ------------------------------------------------------------------
    # SAFE PARTNER LOOKUP (anti-leak)
    # ------------------------------------------------------------------
    @http.route("/repair/lookup_phone", type="jsonrpc", auth="public", website=True)
    def repair_lookup_phone(self, phone=None, **kw):
        """Look up a partner by phone and return a masked summary.

        Each call counts against the same per-IP quota as form submissions.

        :param phone: raw phone number typed by the visitor.
        :return: ``{"error": "rate_limited"}`` when over quota,
            ``{"matched": False}`` when nothing matches, otherwise the dict
            built by ``_mask_partner_for_lookup``.
        """
        if self._check_rate_limit():
            return {"error": "rate_limited"}
        matches = self._find_partner_by_phone(_e164_clean(phone))
        if matches:
            return _mask_partner_for_lookup(matches[0])
        return {"matched": False}

    # ------------------------------------------------------------------
    # SUBMIT
    # ------------------------------------------------------------------
    @http.route("/repair/submit", type="http", auth="public",
                methods=["POST"], csrf=True, website=True)
    def repair_submit(self, **post):
        """Validate the posted form and create repair order(s).

        Redirects back to ``/repair/new`` with an ``error`` code on failure:
        ``spam`` (honeypot filled), ``rate_limited``, ``missing`` (a required
        field is empty or the category id is not a positive integer),
        ``email`` (malformed address) or ``server`` (the intake service
        raised or returned nothing). On success redirects to
        ``/repair/thanks`` with the order reference and a short token.

        :param post: submitted form fields.
        """
        # Honeypot - bots tend to fill every visible field.
        if (post.get("hp_company") or "").strip():
            _logger.info("Client portal submit blocked by honeypot from IP=%s",
                         request.httprequest.remote_addr)
            return request.redirect("/repair/new?error=spam")

        if self._check_rate_limit():
            return request.redirect("/repair/new?error=rate_limited")

        # Required fields.
        partner_name = (post.get("client_name") or "").strip()
        phone = (post.get("client_phone") or "").strip()
        issue_summary = (post.get("issue_summary") or "").strip()
        # A tampered / non-numeric category must be reported as a missing
        # field rather than crash the request with a ValueError.
        try:
            category_id = int(post.get("category_id") or 0)
        except (TypeError, ValueError):
            category_id = 0
        if category_id <= 0 or not (partner_name and phone and issue_summary):
            return request.redirect("/repair/new?error=missing")

        # Validate email if provided. Empty is allowed; malformed redirects back.
        raw_email = (post.get("client_email") or "").strip()
        clean_email = email_normalize(raw_email) if raw_email else False
        if raw_email and not clean_email:
            return request.redirect("/repair/new?error=email")

        # Find or create partner. Match by phone if known (safe - we already
        # have their consent to contact via this form).
        partner = self._find_partner_by_phone(_e164_clean(phone))

        partner_vals = None
        if not partner:
            partner_vals = {
                "name": partner_name,
                "phone": phone,
                "email": clean_email or False,
                "street": (post.get("client_street") or "").strip(),
                "city": (post.get("client_city") or "").strip(),
            }

        # Stage uploaded photos.
        attachment_ids = []
        for upload in request.httprequest.files.getlist("photos") or []:
            if not getattr(upload, "filename", None):
                continue
            data = upload.read()
            if not data:
                continue
            attachment = request.env["ir.attachment"].sudo().create({
                "name": upload.filename,
                "datas": base64.b64encode(data),
                "res_model": "fusion.repair.intake.session",
                "res_id": 0,
            })
            attachment_ids.append(attachment.id)

        equipment = {
            "repair_category_id": category_id,
            "third_party": post.get("third_party") in ("on", "true", "1"),
            "urgency": post.get("urgency") or "normal",
            "issue_summary": issue_summary,
            "internal_notes": (post.get("internal_notes") or "").strip(),
            "photo_attachment_ids": attachment_ids,
        }

        # Pick a real human owner for the repair so emails go from a person:
        # admin if present, else the lowest-id non-share user, else SUPERUSER_ID.
        admin = request.env.ref("base.user_admin", raise_if_not_found=False)
        if admin:
            intake_uid = admin.id
        else:
            internal = request.env["res.users"].sudo().search(
                [("share", "=", False)], order="id asc", limit=1,
            )
            intake_uid = internal.id if internal else SUPERUSER_ID

        payload = {
            "partner_id": partner.id if partner else None,
            "partner_vals": partner_vals,
            "intake_user_id": intake_uid,
            "equipment_items": [equipment],
        }

        try:
            repairs = request.env["fusion.repair.intake.service"].sudo() \
                .create_repair_orders(payload, source="client_portal")
        except Exception:
            _logger.exception("Client portal repair submit failed")
            return request.redirect("/repair/new?error=server")

        # Guard the indexing below: an empty result is a server-side failure,
        # not a reason to raise IndexError at the visitor.
        if not repairs:
            _logger.error("Client portal repair submit returned no repair order")
            return request.redirect("/repair/new?error=server")

        repair = repairs[0]
        token = hashlib.sha256(
            f"{repair.id}:{repair.create_date}".encode()
        ).hexdigest()[:16]
        # Percent-encode the reference: it may contain characters such as
        # "/", "&" or spaces that would otherwise corrupt the query string.
        query = urlencode({"ref": repair.name or "", "t": token})
        return request.redirect(f"/repair/thanks?{query}")

    @http.route("/repair/thanks", type="http", auth="public", website=True, sitemap=False)
    def repair_thanks(self, ref=None, t=None, **kw):
        """Render the confirmation page showing the given reference.

        :param ref: repair reference to display (empty string when absent).
        :param t: token issued by ``repair_submit``.
            NOTE(review): it is accepted but not verified here, so any
            ``ref`` value supplied in the URL is displayed.
        """
        return request.render("fusion_repairs.portal_client_repair_thanks", {
            "page_name": "client_repair_thanks",
            "ref": ref or "",
        })