This commit is contained in:
gsinghpal
2026-03-11 12:15:53 -04:00
parent f81e0cd918
commit db4b9aa278
1210 changed files with 173089 additions and 4044 deletions

View File

@@ -1,3 +1,4 @@
import base64
import logging
from datetime import datetime as dt_mod
from lxml import etree
@@ -66,6 +67,7 @@ class FusionCpShipment(models.Model):
('confirmed', 'Confirmed'),
('shipped', 'Shipped'),
('delivered', 'Delivered'),
('returned', 'Returned'),
('cancelled', 'Cancelled'),
],
string='Status',
@@ -95,10 +97,21 @@ class FusionCpShipment(models.Model):
string='Commercial Invoice',
ondelete='set null',
)
# Return label
return_tracking_number = fields.Char(
string='Return Tracking Number',
readonly=True,
copy=False,
)
return_label_attachment_id = fields.Many2one(
'ir.attachment',
string='Return Label',
ondelete='set null',
)
# Shipment details
shipping_cost = fields.Float(
shipping_cost = fields.Monetary(
string='Shipping Cost',
digits='Product Price',
currency_field='currency_id',
readonly=True,
)
service_type = fields.Char(
@@ -141,6 +154,11 @@ class FusionCpShipment(models.Model):
string='Company',
default=lambda self: self.env.company,
)
currency_id = fields.Many2one(
'res.currency',
related='company_id.currency_id',
store=True,
)
# Tracking history
tracking_event_ids = fields.One2many(
'fusion.cp.tracking.event',
@@ -369,15 +387,31 @@ class FusionCpShipment(models.Model):
delivered_date = detail.get('actual-delivery-date', '')
if delivered_date:
self.status = 'delivered'
if self._has_return_events():
self.status = 'returned'
else:
self.status = 'delivered'
try:
self.delivery_date = dt_mod.strptime(
delivered_date, '%Y-%m-%d')
except (ValueError, TypeError):
self.delivery_date = fields.Datetime.now()
elif self._has_return_events():
self.status = 'returned'
elif self.status == 'confirmed' and self.tracking_event_ids:
self.status = 'shipped'
def _has_return_events(self):
"""Check if any tracking events indicate a return/RTS."""
RETURN_TYPES = {'RTS', 'RETURN', 'RTS_LABEL_PROC'}
for event in self.tracking_event_ids:
if event.event_type in RETURN_TYPES:
return True
desc = (event.event_description or '').lower()
if 'return to sender' in desc or 'item returned' in desc:
return True
return False
# ── Void & Reissue ─────────────────────────────────────
def action_void_shipment(self):
@@ -504,3 +538,182 @@ class FusionCpShipment(models.Model):
'url': '%s%s' % (base_link, self.tracking_number),
'target': 'new',
}
# ── Cron ─────────────────────────────────────────────────
@api.model
def _cron_refresh_tracking(self):
"""Auto-refresh tracking for all active (non-terminal) shipments."""
shipments = self.search([
('status', 'in', ('confirmed', 'shipped')),
('tracking_number', '!=', False),
])
_logger.info(
"Cron: refreshing tracking for %d shipments", len(shipments))
for shipment in shipments:
try:
shipment.action_refresh_tracking()
self.env.cr.commit()
except Exception as e:
self.env.cr.rollback()
_logger.warning(
"Cron: tracking refresh failed for %s: %s",
shipment.name, str(e))
# ── Return Labels ────────────────────────────────────────
def action_view_return_label(self):
return self._action_open_attachment(self.return_label_attachment_id)
def action_create_return_label(self):
"""Create a prepaid return label via Canada Post Authorized
Returns API. Bill-on-scan: charged only when the customer
uses the label at a post office.
"""
self.ensure_one()
if self.return_tracking_number:
raise ValidationError(
_("A return label has already been created "
"for this shipment."))
if not self.carrier_id:
raise ValidationError(
_("No carrier linked to this shipment."))
if not self.picking_id:
raise ValidationError(
_("No transfer linked to this shipment."))
carrier = self.carrier_id
customer = carrier.customer_number
warehouse_partner = (
self.picking_id.picking_type_id.warehouse_id.partner_id)
if not warehouse_partner:
raise ValidationError(
_("No warehouse address found. Please configure "
"a contact on the warehouse."))
if carrier.prod_environment:
base = "https://soa-gw.canadapost.ca"
else:
base = "https://ct.soa-gw.canadapost.ca"
url = "%s/rs/%s/%s/authorizedreturn" % (base, customer, customer)
xml = self._build_return_label_xml(carrier, warehouse_partner)
headers = {
'Content-Type': 'application/vnd.cpc.authreturn-v2+xml',
'Accept': 'application/vnd.cpc.authreturn-v2+xml',
'Accept-language': 'en-CA',
}
try:
resp = request(
method='POST', url=url, data=xml,
headers=headers,
auth=(carrier.username, carrier.password))
_logger.info(
"Return Label API %s%s", url, resp.status_code)
if resp.status_code not in (200, 201):
self._parse_cp_error_response(resp)
self._process_return_label_response(resp, carrier)
except ValidationError:
raise
except Exception as e:
raise ValidationError(
_("Failed to create return label: %s") % str(e))
def _build_return_label_xml(self, carrier, return_to_partner):
"""Build Authorized Return XML.
return_to_partner = warehouse address (where the return goes).
"""
ns = "http://www.canadapost.ca/ws/authreturn-v2"
root = etree.Element("authorized-return", xmlns=ns)
etree.SubElement(root, "service-code").text = "DOM.EP"
# Receiver — warehouse where the item is returned to
receiver = etree.SubElement(root, "receiver")
rec_name = etree.SubElement(receiver, "name")
rec_name.text = (return_to_partner.name or '')[:44]
rec_company = etree.SubElement(receiver, "company")
rec_company.text = (
return_to_partner.commercial_company_name
or return_to_partner.name or '')[:44]
rec_addr = etree.SubElement(receiver, "domestic-address")
etree.SubElement(rec_addr, "address-line-1").text = (
return_to_partner.street or '')[:44]
if return_to_partner.street2:
etree.SubElement(rec_addr, "address-line-2").text = (
return_to_partner.street2)[:44]
etree.SubElement(rec_addr, "city").text = (
return_to_partner.city or '')[:40]
etree.SubElement(rec_addr, "province").text = (
return_to_partner.state_id.code or '')
etree.SubElement(rec_addr, "postal-code").text = (
(return_to_partner.zip or '').replace(' ', ''))
# Parcel characteristics
parcel = etree.SubElement(root, "parcel-characteristics")
etree.SubElement(parcel, "weight").text = str(
max(self.weight or 0.5, 0.1))
# Print preferences
prefs = etree.SubElement(root, "print-preferences")
etree.SubElement(prefs, "output-format").text = (
carrier.fusion_cp_output_format or '8.5x11')
return etree.tostring(
root, xml_declaration=True, encoding='UTF-8')
def _process_return_label_response(self, resp, carrier):
"""Parse return label response, download label PDF,
store on shipment.
"""
api_resp = Response(resp)
result = api_resp.dict()
auth_return = result.get('authorized-return', result)
tracking_pin = auth_return.get('tracking-pin', '')
self.return_tracking_number = tracking_pin
# Get label artifact URL from links
links = auth_return.get('links', {})
link_list = links.get('link', [])
if isinstance(link_list, dict):
link_list = [link_list]
label_url = ''
for link in link_list:
if link.get('@rel') == 'labelDetails':
label_url = link.get('@href', '')
break
if label_url:
label_resp = request(
method='GET', url=label_url,
headers={
'Accept': 'application/pdf',
'Accept-language': 'en-CA',
},
auth=(carrier.username, carrier.password))
if label_resp.status_code == 200:
attachment = self.env['ir.attachment'].create({
'name': 'Return-Label-%s.pdf' % tracking_pin,
'type': 'binary',
'datas': base64.b64encode(
label_resp.content).decode(),
'res_model': self._name,
'res_id': self.id,
'mimetype': 'application/pdf',
})
self.return_label_attachment_id = attachment
self.message_post(
body=_("Return label created. Tracking: %s") % tracking_pin,
attachment_ids=(
self.return_label_attachment_id.ids
if self.return_label_attachment_id else []))

View File

@@ -27,17 +27,20 @@ class SaleOrder(models.Model):
fusion_cp_package_height = fields.Float(
string='Package Height', copy=False)
fusion_cp_shipment_ids = fields.One2many(
'fusion.cp.shipment',
'sale_order_id',
string='CP Shipments',
)
fusion_cp_shipment_count = fields.Integer(
string='CP Shipments',
compute='_compute_fusion_cp_shipment_count',
)
def _compute_fusion_cp_shipment_count(self):
Shipment = self.env['fusion.cp.shipment']
for order in self:
order.fusion_cp_shipment_count = Shipment.search_count(
[('sale_order_id', '=', order.id)]
)
order.fusion_cp_shipment_count = len(
order.fusion_cp_shipment_ids)
def action_view_fusion_cp_shipments(self):
self.ensure_one()