# -*- coding: utf-8 -*- # Copyright 2026 Nexa Systems Inc. # License OPL-1 """HTTP routes for the Fusion Helpdesk Reporter. `/fusion_helpdesk/submit` accepts the dialog payload, forwards it to a remote Odoo Helpdesk over XML-RPC, attaches uploaded files + screenshot, and returns the resulting ticket id/url to the OWL dialog. Why XML-RPC and not JSON-RPC? Helpdesk's external API surface is exposed via Odoo's standard `/xmlrpc/2/object` endpoint which is the most stable cross-version contract. JSON-RPC `/jsonrpc` works too but historically had quirkier error handling on file payloads. """ import base64 import logging import socket import ssl import xmlrpc.client from urllib.parse import urljoin, urlparse from odoo import _, http from odoo.exceptions import UserError from odoo.http import request from odoo.addons.fusion_helpdesk.utils import ( build_ticket_vals, build_scope_domain, bucket_ticket, is_public_message, compute_unread_count, escape_like, _norm_email, ) _logger = logging.getLogger(__name__) class FusionHelpdeskController(http.Controller): @http.route( '/fusion_helpdesk/submit', type='jsonrpc', auth='user', methods=['POST'], ) def submit(self, kind, subject, description, error_code=None, attachments=None, page_url=None, user_agent=None, reply_email=None, is_critical=False): """Forward a bug report or feature request to the central Odoo Helpdesk and return {ok, ticket_id, ticket_url, error}. Args: kind: 'bug' or 'feature'. subject: short title. description: long-form. May contain HTML or plain text. error_code: optional traceback / error string. attachments: list of {name, mimetype, data_b64} dicts. page_url: client-side window.location.href at submit time. user_agent: client-side navigator.userAgent. """ cfg = self._read_config() if not all([cfg['url'], cfg['db'], cfg['login'], cfg['password']]): return { 'ok': False, 'error': 'config_missing', 'message': _( 'Fusion Helpdesk is not fully configured. Ask an ' 'administrator to fill in the remote URL, database, ' 'login and password under Settings → Fusion Helpdesk.' ), } # ---- Build the ticket payload --------------------------------- body_parts = [] if description: body_parts.append( '

Description

%s
' % _html_escape(description) ) if error_code: body_parts.append( '

Error Code / Traceback

