Files
Odoo-Modules/fusion_shipping/api/canada_post/response.py
Nexa Admin 431052920e feat: separate fusion field service and LTC into standalone modules, update core modules
- fusion_claims: separated field service logic, updated controllers/views
- fusion_tasks: updated task views and map integration
- fusion_authorizer_portal: added page 11 signing, schedule booking, migrations
- fusion_shipping: new standalone shipping module (Canada Post, FedEx, DHL, Purolator)
- fusion_ltc_management: new standalone LTC management module
2026-03-11 16:19:52 +00:00

165 lines
4.8 KiB
Python

import lxml
import datetime
import logging
from collections import defaultdict
from odoo.addons.fusion_shipping.api.canada_post.utils import get_dom_tree, python_2_unicode_compatible
import json
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
@python_2_unicode_compatible
class ResponseDataObject():
    """Attribute-style view over a (possibly nested) dictionary.

    Every key of the source mapping becomes an instance attribute:

    * a ``dict`` value is wrapped recursively in another
      ``ResponseDataObject``;
    * a ``list`` value stays a list, with each ``dict`` item wrapped and
      every other item (strings, ``None``, other scalars) kept unchanged;
    * any other value is stored as-is, except that string values whose
      lower-cased key appears in ``datetime_nodes`` are converted to
      ``datetime.datetime`` when they match ``YYYY-MM-DDTHH:MM:SS[.fff...]``.
    """

    def __init__(self, mydict, datetime_nodes=None):
        """Populate the object from ``mydict``.

        :param mydict: mapping whose items become attributes.
        :param datetime_nodes: iterable of lower-case key names whose string
            values should be parsed as timestamps; ``None`` means no keys.
        """
        # Copy so that the caller's sequence is never shared or mutated.
        self._load_dict(mydict, list(datetime_nodes or ()))

    def __repr__(self):
        return str(self)

    def __str__(self):
        return str(self.__dict__)

    def has_key(self, name):
        """Return True when an attribute called ``name`` exists."""
        return hasattr(self, name)

    def get(self, name, default=None):
        """Return attribute ``name``, or ``default`` when it is missing."""
        return getattr(self, name, default)

    def _setattr(self, name, value, datetime_nodes):
        """Store ``value`` under ``name``, parsing timestamps when requested.

        Only string values are considered for parsing; an empty element
        (``None``) or any other non-string value is stored untouched
        instead of raising. Strings that do not match the expected format
        are also stored untouched.
        """
        if isinstance(value, str) and name.lower() in datetime_nodes:
            date_part, _, time_part = value.partition('T')
            # Drop fractional seconds (and anything after them).
            stamp = "%s %s" % (date_part, time_part.partition('.')[0])
            try:
                value = datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # Not a timestamp in the expected layout: keep the raw text.
                pass
        setattr(self, name, value)

    def _load_dict(self, mydict, datetime_nodes):
        """Turn each item of ``mydict`` into an attribute of this object."""
        for key, value in mydict.items():
            if isinstance(value, dict):
                setattr(self, key, ResponseDataObject(value, datetime_nodes))
            elif isinstance(value, list):
                # Only dict items can be wrapped; everything else is kept
                # verbatim so scalar lists do not blow up.
                setattr(self, key, [
                    ResponseDataObject(item, datetime_nodes)
                    if isinstance(item, dict) else item
                    for item in value
                ])
            else:
                self._setattr(key, value, datetime_nodes)
