Files
Odoo-Modules/fusion_shipping/models/delivery_carrier.py
Nexa Admin 431052920e feat: separate fusion field service and LTC into standalone modules, update core modules
- fusion_claims: separated field service logic, updated controllers/views
- fusion_tasks: updated task views and map integration
- fusion_authorizer_portal: added page 11 signing, schedule booking, migrations
- fusion_shipping: new standalone shipping module (Canada Post, FedEx, DHL, Purolator)
- fusion_ltc_management: new standalone LTC management module
2026-03-11 16:19:52 +00:00

3238 lines
169 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Unified Delivery Carrier for all shipping integrations
# Part of Fusion Shipping. See LICENSE file for full copyright and licensing details.
import string
import random
import base64
import logging
import time
import json
from json import JSONDecodeError
import pytz
from datetime import timedelta
from requests import request as http_request
import xml.etree.ElementTree as etree
from markupsafe import Markup
from odoo import api, models, fields, _, tools
from odoo.exceptions import UserError, ValidationError, RedirectWarning
from odoo.tools import pdf, float_repr
from odoo.tools.float_utils import float_round, json_float_round
from odoo.tools.misc import groupby
from odoo.tools.safe_eval import const_eval
# Canada Post
from odoo.addons.fusion_shipping.api.canada_post.response import Response
# UPS SOAP
from odoo.addons.fusion_shipping.api.ups.request import UPSRequest
# UPS REST
from odoo.addons.fusion_shipping.api.ups_rest.request import UPSRequest as UPSRestRequest
# FedEx SOAP
from odoo.addons.fusion_shipping.api.fedex.request import FedexRequest, _convert_curr_iso_fdx, _convert_curr_fdx_iso
# FedEx REST
from odoo.addons.fusion_shipping.api.fedex_rest.request import FedexRequest as FedexRestRequest
# DHL SOAP
from odoo.addons.fusion_shipping.api.dhl.request import DHLProvider
# DHL REST
from odoo.addons.fusion_shipping.api.dhl_rest.request import DHLProvider as DHLRestProvider
try:
from odoo.tools.zeep.helpers import serialize_object
except ImportError:
serialize_object = None
_logger = logging.getLogger(__name__)
# ── FedEx constants ──────────────────────────────────────────────────────────
# FedEx label/document stock identifiers, exposed as Selection choices.
# Every choice uses the raw FedEx code as both its stored value and its label.
FEDEX_STOCK_TYPE = [
    (stock_code, stock_code)
    for stock_code in (
        'PAPER_4X6',
        'PAPER_4X6.75',
        'PAPER_4X8',
        'PAPER_4X9',
        'PAPER_7X4.75',
        'PAPER_8.5X11_BOTTOM_HALF_LABEL',
        'PAPER_8.5X11_TOP_HALF_LABEL',
        'PAPER_LETTER',
        'STOCK_4X6',
        'STOCK_4X6.75',
        'STOCK_4X6.75_LEADING_DOC_TAB',
        'STOCK_4X6.75_TRAILING_DOC_TAB',
        'STOCK_4X8',
        'STOCK_4X9',
        'STOCK_4X9_LEADING_DOC_TAB',
        'STOCK_4X9_TRAILING_DOC_TAB',
    )
]
# Help text shown on the legacy (SOAP) FedEx "extra data" Text fields.
FEDEX_SOAP_HELP_EXTRA_DATA = """The extra data in FedEx is organized like the inside of a json file.
This functionality is advanced/technical and should only be used if you know what you are doing.
Example of valid value: ```
"ShipmentDetails": {"Pieces": {"Piece": {"AdditionalInformation": "extra info"}}}
```
With the above example, the AdditionalInformation of each piece will be updated.
More info on https://www.fedex.com/en-us/developer/web-services/process.html#documentation"""
# Help text shown on the FedEx REST "extra data" Text fields.
FEDEX_REST_HELP_EXTRA_DATA = """The extra data in FedEx is organized like the inside of a json file.
This functionality is advanced/technical and should only be used if you know what you are doing.
More info on https://www.developer.fedex.com"""
# Help text shown on the DHL REST "extra data" Text fields.
DHL_REST_HELP_EXTRA_DATA = """Extra data to be sent in the request. It should be JSON-formatted.
This functionality is advanced/technical and should only be used if you know what you are doing.
Example of a valid value: ```
"content": {"packages": {"description": "amazing package"}}
```
With the above example, the description of each package will be updated.
For more information, please refer to the DHL API documentation: https://developer.dhl.com/api-reference/dhl-express-mydhl-api.
"""
# ═══════════════════════════════════════════════════════════════════════════════
# UNIFIED DELIVERY CARRIER
# ═══════════════════════════════════════════════════════════════════════════════
class DeliveryCarrier(models.Model):
_inherit = 'delivery.carrier'
# ── delivery_type with ALL carrier options ────────────────────────────────
# Each Fusion provider is added to the inherited selection. When a provider
# option is removed, carriers using it are reset to a zero-price 'fixed' carrier.
delivery_type = fields.Selection(
    selection_add=[
        ('fusion_canada_post', 'Canada Post'),
        ('fusion_ups', 'UPS (Legacy)'),
        ('fusion_ups_rest', 'UPS'),
        ('fusion_fedex', 'FedEx (Legacy)'),
        ('fusion_fedex_rest', 'FedEx'),
        ('fusion_dhl', 'DHL Express (Legacy)'),
        ('fusion_dhl_rest', 'DHL Express'),
    ],
    ondelete={
        provider: (lambda recs: recs.write({'delivery_type': 'fixed', 'fixed_price': 0}))
        for provider in (
            'fusion_canada_post',
            'fusion_ups',
            'fusion_ups_rest',
            'fusion_fedex',
            'fusion_fedex_rest',
            'fusion_dhl',
            'fusion_dhl_rest',
        )
    },
)
# ══════════════════════════════════════════════════════════════════════════
# CANADA POST FIELDS
# ══════════════════════════════════════════════════════════════════════════
# Optional Canada Post delivery option; per the help text it applies to every
# shipment made with this carrier.
option_code = fields.Selection([
('SO', 'Signature Required'),
('COV', 'Coverage (Insurance)'),
('COD', 'Collect on Delivery'),
('PA18', 'Proof of Age 18+'),
('PA19', 'Proof of Age 19+'),
('HFP', 'Card for Pickup'),
('DNS', 'Do Not Safe Drop'),
('LAD', 'Leave at Door'),
], string="Shipping Option",
help="Optional delivery option applied to all shipments using this carrier.")
# Default Canada Post service code; the rate lookup matches quotes against it.
service_type = fields.Selection([
('DOM.RP', 'Regular Parcel'),
('DOM.EP', 'Expedited Parcel'),
('DOM.XP', 'Xpresspost'),
('DOM.PC', 'Priority'),
('USA.XP', 'Xpresspost USA'),
('USA.EP', 'Expedited Parcel USA'),
('INT.IP.SURF', 'International Parcel Surface'),
('INT.PW.PARCEL', 'Priority Worldwide Parcel'),
('INT.XP', 'Xpresspost International'),
], string="Default Service",
help="Default Canada Post service. Users can select a different service when adding shipping to a sale order.")
# Fallback package type used for dimensions when none are supplied otherwise.
product_packaging_id = fields.Many2one('stock.package.type', string="Default Package Type",
help="Optional default package type. Dimensions will pre-fill in the shipping wizard but can be overridden per shipment.")
reason_for_export = fields.Selection([
('DOC', 'Document'),
('SAM', 'Commercial Sample'),
('REP', 'Repair or Warranty'),
('SOG', 'Sale of Goods'),
('OTH', 'Other'),
], string="Reason for Export", default="SOG",
help="Required for international and cross-border shipments.")
# Canada Post API credentials (copy=False: not duplicated with the carrier).
# NOTE(review): unlike the UPS/FedEx/DHL credential fields in this model, these
# carry no groups="base.group_system" restriction — confirm this is intended.
username = fields.Char("Username", copy=False, help="API key from Canada Post Developer Program.")
password = fields.Char("Password", copy=False, help="API password from Canada Post Developer Program.")
customer_number = fields.Char("Customer Number", copy=False, help="Your Canada Post customer number (mailed-by number).")
tracking_link = fields.Char(string="Tracking Link",
help="Base URL for tracking shipments. Leave blank to use the default Canada Post tracking page.", size=256)
# 'commercial' selects the contract rate request (rate-v4 with contract-id);
# any other value uses the non-contract request (rate-v3).
fusion_cp_type = fields.Selection(selection=[('commercial', 'Contract Shipping'), ('counter', 'Non-Contract Shipping')],
string="Customer Type", required=False)
fusion_cp_contract_id = fields.Char(string="Contract ID")
fusion_cp_payment_method = fields.Selection([('CreditCard', 'Credit Card'), ('Account', 'Account')],
string="Payment Method", default='CreditCard',
help="How Canada Post charges for shipments on this account.")
fusion_cp_output_format = fields.Selection(
[('4x6', '4x6 Thermal Label'), ('8.5x11', '8.5x11 Full Page')],
string="Label Output Format",
default='4x6',
help="Label format returned by Canada Post. "
"4x6 is for thermal label printers, "
"8.5x11 is the full page format.",
)
# Unit in which package dimensions are entered; per the help text they are
# converted to centimeters before being sent to Canada Post.
fusion_cp_dimension_unit = fields.Selection(
[('cm', 'Centimeters (cm)'), ('in', 'Inches (in)')],
string="Package Dimension Unit",
default='cm',
help="Unit of measurement for package dimensions (length, width, height). "
"Choose the unit you use when entering dimensions on your package types. "
"Dimensions will be automatically converted to centimeters for the Canada Post API.",
)
def _default_uom_in_delive(self):
weight_uom_id = self.env.ref('uom.product_uom_kgm', raise_if_not_found=False)
if not weight_uom_id:
uom_categ_id = self.env.ref('uom.product_uom_categ_kgm').id
weight_uom_id = self.env['uom.uom'].search([('category_id', '=', uom_categ_id), ('factor', '=', 1)], limit=1)
return weight_uom_id
# Weight unit used when converting weights for Canada Post requests; defaults
# to the unit returned by _default_uom_in_delive.
weight_uom_id = fields.Many2one('uom.uom', string='API Weight Unit',
help="Weight unit expected by the Canada Post API (kg). Used internally for converting weights before sending requests.",
default=_default_uom_in_delive)
# ══════════════════════════════════════════════════════════════════════════
# UPS SOAP (Legacy) FIELDS
# ══════════════════════════════════════════════════════════════════════════
def _get_ups_service_types(self):
return [
('03', 'UPS Ground'),
('11', 'UPS Standard'),
('01', 'UPS Next Day'),
('14', 'UPS Next Day AM'),
('13', 'UPS Next Day Air Saver'),
('02', 'UPS 2nd Day'),
('59', 'UPS 2nd Day AM'),
('12', 'UPS 3-day Select'),
('65', 'UPS Saver'),
('07', 'UPS Worldwide Express'),
('08', 'UPS Worldwide Expedited'),
('54', 'UPS Worldwide Express Plus'),
('96', 'UPS Worldwide Express Freight')
]
# UPS legacy (SOAP) credentials, restricted to the base.group_system group.
ups_username = fields.Char(string='UPS Username', groups="base.group_system")
ups_passwd = fields.Char(string='UPS Password', groups="base.group_system")
ups_shipper_number = fields.Char(string='UPS Shipper Number', groups="base.group_system")
ups_access_number = fields.Char(string='UPS Access Key', groups="base.group_system")
# Shipment defaults for the legacy UPS integration.
ups_default_package_type_id = fields.Many2one('stock.package.type', string='UPS Legacy Package Type')
ups_default_service_type = fields.Selection(_get_ups_service_types, string="UPS Service Type", default='03')
ups_duty_payment = fields.Selection([('SENDER', 'Sender'), ('RECIPIENT', 'Recipient')], required=True, default="RECIPIENT")
ups_package_weight_unit = fields.Selection([('LBS', 'Pounds'), ('KGS', 'Kilograms')], default='LBS')
ups_package_dimension_unit = fields.Selection([('IN', 'Inches'), ('CM', 'Centimeters')], string="Package Size Unit", default='IN')
# NOTE(review): the stored value 'GIF' is displayed as 'PDF' — presumably GIF
# labels are converted to PDF elsewhere; confirm against the label handling code.
ups_label_file_type = fields.Selection([('GIF', 'PDF'),
('ZPL', 'ZPL'),
('EPL', 'EPL'),
('SPL', 'SPL')],
string="UPS Label File Type", default='GIF')
ups_bill_my_account = fields.Boolean(string='Bill My Account',
help="If checked, ecommerce users will be prompted their UPS account number\n"
"and delivery fees will be charged on it.")
ups_saturday_delivery = fields.Boolean(string='UPS Saturday Delivery',
help='This value added service will allow you to ship the package on saturday also.')
# Accepted payment instruments for collect-on-delivery shipments.
ups_cod_funds_code = fields.Selection(selection=[
('0', "Check, Cashier's Check or MoneyOrder"),
('8', "Cashier's Check or MoneyOrder"),
], string='COD Funding Option', default='0')
# ══════════════════════════════════════════════════════════════════════════
# UPS REST FIELDS
# ══════════════════════════════════════════════════════════════════════════
# UPS REST credentials and token, restricted to the base.group_system group.
ups_client_id = fields.Char(string='UPS Client ID', groups="base.group_system")
ups_client_secret = fields.Char(string='UPS Client Secret', groups="base.group_system")
ups_access_token = fields.Char(string='UPS Access Token', groups="base.group_system")
# Shipment defaults for the UPS REST integration.
ups_default_packaging_id = fields.Many2one('stock.package.type', string='UPS Package Type')
ups_require_signature = fields.Boolean("Require Signature")
# ══════════════════════════════════════════════════════════════════════════
# FEDEX SOAP (Legacy) FIELDS
# ══════════════════════════════════════════════════════════════════════════
# FedEx legacy (SOAP) credentials, restricted to the base.group_system group.
fedex_developer_key = fields.Char(string="Developer Key", groups="base.group_system")
fedex_developer_password = fields.Char(string="Password", groups="base.group_system")
fedex_account_number = fields.Char(string="FedEx Legacy Account Number", groups="base.group_system")
fedex_meter_number = fields.Char(string="Meter Number", groups="base.group_system")
# Shipment defaults for the legacy FedEx integration; selection values are the
# raw FedEx codes and are also used as labels.
fedex_droppoff_type = fields.Selection([('BUSINESS_SERVICE_CENTER', 'BUSINESS_SERVICE_CENTER'),
('DROP_BOX', 'DROP_BOX'),
('REGULAR_PICKUP', 'REGULAR_PICKUP'),
('REQUEST_COURIER', 'REQUEST_COURIER'),
('STATION', 'STATION')],
string="Fedex Drop-Off Type",
default='REGULAR_PICKUP')
fedex_default_package_type_id = fields.Many2one('stock.package.type', string="Fedex Package Type")
fedex_service_type = fields.Selection([('INTERNATIONAL_ECONOMY', 'INTERNATIONAL_ECONOMY'),
('INTERNATIONAL_PRIORITY', 'INTERNATIONAL_PRIORITY'),
('FEDEX_INTERNATIONAL_PRIORITY', 'FEDEX_INTERNATIONAL_PRIORITY'),
('FEDEX_INTERNATIONAL_PRIORITY_EXPRESS', 'FEDEX_INTERNATIONAL_PRIORITY_EXPRESS'),
('FEDEX_GROUND', 'FEDEX_GROUND'),
('FEDEX_2_DAY', 'FEDEX_2_DAY'),
('FEDEX_2_DAY_AM', 'FEDEX_2_DAY_AM'),
('FEDEX_3_DAY_FREIGHT', 'FEDEX_3_DAY_FREIGHT'),
('FIRST_OVERNIGHT', 'FIRST_OVERNIGHT'),
('PRIORITY_OVERNIGHT', 'PRIORITY_OVERNIGHT'),
('STANDARD_OVERNIGHT', 'STANDARD_OVERNIGHT'),
('FEDEX_NEXT_DAY_EARLY_MORNING', 'FEDEX_NEXT_DAY_EARLY_MORNING'),
('FEDEX_NEXT_DAY_MID_MORNING', 'FEDEX_NEXT_DAY_MID_MORNING'),
('FEDEX_NEXT_DAY_AFTERNOON', 'FEDEX_NEXT_DAY_AFTERNOON'),
('FEDEX_NEXT_DAY_END_OF_DAY', 'FEDEX_NEXT_DAY_END_OF_DAY'),
('FEDEX_EXPRESS_SAVER', 'FEDEX_EXPRESS_SAVER'),
('FEDEX_REGIONAL_ECONOMY', 'FEDEX_REGIONAL_ECONOMY'),
('FEDEX_FIRST', 'FEDEX_FIRST'),
('FEDEX_PRIORITY_EXPRESS', 'FEDEX_PRIORITY_EXPRESS'),
('FEDEX_PRIORITY', 'FEDEX_PRIORITY'),
('FEDEX_PRIORITY_EXPRESS_FREIGHT', 'FEDEX_PRIORITY_EXPRESS_FREIGHT'),
('FEDEX_PRIORITY_FREIGHT', 'FEDEX_PRIORITY_FREIGHT'),
('FEDEX_ECONOMY_SELECT', 'FEDEX_ECONOMY_SELECT'),
('FEDEX_INTERNATIONAL_CONNECT_PLUS', 'FEDEX_INTERNATIONAL_CONNECT_PLUS'),
],
default='FEDEX_INTERNATIONAL_PRIORITY')
fedex_duty_payment = fields.Selection([('SENDER', 'Sender'), ('RECIPIENT', 'Recipient')], required=True, default="SENDER")
fedex_weight_unit = fields.Selection([('LB', 'LB'), ('KG', 'KG')], default='LB')
# Label and commercial-invoice output settings (choices from FEDEX_STOCK_TYPE).
fedex_label_stock_type = fields.Selection(FEDEX_STOCK_TYPE, string='Label Type', default='PAPER_LETTER')
fedex_label_file_type = fields.Selection([('PDF', 'PDF'),
('EPL2', 'EPL2'),
('PNG', 'PNG'),
('ZPLII', 'ZPLII')],
default='PDF', string="FEDEX Label File Type")
fedex_document_stock_type = fields.Selection(FEDEX_STOCK_TYPE, string='Commercial Invoice Type', default='PAPER_LETTER')
fedex_saturday_delivery = fields.Boolean(string="FedEx Saturday Delivery",
help="Special service: Saturday Delivery, can be requested on following days. "
"Thursday: FEDEX_2_DAY. Friday: PRIORITY_OVERNIGHT, FIRST_OVERNIGHT, INTERNATIONAL_PRIORITY (To Select Countries)")
# Free-form extra request data, one field per request kind (rate/ship/return).
fedex_extra_data_rate_request = fields.Text('Extra data for rate (legacy)', help=FEDEX_SOAP_HELP_EXTRA_DATA)
fedex_extra_data_ship_request = fields.Text('Extra data for ship (legacy)', help=FEDEX_SOAP_HELP_EXTRA_DATA)
fedex_extra_data_return_request = fields.Text('Extra data for return (legacy)', help=FEDEX_SOAP_HELP_EXTRA_DATA)
# ══════════════════════════════════════════════════════════════════════════
# FEDEX REST FIELDS
# ══════════════════════════════════════════════════════════════════════════
# FedEx REST credentials and token, restricted to the base.group_system group.
fedex_rest_developer_key = fields.Char(string="API Key", groups="base.group_system")
fedex_rest_developer_password = fields.Char(string="Secret Key", groups="base.group_system")
fedex_rest_account_number = fields.Char(string="FedEx Account Number", groups="base.group_system")
fedex_rest_access_token = fields.Char(string="FedEx Access Token", groups="base.group_system")
# Shipment defaults for the FedEx REST integration.
fedex_rest_droppoff_type = fields.Selection([('CONTACT_FEDEX_TO_SCHEDULE', 'Contact FedEx for pickup'),
('DROPOFF_AT_FEDEX_LOCATION', 'Drop off at FedEx location'),
('USE_SCHEDULED_PICKUP', 'Part of regular scheduled pickup')],
string="FedEx Drop-Off Type",
default='USE_SCHEDULED_PICKUP')
fedex_rest_default_package_type_id = fields.Many2one('stock.package.type', string="FedEx Package Type")
fedex_rest_service_type = fields.Selection([
('EUROPE_FIRST_INTERNATIONAL_PRIORITY', 'FedEx Europe First'),
('FEDEX_1_DAY_FREIGHT', 'FedEx 1Day Freight'),
('FEDEX_2_DAY', 'FedEx 2Day'),
('FEDEX_2_DAY_AM', 'FedEx 2Day AM'),
('FEDEX_2_DAY_FREIGHT', 'FedEx 2Day Freight'),
('FEDEX_3_DAY_FREIGHT', 'FedEx 3Day Freight'),
('FEDEX_ECONOMY', 'FedEx Economy'),
('FEDEX_ECONOMY_FREIGHT', 'FedEx Economy Freight'),
('FEDEX_ECONOMY_SELECT', 'FedEx Economy (Only U.K.)'),
('FEDEX_EXPRESS_SAVER', 'FedEx Express Saver'),
('FEDEX_FIRST', 'FedEx First'),
('FEDEX_FIRST_FREIGHT', 'FedEx First Overnight Freight'),
('FEDEX_GROUND', 'FedEx International Ground and FedEx Domestic Ground'),
('FEDEX_INTERNATIONAL_CONNECT_PLUS', 'FedEx International Connect Plus'),
('FEDEX_INTERNATIONAL_DEFERRED_FREIGHT', 'FedEx International Deferred Freight'),
('FEDEX_INTERNATIONAL_PRIORITY', 'FedEx International Priority'),
('FEDEX_INTERNATIONAL_PRIORITY_EXPRESS', 'FedEx International Priority Express'),
('FEDEX_PRIORITY', 'FedEx Priority'),
('FEDEX_PRIORITY_EXPRESS', 'FedEx Priority Express'),
('FEDEX_PRIORITY_EXPRESS_FREIGHT', 'FedEx Priority Express Freight'),
('FEDEX_PRIORITY_FREIGHT', 'FedEx Priority Freight'),
('FEDEX_REGIONAL_ECONOMY', 'FedEx Regional Economy'),
('FEDEX_REGIONAL_ECONOMY_FREIGHT', 'FedEx Regional Economy Freight'),
('FIRST_OVERNIGHT', 'FedEx First Overnight'),
('GROUND_HOME_DELIVERY', 'FedEx Home Delivery'),
('INTERNATIONAL_DISTRIBUTION_FREIGHT', 'FedEx International Priority DirectDistribution Freight'),
('INTERNATIONAL_ECONOMY', 'FedEx International Economy'),
('INTERNATIONAL_ECONOMY_DISTRIBUTION', 'FedEx International Economy DirectDistribution'),
('INTERNATIONAL_ECONOMY_FREIGHT', 'FedEx International Economy Freight'),
('INTERNATIONAL_FIRST', 'FedEx International First'),
('INTERNATIONAL_PRIORITY_DISTRIBUTION', 'FedEx International Priority DirectDistribution'),
('INTERNATIONAL_PRIORITY_FREIGHT', 'FedEx International Priority Freight'),
('INTL_GROUND_DISTRIBUTION', 'International Ground Distribution (IGD)'),
('PRIORITY_OVERNIGHT', 'FedEx Priority Overnight'),
('SAME_DAY', 'FedEx SameDay'),
('SAME_DAY_CITY', 'FedEx SameDay City'),
('SMART_POST', 'FedEx Ground Economy (Formerly known as FedEx SmartPost)'),
('STANDARD_OVERNIGHT', 'FedEx Standard Overnight'),
('TRANSBORDER_DISTRIBUTION', 'Transborder distribution'),
], default='FEDEX_INTERNATIONAL_PRIORITY', string='FedEx Service Type')
fedex_rest_duty_payment = fields.Selection([('SENDER', 'Sender'), ('RECIPIENT', 'Recipient')], required=True, default="SENDER")
fedex_rest_weight_unit = fields.Selection([('LB', 'Pounds'), ('KG', 'Kilograms')], default='LB')
# Label output settings (stock choices from FEDEX_STOCK_TYPE).
fedex_rest_label_stock_type = fields.Selection(FEDEX_STOCK_TYPE, string='Label Size', default='PAPER_LETTER')
fedex_rest_label_file_type = fields.Selection([('PDF', 'PDF'),
('EPL2', 'EPL2'),
('PNG', 'PNG'),
('ZPLII', 'ZPLII')],
default='PDF', string="Label File Type")
# Free-form extra request data, one field per request kind (rate/ship/return).
fedex_rest_extra_data_rate_request = fields.Text('Extra data for rate', help=FEDEX_REST_HELP_EXTRA_DATA)
fedex_rest_extra_data_ship_request = fields.Text('Extra data for ship', help=FEDEX_REST_HELP_EXTRA_DATA)
fedex_rest_extra_data_return_request = fields.Text('Extra data for return', help=FEDEX_REST_HELP_EXTRA_DATA)
# Customs, notification and address-classification options.
fedex_rest_override_shipper_vat = fields.Char('Union tax id (EORI/IOSS)',
help='Will be provided to Fedex as primary company tax identifier of type BUSINESS_UNION to put on the generated invoice. '
'Use this when you need to use an IOSS or EORI number in addition to the national tax number. '
'When not provided the regular tax id on the company will be used with type BUSINESS_NATIONAL.')
fedex_rest_email_notifications = fields.Boolean('Email Notifications',
help='When enabled, the customer will receive email notifications from FedEx about this shipment (when an email address is configured on the customer)')
fedex_rest_documentation_type = fields.Selection(
[('none', 'No'), ('invoice', 'Print PDF'), ('etd', 'Electronic Trade Documents')], 'Generate invoice', default="none", required=True,
help='For international shipments (or some intra-country shipments), a commercial invoice might be required for customs clearance. '
'This commercial invoice can be generated by FedEx based on shipment data and returned as PDF for printing and attaching to the shipment or manual electronic submission to FedEx. '
'It can also be submitted directly as ETD information to FedEx upon shipment validation.')
fedex_rest_residential_address = fields.Selection(
[('never', 'Never'), ('always', 'Always'), ('check', 'Check using FedEx Address API')],
'Residential delivery', default='never', required=True,
help='Determines whether to mark the recipient address as residential (to correctly calculate any possible surcharges). '
'Please note: when retrieving this information using the FedEx Address API, we assume that the address is residential unless it is marked explicitly as a BUSINESS address.')
# ══════════════════════════════════════════════════════════════════════════
# DHL SOAP (Legacy) FIELDS
# ══════════════════════════════════════════════════════════════════════════
# DHL legacy (SOAP) credentials, restricted to the base.group_system group.
dhl_SiteID = fields.Char(string="DHL SiteID", groups="base.group_system")
dhl_password = fields.Char(string="DHL Password", groups="base.group_system")
dhl_account_number = fields.Char(string="DHL Account Number", groups="base.group_system")
# Units and shipment defaults for the legacy DHL integration.
dhl_package_dimension_unit = fields.Selection([('I', 'Inches'), ('C', 'Centimeters')],
default='C', string='Package Dimension Unit')
dhl_package_weight_unit = fields.Selection([('L', 'Pounds'), ('K', 'Kilograms')],
default='K', string="Package Weight Unit")
dhl_default_package_type_id = fields.Many2one('stock.package.type', string='DHL Legacy Package Type')
dhl_region_code = fields.Selection([('AP', 'Asia Pacific'), ('AM', 'America'), ('EU', 'Europe')],
default='AM', string='Region')
# One-character DHL product codes; the default is 'D'.
dhl_product_code = fields.Selection([('0', '0 - Logistics Services'),
('1', '1 - Domestic Express 12:00'),
('2', '2 - B2C'),
('3', '3 - B2C'),
('4', '4 - Jetline'),
('5', '5 - Sprintline'),
('6', '6 - Secureline'),
('7', '7 - Express Easy'),
('8', '8 - Express Easy'),
('9', '9 - Europack'),
('A', 'A - Auto Reversals'),
('B', 'B - Break Bulk Express'),
('C', 'C - Medical Express'),
('D', 'D - Express Worldwide'),
('E', 'E - Express 9:00'),
('F', 'F - Freight Worldwide'),
('G', 'G - Domestic Economy Select'),
('H', 'H - Economy Select'),
('I', 'I - Break Bulk Economy'),
('J', 'J - Jumbo Box'),
('K', 'K - Express 9:00'),
('L', 'L - Express 10:30'),
('M', 'M - Express 10:30'),
('N', 'N - Domestic Express'),
('O', 'O - DOM Express 10:30'),
('P', 'P - Express Worldwide'),
('Q', 'Q - Medical Express'),
('R', 'R - GlobalMail Business'),
('S', 'S - Same Day'),
('T', 'T - Express 12:00'),
('U', 'U - Express Worldwide'),
('V', 'V - Europack'),
('W', 'W - Economy Select'),
('X', 'X - Express Envelope'),
('Y', 'Y - Express 12:00'),
('Z', 'Z - Destination Charges'),
],
default='D',
string='DHL Product')
dhl_dutiable = fields.Boolean(string="Dutiable Material", help="Check this if your package is dutiable.")
dhl_duty_payment = fields.Selection([('S', 'Sender'), ('R', 'Recipient')], required=True, default="S")
# Label output settings.
dhl_label_image_format = fields.Selection([
('EPL2', 'EPL2'),
('PDF', 'PDF'),
('ZPL2', 'ZPL2'),
], string="Label Image Format", default='PDF')
dhl_label_template = fields.Selection([
('8X4_A4_PDF', '8X4_A4_PDF'),
('8X4_thermal', '8X4_thermal'),
('8X4_A4_TC_PDF', '8X4_A4_TC_PDF'),
('6X4_thermal', '6X4_thermal'),
('6X4_A4_PDF', '6X4_A4_PDF'),
('8X4_CI_PDF', '8X4_CI_PDF'),
('8X4_CI_thermal', '8X4_CI_thermal'),
('8X4_RU_A4_PDF', '8X4_RU_A4_PDF'),
('6X4_PDF', '6X4_PDF'),
('8X4_PDF', '8X4_PDF')
], string="Label Template", default='8X4_A4_PDF')
# Free-form custom data; per the help text it is keyed by 'rate', 'ship' and 'return'.
# NOTE(review): the field label ends with a stray comma — confirm whether intended.
dhl_custom_data_request = fields.Text(
'Custom data for DHL requests,',
help="The custom data in DHL is organized like the inside of a json file. "
"There are 3 possible keys: 'rate', 'ship', 'return', to which you can add your custom data. "
"More info on https://xmlportal.dhl.com/")
# ══════════════════════════════════════════════════════════════════════════
# DHL REST FIELDS
# ══════════════════════════════════════════════════════════════════════════
# DHL REST credentials, restricted to the base.group_system group.
dhl_api_key = fields.Char(string="DHL API Key", groups="base.group_system")
dhl_api_secret = fields.Char(string="DHL API Secret", groups="base.group_system")
dhl_unit_system = fields.Selection([('imperial', 'Imperial'), ('metric', 'Metric')],
default='metric', string='Unit System')
# Free-form extra request data, one field per request kind (rate/ship/return).
dhl_extra_data_rate_request = fields.Text('Extra data for rate requests', help=DHL_REST_HELP_EXTRA_DATA)
dhl_extra_data_ship_request = fields.Text('Extra data for ship requests', help=DHL_REST_HELP_EXTRA_DATA)
dhl_extra_data_return_request = fields.Text('Extra data for return requests', help=DHL_REST_HELP_EXTRA_DATA)
# ══════════════════════════════════════════════════════════════════════════
# COMBINED COMPUTE METHODS
# ══════════════════════════════════════════════════════════════════════════
def _compute_can_generate_return(self):
    """Mark every Fusion carrier type as able to generate return labels.

    The inherited computation runs first; carriers whose ``delivery_type`` is
    one of the Fusion types then get ``can_generate_return`` set to True.
    Other carriers keep whatever the inherited computation assigned.
    """
    super()._compute_can_generate_return()
    return_capable_types = (
        'fusion_canada_post',
        'fusion_ups',
        'fusion_ups_rest',
        'fusion_fedex',
        'fusion_fedex_rest',
        'fusion_dhl',
        'fusion_dhl_rest',
    )
    for carrier in self:
        if carrier.delivery_type not in return_capable_types:
            continue
        carrier.can_generate_return = True
