This commit is contained in:
gsinghpal
2026-02-27 14:32:32 -05:00
parent b649246e81
commit b925766966
80 changed files with 7831 additions and 1041 deletions

View File

@@ -105,9 +105,11 @@ class AccountMove(models.Model):
try:
report = self.env.ref('fusion_claims.action_report_mod_invoice')
pdf_content, _ = report._render_qweb_pdf(report.id, [self.id])
client_name = (so.partner_id.name or 'Client').replace(' ', '_').replace(',', '')
name_parts = (so.partner_id.name or 'Client').strip().split()
first = name_parts[0] if name_parts else 'Client'
last = name_parts[-1] if len(name_parts) > 1 else ''
att = Attachment.create({
'name': f'Invoice - {client_name} - {self.name}.pdf',
'name': f'{first}_{last}_MOD_Invoice_{self.name}.pdf',
'type': 'binary',
'datas': base64.b64encode(pdf_content),
'res_model': 'account.move',

View File

@@ -2,8 +2,12 @@
# Copyright 2024-2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import logging
import requests
from odoo import models, fields, api
_logger = logging.getLogger(__name__)
class ResPartner(models.Model):
_inherit = 'res.partner'
@@ -14,6 +18,65 @@ class ResPartner(models.Model):
'Used as origin for first travel time calculation. '
'If empty, the company default HQ address is used.',
)
x_fc_start_address_lat = fields.Float(
string='Start Latitude', digits=(10, 7),
)
x_fc_start_address_lng = fields.Float(
string='Start Longitude', digits=(10, 7),
)
def _geocode_start_address(self, address):
if not address or not address.strip():
return 0.0, 0.0
api_key = self.env['ir.config_parameter'].sudo().get_param(
'fusion_claims.google_maps_api_key', '')
if not api_key:
return 0.0, 0.0
try:
resp = requests.get(
'https://maps.googleapis.com/maps/api/geocode/json',
params={'address': address.strip(), 'key': api_key, 'region': 'ca'},
timeout=10,
)
data = resp.json()
if data.get('status') == 'OK' and data.get('results'):
loc = data['results'][0]['geometry']['location']
return loc['lat'], loc['lng']
except Exception as e:
_logger.warning("Start address geocoding failed for '%s': %s", address, e)
return 0.0, 0.0
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
for rec, vals in zip(records, vals_list):
addr = vals.get('x_fc_start_address')
if addr:
lat, lng = rec._geocode_start_address(addr)
if lat and lng:
rec.write({
'x_fc_start_address_lat': lat,
'x_fc_start_address_lng': lng,
})
return records
def write(self, vals):
res = super().write(vals)
if 'x_fc_start_address' in vals:
addr = vals['x_fc_start_address']
if addr and addr.strip():
lat, lng = self._geocode_start_address(addr)
if lat and lng:
super().write({
'x_fc_start_address_lat': lat,
'x_fc_start_address_lng': lng,
})
else:
super().write({
'x_fc_start_address_lat': 0.0,
'x_fc_start_address_lng': 0.0,
})
return res
# ==========================================================================
# CONTACT TYPE

View File

@@ -22,7 +22,7 @@ class SaleOrder(models.Model):
for order in self:
name = order.name or ''
if order.partner_id and order.partner_id.name:
name = f"{name} -- {order.partner_id.name}"
name = f"{name} - {order.partner_id.name}"
order.display_name = name
# ==========================================================================
@@ -1318,17 +1318,12 @@ class SaleOrder(models.Model):
att_ids = []
att_names = []
# 1. Signed SA Form -- reuse existing attachment created by attachment=True
signed_field = 'x_fc_sa_signed_form' if self.x_fc_sa_signed_form else (
'x_fc_sa_physical_signed_copy' if self.x_fc_sa_physical_signed_copy else None)
if signed_field:
att = Attachment.search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('res_field', '=', signed_field),
], order='id desc', limit=1)
if att:
att_ids.append(att.id)
att_id = self._get_and_prepare_field_attachment(signed_field, 'Signed SA Form')
if att_id:
att_ids.append(att_id)
att_names.append('Signed SA Form')
# 2. Internal POD -- generate on-the-fly from the standard report
@@ -1353,12 +1348,14 @@ class SaleOrder(models.Model):
try:
report = self.env.ref('account.account_invoices')
pdf_content, _ct = report._render_qweb_pdf(report.id, [invoice.id])
first, last = self._get_client_name_parts()
att = Attachment.create({
'name': f'Invoice_{invoice.name}.pdf',
'name': f'{first}_{last}_Invoice_{invoice.name}.pdf',
'type': 'binary',
'datas': base64.b64encode(pdf_content),
'res_model': 'sale.order',
'res_id': self.id,
'mimetype': 'application/pdf',
})
att_ids.append(att.id)
att_names.append(f'Invoice ({invoice.name})')
@@ -1388,16 +1385,10 @@ class SaleOrder(models.Model):
att_ids = []
att_names = []
# 1. Approval document
if self.x_fc_odsp_approval_document:
att = Attachment.search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('res_field', '=', 'x_fc_odsp_approval_document'),
], order='id desc', limit=1)
if att:
att_ids.append(att.id)
att_names.append('ODSP Approval Document')
att_id = self._get_and_prepare_field_attachment('x_fc_odsp_approval_document', 'ODSP Approval Document')
if att_id:
att_ids.append(att_id)
att_names.append('ODSP Approval Document')
# 2. Internal POD
try:
@@ -1426,12 +1417,14 @@ class SaleOrder(models.Model):
try:
report = self.env.ref('account.account_invoices')
pdf_content, _ct = report._render_qweb_pdf(report.id, [invoice.id])
first, last = self._get_client_name_parts()
att = Attachment.create({
'name': f'Invoice_{invoice.name}.pdf',
'name': f'{first}_{last}_Invoice_{invoice.name}.pdf',
'type': 'binary',
'datas': base64.b64encode(pdf_content),
'res_model': 'sale.order',
'res_id': self.id,
'mimetype': 'application/pdf',
})
att_ids.append(att.id)
att_names.append(f'Invoice ({invoice.name})')
@@ -1680,44 +1673,6 @@ class SaleOrder(models.Model):
)
_logger.info("POD signature applied to approval form for %s", self.name)
# ==========================================================================
# DELIVERY STATUS FIELDS
# ==========================================================================
x_fc_delivery_status = fields.Selection(
selection=[
('waiting', 'Waiting'),
('waiting_approval', 'Waiting for Approval'),
('ready', 'Ready for Delivery'),
('scheduled', 'Delivery Scheduled'),
('shipped_warehouse', 'Shipped to Warehouse'),
('received_warehouse', 'Received in Warehouse'),
('delivered', 'Delivered'),
('hold', 'Hold'),
('cancelled', 'Cancelled'),
],
string='Delivery Status',
tracking=True,
help='Current delivery status of the order',
)
x_fc_delivery_datetime = fields.Datetime(
string='Delivery Date & Time',
tracking=True,
help='Scheduled or actual delivery date and time',
)
# Computed field to show/hide delivery datetime
x_fc_show_delivery_datetime = fields.Boolean(
compute='_compute_show_delivery_datetime',
string='Show Delivery DateTime',
)
@api.depends('x_fc_delivery_status')
def _compute_show_delivery_datetime(self):
"""Compute whether to show delivery datetime field."""
for order in self:
order.x_fc_show_delivery_datetime = order.x_fc_delivery_status in ('scheduled', 'delivered')
# ==========================================================================
# ADP CLAIM FIELDS
# ==========================================================================
@@ -1729,11 +1684,11 @@ class SaleOrder(models.Model):
)
x_fc_client_ref_1 = fields.Char(
string='Client Reference 1',
help='Primary client reference (e.g., Health Card Number)',
help='First two letters of the client\'s first name and last two letters of their last name. Example: John Doe = JODO',
)
x_fc_client_ref_2 = fields.Char(
string='Client Reference 2',
help='Secondary client reference',
help='Last four digits of the client\'s health card number. Example: 1234',
)
x_fc_adp_delivery_date = fields.Date(
string='ADP Delivery Date',
@@ -2988,10 +2943,136 @@ class SaleOrder(models.Model):
# ==========================================================================
# PDF DOCUMENT PREVIEW ACTIONS (opens in new tab using browser/system PDF handler)
# ==========================================================================
MIME_TO_EXT = {
'application/pdf': '.pdf',
'application/xml': '.xml',
'text/xml': '.xml',
'text/plain': '.txt',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
'application/msword': '.doc',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
'application/vnd.ms-excel': '.xls',
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'application/zip': '.zip',
'application/octet-stream': '',
}
FIELD_NAME_TEMPLATE = {
'x_fc_final_submitted_application': '{first}_{last}.pdf',
'x_fc_xml_file': '{first}_{last}_data.xml',
'x_fc_original_application': '{first}_{last}_Original_Application.pdf',
'x_fc_signed_pages_11_12': '{first}_{last}_Signed_Pages.pdf',
'x_fc_proof_of_delivery': '{first}_{last}_Proof_of_Delivery.pdf',
'x_fc_approval_letter': '{first}_{last}_Approval_Letter.pdf',
'x_fc_sa_signed_form': '{first}_{last}_SA_Form_Signed.pdf',
'x_fc_sa_physical_signed_copy': '{first}_{last}_SA_Form_Signed.pdf',
'x_fc_sa_approval_form': '{first}_{last}_SA_Approval.pdf',
'x_fc_odsp_approval_document': '{first}_{last}_ODSP_Approval.pdf',
'x_fc_odsp_authorizer_letter': '{first}_{last}_ODSP_Authorizer_Letter.pdf',
'x_fc_ow_discretionary_form': '{first}_{last}_OW_Discretionary_Form.pdf',
'x_fc_ow_authorizer_letter': '{first}_{last}_OW_Authorizer_Letter.pdf',
'x_fc_mod_drawing': '{first}_{last}_Drawing.pdf',
'x_fc_mod_initial_photos': '{first}_{last}_Initial_Photos.pdf',
'x_fc_mod_pca_document': '{first}_{last}_PCA_Document.pdf',
'x_fc_mod_proof_of_delivery': '{first}_{last}_Proof_of_Delivery.pdf',
'x_fc_mod_completion_photos': '{first}_{last}_Completion_Photos.pdf',
}
FIELD_FILENAME_MAP = {
'x_fc_original_application': 'x_fc_original_application_filename',
'x_fc_signed_pages_11_12': 'x_fc_signed_pages_filename',
'x_fc_final_submitted_application': 'x_fc_final_application_filename',
'x_fc_xml_file': 'x_fc_xml_filename',
'x_fc_proof_of_delivery': 'x_fc_proof_of_delivery_filename',
'x_fc_approval_letter': 'x_fc_approval_letter_filename',
'x_fc_sa_signed_form': 'x_fc_sa_signed_form_filename',
'x_fc_sa_physical_signed_copy': 'x_fc_sa_physical_signed_copy_filename',
'x_fc_sa_approval_form': 'x_fc_sa_approval_form_filename',
'x_fc_odsp_approval_document': 'x_fc_odsp_approval_document_filename',
'x_fc_odsp_authorizer_letter': 'x_fc_odsp_authorizer_letter_filename',
'x_fc_ow_discretionary_form': 'x_fc_ow_discretionary_form_filename',
'x_fc_ow_authorizer_letter': 'x_fc_ow_authorizer_letter_filename',
'x_fc_mod_drawing': 'x_fc_mod_drawing_filename',
'x_fc_mod_initial_photos': 'x_fc_mod_initial_photos_filename',
'x_fc_mod_pca_document': 'x_fc_mod_pca_filename',
'x_fc_mod_proof_of_delivery': 'x_fc_mod_pod_filename',
'x_fc_mod_completion_photos': 'x_fc_mod_completion_photos_filename',
}
def _get_ext_from_mime(self, mimetype):
"""Return a file extension (with dot) for a MIME type."""
return self.MIME_TO_EXT.get(mimetype or '', '')
def _get_client_name_parts(self):
"""Return (first_name, last_name) cleaned for filenames."""
full_name = (self.partner_id.name or 'Client').strip()
parts = full_name.split()
first = parts[0] if parts else 'Client'
last = parts[-1] if len(parts) > 1 else ''
clean = lambda s: s.replace(',', '').replace("'", '').replace('"', '')
return clean(first), clean(last)
def _build_attachment_name(self, field_name, mimetype=None):
"""Build the proper filename for a field attachment.
Uses FIELD_NAME_TEMPLATE for known fields with Firstname_Lastname convention.
For the XML file, respects the actual mimetype (could be .xml, .docx, .txt).
"""
first, last = self._get_client_name_parts()
template = self.FIELD_NAME_TEMPLATE.get(field_name)
if template:
name = template.format(first=first, last=last)
if field_name == 'x_fc_xml_file' and mimetype:
ext = self._get_ext_from_mime(mimetype)
if ext and ext != '.xml':
name = f'{first}_{last}_data{ext}'
return name
ext = self._get_ext_from_mime(mimetype) if mimetype else '.pdf'
return f'{first}_{last}_Document{ext}'
def _prepare_attachment_for_email(self, attachment, field_name=None, label=None):
"""Rename an attachment to a clean, professional filename.
Always renames to the standard convention (Firstname_Lastname pattern)
so recipients get consistently named files regardless of what was uploaded.
"""
if not attachment:
return None
new_name = self._build_attachment_name(field_name, attachment.mimetype)
if attachment.name == new_name:
return attachment.id
try:
attachment.sudo().write({'name': new_name})
except Exception:
_logger.warning("Could not rename attachment %s to %s", attachment.id, new_name)
return attachment.id
def _get_and_prepare_field_attachment(self, field_name, label=None):
"""Find the ir.attachment for a binary field, rename it properly, return its id.
Convenience wrapper combining _get_document_attachment + _prepare_attachment_for_email.
Returns None if the field has no data or attachment is not found.
"""
self.ensure_one()
if not getattr(self, field_name, None):
return None
attachment = self._get_document_attachment(field_name)
if not attachment:
return None
return self._prepare_attachment_for_email(attachment, field_name=field_name, label=label)
def _get_document_attachment(self, field_name):
"""Get the ir.attachment record for a binary field stored as attachment."""
self.ensure_one()
# Find the attachment by field name - get most recent one
attachment = self.env['ir.attachment'].sudo().search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
@@ -3001,9 +3082,9 @@ class SaleOrder(models.Model):
def _get_or_create_attachment(self, field_name, document_label):
"""Get the current attachment for a binary field (attachment=True).
For attachment=True fields, Odoo creates attachments automatically.
We find the one with res_field set and return it.
We find the one with res_field set, ensure it has a proper name, and return it.
"""
self.ensure_one()
@@ -3011,43 +3092,20 @@ class SaleOrder(models.Model):
if not data:
return None
# For attachment=True fields, Odoo creates/updates an attachment with res_field set
attachment = self.env['ir.attachment'].sudo().search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('res_field', '=', field_name),
], order='id desc', limit=1)
if attachment:
# If attachment name is the field name (Odoo default), use the actual filename
if attachment.name == field_name:
filename_mapping = {
'x_fc_original_application': 'x_fc_original_application_filename',
'x_fc_signed_pages_11_12': 'x_fc_signed_pages_filename',
'x_fc_final_submitted_application': 'x_fc_final_application_filename',
'x_fc_xml_file': 'x_fc_xml_filename',
'x_fc_proof_of_delivery': 'x_fc_proof_of_delivery_filename',
}
filename_field = filename_mapping.get(field_name)
if filename_field:
filename = getattr(self, filename_field, None)
if filename and filename != field_name:
attachment.sudo().write({'name': filename})
self._prepare_attachment_for_email(attachment, field_name=field_name, label=document_label)
return attachment
# Fallback: create attachment manually (shouldn't happen for attachment=True fields)
filename_mapping = {
'x_fc_original_application': 'x_fc_original_application_filename',
'x_fc_signed_pages_11_12': 'x_fc_signed_pages_filename',
'x_fc_final_submitted_application': 'x_fc_final_application_filename',
'x_fc_xml_file': 'x_fc_xml_filename',
'x_fc_proof_of_delivery': 'x_fc_proof_of_delivery_filename',
}
filename_field = filename_mapping.get(field_name)
filename = getattr(self, filename_field) if filename_field else f'{document_label}.pdf'
filename = self._build_attachment_name(field_name)
attachment = self.env['ir.attachment'].sudo().create({
'name': filename or f'{document_label}.pdf',
'name': filename,
'datas': data,
'res_model': 'sale.order',
'res_id': self.id,
@@ -3399,16 +3457,20 @@ class SaleOrder(models.Model):
}
def action_complete_assessment(self):
"""Open wizard to mark assessment as completed with date."""
"""Open wizard to mark assessment as completed with date.
Allowed from 'quotation' (override) or 'assessment_scheduled' (normal flow)."""
self.ensure_one()
if self.x_fc_adp_application_status != 'assessment_scheduled':
raise UserError("Can only complete assessment from 'Assessment Scheduled' status.")
if self.x_fc_adp_application_status not in ('quotation', 'assessment_scheduled'):
raise UserError(
_("Can only complete assessment from 'Quotation' or 'Assessment Scheduled' status.")
)
return {
'name': 'Assessment Completed',
'type': 'ir.actions.act_window',
'res_model': 'fusion_claims.assessment.completed.wizard',
'view_mode': 'form',
'views': [(False, 'form')],
'target': 'new',
'context': {
'active_id': self.id,
@@ -4627,82 +4689,67 @@ class SaleOrder(models.Model):
# ==========================================================================
# DOCUMENT CHATTER POSTING
# ==========================================================================
def _post_document_to_chatter(self, field_name, document_label=None):
"""Post a document attachment to the chatter with a link.
def _post_document_to_chatter(self, field_name, document_label=None, preserve_copy=False):
"""Post a document to the chatter, reusing the existing field attachment.
By default, references the existing ir.attachment (created by Odoo for
attachment=True fields) instead of creating a duplicate.
Args:
field_name: The binary field name (e.g., 'x_fc_final_submitted_application')
document_label: Optional label for the document (defaults to field string)
preserve_copy: If True, creates a separate copy (used when the original
is about to be deleted/replaced and we need to keep a snapshot).
"""
self.ensure_one()
# Map field names to filename fields
filename_mapping = {
'x_fc_original_application': 'x_fc_original_application_filename',
'x_fc_signed_pages_11_12': 'x_fc_signed_pages_filename',
'x_fc_final_submitted_application': 'x_fc_final_application_filename',
'x_fc_xml_file': 'x_fc_xml_filename',
'x_fc_proof_of_delivery': 'x_fc_proof_of_delivery_filename',
}
data_field = field_name
filename_field = filename_mapping.get(field_name, field_name + '_filename')
data = getattr(self, data_field, None)
original_filename = getattr(self, filename_field, None) or 'document'
data = getattr(self, field_name, None)
if not data:
return
# Get document label from field definition if not provided
if not document_label:
field_obj = self._fields.get(data_field)
document_label = field_obj.string if field_obj else data_field
# Check for existing attachments with same name for revision numbering
existing_count = self.env['ir.attachment'].sudo().search_count([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('name', '=like', original_filename.rsplit('.', 1)[0] + '%'),
])
# Add revision number if this is a replacement
if existing_count > 0 and '(replaced)' in (document_label or ''):
# This is an old document being replaced - add revision number
base_name, ext = original_filename.rsplit('.', 1) if '.' in original_filename else (original_filename, '')
filename = f"R{existing_count}_{base_name}.{ext}" if ext else f"R{existing_count}_{base_name}"
field_obj = self._fields.get(field_name)
document_label = field_obj.string if field_obj else field_name
if preserve_copy:
proper_name = self._build_attachment_name(field_name)
base, _, ext = proper_name.rpartition('.')
if base:
copy_name = f"{base}_archived.{ext}"
else:
copy_name = f"{proper_name}_archived"
attachment = self.env['ir.attachment'].sudo().create({
'name': copy_name,
'datas': data,
'res_model': 'sale.order',
'res_id': self.id,
})
else:
filename = original_filename
# Create attachment with the original/revised filename
attachment = self.env['ir.attachment'].sudo().create({
'name': filename,
'datas': data,
'res_model': 'sale.order',
'res_id': self.id,
})
# Post message with attachment (shows as native Odoo attachment with preview)
attachment = self._get_document_attachment(field_name)
if not attachment:
return
self._prepare_attachment_for_email(attachment, field_name=field_name)
user_name = self.env.user.name
now = fields.Datetime.now()
body = Markup("""
<p><strong>{label}</strong> uploaded by <b>{user}</b></p>
<p class="text-muted small">{timestamp}</p>
""").format(
body = Markup(
'<p><strong>{label}</strong> uploaded by <b>{user}</b></p>'
'<p class="text-muted small">{timestamp}</p>'
).format(
label=document_label,
user=user_name,
timestamp=now.strftime('%Y-%m-%d %H:%M:%S')
timestamp=now.strftime('%Y-%m-%d %H:%M:%S'),
)
# Use attachment_ids to show as native attachment with preview capability
self.message_post(
body=body,
message_type='notification',
subtype_xmlid='mail.mt_note',
attachment_ids=[attachment.id],
)
return attachment
# ==========================================================================
@@ -4844,28 +4891,15 @@ class SaleOrder(models.Model):
if not to_emails and not cc_emails:
return False
# Reuse existing field attachments (created by Odoo for attachment=True fields)
# instead of creating duplicates
attachments = []
attachment_names = []
Attachment = self.env['ir.attachment'].sudo()
if self.x_fc_final_submitted_application:
att = Attachment.search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('res_field', '=', 'x_fc_final_submitted_application'),
], order='id desc', limit=1)
if att:
attachments.append(att.id)
att_id = self._get_and_prepare_field_attachment('x_fc_final_submitted_application', 'ADP Application')
if att_id:
attachments.append(att_id)
attachment_names.append('Final ADP Application (PDF)')
if self.x_fc_xml_file:
att = Attachment.search([
('res_model', '=', 'sale.order'),
('res_id', '=', self.id),
('res_field', '=', 'x_fc_xml_file'),
], order='id desc', limit=1)
if att:
attachments.append(att.id)
att_id = self._get_and_prepare_field_attachment('x_fc_xml_file', 'ADP XML Data')
if att_id:
attachments.append(att_id)
attachment_names.append('XML Data File')
client_name = recipients.get('client', self.partner_id).name or 'Client'
@@ -5113,8 +5147,144 @@ class SaleOrder(models.Model):
_logger.error(f"Failed to send billed email for {self.name}: {e}")
return False
def _build_approved_items_html(self, for_pdf=False):
"""Build an HTML table of approved order line items.
Columns: S/N, ADP Code, Device Type, Product Name, Qty,
ADP Portion, Client Portion, Deduction.
"""
self.ensure_one()
lines = self.order_line.filtered(
lambda l: l.product_id and l.display_type not in ('line_section', 'line_note')
)
if not lines:
return ''
font = "font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,Arial,sans-serif;"
if for_pdf:
font = "font-family:Arial,Helvetica,sans-serif;"
hdr_style = (
f'style="background:#2d3748;color:#fff;padding:8px 10px;'
f'font-size:11px;font-weight:600;text-align:left;'
f'border-bottom:2px solid #4a5568;{font}"'
)
cell_style = (
'style="padding:7px 10px;font-size:12px;color:#2d3748;'
'border-bottom:1px solid #e2e8f0;"'
)
alt_row = 'style="background:#f7fafc;"'
amt_style = (
'style="padding:7px 10px;font-size:12px;color:#2d3748;'
'border-bottom:1px solid #e2e8f0;text-align:right;"'
)
hdr_r = hdr_style.replace('text-align:left', 'text-align:right')
has_deduction = any(
l.x_fc_deduction_type and l.x_fc_deduction_type != 'none'
for l in lines
)
html = (
'<div style="margin:20px 0;">'
f'<h3 style="color:#1a202c;font-size:15px;font-weight:700;'
f'margin:0 0 10px 0;{font}">Approved Items</h3>'
'<table style="width:100%;border-collapse:collapse;border:1px solid #e2e8f0;">'
'<thead><tr>'
f'<th {hdr_style}>S/N</th>'
f'<th {hdr_style}>ADP Code</th>'
f'<th {hdr_style}>Device Type</th>'
f'<th {hdr_style}>Product</th>'
f'<th {hdr_r}>Qty</th>'
f'<th {hdr_r}>ADP Portion</th>'
f'<th {hdr_r}>Client Portion</th>'
)
if has_deduction:
html += f'<th {hdr_r}>Deduction</th>'
html += '</tr></thead><tbody>'
total_adp = 0.0
total_client = 0.0
for idx, line in enumerate(lines, 1):
row_attr = alt_row if idx % 2 == 0 else ''
adp_code = line._get_adp_code_for_report()
device_type = line._get_adp_device_type()
product_name = line.product_id.name or '-'
if len(product_name) > 40 and not for_pdf:
product_name = product_name[:37] + '...'
qty = int(line.product_uom_qty) if line.product_uom_qty == int(line.product_uom_qty) else line.product_uom_qty
adp_portion = line.x_fc_adp_portion or 0.0
client_portion = line.x_fc_client_portion or 0.0
total_adp += adp_portion
total_client += client_portion
deduction_str = '-'
if line.x_fc_deduction_type == 'pct' and line.x_fc_deduction_value:
deduction_str = f'{line.x_fc_deduction_value:.0f}%'
elif line.x_fc_deduction_type == 'amt' and line.x_fc_deduction_value:
deduction_str = f'${line.x_fc_deduction_value:,.2f}'
html += f'<tr {row_attr}>'
html += f'<td {cell_style}>{idx}</td>'
html += f'<td {cell_style}>{adp_code}</td>'
html += f'<td {cell_style}>{device_type}</td>'
html += f'<td {cell_style}>{product_name}</td>'
html += f'<td {amt_style}>{qty}</td>'
html += f'<td {amt_style}>${adp_portion:,.2f}</td>'
html += f'<td {amt_style}>${client_portion:,.2f}</td>'
if has_deduction:
html += f'<td {amt_style}>{deduction_str}</td>'
html += '</tr>'
# Totals row
colspan = 5
total_style = (
'style="padding:8px 10px;font-size:12px;font-weight:700;'
'color:#1a202c;border-top:2px solid #2d3748;text-align:right;"'
)
total_label_style = (
f'style="padding:8px 10px;font-size:12px;font-weight:700;'
f'color:#1a202c;border-top:2px solid #2d3748;text-align:right;"'
)
html += f'<tr style="background:#edf2f7;">'
html += f'<td colspan="{colspan}" {total_label_style}>Total</td>'
html += f'<td {total_style}>${total_adp:,.2f}</td>'
html += f'<td {total_style}>${total_client:,.2f}</td>'
if has_deduction:
html += f'<td {total_style}></td>'
html += '</tr>'
html += '</tbody></table></div>'
return html
def _generate_approved_items_pdf(self):
"""Generate the Approved Items PDF using the QWeb report and return an ir.attachment id."""
self.ensure_one()
import base64
first, last = self._get_client_name_parts()
try:
report = self.env.ref('fusion_claims.action_report_approved_items')
pdf_content, _ = report._render_qweb_pdf(report.id, [self.id])
except Exception as e:
_logger.error("Failed to generate approved items PDF for %s: %s", self.name, e)
return None
filename = f'{first}_{last}_Approved_Items.pdf'
att = self.env['ir.attachment'].sudo().create({
'name': filename,
'type': 'binary',
'datas': base64.b64encode(pdf_content) if isinstance(pdf_content, bytes) else pdf_content,
'res_model': 'sale.order',
'res_id': self.id,
'mimetype': 'application/pdf',
})
return att.id
def _send_approval_email(self):
"""Send notification when ADP application is approved."""
"""Send notification when ADP application is approved, with approved items report."""
self.ensure_one()
if not self._is_email_notifications_enabled():
return False
@@ -5138,27 +5308,46 @@ class SaleOrder(models.Model):
'contact you with the details and next steps for delivery.'
)
items_html = self._build_approved_items_html()
body_html = self._email_build(
title='Application Approved',
summary=f'The ADP application for <strong>{client_name}</strong> has been '
f'<strong>{status_label.lower()}</strong>.',
email_type='success',
sections=[('Case Details', self._build_case_detail_rows(include_amounts=True))],
extra_html=items_html,
note=note_text,
note_color='#38a169',
attachments_note='Approved Items Report (PDF)' if items_html else None,
button_url=f'{self.get_base_url()}/web#id={self.id}&model=sale.order&view_type=form',
sender_name=sales_rep_name,
)
attachment_ids = []
try:
att_id = self._generate_approved_items_pdf()
if att_id:
attachment_ids.append(att_id)
except Exception as e:
_logger.warning("Could not generate approved items PDF for %s: %s", self.name, e)
email_to = ', '.join(to_emails)
email_cc = ', '.join(cc_emails) if cc_emails else ''
try:
self.env['mail.mail'].sudo().create({
mail_vals = {
'subject': f'Application {status_label} - {client_name} - {self.name}',
'body_html': body_html,
'email_to': email_to, 'email_cc': email_cc,
'model': 'sale.order', 'res_id': self.id,
}).send()
self._email_chatter_log(f'Application {status_label} email sent', email_to, email_cc)
}
if attachment_ids:
mail_vals['attachment_ids'] = [(6, 0, attachment_ids)]
self.env['mail.mail'].sudo().create(mail_vals).send()
self._email_chatter_log(
f'Application {status_label} email sent', email_to, email_cc,
['Attached: Approved Items Report'] if attachment_ids else None,
)
return True
except Exception as e:
_logger.error(f"Failed to send approval email for {self.name}: {e}")
@@ -5492,6 +5681,18 @@ class SaleOrder(models.Model):
# ==========================================================================
# OVERRIDE WRITE
# ==========================================================================
def web_save(self, vals, specification):
"""TEMP DEBUG: Intercept web_save to diagnose 'Missing required fields' on old orders."""
_logger.warning(
"DEBUG web_save() on %s: vals keys = %s",
[r.name for r in self], list(vals.keys())
)
try:
return super().web_save(vals, specification)
except Exception as e:
_logger.error("DEBUG web_save() FAILED on %s: %s", [r.name for r in self], e)
raise
def write(self, vals):
"""Override write to handle ADP status changes, date auto-population, and document tracking."""
from datetime import date as date_class
@@ -5690,19 +5891,17 @@ class SaleOrder(models.Model):
label = document_labels.get(field_name, field_name)
if old_data and new_data:
# REPLACEMENT: Old document being replaced with new one
# Preserve old document in chatter as attachment
order._post_document_to_chatter(
field_name,
f"{label} (replaced)"
field_name,
f"{label} (replaced)",
preserve_copy=True,
)
elif old_data and not new_data:
# DELETION: Document is being deleted
# Preserve the deleted document in chatter
order._post_document_to_chatter(
field_name,
f"{label} (DELETED)"
field_name,
f"{label} (DELETED)",
preserve_copy=True,
)
# Post deletion notice
@@ -5737,11 +5936,17 @@ class SaleOrder(models.Model):
for order in self:
# Post existing final application to chatter before clearing
if order.x_fc_final_submitted_application:
order._post_document_to_chatter('x_fc_final_submitted_application',
'Final Application (before correction)')
order._post_document_to_chatter(
'x_fc_final_submitted_application',
'Final Application (before correction)',
preserve_copy=True,
)
if order.x_fc_xml_file:
order._post_document_to_chatter('x_fc_xml_file',
'XML File (before correction)')
order._post_document_to_chatter(
'x_fc_xml_file',
'XML File (before correction)',
preserve_copy=True,
)
# Clear the document fields AND submission date
# Use _correction_cleared to prevent the audit trail from posting duplicates
@@ -6131,8 +6336,9 @@ class SaleOrder(models.Model):
for order in self:
order._send_correction_needed_email()
elif new_app_status == 'case_closed':
for order in self:
order._send_case_closed_email()
if not self.env.context.get('skip_status_emails'):
for order in self:
order._send_case_closed_email()
# ==================================================================
# MARCH OF DIMES STATUS-TRIGGERED EMAILS & SMS
@@ -6679,7 +6885,7 @@ class SaleOrder(models.Model):
for order in orders_to_close:
try:
# Use context to skip status validation for automated process
order.with_context(skip_status_validation=True).write({
order.with_context(skip_status_validation=True, skip_status_emails=True).write({
'x_fc_adp_application_status': 'case_closed',
})
@@ -6725,7 +6931,7 @@ class SaleOrder(models.Model):
if order.x_fc_odsp_division != 'ontario_works' and status != 'payment_received':
continue
try:
order._odsp_advance_status(
order.with_context(skip_status_emails=True)._odsp_advance_status(
'case_closed',
"Case automatically closed 7 days after %s." % status.replace('_', ' '),
)

View File

@@ -306,6 +306,49 @@ class SaleOrderLine(models.Model):
# 5. Final fallback - return default_code even if not in ADP database
return self.product_id.default_code or ''
def _get_adp_code_for_report(self):
"""Return the ADP device code for display on reports.
Uses the product's x_fc_adp_device_code field (not default_code).
Returns 'NON-FUNDED' for non-ADP products.
"""
self.ensure_one()
if not self.product_id:
return 'NON-FUNDED'
if self.product_id.is_non_adp_funded():
return 'NON-FUNDED'
product_tmpl = self.product_id.product_tmpl_id
code = ''
if hasattr(product_tmpl, 'x_fc_adp_device_code'):
code = getattr(product_tmpl, 'x_fc_adp_device_code', '') or ''
if not code and hasattr(product_tmpl, 'x_adp_code'):
code = getattr(product_tmpl, 'x_adp_code', '') or ''
if not code:
return 'NON-FUNDED'
ADPDevice = self.env['fusion.adp.device.code'].sudo()
if ADPDevice.search_count([('device_code', '=', code), ('active', '=', True)]) > 0:
return code
return 'NON-FUNDED'
def _get_adp_device_type(self):
"""Live lookup of device type from the ADP device code table.
Returns 'No Funding Available' for non-ADP products.
"""
self.ensure_one()
if not self.product_id or self.product_id.is_non_adp_funded():
return 'No Funding Available'
code = self._get_adp_code_for_report()
if code == 'NON-FUNDED':
return 'No Funding Available'
if self.x_fc_adp_device_type:
return self.x_fc_adp_device_type
adp_device = self.env['fusion.adp.device.code'].sudo().search([
('device_code', '=', code),
('active', '=', True),
], limit=1)
return adp_device.device_type if adp_device else 'No Funding Available'
def _get_serial_number(self):
"""Get serial number from mapped field or native field."""
self.ensure_one()

View File

@@ -350,6 +350,13 @@ class FusionTechnicianTask(models.Model):
compute='_compute_address_display',
)
# In-store flag -- uses company address instead of client address
is_in_store = fields.Boolean(
string='In Store',
default=False,
help='Task takes place at the store/office. Uses company address automatically.',
)
# Geocoding
address_lat = fields.Float(string='Latitude', digits=(10, 7))
address_lng = fields.Float(string='Longitude', digits=(10, 7))
@@ -382,6 +389,30 @@ class FusionTechnicianTask(models.Model):
string='Completed At',
tracking=True,
)
# GPS location captured at task actions
started_latitude = fields.Float(
string='Started Latitude', digits=(10, 7), readonly=True,
)
started_longitude = fields.Float(
string='Started Longitude', digits=(10, 7), readonly=True,
)
completed_latitude = fields.Float(
string='Completed Latitude', digits=(10, 7), readonly=True,
)
completed_longitude = fields.Float(
string='Completed Longitude', digits=(10, 7), readonly=True,
)
action_latitude = fields.Float(
string='Last Action Latitude', digits=(10, 7), readonly=True,
)
action_longitude = fields.Float(
string='Last Action Longitude', digits=(10, 7), readonly=True,
)
action_location_accuracy = fields.Float(
string='Location Accuracy (m)', readonly=True,
)
voice_note_audio = fields.Binary(
string='Voice Recording',
attachment=True,
@@ -961,9 +992,21 @@ class FusionTechnicianTask(models.Model):
# ONCHANGE - Auto-fill address from client
# ------------------------------------------------------------------
@api.onchange('is_in_store')
def _onchange_is_in_store(self):
"""Auto-fill company address when task is marked as in-store."""
if self.is_in_store:
company_partner = self.env.company.partner_id
if company_partner and company_partner.street:
self._fill_address_from_partner(company_partner)
else:
self.address_street = self.env.company.name or 'In Store'
@api.onchange('partner_id')
def _onchange_partner_id(self):
"""Auto-fill address fields from the selected client's address."""
if self.is_in_store:
return
if self.partner_id:
addr = self.partner_id
self.address_partner_id = addr.id
@@ -1046,6 +1089,19 @@ class FusionTechnicianTask(models.Model):
"A task must be linked to either a Sale Order (Case) or a Purchase Order."
))
@api.constrains('address_street', 'address_lat', 'address_lng', 'is_in_store')
def _check_address_required(self):
"""Non-in-store tasks must have a geocoded address."""
for task in self:
if task.x_fc_sync_source:
continue
if task.is_in_store:
continue
if not task.address_street:
raise ValidationError(_(
"A valid address is required. If this task is at the store, "
"please check the 'In Store' option."
))
@api.constrains('technician_id', 'additional_technician_ids',
'scheduled_date', 'time_start', 'time_end')
@@ -1334,6 +1390,14 @@ class FusionTechnicianTask(models.Model):
vals['name'] = self.env['ir.sequence'].next_by_code('fusion.technician.task') or _('New')
if not vals.get('x_fc_sync_uuid') and not vals.get('x_fc_sync_source'):
vals['x_fc_sync_uuid'] = str(uuid.uuid4())
# In-store tasks: auto-fill company address
if vals.get('is_in_store') and not vals.get('address_street'):
company_partner = self.env.company.partner_id
if company_partner and company_partner.street:
self._fill_address_vals(vals, company_partner)
else:
vals['address_street'] = self.env.company.name or 'In Store'
# Auto-populate address from sale order if not provided
if vals.get('sale_order_id') and not vals.get('address_street'):
order = self.env['sale.order'].browse(vals['sale_order_id'])
@@ -1676,6 +1740,22 @@ class FusionTechnicianTask(models.Model):
"Please complete previous task %s first before starting this one."
) % earlier_incomplete.name)
def _write_action_location(self, extra_vals=None):
"""Write GPS coordinates from context onto the task record."""
ctx = self.env.context
lat = ctx.get('action_latitude', 0)
lng = ctx.get('action_longitude', 0)
acc = ctx.get('action_accuracy', 0)
vals = {
'action_latitude': lat,
'action_longitude': lng,
'action_location_accuracy': acc,
}
if extra_vals:
vals.update(extra_vals)
if lat and lng:
self.with_context(skip_travel_recalc=True).write(vals)
def action_start_en_route(self):
"""Mark task as En Route."""
for task in self:
@@ -1683,6 +1763,7 @@ class FusionTechnicianTask(models.Model):
raise UserError(_("Only scheduled tasks can be marked as En Route."))
task._check_previous_tasks_completed()
task.status = 'en_route'
task._write_action_location()
task._post_status_message('en_route')
def action_start_task(self):
@@ -1692,6 +1773,11 @@ class FusionTechnicianTask(models.Model):
raise UserError(_("Task must be scheduled or en route to start."))
task._check_previous_tasks_completed()
task.status = 'in_progress'
ctx = self.env.context
task._write_action_location({
'started_latitude': ctx.get('action_latitude', 0),
'started_longitude': ctx.get('action_longitude', 0),
})
task._post_status_message('in_progress')
def action_view_sale_order(self):
@@ -1742,9 +1828,15 @@ class FusionTechnicianTask(models.Model):
"technician portal first."
))
ctx = self.env.context
task.with_context(skip_travel_recalc=True).write({
'status': 'completed',
'completion_datetime': fields.Datetime.now(),
'completed_latitude': ctx.get('action_latitude', 0),
'completed_longitude': ctx.get('action_longitude', 0),
'action_latitude': ctx.get('action_latitude', 0),
'action_longitude': ctx.get('action_longitude', 0),
'action_location_accuracy': ctx.get('action_accuracy', 0),
})
task._post_status_message('completed')
if task.completion_notes and (task.sale_order_id or task.purchase_order_id):
@@ -1811,6 +1903,7 @@ class FusionTechnicianTask(models.Model):
if task.status == 'completed':
raise UserError(_("Cannot cancel a completed task."))
task.status = 'cancelled'
task._write_action_location()
task._post_status_message('cancelled')
# If this was a delivery task linked to a sale order that is
# currently in "Ready for Delivery" -- revert the order back.
@@ -2302,20 +2395,144 @@ class FusionTechnicianTask(models.Model):
base_domain,
['name', 'partner_id', 'technician_id', 'task_type',
'address_lat', 'address_lng', 'address_display',
'time_start', 'time_start_display', 'time_end_display',
'time_start', 'time_end', 'time_start_display', 'time_end_display',
'status', 'scheduled_date', 'travel_time_minutes',
'x_fc_sync_client_name', 'x_fc_is_shadow', 'x_fc_sync_source'],
order='scheduled_date asc NULLS LAST, time_start asc',
limit=500,
)
locations = self.env['fusion.technician.location'].get_latest_locations()
tech_starts = self._get_tech_start_locations(tasks, api_key)
return {
'api_key': api_key,
'tasks': tasks,
'locations': locations,
'local_instance_id': local_instance,
'tech_start_locations': tech_starts,
}
@api.model
def _get_tech_start_locations(self, tasks, api_key):
"""Build a dict of technician start locations for route origins.
Priority per technician:
1. Today's fusion_clock check-in location (if module installed)
2. Personal start address (x_fc_start_address with cached lat/lng)
3. Company default HQ address
"""
tech_ids = {
t['technician_id'][0]
for t in tasks
if t.get('technician_id')
}
if not tech_ids:
return {}
result = {}
today = fields.Date.today()
clock_locations = self._get_clock_in_locations(tech_ids, today)
hq_address = (
self.env['ir.config_parameter'].sudo()
.get_param('fusion_claims.technician_start_address', '') or ''
).strip()
hq_lat, hq_lng = 0.0, 0.0
for uid in tech_ids:
if uid in clock_locations:
result[uid] = clock_locations[uid]
continue
user = self.env['res.users'].sudo().browse(uid)
if not user.exists():
continue
partner = user.partner_id
if partner.x_fc_start_address and partner.x_fc_start_address.strip():
lat = partner.x_fc_start_address_lat
lng = partner.x_fc_start_address_lng
if not lat or not lng:
lat, lng = self._geocode_address_string(
partner.x_fc_start_address, api_key)
if lat and lng:
partner.sudo().write({
'x_fc_start_address_lat': lat,
'x_fc_start_address_lng': lng,
})
if lat and lng:
result[uid] = {
'lat': lat, 'lng': lng,
'address': partner.x_fc_start_address.strip(),
'source': 'start_address',
}
continue
if hq_address:
if not hq_lat and not hq_lng:
hq_lat, hq_lng = self._geocode_address_string(
hq_address, api_key)
if hq_lat and hq_lng:
result[uid] = {
'lat': hq_lat, 'lng': hq_lng,
'address': hq_address,
'source': 'company_hq',
}
return result
@api.model
def _get_clock_in_locations(self, tech_ids, today):
"""Get today's clock-in lat/lng from fusion_clock if installed."""
result = {}
try:
module = self.env['ir.module.module'].sudo().search([
('name', '=', 'fusion_clock'),
('state', '=', 'installed'),
], limit=1)
if not module:
return result
except Exception:
return result
try:
Attendance = self.env['hr.attendance'].sudo()
Employee = self.env['hr.employee'].sudo()
except KeyError:
return result
employees = Employee.search([
('user_id', 'in', list(tech_ids)),
])
emp_to_user = {e.id: e.user_id.id for e in employees}
if not employees:
return result
today_start = dt_datetime.combine(today, dt_datetime.min.time())
today_end = today_start + timedelta(days=1)
attendances = Attendance.search([
('employee_id', 'in', employees.ids),
('check_in', '>=', today_start),
('check_in', '<', today_end),
], order='check_in asc')
for att in attendances:
uid = emp_to_user.get(att.employee_id.id)
if not uid or uid in result:
continue
loc = att.x_fclk_location_id if hasattr(att, 'x_fclk_location_id') else False
if loc and loc.latitude and loc.longitude:
result[uid] = {
'lat': loc.latitude,
'lng': loc.longitude,
'address': loc.address or loc.name or '',
'source': 'clock_in',
}
return result
def _geocode_address(self):
"""Geocode the task address using Google Geocoding API."""
self.ensure_one()
@@ -2573,12 +2790,14 @@ class FusionTechnicianTask(models.Model):
return f'{display_hour}:{minutes:02d} {period}'
def get_google_maps_url(self):
"""Get Google Maps navigation URL. Uses lat/lng coordinates to
navigate to the exact location (text addresses cause Google to
resolve to nearby business names instead)."""
"""Get Google Maps navigation URL using the text address so the
destination shows a proper street name instead of raw coordinates.
Returns a google.com/maps URL that Android auto-opens in the app;
iOS handling is done client-side via JS to launch comgooglemaps://."""
self.ensure_one()
if self.address_display:
addr = urllib.parse.quote(self.address_display)
return f'https://www.google.com/maps/dir/?api=1&destination={addr}&travelmode=driving'
if self.address_lat and self.address_lng:
return f'https://www.google.com/maps/dir/?api=1&destination={self.address_lat},{self.address_lng}&travelmode=driving'
elif self.address_display:
return f'https://www.google.com/maps/dir/?api=1&destination={urllib.parse.quote(self.address_display)}&travelmode=driving'
return ''