# Source: Odoo-Modules/fusion-woo-odoo/fusion_woocommerce/lib/woo_api_client.py
# Revision: gsinghpal 2a363c6b40 ("changes"), 2026-04-02 03:30:02 -04:00
# Original size: 342 lines, 12 KiB, Python

import base64
import hashlib
import hmac
import logging
import threading
import time
import requests
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared circuit breaker state — one breaker per WC base URL, so every client
# created in this process for the same WooCommerce store shares one breaker.
# NOTE(review): this is a plain in-process dict guarded by a threading lock;
# separate worker *processes* would each hold their own copy — confirm that
# is acceptable for multi-worker deployments.
# ---------------------------------------------------------------------------
# Maps base URL (str) -> _CircuitBreaker instance.
_circuit_breakers = {}
# Guards lookups/insertions in _circuit_breakers.
_cb_lock = threading.Lock()
class _CircuitBreaker:
    """Per-host circuit breaker.

    States:

    * ``CLOSED`` — requests flow normally.
    * ``OPEN`` — entered after ``failure_threshold`` consecutive failures;
      every request is rejected until ``cooldown_seconds`` have elapsed
      since the last recorded failure.
    * ``HALF_OPEN`` — entered when the cooldown has elapsed; exactly one
      probe request is let through. Its outcome decides the next state:
      ``record_success()`` closes the breaker, ``record_failure()``
      re-opens it. If the probe never reports back, a new probe is allowed
      after another cooldown period so the breaker cannot get stuck.

    All state transitions are serialized by an internal lock.
    """

    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

    def __init__(self, failure_threshold=5, cooldown_seconds=60):
        """Create a breaker in the CLOSED state.

        :param failure_threshold: consecutive failures that open the breaker.
        :param cooldown_seconds: how long the breaker stays OPEN before a
            probe request is allowed.
        """
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.state = self.CLOSED
        self.consecutive_failures = 0
        # Both timestamps are time.monotonic() readings.
        self.last_failure_time = 0
        self._probe_started = 0
        self._lock = threading.Lock()

    def record_success(self):
        """Reset the failure counter and close the breaker."""
        with self._lock:
            self.consecutive_failures = 0
            self.state = self.CLOSED

    def record_failure(self):
        """Count one failure; open the breaker once the threshold is hit."""
        with self._lock:
            self.consecutive_failures += 1
            self.last_failure_time = time.monotonic()
            if self.consecutive_failures >= self.failure_threshold:
                self.state = self.OPEN
                _logger.warning(
                    "Circuit breaker OPEN after %d consecutive failures — "
                    "blocking requests for %ds",
                    self.consecutive_failures, self.cooldown_seconds,
                )

    def allow_request(self):
        """Return True if a request may be sent right now.

        CLOSED always allows. OPEN allows a single probe once the cooldown
        has elapsed (switching to HALF_OPEN). HALF_OPEN rejects further
        requests while that probe is outstanding, unless the probe has been
        silent for a full cooldown period.
        """
        with self._lock:
            if self.state == self.CLOSED:
                return True
            now = time.monotonic()
            if self.state == self.OPEN:
                if now - self.last_failure_time < self.cooldown_seconds:
                    return False
                self.state = self.HALF_OPEN
                self._probe_started = now
                _logger.info("Circuit breaker HALF_OPEN — allowing probe request")
                return True
            # HALF_OPEN: a probe is already in flight. Only hand out another
            # one if the previous probe never reported back in time.
            if now - self._probe_started >= self.cooldown_seconds:
                self._probe_started = now
                return True
            return False
class _TokenBucket:
    """Simple token-bucket rate limiter.

    Tokens refill at *rate* per second up to *capacity*. ``consume()``
    blocks until a token is available and then takes it.
    """

    # Lower bound for a single wait, so float rounding cannot turn the
    # wait loop into a busy spin.
    _MIN_SLEEP = 0.001

    def __init__(self, rate, capacity):
        """Create a full bucket.

        :param rate: tokens added per second; must be positive.
        :param capacity: maximum number of stored tokens; must be >= 1.
        :raises ValueError: if the bucket could never hold a whole token,
            which would make ``consume()`` block forever.
        """
        if rate <= 0:
            raise ValueError("rate must be positive, got %r" % (rate,))
        if capacity < 1:
            raise ValueError("capacity must be at least 1, got %r" % (capacity,))
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def consume(self):
        """Block until one token is available, then take it."""
        while True:
            with self._lock:
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Time until the missing fraction of a token has refilled.
                wait = (1 - self.tokens) / self.rate
            # Sleep outside the lock so other threads can refill/consume.
            time.sleep(max(wait, self._MIN_SLEEP))