def _compute_supports_shipping_insurance(self):
    """Mark the UPS, FedEx and DHL Fusion carriers as supporting insurance.

    The inherited computation runs first; carriers whose ``delivery_type`` is
    a Fusion UPS/FedEx/DHL type (legacy or REST) then get
    ``supports_shipping_insurance`` set to True. Canada Post is not in the list.
    """
    super()._compute_supports_shipping_insurance()
    insurable_types = (
        'fusion_ups',
        'fusion_ups_rest',
        'fusion_fedex',
        'fusion_fedex_rest',
        'fusion_dhl',
        'fusion_dhl_rest',
    )
    for carrier in self:
        if carrier.delivery_type not in insurable_types:
            continue
        carrier.supports_shipping_insurance = True
# ══════════════════════════════════════════════════════════════════════════
# CANADA POST METHODS
# ══════════════════════════════════════════════════════════════════════════
@api.model
def get_fusion_cp_url(self):
    """Return the Canada Post REST base URL for this carrier.

    The production gateway is used when ``prod_environment`` is truthy,
    otherwise the ``ct.`` gateway is returned.
    """
    return (
        "https://soa-gw.canadapost.ca/rs/"
        if self.prod_environment
        else "https://ct.soa-gw.canadapost.ca/rs/"
    )
def validating_address(self, partner_id):
    """Return the labels of the address fields that are empty on a partner.

    :param partner_id: partner whose ``street``, ``city``, ``state_id``,
        ``country_id`` and ``zip`` are checked, in that order
    :return: list of labels ('Street', 'City', 'State', 'Country', 'ZIP') for
        each falsy field; empty list when the address is complete
    """
    required_fields = (
        ('street', 'Street'),
        ('city', 'City'),
        ('state_id', 'State'),
        ('country_id', 'Country'),
        ('zip', 'ZIP'),
    )
    return [
        label
        for field_name, label in required_fields
        if not getattr(partner_id, field_name)
    ]
# Check that a single sale order carries the data needed to ship it.
# Returns a translated error message for the first problem found, or False
# when no problem is detected. `order` must be a single record (ensure_one).
def check_required_value_to_ship(self, order):
order.ensure_one()
if order.order_line:
# Only lines that are not delivery lines, not display (section/note) lines
# and whose product type is 'consu' need a weight.
for order_line in order.order_line.filtered(
lambda line: not line.is_delivery
and not line.display_type
and line.product_id.type == 'consu'):
if order_line.product_id and not order_line.product_id.weight:
return _("Product '%s' has no weight defined. Please set the weight before shipping.") % order_line.product_id.name
# Validate the customer (delivery) address.
missing_value = self.validating_address(order.partner_shipping_id)
if missing_value:
fields_str = ", ".join(missing_value)
return (_("Incomplete customer address. Missing field(s): %s") % fields_str)
# Validate the shipper (warehouse) address.
missing_value = self.validating_address(order.warehouse_id.partner_id)
if missing_value:
fields_str = ", ".join(missing_value)
return (_("Incomplete warehouse address. Missing field(s): %s") % fields_str)
return False
def convert_weight(self, from_uom_unit, to_uom_unit, weight):
    """Convert ``weight`` from one unit of measure to another.

    :param from_uom_unit: source unit; when falsy, the unit returned by
        ``product.template._get_weight_uom_id_from_ir_config_parameter()``
        is used instead
    :param to_uom_unit: target unit passed to ``_compute_quantity``
    :param weight: quantity expressed in the source unit
    :return: result of ``_compute_quantity(weight, to_uom_unit)`` on the source unit
    """
    source_uom = (
        from_uom_unit
        or self.env["product.template"]._get_weight_uom_id_from_ir_config_parameter()
    )
    return source_uom._compute_quantity(weight, to_uom_unit)
def check_max_weight(self, order, shipment_weight):
    """Report whether any ordered product is heavier than ``shipment_weight``.

    :param order: sale order whose ``order_line`` products are inspected
    :param shipment_weight: weight each product's ``weight`` is compared against
    :return: translated error message if a product exceeds the weight,
        otherwise False
    """
    has_overweight_product = any(
        line.product_id and line.product_id.weight > shipment_weight
        for line in order.order_line
    )
    if has_overweight_product:
        return _("Product weight exceeds the maximum allowed weight for this package type.")
    return False
def fusion_canada_post_rate_shipment(self, order):
    """Return the Canada Post rate for a sale order.

    The order is validated with ``check_required_value_to_ship`` first. The
    weight comes from the ``order_weight`` context key or the order's
    estimated weight and is converted to ``weight_uom_id``. Package
    dimensions are resolved with this precedence: ``cp_package_info`` from
    the context, then the dimensions stored on the order, then the carrier's
    default package type.

    :param order: sale order to quote
    :return: dict with keys ``success``, ``price``, ``error_message`` and
        ``warning_message``; ``success`` is False when validation fails, when
        Canada Post returns messages, or when no quote matches ``service_type``
    """
    check_value = self.check_required_value_to_ship(order)
    if check_value:
        return {'success': False, 'price': 0.0, 'error_message': check_value, 'warning_message': False}

    shipper_address = order.warehouse_id.partner_id or order.company_id.partner_id
    recipient_address = order.partner_shipping_id or order.partner_id
    weight = self.env.context.get("order_weight", 0.0) or order._get_estimated_weight() or 0.0
    total_weight = self.convert_weight(
        order.company_id and order.company_id.weight_unit_of_measurement_id,
        self.weight_uom_id, weight)
    declared_value = round(order.amount_untaxed, 2)
    declared_currency = order.currency_id.name

    # Only fall back to the order fields / default package type when the
    # caller did not supply dimensions; an explicit context value must win.
    package_info = self.env.context.get('cp_package_info')
    if not package_info:
        if (order.fusion_cp_package_length
                and order.fusion_cp_package_width
                and order.fusion_cp_package_height):
            package_info = {
                'length': round(self._fusion_cp_convert_dimension_to_cm(
                    order.fusion_cp_package_length), 1),
                'width': round(self._fusion_cp_convert_dimension_to_cm(
                    order.fusion_cp_package_width), 1),
                'height': round(self._fusion_cp_convert_dimension_to_cm(
                    order.fusion_cp_package_height), 1),
            }
        elif self.product_packaging_id:
            package_info = self.get_fusion_cp_parcel(self.product_packaging_id)

    rate_dict = self.fusion_canada_post_get_shipping_rate(
        shipper_address, recipient_address, total_weight,
        picking_bulk_weight=False, packages=False,
        declared_value=declared_value, declared_currency=declared_currency,
        company_id=order.company_id,
        package_info=package_info)
    _logger.info("Rate Response Data : %s", rate_dict)

    messages = rate_dict.get('messages', False)
    if messages:
        details = messages['message']
        # Like 'price-quote' below, a repeated element may arrive as a list.
        if isinstance(details, dict):
            error_message = details['description']
        else:
            error_message = "\n".join(detail['description'] for detail in details)
        return {'success': False, 'price': 0.0, 'error_message': error_message,
                'warning_message': False}

    quotes = (rate_dict.get('price-quotes') or {}).get('price-quote') or []
    if isinstance(quotes, dict):
        quotes = [quotes]
    matching = [quote for quote in quotes if quote['service-code'] == self.service_type]
    if not matching:
        # Also covers a response without any quotes, which must not be
        # reported as a successful zero-price rate.
        return {'success': False, 'price': 0.0,
                'error_message': _("Service '%s' is not available for this route. "
                                   "Please select a different service or adjust the destination.") % (self.service_type),
                'warning_message': False}
    # When several quotes match, the last one wins.
    price = matching[-1]['price-details']['due']
    return {'success': True, 'price': float(price), 'error_message': False, 'warning_message': False}
def fusion_canada_post_get_shipping_rate(self, shipper_address, recipient_address, total_weight,
                                         picking_bulk_weight=False, packages=False,
                                         declared_value=False, declared_currency=False,
                                         company_id=False, package_info=None):
    """Request price quotes from the Canada Post ``ship/price`` endpoint.

    Builds a ``mailing-scenario`` XML document (``rate-v4`` plus a
    ``contract-id`` when ``fusion_cp_type`` is ``'commercial'``, ``rate-v3``
    otherwise), POSTs it with basic auth and returns the parsed response.

    :param shipper_address: record whose ``zip`` is sent as the origin
        postal code (spaces removed, upper-cased)
    :param recipient_address: record whose country code selects the
        destination element: ``CA`` -> domestic, ``US`` -> united-states,
        anything else -> international
    :param total_weight: parcel weight, sent rounded to 3 decimals
    :param picking_bulk_weight: unused, kept for caller compatibility
    :param packages: unused, kept for caller compatibility
    :param declared_value: unused, kept for caller compatibility
    :param declared_currency: unused, kept for caller compatibility
    :param company_id: unused, kept for caller compatibility
    :param package_info: optional dict with ``length`` / ``width`` /
        ``height``; a missing key is sent as ``1``
    :return: dict produced by ``Response(...).dict()``. When the HTTP call
        or the parsing raises, a dict carrying the error text both under
        ``error_message`` and under ``messages -> message -> description``
        so that callers which only inspect ``messages`` still report it.
    """
    is_commercial = self.fusion_cp_type == 'commercial'
    rate_version = "rate-v4" if is_commercial else "rate-v3"

    service_root = etree.Element("mailing-scenario")
    service_root.attrib["xmlns"] = "http://www.canadapost.ca/ws/ship/%s" % rate_version
    # ElementTree cannot serialise False, which is what an unset Odoo
    # Char field evaluates to.
    etree.SubElement(service_root, "customer-number").text = "%s" % (self.customer_number or '')
    if is_commercial:
        contract = etree.SubElement(service_root, "contract-id")
        contract.text = "%s" % (self.fusion_cp_contract_id or '')

    parcel = etree.SubElement(service_root, "parcel-characteristics")
    etree.SubElement(parcel, "weight").text = "%s" % round(total_weight, 3)
    # Include dimensions if provided (improves cubing/volumetric accuracy)
    if package_info:
        dims = etree.SubElement(parcel, "dimensions")
        for side in ("length", "width", "height"):
            etree.SubElement(dims, side).text = str(package_info.get(side, 1))

    # An unset zip is False; normalise it before calling str methods.
    origin_zip = (shipper_address.zip or "").replace(" ", "").upper()
    etree.SubElement(service_root, "origin-postal-code").text = "%s" % origin_zip

    # Determine destination type from recipient country (not service_type)
    destination = etree.SubElement(service_root, "destination")
    country_code = recipient_address.country_id and recipient_address.country_id.code or ''
    recipient_zip = recipient_address.zip or ""
    if country_code == 'CA':
        domestic = etree.SubElement(destination, "domestic")
        etree.SubElement(domestic, "postal-code").text = "%s" % (recipient_zip.replace(" ", "").upper())
    elif country_code == 'US':
        united_states = etree.SubElement(destination, "united-states")
        etree.SubElement(united_states, "zip-code").text = "%s" % (recipient_zip.upper())
    else:
        international = etree.SubElement(destination, "international")
        etree.SubElement(international, "country-code").text = "%s" % (country_code)

    url = '%sship/price' % (self.get_fusion_cp_url())
    base_data = etree.tostring(service_root).decode('utf-8')
    media_type = "application/vnd.cpc.ship.%s+xml" % rate_version
    headers = {"Accept": media_type, "Content-Type": media_type}
    _logger.info("Rate Request Data : %s", base_data)

    try:
        response_body = http_request(method='POST', url=url, data=base_data, headers=headers,
                                     auth=(self.username, self.password))
        result = Response(response_body).dict()
        _logger.info("Rate Response Data : %s", result)
    except Exception as e:
        # Python 3 exceptions have no ``.message`` attribute; reading it
        # here would raise AttributeError and hide the real failure.
        error_text = str(e)
        _logger.warning("Canada Post rate request failed: %s", error_text)
        result = {
            'error_message': error_text,
            'messages': {'message': {'description': error_text}},
        }
    return result
