Files
Odoo-Modules/fusion-woo-odoo/fusion_woocommerce/controllers/api.py
gsinghpal 8c01deb2e3 fix: properly decode base64 image data and detect MIME type
Image endpoint was returning base64 text instead of decoded binary.
Now properly decodes base64 from Odoo Binary field and detects actual
content type from magic bytes (JPEG, PNG, GIF, WebP).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 21:57:14 -04:00

345 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import base64
import logging
from odoo import http
from odoo.exceptions import AccessDenied
from odoo.http import request, Response
_logger = logging.getLogger(__name__)
class WooApiController(http.Controller):
"""REST endpoints consumed by the WooCommerce WordPress plugin."""
@http.route('/woo/image/<int:line_id>/<string:filename>',
type='http', auth='none', csrf=False, methods=['GET'])
def serve_variant_image(self, line_id, filename, **kw):
"""Serve a variant image from the transient wizard line.
Used by WC to download images during variant push."""
try:
line = request.env['woo.variant.push.line'].sudo().browse(line_id)
if not line.exists() or not line.image:
_logger.warning("Image endpoint: line %d not found or no image", line_id)
return request.not_found()
img_data = line.image
# Odoo Binary fields always return base64 string
if isinstance(img_data, (str, bytes)):
if isinstance(img_data, bytes):
img_data = img_data.decode('utf-8')
img_data = base64.b64decode(img_data)
elif isinstance(img_data, memoryview):
# Raw bytea from DB — still base64 encoded by ORM
raw = bytes(img_data)
try:
img_data = base64.b64decode(raw)
except Exception:
img_data = raw
# Detect content type from magic bytes
content_type = 'image/png'
if img_data[:2] == b'\xff\xd8':
content_type = 'image/jpeg'
elif img_data[:4] == b'\x89PNG':
content_type = 'image/png'
elif img_data[:4] == b'GIF8':
content_type = 'image/gif'
elif img_data[:4] == b'RIFF':
content_type = 'image/webp'
_logger.info("Serving image for line %d: %d bytes, %s", line_id, len(img_data), content_type)
# Set extension-appropriate filename
ext = content_type.split('/')[-1]
if ext == 'jpeg':
ext = 'jpg'
return Response(
img_data,
content_type=content_type,
status=200,
headers={
'Content-Disposition': f'inline; filename="{filename.rsplit(".", 1)[0]}.{ext}"',
'Cache-Control': 'no-cache',
},
)
except Exception as e:
_logger.error("Failed to serve variant image %d: %s", line_id, e)
return request.not_found()
def _authenticate_instance(self):
"""
Validate Bearer token from Authorization header against woo.instance.odoo_api_key.
Returns the matching woo.instance or raises AccessDenied.
"""
auth_header = request.httprequest.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
raise AccessDenied()
api_key = auth_header[len('Bearer '):]
if not api_key:
raise AccessDenied()
instance = request.env['woo.instance'].sudo().search([
('odoo_api_key', '=', api_key),
], limit=1)
if not instance:
raise AccessDenied()
return instance
def _find_woo_order(self, instance, order_id):
"""Look up a woo.order by WC order ID for a given instance."""
return request.env['woo.order'].sudo().search([
('instance_id', '=', instance.id),
('woo_order_id', '=', int(order_id)),
], limit=1)
# -------------------------------------------------------------------------
# Endpoints
# -------------------------------------------------------------------------
@http.route(
'/woo/api/order/documents',
type='jsonrpc', auth='none', csrf=False, methods=['POST'],
)
def order_documents(self, order_id=None, **kw):
"""
Fetch invoice and delivery PDF URLs for a WooCommerce order.
Expected payload: {"order_id": <woo_order_id>}
Returns: {"invoices": [...], "deliveries": [...]}
"""
try:
instance = self._authenticate_instance()
except AccessDenied:
return {'error': 'Unauthorized', 'code': 401}
if not order_id:
return {'error': 'order_id is required', 'code': 400}
woo_order = self._find_woo_order(instance, order_id)
if not woo_order:
return {'order_id': order_id, 'invoices': [], 'deliveries': []}
invoices = []
if woo_order.invoice_id:
inv = woo_order.invoice_id
invoices.append({
'id': inv.id,
'name': inv.name,
'state': inv.state,
'amount_total': inv.amount_total,
'date': str(inv.invoice_date) if inv.invoice_date else '',
})
deliveries = []
if woo_order.sale_order_id:
pickings = request.env['stock.picking'].sudo().search([
('origin', '=', woo_order.sale_order_id.name),
('picking_type_code', '=', 'outgoing'),
])
for picking in pickings:
deliveries.append({
'id': picking.id,
'name': picking.name,
'state': picking.state,
'tracking_number': picking.woo_tracking_number or '',
'scheduled_date': str(picking.scheduled_date) if picking.scheduled_date else '',
})
return {
'order_id': order_id,
'invoices': invoices,
'deliveries': deliveries,
}
@http.route(
'/woo/api/order/status',
type='jsonrpc', auth='none', csrf=False, methods=['POST'],
)
def order_status(self, order_id=None, **kw):
"""
Fetch order status and timeline data for a WooCommerce order.
Expected payload: {"order_id": <woo_order_id>}
Returns: {"status": ..., "timeline": [...]}
"""
try:
instance = self._authenticate_instance()
except AccessDenied:
return {'error': 'Unauthorized', 'code': 401}
if not order_id:
return {'error': 'order_id is required', 'code': 400}
woo_order = self._find_woo_order(instance, order_id)
if not woo_order:
return {
'order_id': order_id,
'status': None,
'odoo_state': None,
'timeline': [],
}
# Build timeline from tracking messages
timeline = []
if woo_order.sale_order_id:
messages = request.env['mail.message'].sudo().search([
('res_id', '=', woo_order.sale_order_id.id),
('model', '=', 'sale.order'),
], order='create_date asc')
for msg in messages:
timeline.append({
'date': str(msg.create_date),
'type': msg.message_type,
'body': msg.preview or '',
})
# Add shipment events
for shipment in woo_order.shipment_ids:
timeline.append({
'date': str(shipment.shipped_date) if shipment.shipped_date else '',
'type': 'shipment',
'body': f'Shipped via {shipment.carrier_id.name if shipment.carrier_id else "carrier"} — tracking: {shipment.tracking_number or "N/A"}',
})
return {
'order_id': order_id,
'status': woo_order.woo_status,
'odoo_state': woo_order.state,
'odoo_order_ref': woo_order.sale_order_id.name if woo_order.sale_order_id else '',
'timeline': timeline,
}
@http.route(
'/woo/api/order/messages',
type='jsonrpc', auth='none', csrf=False, methods=['POST'],
)
def order_messages(self, order_id=None, **kw):
"""
Fetch customer-visible messages for a WooCommerce order.
Expected payload: {"order_id": <woo_order_id>}
Returns: {"messages": [...]}
"""
try:
instance = self._authenticate_instance()
except AccessDenied:
return {'error': 'Unauthorized', 'code': 401}
if not order_id:
return {'error': 'order_id is required', 'code': 400}
woo_order = self._find_woo_order(instance, order_id)
if not woo_order or not woo_order.sale_order_id:
return {'order_id': order_id, 'messages': []}
# Get customer-visible messages
messages = request.env['mail.message'].sudo().search([
('res_id', '=', woo_order.sale_order_id.id),
('model', '=', 'sale.order'),
('message_type', 'in', ['comment', 'email']),
('subtype_id.internal', '=', False),
], order='create_date asc')
result = []
for msg in messages:
result.append({
'date': str(msg.create_date),
'author': msg.author_id.name if msg.author_id else '',
'body': msg.body or '',
})
return {
'order_id': order_id,
'messages': result,
}
@http.route(
'/woo/api/return/create',
type='jsonrpc', auth='none', csrf=False, methods=['POST'],
)
def return_create(self, order_id=None, reason=None, items=None, **kw):
"""
Submit a return request from the WooCommerce plugin.
Expected payload: {
"order_id": <woo_order_id>,
"reason": "<return reason>",
"items": [{"product_id": ..., "quantity": ..., "reason": ...}, ...]
}
Returns: {"success": True, "return_id": <id>} or {"error": ...}
"""
try:
instance = self._authenticate_instance()
except AccessDenied:
return {'error': 'Unauthorized', 'code': 401}
if not order_id:
return {'error': 'order_id is required', 'code': 400}
woo_order = self._find_woo_order(instance, order_id)
if not woo_order:
return {'error': 'Order not found', 'code': 404}
items = items or []
if not items:
return {'error': 'At least one return item is required', 'code': 400}
# Create woo.return
woo_return = request.env['woo.return'].sudo().create({
'instance_id': instance.id,
'order_id': woo_order.id,
'reason': reason or '',
'company_id': instance.company_id.id,
})
# Create return lines
for item in items:
wc_product_id = item.get('product_id')
quantity = item.get('quantity', 1)
item_reason = item.get('reason', 'other')
# Find mapped Odoo product
product = False
if wc_product_id:
pm = request.env['woo.product.map'].sudo().search([
('instance_id', '=', instance.id),
('woo_product_id', '=', wc_product_id),
('state', '=', 'mapped'),
], limit=1)
if pm:
product = pm.product_id
if not product:
# Try SKU from the item
sku = item.get('sku', '')
if sku:
product = request.env['product.product'].sudo().search([
('default_code', '=', sku),
], limit=1)
if product:
request.env['woo.return.line'].sudo().create({
'return_id': woo_return.id,
'product_id': product.id,
'quantity': quantity,
'reason': item_reason if item_reason in dict(
request.env['woo.return.line']._fields['reason'].selection
) else 'other',
'company_id': instance.company_id.id,
})
instance._log_sync(
'order', 'woo_to_odoo',
woo_order.sale_order_id.name if woo_order.sale_order_id else f'WC#{order_id}',
'success', f'Return request created with {len(items)} item(s)',
)
return {
'success': True,
'return_id': woo_return.id,
'message': 'Return request received.',
}