Files
gsinghpal a2fe1fcbcc changes
2026-04-29 03:35:33 -04:00

681 lines
26 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import hashlib
import hmac
import json
import logging
import pprint
from odoo import fields, http
from odoo.exceptions import UserError, ValidationError
from odoo.http import request
from odoo.tools import mute_logger
from odoo.addons.fusion_clover import utils as clover_utils
_logger = logging.getLogger(__name__)
def _detect_card_brand(card_number):
    """Detect the card brand from the leading digits (BIN prefix) of a card number.

    Spaces and hyphens are ignored. Recognised prefixes:

    * ``34`` / ``37`` -> ``'amex'``
    * ``4`` -> ``'visa'``
    * ``51``-``55`` or ``2221``-``2720`` -> ``'mastercard'``

    Anything else, including empty, too-short or non-numeric input, yields
    ``'other'``; the function never raises on malformed input.

    :param card_number: The (possibly partial or formatted) card number; may
        be ``None`` or empty.
    :return: ``'amex'``, ``'visa'``, ``'mastercard'`` or ``'other'``.
    :rtype: str
    """
    num = str(card_number or '').replace(' ', '').replace('-', '')
    if len(num) < 2:
        return 'other'
    if num[:2] in ('34', '37'):
        return 'amex'
    if num[0] == '4':
        return 'visa'

    prefix2 = num[:2]
    # int() would raise ValueError on a non-numeric prefix; treat such input
    # as an unknown brand instead of crashing the caller.
    if not prefix2.isdecimal():
        return 'other'
    if 51 <= int(prefix2) <= 55:
        return 'mastercard'

    prefix4 = num[:4]
    if len(prefix4) == 4 and prefix4.isdecimal() and 2221 <= int(prefix4) <= 2720:
        return 'mastercard'
    return 'other'
# HTTP controller exposing the Clover payment endpoints: the customer return
# route, the webhook receiver and the OAuth callback use the URL constants
# declared below.
class CloverController(http.Controller):
# URL path bound to the ``clover_return`` route.
_return_url = '/payment/clover/return'
# URL path bound to the ``clover_webhook`` route.
_webhook_url = '/payment/clover/webhook'
# URL path bound to the ``clover_oauth_callback`` route.
_oauth_callback_url = '/payment/clover/oauth/callback'
# === RETURN ROUTE === #
@http.route(_return_url, type='http', methods=['GET'], auth='public')
def clover_return(self, **data):
    """Handle the customer's browser returning from a Clover payment flow.

    The transaction is looked up from the query parameters. When it already
    carries a Clover charge id, that charge is fetched from Clover and its
    status is fed into the transaction. A ``ValidationError`` raised while
    doing so is logged and otherwise ignored. In every case the customer is
    redirected to the generic payment status page.

    :param dict data: The query-string parameters of the return request.
    :return: A redirect to ``/payment/status``.
    """
    Transaction = request.env['payment.transaction'].sudo()
    tx_sudo = Transaction._search_by_reference('clover', data)

    has_charge = bool(tx_sudo and tx_sudo.clover_charge_id)
    if has_charge:
        try:
            clover_provider = tx_sudo.provider_id.sudo()
            endpoint = f'v1/charges/{tx_sudo.clover_charge_id}'
            charge = clover_provider._clover_make_ecom_request('GET', endpoint)
            tx_sudo._process('clover', {
                'reference': tx_sudo.reference,
                'clover_charge_id': charge.get('id'),
                'clover_status': charge.get('status', ''),
                'source': charge.get('source', {}),
            })
        except ValidationError:
            _logger.error(
                "Failed to fetch Clover charge %s on return.",
                tx_sudo.clover_charge_id,
            )

    # The werkzeug logger is muted while the redirect response is built.
    with mute_logger('werkzeug'):
        return request.redirect('/payment/status')