def fusion_canada_post_rate_shipment_all(self, order):
    """Return ALL available Canada Post service rates for the order.

    The weight comes from the ``order_weight`` context key, falling back to
    ``order._get_estimated_weight()``; package dimensions come from the
    ``cp_package_info`` context key only.

    :param order: sale order to rate
    :return: on success a ``list`` of dicts with keys ``service_code``,
        ``service_name``, ``price`` (float) and ``expected_delivery``;
        on failure a ``dict`` ``{'success': False, 'error_message': ...}``.
        Callers must distinguish the two shapes.
    """
    check_value = self.check_required_value_to_ship(order)
    if check_value:
        return {'success': False, 'error_message': check_value}

    shipper_address = order.warehouse_id.partner_id or order.company_id.partner_id
    recipient_address = order.partner_shipping_id or order.partner_id
    context = self.env.context
    weight = context.get("order_weight", 0.0) or order._get_estimated_weight() or 0.0
    total_weight = self.convert_weight(
        order.company_id and order.company_id.weight_unit_of_measurement_id,
        self.weight_uom_id, weight)
    declared_value = round(order.amount_untaxed, 2)
    declared_currency = order.currency_id.name
    # Get package dimensions from context (set by wizard)
    package_info = context.get('cp_package_info')

    rate_dict = self.fusion_canada_post_get_shipping_rate(
        shipper_address, recipient_address, total_weight,
        picking_bulk_weight=False, packages=False,
        declared_value=declared_value, declared_currency=declared_currency,
        company_id=order.company_id,
        package_info=package_info) or {}

    messages = rate_dict.get('messages')
    if messages:
        # ``message`` is handled as either one dict or a list of dicts,
        # the same way ``price-quote`` is handled below.
        message = messages.get('message') if isinstance(messages, dict) else None
        entries = message if isinstance(message, list) else [message]
        descriptions = [
            entry['description'] for entry in entries
            if isinstance(entry, dict) and entry.get('description')]
        return {
            'success': False,
            'error_message': (
                '\n'.join(descriptions)
                or rate_dict.get('error_message')
                or _("Canada Post returned an error without a description.")),
        }
    if rate_dict.get('error_message'):
        # Request/parsing failure reported by the rate call itself; do not
        # let it degrade into "no prices available".
        return {'success': False, 'error_message': rate_dict['error_message']}

    rates = []
    if rate_dict.get('price-quotes') and rate_dict['price-quotes'].get('price-quote'):
        quotes = rate_dict['price-quotes']['price-quote']
        if isinstance(quotes, dict):
            # A single quote is parsed as a dict rather than a list
            quotes = [quotes]
        for quote in quotes:
            expected_delivery = ''
            service_standard = quote.get('service-standard', {})
            if isinstance(service_standard, dict):
                expected_delivery = service_standard.get(
                    'expected-delivery-date', '')
            rates.append({
                'service_code': quote.get('service-code', ''),
                'service_name': quote.get('service-name', ''),
                'price': float(quote.get('price-details', {}).get('due', 0)),
                'expected_delivery': expected_delivery,
            })
    if not rates:
        return {
            'success': False,
            'error_message': _("No shipping prices available for this destination."),
        }
    return rates
def get_group_id(self):
    """Return a random identifier made of 15 uppercase ASCII letters."""
    alphabet = string.ascii_uppercase
    letters = [random.choice(alphabet) for _position in range(15)]
    return ''.join(letters)
def fusion_canada_post_send_shipping(self, pickings):
    """Create Canada Post shipments for every package of each picking.

    For each picking the service code is the sale order's
    ``fusion_cp_service_code`` when set, otherwise the carrier's
    ``service_type``. Packages come from the sale order's
    ``fusion_cp_package_ids`` (ordered by ``sequence``) when there are any,
    otherwise from ``_get_packages_from_picking``. One shipment is created
    and recorded per package, then a single chatter message with all
    documents is posted.

    :param pickings: pickings to ship
    :return: list with one dict per picking: ``exact_price`` (sum of the
        package prices) and ``tracking_number`` (comma-joined non-empty
        tracking numbers)
    :raises ValidationError: when a picking package has no usable
        dimensions, or from the validation / shipment helpers
    """
    def dims_in_cm(length, width, height):
        """Build the package_info dict, converted to cm, rounded to 1 dp."""
        to_cm = self._fusion_cp_convert_dimension_to_cm
        return {
            'length': round(to_cm(length), 1),
            'width': round(to_cm(width), 1),
            'height': round(to_cm(height), 1),
        }

    response = []
    for picking in pickings:
        sale_order = picking.sale_id
        # Determine service code (order-level selection or carrier default)
        service_code = self.service_type
        if sale_order and sale_order.fusion_cp_service_code:
            service_code = sale_order.fusion_cp_service_code
        so_packages = (
            sale_order.fusion_cp_package_ids.sorted('sequence')
            if sale_order else False)
        from_unit = (picking.company_id
                     and picking.company_id
                     .weight_unit_of_measurement_id or "")
        all_results = []

        if so_packages:
            # enumerate() gives the 1-based position directly instead of
            # searching ``ids`` for every package.
            for position, so_pkg in enumerate(so_packages, start=1):
                pkg_weight = round(self.convert_weight(
                    from_unit, self.weight_uom_id, so_pkg.weight), 2)
                package_info = dims_in_cm(
                    so_pkg.package_length,
                    so_pkg.package_width,
                    so_pkg.package_height)
                svc = so_pkg.service_code or service_code
                self._fusion_cp_validate_package(pkg_weight, package_info)
                result = self._fusion_cp_create_single_shipment(
                    picking, pkg_weight, package_info, svc)
                self._fusion_cp_record_shipment(
                    result, picking, sale_order, svc, pkg_weight,
                    pkg_name="Package %d" % position)
                all_results.append(result)
        else:
            # Fall back to physical packages from the picking
            packages = self._get_packages_from_picking(
                picking, self.product_packaging_id)
            # Legacy sale-order dimensions are the same for every package,
            # so resolve them once per picking.
            legacy_info = None
            if (sale_order
                    and sale_order.fusion_cp_package_length
                    and sale_order.fusion_cp_package_width
                    and sale_order.fusion_cp_package_height):
                legacy_info = dims_in_cm(
                    sale_order.fusion_cp_package_length,
                    sale_order.fusion_cp_package_width,
                    sale_order.fusion_cp_package_height)
            for pkg in packages:
                pkg_weight = round(self.convert_weight(
                    from_unit, self.weight_uom_id, pkg.weight), 2)
                # Dimensions: legacy SO fields -> package type -> default
                package_info = dict(legacy_info) if legacy_info else None
                if not package_info:
                    pkg_type = (
                        pkg.package_type
                        if hasattr(pkg, 'package_type')
                        and pkg.package_type
                        else self.product_packaging_id)
                    if not pkg_type:
                        raise ValidationError(_(
                            "No package dimensions available for "
                            "'%s'. Please set dimensions when "
                            "adding shipping to the sale order.")
                            % (pkg.name or 'package'))
                    package_info = self.get_fusion_cp_parcel(pkg_type)
                self._fusion_cp_validate_package(pkg_weight, package_info)
                result = self._fusion_cp_create_single_shipment(
                    picking, pkg_weight, package_info, service_code)
                self._fusion_cp_record_shipment(
                    result, picking, sale_order, service_code,
                    pkg_weight,
                    pkg_name=(pkg.name
                              if hasattr(pkg, 'name') else ''))
                all_results.append(result)

        # Post combined chatter message with ALL labels
        self._fusion_cp_post_shipping_documents(picking, all_results)
        tracking_numbers = [
            result.get('tracking_number', '') for result in all_results]
        response.append({
            'exact_price': sum(
                (result.get('exact_price', 0.0) for result in all_results),
                0.0),
            'tracking_number': ','.join(filter(None, tracking_numbers)),
        })
    return response
def _fusion_cp_create_single_shipment(
        self, picking, pkg_weight, package_info, service_code):
    """Create one Canada Post shipment for a single package.

    Builds the shipment XML (``shipment-v8`` when ``fusion_cp_type`` is
    ``'commercial'``, ``ncshipment-v4`` otherwise), POSTs it, stores the
    label / receipt / commercial-invoice PDFs as attachments on
    ``picking`` and reads the receipt XML for the charged amount.

    :param picking: picking being shipped; its partner is the destination
        and its picking type's warehouse partner is the sender
    :param pkg_weight: value written to ``parcel-characteristics/weight``
    :param package_info: dict with ``length`` / ``width`` / ``height``;
        a missing key is sent as ``1``
    :param service_code: service code sent in ``delivery-spec``; it also
        decides whether the carrier's ``option_code`` is applied
    :return: dict with ``tracking_number``, ``shipment_id``,
        ``exact_price``, ``label_attachment``, ``full_label_attachment``,
        ``receipt_attachment``, ``ci_attachment`` (attachment records or
        ``False``), ``label_content`` and ``receipt_content`` (bytes)
    :raises ValidationError: when the create request does not answer 200,
        or when the answer lacks a shipment id or artifact links
    """
    is_commercial = self.fusion_cp_type == 'commercial'
    credentials = (self.username, self.password)
    destination_address = picking.partner_id
    sender_address = (picking.picking_type_id
                      and picking.picking_type_id.warehouse_id
                      and picking.picking_type_id.warehouse_id.partner_id)

    def as_text(value):
        """Render a field value for XML; unset (False/None) becomes ''."""
        # ElementTree refuses to serialise False, which is what unset Odoo
        # Char fields (partner name, phone, contract id...) evaluate to.
        return "%s" % (value or "")

    def postal_code(partner):
        """Return the partner zip without spaces, upper-cased ('' if unset)."""
        return (partner.zip or "").replace(" ", "").upper()

    def add_node(parent, tag, text=None):
        """Append a child element, optionally with text, and return it."""
        node = etree.SubElement(parent, tag)
        if text is not None:
            node.text = text
        return node

    def error_description(payload):
        """Extract the error description(s) from a parsed error body."""
        if not isinstance(payload, dict):
            return ""
        messages = payload.get('messages')
        message = messages.get('message') if isinstance(messages, dict) else None
        entries = message if isinstance(message, list) else [message]
        return '\n'.join(
            entry['description'] for entry in entries
            if isinstance(entry, dict) and entry.get('description'))

    # --- Build XML ---
    if is_commercial:
        root_node = etree.Element("shipment")
        root_node.attrib["xmlns"] = (
            "http://www.canadapost.ca/ws/shipment-v8")
        add_node(root_node, "transmit-shipment", "true")
        add_node(root_node, "provide-receipt-info", "true")
    else:
        root_node = etree.Element("non-contract-shipment")
        root_node.attrib["xmlns"] = (
            "http://www.canadapost.ca/ws/ncshipment-v4")
    add_node(root_node, "requested-shipping-point",
             postal_code(sender_address))
    delivery_spec_node = add_node(root_node, "delivery-spec")
    add_node(delivery_spec_node, "service-code", "%s" % service_code)

    # Sender
    sender_node = add_node(delivery_spec_node, "sender")
    add_node(sender_node, "company", as_text(sender_address.name))
    add_node(sender_node, "contact-phone", as_text(sender_address.phone))
    address_details = add_node(sender_node, "address-details")
    add_node(address_details, "address-line-1",
             as_text(sender_address.street))
    add_node(address_details, "city", as_text(sender_address.city))
    add_node(address_details, "prov-state",
             as_text(sender_address.state_id
                     and sender_address.state_id.code))
    if is_commercial:
        add_node(address_details, "country-code",
                 as_text(sender_address.country_id
                         and sender_address.country_id.code))
    add_node(address_details, "postal-zip-code",
             postal_code(sender_address))

    # Destination
    destination_node = add_node(delivery_spec_node, "destination")
    add_node(destination_node, "name", as_text(destination_address.name))
    add_node(destination_node, "company",
             as_text(destination_address.name))
    add_node(destination_node, "client-voice-number",
             as_text(destination_address.phone))
    dest_addr = add_node(destination_node, "address-details")
    add_node(dest_addr, "address-line-1",
             as_text(destination_address.street))
    add_node(dest_addr, "city", as_text(destination_address.city))
    add_node(dest_addr, "prov-state",
             as_text(destination_address.state_id
                     and destination_address.state_id.code))
    add_node(dest_addr, "country-code",
             as_text(destination_address.country_id
                     and destination_address.country_id.code))
    add_node(dest_addr, "postal-zip-code",
             postal_code(destination_address))

    # Options
    # Gate on the service actually being requested: ``service_code`` may
    # differ from the carrier default when the order or package overrides it.
    options = add_node(delivery_spec_node, "options")
    if (self.option_code
            and service_code not in [
                'USA.XP', 'USA.EP', 'INT.IP.SURF',
                'INT.PW.PARCEL', 'INT.XP']):
        option = add_node(options, "option")
        add_node(option, "option-code", str(self.option_code or ""))
        if self.option_code in ['PA18', 'PA19']:
            option_pa = add_node(options, "option")
            add_node(option_pa, "option-code", 'SO')
        if self.option_code in ('COD', 'COV'):
            add_node(option, "option-amount",
                     str(picking.sale_id.amount_total or ""))
            add_node(option, "option-qualifier-1", "true")
    else:
        option_usa = add_node(options, "option")
        add_node(option_usa, "option-code", 'RASE')

    # Parcel characteristics
    parcel_chars = add_node(delivery_spec_node, "parcel-characteristics")
    add_node(parcel_chars, "weight", "%s" % pkg_weight)
    dimensions = add_node(parcel_chars, "dimensions")
    for side in ("length", "width", "height"):
        add_node(dimensions, side, "%s" % package_info.get(side, 1))

    # Preferences
    preferences = add_node(delivery_spec_node, "preferences")
    add_node(preferences, "show-packing-instructions", "true")
    output_format = self.fusion_cp_output_format or "4x6"
    if not is_commercial:
        add_node(preferences, "output-format", output_format)
    else:
        print_prefs = add_node(delivery_spec_node, "print-preferences")
        add_node(print_prefs, "output-format", output_format)

    # Customs
    customs = add_node(delivery_spec_node, "customs")
    add_node(customs, "currency", str(picking.sale_id.currency_id.name))
    rate = 1.0
    if picking.sale_id.currency_id.rate:
        rate = round(picking.sale_id.currency_id.rate, 2)
    add_node(customs, "conversion-from-cad", str(rate or ''))
    add_node(customs, "reason-for-export", "%s" % self.reason_for_export)
    sku_list = add_node(customs, "sku-list")
    for move_line in picking.move_ids:
        item = add_node(sku_list, "item")
        add_node(item, "customs-description",
                 str(move_line.product_id.name))
        add_node(item, "unit-weight", str(move_line.weight))
        add_node(item, "customs-value-per-unit",
                 str(move_line.product_id.lst_price))
        add_node(item, "customs-number-of-units",
                 str(int(move_line.product_uom_qty)))

    # Settlement info (commercial only)
    if is_commercial:
        settlement_info = add_node(delivery_spec_node, "settlement-info")
        add_node(settlement_info, "contract-id",
                 as_text(self.fusion_cp_contract_id))
        add_node(settlement_info, "intended-method-of-payment",
                 as_text(self.fusion_cp_payment_method))

    # --- Make API call ---
    api_url = self.get_fusion_cp_url()
    if is_commercial:
        shipment_url = "%s%s/%s/shipment" % (
            api_url, self.customer_number, self.customer_number)
        media_type = "application/vnd.cpc.shipment-v8+xml"
    else:
        shipment_url = "%s%s/ncshipment" % (api_url, self.customer_number)
        media_type = "application/vnd.cpc.ncshipment-v4+xml"
    base_data = etree.tostring(root_node).decode('utf-8')
    headers = {
        "Accept": media_type,
        "Content-Type": media_type,
        "Accept-language": "en-CA",
    }
    _logger.info("Create Shipment Request Data: %s", base_data)
    response_body = http_request(
        method='POST', url=shipment_url, data=base_data,
        headers=headers, auth=credentials)
    if response_body.status_code != 200:
        try:
            error_payload = Response(response_body).dict()
        except Exception:
            # The error body may not be parseable at all; fall back to
            # the HTTP status line and raw text below.
            error_payload = {}
        description = error_description(error_payload)
        if description:
            raise ValidationError(_("Canada Post: %s") % description)
        status_line = "%s %s" % (
            response_body.status_code, response_body.reason)
        raise ValidationError(
            _("Canada Post shipment request failed (%s).\n%s")
            % (status_line, response_body.text[:500]))
    result = Response(response_body).dict()
    _logger.info("Create Shipment Response Data: %s", result)

    # --- Extract shipment ID ---
    info_key = ('shipment-info' if is_commercial
                else 'non-contract-shipment-info')
    shipment_info = (
        result.get(info_key) if isinstance(result, dict) else None) or {}
    links = (shipment_info.get('links') or {}).get('link')
    if not shipment_info.get('shipment-id') or not links:
        # RedirectWarning needs an action and a button label; called with
        # a message only it fails with TypeError, so raise a plain
        # validation error instead.
        raise ValidationError(
            "ShipmentRequest Fail \n More Information \n %s" % result)
    shipment_id = str(shipment_info['shipment-id'])

    # --- Extract artifact URLs ---
    label_url = ""
    receipt_link_url = ""
    commercial_invoice_url = ""
    if isinstance(links, dict):
        # A single link is parsed as a dict rather than a list
        links = [links]
    for link in links:
        if link['_rel'] == 'label':
            label_url = link['_href']
        elif link['_rel'] == 'receipt':
            receipt_link_url = link['_href']
        elif link['_rel'] == 'commercialInvoice':
            commercial_invoice_url = link['_href']

    if is_commercial:
        receipt_endpoint = "%s%s/%s/shipment/%s/receipt" % (
            api_url, self.customer_number,
            self.customer_number, shipment_id)
    else:
        receipt_endpoint = "%s%s/ncshipment/%s/receipt" % (
            api_url, self.customer_number, shipment_id)

    def attach_pdf(prefix, content):
        """Store ``content`` as a PDF attachment on the picking."""
        return self.env['ir.attachment'].create({
            'name': '%s-%s.pdf' % (prefix, shipment_id),
            'type': 'binary',
            'datas': base64.b64encode(content),
            'res_model': 'stock.picking',
            'res_id': picking.id,
            'mimetype': 'application/pdf',
        })

    # --- Download artifacts ---
    pdf_headers = {'Accept': 'application/pdf'}
    label_attachment = False
    receipt_attachment = False
    full_label_attachment = False
    ci_attachment = False
    label_content = b''
    receipt_pdf_content = b''
    try:
        if label_url:
            label_resp = http_request(
                method='GET', url=label_url, headers=pdf_headers,
                auth=credentials)
            _logger.info(
                "Label Response Status: %s", label_resp.status_code)
            if label_resp.status_code == 200:
                label_content = label_resp.content
                label_attachment = attach_pdf('Label', label_content)
        if not receipt_link_url:
            # No receipt link in the response: use the receipt endpoint
            receipt_link_url = receipt_endpoint
        if receipt_link_url:
            receipt_pdf_resp = http_request(
                method='GET', url=receipt_link_url,
                headers=pdf_headers, auth=credentials)
            if receipt_pdf_resp.status_code == 200:
                receipt_pdf_content = receipt_pdf_resp.content
                receipt_attachment = attach_pdf(
                    'Receipt', receipt_pdf_content)
        if label_content and receipt_pdf_content:
            try:
                from odoo.tools import pdf as pdf_tools
                full_content = pdf_tools.merge_pdf(
                    [label_content, receipt_pdf_content])
            except Exception:
                # Best effort: keep the label alone if merging fails
                full_content = label_content
            full_label_attachment = attach_pdf('FullLabel', full_content)
        if commercial_invoice_url:
            ci_resp = http_request(
                method='GET', url=commercial_invoice_url,
                headers=pdf_headers, auth=credentials)
            if ci_resp.status_code == 200:
                ci_attachment = attach_pdf(
                    'CommercialInvoice', ci_resp.content)
    except Exception as e:
        # The shipment already exists at this point, so a failed document
        # download is logged and noted on the picking instead of raised.
        _logger.error(
            "Error downloading Canada Post artifacts: %s", e)
        picking.message_post(
            body=_('Error downloading shipping documents '
                   'for shipment %s: %s') % (shipment_id, str(e)))

    # --- Fetch receipt XML for pricing info ---
    receipt_xml_headers = {
        "Accept": media_type,
        "Accept-language": "en-CA",
    }
    result_receipt = {}
    try:
        receipt_response = http_request(
            method='GET', url=receipt_endpoint,
            headers=receipt_xml_headers, auth=credentials)
        if receipt_response.status_code == 200:
            result_receipt = Response(receipt_response).dict()
            _logger.info("Receipt XML Data: %s", result_receipt)
        else:
            _logger.warning(
                "Receipt fetch: %s %s",
                receipt_response.status_code,
                receipt_response.reason)
    except Exception as e:
        _logger.warning("Error fetching receipt XML: %s", e)

    # Extract pricing and tracking
    exact_price = 0.0
    if result_receipt:
        receipt_key = ('shipment-receipt' if is_commercial
                       else 'non-contract-shipment-receipt')
        try:
            exact_price = float(
                result_receipt.get(receipt_key, {})
                .get('cc-receipt-details', {})
                .get('charge-amount', 0.0))
        except (AttributeError, TypeError, ValueError):
            exact_price = 0.0
    tracking_pin = shipment_info.get('tracking-pin', False)
    if not tracking_pin:
        _logger.info(
            "This service does not provide tracking. Service: %s",
            service_code)
    return {
        'tracking_number': tracking_pin or '',
        'shipment_id': shipment_id,
        'exact_price': exact_price or 0.0,
        'label_attachment': label_attachment,
        'full_label_attachment': full_label_attachment,
        'receipt_attachment': receipt_attachment,
        'ci_attachment': ci_attachment,
        'label_content': label_content,
        'receipt_content': receipt_pdf_content,
    }