' '
%s
' % _html_escape(error_code) ) body_parts.append(self._build_diag_block(page_url, user_agent)) # Identity keystone: send the reporter's name + email so the central # helpdesk find-or-creates the customer partner and subscribes them as # a follower — which is what enables reply emails, the magic link, and # the scoped "My Tickets" inbox. reply_email is the (editable) value the # user confirmed in the dialog; fall back to their Odoo email/login. user = request.env.user # Normalise the confirmed email (and fall back to the user's own). # Normalising rejects garbage / wildcard-bearing values so the stored # partner_email — which is also the inbox scope key — stays clean. reporter_email = _norm_email(reply_email, user.email, user.login) ticket_vals = build_ticket_vals( kind=kind, subject=subject, body_html='\n'.join(body_parts), team_id=cfg['team_id'], client_label=cfg['client_label'], reporter_name=user.name, reporter_email=reporter_email, company_name=request.env.company.name, is_critical=bool(is_critical), ) # Piggyback the configured owner contact on every submission so the # central can keep fusion.helpdesk.client.key.owner_email/name in # sync without a dedicated endpoint. The central's create override # pops these keys before super().create() sees them — they're not # real helpdesk.ticket columns, just a sync side-channel. if cfg.get('owner_email'): ticket_vals['x_fc_owner_email'] = cfg['owner_email'] if cfg.get('owner_name'): ticket_vals['x_fc_owner_name'] = cfg['owner_name'] # ---- Talk to remote Odoo -------------------------------------- try: uid, models_proxy = self._authenticate(cfg) except _RemoteError as e: return e.to_response() try: ticket_id = models_proxy.execute_kw( cfg['db'], uid, cfg['password'], 'helpdesk.ticket', 'create', [ticket_vals], ) except xmlrpc.client.Fault as e: fault = (e.faultString or '').strip() _logger.warning( 'fusion_helpdesk: helpdesk.ticket.create failed: %s', fault, ) # The Helpdesk app might not be installed on the remote. if 'helpdesk.ticket' in fault and 'does not exist' in fault.lower(): return { 'ok': False, 'error': 'helpdesk_not_installed', 'message': _( 'The Helpdesk app is not installed on the central ' 'Odoo at %s. Install the Helpdesk app there before ' 'submitting tickets.' ) % cfg['url'], } if 'access' in fault.lower() or 'rights' in fault.lower(): return { 'ok': False, 'error': 'permission_denied', 'message': _( 'The remote service account "%s" does not have ' 'permission to create helpdesk tickets. Ask a ' 'central Odoo admin to grant Helpdesk Officer ' 'rights to that user.' ) % cfg['login'], } return { 'ok': False, 'error': 'create_failed', 'message': _('The remote Helpdesk rejected the ticket: %s' ) % fault, } except (socket.timeout, OSError, ssl.SSLError) as e: _logger.warning( 'fusion_helpdesk: ticket create network error: %s', e, ) return _network_error_response(cfg['url'], e) # ---- Push attachments ----------------------------------------- # The ticket already exists; an attachment failure must NOT bubble up # as a 500 (the user would think the whole submission failed and file # a duplicate). Catch network errors too, count failures, and report # them back so the dialog can tell the user which files didn't make it. attached = 0 failed = 0 for att in attachments or []: data_b64 = (att or {}).get('data_b64') name = (att or {}).get('name') or 'attachment.bin' mimetype = (att or {}).get('mimetype') or 'application/octet-stream' if not data_b64: continue try: models_proxy.execute_kw( cfg['db'], uid, cfg['password'], 'ir.attachment', 'create', [{ 'name': name, 'datas': data_b64, 'res_model': 'helpdesk.ticket', 'res_id': ticket_id, 'mimetype': mimetype, }], ) attached += 1 except (xmlrpc.client.Fault, xmlrpc.client.ProtocolError, socket.timeout, OSError, ssl.SSLError) as e: failed += 1 _logger.warning( 'fusion_helpdesk: attachment "%s" upload failed: %s', name, e, ) ticket_url = urljoin( cfg['url'].rstrip('/') + '/', 'odoo/helpdesk/%s' % ticket_id, ) _logger.info( 'fusion_helpdesk: created remote ticket #%s (%s attached, %s failed) ' 'on %s for user %s', ticket_id, attached, failed, cfg['url'], request.env.user.login, ) return { 'ok': True, 'ticket_id': ticket_id, 'ticket_url': ticket_url, 'attached': attached, 'failed': failed, } # ------------------------------------------------------------------ def _read_config(self): """Return the active config as a plain dict. Run as sudo so regular users can submit even without read-access on ir.config_parameter (the password row is system-write but readable by anyone with backend access).""" ICP = request.env['ir.config_parameter'].sudo() return { 'url': (ICP.get_param('fusion_helpdesk.remote_url') or '').strip(), 'db': (ICP.get_param('fusion_helpdesk.remote_db') or '').strip(), 'login': (ICP.get_param('fusion_helpdesk.remote_login') or '').strip(), 'password': ICP.get_param('fusion_helpdesk.remote_password') or '', 'team_id': int( ICP.get_param('fusion_helpdesk.remote_team_id') or 0 ) or False, 'client_label': ( ICP.get_param('fusion_helpdesk.client_label') or '' ).strip(), 'owner_email': ( ICP.get_param('fusion_helpdesk.owner_email') or '' ).strip(), 'owner_name': ( ICP.get_param('fusion_helpdesk.owner_name') or '' ).strip(), } def _authenticate(self, cfg): """Authenticate against the remote and return (uid, models_proxy). Raises _RemoteError with a granular `error` code and a friendly end-user message so the dialog can show what actually broke (network vs. credentials vs. server problem). """ url = cfg['url'].rstrip('/') try: common = xmlrpc.client.ServerProxy( '%s/xmlrpc/2/common' % url, allow_none=True, ) try: uid = common.authenticate( cfg['db'], cfg['login'], cfg['password'], {}, ) except xmlrpc.client.ProtocolError as e: # Server returned an HTTP error status. _logger.warning( 'fusion_helpdesk: HTTP %s from %s during authenticate: %s', e.errcode, url, e.errmsg, ) if e.errcode in (401, 403): raise _RemoteError( 'auth_failed', _('The central Odoo at %s rejected the login. ' 'Check the Service Login and API Key in ' 'Settings → Fusion Helpdesk.') % url, ) if e.errcode == 404: raise _RemoteError( 'endpoint_not_found', _('The XML-RPC endpoint at %s/xmlrpc/2/common ' 'returned 404. Verify the Remote URL points ' 'at an Odoo server (no trailing path).') % url, ) raise _RemoteError( 'remote_http_error', _('The central Odoo returned HTTP %(code)s while ' 'authenticating. Try again, or check that ' '%(url)s is reachable from a browser.' ) % {'code': e.errcode, 'url': url}, ) except xmlrpc.client.Fault as e: # Server-side application error — usually wrong DB name. fault = (e.faultString or '').strip() _logger.warning( 'fusion_helpdesk: XML-RPC fault during authenticate: %s', fault, ) if 'database' in fault.lower() and ( 'not exist' in fault.lower() or 'unknown' in fault.lower() ): raise _RemoteError( 'wrong_database', _('Database "%(db)s" does not exist on %(url)s. ' 'Verify the Remote DB in Settings → Fusion ' 'Helpdesk.') % {'db': cfg['db'], 'url': url}, ) raise _RemoteError( 'auth_failed', _('The central Odoo rejected authentication: %s' ) % fault, ) except ssl.SSLError as e: _logger.warning( 'fusion_helpdesk: TLS error against %s: %s', url, e, ) raise _RemoteError( 'tls_error', _('TLS / SSL handshake with %(url)s failed: ' '%(msg)s. Either the server cert is invalid or ' 'the system cert store on this host is out of ' 'date.') % {'url': url, 'msg': str(e)}, ) except socket.gaierror as e: _logger.warning( 'fusion_helpdesk: DNS lookup failed for %s: %s', url, e, ) raise _RemoteError( 'dns_error', _('Could not resolve "%(host)s". Check Settings → ' 'Fusion Helpdesk → Remote URL, or your server\'s ' 'DNS / outbound network.' ) % {'host': urlparse(url).hostname or url}, ) except (ConnectionRefusedError, socket.timeout) as e: _logger.warning( 'fusion_helpdesk: connection problem to %s: %s', url, e, ) raise _RemoteError( 'unreachable', _('Could not reach %(url)s — connection refused or ' 'timed out. Check that this server can make ' 'outbound HTTPS to %(host)s (firewall, proxy).' ) % {'url': url, 'host': urlparse(url).hostname or url}, ) except OSError as e: # Catch-all for socket / network issues we didn't classify. _logger.warning( 'fusion_helpdesk: network error to %s: %s', url, e, ) raise _RemoteError( 'unreachable', _('Network error reaching %(url)s: %(msg)s. Verify ' 'this server has outbound internet access.' ) % {'url': url, 'msg': str(e)}, ) except _RemoteError: raise except Exception as e: _logger.exception( 'fusion_helpdesk: unexpected error during authenticate ' 'against %s', url, ) raise _RemoteError( 'unknown_error', _('Unexpected error contacting the central Helpdesk: %s' ) % str(e), ) if not uid: raise _RemoteError( 'auth_failed', _('The central Odoo at %(url)s did not accept the ' 'login "%(login)s". Verify the Service Login and ' 'API Key in Settings → Fusion Helpdesk.' ) % {'url': url, 'login': cfg['login']}, ) models_proxy = xmlrpc.client.ServerProxy( '%s/xmlrpc/2/object' % url, allow_none=True, ) return uid, models_proxy def _build_diag_block(self, page_url, user_agent): env = request.env company = env.company user = env.user rows = [ ('User', '%s (#%s, %s)' % (user.name, user.id, user.login)), ('Company', '%s (#%s)' % (company.name, company.id)), ('Source page', page_url or '—'), ('User agent', user_agent or '—'), ('Source DB', request.env.cr.dbname), ('Source host', request.httprequest.host_url), ] body = '