class Response():
    """Wrap a response object and expose its XML payload in parsed form.

    The payload read from ``obj.content`` is made available as a DOM
    (``dom()``), a plain dictionary (``dict()``), a JSON string (``json()``)
    and an attribute-style ``reply`` object. Attributes that are not defined
    on this wrapper are looked up on the wrapped object.
    """

    def __init__(self, obj, verb=None, parse_response=True):
        """Parse ``obj.content`` unless ``parse_response`` is false.

        :param obj: response object; its ``content`` attribute is parsed as
            XML (presumably a ``requests`` response - confirm with caller).
        :param verb: optional call name; when given, the DOM and dictionary
            are narrowed to the ``<verb>Response`` node if it is present.
        :param parse_response: when false, no parsing is attempted and
            ``reply`` is an empty object.
        """
        self._obj = obj
        # Defaults keep dom()/dict()/json() usable when parsing is skipped
        # or fails, instead of falling through to the wrapped object.
        self._dom = None
        self._dict = {}

        if not parse_response:
            self.reply = ResponseDataObject({}, [])
            return

        try:
            self._dom = self._parse_xml(obj.content)
            self._dict = self._etree_to_dict(self._dom)
            if verb:
                self._narrow_to_verb('%sResponse' % verb)
            self.reply = ResponseDataObject(self._dict, [])
        except lxml.etree.XMLSyntaxError as e:
            _logger.debug('Response parse failed: %s', e)
            self.reply = ResponseDataObject({}, [])

    def _narrow_to_verb(self, response_tag):
        """Point ``_dom`` / ``_dict`` at the ``response_tag`` node if found.

        Handles both an ``Envelope``/``Body`` wrapped document and a bare
        document. Missing nodes leave the current DOM / dictionary in place.
        """
        if 'Envelope' in self._dict:
            body_dom = self._dom.find('Body')
            elem = body_dom.find(response_tag) if body_dom is not None else None
            envelope = self._dict['Envelope']
            container = envelope.get('Body') if isinstance(envelope, dict) else None
        else:
            elem = self._dom.find(response_tag)
            container = self._dict

        if elem is not None:
            self._dom = elem
        # An empty Body parses to None / text, which has nothing to narrow to.
        if isinstance(container, dict):
            self._dict = container.get(response_tag, self._dict)

    def _get_node_path(self, t):
        """Return the dot-joined tag names from the root down to ``t``."""
        parts = [t.tag]
        node = t
        while True:
            try:
                node = node.getparent()
                # At the root getparent() yields None, whose missing ``tag``
                # ends the walk.
                parts.append(node.tag)
            except AttributeError:
                break
        return '.'.join(reversed(parts))

    @staticmethod
    def _pullval(v):
        """Unwrap single-item sequences; return anything else unchanged."""
        if len(v) == 1:
            return v[0]
        return v

    def _etree_to_dict(self, t):
        """Convert element ``t`` and its subtree into nested dictionaries.

        The result is ``{tag: content}`` where content is:

        * a dict of child tags (repeated tags collapse into a list) when the
          element has children, plus ``_<attr>`` entries for attributes and
          a ``value`` entry for non-blank text;
        * the stripped text for a leaf without attributes;
        * ``None`` for an empty leaf without attributes.

        Comment nodes yield an empty dict. Note that the element's ``tag``
        is rewritten in place without its namespace.
        """
        if isinstance(t, lxml.etree._Comment):
            return {}

        # remove xmlns from nodes
        t.tag = self._get_node_tag(t)
        tag = t.tag

        children = list(t)
        if children:
            grouped = defaultdict(list)
            for child in children:
                for key, val in self._etree_to_dict(child).items():
                    grouped[key].append(val)
            d = {tag: {key: self._pullval(vals) for key, vals in grouped.items()}}
        else:
            d = {tag: {} if t.attrib else None}

        if t.attrib:
            d[tag].update(('_' + k, v) for k, v in t.attrib.items())
        if t.text:
            text = t.text.strip()
            if children or t.attrib:
                # Mixed content: keep text next to children/attributes.
                if text:
                    d[tag]['value'] = text
            else:
                d[tag] = text
        return d

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped response object."""
        # Without this guard a missing ``_obj`` (e.g. an instance created via
        # __new__, copy or unpickling) would recurse until RecursionError.
        if name == '_obj':
            raise AttributeError(name)
        return getattr(self._obj, name)

    def _parse_xml(self, xml):
        """Parse ``xml`` into a DOM using the project's ``get_dom_tree``."""
        return get_dom_tree(xml)

    def _get_node_tag(self, node):
        """Return ``node.tag`` with its ``{namespace}`` prefix removed."""
        namespace = node.nsmap.get(node.prefix, '')
        return node.tag.replace('{' + namespace + '}', '')

    def dom(self, lxml=True):
        """Return the parsed DOM (``None`` if nothing was parsed).

        ``lxml`` is accepted for backward compatibility and has no effect.
        """
        return self._dom

    def dict(self):
        """Return the parsed payload as a dictionary (empty if unparsed)."""
        return self._dict

    def json(self):
        """Return the parsed payload serialized as a JSON string."""
        return json.dumps(self.dict())