Files
Odoo-Modules/fusion_ringcentral/models/rc_config.py
gsinghpal acd3fc455e changes
2026-03-09 15:21:22 -04:00

721 lines
28 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import json
import logging
import socket
import time
from datetime import datetime, timedelta
import requests
from odoo import api, fields, models, _
from odoo.exceptions import UserError
RC_RATE_LIMIT_DELAY = 10
RC_RATE_LIMIT_RETRY_WAIT = 65
RC_MAX_RETRIES = 5
_logger = logging.getLogger(__name__)
RC_EMBEDDABLE_REDIRECT = (
'https://apps.ringcentral.com/integration/ringcentral-embeddable/latest/redirect.html'
)
class RcConfig(models.Model):
_name = 'rc.config'
_description = 'RingCentral Configuration'
_rec_name = 'name'
name = fields.Char(string='Configuration Name', required=True, default='RingCentral')
client_id = fields.Char(string='Client ID', required=True, groups='fusion_ringcentral.group_rc_manager')
client_secret = fields.Char(string='Client Secret', required=True, groups='fusion_ringcentral.group_rc_manager')
server_url = fields.Char(
string='Server URL',
required=True,
default='https://platform.ringcentral.com',
)
access_token = fields.Char(string='Access Token', groups='base.group_system')
refresh_token = fields.Char(string='Refresh Token', groups='base.group_system')
token_expiry = fields.Datetime(string='Token Expires At', groups='base.group_system')
webhook_subscription_id = fields.Char(string='Webhook Subscription ID', readonly=True)
state = fields.Selection([
('draft', 'Not Connected'),
('connected', 'Connected'),
('error', 'Error'),
], string='Status', default='draft', required=True, tracking=True)
phone_widget_enabled = fields.Boolean(string='Enable Phone Widget', default=True)
proxy_url = fields.Char(string='HTTP Proxy URL', help='Optional proxy for enterprise networks.')
proxy_port = fields.Char(string='Proxy Port')
ssl_verify = fields.Boolean(string='Verify SSL Certificates', default=True)
extension_name = fields.Char(string='Connected As', readonly=True)
company_id = fields.Many2one('res.company', default=lambda self: self.env.company)
# ──────────────────────────────────────────────────────────
# Connection test
# ──────────────────────────────────────────────────────────
def action_test_connection(self):
"""Test DNS + HTTPS connectivity to the RingCentral server."""
self.ensure_one()
results = []
# DNS check
hostname = self.server_url.replace('https://', '').replace('http://', '').rstrip('/')
try:
socket.getaddrinfo(hostname, 443)
results.append(f'DNS: {hostname} resolved OK')
except socket.gaierror:
return self._notify('Connection Failed', f'DNS resolution failed for {hostname}', 'danger')
# HTTPS check
try:
resp = requests.get(
f'{self.server_url}/restapi/v1.0',
timeout=10,
verify=self.ssl_verify,
proxies=self._get_proxies(),
)
results.append(f'HTTPS: Status {resp.status_code}')
except requests.RequestException as e:
return self._notify('Connection Failed', f'HTTPS error: {e}', 'danger')
return self._notify('Connection Successful', '\n'.join(results), 'success')
# ──────────────────────────────────────────────────────────
# OAuth flow
# ──────────────────────────────────────────────────────────
def action_oauth_connect(self):
"""Redirect user to RingCentral OAuth authorization page."""
self.ensure_one()
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url', '')
redirect_uri = f'{base_url}/ringcentral/oauth'
state = json.dumps({'config_id': self.id})
auth_url = (
f'{self.server_url}/restapi/oauth/authorize'
f'?response_type=code'
f'&client_id={self.client_id}'
f'&redirect_uri={redirect_uri}'
f'&state={state}'
)
return {
'type': 'ir.actions.act_url',
'url': auth_url,
'target': 'self',
}
def action_disconnect(self):
"""Revoke tokens and disconnect."""
self.ensure_one()
if self.access_token:
try:
requests.post(
f'{self.server_url}/restapi/oauth/revoke',
data={'token': self.access_token},
auth=(self.client_id, self.client_secret),
timeout=10,
verify=self.ssl_verify,
proxies=self._get_proxies(),
)
except Exception:
_logger.exception("Error revoking RingCentral token")
self.write({
'access_token': False,
'refresh_token': False,
'token_expiry': False,
'webhook_subscription_id': False,
'state': 'draft',
'extension_name': False,
})
return self._notify('Disconnected', 'RingCentral has been disconnected.', 'warning')
def exchange_auth_code(self, code, redirect_uri):
"""Exchange authorization code for access/refresh tokens."""
self.ensure_one()
try:
resp = requests.post(
f'{self.server_url}/restapi/oauth/token',
data={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
},
auth=(self.client_id, self.client_secret),
timeout=15,
verify=self.ssl_verify,
proxies=self._get_proxies(),
)
resp.raise_for_status()
data = resp.json()
expires_in = data.get('expires_in', 3600)
self.write({
'access_token': data.get('access_token'),
'refresh_token': data.get('refresh_token'),
'token_expiry': datetime.utcnow() + timedelta(seconds=expires_in),
'state': 'connected',
})
self._fetch_extension_info()
self._create_webhook_subscription()
return True
except Exception:
_logger.exception("RingCentral OAuth token exchange failed")
self.write({'state': 'error'})
return False
def _refresh_token(self):
"""Refresh the access token using the refresh token.
Uses SELECT ... FOR UPDATE to prevent concurrent cron workers
from racing on the same refresh token (RC rotates tokens on
each refresh, so only the first caller wins).
"""
self.ensure_one()
if not self.refresh_token:
self.write({'state': 'error'})
return False
self.env.cr.execute(
"SELECT refresh_token, state, token_expiry "
"FROM rc_config WHERE id = %s FOR UPDATE SKIP LOCKED",
(self.id,),
)
row = self.env.cr.fetchone()
if not row:
_logger.debug("RC config %s locked by another worker, skipping refresh", self.id)
return True
if not row[0]:
return False
if row[1] == 'connected' and row[2] and row[2] > datetime.utcnow() + timedelta(minutes=2):
return True
current_refresh = row[0]
try:
resp = requests.post(
f'{self.server_url}/restapi/oauth/token',
data={
'grant_type': 'refresh_token',
'refresh_token': current_refresh,
},
auth=(self.client_id, self.client_secret),
timeout=15,
verify=self.ssl_verify,
proxies=self._get_proxies(),
)
if resp.status_code != 200:
try:
err_data = resp.json()
rc_error = err_data.get('error', 'unknown')
rc_desc = err_data.get('error_description', '')
rc_codes = [e.get('errorCode', '') for e in err_data.get('errors', [])]
_logger.error(
"RingCentral token refresh failed: %s - %s (codes: %s). "
"Manual OAuth reconnect required.",
rc_error, rc_desc, ', '.join(rc_codes),
)
except Exception:
_logger.error(
"RingCentral token refresh failed: HTTP %d", resp.status_code,
)
self.write({'state': 'error'})
return False
data = resp.json()
expires_in = data.get('expires_in', 3600)
self.write({
'access_token': data.get('access_token'),
'refresh_token': data.get('refresh_token', current_refresh),
'token_expiry': datetime.utcnow() + timedelta(seconds=expires_in),
'state': 'connected',
})
return True
except Exception:
_logger.exception("RingCentral token refresh failed (network error)")
self.write({'state': 'error'})
return False
def _ensure_token(self):
"""Ensure we have a valid access token, refreshing proactively."""
self.ensure_one()
if not self.access_token:
raise UserError(_('RingCentral is not connected. Please connect via OAuth first.'))
buffer = fields.Datetime.now() + timedelta(minutes=5)
if self.token_expiry and self.token_expiry < buffer:
if not self._refresh_token():
raise UserError(_('Failed to refresh RingCentral token. Please reconnect.'))
def _get_headers(self):
"""Return authorization headers for API calls."""
self._ensure_token()
return {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json',
}
def _api_get(self, endpoint, params=None):
"""Make an authenticated GET request with rate-limit handling."""
return self._api_request('GET', endpoint, params=params)
def _api_post(self, endpoint, data=None):
"""Make an authenticated POST request with rate-limit handling."""
return self._api_request('POST', endpoint, json_data=data)
def _api_request(self, method, endpoint, params=None, json_data=None):
"""Central API request method with automatic rate-limit retry.
RingCentral Heavy group (Call Log): 10 req / 60 sec, 60 sec penalty.
We pace at 10 sec between calls (~6 req/min) and auto-retry on
429 or 401 (token can appear invalid during penalty window).
"""
self._ensure_token()
url = endpoint if endpoint.startswith('http') else f'{self.server_url}{endpoint}'
kwargs = {
'timeout': 30,
'verify': self.ssl_verify,
'proxies': self._get_proxies(),
}
if params:
kwargs['params'] = params
if json_data is not None:
kwargs['json'] = json_data
last_error = None
for attempt in range(1, RC_MAX_RETRIES + 1):
kwargs['headers'] = self._get_headers()
resp = requests.request(method, url, **kwargs)
if resp.status_code == 429:
retry_after = int(resp.headers.get('Retry-After', RC_RATE_LIMIT_RETRY_WAIT))
_logger.warning(
"RC rate limit 429. Waiting %d sec (attempt %d/%d)...",
retry_after, attempt, RC_MAX_RETRIES,
)
time.sleep(retry_after)
self._ensure_token()
continue
if resp.status_code == 401 and attempt < RC_MAX_RETRIES:
_logger.warning(
"RC 401 Unauthorized -- token may be stale from rate-limit penalty. "
"Refreshing token and waiting 60 sec (attempt %d/%d)...",
attempt, RC_MAX_RETRIES,
)
time.sleep(60)
try:
self._refresh_token()
except Exception:
pass
continue
if resp.status_code >= 400:
last_error = resp
_logger.error(
"RC API error %d on %s %s: %s",
resp.status_code, method, url, resp.text[:300],
)
resp.raise_for_status()
time.sleep(RC_RATE_LIMIT_DELAY)
return resp.json()
if last_error is not None:
last_error.raise_for_status()
raise UserError(_('RingCentral API request failed after %d attempts.') % RC_MAX_RETRIES)
# ──────────────────────────────────────────────────────────
# Extension info
# ──────────────────────────────────────────────────────────
def _fetch_extension_info(self):
"""Fetch the connected user's extension info."""
self.ensure_one()
try:
data = self._api_get('/restapi/v1.0/account/~/extension/~')
self.extension_name = data.get('name', 'Unknown')
except Exception:
_logger.exception("Failed to fetch extension info")
# ──────────────────────────────────────────────────────────
# Webhook management
# ──────────────────────────────────────────────────────────
def _create_webhook_subscription(self):
"""Create a webhook subscription for telephony events."""
self.ensure_one()
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url', '')
webhook_url = f'{base_url}/ringcentral/webhook'
try:
data = self._api_post('/restapi/v1.0/subscription', data={
'eventFilters': [
'/restapi/v1.0/account/~/extension/~/telephony/sessions',
],
'deliveryMode': {
'transportType': 'WebHook',
'address': webhook_url,
},
'expiresIn': 630720000,
})
self.webhook_subscription_id = data.get('id', '')
_logger.info("RingCentral webhook subscription created: %s", self.webhook_subscription_id)
except Exception:
_logger.exception("Failed to create RingCentral webhook subscription")
def _renew_webhook_subscription(self):
"""Renew the webhook subscription."""
self.ensure_one()
if not self.webhook_subscription_id:
self._create_webhook_subscription()
return
try:
self._api_post(
f'/restapi/v1.0/subscription/{self.webhook_subscription_id}/renew'
)
_logger.info("RingCentral webhook renewed: %s", self.webhook_subscription_id)
except Exception:
_logger.warning("Webhook renewal failed, recreating...")
self._create_webhook_subscription()
# ──────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────
def _get_proxies(self):
"""Return proxy dict for requests if configured."""
if self.proxy_url:
proxy = f'{self.proxy_url}:{self.proxy_port}' if self.proxy_port else self.proxy_url
return {'http': proxy, 'https': proxy}
return None
def _notify(self, title, message, level='info'):
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _(title),
'message': message,
'type': level,
'sticky': level == 'danger',
},
}
@api.model
def _get_active_config(self):
"""Return the first connected config, or False."""
return self.search([('state', '=', 'connected')], limit=1)
def _send_sms(self, to_number, message_text, from_number=None):
"""Send an SMS via the RingCentral REST API.
:param str to_number: Recipient phone number (E.164 format).
:param str message_text: SMS body (max 1000 characters).
:param str from_number: Sender phone number. Defaults to the
company phone configured in Odoo.
:return: API response dict.
:raises UserError: If the RingCentral config is not connected.
"""
self.ensure_one()
if not from_number:
from_number = self.env.company.phone
if not from_number:
raise UserError(
_('No sender phone number configured. Set a phone number '
'on your company or pass from_number explicitly.')
)
payload = {
'to': [{'phoneNumber': to_number}],
'from': {'phoneNumber': from_number},
'text': message_text[:1000],
}
return self._api_post(
'/restapi/v1.0/account/~/extension/~/sms',
data=payload,
)
def action_rematch_contacts(self):
"""Re-run contact matching on all calls without a linked partner."""
self.ensure_one()
CallHistory = self.env['rc.call.history']
unlinked = CallHistory.search([('partner_id', '=', False)])
matched = 0
for call in unlinked:
partner = CallHistory._match_partner(
call.from_number or '', call.to_number or '', call.direction,
)
if partner:
call.write({'partner_id': partner.id})
matched += 1
return self._notify(
'Contact Matching Complete',
f'Matched {matched} calls out of {len(unlinked)} unlinked records.',
'success' if matched else 'info',
)
def action_import_historical_calls(self):
"""Trigger historical call import in the background via cron.
Returns immediately so the UI doesn't freeze. The actual work
happens in _run_historical_import() called by a one-shot cron.
"""
self.ensure_one()
self._ensure_token()
cron = self.env.ref(
'fusion_ringcentral.cron_rc_historical_import', raise_if_not_found=False,
)
if cron:
cron.sudo().write({
'active': True,
'nextcall': fields.Datetime.now(),
})
else:
self.env['ir.cron'].sudo().create({
'name': 'RingCentral: Historical Import',
'cron_name': 'RingCentral: Historical Import',
'model_id': self.env['ir.model']._get('rc.config').id,
'state': 'code',
'code': 'model._run_historical_import()',
'interval_number': 9999,
'interval_type': 'days',
'nextcall': fields.Datetime.now(),
'active': True,
})
return self._notify(
'Import Started',
'Historical call import is running in the background. '
'Check Fusion RingCentral > Call History in a few minutes to see results.',
'info',
)
def action_import_historical_voicemails(self):
"""Trigger historical voicemail import in the background via cron."""
self.ensure_one()
self._ensure_token()
cron = self.env.ref(
'fusion_ringcentral.cron_rc_historical_vm_import',
raise_if_not_found=False,
)
if cron:
cron.sudo().write({
'active': True,
'nextcall': fields.Datetime.now(),
})
else:
self.env['ir.cron'].sudo().create({
'name': 'RingCentral: Historical Voicemail Import',
'cron_name': 'RingCentral: Historical Voicemail Import',
'model_id': self.env['ir.model']._get('rc.voicemail').id,
'state': 'code',
'code': 'model._run_historical_voicemail_import()',
'interval_number': 9999,
'interval_type': 'days',
'nextcall': fields.Datetime.now(),
'active': True,
})
return self._notify(
'Voicemail Import Started',
'Historical voicemail import is running in the background. '
'Check Fusion RingCentral > Voicemails in a few minutes.',
'info',
)
def action_import_historical_faxes(self):
"""Trigger historical fax import in the background via cron."""
self.ensure_one()
self._ensure_token()
cron = self.env.ref(
'fusion_ringcentral.cron_rc_historical_fax_import',
raise_if_not_found=False,
)
if cron:
cron.sudo().write({
'active': True,
'nextcall': fields.Datetime.now(),
})
else:
self.env['ir.cron'].sudo().create({
'name': 'RingCentral: Historical Fax Import',
'cron_name': 'RingCentral: Historical Fax Import',
'model_id': self.env['ir.model']._get('fusion.fax').id,
'state': 'code',
'code': 'model._run_historical_fax_import()',
'interval_number': 9999,
'interval_type': 'days',
'nextcall': fields.Datetime.now(),
'active': True,
})
return self._notify(
'Fax Import Started',
'Historical fax import (12 months) is running in the background. '
'Check Fusion Connect > Faxes in a few minutes.',
'info',
)
def action_backfill_voicemail_media(self):
"""Re-download audio and transcriptions for voicemails missing them."""
self.ensure_one()
self._ensure_token()
cron = self.env.ref(
'fusion_ringcentral.cron_rc_backfill_vm_media',
raise_if_not_found=False,
)
if cron:
cron.sudo().write({
'active': True,
'nextcall': fields.Datetime.now(),
})
else:
self.env['ir.cron'].sudo().create({
'name': 'RingCentral: Backfill Voicemail Media',
'cron_name': 'RingCentral: Backfill Voicemail Media',
'model_id': self.env['ir.model']._get('rc.voicemail').id,
'state': 'code',
'code': 'model._run_backfill_voicemail_media()',
'interval_number': 9999,
'interval_type': 'days',
'nextcall': fields.Datetime.now(),
'active': True,
})
return self._notify(
'Voicemail Media Backfill Started',
'Downloading audio and transcriptions for voicemails that are missing them. '
'This runs in the background with rate-limit pacing.',
'info',
)
@api.model
def _run_historical_import(self):
"""Background job: import up to 12 months of call history.
Tracks which monthly chunks completed successfully via
ir.config_parameter so it never re-queries months that
already finished. Only fetches chunks that failed or
haven't been attempted yet.
"""
config = self._get_active_config()
if not config:
_logger.warning("RC Historical Import: No connected config found.")
return
try:
config._ensure_token()
except Exception:
_logger.exception("RC Historical Import: Token invalid, aborting.")
return
ICP = self.env['ir.config_parameter'].sudo()
CallHistory = self.env['rc.call.history']
now = datetime.utcnow()
total_imported = 0
total_skipped_chunks = 0
failed_chunks = 0
for months_back in range(12, 0, -1):
chunk_start = now - timedelta(days=months_back * 30)
chunk_end = now - timedelta(days=(months_back - 1) * 30)
chunk_num = 13 - months_back
date_from = chunk_start.strftime('%Y-%m-%dT%H:%M:%S.000Z')
date_to = chunk_end.strftime('%Y-%m-%dT%H:%M:%S.000Z')
chunk_key = f'fusion_rc.import_done_{chunk_start.strftime("%Y%m")}'
already_done = ICP.get_param(chunk_key, '')
if already_done:
_logger.info(
"RC Import [%d/12]: %s to %s -- already imported, skipping.",
chunk_num, date_from[:10], date_to[:10],
)
total_skipped_chunks += 1
continue
_logger.info(
"RC Import [%d/12]: %s to %s ...",
chunk_num, date_from[:10], date_to[:10],
)
try:
chunk_count = CallHistory._sync_calls_from_date(config, date_from, date_to)
total_imported += chunk_count
ICP.set_param(chunk_key, fields.Datetime.now().isoformat())
_logger.info(
"RC Import [%d/12]: %d calls imported.",
chunk_num, chunk_count,
)
except Exception:
failed_chunks += 1
_logger.exception(
"RC Import [%d/12]: Failed, will retry next run.",
chunk_num,
)
ICP.set_param(
'fusion_rc.last_call_sync',
now.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
)
_logger.info(
"RC Historical Import complete: %d imported, %d chunks skipped (already done), %d failed.",
total_imported, total_skipped_chunks, failed_chunks,
)
# ──────────────────────────────────────────────────────────
# Cron: token refresh
# ──────────────────────────────────────────────────────────
@api.model
def _cron_refresh_tokens(self):
"""Refresh tokens for connected configs nearing expiry and recover error configs."""
soon = fields.Datetime.now() + timedelta(minutes=10)
configs = self.search([
('state', '=', 'connected'),
('token_expiry', '<=', soon),
('refresh_token', '!=', False),
])
for cfg in configs:
try:
cfg._refresh_token()
except Exception:
_logger.exception("Cron: Failed to refresh token for config %s", cfg.name)
error_configs = self.search([
('state', '=', 'error'),
('refresh_token', '!=', False),
])
for cfg in error_configs:
try:
if cfg._refresh_token():
_logger.info("Cron: Recovered RC config %s from error state", cfg.name)
except Exception:
pass
# ──────────────────────────────────────────────────────────
# Cron: webhook renewal
# ──────────────────────────────────────────────────────────
@api.model
def _cron_renew_webhooks(self):
"""Renew webhook subscriptions for all connected configs."""
configs = self.search([('state', '=', 'connected')])
for cfg in configs:
try:
cfg._renew_webhook_subscription()
except Exception:
_logger.exception("Cron: Failed to renew webhook for config %s", cfg.name)