Diagnostic context

' for k, v in rows: body += ( '' '' ) % (_html_escape(k), _html_escape(str(v))) body += '
%s%s
' return body # ================================================================== # Embedded ticket inbox — identity, RPC seam, helpers # ================================================================== def _identity(self): """Resolve the caller's scope from the SERVER-SIDE session only. Never trust an email / label / scope sent by the browser — this is the security boundary that stops one deployment reading another's tickets through the shared bot account.""" user = request.env.user cfg = self._read_config() return { 'cfg': cfg, # Normalised so a self-set wildcard email ('%') can't widen scope. 'email': _norm_email(user.email, user.login), 'label': cfg['client_label'], 'is_admin': user.has_group('fusion_helpdesk.group_reporter_admin'), 'name': user.name, } def _config_ready(self, cfg): return all([cfg['url'], cfg['db'], cfg['login'], cfg['password']]) def _rpc(self, cfg, model, method, args, kw=None): """Authenticate + execute_kw against the central Odoo as the bot. A ProtocolError on the execute_kw leg (e.g. a 502/503/429 from the central reverse proxy) is NOT an OSError subclass, so we convert it to a _RemoteError here — otherwise it would escape every endpoint's except-tuple and surface as a raw 500 (mislabelled "Network error").""" uid, proxy = self._authenticate(cfg) try: return proxy.execute_kw( cfg['db'], uid, cfg['password'], model, method, args, kw or {}, ) except xmlrpc.client.ProtocolError as e: _logger.warning('fusion_helpdesk: HTTP %s on %s.%s: %s', e.errcode, model, method, e.errmsg) raise _RemoteError( 'remote_http_error', _('The central Helpdesk returned HTTP %(code)s. Please try ' 'again in a moment.') % {'code': e.errcode}, ) def _stage_fold_map(self, cfg, tickets): """{stage_id: fold_bool} for the distinct stages on these tickets. `helpdesk.stage.fold` is the central support team's signal for "kanban-folded = closed" — Solved + Cancelled by default. We use it (not the stage name) to bucket tickets into the Solved section, so renaming a stage on central doesn't quietly mis-categorise rows. One extra RPC per list call, batched across all distinct stages.""" ids = sorted({t['stage_id'][0] for t in tickets if t.get('stage_id')}) if not ids: return {} rows = self._rpc(cfg, 'helpdesk.stage', 'read', [ids], {'fields': ['fold']}) return {r['id']: r.get('fold', False) for r in rows} def _internal_subtype_map(self, cfg, subtype_ids): """{subtype_id: internal_bool} so internal notes can be hidden.""" ids = [s for s in set(subtype_ids) if s] if not ids: return {} rows = self._rpc(cfg, 'mail.message.subtype', 'read', [ids], {'fields': ['internal']}) return {r['id']: r.get('internal', False) for r in rows} def _ticket_messages(self, cfg, ticket_ids): """Raw comment/email messages for a set of tickets (one RPC).""" if not ticket_ids: return [] return self._rpc( cfg, 'mail.message', 'search_read', [[('model', '=', 'helpdesk.ticket'), ('res_id', 'in', list(ticket_ids)), ('message_type', 'in', ['comment', 'email'])]], {'fields': ['id', 'res_id', 'author_id', 'subtype_id']}, ) def _last_support_map(self, cfg, tickets, msgs): """{ticket_id: latest customer-visible SUPPORT message id}. A support message is a public comment NOT authored by the ticket's own customer (internal notes and the customer's own posts excluded).""" internal = self._internal_subtype_map( cfg, [m['subtype_id'][0] for m in msgs if m.get('subtype_id')]) customer = { t['id']: (t['partner_id'][0] if t['partner_id'] else None) for t in tickets } last = {} for m in msgs: st = m.get('subtype_id') if st and internal.get(st[0]): continue # internal note — never counts / never shown author = m['author_id'][0] if m['author_id'] else None rid = m['res_id'] if author and author == customer.get(rid): continue # the customer's own reply isn't an unread "support" msg if m['id'] > last.get(rid, 0): last[rid] = m['id'] return last def _public_messages(self, cfg, ticket_id): """Customer-visible thread for one ticket, oldest first.""" raw = self._rpc( cfg, 'mail.message', 'search_read', [[('model', '=', 'helpdesk.ticket'), ('res_id', '=', ticket_id), ('message_type', 'in', ['comment', 'email'])]], {'fields': ['id', 'date', 'body', 'author_id', 'subtype_id', 'attachment_ids'], 'order': 'id asc'}, ) internal = self._internal_subtype_map( cfg, [m['subtype_id'][0] for m in raw if m.get('subtype_id')]) out = [] for m in raw: st = m.get('subtype_id') msg = { 'id': m['id'], 'date': m['date'], 'body': m['body'] or '', 'author': (m['author_id'][1] if m['author_id'] else ''), 'author_id': (m['author_id'][0] if m['author_id'] else False), 'attachment_count': len(m.get('attachment_ids') or []), 'subtype_is_internal': internal.get(st[0], False) if st else False, } if is_public_message(msg): out.append(msg) return out def _resolve_author(self, cfg, ident, ticket): """Find-or-create the replier's OWN partner on central so their reply is correctly attributed. `ident['email']` is already normalised (no wildcards); we escape it for the =ilike search as belt-and-suspenders. On any failure we log and return False — message_post then attributes the reply to the service account, which is honest. We deliberately do NOT fall back to the ticket's customer: for an admin replying to a colleague's ticket that would silently impersonate the customer.""" email = ident['email'] if not email: return False try: pids = self._rpc(cfg, 'res.partner', 'search', [[('email', '=ilike', escape_like(email))]], {'limit': 1}) if pids: return pids[0] return self._rpc(cfg, 'res.partner', 'create', [{'name': ident['name'], 'email': email}]) except (xmlrpc.client.Fault, _RemoteError) as e: _logger.warning( 'fusion_helpdesk: could not resolve reply author for %s on ' 'ticket %s (%s); posting as the service account.', email, ticket.get('id'), e) return False def _mark_ticket_seen(self, ticket_id, messages): """Best-effort read-tracking. Runs AFTER the remote read/post, so it must never raise — otherwise a local DB hiccup here would turn an already-successful reply into a reported failure, and the user would resubmit (posting a duplicate). Bookkeeping only; log and swallow.""" if not messages: return try: request.env['fusion.helpdesk.ticket.seen']._mark_seen( ticket_id, max(m['id'] for m in messages)) except Exception: # noqa: BLE001 — non-critical bookkeeping _logger.exception( 'fusion_helpdesk: mark-seen failed for ticket %s', ticket_id) def _remote_failure(self, cfg, err): """Map a mid-RPC failure to the dialog's response shape.""" if isinstance(err, _RemoteError): return err.to_response() if isinstance(err, (socket.timeout, OSError, ssl.SSLError)): return _network_error_response(cfg['url'], err) return {'ok': False, 'error': 'remote_error', 'message': _('The central Helpdesk returned an error: %s' ) % str(err)} # ================================================================== # Embedded ticket inbox — endpoints (auth='user', server-side scoped) # ================================================================== @http.route('/fusion_helpdesk/my_tickets', type='jsonrpc', auth='user', methods=['POST']) def my_tickets(self, scope='mine'): """List the caller's tickets (scoped). Admins may pass scope='all' to see every ticket from their deployment.""" ident = self._identity() cfg = ident['cfg'] if not self._config_ready(cfg): return {'ok': False, 'error': 'config_missing', 'message': _('Fusion Helpdesk is not configured.')} view_all = ident['is_admin'] and scope == 'all' domain = build_scope_domain(ident['label'], ident['email'], view_all) try: tickets = self._rpc( cfg, 'helpdesk.ticket', 'search_read', [domain], {'fields': ['id', 'name', 'stage_id', 'partner_id', 'write_date', 'ticket_ref', 'priority'], 'order': 'write_date desc', 'limit': 100}) msgs = self._ticket_messages(cfg, [t['id'] for t in tickets]) stage_fold = self._stage_fold_map(cfg, tickets) except (_RemoteError, xmlrpc.client.Fault, OSError, ssl.SSLError) as e: return self._remote_failure(cfg, e) last_support = self._last_support_map(cfg, tickets, msgs) ids = [t['id'] for t in tickets] seen = request.env['fusion.helpdesk.ticket.seen']._seen_map(ids) rows = [] for t in tickets: rid = t['id'] ls = last_support.get(rid, 0) sid = t['stage_id'][0] if t['stage_id'] else None rows.append({ 'id': rid, 'ref': t.get('ticket_ref') or str(rid), 'subject': t['name'], 'stage': t['stage_id'][1] if t['stage_id'] else '', 'last_update': t['write_date'], 'last_support_msg_id': ls, 'has_unread': ls > (seen.get(rid, 0) or 0), 'priority': t.get('priority') or '0', 'group': bucket_ticket(stage_fold.get(sid, False), t.get('priority')), }) return {'ok': True, 'tickets': rows, 'is_admin': ident['is_admin'], 'unread': compute_unread_count(rows, seen)} @http.route('/fusion_helpdesk/ticket/', type='jsonrpc', auth='user', methods=['POST']) def ticket_detail(self, ticket_id, **kw): """Full thread for one ticket — re-checks scope, hides internal notes, marks the ticket seen for the badge.""" ident = self._identity() cfg = ident['cfg'] if not self._config_ready(cfg): return {'ok': False, 'error': 'config_missing', 'message': _('Fusion Helpdesk is not configured.')} domain = build_scope_domain( ident['label'], ident['email'], ident['is_admin'] ) + [('id', '=', ticket_id)] try: found = self._rpc(cfg, 'helpdesk.ticket', 'search_read', [domain], {'fields': ['id', 'name', 'stage_id', 'access_token'], 'limit': 1}) if not found: return {'ok': False, 'error': 'not_found', 'message': _('Ticket not found or not accessible.')} messages = self._public_messages(cfg, ticket_id) except (_RemoteError, xmlrpc.client.Fault, OSError, ssl.SSLError) as e: return self._remote_failure(cfg, e) self._mark_ticket_seen(ticket_id, messages) t = found[0] # Magic link: the customer's own access-token URL on central, so they # can open the full ticket (incl. attachments) in the portal if needed. portal_url = '' if t.get('access_token'): portal_url = '%s/my/ticket/%s/%s' % ( cfg['url'].rstrip('/'), t['id'], t['access_token']) return {'ok': True, 'ticket': { 'id': t['id'], 'subject': t['name'], 'stage': t['stage_id'][1] if t['stage_id'] else '', 'portal_url': portal_url, 'messages': messages}} @http.route('/fusion_helpdesk/ticket//reply', type='jsonrpc', auth='user', methods=['POST']) def ticket_reply(self, ticket_id, body=None, **kw): """Post a customer reply on a scoped ticket, attributed to the replier.""" ident = self._identity() cfg = ident['cfg'] text = (body or '').strip() if not text: return {'ok': False, 'error': 'empty', 'message': _('Your reply is empty.')} if not self._config_ready(cfg): return {'ok': False, 'error': 'config_missing', 'message': _('Fusion Helpdesk is not configured.')} domain = build_scope_domain( ident['label'], ident['email'], ident['is_admin'] ) + [('id', '=', ticket_id)] try: found = self._rpc(cfg, 'helpdesk.ticket', 'search_read', [domain], {'fields': ['id', 'partner_id'], 'limit': 1}) if not found: return {'ok': False, 'error': 'not_found', 'message': _('Ticket not found or not accessible.')} author_id = self._resolve_author(cfg, ident, found[0]) # We escape the user's text ourselves, then mark it up as paragraphs. # message_post() ESCAPES a plain str body (it expects a Markup for # HTML) — but Markup can't cross XML-RPC, so we pass body_is_html=True # which tells the remote message_post to treat our already-escaped # HTML as Markup. Without this the customer would see literal