def _fusion_cp_post_shipping_documents(self, picking, all_results):
    """Post one chatter message on ``picking`` with the combined documents.

    Labels of all shipments are merged into ``Labels-All.pdf`` and receipts
    into ``Receipts-All.pdf``; if building a combined file fails, the
    per-shipment attachments are linked instead. Commercial invoices are
    always linked individually.

    :param picking: picking that receives the message and attachments
    :param all_results: result dicts of the created shipments
    """
    from odoo.tools import pdf as pdf_tools

    attach_ids = []

    def add_combined(content_key, attachment_key, filename, failure_log):
        """Attach the merged PDF for one document kind, with fallback."""
        pages = [
            entry[content_key] for entry in all_results
            if entry.get(content_key)]
        if not pages:
            return
        try:
            combined = (pages[0] if len(pages) == 1
                        else pdf_tools.merge_pdf(pages))
            document = self.env['ir.attachment'].create({
                'name': filename,
                'type': 'binary',
                'datas': base64.b64encode(combined),
                'res_model': 'stock.picking',
                'res_id': picking.id,
                'mimetype': 'application/pdf',
            })
            attach_ids.append(document.id)
        except Exception as merge_error:
            _logger.warning(failure_log, merge_error)
            # Fall back to the individual per-shipment documents
            attach_ids.extend(
                entry[attachment_key].id for entry in all_results
                if entry.get(attachment_key))

    shipment_ids = [
        entry.get('shipment_id', '') for entry in all_results
        if entry.get('shipment_id')]

    add_combined('label_content', 'label_attachment',
                 'Labels-All.pdf', "Could not merge label PDFs: %s")
    add_combined('receipt_content', 'receipt_attachment',
                 'Receipts-All.pdf', "Could not merge receipt PDFs: %s")

    # Commercial invoices (one per shipment, not merged)
    attach_ids.extend(
        entry['ci_attachment'].id for entry in all_results
        if entry.get('ci_attachment'))

    # Post to chatter
    if not shipment_ids:
        body = _('Canada Post shipment created.')
    elif len(shipment_ids) == 1:
        body = _('Canada Post shipment %s created.') % (
            shipment_ids[0])
    else:
        body = _('Canada Post -- %d shipments created: %s') % (
            len(shipment_ids), ', '.join(shipment_ids))

    if not attach_ids:
        picking.message_post(body=body)
    else:
        picking.message_post(
            body=body, attachment_ids=attach_ids)
def _fusion_cp_record_shipment(
        self, result, picking, sale_order, service_code,
        pkg_weight, pkg_name=''):
    """Create a ``fusion.cp.shipment`` record from a shipment result dict.

    :param result: dict with tracking number, shipment id, price and the
        optional document attachments
    :param picking: picking the shipment belongs to
    :param sale_order: related sale order, may be empty
    :param service_code: service code stored as ``service_type``
    :param pkg_weight: package weight stored on the record
    :param pkg_name: label stored as ``package_name``
    """
    values = {
        'tracking_number': result.get('tracking_number', ''),
        'shipment_id': result.get('shipment_id', ''),
        'carrier_id': self.id,
        'sale_order_id': (sale_order.id if sale_order else False),
        'picking_id': picking.id,
        'service_type': service_code,
        'shipment_date': fields.Datetime.now(),
        'shipping_cost': result.get('exact_price', 0.0),
        'weight': pkg_weight,
        'status': 'confirmed',
        'company_id': picking.company_id.id,
        'package_name': pkg_name,
    }
    # Attach label / receipt / CI documents that are present in the result
    document_fields = (
        ('label_attachment', 'label_attachment_id'),
        ('full_label_attachment', 'full_label_attachment_id'),
        ('receipt_attachment', 'receipt_attachment_id'),
        ('ci_attachment', 'commercial_invoice_attachment_id'),
    )
    for result_key, field_name in document_fields:
        document = result.get(result_key)
        if document:
            values[field_name] = document.id
    self.env['fusion.cp.shipment'].create(values)
def fusion_canada_post_get_tracking_link(self, picking):
    """Return the tracking URL for ``picking``.

    The picking carrier's ``tracking_link`` is used as URL prefix, or the
    Canada Post track page when it is not set; the picking's
    ``carrier_tracking_ref`` is appended directly to it.

    :param picking: picking with ``carrier_id`` and ``carrier_tracking_ref``
    :return: URL string
    """
    link = picking.carrier_id.tracking_link or 'https://www.canadapost.ca/trackweb/en#/resultList?searchFor='
    # No separator: the prefix ends in its query parameter, and a space
    # between prefix and reference produces an invalid URL.
    return '%s%s' % (link, picking.carrier_tracking_ref or '')
def fusion_canada_post_cancel_shipment(self, picking):
    """Void every non-cancelled CP shipment linked to ``picking``.

    Each matching ``fusion.cp.shipment`` record is voided through its
    ``action_void_shipment``. A ``ValidationError`` from that call is
    logged and re-raised, which stops the loop.

    :param picking: picking whose shipments are cancelled
    :raises ValidationError: propagated from ``action_void_shipment``
    """
    open_shipments_domain = [
        ('picking_id', '=', picking.id),
        ('status', '!=', 'cancelled'),
    ]
    for cp_shipment in self.env['fusion.cp.shipment'].search(
            open_shipments_domain):
        try:
            cp_shipment.action_void_shipment()
        except ValidationError as void_error:
            _logger.warning(
                "Failed to void CP shipment %s: %s",
                cp_shipment.name, void_error)
            raise
def _fusion_cp_validate_package(self, weight_kg, package_info):
    """Check a package against the weight and dimension limits used here.

    Limits enforced: weight above 30 kg, longest side above 100 cm and
    second longest side above 76 cm. Comparisons are done in cm; the error
    text shows the values in the carrier's ``fusion_cp_dimension_unit``
    ('in' or, by default, 'cm'). When the volumetric weight
    (L x W x H / 5000) exceeds the actual weight a warning is logged.

    :param weight_kg: package weight in kg
    :param package_info: dict with ``length`` / ``width`` / ``height`` in
        cm; a missing key counts as 0
    :raises ValidationError: listing every exceeded limit, one per line
    """
    length_cm, width_cm, height_cm = (
        float(package_info.get(side, 0))
        for side in ('length', 'width', 'height'))
    # Longest and second longest side, whatever their orientation
    longest, second_longest = sorted(
        (length_cm, width_cm, height_cm), reverse=True)[:2]

    # Display settings for the user's preferred unit
    if (self.fusion_cp_dimension_unit or 'cm') == 'in':
        # 39.4 in ~ 100 cm, 29.9 in ~ 76 cm; factor converts cm -> inches
        unit_label, max_longest, max_second, factor = (
            'in', 39.4, 29.9, 1 / 2.54)
    else:
        unit_label, max_longest, max_second, factor = 'cm', 100, 76, 1

    problems = []
    if weight_kg > 30:
        problems.append(
            _("Weight %.2f kg exceeds Canada Post maximum of 30 kg.")
            % weight_kg)
    if longest > 100:
        problems.append(
            _("Longest dimension %.1f %s exceeds Canada Post "
              "maximum of %.1f %s.") % (
                longest * factor, unit_label,
                max_longest, unit_label))
    if second_longest > 76:
        problems.append(
            _("Second longest dimension %.1f %s exceeds Canada Post "
              "maximum of %.1f %s.") % (
                second_longest * factor, unit_label,
                max_second, unit_label))
    if problems:
        raise ValidationError('\n'.join(problems))

    # Volumetric weight warning (CP cubing); skipped if any side is 0
    if not (length_cm and width_cm and height_cm):
        return
    vol_weight = (length_cm * width_cm * height_cm) / 5000
    if vol_weight > weight_kg:
        _logger.warning(
            "CP cubing: volumetric weight %.2f kg > actual %.2f kg "
            "for package (%.1f x %.1f x %.1f cm). Canada Post will "
            "bill at the higher volumetric weight.",
            vol_weight, weight_kg, length_cm, width_cm, height_cm)
def get_fusion_cp_parcel(self, package):
    """Return the dimensions of ``package`` for the Canada Post API.

    Each of ``packaging_length``, ``width`` and ``height`` is passed
    through ``_fusion_cp_convert_dimension_to_cm`` (called on
    ``self.sudo()``) and rounded to one decimal.

    :param package: record exposing ``packaging_length``, ``width``
        and ``height``
    :return: dict with keys ``length``, ``width`` and ``height``
    """
    raw_sides = (
        ("length", package.packaging_length),
        ("width", package.width),
        ("height", package.height),
    )
    return {
        side: round(
            self.sudo()._fusion_cp_convert_dimension_to_cm(value), 1)
        for side, value in raw_sides
    }
def _fusion_cp_convert_dimension_to_cm(self, dimension):
    """Convert ``dimension`` to centimetres per the carrier's unit setting.

    :param dimension: value expressed in ``fusion_cp_dimension_unit``
    :return: ``0.0`` for a falsy value; the value unchanged when the unit
        is 'cm' (also the default when unset); otherwise the value
        converted from the inch UoM to the cm UoM
    """
    if not dimension:
        return 0.0
    if (self.fusion_cp_dimension_unit or 'cm') == 'cm':
        return dimension
    # Any other unit setting is treated as inches
    centimetre_uom = self.env.ref("uom.product_uom_cm")
    inch_uom = self.env.ref("uom.product_uom_inch")
    return inch_uom._compute_quantity(dimension, centimetre_uom)
# ══════════════════════════════════════════════════════════════════════════
# UPS SOAP (Legacy) METHODS
# ══════════════════════════════════════════════════════════════════════════
@api.onchange('ups_default_service_type')
def on_change_service_type(self):
    """Clear the COD and Saturday-delivery flags when the UPS service changes."""
    self.allow_cash_on_delivery = self.ups_saturday_delivery = False
def fusion_ups_rate_shipment(self, order):
    """Quote the UPS (legacy SOAP) shipping price for ``order``.

    The quote is converted to the order currency when UPS answers in a
    different one, and forced to 0.0 when the carrier bills the customer's
    own UPS account and the order has such an account.

    :param order: sale order to rate
    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message``
    """
    def failure(message):
        """Build the standard unsuccessful rating answer."""
        return {'success': False,
                'price': 0.0,
                'error_message': message,
                'warning_message': False}

    superself = self.sudo()
    srm = UPSRequest(self.log_xml, superself.ups_username, superself.ups_passwd, superself.ups_shipper_number, superself.ups_access_number, self.prod_environment)
    packages = self._get_packages_from_order(order, self.ups_default_package_type_id)

    # Only real product lines count towards the shipped quantity
    product_lines = order.order_line.filtered(
        lambda line: not line.is_delivery and not line.display_type)
    shipment_info = {
        'total_qty': sum(line.product_uom_qty for line in product_lines),
    }

    cod_info = None
    if self.allow_cash_on_delivery:
        cod_info = {
            'currency': order.partner_id.country_id.currency_id.name,
            'monetary_value': order.amount_total,
            'funds_code': self.ups_cod_funds_code,
        }

    shipper = order.company_id.partner_id
    ship_from = order.warehouse_id.partner_id
    ship_to = order.partner_shipping_id

    check_value = srm.check_required_value(shipper, ship_from, ship_to, order=order)
    if check_value:
        return failure(check_value)

    result = srm.get_shipping_price(
        carrier=self,
        shipment_info=shipment_info, packages=packages, shipper=shipper, ship_from=ship_from,
        ship_to=ship_to, service_type=self.ups_default_service_type,
        saturday_delivery=self.ups_saturday_delivery, cod_info=cod_info)
    if result.get('error_message'):
        return failure(_('Error:\n%s', result['error_message']))

    price = float(result['price'])
    if order.currency_id.name != result['currency_code']:
        quote_currency = self.env['res.currency'].search(
            [('name', '=', result['currency_code'])], limit=1)
        price = quote_currency._convert(
            price, order.currency_id, order.company_id, order.date_order or fields.Date.today())

    if self.ups_bill_my_account and order.partner_ups_carrier_account:
        # Don't show delivery amount, if ups bill my account option is true
        price = 0.0

    return {'success': True,
            'price': price,
            'error_message': False,
            'warning_message': False}
def fusion_ups_send_shipping(self, pickings):
    """Create UPS shipments (legacy SOAP request) for the given pickings.

    For each picking the shipment is submitted, the labels (and the
    commercial invoice when returned) are posted in the chatter, and a
    return label is generated when ``return_label_on_delivery`` is set.

    :param pickings: stock pickings to ship
    :return: list with one ``{'exact_price', 'tracking_number'}`` dict per picking
    :raises UserError: when required values are missing or UPS reports an error
    """
    shipping_results = []
    credentials = self.sudo()
    ups_request = UPSRequest(
        self.log_xml,
        credentials.ups_username,
        credentials.ups_passwd,
        credentials.ups_shipper_number,
        credentials.ups_access_number,
        self.prod_environment,
    )
    Currency = self.env['res.currency']

    for picking in pickings:
        packages = self._get_packages_from_picking(picking, self.ups_default_package_type_id)

        # The sale order incoterm, when set, takes precedence over the company one.
        incoterm = picking.company_id.incoterm_id
        if picking.sale_id and picking.sale_id.incoterm:
            incoterm = picking.sale_id.incoterm

        shipment_info = {
            'require_invoice': picking._should_generate_commercial_invoice(),
            'invoice_date': fields.Date.today().strftime('%Y%m%d'),
            'description': picking.origin or picking.name,
            'total_qty': sum(sml.quantity for sml in picking.move_line_ids),
            'ilt_monetary_value': '%d' % sum(sml.sale_price for sml in picking.move_line_ids),
            'itl_currency_code': self.env.company.currency_id.name,
            'phone': picking.partner_id.phone or picking.sale_id.partner_id.phone,
            'terms_of_shipment': incoterm.code if incoterm else None,
            'purchase_order_number': picking.sale_id.name if picking.sale_id else None,
        }

        # Use the picking carrier's service when it differs from the order's carrier.
        service_type = self.ups_default_service_type
        if picking.sale_id and picking.sale_id.carrier_id != picking.carrier_id:
            service_type = picking.carrier_id.ups_default_service_type or self.ups_default_service_type

        if self.ups_bill_my_account:
            carrier_account = picking.partner_id.with_company(picking.company_id).property_ups_carrier_account
        else:
            carrier_account = False

        cod_info = None
        if picking.carrier_id.allow_cash_on_delivery:
            cod_info = {
                'currency': picking.partner_id.country_id.currency_id.name,
                'monetary_value': picking.sale_id.amount_total,
                'funds_code': self.ups_cod_funds_code,
            }

        shipper_partner = picking.company_id.partner_id
        origin_partner = picking.picking_type_id.warehouse_id.partner_id

        missing_values = ups_request.check_required_value(
            shipper_partner, origin_partner, picking.partner_id, picking=picking)
        if missing_values:
            raise UserError(missing_values)

        ups_request.send_shipping(
            carrier=picking.carrier_id,
            shipment_info=shipment_info,
            packages=packages,
            shipper=shipper_partner,
            ship_from=origin_partner,
            ship_to=picking.partner_id,
            service_type=service_type,
            duty_payment=picking.carrier_id.ups_duty_payment,
            label_file_type=self.ups_label_file_type,
            ups_carrier_account=carrier_account,
            saturday_delivery=picking.carrier_id.ups_saturday_delivery,
            cod_info=cod_info,
        )
        result = ups_request.process_shipment()
        if result.get('error_message'):
            raise UserError(result['error_message'].__str__())

        order = picking.sale_id
        company = order.company_id or picking.company_id or self.env.company
        target_currency = picking.sale_id.currency_id or picking.company_id.currency_id
        if target_currency.name != result['currency_code']:
            quote_currency = Currency.search([('name', '=', result['currency_code'])], limit=1)
            price = quote_currency._convert(
                float(result['price']), target_currency, company, order.date_order or fields.Date.today())
        else:
            price = float(result['price'])

        # (tracking number, label content) pairs, one per package.
        package_labels = list(result.get('label_binary_data').items())
        carrier_tracking_ref = "+".join(tracking for tracking, _label in package_labels)

        package_names = ','.join([p.name for p in packages if p.name])
        logmessage = (
            _("Shipment created into UPS") + Markup("<br/><b>")
            + _("Tracking Numbers:") + Markup("</b> ") + carrier_tracking_ref
            + Markup("<br/><b>") + _("Packages:") + Markup("</b> ")
            + package_names
        )

        if self.ups_label_file_type == 'GIF':
            # GIF labels are posted as a single merged PDF attachment.
            attachments = [('%s.pdf' % (self._get_delivery_label_prefix()), pdf.merge_pdf([pl[1] for pl in package_labels]))]
        else:
            attachments = [
                ('%s-%s.%s' % (self._get_delivery_label_prefix(), pl[0], self.ups_label_file_type), pl[1])
                for pl in package_labels
            ]
        if 'invoice_binary_data' in result:
            attachments.append(('%s-CommercialInvoice.pdf' % (self._get_delivery_doc_prefix()), result['invoice_binary_data']))

        # With a sale order, every picking of that order gets the log note.
        notified_pickings = picking.sale_id.picking_ids if picking.sale_id else picking
        for pick in notified_pickings:
            pick.message_post(body=logmessage, attachments=attachments)

        shipping_results.append({
            'exact_price': price,
            'tracking_number': carrier_tracking_ref,
        })

        if self.return_label_on_delivery:
            self.fusion_ups_get_return_label(picking)
    return shipping_results