# === WEBHOOK ROUTE === #
@http.route(_webhook_url, type='http', methods=['POST'], auth='public', csrf=False)
def clover_webhook(self):
    """Receive and acknowledge a webhook notification sent by Clover.

    Processing steps, in order:

    1. Parse the raw request body as UTF-8 JSON; anything else is answered
       with HTTP 400.
    2. If the payload is a JSON object carrying ``verificationCode`` (the
       one-time URL verification challenge triggered from the Clover
       dashboard), log the code at warning level so it can be copied from
       the Odoo log, and echo it back. This step runs *before* the
       signature check, so such requests are accepted unsigned.
    3. Check the signature with ``_verify_webhook_signature``; on failure
       answer HTTP 403.
    4. Log the payload and pass it to ``_dispatch_clover_webhook``. A
       ``ValidationError`` raised there is logged and the notification is
       still acknowledged, to avoid retries.

    :return: A JSON response describing the outcome.
    """
    body = request.httprequest.data
    try:
        payload = json.loads(body.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        _logger.warning("Received invalid JSON from Clover webhook")
        return request.make_json_response({'status': 'error'}, status=400)

    # --- Verification challenge ---------------------------------------
    # Handled ahead of the HMAC check: per the original author's note this
    # POST is not signed because the webhook is not activated yet.
    if isinstance(payload, dict):
        challenge = payload.get('verificationCode')
    else:
        challenge = None
    if challenge:
        _logger.warning(
            "================================================================\n"
            "CLOVER WEBHOOK VERIFICATION CODE (paste into Clover dashboard):\n"
            "  %s\n"
            "================================================================",
            challenge,
        )
        return request.make_json_response({
            'status': 'ok',
            'verification': challenge,
        })

    signature_ok = self._verify_webhook_signature(body)
    if not signature_ok:
        _logger.warning(
            "Clover webhook signature verification FAILED — "
            "rejecting payload."
        )
        return request.make_json_response(
            {'status': 'forbidden'}, status=403,
        )

    _logger.info(
        "Clover webhook notification received:\n%s",
        pprint.pformat(payload),
    )
    try:
        self._dispatch_clover_webhook(payload)
    except ValidationError:
        _logger.exception("Unable to process Clover webhook; acknowledging to avoid retries")
    return request.make_json_response({'status': 'ok'})
def _dispatch_clover_webhook(self, event):
    """Route a Clover webhook payload to the appropriate handler.

    Two payload shapes are recognised:

    1. **Domain events** (Hosted Checkout / Ecommerce style) --
       ``{"type": "charge.succeeded", "data": {...}}``. These are routed
       to the charge, void or refund handler according to ``type``.
    2. **Merchant-app firehose** --
       ``{"appId": "...", "merchants": {"<mId>": [{"objectId": "P:xxx",
       "type": "CREATE", "ts": 1234567890}, ...]}}``. Every entry is
       logged; payment changes (``P:`` objects) are only logged at debug
       level and are not fetched synchronously.

    Malformed parts of the payload (non-dict event, non-dict ``data``,
    non-list merchant entries, non-dict items, non-string ``objectId``) are
    skipped instead of raising, so that the webhook can still be
    acknowledged.

    :param event: The decoded JSON payload.
    :return: None
    """
    if not isinstance(event, dict):
        return

    # --- Format 1: domain events ------------------------------------
    if 'type' in event and 'data' in event:
        event_type = event.get('type', '')
        data = event.get('data') or {}
        if not isinstance(data, dict):
            # The handlers call data.get(); a string or list here would
            # raise AttributeError and turn the webhook into an HTTP 500.
            _logger.warning(
                "Ignoring Clover webhook %s: 'data' is not an object", event_type,
            )
            return
        if event_type in ('charge.succeeded', 'charge.captured', 'payment.created'):
            self._handle_charge_webhook(data, 'succeeded')
        elif event_type == 'charge.failed':
            self._handle_charge_webhook(data, 'failed')
        elif event_type == 'charge.voided':
            self._handle_void_webhook(data)
        elif event_type in ('refund.created', 'refund.succeeded'):
            self._handle_refund_webhook(data)
        elif event_type == 'refund.failed':
            _logger.warning("Clover refund failed webhook: %s", data.get('id', ''))
        else:
            _logger.info("Unhandled Clover webhook event type: %s", event_type)
        return

    # --- Format 2: merchant-app firehose ----------------------------
    merchants_block = event.get('merchants') or {}
    if not isinstance(merchants_block, dict):
        return
    for merchant_id, events in merchants_block.items():
        if not isinstance(events, list):
            continue
        for ev in events:
            if not isinstance(ev, dict):
                continue
            object_ref = ev.get('objectId') or ''
            if not isinstance(object_ref, str):
                object_ref = ''
            # objectId looks like "P:E2DYXYRBT52K1/abcd1234" (Payment),
            # "C:<mid>/<id>" (Customer), "O:<mid>/<id>" (Order), etc.
            kind = object_ref[:2] if len(object_ref) >= 2 else ''
            action = ev.get('type', '')
            _logger.info(
                "Clover firehose event: merchant=%s objectId=%s action=%s",
                merchant_id, object_ref, action,
            )
            if kind == 'P:' and action in ('CREATE', 'UPDATE'):
                # Deliberately no synchronous fetch from the Platform API:
                # it would delay the webhook acknowledgement. If recovery
                # from missed sync responses becomes a real need, schedule
                # a background job and return 200 fast.
                _logger.debug(
                    "Payment object change for merchant %s: %s (%s) — "
                    "no synchronous handler, see _dispatch_clover_webhook",
                    merchant_id, object_ref, action,
                )
def _verify_webhook_signature(self, raw_body):
    """Validate the HMAC signature on a Clover webhook payload.

    The signature is read from the ``X-Clover-Auth-Code`` header, falling
    back to ``X-Clover-Signature``. It is lower-cased, stripped, and an
    optional ``sha256=`` prefix is removed. It is then compared, in
    constant time, with the hex HMAC-SHA256 of ``raw_body`` keyed by the
    app secret of each Clover provider that has one configured.

    NOTE(review): when no Clover provider has an app secret, every webhook
    is accepted (intended for development/sandbox setups). Confirm that
    production providers always have a secret configured.

    :param bytes raw_body: The raw request body, exactly as received.
    :return: True if the signature matches one configured secret, or if no
        provider has a secret configured. False if a secret is configured
        but the signature is missing, malformed, or does not match.
    :rtype: bool
    """
    providers = request.env['payment.provider'].sudo().search([
        ('code', '=', 'clover'),
        ('clover_app_secret', '!=', False),
    ])
    if not providers:
        return True

    headers = request.httprequest.headers
    sig_header = (
        headers.get('X-Clover-Auth-Code')
        or headers.get('X-Clover-Signature')
        or ''
    )
    if not sig_header:
        return False

    sig_header_clean = sig_header.lower().strip()
    if sig_header_clean.startswith('sha256='):
        sig_header_clean = sig_header_clean[len('sha256='):]

    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters. The header is attacker-controlled, so compare
    # bytes and treat a non-ASCII signature as a plain mismatch.
    try:
        received = sig_header_clean.encode('ascii')
    except UnicodeEncodeError:
        return False

    for provider in providers:
        secret = provider.clover_app_secret
        if not secret:
            continue
        expected = hmac.new(
            secret.encode('utf-8'),
            raw_body,
            hashlib.sha256,
        ).hexdigest().encode('ascii')
        if hmac.compare_digest(expected, received):
            return True
    return False
def _handle_charge_webhook(self, data, status):
    """Apply a charge webhook event to the matching transaction.

    Nothing happens when the event carries no charge ``id`` or when no
    transaction is found for it.

    :param dict data: The ``data`` object of the webhook event.
    :param str status: The status to record on the transaction, as chosen
        by the dispatcher (``'succeeded'`` or ``'failed'``).
    :return: None
    """
    charge_id = data.get('id', '')
    if not charge_id:
        return

    notification = {
        'clover_charge_id': charge_id,
        'clover_status': status,
        'source': data.get('source', {}),
    }
    # When the charge metadata carries the Odoo reference, include it in
    # the data used to look the transaction up.
    odoo_reference = data.get('metadata', {}).get('odoo_reference', '')
    if odoo_reference:
        notification['reference'] = odoo_reference

    Transaction = request.env['payment.transaction'].sudo()
    transaction = Transaction._search_by_reference('clover', notification)
    if not transaction:
        return
    transaction._process('clover', notification)
def _handle_void_webhook(self, data):
    """Mark the transaction of a voided charge as cancelled.

    Only a Clover transaction in state ``done`` with the matching charge id
    is considered. If it is not flagged as voided yet, its state is set to
    ``cancel`` and the void flag and date are recorded.

    :param dict data: The ``data`` object of the webhook event.
    :return: None
    """
    voided_charge = data.get('id', '')
    if not voided_charge:
        return

    domain = [
        ('clover_charge_id', '=', voided_charge),
        ('provider_code', '=', 'clover'),
        ('state', '=', 'done'),
    ]
    transaction = request.env['payment.transaction'].sudo().search(domain, limit=1)
    if not transaction or transaction.clover_voided:
        return

    transaction.sudo().write({
        'state': 'cancel',
        'clover_voided': True,
        'clover_void_date': fields.Datetime.now(),
    })
    _logger.info("Clover void webhook: voided transaction %s", transaction.reference)
def _handle_refund_webhook(self, data):
    """Create and process a refund transaction for a refund webhook event.

    The source transaction is the Clover transaction in state ``done``
    whose charge id equals ``data['charge']``. The event is ignored when
    the charge id or refund id is missing, when no source transaction is
    found, or when a child transaction already carries the refund id as
    its provider reference (duplicate delivery).

    :param dict data: The ``data`` object of the webhook event.
    :return: None
    """
    refund_id = data.get('id', '')
    charge_id = data.get('charge', '')
    if not charge_id:
        return
    if not refund_id:
        # Without a refund id the duplicate check below can never match,
        # so every retried delivery would create another refund.
        _logger.warning(
            "Clover refund webhook for charge %s has no refund id; ignoring.",
            charge_id,
        )
        return

    source_tx = request.env['payment.transaction'].sudo().search([
        ('clover_charge_id', '=', charge_id),
        ('provider_code', '=', 'clover'),
        ('state', '=', 'done'),
    ], limit=1)
    if not source_tx:
        return

    existing_refund = source_tx.child_transaction_ids.filtered(
        lambda t: t.provider_reference == refund_id
    )
    if existing_refund:
        return

    # presumably Clover reports the amount in minor units — confirm in
    # clover_utils.parse_clover_amount.
    refund_amount_minor = data.get('amount', 0)
    refund_amount = clover_utils.parse_clover_amount(
        refund_amount_minor, source_tx.currency_id,
    )
    refund_tx = source_tx._create_child_transaction(
        refund_amount, is_refund=True,
    )
    payment_data = {
        'reference': refund_tx.reference,
        'clover_charge_id': charge_id,
        'clover_refund_id': refund_id,
        'clover_status': 'succeeded',
    }
    refund_tx._process('clover', payment_data)
# === OAUTH CALLBACK ROUTE === #
@http.route(_oauth_callback_url, type='http', methods=['GET'], auth='user')
def clover_oauth_callback(self, **data):
    """Handle the OAuth2 authorization callback from Clover.

    Per the original author's notes, Clover redirects the merchant to the
    Nexa OAuth dispatcher, which verifies the signed ``state`` and then
    redirects here with the ``code``, ``merchant_id`` and ``state`` query
    parameters.

    The ``state`` signature is re-verified here as defence in depth. The
    check is performed unconditionally: a request that omits ``state`` is
    rejected whenever a dispatcher secret is configured, so the check
    cannot be bypassed by simply dropping the parameter. When no secret is
    configured, ``_verify_dispatcher_state`` accepts any state.

    On success the authorization code is exchanged through the provider
    and, if the provider has no merchant id yet, the received one is
    stored.

    NOTE(review): the provider is the most recently created Clover
    provider, regardless of company, and the route is open to any
    authenticated user — confirm whether an admin-group check is wanted.

    :param dict data: The query-string parameters of the callback.
    :return: A redirect to ``/odoo/settings`` in every case.
    """
    code = data.get('code', '')
    merchant_id = data.get('merchant_id', '')
    state = data.get('state', '')
    if not code:
        _logger.warning("Clover OAuth callback missing authorization code")
        return request.redirect('/odoo/settings')

    # Locate the Clover provider record: the most recently created one.
    provider = request.env['payment.provider'].sudo().search([
        ('code', '=', 'clover'),
    ], order='id desc', limit=1)
    if not provider:
        _logger.warning("Clover OAuth callback but no Clover provider exists.")
        return request.redirect('/odoo/settings')

    # Defence-in-depth: always verify the state HMAC ourselves, including
    # when the state parameter is missing or empty.
    if not self._verify_dispatcher_state(state, provider):
        _logger.error("Clover OAuth callback: invalid HMAC on state, refusing to exchange code.")
        return request.redirect('/odoo/settings')

    try:
        provider._clover_exchange_oauth_code(code)
    except (ValidationError, UserError) as e:
        _logger.exception("Clover OAuth code exchange failed: %s", e)
        return request.redirect('/odoo/settings')

    if merchant_id and not provider.clover_merchant_id:
        provider.sudo().write({'clover_merchant_id': merchant_id})
    _logger.info(
        "Clover OAuth: linked merchant %s to provider id=%s",
        merchant_id or '(unknown)', provider.id,
    )
    return request.redirect('/odoo/settings')
def _verify_dispatcher_state(self, state, provider):
    """Check the HMAC-SHA256 signature carried by a dispatcher ``state``.

    ``state`` is expected to be ``<payload>.<signature>``, where the
    signature is the base64url-encoded HMAC-SHA256 of the payload text,
    keyed with the provider's dispatcher secret. The comparison is done in
    constant time. No freshness (``iat``) check is performed here.

    :param str state: The ``state`` value received on the OAuth callback.
    :param provider: The provider record supplying the dispatcher secret
        through ``_clover_dispatcher_secret()``.
    :return: True if no secret is configured (dev/local mode) or if the
        signature matches; False for a missing, malformed or mismatching
        state. Never raises on malformed input.
    :rtype: bool
    """
    secret = provider._clover_dispatcher_secret()
    if not secret:
        # No secret configured -> dev/local mode where state is just
        # decorative. Accept whatever the dispatcher already vetted.
        return True
    if not isinstance(state, str):
        return False
    try:
        payload_b64u, sig_b64u = state.rsplit('.', 1)
        # The state is attacker-controlled: non-ASCII text raises
        # UnicodeEncodeError and bad base64 raises binascii.Error, both
        # of which are ValueError subclasses.
        payload_bytes = payload_b64u.encode('ascii')
        received = self._b64u_decode(sig_b64u)
    except ValueError:
        return False
    expected = hmac.new(
        secret.encode('utf-8'),
        payload_bytes,
        hashlib.sha256,
    ).digest()
    return hmac.compare_digest(expected, received)
@staticmethod
def _b64u_decode(s):
    """Return the bytes encoded by the base64url string ``s``.

    Trailing ``=`` padding may be omitted; it is restored before decoding.

    :param str s: The base64url text (ASCII).
    :return: The decoded bytes.
    :rtype: bytes
    """
    import base64
    missing_padding = -len(s) % 4
    encoded = (s + '=' * missing_padding).encode('ascii')
    return base64.urlsafe_b64decode(encoded)
# === SURCHARGE HELPER === #
def _apply_portal_surcharge(self, tx_sudo, card_type):
    """Apply the credit-card surcharge to a transaction and its invoices.

    The surcharge is active only when the system parameter
    ``fusion_clover.surcharge_enabled`` is ``'True'``. The percentage is
    read from the per-brand parameter (visa, mastercard, amex, debit),
    falling back to ``fusion_clover.surcharge_other_rate``.

    When the transaction has linked invoices, each invoice receives a fee
    line computed from *its own* residual amount. Posted invoices are reset
    to draft, amended and re-posted. The transaction amount is increased by
    the total of the fees added. Without invoices, the fee is computed on
    the transaction amount and only the transaction is updated.

    The fee line uses the product configured in
    ``fusion_clover.surcharge_product_id``, or else the
    ``fusion_clover.product_cc_processing_fee`` record; if neither exists
    nothing is changed.

    NOTE(review): calling this again for the same transaction adds another
    fee on top of the already increased amounts; callers must make sure it
    runs at most once per payment attempt.

    :param tx_sudo: The sudoed ``payment.transaction`` record.
    :param str card_type: The card brand; a falsy value is treated as
        ``'other'``.
    :return: The total fee applied, or ``0.0`` when nothing was applied.
    :rtype: float
    """
    ICP = request.env['ir.config_parameter'].sudo()
    if ICP.get_param('fusion_clover.surcharge_enabled', 'False') != 'True':
        return 0.0

    rate_key = {
        'visa': 'fusion_clover.surcharge_visa_rate',
        'mastercard': 'fusion_clover.surcharge_mastercard_rate',
        'amex': 'fusion_clover.surcharge_amex_rate',
        'debit': 'fusion_clover.surcharge_debit_rate',
    }.get(card_type or 'other', 'fusion_clover.surcharge_other_rate')
    rate = float(ICP.get_param(rate_key, '0') or 0)
    if rate <= 0:
        return 0.0

    invoices = tx_sudo.invoice_ids.sudo()
    if invoices:
        # One fee per invoice, based on that invoice's own residual.
        # Charging the fee of the summed residuals on every invoice would
        # over-bill as soon as a transaction covers several invoices.
        fees = [
            (invoice, round(invoice.amount_residual * rate / 100.0, 2))
            for invoice in invoices
        ]
        fees = [(invoice, fee) for invoice, fee in fees if fee > 0]
        fee_amount = round(sum(fee for _invoice, fee in fees), 2)
    else:
        fees = []
        fee_amount = round(tx_sudo.amount * rate / 100.0, 2)
    if fee_amount <= 0:
        return 0.0

    product_id = int(ICP.get_param('fusion_clover.surcharge_product_id', '0') or 0)
    product = request.env['product.product'].sudo().browse(product_id).exists()
    if not product:
        product = request.env.ref(
            'fusion_clover.product_cc_processing_fee', raise_if_not_found=False,
        )
    if not product:
        _logger.warning("Surcharge product not configured; skipping surcharge")
        return 0.0

    description = "Credit Card Processing Fee (%.2f%% surcharge)" % rate
    for invoice, invoice_fee in fees:
        was_posted = invoice.state == 'posted'
        if was_posted:
            # A posted invoice must go back to draft before it is amended.
            invoice.button_draft()
        invoice.write({
            'invoice_line_ids': [(0, 0, {
                'product_id': product.id,
                'name': description,
                'quantity': 1,
                'price_unit': invoice_fee,
                # Clear any tax the product would add to the fee line.
                'tax_ids': [(5, 0, 0)],
            })],
        })
        if was_posted:
            invoice.action_post()

    tx_sudo.write({'amount': tx_sudo.amount + fee_amount})
    return fee_amount
# === TERMINAL ROUTES === #
@http.route('/payment/clover/terminals', type='jsonrpc', auth='public')
def clover_list_terminals(self, provider_id=None, **kwargs):
    """Return the active terminals of the given Clover provider.

    NOTE(review): this public route exposes terminal names and serial
    numbers to unauthenticated callers — confirm that this is intended.

    :param provider_id: The id of the ``payment.provider`` record; a
        missing or non-numeric value yields an empty list.
    :return: One dict per terminal with the keys ``id``, ``name``,
        ``serial``, ``status`` and ``model``.
    :rtype: list[dict]
    """
    if not provider_id:
        return []
    try:
        provider_id = int(provider_id)
    except (TypeError, ValueError):
        # Public input: a malformed id is an empty result, not an error.
        return []

    terminals = request.env['clover.terminal'].sudo().search([
        ('provider_id', '=', provider_id),
        ('active', '=', True),
    ])
    return [
        {
            'id': t.id,
            'name': t.name,
            'serial': t.serial_number,
            'status': t.status or 'unknown',
            'model': t.model_name or '',
        }
        for t in terminals
    ]
@http.route('/payment/clover/send_to_terminal', type='jsonrpc', auth='public')
def clover_send_to_terminal(self, reference=None, terminal_id=None,
                            card_type=None, **kwargs):
    """Send a payment request for a transaction to a Clover terminal.

    The request is refused when the transaction is already authorized,
    done or cancelled, or when the terminal does not belong to the
    transaction's provider. The optional surcharge and the terminal call
    run inside a savepoint, so a surcharge is rolled back when sending to
    the terminal fails.

    NOTE(review): a payment that is sent successfully but later declined
    or abandoned on the terminal keeps its surcharge; sending again with
    ``card_type`` would add it a second time.

    :param str reference: The ``payment.transaction`` reference.
    :param terminal_id: The id of the ``clover.terminal`` record.
    :param str card_type: Optional card brand used to apply a surcharge.
    :return: ``{'success': True}`` or ``{'error': <message>}``.
    :rtype: dict
    """
    if not reference or not terminal_id:
        return {'error': 'Missing reference or terminal ID.'}
    try:
        terminal_id = int(terminal_id)
    except (TypeError, ValueError):
        return {'error': 'Terminal not found.'}

    tx_sudo = request.env['payment.transaction'].sudo().search([
        ('reference', '=', reference),
        ('provider_code', '=', 'clover'),
    ], limit=1)
    if not tx_sudo:
        return {'error': 'Transaction not found.'}
    if tx_sudo.state in ('authorized', 'done', 'cancel'):
        # Never push an already settled transaction to a terminal again.
        return {'error': 'This transaction has already been processed.'}

    terminal = request.env['clover.terminal'].sudo().browse(terminal_id)
    if not terminal.exists():
        return {'error': 'Terminal not found.'}
    provider = tx_sudo.provider_id.sudo()
    if terminal.provider_id != provider:
        return {'error': 'Terminal not found.'}

    try:
        with request.env.cr.savepoint():
            if card_type:
                self._apply_portal_surcharge(tx_sudo, card_type)
            terminal.action_send_payment(
                amount=tx_sudo.amount,
                currency=tx_sudo.currency_id,
                reference=reference,
                capture=not provider.capture_manually,
            )
        return {'success': True}
    except (ValidationError, UserError) as e:
        return {'error': str(e)}
@http.route('/payment/clover/terminal_status', type='jsonrpc', auth='public')
def clover_terminal_status(self, reference=None, terminal_id=None, **kwargs):
    """Poll a terminal for the status of a payment.

    When the terminal reports a completed payment (``CLOSED``, ``AUTH``,
    ``AUTHORIZED`` or ``CAPTURED``) and the matching transaction is still
    in ``draft``, the payment id is stored on the transaction and the
    transaction is processed as succeeded.

    :param str reference: The ``payment.transaction`` reference.
    :param terminal_id: The id of the ``clover.terminal`` record.
    :return: The status dict returned by the terminal, or a dict with
        ``status == 'error'`` and a ``message``.
    :rtype: dict
    """
    if not reference or not terminal_id:
        return {'status': 'error', 'message': 'Missing reference or terminal ID.'}
    try:
        terminal_id = int(terminal_id)
    except (TypeError, ValueError):
        # Public input: report a malformed id like an unknown terminal.
        return {'status': 'error', 'message': 'Terminal not found.'}

    terminal = request.env['clover.terminal'].sudo().browse(terminal_id)
    if not terminal.exists():
        return {'status': 'error', 'message': 'Terminal not found.'}

    result = terminal.action_check_payment_status(reference)
    status = result.get('status', 'pending')

    # If payment completed, process the transaction
    if status in ('CLOSED', 'AUTH', 'AUTHORIZED', 'CAPTURED'):
        tx_sudo = request.env['payment.transaction'].sudo().search([
            ('reference', '=', reference),
            ('provider_code', '=', 'clover'),
        ], limit=1)
        # Only a draft transaction is processed, so repeated polls do not
        # process the same payment twice.
        if tx_sudo and tx_sudo.state == 'draft':
            payment_id = result.get('payment_id', '')
            # The key may be present with a null value.
            card_txn = result.get('card_transaction') or {}
            tx_sudo.write({
                'clover_charge_id': payment_id or tx_sudo.clover_charge_id,
                'provider_reference': payment_id or tx_sudo.provider_reference,
            })
            payment_data = {
                'reference': reference,
                'clover_charge_id': payment_id,
                'clover_status': 'succeeded',
                'source': {
                    'brand': card_txn.get('cardType', ''),
                    'last4': card_txn.get('last4', ''),
                },
            }
            tx_sudo._process('clover', payment_data)
    return result
@http.route('/payment/clover/terminal/callback', type='http',
            methods=['POST'], auth='public', csrf=False)
def clover_terminal_callback(self, **data):
    """Acknowledge a terminal payment-completion callback.

    The received parameters are only logged; no transaction is updated.

    :param dict data: The parameters posted to the callback.
    :return: A JSON response ``{'status': 'ok'}``.
    """
    _logger.info("Clover terminal callback received: %s", data)
    acknowledgement = {'status': 'ok'}
    return request.make_json_response(acknowledgement)
# === JSON-RPC ROUTES (called from frontend JS) === #
@http.route('/payment/clover/process_card', type='jsonrpc', auth='public')
def clover_process_card(self, reference=None, card_token=None,
                        card_type=None, save_token=False, **kwargs):
    """Process a card payment through the Clover Ecommerce API.

    The frontend MUST tokenize the card client-side using Clover.js
    (https://docs.clover.com/docs/web-sdk) and send only the resulting
    ``clv_xxx`` token here. Raw PAN must never reach Odoo (PCI scope).

    A transaction that is already authorized, done or cancelled is refused
    so that a repeated call cannot charge the card twice. The optional
    surcharge and the charge request run inside a savepoint: when the
    charge fails, the surcharge is rolled back instead of being committed
    and applied again on the next attempt.

    :param str reference: The Odoo ``payment.transaction`` reference.
    :param str card_token: The Clover.js source token (``clv_xxx``).
    :param str card_type: Optional detected card brand for surcharge.
    :param bool save_token: Whether to persist the token for future
        charges (card-on-file).
    :return: ``{'success': True, 'status': <clover status>}`` or
        ``{'error': <message>}``.
    :rtype: dict
    """
    if not reference:
        return {'error': 'Missing payment reference.'}

    tx_sudo = request.env['payment.transaction'].sudo().search([
        ('reference', '=', reference),
        ('provider_code', '=', 'clover'),
    ], limit=1)
    if not tx_sudo:
        return {'error': 'Transaction not found.'}
    if tx_sudo.state in ('authorized', 'done', 'cancel'):
        # Public route: never charge an already settled transaction again.
        return {'error': 'This transaction has already been processed.'}

    if not card_token or not isinstance(card_token, str) \
            or not card_token.startswith('clv_'):
        return {
            'error': 'Missing or invalid Clover token. '
                     'The card must be tokenized via Clover.js before '
                     'submission.',
        }

    try:
        provider = tx_sudo.provider_id.sudo()
        # The savepoint covers only the surcharge and the charge request:
        # a failed charge undoes the surcharge, while a failure after a
        # successful charge keeps the recorded charge id.
        with request.env.cr.savepoint():
            if card_type:
                self._apply_portal_surcharge(tx_sudo, card_type)
            result = provider._clover_create_charge(
                source_token=card_token,
                amount=tx_sudo.amount,
                currency=tx_sudo.currency_id,
                capture=not provider.capture_manually,
                description=reference,
                ecomind='ecom',
                receipt_email=tx_sudo.partner_id.email or '',
                metadata={'odoo_reference': reference},
            )

        charge_id = result.get('id', '')
        status = result.get('status', '')
        tx_sudo.write({
            'clover_charge_id': charge_id,
            'provider_reference': charge_id,
        })
        payment_data = {
            'reference': reference,
            'clover_charge_id': charge_id,
            'clover_status': status,
            'source': result.get('source', {}),
        }
        if save_token:
            # Flag the transaction for tokenization before processing it.
            tx_sudo.tokenize = True
        tx_sudo._process('clover', payment_data)
        return {'success': True, 'status': status}
    except ValidationError as e:
        return {'error': str(e)}
    except Exception as e:
        _logger.exception("Card payment processing failed: %s", e)
        return {'error': 'Payment processing failed. Please try again.'}