tags. html = '

%s

' % _html_escape(text).replace('\n', '
') self._rpc(cfg, 'helpdesk.ticket', 'message_post', [[ticket_id]], { 'body': html, 'body_is_html': True, 'message_type': 'comment', 'subtype_xmlid': 'mail.mt_comment', 'author_id': author_id, }) messages = self._public_messages(cfg, ticket_id) except (_RemoteError, xmlrpc.client.Fault, OSError, ssl.SSLError) as e: return self._remote_failure(cfg, e) self._mark_ticket_seen(ticket_id, messages) return {'ok': True, 'messages': messages} @http.route('/fusion_helpdesk/unread_count', type='jsonrpc', auth='user', methods=['POST']) def unread_count(self): """Badge count: tickets with a support reply newer than last-seen. Always scoped to the caller's OWN tickets (never the admin-all view).""" ident = self._identity() cfg = ident['cfg'] if not self._config_ready(cfg): return {'ok': True, 'count': 0} domain = build_scope_domain(ident['label'], ident['email'], False) try: tickets = self._rpc(cfg, 'helpdesk.ticket', 'search_read', [domain], {'fields': ['id', 'partner_id'], 'limit': 100}) msgs = self._ticket_messages(cfg, [t['id'] for t in tickets]) except (_RemoteError, xmlrpc.client.Fault, OSError, ssl.SSLError): return {'ok': True, 'count': 0} # badge must never break the systray last_support = self._last_support_map(cfg, tickets, msgs) rows = [{'id': k, 'last_support_msg_id': v} for k, v in last_support.items()] seen = request.env['fusion.helpdesk.ticket.seen']._seen_map( list(last_support.keys())) return {'ok': True, 'count': compute_unread_count(rows, seen)} def _html_escape(s): return ( (s or '') .replace('&', '&') .replace('<', '<') .replace('>', '>') ) class _RemoteError(Exception): """Typed wrapper that carries an `error` code + a friendly message so the dialog can show the right thing.""" def __init__(self, code, message): super().__init__(message) self.code = code self.message = message def to_response(self): return {'ok': False, 'error': self.code, 'message': self.message} def _network_error_response(url, err): """Map a raw network exception raised mid-RPC into the same response shape the dialog already knows how to render.""" host = urlparse(url).hostname or url if isinstance(err, ssl.SSLError): return { 'ok': False, 'error': 'tls_error', 'message': _('TLS / SSL error talking to %(url)s: %(msg)s' ) % {'url': url, 'msg': str(err)}, } if isinstance(err, socket.gaierror): return { 'ok': False, 'error': 'dns_error', 'message': _('Could not resolve "%s" — check DNS / outbound network.' ) % host, } if isinstance(err, socket.timeout): return { 'ok': False, 'error': 'unreachable', 'message': _('Timed out talking to %s — check the firewall ' 'allows outbound HTTPS.') % url, } return { 'ok': False, 'error': 'unreachable', 'message': _('Network error talking to %(url)s: %(msg)s' ) % {'url': url, 'msg': str(err)}, }