def fusion_ups_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Generate a UPS return label (legacy SOAP request) for a picking.

    The shipment goes from the picking's partner back to the warehouse of
    the picking type; the resulting labels are posted on the picking.

    :param picking: stock picking to create the return label for
    :param tracking_number: unused by this implementation
    :param origin_date: unused by this implementation
    :return: single-element list holding ``{'exact_price', 'tracking_number'}``
    :raises UserError: when required values are missing or UPS reports an error
    """
    credentials = self.sudo()
    ups_request = UPSRequest(
        self.log_xml,
        credentials.ups_username,
        credentials.ups_passwd,
        credentials.ups_shipper_number,
        credentials.ups_access_number,
        self.prod_environment,
    )
    Currency = self.env['res.currency']
    packages = self._get_packages_from_picking(picking, self.ups_default_package_type_id)

    # Declared value: list price of every move, rounded in the company currency.
    invoice_line_total = sum(
        picking.company_id.currency_id.round(move.product_id.lst_price * move.product_qty)
        for move in picking.move_ids
    )

    shipment_info = {
        'is_return': True,
        'invoice_date': fields.Date.today().strftime('%Y%m%d'),
        'description': picking.origin or picking.name,
        'total_qty': sum(sml.quantity for sml in picking.move_line_ids),
        'ilt_monetary_value': '%d' % invoice_line_total,
        'itl_currency_code': self.env.company.currency_id.name,
        'phone': picking.partner_id.phone or picking.sale_id.partner_id.phone,
    }

    service_type = self.ups_default_service_type
    if picking.sale_id and picking.sale_id.carrier_id != picking.carrier_id:
        service_type = picking.carrier_id.ups_default_service_type or self.ups_default_service_type

    if self.ups_bill_my_account:
        carrier_account = picking.partner_id.with_company(picking.company_id).property_ups_carrier_account
    else:
        carrier_account = False

    cod_info = None
    if picking.carrier_id.allow_cash_on_delivery:
        cod_info = {
            'currency': picking.partner_id.country_id.currency_id.name,
            'monetary_value': picking.sale_id.amount_total,
            'funds_code': self.ups_cod_funds_code,
        }

    warehouse_partner = picking.picking_type_id.warehouse_id.partner_id

    missing_values = ups_request.check_required_value(picking.partner_id, picking.partner_id, warehouse_partner)
    if missing_values:
        raise UserError(missing_values)

    ups_request.send_shipping(
        carrier=picking.carrier_id,
        shipment_info=shipment_info,
        packages=packages,
        shipper=picking.company_id.partner_id,
        ship_from=picking.partner_id,
        ship_to=warehouse_partner,
        service_type=service_type,
        duty_payment='RECIPIENT',
        label_file_type=self.ups_label_file_type,
        ups_carrier_account=carrier_account,
        saturday_delivery=picking.carrier_id.ups_saturday_delivery,
        cod_info=cod_info,
    )
    ups_request.return_label()
    result = ups_request.process_shipment()
    if result.get('error_message'):
        raise UserError(result['error_message'].__str__())

    order = picking.sale_id
    company = order.company_id or picking.company_id or self.env.company
    target_currency = picking.sale_id.currency_id or picking.company_id.currency_id
    if target_currency.name != result['currency_code']:
        quote_currency = Currency.search([('name', '=', result['currency_code'])], limit=1)
        price = quote_currency._convert(
            float(result['price']), target_currency, company, order.date_order or fields.Date.today())
    else:
        price = float(result['price'])

    # (tracking number, label content) pairs, one per package.
    package_labels = list(result.get('label_binary_data').items())
    carrier_tracking_ref = "+".join(tracking for tracking, _label in package_labels)

    package_names = ','.join([p.name for p in packages if p.name])
    logmessage = (
        _("Return label generated") + Markup("<br/><b>")
        + _("Tracking Numbers:") + Markup("</b> ") + carrier_tracking_ref
        + Markup("<br/><b>") + _("Packages:") + Markup("</b> ")
        + package_names
    )

    if self.ups_label_file_type == 'GIF':
        # GIF labels are posted as a single merged PDF named after the first tracking number.
        attachments = [(
            '%s-%s-%s.%s' % (self.get_return_label_prefix(), package_labels[0][0], 1, 'pdf'),
            pdf.merge_pdf([pl[1] for pl in package_labels]),
        )]
    else:
        attachments = [
            ('%s-%s-%s.%s' % (self.get_return_label_prefix(), pl[0], index, self.ups_label_file_type), pl[1])
            for index, pl in enumerate(package_labels)
        ]
    picking.message_post(body=logmessage, attachments=attachments)

    return [{
        'exact_price': price,
        'tracking_number': carrier_tracking_ref,
    }]
def fusion_ups_get_tracking_link(self, picking):
    """Return the UPS web tracking URL for the picking's tracking reference.

    :param picking: stock picking whose ``carrier_tracking_ref`` is tracked
    :return: tracking URL as a string
    """
    # Tracking numbers of a multi-package shipment are stored joined by '+';
    # in the URL they are separated by an URL-encoded CRLF instead.
    tracking_numbers = picking.carrier_tracking_ref.replace('+', '%0D%0A')
    return 'http://wwwapps.ups.com/WebTracking/track?track=yes&trackNums=%s' % tracking_numbers
def fusion_ups_cancel_shipment(self, picking):
    """Cancel the UPS shipment (legacy SOAP request) linked to a picking.

    On success a note is posted on the picking and its tracking reference
    and carrier price are reset.

    :param picking: stock picking whose shipment must be cancelled
    :raises UserError: when UPS reports an error
    """
    if self.prod_environment:
        tracking_ref = picking.carrier_tracking_ref
    else:
        tracking_ref = "1ZISDE016691676846"  # used for testing purpose

    credentials = self.sudo()
    ups_request = UPSRequest(
        self.log_xml,
        credentials.ups_username,
        credentials.ups_passwd,
        credentials.ups_shipper_number,
        credentials.ups_access_number,
        self.prod_environment,
    )

    # Only the first tracking number of a '+'-joined reference is sent.
    first_tracking_number = tracking_ref.partition('+')[0]
    result = ups_request.cancel_shipment(first_tracking_number)
    if result.get('error_message'):
        raise UserError(result['error_message'].__str__())

    picking.message_post(body=_(u'Shipment #%s has been cancelled', picking.carrier_tracking_ref))
    picking.write({
        'carrier_tracking_ref': '',
        'carrier_price': 0.0,
    })
def _ups_get_default_custom_package_code(self):
return '02'
def _ups_convert_weight(self, weight, unit='KGS'):
weight_uom_id = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
if unit == 'KGS':
return weight_uom_id._compute_quantity(weight, self.env.ref('uom.product_uom_kgm'), round=False)
elif unit == 'LBS':
return weight_uom_id._compute_quantity(weight, self.env.ref('uom.product_uom_lb'), round=False)
else:
raise ValueError
# ══════════════════════════════════════════════════════════════════════════
# UPS REST METHODS
# ══════════════════════════════════════════════════════════════════════════
def fusion_ups_rest_rate_shipment(self, order):
    """Quote the UPS (REST request) delivery price for a sale order.

    :param order: sale order to rate
    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message`` keys; on success ``price`` is expressed in the
        order currency (0.0 when the customer's own UPS account is billed)
    """
    ups = UPSRestRequest(self)
    packages = self._get_packages_from_order(order, self.ups_default_packaging_id)

    cod_info = None
    if self.allow_cash_on_delivery:
        cod_info = {
            'currency': order.partner_id.country_id.currency_id.name,
            'monetary_value': order.amount_total,
            'funds_code': self.ups_cod_funds_code,
        }

    missing_values = ups._check_required_value(order=order)
    if missing_values:
        return {
            'success': False,
            'price': 0.0,
            'error_message': missing_values,
            'warning_message': False,
        }

    # Only real product lines count: delivery lines and sections/notes are skipped.
    product_lines = order.order_line.filtered(lambda line: not line.is_delivery and not line.display_type)
    total_qty = sum(line.product_uom_qty for line in product_lines)

    quote = ups._get_shipping_price(
        order.company_id.partner_id,
        order.warehouse_id.partner_id,
        order.partner_shipping_id,
        total_qty,
        packages,
        self,
        cod_info=cod_info,
    )
    if quote.get('error_message'):
        return {
            'success': False,
            'price': 0.0,
            'error_message': _('Error:\n%s', quote['error_message']),
            'warning_message': False,
        }

    if order.currency_id.name != quote['currency_code']:
        # UPS quoted in another currency: convert to the order currency.
        quote_currency = self.env['res.currency'].search([('name', '=', quote['currency_code'])], limit=1)
        price = quote_currency._convert(
            float(quote['price']), order.currency_id, order.company_id, order.date_order or fields.Date.today())
    else:
        price = float(quote['price'])

    if self.ups_bill_my_account and order.partner_ups_carrier_account:
        # The customer's own UPS account is charged: no delivery amount.
        price = 0.0

    return {
        'success': True,
        'price': price,
        'error_message': False,
        'warning_message': quote.get('alert_message', False),
    }
def _ups_rest_prepare_shipping_data(self, picking):
    """Gather the data needed to submit a picking to the UPS REST request.

    :param picking: stock picking to ship
    :return: tuple ``(packages, shipment_info, ups_service_type,
        ups_carrier_account, cod_info)``
    """
    sale_order = picking.sale_id
    packages = self._get_packages_from_picking(picking, self.ups_default_packaging_id)

    # The sale order incoterm, when set, takes precedence over the company one.
    incoterm = picking.company_id.incoterm_id
    if sale_order and sale_order.incoterm:
        incoterm = sale_order.incoterm

    # Invoice currency: the sale order's, else the current company's.
    if sale_order:
        currency_code = sale_order.currency_id.name
    else:
        currency_code = self.env.company.currency_id.name

    move_lines = picking.move_line_ids
    shipment_info = {
        'require_invoice': picking._should_generate_commercial_invoice(),
        'invoice_date': fields.Date.today().strftime('%Y%m%d'),
        'description': picking.origin or picking.name,
        'total_qty': sum(sml.quantity for sml in move_lines),
        'ilt_monetary_value': '%d' % sum(sml.sale_price for sml in move_lines),
        'itl_currency_code': currency_code,
        'phone': picking.partner_id.phone or sale_order.partner_id.phone,
        'terms_of_shipment': incoterm.code if incoterm else None,
        'purchase_order_number': sale_order.name if sale_order else None,
        'reference_number': [
            {'Code': 'BM', 'Value': picking.name},
        ],
    }

    # Use the picking carrier's service when it differs from the order's carrier.
    ups_service_type = self.ups_default_service_type
    if sale_order and sale_order.carrier_id != picking.carrier_id:
        ups_service_type = picking.carrier_id.ups_default_service_type or self.ups_default_service_type

    if self.ups_bill_my_account:
        ups_carrier_account = picking.partner_id.with_company(picking.company_id).property_ups_carrier_account
    else:
        ups_carrier_account = False

    cod_info = None
    if picking.carrier_id.allow_cash_on_delivery:
        cod_info = {
            'currency': picking.partner_id.country_id.currency_id.name,
            'monetary_value': sale_order.amount_total,
            'funds_code': self.ups_cod_funds_code,
        }

    return packages, shipment_info, ups_service_type, ups_carrier_account, cod_info
def fusion_ups_rest_send_shipping(self, pickings):
    """Create UPS shipments (REST request) for the given pickings.

    For each picking the shipment is submitted, the labels (and the
    commercial invoice when returned) are posted on the picking, and a
    return label is generated when ``return_label_on_delivery`` is set.
    If the return label cannot be generated, cancellation of the outgoing
    shipment is attempted before the error is re-raised.

    :param pickings: stock pickings to ship
    :return: list with one ``{'exact_price', 'tracking_number'}`` dict per picking
    :raises UserError: when required values are missing or the return label fails
    """
    res = []
    ups = UPSRestRequest(self)
    for picking in pickings:
        packages, shipment_info, ups_service_type, ups_carrier_account, cod_info = self._ups_rest_prepare_shipping_data(picking)
        check_value = ups._check_required_value(picking=picking)
        if check_value:
            raise UserError(check_value)

        result = ups._send_shipping(
            shipment_info=shipment_info, packages=packages, carrier=self, shipper=picking.company_id.partner_id, ship_from=picking.picking_type_id.warehouse_id.partner_id,
            ship_to=picking.partner_id, service_type=ups_service_type, duty_payment=picking.carrier_id.ups_duty_payment,
            saturday_delivery=picking.carrier_id.ups_saturday_delivery, cod_info=cod_info,
            label_file_type=self.ups_label_file_type, ups_carrier_account=ups_carrier_account)

        order = picking.sale_id
        company = order.company_id or picking.company_id or self.env.company
        currency_order = picking.sale_id.currency_id or picking.company_id.currency_id
        if currency_order.name == result['currency_code']:
            price = float(result['price'])
        else:
            quote_currency = self.env['res.currency'].search([('name', '=', result['currency_code'])], limit=1)
            price = quote_currency._convert(
                float(result['price']), currency_order, company, order.date_order or fields.Date.today())

        # Sequence of (tracking number, label content) pairs.
        package_labels = result.get('label_binary_data', [])
        carrier_tracking_ref = "+".join([pl[0] for pl in package_labels])

        # The HTML fragments must be Markup: a plain str body is escaped by
        # message_post and the tags would be displayed literally. Package
        # names are joined so they are not rendered as a Python list repr.
        # Same construction as the UPS SOAP log note.
        logmessage = (
            _("Shipment created into UPS") + Markup("<br/><b>")
            + _("Tracking Numbers:") + Markup("</b> ") + carrier_tracking_ref
            + Markup("<br/><b>") + _("Packages:") + Markup("</b> ")
            + ','.join([p.name for p in packages if p.name])
        )

        if self.ups_label_file_type != 'GIF':
            attachments = [('LabelUPS-%s.%s' % (pl[0], self.ups_label_file_type), pl[1]) for pl in package_labels]
        else:
            # GIF labels are posted as a single merged PDF attachment.
            attachments = [('LabelUPS.pdf', pdf.merge_pdf([pl[1] for pl in package_labels]))]
        if result.get('invoice_binary_data'):
            attachments.append(('UPSCommercialInvoice.pdf', result['invoice_binary_data']))
        picking.message_post(body=logmessage, attachments=attachments)

        res.append({
            'exact_price': price,
            'tracking_number': carrier_tracking_ref,
        })

        if self.return_label_on_delivery:
            try:
                self.fusion_ups_rest_get_return_label(picking)
            except (UserError, ValidationError) as err:
                # Best effort: do not leave an outgoing shipment without its
                # return label. A failed cancellation must not hide the
                # original error, so it is only logged.
                try:
                    ups._cancel_shipping(result['tracking_ref'])
                except ValidationError:
                    _logger.warning(
                        "UPS shipment %s could not be cancelled after the return label failure",
                        result.get('tracking_ref'))
                raise UserError(err) from err
    return res
def fusion_ups_rest_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Generate a UPS return label (REST request) for a picking.

    The shipment goes from the picking's partner back to the warehouse of
    the picking type; the resulting labels are posted on the picking.

    :param picking: stock picking to create the return label for
    :param tracking_number: unused by this implementation
    :param origin_date: unused by this implementation
    :return: single-element list holding ``{'exact_price', 'tracking_number'}``
    :raises UserError: when required values are missing
    """
    ups = UPSRestRequest(self)
    packages, shipment_info, ups_service_type, ups_carrier_account, cod_info = self._ups_rest_prepare_shipping_data(picking)
    check_value = ups._check_required_value(picking=picking, is_return=True)
    if check_value:
        raise UserError(check_value)

    result = ups._send_shipping(
        shipment_info=shipment_info, packages=packages, carrier=self, shipper=picking.company_id.partner_id, ship_from=picking.partner_id,
        ship_to=picking.picking_type_id.warehouse_id.partner_id, service_type=ups_service_type, duty_payment='RECIPIENT', saturday_delivery=picking.carrier_id.ups_saturday_delivery,
        cod_info=cod_info, label_file_type=self.ups_label_file_type, ups_carrier_account=ups_carrier_account, is_return=True)

    order = picking.sale_id
    company = order.company_id or picking.company_id or self.env.company
    currency_order = picking.sale_id.currency_id or picking.company_id.currency_id
    if currency_order.name == result['currency_code']:
        price = float(result['price'])
    else:
        quote_currency = self.env['res.currency'].search([('name', '=', result['currency_code'])], limit=1)
        price = quote_currency._convert(
            float(result['price']), currency_order, company, order.date_order or fields.Date.today())

    # Sequence of (tracking number, label content) pairs.
    package_labels = [
        (track_number, label_binary_data)
        for track_number, label_binary_data in result.get('label_binary_data')
    ]
    carrier_tracking_ref = "+".join([pl[0] for pl in package_labels])

    # The HTML fragments must be Markup: a plain str body is escaped by
    # message_post and the tags would be displayed literally. Package names
    # are joined so they are not rendered as a Python list repr.
    # Same construction as the UPS SOAP log note.
    logmessage = (
        _("Return label generated") + Markup("<br/><b>")
        + _("Tracking Numbers:") + Markup("</b> ") + carrier_tracking_ref
        + Markup("<br/><b>") + _("Packages:") + Markup("</b> ")
        + ','.join([p.name for p in packages if p.name])
    )

    if self.ups_label_file_type != 'GIF':
        attachments = [('%s-%s-%s.%s' % (self.get_return_label_prefix(), pl[0], index, self.ups_label_file_type), pl[1]) for index, pl in enumerate(package_labels)]
    else:
        # GIF labels are posted as a single merged PDF named after the first tracking number.
        attachments = [('%s-%s-%s.%s' % (self.get_return_label_prefix(), package_labels[0][0], 1, 'pdf'), pdf.merge_pdf([pl[1] for pl in package_labels]))]
    picking.message_post(body=logmessage, attachments=attachments)

    return [{
        'exact_price': price,
        'tracking_number': carrier_tracking_ref,
    }]
def fusion_ups_rest_get_tracking_link(self, picking):
    """Return the UPS web tracking URL for the picking's tracking reference.

    :param picking: stock picking whose ``carrier_tracking_ref`` is tracked
    :return: tracking URL as a string
    """
    # Multi-package shipments store their tracking numbers joined by '+'.
    # As in the UPS SOAP tracking link, they are passed to UPS separated by
    # an URL-encoded CRLF; a raw '+' would be decoded as a space.
    tracking_numbers = (picking.carrier_tracking_ref or '').replace('+', '%0D%0A')
    return 'http://wwwapps.ups.com/WebTracking/track?track=yes&trackNums=%s' % tracking_numbers
def fusion_ups_rest_cancel_shipment(self, picking):
    """Cancel the UPS shipment (REST request) linked to a picking.

    A note is then posted on the picking and its tracking reference and
    carrier price are reset.

    :param picking: stock picking whose shipment must be cancelled
    """
    # Only the first tracking number of a '+'-joined reference is sent.
    first_tracking_number = picking.carrier_tracking_ref.partition('+')[0]
    UPSRestRequest(self)._cancel_shipping(first_tracking_number)

    cancellation_note = _(u'Shipment #%s has been cancelled', picking.carrier_tracking_ref)
    picking.message_post(body=cancellation_note)
    picking.write({
        'carrier_tracking_ref': '',
        'carrier_price': 0.0,
    })
def _get_delivery_type(self):
    """Report carriers of type ``fusion_ups_rest`` as ``'ups'``.

    Any other delivery type keeps the value returned by the parent method.
    """
    parent_delivery_type = super()._get_delivery_type()
    if self.delivery_type == 'fusion_ups_rest':
        return 'ups'
    return parent_delivery_type
# ══════════════════════════════════════════════════════════════════════════
# FEDEX SOAP (Legacy) METHODS
# ══════════════════════════════════════════════════════════════════════════
@api.onchange('fedex_service_type')
def on_change_fedex_service_type(self):
    """Reset the Saturday delivery flag whenever the FedEx service type changes."""
    self.fedex_saturday_delivery = False
def fusion_fedex_rate_shipment(self, order):
    """Quote the FedEx (legacy SOAP request) delivery price for a sale order.

    :param order: sale order to rate
    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message`` keys
    """
    domestic_india = (
        order.partner_shipping_id.country_id.code == 'IN'
        and order.company_id.partner_id.country_id.code == 'IN'
    )
    order_currency = order.currency_id
    credentials = self.sudo()

    # Authentication stuff
    fedex_request = FedexRequest(self.log_xml, request_type="rating", prod_environment=self.prod_environment)
    fedex_request.web_authentication_detail(credentials.fedex_developer_key, credentials.fedex_developer_password)
    fedex_request.client_detail(credentials.fedex_account_number, credentials.fedex_meter_number)

    # Build basic rating request and set addresses
    fedex_request.transaction_detail(order.name)
    fedex_request.shipment_request(
        self.fedex_droppoff_type,
        self.fedex_service_type,
        self.fedex_default_package_type_id.shipper_package_code,
        self.fedex_weight_unit,
        self.fedex_saturday_delivery,
    )
    fedex_request.set_currency(_convert_curr_iso_fdx(order_currency.name))
    fedex_request.set_shipper(order.company_id.partner_id, order.warehouse_id.partner_id)
    fedex_request.set_recipient(order.partner_shipping_id)

    order_packages = self._get_packages_from_order(order, self.fedex_default_package_type_id)
    for position, pkg in enumerate(order_packages, 1):
        fedex_request.add_package(
            self,
            pkg,
            _convert_curr_iso_fdx(pkg.company_id.currency_id.name),
            sequence_number=position,
            mode='rating'
        )

    estimated_weight = self._fedex_convert_weight(order._get_estimated_weight(), self.fedex_weight_unit)
    fedex_request.set_master_package(estimated_weight, 1)

    # Commodities for customs declaration (international shipping)
    needs_customs = (
        'INTERNATIONAL' in self.fedex_service_type
        or self.fedex_service_type == 'FEDEX_REGIONAL_ECONOMY'
        or domestic_india
    )
    if needs_customs:
        commodities = self._get_commodities_from_order(order)
        for commodity in commodities:
            fedex_request.commodities(self, commodity, _convert_curr_iso_fdx(order_currency.name))
        customs_amount = sum(c.monetary_value * c.qty for c in commodities)
        fedex_request.customs_value(_convert_curr_iso_fdx(order_currency.name), customs_amount, "NON_DOCUMENTS")
        fedex_request.duties_payment(order.warehouse_id.partner_id, credentials.fedex_account_number, credentials.fedex_duty_payment)

    # Prepare the request
    self._fedex_update_srm(fedex_request, 'rate', order=order)
    del fedex_request.ClientDetail['Region']
    payload = serialize_object(dict(
        WebAuthenticationDetail=fedex_request.WebAuthenticationDetail,
        ClientDetail=fedex_request.ClientDetail,
        TransactionDetail=fedex_request.TransactionDetail,
        VersionId=fedex_request.VersionId,
        RequestedShipment=fedex_request.RequestedShipment,
    ))
    self._fedex_add_extra_data_to_request(payload, 'rate')
    response = fedex_request.rate(payload)

    warnings = response.get('warnings_message')
    if warnings:
        _logger.info(warnings)

    if response.get('errors_message'):
        return {
            'success': False,
            'price': 0.0,
            'error_message': _('Error:\n%s', response['errors_message']),
            'warning_message': False,
        }

    price = self._get_request_price(response['price'], order, order_currency)
    warning_message = _('Warning:\n%s', warnings) if warnings else False
    return {
        'success': True,
        'price': price,
        'error_message': False,
        'warning_message': warning_message,
    }
def fusion_fedex_send_shipping(self, pickings):
    """Create FedEx shipments (legacy SOAP request) for the given pickings.

    Every package of a picking is submitted in its own ``process_shipment``
    request. The master tracking id returned for the first package is passed
    along with the following ones, and the price is read from the response
    handled when ``sequence == package_count``.

    :param pickings: stock pickings to ship
    :return: list of ``{'exact_price', 'tracking_number'}`` dicts
    :raises UserError: when FedEx reports an error for a package
    """
    res = []
    for picking in pickings:
        order_currency = picking.sale_id.currency_id or picking.company_id.currency_id
        srm = FedexRequest(self.log_xml, request_type="shipping", prod_environment=self.prod_environment)
        # Credentials are read through sudo().
        superself = self.sudo()
        srm.web_authentication_detail(superself.fedex_developer_key, superself.fedex_developer_password)
        srm.client_detail(superself.fedex_account_number, superself.fedex_meter_number)
        srm.transaction_detail(picking.id)
        # Package type: the one of the first result package, else the carrier default.
        packages = picking.move_line_ids.result_package_id
        package_type = packages and packages[0].package_type_id.shipper_package_code or self.fedex_default_package_type_id.shipper_package_code
        srm.shipment_request(self.fedex_droppoff_type, self.fedex_service_type, package_type, self.fedex_weight_unit, self.fedex_saturday_delivery)
        srm.set_currency(_convert_curr_iso_fdx(order_currency.name))
        srm.set_shipper(picking.company_id.partner_id, picking.picking_type_id.warehouse_id.partner_id)
        srm.set_recipient(picking.partner_id)
        srm.shipping_charges_payment(superself.fedex_account_number)
        srm.shipment_label('COMMON2D', self.fedex_label_file_type, self.fedex_label_stock_type, 'TOP_EDGE_OF_TEXT_FIRST', 'SHIPPING_LABEL_FIRST')
        order = picking.sale_id
        net_weight = self._fedex_convert_weight(picking.shipping_weight, self.fedex_weight_unit)
        # Commodities for customs declaration (international shipping)
        if 'INTERNATIONAL' in self.fedex_service_type or self.fedex_service_type == 'FEDEX_REGIONAL_ECONOMY' or (picking.partner_id.country_id.code == 'IN' and picking.picking_type_id.warehouse_id.partner_id.country_id.code == 'IN'):
            commodities = self._get_commodities_from_stock_move_lines(picking.move_line_ids)
            for commodity in commodities:
                srm.commodities(self, commodity, _convert_curr_iso_fdx(order_currency.name))
            total_commodities_amount = sum(c.monetary_value * c.qty for c in commodities)
            srm.customs_value(_convert_curr_iso_fdx(order_currency.name), total_commodities_amount, "NON_DOCUMENTS")
            srm.duties_payment(order.warehouse_id.partner_id, superself.fedex_account_number, superself.fedex_duty_payment)
            send_etd = superself.env['ir.config_parameter'].get_param("delivery_fedex.send_etd")
            srm.commercial_invoice(self.fedex_document_stock_type, send_etd)
        # NOTE(review): package_count is derived from the result packages while the
        # loop below iterates the output of _get_packages_from_picking(); the
        # "last package" block only runs if both counts agree -- confirm.
        package_count = len(picking.move_line_ids.result_package_id) or 1
        # For india picking courier is not accepted without this details in label.
        po_number = order.display_name or False
        dept_number = False
        if picking.partner_id.country_id.code == 'IN' and picking.picking_type_id.warehouse_id.partner_id.country_id.code == 'IN':
            po_number = 'B2B' if picking.partner_id.commercial_partner_id.is_company else 'B2C'
            dept_number = 'BILL D/T: SENDER'
        packages = self._get_packages_from_picking(picking, self.fedex_default_package_type_id)
        master_tracking_id = False
        package_labels = []
        carrier_tracking_refs = []
        # With a sale order, every picking of that order receives the log notes.
        lognote_pickings = picking.sale_id.picking_ids if picking.sale_id else picking
        for sequence, package in enumerate(packages, start=1):
            srm.add_package(
                self,
                package,
                _convert_curr_iso_fdx(package.company_id.currency_id.name),
                sequence_number=sequence,
                po_number=po_number,
                dept_number=dept_number,
                reference=picking.display_name,
            )
            # master_tracking_id is False for the first package, then the id
            # returned by the first response.
            srm.set_master_package(net_weight, len(packages), master_tracking_id=master_tracking_id)
            # Prepare the request
            self._fedex_update_srm(srm, 'ship', picking=picking)
            request = serialize_object(dict(WebAuthenticationDetail=srm.WebAuthenticationDetail,
                                            ClientDetail=srm.ClientDetail,
                                            TransactionDetail=srm.TransactionDetail,
                                            VersionId=srm.VersionId,
                                            RequestedShipment=srm.RequestedShipment))
            self._fedex_add_extra_data_to_request(request, 'ship')
            response = srm.process_shipment(request)
            warnings = response.get('warnings_message')
            if warnings:
                _logger.info(warnings)
            if response.get('errors_message'):
                raise UserError(response['errors_message'])
            package_name = package.name or 'package-' + str(sequence)
            package_labels.append((package_name, srm.get_label()))
            carrier_tracking_refs.append(response['tracking_number'])
            # First package
            if sequence == 1:
                master_tracking_id = response['master_tracking_id']
            # Last package
            if sequence == package_count:
                carrier_price = self._get_request_price(response['price'], order, order_currency)
                logmessage = _(
                    "Shipment created into Fedex %(line_break)s"
                    "%(bold_start)s Tracking Numbers: %(bold_end)s %(tracking_numbers)s %(line_break)s"
                    "%(bold_start)s Packages: %(bold_end)s %(packages)s",
                    tracking_numbers=carrier_tracking_refs,
                    packages=[pl[0] for pl in package_labels],
                    line_break=Markup("<br>"),
                    bold_start=Markup("<b>"),
                    bold_end=Markup("</b>"),
                )
                # Non-PDF labels are attached one file per package; PDF labels
                # are merged into a single attachment.
                if self.fedex_label_file_type != 'PDF':
                    attachments = [('%s-%s.%s' % (self._get_delivery_label_prefix(), pl[0], self.fedex_label_file_type), pl[1]) for pl in package_labels]
                if self.fedex_label_file_type == 'PDF':
                    attachments = [('%s.pdf' % (self._get_delivery_label_prefix()), pdf.merge_pdf([pl[1] for pl in package_labels]))]
                for pick in lognote_pickings:
                    pick.message_post(body=logmessage, attachments=attachments)
                shipping_data = {'exact_price': carrier_price,
                                 'tracking_number': ','.join(carrier_tracking_refs)}
                res = res + [shipping_data]
        # The return label reuses the tracking number and date of the last response.
        if self.return_label_on_delivery:
            self.get_return_label(picking, tracking_number=response['tracking_number'], origin_date=response['date'])
        commercial_invoice = srm.get_document()
        if commercial_invoice:
            fedex_documents = [('%s.pdf' % self._get_delivery_doc_prefix(), commercial_invoice)]
            for pick in lognote_pickings:
                pick.message_post(body=_('Fedex Documents'), attachments=fedex_documents)
    return res
def fusion_fedex_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Generate a FedEx return label (legacy SOAP request) for a picking.

    The picking's partner is set as shipper and the company partner as
    recipient; the resulting labels are posted on the picking.

    :param picking: stock picking to create the return label for
    :param tracking_number: forwarded to ``srm.return_label``
    :param origin_date: forwarded to ``srm.return_label``
    :raises UserError: when FedEx reports an error
    """
    srm = FedexRequest(self.log_xml, request_type="shipping", prod_environment=self.prod_environment)
    # Credentials are read through sudo().
    superself = self.sudo()
    srm.web_authentication_detail(superself.fedex_developer_key, superself.fedex_developer_password)
    srm.client_detail(superself.fedex_account_number, superself.fedex_meter_number)
    srm.transaction_detail(picking.id)
    # Package type: the one of the first result package, else the carrier default.
    packages = picking.move_line_ids.result_package_id
    package_type = packages and packages[0].package_type_id.shipper_package_code or self.fedex_default_package_type_id.shipper_package_code
    srm.shipment_request(self.fedex_droppoff_type, self.fedex_service_type, package_type, self.fedex_weight_unit, self.fedex_saturday_delivery)
    srm.set_currency(_convert_curr_iso_fdx(picking.company_id.currency_id.name))
    srm.set_shipper(picking.partner_id, picking.partner_id)
    srm.set_recipient(picking.company_id.partner_id)
    srm.shipping_charges_payment(superself.fedex_account_number)
    srm.shipment_label('COMMON2D', self.fedex_label_file_type, self.fedex_label_stock_type, 'TOP_EDGE_OF_TEXT_FIRST', 'SHIPPING_LABEL_FIRST')
    # Return pickings use the estimated weight, other pickings the shipping weight.
    if picking.is_return_picking:
        net_weight = self._fedex_convert_weight(picking._get_estimated_weight(), self.fedex_weight_unit)
    else:
        net_weight = self._fedex_convert_weight(picking.shipping_weight, self.fedex_weight_unit)
    # NOTE(review): package_type is reassigned here but not read again in this method.
    package_type = packages[:1].package_type_id or picking.carrier_id.fedex_default_package_type_id
    order = picking.sale_id
    po_number = order.display_name or False
    dept_number = False
    packages = self._get_packages_from_picking(picking, self.fedex_default_package_type_id)
    for pkg in packages:
        srm.add_package(self, pkg, _convert_curr_iso_fdx(pkg.company_id.currency_id.name), reference=picking.display_name, po_number=po_number, dept_number=dept_number)
    srm.set_master_package(net_weight, 1)
    # Commodities for customs declaration (same condition as in the shipping request).
    if 'INTERNATIONAL' in self.fedex_service_type or self.fedex_service_type == 'FEDEX_REGIONAL_ECONOMY' or (picking.partner_id.country_id.code == 'IN' and picking.picking_type_id.warehouse_id.partner_id.country_id.code == 'IN'):
        order_currency = picking.sale_id.currency_id or picking.company_id.currency_id
        commodities = [com for pack in packages for com in pack.commodities]
        for commodity in commodities:
            srm.commodities(self, commodity, _convert_curr_iso_fdx(order_currency.name))
        total_commodities_amount = sum(com.monetary_value * com.qty for com in commodities)
        # NOTE(review): customs_value() and duties_payment() are each called twice
        # below; presumably the later calls supersede the earlier ones -- confirm
        # against FedexRequest before removing either pair.
        srm.customs_value(_convert_curr_iso_fdx(order_currency.name), total_commodities_amount, "NON_DOCUMENTS")
        srm.duties_payment(order.warehouse_id.partner_id, superself.fedex_account_number, superself.fedex_duty_payment)
        srm.customs_value(_convert_curr_iso_fdx(order_currency.name), total_commodities_amount, "NON_DOCUMENTS")
        # We consider that returns are always paid by the company creating the label
        srm.duties_payment(picking.picking_type_id.warehouse_id.partner_id, superself.fedex_account_number, 'SENDER')
    srm.return_label(tracking_number, origin_date)
    # Prepare the request
    self._fedex_update_srm(srm, 'return', picking=picking)
    request = serialize_object(dict(WebAuthenticationDetail=srm.WebAuthenticationDetail,
                                    ClientDetail=srm.ClientDetail,
                                    TransactionDetail=srm.TransactionDetail,
                                    VersionId=srm.VersionId,
                                    RequestedShipment=srm.RequestedShipment))
    self._fedex_add_extra_data_to_request(request, 'return')
    response = srm.process_shipment(request)
    if not response.get('errors_message'):
        # One attachment per label, named after the tracking number and label index.
        fedex_labels = [('%s-%s-%s.%s' % (self.get_return_label_prefix(), response['tracking_number'], index, self.fedex_label_file_type), label)
                        for index, label in enumerate(srm._get_labels(self.fedex_label_file_type))]
        picking.message_post(body=_('Return Label'), attachments=fedex_labels)
    else:
        raise UserError(response['errors_message'])
def fusion_fedex_get_tracking_link(self, picking):
    """Return the FedEx web tracking URL for the picking's tracking reference.

    :param picking: stock picking whose ``carrier_tracking_ref`` is tracked
    :return: tracking URL as a string
    """
    tracking_url_template = 'https://www.fedex.com/apps/fedextrack/?action=track&trackingnumber=%s'
    return tracking_url_template % picking.carrier_tracking_ref
def fusion_fedex_cancel_shipment(self, picking):
    """Cancel the FedEx shipment (legacy SOAP request) linked to a picking.

    The first tracking number of the comma-separated reference is the one
    submitted for deletion. On success a note is posted on the picking and
    its tracking reference and carrier price are reset.

    :param picking: stock picking whose shipment must be cancelled
    :raises UserError: when the deletion did not succeed
    """
    fedex_request = FedexRequest(self.log_xml, request_type="shipping", prod_environment=self.prod_environment)
    credentials = self.sudo()
    fedex_request.web_authentication_detail(credentials.fedex_developer_key, credentials.fedex_developer_password)
    fedex_request.client_detail(credentials.fedex_account_number, credentials.fedex_meter_number)
    fedex_request.transaction_detail(picking.id)

    master_tracking_id = picking.carrier_tracking_ref.split(',')[0]
    fedex_request.set_deletion_details(master_tracking_id)

    payload = serialize_object(dict(
        WebAuthenticationDetail=fedex_request.WebAuthenticationDetail,
        ClientDetail=fedex_request.ClientDetail,
        TransactionDetail=fedex_request.TransactionDetail,
        VersionId=fedex_request.VersionId,
        TrackingId=fedex_request.TrackingId,
        DeletionControl=fedex_request.DeletionControl,
    ))
    result = fedex_request.delete_shipment(payload)

    warnings = result.get('warnings_message')
    if warnings:
        _logger.info(warnings)

    if not result.get('delete_success') or result.get('errors_message'):
        raise UserError(result['errors_message'])

    picking.message_post(body=_(u'Shipment #%s has been cancelled', master_tracking_id))
    picking.write({
        'carrier_tracking_ref': '',
        'carrier_price': 0.0,
    })
def _get_request_price(self, req_price, order, order_currency=None):
    """Extract the price from a FedEx response, in the target currency.

    ``req_price`` maps currency codes, as found in the FedEx response, to
    amounts.  The lookup is attempted in this order:

    1. the target currency (``order_currency``, defaulting to the currency
       of ``order``): the amount is returned as is;
    2. the company currency: the amount is converted to the target currency;
    3. any other code of the response that matches a ``res.currency``
       record, either directly or through its FedEx-to-ISO translation:
       the amount is converted to the target currency.

    :param dict req_price: amounts keyed by currency code
    :param order: record providing ``currency_id``, ``company_id`` and
        ``date_order``
    :param order_currency: optional target currency overriding the order's
    :return: the amount in the target currency, or ``0.0`` when none of the
        currencies of the response could be resolved
    """
    if not order_currency:
        order_currency = order.currency_id
    company = order.company_id or self.env.user.company_id

    fdx_currency = _convert_curr_iso_fdx(order_currency.name)
    if fdx_currency in req_price:
        return req_price[fdx_currency]
    _logger.info("Preferred currency has not been found in FedEx response")

    conversion_date = order.date_order or fields.Date.today()
    fdx_currency = _convert_curr_iso_fdx(company.currency_id.name)
    if fdx_currency in req_price:
        return company.currency_id._convert(
            req_price[fdx_currency], order_currency, company, conversion_date)

    # Last resort: look for any currency of the response in the database.
    # The search covers both the code as returned and its ISO translation,
    # so the lookup below must try both as well; matching only the raw code
    # would miss every FedEx-specific code.
    iso_by_code = {code: _convert_curr_fdx_iso(code) for code in req_price}
    currency_codes = list(iso_by_code) + list(iso_by_code.values())
    currency_instances = self.env['res.currency'].search([('name', 'in', currency_codes)])
    currency_by_name = {c.name: c for c in currency_instances}
    for fdx_currency, amount in req_price.items():
        currency = currency_by_name.get(fdx_currency) or currency_by_name.get(iso_by_code[fdx_currency])
        if currency:
            return currency._convert(amount, order_currency, company, conversion_date)
    _logger.info("No known currency has been found in FedEx response")
    return 0.0
def _fedex_add_extra_data_to_request(self, request, request_type):
    """Merge the configured extra data into ``request`` (FedEx SOAP legacy).

    The extra data configured for ``request_type`` (``'rate'``, ``'ship'``
    or ``'return'``) is wrapped in braces and evaluated with ``const_eval``;
    the result is then merged recursively into ``request``, in place.

    :param request: mapping to update in place
    :param str request_type: selects which extra-data field is used; any
        other value results in no extra data
    :raises UserError: when the configured extra data cannot be evaluated
    """
    extra_data_input = {
        'rate': self.fedex_extra_data_rate_request,
        'ship': self.fedex_extra_data_ship_request,
        'return': self.fedex_extra_data_return_request,
    }.get(request_type) or ''
    try:
        extra_data = const_eval('{' + extra_data_input + '}')
    except (SyntaxError, ValueError):
        # NOTE(review): ValueError is expected from const_eval when the
        # expression is valid Python but not a constant -- confirm against
        # odoo.tools.safe_eval.
        raise UserError(_('Invalid syntax for FedEx extra data.'))

    def extra_data_to_request(target, overrides):
        """Recursively apply ``overrides`` on ``target``."""
        for key, new_value in overrides.items():
            # A missing key is first created with a None value.
            target[key] = current_value = target.get(key)
            if isinstance(current_value, list):
                # The same override is applied to every item of a list.
                for item in current_value:
                    extra_data_to_request(item, new_value)
            elif isinstance(new_value, dict) and isinstance(current_value, dict):
                extra_data_to_request(current_value, new_value)
            else:
                target[key] = new_value

    extra_data_to_request(request, extra_data)
def _fedex_get_default_custom_package_code(self):
return 'YOUR_PACKAGING'
def _fedex_convert_weight(self, weight, unit='KG'):
    """Convert ``weight`` from the configured weight UoM to ``unit``.

    :param weight: weight expressed in the unit returned by
        ``product.template._get_weight_uom_id_from_ir_config_parameter()``
    :param str unit: ``'KG'`` or ``'LB'``
    :return: the converted weight as formatted by ``float_repr`` with 10
        digits; a strictly positive ``weight`` never yields less than 0.01
    :raises ValueError: for any other ``unit``
    """
    if unit not in ('KG', 'LB'):
        raise ValueError
    target_xmlid = 'uom.product_uom_kgm' if unit == 'KG' else 'uom.product_uom_lb'

    source_uom = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
    converted = source_uom._compute_quantity(weight, self.env.ref(target_xmlid), round=False)
    if weight > 0.0:
        # Keep a floor of 0.01 for non-empty shipments.
        converted = max(converted, 0.01)
    return float_repr(converted, 10)
def _fedex_update_srm(self, srm, request_type, order=None, picking=None):
""" Hook to introduce new custom behaviors in the Fedex request. """
return srm
# ══════════════════════════════════════════════════════════════════════════
# FEDEX REST METHODS
# ══════════════════════════════════════════════════════════════════════════
def fusion_fedex_rest_rate_shipment(self, order):
    """Request a FedEx (REST) rate for ``order``.

    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message``.  A ``ValidationError`` raised while requesting
        the price is turned into a failure result; an ``alert_message`` in
        the response is logged and returned as warning.
    """
    srm = FedexRestRequest(self)
    try:
        rate_args = dict(
            ship_from=order.warehouse_id.partner_id,
            ship_to=order.partner_shipping_id,
            packages=self._get_packages_from_order(order, self.fedex_rest_default_package_type_id),
            currency=order.currency_id.name,
        )
        response = srm._get_shipping_price(**rate_args)
    except ValidationError as err:
        return dict(
            success=False,
            price=0.0,
            error_message=_('Error(s) from FedEx:\n%s', err),
            warning_message=False,
        )

    alerts = response.get('alert_message')
    warning_message = False
    if alerts:
        _logger.info(alerts)
        warning_message = _('Warning(s) from FedEx:\n%s', alerts)
    return dict(
        success=True,
        price=response.get('price'),
        error_message=False,
        warning_message=warning_message,
    )
def fusion_fedex_rest_send_shipping(self, pickings):
    """Create FedEx (REST) shipments for ``pickings`` and log the labels.

    For each picking a shipment is requested, then a chatter message with
    the service information, tracking numbers, package names and the label
    attachments (plus the invoice, when the response has one) is posted on
    the picking and on the not-yet-done/cancelled pickings of its sale
    order.  When ``return_label_on_delivery`` is set, a return label is
    generated for single-package shipments.

    :param pickings: pickings to ship
    :return: list with one dict per picking holding ``exact_price`` and
        ``tracking_number``
    """
    res = []
    srm = FedexRestRequest(self)
    for picking in pickings:
        packages = self._get_packages_from_picking(picking, self.fedex_rest_default_package_type_id)
        # Sum over the matching delivery lines: reading ``price_total``
        # directly on the filtered recordset fails as soon as several
        # lines match.
        delivery_lines = picking.sale_id.order_line.filtered(
            lambda sol: sol.is_delivery and sol.product_id == self.product_id)
        freight_charge = sum(delivery_lines.mapped('price_total'), 0.0)
        response = srm._ship_package(
            ship_from_wh=picking.picking_type_id.warehouse_id.partner_id,
            ship_from_company=picking.company_id.partner_id,
            ship_to=picking.partner_id,
            sold_to=picking.sale_id.partner_invoice_id,
            packages=packages,
            currency=picking.sale_id.currency_id.name or picking.company_id.currency_id.name,
            order_no=picking.sale_id.name,
            customer_ref=picking.sale_id.client_order_ref,
            picking_no=picking.name,
            incoterms=picking.sale_id.incoterm.code,
            freight_charge=freight_charge,
        )

        warnings = response.get('alert_message')
        if warnings:
            _logger.info(warnings)

        # Plain strings concatenated with Markup are escaped.
        logmessage = (_("Shipment created into Fedex") + Markup("<br/>") +
                      response.get('service_info') + Markup("<br/><b>") +
                      _("Tracking Numbers:") + Markup("</b> ") + response.get('tracking_numbers') + Markup("<br/><b>") +
                      _("Packages:") + Markup("</b> ") + ','.join([p.name for p in packages if p.name]))
        if response.get('documents'):
            logmessage += Markup("<br/><b>") + _("Required documents:") + Markup("</b> ") + response.get('documents')

        # Labels are base64-decoded before being attached.
        attachments = [
            ('%s-%s.%s' % (self._get_delivery_label_prefix(), nr, self.fedex_rest_label_file_type), base64.b64decode(label))
            for nr, label in response.get('labels')
        ]
        if response.get('invoice'):
            attachments.append(('%s.pdf' % self._get_delivery_doc_prefix(), base64.b64decode(response.get('invoice'))))

        lognote_pickings = picking
        if picking.sale_id:
            lognote_pickings |= picking.sale_id.picking_ids.filtered(lambda p: p.state not in ('done', 'cancel'))
        for pick in lognote_pickings:
            pick.message_post(body=logmessage, attachments=attachments)

        res.append({'exact_price': response.get('price'), 'tracking_number': response.get('tracking_numbers')})

        if self.return_label_on_delivery:
            if len(packages) > 1:
                picking.message_post(body=_("Automated return label generation is not supported by FedEx for multi-package shipments. Please generate the return labels manually."))
            else:
                self.get_return_label(picking, tracking_number=response.get('tracking_numbers').split(',')[0], origin_date=response.get('date'))
    return res
def fusion_fedex_rest_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Request a FedEx (REST) return label and post it on ``picking``.

    The return goes from the picking partner to the company / warehouse
    partner.  The chatter message lists the tracking numbers and, when the
    response has some, the required documents; the base64-decoded labels
    are attached to it.

    :param picking: picking the return label is generated for
    :param tracking_number: forwarded as ``tracking`` to the request
    :param origin_date: forwarded as ``date`` to the request
    """
    return_request = FedexRestRequest(self)
    response = return_request._return_package(
        ship_from=picking.partner_id,
        ship_to_company=picking.company_id.partner_id,
        ship_to_wh=picking.picking_type_id.warehouse_id.partner_id,
        packages=self._get_packages_from_picking(picking, self.fedex_rest_default_package_type_id),
        currency=picking.sale_id.currency_id.name or picking.company_id.currency_id.name,
        tracking=tracking_number,
        date=origin_date,
    )

    alerts = response.get('alert_message')
    if alerts:
        _logger.info(alerts)

    # Plain strings concatenated with Markup are escaped.
    note = _("Return Label") + Markup("<br/><b>")
    note = note + _("Tracking Numbers:") + Markup("</b> ")
    note = note + response.get('tracking_numbers') + Markup("<br/>")
    if response.get('documents'):
        note += Markup("<b>") + _("Required documents:") + Markup("</b> ") + response.get('documents')

    label_attachments = []
    for nr, label in response.get('labels'):
        filename = '%s-%s.%s' % (self.get_return_label_prefix(), nr, self.fedex_rest_label_file_type)
        label_attachments.append((filename, base64.b64decode(label)))
    picking.message_post(body=note, attachments=label_attachments)
def fusion_fedex_rest_get_tracking_link(self, picking):
    """Return the FedEx (REST) tracking URL for ``picking``.

    The picking's ``carrier_tracking_ref`` is interpolated as is into the
    ``trknbr`` query parameter.
    """
    tracking_ref = picking.carrier_tracking_ref
    return 'https://www.fedex.com/wtrk/track/?trknbr=%s' % tracking_ref
def fusion_fedex_rest_cancel_shipment(self, picking):
    """Cancel the FedEx (REST) shipment attached to ``picking``.

    The first comma-separated entry of ``carrier_tracking_ref`` is sent for
    cancellation.  On success a chatter note is posted and the tracking
    reference and carrier price of the picking are reset.

    :raises UserError: with the FedEx error message when the cancellation
        was not confirmed.
    """
    master_tracking = picking.carrier_tracking_ref.split(',')[0]
    result = FedexRestRequest(self).cancel_shipment(master_tracking)

    warning_text = result.get('warnings_message')
    if warning_text:
        _logger.info(warning_text)

    cancelled = result.get('delete_success') and not result.get('errors_message')
    if not cancelled:
        raise UserError(result['errors_message'])
    # The note mentions the full tracking reference, not only the master one.
    picking.message_post(body=_('Shipment %s has been cancelled', picking.carrier_tracking_ref))
    picking.write({'carrier_tracking_ref': '', 'carrier_price': 0.0})
def _fedex_rest_convert_weight(self, weight):
    """Convert ``weight`` to the unit set in ``fedex_rest_weight_unit``.

    :param weight: weight expressed in the unit returned by
        ``product.template._get_weight_uom_id_from_ir_config_parameter()``
    :return: the converted weight as formatted by ``float_repr`` with 10
        digits; a strictly positive ``weight`` never yields less than 0.01
    :raises ValueError: when ``fedex_rest_weight_unit`` is neither ``'KG'``
        nor ``'LB'``
    """
    unit = self.fedex_rest_weight_unit
    if unit not in ('KG', 'LB'):
        raise ValueError
    target_xmlid = 'uom.product_uom_kgm' if unit == 'KG' else 'uom.product_uom_lb'

    source_uom = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
    converted = source_uom._compute_quantity(weight, self.env.ref(target_xmlid), round=False)
    if weight > 0.0:
        # Keep a floor of 0.01 for non-empty shipments.
        converted = max(converted, 0.01)
    return float_repr(converted, 10)
# ══════════════════════════════════════════════════════════════════════════
# DHL SOAP (Legacy) METHODS
# ══════════════════════════════════════════════════════════════════════════
def fusion_dhl_rate_shipment(self, order):
    """Return the DHL (SOAP) rate result computed for ``order``.

    Thin wrapper around ``_dhl_soap_rate_shipment_vals``.
    """
    return self._dhl_soap_rate_shipment_vals(order=order)
def _dhl_soap_rate_shipment_vals(self, order=False, picking=False):
    """Request a DHL (SOAP) quote for an order or a picking.

    Exactly one of ``order`` / ``picking`` is expected; ``picking`` takes
    precedence when both are given.

    :param order: sale order to rate
    :param picking: picking to rate
    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message``.  The price is expressed in the currency of the
        order (or of the picking's sale order / company).
    """
    if picking:
        warehouse_partner_id = picking.picking_type_id.warehouse_id.partner_id
        currency_id = picking.sale_id.currency_id or picking.company_id.currency_id
        destination_partner_id = picking.partner_id
        total_value = sum(sml.sale_price for sml in picking.move_line_ids)
    else:
        warehouse_partner_id = order.warehouse_id.partner_id
        currency_id = order.currency_id or order.company_id.currency_id
        total_value = sum(line.price_reduce_taxinc * line.product_uom_qty for line in order.order_line.filtered(lambda l: l.product_id.type == 'consu' and not l.display_type))
        destination_partner_id = order.partner_shipping_id

    rating_request = {}
    srm = DHLProvider(self.log_xml, request_type="rate", prod_environment=self.prod_environment)
    check_value = srm.check_required_value(self, destination_partner_id, warehouse_partner_id, order=order, picking=picking)
    if check_value:
        return {'success': False,
                'price': 0.0,
                'error_message': check_value,
                'warning_message': False}
    site_id = self.sudo().dhl_SiteID
    password = self.sudo().dhl_password
    rating_request['Request'] = srm._set_request(site_id, password)
    rating_request['From'] = srm._set_dct_from(warehouse_partner_id)
    if picking:
        packages = self._get_packages_from_picking(picking, self.dhl_default_package_type_id)
    else:
        packages = self._get_packages_from_order(order, self.dhl_default_package_type_id)
    rating_request['BkgDetails'] = srm._set_dct_bkg_details(self, packages)
    rating_request['To'] = srm._set_dct_to(destination_partner_id)
    if self.dhl_dutiable:
        rating_request['Dutiable'] = srm._set_dct_dutiable(total_value, currency_id.name)
    real_rating_request = {}
    real_rating_request['GetQuote'] = rating_request
    real_rating_request['schemaVersion'] = 2.0
    # ``rating_request`` is the very dict stored under 'GetQuote', so the
    # custom data added here ends up in the request that is sent.
    self._dhl_soap_add_custom_data_to_request(rating_request, 'rate')
    response = srm._process_rating(real_rating_request)

    available_product_code = []
    shipping_charge = False
    qtd_shp = response.findall('GetQuoteResponse/BkgDetails/QtdShp')
    if qtd_shp:
        for q in qtd_shp:
            charge = q.find('ShippingCharge').text
            global_product_code = q.find('GlobalProductCode').text
            if global_product_code == self.dhl_product_code and charge:
                shipping_charge = charge
                shipping_currency = q.find('CurrencyCode')
                shipping_currency = None if shipping_currency is None else shipping_currency.text
                break
            else:
                # Remember the other quoted products to suggest one later.
                available_product_code.append(global_product_code)
    else:
        condition = response.find('GetQuoteResponse/Note/Condition')
        # Compare with None: the truth value of an XML element reflects
        # whether it has children, not whether it was found.
        if condition is not None:
            condition_code = condition.findtext('ConditionCode')
            condition_data = condition.findtext('ConditionData')
            if condition_code == '410301':
                return {
                    'success': False,
                    'price': 0.0,
                    'error_message': "%s.\n%s" % (condition_data, _("Hint: The destination may not require the dutiable option.")),
                    'warning_message': False,
                }
            elif condition_code in ['420504', '420505', '420506', '410304'] or\
                    response.findtext('GetQuoteResponse/Note/ActionStatus') == "Failure":
                return {
                    'success': False,
                    'price': 0.0,
                    'error_message': "%s." % (condition_data),
                    'warning_message': False,
                }

    if shipping_charge:
        if order:
            order_currency = order.currency_id
        else:
            order_currency = picking.sale_id.currency_id or picking.company_id.currency_id
        if shipping_currency is None or order_currency.name == shipping_currency:
            price = float(shipping_charge)
        else:
            quote_currency = self.env['res.currency'].search([('name', '=', shipping_currency)], limit=1)
            price = quote_currency._convert(float(shipping_charge), order_currency, (order or picking).company_id, order.date_order if order else fields.Date.today())
        return {'success': True,
                'price': price,
                'error_message': False,
                'warning_message': False}

    if available_product_code:
        return {'success': False,
                'price': 0.0,
                'error_message': _(
                    "There is no price available for this shipping, you should rather try with the DHL product %s",
                    available_product_code[0]),
                'warning_message': False}

    # Nothing usable in the response: return an explicit failure rather
    # than None, which callers indexing the result cannot handle.
    return {'success': False,
            'price': 0.0,
            'error_message': _("No shipping rate could be retrieved from DHL for this request."),
            'warning_message': False}
def fusion_dhl_send_shipping(self, pickings):
    """Create DHL (SOAP) shipments for ``pickings`` and log the labels.

    For each picking the shipment request is built and processed, the label
    (and the extra documents, when the response has some) is posted on the
    picking and on the not-yet-done/cancelled pickings of its sale order,
    and the price is obtained from a separate rate request.  A return
    label is generated when ``return_label_on_delivery`` is set.

    :param pickings: pickings to ship
    :return: list with one dict per picking holding ``exact_price`` and
        ``tracking_number``
    """
    results = []
    for picking in pickings:
        srm = DHLProvider(self.log_xml, request_type="ship", prod_environment=self.prod_environment)
        # Credentials are read through sudo().
        credentials = self.sudo()
        site_id = credentials.dhl_SiteID
        password = credentials.dhl_password
        account_number = credentials.dhl_account_number

        shipment_request = {
            'Request': srm._set_request(site_id, password),
            'RegionCode': srm._set_region_code(self.dhl_region_code),
            'RequestedPickupTime': srm._set_requested_pickup_time(True),
            'Billing': srm._set_billing(account_number, "S", self.dhl_duty_payment, self.dhl_dutiable),
            'Consignee': srm._set_consignee(picking.partner_id),
            'Shipper': srm._set_shipper(account_number, picking.company_id.partner_id, picking.picking_type_id.warehouse_id.partner_id),
            'Reference': {
                'ReferenceID': picking.sale_id.name if picking.sale_id else picking.name,
                'ReferenceType': 'CU',
            },
        }

        total_value, currency_name = self._dhl_soap_calculate_value(picking)
        if self.dhl_dutiable:
            incoterm = picking.sale_id.incoterm or self.env.company.incoterm_id
            shipment_request['Dutiable'] = srm._set_dutiable(total_value, currency_name, incoterm)
        if picking._should_generate_commercial_invoice():
            shipment_request['UseDHLInvoice'] = 'Y'
            shipment_request['DHLInvoiceType'] = 'CMI'
            shipment_request['ExportDeclaration'] = srm._set_export_declaration(self, picking)

        shipment_request['ShipmentDetails'] = srm._set_shipment_details(picking)
        shipment_request['LabelImageFormat'] = srm._set_label_image_format(self.dhl_label_image_format)
        shipment_request['Label'] = srm._set_label(self.dhl_label_template)
        shipment_request['schemaVersion'] = 10.0
        shipment_request['LanguageCode'] = 'en'
        if picking.carrier_id.shipping_insurance:
            shipment_request['SpecialService'] = [srm._set_insurance(shipment_request['ShipmentDetails'])]
        self._dhl_soap_add_custom_data_to_request(shipment_request, 'ship')

        dhl_response = srm._process_shipment(shipment_request)
        tracking_number = dhl_response.AirwayBillNumber
        logmessage = Markup(_("Shipment created into DHL <br/> <b>Tracking Number: </b>%s")) % (tracking_number)

        label_image = dhl_response.LabelImage[0]
        dhl_labels = [('%s-%s.%s' % (self._get_delivery_label_prefix(), tracking_number, self.dhl_label_image_format), label_image.OutputImage)]
        dhl_cmi = None
        if label_image.MultiLabels:
            dhl_cmi = [
                ('%s-%s.%s' % (self._get_delivery_doc_prefix(), mlabel.DocName, mlabel.DocFormat), mlabel.DocImageVal)
                for mlabel in label_image.MultiLabels.MultiLabel
            ]

        lognote_pickings = picking
        if picking.sale_id:
            lognote_pickings |= picking.sale_id.picking_ids.filtered(lambda p: p.state not in ('done', 'cancel'))
        for pick in lognote_pickings:
            pick.message_post(body=logmessage, attachments=dhl_labels)
            if dhl_cmi:
                pick.message_post(body=_("DHL Documents"), attachments=dhl_cmi)

        # The price comes from a dedicated rate request on the picking.
        rate = self._dhl_soap_rate_shipment_vals(picking=picking)
        shipping_data = {
            'exact_price': rate['price'],
            'tracking_number': tracking_number,
        }
        if self.return_label_on_delivery:
            self.get_return_label(picking)
        results.append(shipping_data)
    return results
def fusion_dhl_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Create a DHL (SOAP) return shipment for ``picking`` and log its label.

    The picking partner is the shipper and the warehouse partner the
    consignee.  The label (and the extra documents, when the response has
    some) is posted on every picking of the sale order, or on the picking
    itself when it has no sale order.

    ``tracking_number`` and ``origin_date`` are not used by this
    implementation.

    :return: dict with ``exact_price`` (always 0) and ``tracking_number``
    """
    srm = DHLProvider(self.log_xml, request_type="ship", prod_environment=self.prod_environment)
    # Credentials are read through sudo().
    credentials = self.sudo()
    site_id = credentials.dhl_SiteID
    password = credentials.dhl_password
    account_number = credentials.dhl_account_number

    shipment_request = {
        'Request': srm._set_request(site_id, password),
        'RegionCode': srm._set_region_code(self.dhl_region_code),
        'RequestedPickupTime': srm._set_requested_pickup_time(True),
        'Billing': srm._set_billing(account_number, "S", "S", self.dhl_dutiable),
        'Consignee': srm._set_consignee(picking.picking_type_id.warehouse_id.partner_id),
        'Shipper': srm._set_shipper(account_number, picking.partner_id, picking.partner_id),
    }

    total_value, currency_name = self._dhl_soap_calculate_value(picking)
    if self.dhl_dutiable:
        incoterm = picking.sale_id.incoterm or self.env.company.incoterm_id
        shipment_request['Dutiable'] = srm._set_dutiable(total_value, currency_name, incoterm)
    if picking._should_generate_commercial_invoice():
        shipment_request['UseDHLInvoice'] = 'Y'
        shipment_request['DHLInvoiceType'] = 'CMI'
        shipment_request['ExportDeclaration'] = srm._set_export_declaration(self, picking, is_return=True)

    shipment_request['ShipmentDetails'] = srm._set_shipment_details(picking)
    shipment_request['LabelImageFormat'] = srm._set_label_image_format(self.dhl_label_image_format)
    shipment_request['Label'] = srm._set_label(self.dhl_label_template)
    shipment_request['SpecialService'] = [srm._set_return()]
    shipment_request['schemaVersion'] = 10.0
    shipment_request['LanguageCode'] = 'en'
    self._dhl_soap_add_custom_data_to_request(shipment_request, 'return')

    dhl_response = srm._process_shipment(shipment_request)
    tracking_ref = dhl_response.AirwayBillNumber
    logmessage = Markup(_("Shipment created into DHL <br/> <b>Tracking Number: </b>%s")) % (tracking_ref)

    label_image = dhl_response.LabelImage[0]
    dhl_labels = [('%s-%s-%s.%s' % (self.get_return_label_prefix(), tracking_ref, 1, self.dhl_label_image_format), label_image.OutputImage)]
    dhl_cmi = None
    if label_image.MultiLabels:
        dhl_cmi = [
            ('%s-Return-%s.%s' % (self._get_delivery_doc_prefix(), mlabel.DocName, mlabel.DocFormat), mlabel.DocImageVal)
            for mlabel in label_image.MultiLabels.MultiLabel
        ]

    lognote_pickings = picking.sale_id.picking_ids if picking.sale_id else picking
    for pick in lognote_pickings:
        pick.message_post(body=logmessage, attachments=dhl_labels)
        if dhl_cmi:
            pick.message_post(body=_("DHL Documents"), attachments=dhl_cmi)

    return {
        'exact_price': 0,
        'tracking_number': tracking_ref,
    }
def fusion_dhl_get_tracking_link(self, picking):
    """Return the DHL tracking URL for ``picking``.

    The picking's ``carrier_tracking_ref`` is interpolated as is into the
    ``AWB`` query parameter.
    """
    tracking_ref = picking.carrier_tracking_ref
    return 'http://www.dhl.com/en/express/tracking.html?AWB=%s' % tracking_ref
def fusion_dhl_cancel_shipment(self, picking):
    """Reset the DHL tracking data of ``picking``.

    No request is sent to DHL: a chatter note stating that the shipping
    cannot be cancelled without a pickup date is posted, then the tracking
    reference and carrier price of the picking are cleared.
    """
    # Deleting a shipment at DHL requires a pickup date, which is only
    # available when a pick-up has been scheduled.
    notice = _(u"You can't cancel DHL shipping without pickup date.")
    picking.message_post(body=notice)
    reset_vals = {'carrier_tracking_ref': '', 'carrier_price': 0.0}
    picking.write(reset_vals)
def _dhl_convert_weight(self, weight, unit):
    """Convert ``weight`` from the configured weight UoM for DHL.

    :param weight: weight expressed in the unit returned by
        ``product.template._get_weight_uom_id_from_ir_config_parameter()``
    :param unit: ``'L'`` converts to ``uom.product_uom_lb``; any other
        value converts to ``uom.product_uom_kgm``
    :return: the converted weight as formatted by ``float_repr`` with 3
        digits
    """
    source_uom = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
    target_xmlid = 'uom.product_uom_lb' if unit == 'L' else 'uom.product_uom_kgm'
    converted = source_uom._compute_quantity(weight, self.env.ref(target_xmlid), round=False)
    return float_repr(converted, 3)
def _dhl_soap_add_custom_data_to_request(self, request, request_type):
    """Merge the configured custom data into ``request`` (DHL SOAP legacy).

    ``dhl_custom_data_request`` is wrapped in braces and evaluated with
    ``const_eval``; the entry stored under ``request_type`` (if any) is
    then merged recursively into ``request``, in place.  Nothing happens
    when no custom data is configured.

    :param request: mapping to update in place
    :param str request_type: key of the custom data to apply
    :raises UserError: when the configured custom data cannot be evaluated
    """
    if not self.dhl_custom_data_request:
        return
    try:
        custom_data = const_eval('{%s}' % self.dhl_custom_data_request).get(request_type, {})
    except (SyntaxError, ValueError):
        # NOTE(review): ValueError is expected from const_eval when the
        # expression is valid Python but not a constant -- confirm against
        # odoo.tools.safe_eval.
        raise UserError(_('Invalid syntax for DHL custom data.'))

    def extra_data_to_request(target, overrides):
        """Recursively apply ``overrides`` on ``target``."""
        for key, new_value in overrides.items():
            # The current value is serialized first; an empty or missing
            # value is stored as None.
            target[key] = current_value = serialize_object(target.get(key, {})) or None
            if isinstance(current_value, list):
                # The same override is applied to every item of a list.
                for item in current_value:
                    extra_data_to_request(item, new_value)
            elif isinstance(new_value, dict) and isinstance(current_value, dict):
                extra_data_to_request(current_value, new_value)
            else:
                target[key] = new_value

    extra_data_to_request(request, custom_data)
def _dhl_soap_calculate_value(self, picking):
sale_order = picking.sale_id
if sale_order:
total_value = sum(line.price_reduce_taxinc * line.product_uom_qty for line in
sale_order.order_line.filtered(
lambda l: l.product_id.type == 'consu' and not l.display_type))
currency_name = picking.sale_id.currency_id.name
else:
total_value = sum([line.product_id.lst_price * line.product_qty for line in picking.move_ids])
currency_name = picking.company_id.currency_id.name
return total_value, currency_name
# ══════════════════════════════════════════════════════════════════════════
# DHL REST METHODS
# ══════════════════════════════════════════════════════════════════════════
def fusion_dhl_rest_rate_shipment(self, order):
    """Return the DHL (REST) rate result computed for ``order``.

    Thin wrapper around ``_dhl_rest_rate_shipment_vals``.
    """
    return self._dhl_rest_rate_shipment_vals(order=order)
def _dhl_rest_get_order_packages(self, order):
self.ensure_one()
total_weight = order._get_estimated_weight()
total_weight = self._dhl_rest_convert_weight(total_weight)
if total_weight == 0.0:
weight_uom_name = self.env['product.template']._get_weight_uom_name_from_ir_config_parameter()
raise UserError(_("The package cannot be created because the total weight of the products in the picking is 0.0 %s", weight_uom_name))
max_weight = self.dhl_default_package_type_id.max_weight or total_weight + 1
number_of_packages = int(total_weight / max_weight)
last_package_weight = total_weight % max_weight
weights = [max_weight] * number_of_packages + ([last_package_weight] if last_package_weight else [])
return [{
'weight': weight,
'dimension': {
'length': self.dhl_default_package_type_id.packaging_length,
'width': self.dhl_default_package_type_id.width,
'height': self.dhl_default_package_type_id.height,
},
} for weight in weights]
def _dhl_rest_get_commodities_from_stock_move_lines(self, move_lines):
    """Build one commodity dict per product found in ``move_lines``.

    Only lines whose product type is ``'product'`` or ``'consu'`` are
    considered.  For each product the quantities are converted to the
    product UoM, summed and rounded to a whole number (at least 1); the
    unit price is the summed ``sale_price`` of the lines divided by that
    quantity.

    :return: list of dicts with ``product_id``, ``qty``,
        ``monetary_value`` and ``country_of_origin``
    """
    goods_lines = move_lines.filtered(lambda line: line.product_id.type in ['product', 'consu'])
    commodities = []
    for product, lines in groupby(goods_lines, lambda x: x.product_id):
        quantity_in_product_uom = sum(
            line.product_uom_id._compute_quantity(line.quantity, product.uom_id)
            for line in lines
        )
        rounded_qty = max(1, float_round(quantity_in_product_uom, precision_digits=0))
        # The origin is the country of the warehouse partner of the first
        # line's picking.
        origin_code = lines[0].picking_id.picking_type_id.warehouse_id.partner_id.country_id.code
        lines_value = sum(line.sale_price for line in lines)
        commodities.append({
            'product_id': product,
            'qty': rounded_qty,
            'monetary_value': lines_value / rounded_qty,
            'country_of_origin': origin_code,
        })
    return commodities
def _dhl_rest_get_picking_packages(self, picking):
self.ensure_one()
packages = []
if picking.is_return_picking:
commodities = self._dhl_rest_get_commodities_from_stock_move_lines(picking.move_line_ids)
weight = picking._get_estimated_weight()
packages.append({
'commodities': commodities,
'weight': weight,
'dimension': {
'length': self.dhl_default_package_type_id.packaging_length,
'width': self.dhl_default_package_type_id.width,
'height': self.dhl_default_package_type_id.height,
},
})
return packages
# Create all packages.
for package in picking.move_line_ids.result_package_id:
move_lines = picking.move_line_ids.filtered(lambda ml: ml.result_package_id == package)
commodities = self._dhl_rest_get_commodities_from_stock_move_lines(move_lines)
packages.append({
'commodities': commodities,
'weight': package.shipping_weight or package.weight,
'dimension': {
'length': package.package_type_id.packaging_length,
'width': package.package_type_id.width,
'height': package.package_type_id.height,
},
'name': package.name,
})
# Create one package: either everything is in pack or nothing is.
if picking.weight_bulk:
commodities = self._dhl_rest_get_commodities_from_stock_move_lines(picking.move_line_ids)
packages.append({
'commodities': commodities,
'weight': picking.weight_bulk,
'dimension': {
'length': self.dhl_default_package_type_id.packaging_length,
'width': self.dhl_default_package_type_id.width,
'height': self.dhl_default_package_type_id.height,
},
'name': 'Bulk Content'
})
elif not packages:
raise UserError(_(
"The package cannot be created because the total weight of the "
"products in the picking is 0.0 %s",
picking.weight_uom_name
))
return packages
def _convert_to_utc_string(self, datetime_object):
    """Format ``datetime_object``, converted to UTC, as
    ``YYYY-MM-DDTHH:MM:SS GMT+00:00``.
    """
    utc_datetime = datetime_object.astimezone(tz=pytz.utc)
    return utc_datetime.strftime('%Y-%m-%dT%H:%M:%S GMT+00:00')
def _dhl_rest_rate_shipment_vals(self, order=False, picking=False):
    """Request a DHL (REST) rate for an order or a picking.

    Exactly one of ``order`` / ``picking`` is expected; ``picking`` takes
    precedence when both are given.  The planned shipping date is one hour
    after the picking's scheduled date or the order date (or after now for
    orders that have a ``website_id``).

    :param order: sale order to rate
    :param picking: picking to rate
    :return: dict with ``success``, ``price``, ``error_message`` and
        ``warning_message``.  The price is expressed in the currency of the
        order (or of the picking's sale order / company).
    :raises UserError: when the planned shipping date is not in the future
    """
    if picking:
        warehouse_partner_id = picking.picking_type_id.warehouse_id.partner_id
        currency_id = picking.sale_id.currency_id or picking.company_id.currency_id
        destination_partner_id = picking.partner_id
        total_value = sum(sml.sale_price for sml in picking.move_line_ids)
        planned_date = picking.scheduled_date + timedelta(hours=1)
    else:
        warehouse_partner_id = order.warehouse_id.partner_id
        currency_id = order.currency_id or order.company_id.currency_id
        total_value = sum(line.price_reduce_taxinc * line.product_uom_qty for line in order.order_line.filtered(lambda l: l.product_id.type in ('consu', 'product') and not l.display_type))
        destination_partner_id = order.partner_shipping_id
        if hasattr(order, 'website_id') and order.website_id:
            planned_date = fields.Datetime.now() + timedelta(hours=1)
        else:
            planned_date = order.date_order + timedelta(hours=1)

    rating_request = {}
    account_number = self.sudo().dhl_account_number
    srm = DHLRestProvider(self)
    check_value = srm._check_required_value(self, destination_partner_id, warehouse_partner_id, order=order, picking=picking)
    if check_value:
        return {'success': False,
                'price': 0.0,
                'error_message': check_value,
                'warning_message': False}
    rating_request['customerDetails'] = {
        'shipperDetails': srm._get_from_vals(warehouse_partner_id),
        'receiverDetails': srm._get_to_vals(destination_partner_id)
    }
    if picking:
        packages = self._dhl_rest_get_picking_packages(picking)
    else:
        packages = self._dhl_rest_get_order_packages(order)
    rating_request['packages'] = srm._get_package_vals(self, packages)
    rating_request['isCustomsDeclarable'] = self.dhl_dutiable
    if self.dhl_dutiable:
        rating_request['monetaryAmount'] = srm._get_dutiable_vals(total_value, currency_id.name)
    rating_request['unitOfMeasurement'] = self.dhl_unit_system
    if planned_date <= fields.Datetime.now():
        raise UserError(_("The planned date for the shipment must be in the future."))
    rating_request['plannedShippingDateAndTime'] = self._convert_to_utc_string(planned_date)
    rating_request['accounts'] = srm._get_billing_vals(account_number, "shipper")
    self._dhl_rest_add_extra_data_to_request(rating_request, 'rate')
    # Set after the extra data has been merged.
    rating_request['productsAndServices'] = [{
        'productCode': self.dhl_product_code,
        'valueAddedServices': [],
    }]
    if self.supports_shipping_insurance and self.shipping_insurance:
        rating_request['productsAndServices'][0]['valueAddedServices'].append(srm._get_insurance_vals(self.shipping_insurance, total_value, currency_id.name))
    response = srm._get_rates(rating_request)

    available_product_code = []
    shipping_charge = False
    products = response['products']
    for product in products:
        # Only prices whose currency type is 'BILLC' are considered.
        charge = [price for price in product['totalPrice'] if price['currencyType'] == 'BILLC']
        global_product_code = product['productCode']
        if global_product_code == self.dhl_product_code and charge:
            shipping_charge = charge[0]['price']
            shipping_currency = charge[0]['priceCurrency']
            break
        else:
            # Remember the other quoted products to suggest one later.
            available_product_code.append(global_product_code)

    if shipping_charge:
        if order:
            order_currency = order.currency_id
        else:
            order_currency = picking.sale_id.currency_id or picking.company_id.currency_id
        if shipping_currency is None or order_currency.name == shipping_currency:
            price = float(shipping_charge)
        else:
            quote_currency = self.env['res.currency'].search([('name', '=', shipping_currency)], limit=1)
            price = quote_currency._convert(float(shipping_charge), order_currency, (order or picking).company_id, order.date_order if order else fields.Date.today())
        if self.supports_shipping_insurance and self.shipping_insurance:
            # Every returned product must list the 'II' service code in
            # its price breakdown, otherwise the rate is rejected.
            for product in products:
                services = []
                for price_breakdown in product['detailedPriceBreakdown']:
                    services.extend([service['serviceCode'] for service in price_breakdown['breakdown'] if 'serviceCode' in service])
                if 'II' not in services:
                    return {'success': False,
                            'price': 0.0,
                            'error_message': _("Shipment insurance is not available between the origin and destination. You should try with another DHL product, or select a delivery method with no insurance."),
                            'warning_message': False}
        return {'success': True,
                'price': price,
                'error_message': False,
                'warning_message': False}

    if available_product_code:
        return {'success': False,
                'price': 0.0,
                'error_message': _(
                    "There is no price available for this shipping, you should rather try with the DHL product %s",
                    available_product_code[0]),
                'warning_message': False}

    # Nothing usable in the response: return an explicit failure rather
    # than None, which callers indexing the result cannot handle.
    return {'success': False,
            'price': 0.0,
            'error_message': _("No shipping rate could be retrieved from DHL for this request."),
            'warning_message': False}
def fusion_dhl_rest_send_shipping(self, pickings):
    """Create DHL (REST) shipments for ``pickings`` and log the documents.

    For each picking the shipment request is built and sent, the labels
    (and the other documents, when the response has some) are posted on
    every picking of the sale order (or on the picking itself when it has
    no sale order), and the price is obtained from a separate rate
    request.  A return label is generated when
    ``return_label_on_delivery`` is set.

    :param pickings: pickings to ship
    :return: list with one dict per picking holding ``exact_price`` and
        ``tracking_number``
    :raises UserError: when the scheduled date of a picking is not in the
        future
    """
    results = []
    for picking in pickings:
        srm = DHLRestProvider(self)
        account_number = self.sudo().dhl_account_number

        planned_date = picking.scheduled_date
        if planned_date <= fields.Datetime.now():
            raise UserError(_("The planned date for the shipment must be in the future."))

        shipment_request = {
            'plannedShippingDateAndTime': self._convert_to_utc_string(planned_date),
            'pickup': {'isRequested': True},
            'accounts': srm._get_billing_vals(account_number, "shipper"),
            'customerDetails': {
                'receiverDetails': srm._get_consignee_vals(picking.partner_id),
                'shipperDetails': srm._get_shipper_vals(picking.company_id.partner_id, picking.picking_type_id.warehouse_id.partner_id),
            },
            'productCode': self.dhl_product_code,
            'customerReferences': [{
                'value': picking.sale_id.name if picking.sale_id else picking.name,
                'typeCode': 'CU',
            }],
        }

        # ``content`` is attached right away and completed afterwards.
        content = shipment_request['content'] = {}
        content['description'] = picking.sale_id.name if picking.sale_id else picking.name
        content['unitOfMeasurement'] = self.dhl_unit_system
        incoterm = picking.sale_id.incoterm or self.env.company.incoterm_id
        content['incoterm'] = incoterm.code or 'EXW'
        total_value, currency_name = self._dhl_rest_calculate_value(picking)
        content['isCustomsDeclarable'] = self.dhl_dutiable
        if self.dhl_dutiable:
            content['declaredValue'] = total_value
            content['declaredValueCurrency'] = currency_name
        if picking._should_generate_commercial_invoice():
            content['exportDeclaration'] = srm._get_export_declaration_vals(self, picking)
            content['declaredValueCurrency'] = currency_name
        content['packages'] = srm._get_shipment_vals(picking)

        shipment_request['outputImageProperties'] = {
            'imageOptions': [{
                'typeCode': 'label',
                'templateName': self.dhl_label_template,
            }],
        }
        if self.supports_shipping_insurance and self.shipping_insurance:
            shipment_request['valueAddedServices'] = [srm._get_insurance_vals(self.shipping_insurance, total_value, currency_name)]
        self._dhl_rest_add_extra_data_to_request(shipment_request, 'ship')

        dhl_response = srm._send_shipment(shipment_request)
        tracking_number = dhl_response['shipmentTrackingNumber']
        logmessage = Markup("%s<br/><b>%s:</b> %s") % (_("Shipment created into DHL"), _("Tracking Number"), tracking_number)

        # Documents come base64-encoded; labels and other documents are
        # posted in separate messages.
        dhl_labels = []
        for document in dhl_response['documents']:
            if document['typeCode'] == 'label':
                filename = 'LabelShipping-DHL-{}.{}'.format(tracking_number, document['imageFormat'])
                dhl_labels.append((filename, base64.b64decode(document['content'])))
        other_documents = []
        for document in dhl_response['documents']:
            if document['typeCode'] != 'label':
                filename = 'ShippingDoc-DHL-{}.{}'.format(document['packageReferenceNumber'], document['imageFormat'])
                other_documents.append((filename, base64.b64decode(document['content'])))

        lognote_pickings = picking.sale_id.picking_ids if picking.sale_id else picking
        for pick in lognote_pickings:
            pick.message_post(body=logmessage, attachments=dhl_labels)
            if other_documents:
                pick.message_post(body=_("DHL Documents"), attachments=other_documents)

        # The price comes from a dedicated rate request on the picking.
        rate = self._dhl_rest_rate_shipment_vals(picking=picking)
        shipping_data = {
            'exact_price': rate['price'],
            'tracking_number': tracking_number,
        }
        if self.return_label_on_delivery:
            self.get_return_label(picking)
        results.append(shipping_data)
    return results
def fusion_dhl_rest_get_return_label(self, picking, tracking_number=None, origin_date=None):
    """Create a DHL REST return shipment for ``picking`` and log its documents.

    The shipper of the return is the picking's partner and the receiver is
    the partner of the picking type's warehouse. The documents returned by
    DHL are posted on the picking (or on every picking of its sale order).

    :param picking: picking for which the return shipment is created
    :param tracking_number: not read; kept for interface compatibility (the
        local name is rebound to the tracking number returned by DHL)
    :param origin_date: not used by this implementation
    :return: dict with ``exact_price`` (always 0) and ``tracking_number``
    :raises UserError: if the picking's scheduled date is not in the future
    """
    srm = DHLRestProvider(self)
    account_number = self.sudo().dhl_account_number

    planned_date = picking.scheduled_date
    if planned_date <= fields.Datetime.now():
        raise UserError(_("The planned date for the shipment must be in the future."))

    shipment_request = {
        'plannedShippingDateAndTime': self._convert_to_utc_string(planned_date),
        'pickup': {'isRequested': False},
        'accounts': srm._get_billing_vals(account_number, "shipper"),
        # Return flow: the customer ships, the warehouse receives.
        'customerDetails': {
            'shipperDetails': srm._get_shipper_vals(picking.partner_id, picking.partner_id),
            'receiverDetails': srm._get_consignee_vals(picking.picking_type_id.warehouse_id.partner_id),
        },
        'productCode': self.dhl_product_code,
    }

    # The request needs the incoterm *code* (a string). The sale order's
    # ``incoterm`` is a record, so read ``.code`` from it exactly like the
    # company fallback does; sending the record itself would not serialize.
    incoterm_code = picking.sale_id.incoterm.code or self.env.company.incoterm_id.code or 'EXW'
    shipment_request['content'] = {
        'description': picking.sale_id.name if picking.sale_id else picking.name,
        'unitOfMeasurement': self.dhl_unit_system,
        'incoterm': incoterm_code,
        'isCustomsDeclarable': self.dhl_dutiable,
        'packages': srm._get_shipment_vals(picking),
    }

    total_value, currency_name = self._dhl_rest_calculate_value(picking)
    if self.dhl_dutiable:
        shipment_request['content']['declaredValue'] = total_value
        shipment_request['content']['declaredValueCurrency'] = currency_name
    if picking._should_generate_commercial_invoice():
        shipment_request['content']['exportDeclaration'] = srm._get_export_declaration_vals(self, picking)
        # Set again on purpose: the shipment may need an invoice without being dutiable.
        shipment_request['content']['declaredValueCurrency'] = currency_name

    shipment_request['outputImageProperties'] = {
        'imageOptions': [{
            'typeCode': 'label',
            'templateName': self.dhl_label_template,
        }]
    }
    # NOTE(review): 'PV' is presumably DHL's return value-added service code - confirm.
    shipment_request['valueAddedServices'] = [{'serviceCode': 'PV'}]
    self._dhl_rest_add_extra_data_to_request(shipment_request, 'return')

    dhl_response = srm._send_shipment(shipment_request)
    tracking_number = dhl_response['shipmentTrackingNumber']
    logmessage = Markup("%s<br/><b>%s:</b> %s") % (_("Shipment created into DHL"), _("Tracking Number"), tracking_number)

    # Split the returned documents: labels go with the tracking note,
    # everything else is posted in a separate message.
    dhl_labels = [
        (
            'LabelReturn-DHL-{}.{}'.format(tracking_number, document['imageFormat']),
            base64.b64decode(document['content'])
        )
        for document in dhl_response['documents'] if document['typeCode'] == 'label'
    ]
    other_documents = [
        (
            'ShippingDoc-DHL-{}.{}'.format(document['packageReferenceNumber'], document['imageFormat']),
            base64.b64decode(document['content'])
        )
        for document in dhl_response['documents'] if document['typeCode'] != 'label'
    ]

    lognote_pickings = picking.sale_id.picking_ids if picking.sale_id else picking
    for pick in lognote_pickings:
        pick.message_post(body=logmessage, attachments=dhl_labels)
        if other_documents:
            pick.message_post(body=_("DHL Documents"), attachments=other_documents)

    return {
        'exact_price': 0,
        'tracking_number': tracking_number,
    }
def fusion_dhl_rest_get_tracking_link(self, picking):
    """Return the DHL Express tracking URL for the picking's tracking reference.

    :param picking: record exposing ``carrier_tracking_ref``
    :return: tracking page URL with the reference as the ``AWB`` query value
    """
    tracking_ref = picking.carrier_tracking_ref
    url_template = 'http://www.dhl.com/en/express/tracking.html?AWB=%s'
    return url_template % tracking_ref
def fusion_dhl_rest_cancel_shipment(self, picking):
    """Log that the DHL shipment cannot be cancelled and reset the picking's carrier data.

    No request is sent to DHL here; the picking only gets a chatter note and
    has its tracking reference and carrier price cleared.

    :param picking: picking whose shipment is being cancelled
    """
    # Deleting a shipment on DHL's side requires a pickup date, so without a
    # scheduled pick-up there is nothing that can be cancelled remotely.
    notice = _("You can't cancel DHL shipping without pickup date.")
    picking.message_post(body=notice)
    cleared_values = {'carrier_tracking_ref': '', 'carrier_price': 0.0}
    picking.write(cleared_values)
def _dhl_rest_convert_weight(self, weight):
    """Convert ``weight`` into the unit matching the carrier's DHL unit system.

    :param weight: weight expressed in the UoM returned by
        ``product.template._get_weight_uom_id_from_ir_config_parameter()``
    :return: the weight converted to ``uom.product_uom_lb`` when
        ``dhl_unit_system`` is 'imperial', to ``uom.product_uom_kgm``
        otherwise, rounded to 3 decimals
    """
    source_uom = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
    target_xmlid = 'uom.product_uom_lb' if self.dhl_unit_system == 'imperial' else 'uom.product_uom_kgm'
    converted = source_uom._compute_quantity(weight, self.env.ref(target_xmlid), round=False)
    # json_float_round is used because float_round doesn't work here, for
    # example float_round(0.7000000000000001, 3) = 0.7000000000000001
    return json_float_round(converted, 3)
def _dhl_rest_calculate_value(self, picking):
sale_order = picking.sale_id
if sale_order:
total_value = sum(
line.price_reduce_taxinc * line.product_uom_qty
for line in sale_order.order_line
if not line.display_type
if line.product_id.type in ['product', 'consu']
)
currency_name = picking.sale_id.currency_id.name
else:
total_value = sum(line.product_id.lst_price * line.product_qty for line in picking.move_ids)
currency_name = picking.company_id.currency_id.name
return total_value, currency_name
def _dhl_rest_add_extra_data_to_request(self, request, request_type):
"""Adds the extra data to the request (DHL REST)."""
extra_data_input = {
'rate': self.dhl_extra_data_rate_request,
'ship': self.dhl_extra_data_ship_request,
'return': self.dhl_extra_data_return_request,
}.get(request_type) or ''
try:
extra_data = json.loads('{' + extra_data_input + '}')
except JSONDecodeError:
raise UserError(_("Invalid syntax for DHL extra data."))
def extra_data_to_request(request, extra_data):
"""recursive function that adds extra data to the current request"""
for key, new_value in extra_data.items():
current_value = request.get(key)
if isinstance(current_value, list):
for item in current_value:
extra_data_to_request(item, new_value)
elif isinstance(new_value, dict) and isinstance(current_value, dict):
extra_data_to_request(current_value, new_value)
else:
request[key] = new_value
extra_data_to_request(request, extra_data)