class WooApiClient:
    """WooCommerce REST API v3 client wrapper with rate limiting and circuit
    breaker protection.

    Every call goes through :meth:`_request`, which
    * refuses to send while the circuit breaker for this store is open,
    * takes a token from the rate limiter before each attempt,
    * retries connection errors, timeouts, HTTP 429 and HTTP 5xx,
    * raises immediately on other HTTP 4xx responses.
    """

    # Default: 3 requests/sec, burst up to 5.
    # WooCommerce typically allows ~240 req/min (4/sec) so 3/sec is safe.
    DEFAULT_RATE = 3
    DEFAULT_BURST = 5

    def __init__(self, url, consumer_key, consumer_secret,
                 api_version='wc/v3', timeout=30,
                 rate_limit=None, burst_limit=None):
        """Build a client for one WooCommerce store.

        :param url: store base URL; a trailing slash is removed.
        :param consumer_key: REST API key, sent as the basic-auth user.
        :param consumer_secret: REST API secret, sent as the basic-auth password.
        :param api_version: path segment placed after ``/wp-json/``.
        :param timeout: per-request timeout in seconds.
        :param rate_limit: requests per second (falsy -> ``DEFAULT_RATE``).
        :param burst_limit: burst size (falsy -> ``DEFAULT_BURST``).
        """
        self.base_url = url.rstrip('/')
        self.api_version = api_version
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (consumer_key, consumer_secret)
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'FusionWooCommerce/1.0',
        })
        rate = rate_limit or self.DEFAULT_RATE
        burst = burst_limit or self.DEFAULT_BURST
        self._bucket = _TokenBucket(rate, burst)
        # One breaker per base URL, shared by every client in this process.
        with _cb_lock:
            if self.base_url not in _circuit_breakers:
                _circuit_breakers[self.base_url] = _CircuitBreaker(
                    failure_threshold=5, cooldown_seconds=60,
                )
            self._breaker = _circuit_breakers[self.base_url]

    def _url(self, endpoint):
        """Return the absolute URL for *endpoint* (leading '/' tolerated)."""
        return f"{self.base_url}/wp-json/{self.api_version}/{endpoint.lstrip('/')}"

    @staticmethod
    def _parse_retry_after(value, default=10, maximum=120):
        """Turn a ``Retry-After`` header value into a sleep time in seconds.

        Non-integer values (missing header, HTTP-date form, garbage) fall
        back to *default*; the result is clamped to ``0..maximum``.
        """
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            seconds = default
        return max(0, min(seconds, maximum))

    @staticmethod
    def _pause_before_retry(seconds, attempt, attempts):
        """Sleep *seconds* unless *attempt* (0-based) was the final one."""
        if attempt + 1 < attempts:
            time.sleep(seconds)

    def _request(self, method, endpoint, data=None, params=None, retries=3):
        """Send one API call with rate limiting, retries and breaker checks.

        :param method: HTTP verb.
        :param endpoint: path below ``/wp-json/<api_version>/``.
        :param data: JSON-serializable request body, if any.
        :param params: query-string parameters, if any.
        :param retries: maximum number of attempts (at least one is made).
        :returns: the decoded JSON response body.
        :raises ConnectionError: if the circuit breaker is open.
        :raises requests.HTTPError: on a 4xx response, or when 429/5xx
            responses persist through every attempt.
        :raises requests.exceptions.RequestException: when connection
            errors or timeouts persist through every attempt.
        :raises ValueError: if a successful response is not valid JSON.
        """
        url = self._url(endpoint)
        if not self._breaker.allow_request():
            raise ConnectionError(
                "WooCommerce API circuit breaker is OPEN for %s — "
                "too many consecutive failures. Retry later." % self.base_url
            )
        attempts = max(int(retries), 1)
        last_exc = None
        for attempt in range(attempts):
            wait = min(2 ** attempt * 2, 30)  # exponential backoff, capped
            self._bucket.consume()
            try:
                response = self.session.request(
                    method, url,
                    json=data, params=params,
                    timeout=self.timeout,
                )
            except requests.exceptions.ConnectionError as exc:
                last_exc = exc
                _logger.warning(
                    "WC API connection error %s %s (attempt %d/%d): %s — "
                    "retrying in %ds",
                    method, endpoint, attempt + 1, attempts, exc, wait,
                )
                self._pause_before_retry(wait, attempt, attempts)
                continue
            except requests.exceptions.Timeout as exc:
                last_exc = exc
                _logger.warning(
                    "WC API timeout %s %s (attempt %d/%d) — retrying in %ds",
                    method, endpoint, attempt + 1, attempts, wait,
                )
                self._pause_before_retry(wait, attempt, attempts)
                continue
            except Exception as exc:
                last_exc = exc
                _logger.error(
                    "WC API unexpected error %s %s: %s", method, endpoint, exc,
                )
                break

            status = response.status_code

            # --- Handle rate-limit response from WC / server ---
            if status == 429:
                retry_after = self._parse_retry_after(
                    response.headers.get('Retry-After'),
                )
                _logger.warning(
                    "WC API 429 on %s %s — backing off %ds (attempt %d/%d)",
                    method, endpoint, retry_after, attempt + 1, attempts,
                )
                # Remember the failure so exhausting the retries raises a
                # real exception instead of ``raise None``.
                last_exc = requests.HTTPError(
                    "429 Client Error: Too Many Requests for url: %s" % url,
                    response=response,
                )
                self._pause_before_retry(retry_after, attempt, attempts)
                continue

            # --- Non-retryable client errors (400-499 except 429) ---
            if 400 <= status < 500:
                _logger.error(
                    "WC API %s %s returned %s (non-retryable): %s",
                    method, endpoint, status, response.text[:500],
                )
                # The server answered, so the host is healthy: the breaker
                # must not count this. Raised outside any try block so it
                # reaches the caller without being recorded as a failure.
                self._breaker.record_success()
                response.raise_for_status()

            # --- Server errors (500+) are retryable ---
            if status >= 500:
                _logger.warning(
                    "WC API %s %s returned %s (attempt %d/%d): %s",
                    method, endpoint, status,
                    attempt + 1, attempts, response.text[:300],
                )
                last_exc = requests.HTTPError(
                    "%s Server Error for url: %s" % (status, url),
                    response=response,
                )
                self._pause_before_retry(wait, attempt, attempts)
                continue

            try:
                payload = response.json()
            except ValueError as exc:
                # Body is not JSON (e.g. an HTML error page); not retried.
                last_exc = exc
                _logger.error(
                    "WC API unexpected error %s %s: %s", method, endpoint, exc,
                )
                break
            self._breaker.record_success()
            return payload

        self._breaker.record_failure()
        raise last_exc

    # --- Convenience methods ---
    def get(self, endpoint, params=None):
        """GET *endpoint* with optional query *params*."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint, data):
        """POST *data* as JSON to *endpoint*."""
        return self._request('POST', endpoint, data=data)

    def put(self, endpoint, data):
        """PUT *data* as JSON to *endpoint*."""
        return self._request('PUT', endpoint, data=data)

    def delete(self, endpoint, params=None):
        """DELETE *endpoint* with optional query *params* (e.g. ``force``)."""
        return self._request('DELETE', endpoint, params=params)

    # --- Product endpoints ---
    def get_products(self, page=1, per_page=100, **kwargs):
        """List products; extra keyword arguments become query filters."""
        params = {'page': page, 'per_page': per_page, **kwargs}
        return self.get('products', params=params)

    def get_product(self, product_id):
        """Fetch a single product."""
        return self.get(f'products/{product_id}')

    def get_product_variations(self, product_id, page=1, per_page=100):
        """List one page of variations of a product."""
        params = {'page': page, 'per_page': per_page}
        return self.get(f'products/{product_id}/variations', params=params)

    def update_product(self, product_id, data):
        """Update a product with *data*."""
        return self.put(f'products/{product_id}', data)

    def create_product(self, data):
        """Create a product from *data*."""
        return self.post('products', data)

    # --- Attribute endpoints ---
    def get_product_attributes(self):
        """List global product attributes (up to 100)."""
        return self.get('products/attributes', params={'per_page': 100})

    def create_product_attribute(self, data):
        """Create a global product attribute from *data*."""
        return self.post('products/attributes', data)

    def get_attribute_terms(self, attribute_id, page=1, per_page=100):
        """List one page of terms of a product attribute."""
        return self.get(
            f'products/attributes/{attribute_id}/terms',
            params={'page': page, 'per_page': per_page},
        )

    def create_attribute_term(self, attribute_id, data):
        """Create a term under a product attribute."""
        return self.post(f'products/attributes/{attribute_id}/terms', data)

    # --- Variation endpoints ---
    def create_product_variation(self, product_id, data):
        """Create a variation of a product."""
        return self.post(f'products/{product_id}/variations', data)

    def update_product_variation(self, product_id, variation_id, data):
        """Update a variation of a product."""
        return self.put(f'products/{product_id}/variations/{variation_id}', data)

    def delete_product_variation(self, product_id, variation_id, force=False):
        """Delete a variation of a product.

        :param force: when true, send ``force=true`` to request a permanent
            delete. Defaults to False, which sends no ``force`` parameter
            (the previous behavior).
        """
        params = {'force': 'true'} if force else None
        return self.delete(
            f'products/{product_id}/variations/{variation_id}', params=params,
        )

    def batch_create_variations(self, product_id, variations_data):
        """Create multiple variations at once using WC batch endpoint."""
        return self.post(
            f'products/{product_id}/variations/batch',
            {'create': variations_data},
        )

    # --- Order endpoints ---
    def get_orders(self, page=1, per_page=100, **kwargs):
        """List orders; extra keyword arguments become query filters."""
        params = {'page': page, 'per_page': per_page, **kwargs}
        return self.get('orders', params=params)

    def get_order(self, order_id):
        """Fetch a single order."""
        return self.get(f'orders/{order_id}')

    def update_order(self, order_id, data):
        """Update an order with *data*."""
        return self.put(f'orders/{order_id}', data)

    # --- Customer endpoints ---
    def get_customers(self, page=1, per_page=100, **kwargs):
        """List customers; extra keyword arguments become query filters."""
        params = {'page': page, 'per_page': per_page, **kwargs}
        return self.get('customers', params=params)

    def get_customer(self, customer_id):
        """Fetch a single customer."""
        return self.get(f'customers/{customer_id}')

    def create_customer(self, data):
        """Create a customer from *data*."""
        return self.post('customers', data)

    def update_customer(self, customer_id, data):
        """Update a customer with *data*."""
        return self.put(f'customers/{customer_id}', data)

    # --- Webhook endpoints ---
    def create_webhook(self, data):
        """Register a webhook described by *data*."""
        return self.post('webhooks', data)

    def get_webhooks(self):
        """List registered webhooks (up to 100)."""
        return self.get('webhooks', params={'per_page': 100})

    def delete_webhook(self, webhook_id, force=True):
        """Delete a webhook.

        WooCommerce webhooks cannot be trashed: per the REST API docs the
        ``force`` parameter must be true, otherwise the server answers 501,
        which would be retried and counted against the circuit breaker.
        """
        params = {'force': 'true'} if force else None
        return self.delete(f'webhooks/{webhook_id}', params=params)

    # --- Tax endpoints ---
    def get_tax_classes(self):
        """List tax classes."""
        return self.get('taxes/classes')

    # --- Utility ---
    def test_connection(self):
        """Probe the store via ``system_status``.

        :returns: ``(True, wc_version)`` on success, ``(False, error_text)``
            on any failure. Errors are deliberately not propagated so the
            caller can show the message.
        """
        try:
            result = self.get('system_status')
            wc_version = result.get('environment', {}).get('version', 'unknown')
            return True, wc_version
        except Exception as exc:
            return False, str(exc)

    @staticmethod
    def verify_webhook_signature(payload, signature, secret):
        """Verify a WooCommerce webhook HMAC-SHA256 signature.

        :param payload: raw request body (``str`` or ``bytes``).
        :param signature: base64 signature taken from the request
            (``str`` or ``bytes``); untrusted input.
        :param secret: shared webhook secret (``str`` or ``bytes``).
        :returns: True only if *signature* matches; a missing, malformed
            or wrongly typed signature (or a missing secret) yields False
            instead of raising.
        """
        if secret is None or not signature:
            return False
        if isinstance(signature, str):
            # compare_digest rejects non-ASCII str, so compare as bytes.
            signature = signature.encode('utf-8', 'replace')
        elif not isinstance(signature, (bytes, bytearray)):
            return False
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        if isinstance(secret, str):
            secret = secret.encode('utf-8')
        computed = base64.b64encode(
            hmac.new(secret, payload, hashlib.sha256).digest()
        )
        return hmac.compare_digest(computed, signature)