2484 lines
105 KiB
Python
2484 lines
105 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2024-2026 Nexa Systems Inc.
|
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
|
|
|
"""
|
|
Fusion Technician Task
|
|
Scheduling and task management for field technicians.
|
|
Replaces Monday.com for technician schedule tracking.
|
|
"""
|
|
|
|
from odoo import models, fields, api, _
|
|
from odoo.exceptions import UserError, ValidationError
|
|
from odoo.osv import expression
|
|
from markupsafe import Markup
|
|
import logging
|
|
import json
|
|
import uuid
|
|
import requests
|
|
from datetime import datetime as dt_datetime, timedelta
|
|
import urllib.parse
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FusionTechnicianTask(models.Model):
|
|
_name = 'fusion.technician.task'
|
|
_description = 'Technician Task'
|
|
_order = 'scheduled_date, sequence, time_start, id'
|
|
_inherit = ['mail.thread', 'mail.activity.mixin']
|
|
_rec_name = 'name'
|
|
|
|
@api.depends('x_fc_sync_source', 'x_fc_sync_client_name', 'partner_id.name',
             'task_type', 'time_start', 'time_end', 'name')
def _compute_display_name(self):
    """Build a richer display name: ``Client - Type | 9:00 AM - 10:00 AM``.

    The client part is the synced client name when the task has a sync
    source, otherwise the partner's name. The time range is appended only
    when both formatted times are non-empty. If nothing can be assembled
    the task reference (``name``) is used instead.

    The explicit ``@api.depends`` lists every field read below so the
    cached value is invalidated when any of them changes.
    """
    type_labels = dict(self._fields['task_type'].selection)
    for task in self:
        if task.x_fc_sync_source:
            client = task.x_fc_sync_client_name
        else:
            client = task.partner_id.name or ''
        ttype = type_labels.get(task.task_type, task.task_type or '')
        start = self._float_to_time_str(task.time_start)
        end = self._float_to_time_str(task.time_end)
        # Drop empty components so we never render a dangling " - ".
        label = ' - '.join(p for p in (client, ttype) if p)
        if start and end:
            label += f' | {start} - {end}'
        task.display_name = label or task.name
|
|
|
|
# ------------------------------------------------------------------
|
|
# STORE HOURS HELPER
|
|
# ------------------------------------------------------------------
|
|
def _get_store_hours(self):
    """Read the store opening and closing hours from the system parameters.

    :returns: ``(open_hour, close_hour)`` as floats. Each value falls back
        to its default (9.0 / 18.0) when the parameter is empty or cannot
        be converted to a float.
    """
    params = self.env['ir.config_parameter'].sudo()
    hours = []
    for key, fallback in (
        ('fusion_claims.store_open_hour', '9.0'),
        ('fusion_claims.store_close_hour', '18.0'),
    ):
        try:
            value = float(params.get_param(key, fallback) or fallback)
        except (ValueError, TypeError):
            # Unparseable setting: use the documented default.
            value = float(fallback)
        hours.append(value)
    return tuple(hours)
|
|
|
|
# ------------------------------------------------------------------
|
|
# CORE FIELDS
|
|
# ------------------------------------------------------------------
|
|
name = fields.Char(
|
|
string='Task Reference',
|
|
required=True,
|
|
copy=False,
|
|
readonly=True,
|
|
default=lambda self: _('New'),
|
|
)
|
|
active = fields.Boolean(default=True)
|
|
|
|
# Cross-instance sync fields
|
|
x_fc_sync_source = fields.Char(
|
|
'Source Instance', readonly=True, index=True,
|
|
help='Origin instance ID if this is a synced shadow task (e.g. westin, mobility)',
|
|
)
|
|
x_fc_sync_remote_id = fields.Integer(
|
|
'Remote Task ID', readonly=True,
|
|
help='ID of the task on the remote instance',
|
|
)
|
|
x_fc_sync_uuid = fields.Char(
|
|
'Sync UUID', readonly=True, index=True, copy=False,
|
|
help='Unique ID for cross-instance deduplication',
|
|
)
|
|
x_fc_is_shadow = fields.Boolean(
|
|
'Shadow Task', compute='_compute_is_shadow', store=True,
|
|
help='True if this task was synced from another instance',
|
|
)
|
|
x_fc_sync_client_name = fields.Char(
|
|
'Synced Client Name', readonly=True,
|
|
help='Client name from the remote instance (shadow tasks only)',
|
|
)
|
|
|
|
x_fc_source_label = fields.Char(
|
|
'Source', compute='_compute_is_shadow', store=True,
|
|
)
|
|
|
|
@api.depends('x_fc_sync_source')
def _compute_is_shadow(self):
    """Derive the shadow flag and the source label from ``x_fc_sync_source``.

    A task is a shadow when it carries a sync source. The label is that
    source, or the ``fusion_claims.sync_instance_id`` parameter when the
    task has no sync source.
    """
    config = self.env['ir.config_parameter'].sudo()
    own_instance = config.get_param('fusion_claims.sync_instance_id', '')
    for rec in self:
        origin = rec.x_fc_sync_source
        rec.x_fc_is_shadow = bool(origin)
        rec.x_fc_source_label = origin or own_instance
|
|
|
|
technician_id = fields.Many2one(
|
|
'res.users',
|
|
string='Technician',
|
|
required=True,
|
|
tracking=True,
|
|
domain="[('x_fc_is_field_staff', '=', True)]",
|
|
help='Shows: users marked as Field Staff (technicians and sales reps)',
|
|
)
|
|
technician_name = fields.Char(
|
|
related='technician_id.name',
|
|
string='Technician Name',
|
|
store=True,
|
|
)
|
|
|
|
sale_order_id = fields.Many2one(
|
|
'sale.order',
|
|
string='Related Case',
|
|
tracking=True,
|
|
ondelete='restrict',
|
|
help='Sale order / case linked to this task',
|
|
)
|
|
sale_order_name = fields.Char(
|
|
related='sale_order_id.name',
|
|
string='Case Reference',
|
|
store=True,
|
|
)
|
|
|
|
purchase_order_id = fields.Many2one(
|
|
'purchase.order',
|
|
string='Related Purchase Order',
|
|
tracking=True,
|
|
ondelete='restrict',
|
|
help='Purchase order linked to this task (e.g. manufacturer pickup)',
|
|
)
|
|
purchase_order_name = fields.Char(
|
|
related='purchase_order_id.name',
|
|
string='PO Reference',
|
|
store=True,
|
|
)
|
|
|
|
task_type = fields.Selection([
|
|
('delivery', 'Delivery'),
|
|
('repair', 'Repair'),
|
|
('pickup', 'Pickup'),
|
|
('troubleshoot', 'Troubleshooting'),
|
|
('assessment', 'Assessment'),
|
|
('installation', 'Installation'),
|
|
('maintenance', 'Maintenance'),
|
|
('ltc_visit', 'LTC Visit'),
|
|
('other', 'Other'),
|
|
], string='Task Type', required=True, default='delivery', tracking=True)
|
|
|
|
facility_id = fields.Many2one(
|
|
'fusion.ltc.facility',
|
|
string='LTC Facility',
|
|
tracking=True,
|
|
help='LTC Home for this visit',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# SCHEDULING
|
|
# ------------------------------------------------------------------
|
|
scheduled_date = fields.Date(
|
|
string='Scheduled Date',
|
|
tracking=True,
|
|
default=fields.Date.context_today,
|
|
index=True,
|
|
)
|
|
time_start = fields.Float(
|
|
string='Start Time',
|
|
help='Start time in hours (e.g. 9.5 = 9:30 AM)',
|
|
default=9.0,
|
|
)
|
|
time_end = fields.Float(
|
|
string='End Time',
|
|
help='End time in hours (e.g. 10.5 = 10:30 AM)',
|
|
default=10.0,
|
|
)
|
|
time_start_display = fields.Char(
|
|
string='Start',
|
|
compute='_compute_time_displays',
|
|
)
|
|
time_end_display = fields.Char(
|
|
string='End',
|
|
compute='_compute_time_displays',
|
|
)
|
|
# Legacy 12h selection fields -- kept for DB compatibility, hidden on form
|
|
time_start_12h = fields.Selection(
|
|
selection='_get_time_selection',
|
|
string='Start Time (12h)',
|
|
compute='_compute_time_12h',
|
|
inverse='_inverse_time_start_12h',
|
|
store=True,
|
|
)
|
|
time_end_12h = fields.Selection(
|
|
selection='_get_time_selection',
|
|
string='End Time (12h)',
|
|
compute='_compute_time_12h',
|
|
inverse='_inverse_time_end_12h',
|
|
store=True,
|
|
)
|
|
sequence = fields.Integer(
|
|
string='Sequence',
|
|
default=10,
|
|
help='Order of task within the day',
|
|
)
|
|
duration_hours = fields.Float(
|
|
string='Duration',
|
|
default=1.0,
|
|
help='Task duration in hours. Auto-calculates end time.',
|
|
)
|
|
|
|
# Task type -> default duration mapping
|
|
TASK_TYPE_DURATIONS = {
|
|
'delivery': 1.0,
|
|
'repair': 2.0,
|
|
'pickup': 0.5,
|
|
'troubleshoot': 1.5,
|
|
'assessment': 1.5,
|
|
'installation': 2.0,
|
|
'maintenance': 1.5,
|
|
'ltc_visit': 3.0,
|
|
'other': 1.0,
|
|
}
|
|
|
|
# Previous task travel warning banner
|
|
prev_task_summary_html = fields.Html(
|
|
string='Previous Task',
|
|
compute='_compute_prev_task_summary',
|
|
sanitize=False,
|
|
)
|
|
|
|
# Datetime fields for calendar view (computed from date + float time)
|
|
datetime_start = fields.Datetime(
|
|
string='Start',
|
|
compute='_compute_datetimes',
|
|
inverse='_inverse_datetime_start',
|
|
store=True,
|
|
help='Combined start datetime for calendar display',
|
|
)
|
|
datetime_end = fields.Datetime(
|
|
string='End',
|
|
compute='_compute_datetimes',
|
|
inverse='_inverse_datetime_end',
|
|
store=True,
|
|
help='Combined end datetime for calendar display',
|
|
)
|
|
|
|
# Schedule info helper for the form
|
|
schedule_info_html = fields.Html(
|
|
string='Schedule Info',
|
|
compute='_compute_schedule_info',
|
|
sanitize=False,
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# STATUS
|
|
# ------------------------------------------------------------------
|
|
status = fields.Selection([
|
|
('pending', 'Pending'),
|
|
('scheduled', 'Scheduled'),
|
|
('en_route', 'En Route'),
|
|
('in_progress', 'In Progress'),
|
|
('completed', 'Completed'),
|
|
('cancelled', 'Cancelled'),
|
|
('rescheduled', 'Rescheduled'),
|
|
], string='Status', default='scheduled', required=True, tracking=True, index=True)
|
|
|
|
priority = fields.Selection([
|
|
('0', 'Normal'),
|
|
('1', 'Urgent'),
|
|
('2', 'Emergency'),
|
|
], string='Priority', default='0')
|
|
|
|
color = fields.Integer(
|
|
string='Color Index',
|
|
compute='_compute_color',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# CLIENT / ADDRESS
|
|
# ------------------------------------------------------------------
|
|
partner_id = fields.Many2one(
|
|
'res.partner',
|
|
string='Client',
|
|
tracking=True,
|
|
help='Client for this task',
|
|
)
|
|
partner_phone = fields.Char(
|
|
related='partner_id.phone',
|
|
string='Client Phone',
|
|
)
|
|
|
|
# Address fields - computed from shipping address or manually set
|
|
address_partner_id = fields.Many2one(
|
|
'res.partner',
|
|
string='Task Address',
|
|
help='Partner record containing the task address (usually shipping address)',
|
|
)
|
|
address_street = fields.Char(string='Street')
|
|
address_street2 = fields.Char(string='Unit/Suite #')
|
|
address_city = fields.Char(string='City')
|
|
address_state_id = fields.Many2one('res.country.state', string='Province')
|
|
address_zip = fields.Char(string='Postal Code')
|
|
address_buzz_code = fields.Char(string='Buzz Code', help='Building buzzer code for entry')
|
|
address_display = fields.Text(
|
|
string='Full Address',
|
|
compute='_compute_address_display',
|
|
)
|
|
|
|
# Geocoding
|
|
address_lat = fields.Float(string='Latitude', digits=(10, 7))
|
|
address_lng = fields.Float(string='Longitude', digits=(10, 7))
|
|
|
|
# ------------------------------------------------------------------
|
|
# TASK DETAILS
|
|
# ------------------------------------------------------------------
|
|
description = fields.Text(
|
|
string='Task Description',
|
|
help='What needs to be done',
|
|
)
|
|
equipment_needed = fields.Text(
|
|
string='Equipment / Materials Needed',
|
|
help='Tools and materials the technician should bring',
|
|
)
|
|
pod_required = fields.Boolean(
|
|
string='POD Required',
|
|
default=False,
|
|
help='Proof of Delivery signature required',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# COMPLETION
|
|
# ------------------------------------------------------------------
|
|
completion_notes = fields.Html(
|
|
string='Completion Notes',
|
|
help='Notes from the technician about what was done',
|
|
)
|
|
completion_datetime = fields.Datetime(
|
|
string='Completed At',
|
|
tracking=True,
|
|
)
|
|
voice_note_audio = fields.Binary(
|
|
string='Voice Recording',
|
|
attachment=True,
|
|
)
|
|
voice_note_transcription = fields.Text(
|
|
string='Voice Transcription',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# TRAVEL
|
|
# ------------------------------------------------------------------
|
|
travel_time_minutes = fields.Integer(
|
|
string='Travel Time (min)',
|
|
help='Estimated travel time from previous task in minutes',
|
|
)
|
|
travel_distance_km = fields.Float(
|
|
string='Travel Distance (km)',
|
|
digits=(8, 1),
|
|
)
|
|
travel_origin = fields.Char(
|
|
string='Travel From',
|
|
help='Origin address for travel calculation',
|
|
)
|
|
previous_task_id = fields.Many2one(
|
|
'fusion.technician.task',
|
|
string='Previous Task',
|
|
help='The task before this one in the schedule (for travel calculation)',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# PUSH NOTIFICATION TRACKING
|
|
# ------------------------------------------------------------------
|
|
push_notified = fields.Boolean(
|
|
string='Push Notified',
|
|
default=False,
|
|
help='Whether a push notification was sent for this task',
|
|
)
|
|
push_notified_datetime = fields.Datetime(
|
|
string='Notified At',
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# RENTAL INSPECTION (added by fusion_rental)
|
|
# ------------------------------------------------------------------
|
|
rental_inspection_condition = fields.Selection([
|
|
('excellent', 'Excellent'),
|
|
('good', 'Good'),
|
|
('fair', 'Fair'),
|
|
('damaged', 'Damaged'),
|
|
], string='Inspection Condition')
|
|
rental_inspection_notes = fields.Text(
|
|
string='Inspection Notes',
|
|
)
|
|
rental_inspection_photo_ids = fields.Many2many(
|
|
'ir.attachment',
|
|
'technician_task_inspection_photo_rel',
|
|
'task_id',
|
|
'attachment_id',
|
|
string='Inspection Photos',
|
|
)
|
|
rental_inspection_completed = fields.Boolean(
|
|
string='Inspection Completed',
|
|
default=False,
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# COMPUTED FIELDS
|
|
# ------------------------------------------------------------------
|
|
|
|
# ------------------------------------------------------------------
|
|
# SLOT AVAILABILITY HELPERS
|
|
# ------------------------------------------------------------------
|
|
|
|
def _find_next_available_slot(self, tech_id, date, preferred_start=9.0,
                              duration=1.0, exclude_task_id=False,
                              dest_lat=0, dest_lng=0):
    """Find the next available time slot for a technician on a given date.

    Loads all non-cancelled tasks for that technician and date ordered by
    start time, clamps them to the configured store hours, and walks
    through the day looking for a gap that fits ``duration`` plus the
    travel time to/from the neighbouring booked tasks. The search starts
    at ``preferred_start``; if nothing fits from there on, the whole day
    is scanned again from store opening.

    :param tech_id: res.users id of the technician
    :param date: date object for the day to check
    :param preferred_start: float hour to start looking from (default 9.0)
    :param duration: required slot length in hours (default 1.0)
    :param exclude_task_id: task id to exclude (when editing an existing task)
    :param dest_lat: latitude of the destination (new task location)
    :param dest_lng: longitude of the destination (new task location)
    :returns: ``(start_float, end_float)``, or ``(False, False)`` if no
        slot fits. When ``tech_id`` or ``date`` is missing the preferred
        slot is returned unchecked.
    """
    import math

    store_open, store_close = self._get_store_hours()

    if not tech_id or not date:
        return (preferred_start, preferred_start + duration)

    domain = [
        ('technician_id', '=', tech_id),
        ('scheduled_date', '=', date),
        ('status', 'not in', ['cancelled']),
    ]
    if exclude_task_id:
        domain.append(('id', '!=', exclude_task_id))

    booked = self.sudo().search(domain, order='time_start')

    # (start, end, lat, lng) per booked task, clamped to store hours.
    intervals = [
        (
            max(b.time_start, store_open),
            min(b.time_end, store_close),
            b.address_lat or 0,
            b.address_lng or 0,
        )
        for b in booked
    ]

    # _quick_travel_time performs a remote lookup; remember each coordinate
    # pair so the wrap-around pass does not repeat the same requests.
    travel_cache = {}

    def _travel_hours(from_lat, from_lng, to_lat, to_lng):
        """Travel time in hours, rounded up to 15 min; 0 if coordinates
        are missing or the lookup yields nothing."""
        if not from_lat or not from_lng or not to_lat or not to_lng:
            return 0
        key = (from_lat, from_lng, to_lat, to_lng)
        if key not in travel_cache:
            travel_min = self._quick_travel_time(
                from_lat, from_lng, to_lat, to_lng)
            travel_cache[key] = (
                math.ceil(travel_min / 15.0) * 0.25 if travel_min > 0 else 0
            )
        return travel_cache[key]

    def _scan(cursor):
        """Walk the booked intervals from ``cursor``; return the first
        fitting ``(start, end)`` or ``None``."""
        for iv_start, iv_end, iv_lat, iv_lng in intervals:
            if cursor + duration <= iv_start:
                # The slot fits before this task; it must also leave time
                # to drive from the new task to this booked one.
                travel_fwd = _travel_hours(dest_lat, dest_lng, iv_lat, iv_lng)
                if cursor + duration + travel_fwd <= iv_start:
                    return (cursor, cursor + duration)
            # Earliest start after this task = its end + drive to the new
            # location, snapped UP to the next quarter hour so the slot can
            # never begin before the technician is actually free. The small
            # epsilon absorbs float noise on already-aligned values.
            travel_back = _travel_hours(iv_lat, iv_lng, dest_lat, dest_lng)
            earliest = math.ceil((iv_end + travel_back) * 4 - 1e-9) / 4
            # Only move forward: an interval that ended well before the
            # cursor must not push it (or add its travel buffer) again.
            cursor = max(cursor, earliest)
        # After the last booked task there is no onward travel to consider.
        if cursor + duration <= store_close:
            return (cursor, cursor + duration)
        return None

    slot = _scan(max(preferred_start, store_open))
    if slot is None and preferred_start > store_open:
        # Nothing from the preferred time onward -- retry from opening.
        slot = _scan(store_open)
    if slot is None:
        return (False, False)
    return slot
|
|
|
|
def _get_available_gaps(self, tech_id, date, exclude_task_id=False):
    """List the free ``(start, end)`` windows of a technician on a date.

    Booked (non-cancelled) tasks are clamped to the configured store hours
    and every stretch between them, from opening to closing, is reported.
    Without a technician or date the whole store day is returned as a
    single gap. Travel time is not considered here.

    Used by schedule_info_html to show green "available" badges.
    """
    day_open, day_close = self._get_store_hours()

    if not (tech_id and date):
        return [(day_open, day_close)]

    criteria = [
        ('technician_id', '=', tech_id),
        ('scheduled_date', '=', date),
        ('status', 'not in', ['cancelled']),
    ]
    if exclude_task_id:
        criteria += [('id', '!=', exclude_task_id)]

    free = []
    pointer = day_open
    for rec in self.sudo().search(criteria, order='time_start'):
        busy_from = max(rec.time_start, day_open)
        busy_to = min(rec.time_end, day_close)
        if pointer < busy_from:
            free.append((pointer, busy_from))
        # Never move backwards when tasks overlap each other.
        pointer = max(pointer, busy_to)
    if pointer < day_close:
        free.append((pointer, day_close))
    return free
|
|
|
|
@api.model
def _get_time_selection(self):
    """Return the 12-hour selection entries for the legacy time fields.

    One ``(key, label)`` pair per quarter hour from 9:00 AM to 5:45 PM,
    plus a final 6:00 PM entry usable as an end time. Keys are the float
    hour formatted with two decimals (e.g. ``'9.25'``); labels look like
    ``'9:15 AM'``.
    """
    slots = [
        (
            f'{hour + minute / 60.0:.2f}',
            f'{hour % 12 or 12}:{minute:02d} {"AM" if hour < 12 else "PM"}',
        )
        for hour in range(9, 18)  # 9 AM to 5:45 PM
        for minute in (0, 15, 30, 45)
    ]
    # Closing time itself is only meaningful as an end-time option.
    slots.append(('18.00', '6:00 PM'))
    return slots
|
|
|
|
@api.depends('time_start', 'time_end')
def _compute_time_12h(self):
    """Sync the legacy 12h selection fields from the raw float values.

    The float is formatted with two decimals to obtain the selection key
    (9.0 / 10.0 are used when the float is empty). Float times are not
    restricted to the selection's quarter-hour, 9 AM - 6 PM grid (e.g.
    after a calendar drag), and the ORM rejects values that are not in
    the selection, so any key that does not exist in the selection leaves
    the field empty instead of raising.
    """
    valid_keys = {key for key, _label in self._get_time_selection()}
    for task in self:
        start_key = f'{(task.time_start or 9.0):.2f}'
        end_key = f'{(task.time_end or 10.0):.2f}'
        task.time_start_12h = start_key if start_key in valid_keys else False
        task.time_end_12h = end_key if end_key in valid_keys else False
|
|
|
|
def _inverse_time_start_12h(self):
    """Write the chosen 12h start option back to the float ``time_start``.

    Records without a selected option are left unchanged.
    """
    for rec in self:
        chosen = rec.time_start_12h
        if not chosen:
            continue
        rec.time_start = float(chosen)
|
|
|
|
def _inverse_time_end_12h(self):
    """Write the chosen 12h end option back to the float ``time_end``.

    Records without a selected option are left unchanged.
    """
    for rec in self:
        chosen = rec.time_end_12h
        if not chosen:
            continue
        rec.time_end = float(chosen)
|
|
|
|
@api.depends('time_start', 'time_end')
def _compute_time_displays(self):
    """Fill the start/end display fields by formatting the float hours
    with ``_float_to_time_str``."""
    fmt = self._float_to_time_str
    for rec in self:
        rec.time_start_display = fmt(rec.time_start)
        rec.time_end_display = fmt(rec.time_end)
|
|
|
|
@api.onchange('task_type')
def _onchange_task_type_duration(self):
    """Apply the default duration of the selected task type.

    The duration comes from ``TASK_TYPE_DURATIONS`` (1.0 for unknown
    types). When a start time is set, the end time is recalculated as
    start + duration, capped at the store closing hour.
    """
    if not self.task_type:
        return
    self.duration_hours = self.TASK_TYPE_DURATIONS.get(self.task_type, 1.0)
    if not self.time_start:
        return
    closing = self._get_store_hours()[1]
    self.time_end = min(self.time_start + self.duration_hours, closing)
|
|
|
|
@api.onchange('time_start', 'duration_hours')
def _onchange_compute_end_time(self):
    """Recompute the end time from start + duration, then check overlaps.

    The end time is capped at the store closing hour. Once technician,
    date, start and end are all set, ``_snap_if_overlap`` is run and its
    result is returned when it is truthy.
    """
    if self.time_start and self.duration_hours:
        closing = self._get_store_hours()[1]
        self.time_end = min(self.time_start + self.duration_hours, closing)

    ready = (self.technician_id and self.scheduled_date
             and self.time_start and self.time_end)
    if not ready:
        return None
    # Only propagate a truthy result from the overlap check.
    return self._snap_if_overlap() or None
|
|
|
|
@api.depends('scheduled_date', 'time_start', 'time_end')
def _compute_datetimes(self):
    """Combine date + float time into Datetime fields for the calendar.

    ``time_start`` / ``time_end`` are treated as local hours in the
    current user's timezone (UTC when the user has none); the resulting
    ``datetime_start`` / ``datetime_end`` are stored as naive UTC values.
    An empty start falls back to the store opening hour and an empty end
    to one hour after opening. Tasks without a date get empty datetimes.
    """
    import pytz

    user_tz = pytz.timezone(self.env.user.tz or 'UTC')
    # Store hours come from system parameters and are the same for every
    # record, so read them once instead of once per task.
    store_open, _close = self._get_store_hours()
    midnight = dt_datetime.min.time()

    for task in self:
        if not task.scheduled_date:
            task.datetime_start = False
            task.datetime_end = False
            continue
        # Build the local datetimes, then convert to naive UTC.
        base = dt_datetime.combine(task.scheduled_date, midnight)
        local_start = user_tz.localize(
            base + timedelta(hours=task.time_start or store_open))
        local_end = user_tz.localize(
            base + timedelta(hours=task.time_end or (store_open + 1.0)))
        task.datetime_start = local_start.astimezone(pytz.utc).replace(tzinfo=None)
        task.datetime_end = local_end.astimezone(pytz.utc).replace(tzinfo=None)
|
|
|
|
def _inverse_datetime_start(self):
    """Propagate an edited ``datetime_start`` (e.g. a calendar drag) back
    to ``scheduled_date`` and ``time_start``.

    The stored value is interpreted as UTC and converted to the current
    user's timezone (UTC when unset); the start time is hour + minute/60.
    """
    import pytz

    zone = pytz.timezone(self.env.user.tz or 'UTC')
    for rec in self:
        if not rec.datetime_start:
            continue
        localized = pytz.utc.localize(rec.datetime_start).astimezone(zone)
        rec.scheduled_date = localized.date()
        rec.time_start = localized.hour + localized.minute / 60.0
|
|
|
|
def _inverse_datetime_end(self):
    """Propagate an edited ``datetime_end`` (e.g. a calendar resize) back
    to ``time_end``.

    The stored value is interpreted as UTC and converted to the current
    user's timezone (UTC when unset); the end time is hour + minute/60.
    """
    import pytz

    zone = pytz.timezone(self.env.user.tz or 'UTC')
    for rec in self:
        if not rec.datetime_end:
            continue
        localized = pytz.utc.localize(rec.datetime_end).astimezone(zone)
        rec.time_end = localized.hour + localized.minute / 60.0
|
|
|
|
@api.depends('technician_id', 'scheduled_date')
def _compute_schedule_info(self):
    """Show booked + available time slots for the technician on the selected date.

    For each task with a technician and a date, the other non-cancelled
    tasks of that technician on that date are rendered as red "booked"
    badges, followed by green badges for every free gap of at least
    15 minutes (or a "Fully booked" notice). With no other task the whole
    store day is reported as available.

    The field is rendered without sanitization, so every dynamic value
    (client names, labels) is inserted through ``Markup.format``, which
    HTML-escapes it.
    """
    type_labels = dict(self._fields['task_type'].selection)
    to_time = self._float_to_time_str
    badge = Markup('<span class="badge {css}" style="margin:2px;">{text}</span>')
    spacer = Markup(' ')

    for task in self:
        if not task.technician_id or not task.scheduled_date:
            task.schedule_info_html = ''
            continue

        exclude_id = task.id if task.id else 0
        # Other tasks for the same technician + date.
        others = self.sudo().search([
            ('technician_id', '=', task.technician_id.id),
            ('scheduled_date', '=', task.scheduled_date),
            ('status', 'not in', ['cancelled']),
            ('id', '!=', exclude_id),
        ], order='time_start')

        if not others:
            s_open, s_close = self._get_store_hours()
            task.schedule_info_html = Markup(
                '<div class="text-success" style="padding: 4px 0;">'
                '<i class="fa fa-check-circle"/> All slots available ({} - {})</div>'
            ).format(to_time(s_open), to_time(s_close))
            continue

        # Booked badges
        booked_lines = []
        for other in others:
            type_label = type_labels.get(other.task_type, other.task_type)
            client_name = other.partner_id.name or ''
            suffix = f' - {client_name}' if client_name else ''
            booked_lines.append(badge.format(
                css='text-bg-danger',
                text=(f'{to_time(other.time_start)} - {to_time(other.time_end)} '
                      f'({type_label}{suffix})'),
            ))

        # Available gap badges (only gaps >= 15 min are worth showing).
        gaps = self._get_available_gaps(
            task.technician_id.id, task.scheduled_date,
            exclude_task_id=exclude_id,
        )
        avail_lines = [
            badge.format(
                css='text-bg-success',
                text=f'{to_time(g_start)} - {to_time(g_end)}',
            )
            for g_start, g_end in gaps
            if g_end - g_start >= 0.25
        ]

        html = Markup(
            '<div style="padding:4px 0;">'
            '<strong class="text-danger"><i class="fa fa-ban"/> Booked:</strong> '
        ) + spacer.join(booked_lines)
        if avail_lines:
            html += Markup(
                '<br/><strong class="text-success"><i class="fa fa-check-circle"/> '
                'Available:</strong> '
            ) + spacer.join(avail_lines)
        else:
            html += Markup(
                '<br/><span class="text-warning"><i class="fa fa-warning"/> '
                'Fully booked</span>'
            )
        html += Markup('</div>')

        task.schedule_info_html = html
|
|
|
|
@api.depends('technician_id', 'scheduled_date', 'time_start',
             'address_lat', 'address_lng', 'address_street')
def _compute_prev_task_summary(self):
    """Show previous task info + travel time warning with color coding.

    The "previous" task is the technician's latest non-cancelled task on
    the same date that ends at or before this task's start. The banner is
    green when the gap covers the estimated drive, yellow when the gap is
    shorter than the drive, red when there is no gap at all, and blue
    when no travel estimate is available.

    The field is rendered without sanitization, so every dynamic value
    (task reference, client name, address) is inserted through
    ``Markup.format``, which HTML-escapes it.
    """
    type_labels = dict(self._fields['task_type'].selection)
    to_time = self._float_to_time_str
    banner = Markup(
        '<div class="alert {bg_class} mb-0 p-2" '
        'style="font-size:0.9em;">'
        '<i class="fa {icon}"/> '
        '<strong>Previous:</strong> {name} '
        '({type_label}) {start} - {end}'
        '{client}'
        '<br/>'
        '<small><i class="fa fa-map-marker"/> {addr}</small>'
        '<br/>'
        '<small><i class="fa fa-car"/> {status}</small>'
        '</div>'
    )

    for task in self:
        if not task.technician_id or not task.scheduled_date:
            task.prev_task_summary_html = ''
            continue

        exclude_id = task.id if task.id else 0
        # Find the task that ends just before this one starts.
        prev_tasks = self.sudo().search([
            ('technician_id', '=', task.technician_id.id),
            ('scheduled_date', '=', task.scheduled_date),
            ('status', 'not in', ['cancelled']),
            ('id', '!=', exclude_id),
            ('time_end', '<=', task.time_start or 99.0),
        ], order='time_end desc', limit=1)

        if not prev_tasks:
            # No earlier task that day -- show start location info.
            task.prev_task_summary_html = Markup(
                '<div class="alert alert-info mb-0 p-2" style="font-size:0.9em;">'
                '<i class="fa fa-info-circle"/> First task of the day -- '
                'travel calculated from start location.</div>'
            )
            continue

        prev = prev_tasks[0]
        prev_end = to_time(prev.time_end)
        client_name = prev.partner_id.name or ''

        # Gap between the previous task's end and this task's start.
        s_open, _s_close = self._get_store_hours()
        gap_hours = (task.time_start or s_open) - (prev.time_end or s_open)
        gap_minutes = int(gap_hours * 60)

        # Travel time is only available when both tasks have coordinates.
        travel_minutes = 0
        if (prev.address_lat and prev.address_lng and
                task.address_lat and task.address_lng):
            travel_minutes = self._quick_travel_time(
                prev.address_lat, prev.address_lng,
                task.address_lat, task.address_lng,
            )
            if travel_minutes > 0:
                travel_text = f'{travel_minutes} min drive'
            else:
                travel_text = 'Could not calculate travel time'
        elif prev.address_street and task.address_street:
            travel_text = 'Save to calculate travel time'
        else:
            travel_text = 'Address missing -- cannot calculate travel'

        # Determine color coding
        if travel_minutes > 0 and gap_minutes >= travel_minutes:
            bg_class = 'alert-success'  # Green -- enough time
            icon = 'fa-check-circle'
            status_text = (
                f'{gap_minutes} min gap -- enough travel time '
                f'(~{travel_minutes} min drive)'
            )
        elif travel_minutes > 0 and gap_minutes > 0:
            bg_class = 'alert-warning'  # Yellow -- tight
            icon = 'fa-exclamation-triangle'
            status_text = (
                f'{gap_minutes} min gap -- tight! '
                f'Travel is ~{travel_minutes} min drive'
            )
        elif travel_minutes > 0 and gap_minutes <= 0:
            bg_class = 'alert-danger'  # Red -- impossible
            icon = 'fa-times-circle'
            status_text = (
                f'No gap! Previous task ends at {prev_end}. '
                f'Travel is ~{travel_minutes} min drive'
            )
        else:
            bg_class = 'alert-info'  # Blue -- no travel data yet
            icon = 'fa-info-circle'
            status_text = travel_text

        task.prev_task_summary_html = banner.format(
            bg_class=bg_class,
            icon=icon,
            name=prev.name,
            type_label=type_labels.get(prev.task_type, prev.task_type or ''),
            start=to_time(prev.time_start),
            end=prev_end,
            client=f' -- {client_name}' if client_name else '',
            addr=prev.address_display or 'No address',
            status=status_text,
        )
|
|
|
|
def _quick_travel_time(self, from_lat, from_lng, to_lat, to_lng):
    """Quick inline travel time lookup using the Google Distance Matrix API.

    Sends one driving request (tolls avoided, departing "now") with a
    5 second timeout, authenticated with the
    ``fusion_claims.google_maps_api_key`` system parameter.

    :param from_lat: origin latitude
    :param from_lng: origin longitude
    :param to_lat: destination latitude
    :param to_lng: destination longitude
    :returns: travel time in whole minutes (at least 1 on success), or 0
        when no API key is configured, the API reports a non-OK status,
        or the request fails. Failures are logged, never raised.
    """
    try:
        api_key = self.env['ir.config_parameter'].sudo().get_param(
            'fusion_claims.google_maps_api_key', '')
        if not api_key:
            return 0

        url = 'https://maps.googleapis.com/maps/api/distancematrix/json'
        params = {
            'origins': f'{from_lat},{from_lng}',
            'destinations': f'{to_lat},{to_lng}',
            'mode': 'driving',
            'avoid': 'tolls',
            'departure_time': 'now',
            'key': api_key,
        }
        resp = requests.get(url, params=params, timeout=5)
        data = resp.json()
        if data.get('status') == 'OK':
            elements = data['rows'][0]['elements'][0]
            if elements.get('status') == 'OK':
                # Use duration_in_traffic if available, else duration
                duration = elements.get(
                    'duration_in_traffic', elements.get('duration', {}))
                seconds = duration.get('value', 0)
                return max(1, int(seconds / 60))
    except requests.RequestException as exc:
        # Messages of transport errors can embed the full request URL,
        # query string (and therefore the API key) included. Log only the
        # error type so the key never ends up in the server log.
        _logger.warning(
            'Failed to calculate travel time (%s)', type(exc).__name__)
    except Exception:
        # Best-effort helper: any other failure (unexpected payload shape,
        # configuration access, ...) must not break the caller.
        _logger.warning('Failed to calculate travel time', exc_info=True)
    return 0
|
|
|
|
@api.depends('status')
def _compute_color(self):
    """Map each task's status to its color index (0 for unknown statuses)."""
    palette = dict(
        pending=5,        # purple
        scheduled=0,      # grey
        en_route=4,       # blue
        in_progress=2,    # orange
        completed=10,     # green
        cancelled=1,      # red
        rescheduled=3,    # yellow
    )
    for rec in self:
        rec.color = palette.get(rec.status, 0)
|
|
|
|
@api.depends('address_street', 'address_street2', 'address_city',
             'address_state_id', 'address_zip')
def _compute_address_display(self):
    """Compose the one-line address shown on the task.

    When the street already looks like a complete formatted address (it
    contains a comma and mentions the city or the postal code), it is
    used as is, with the unit appended only if it is not already part of
    it. Otherwise the address is joined from its non-empty components.
    """
    for rec in self:
        street = rec.address_street or ''
        unit = rec.address_street2
        city = rec.address_city
        postal = rec.address_zip

        mentions_city = bool(city) and city in street
        mentions_zip = bool(postal) and postal in street
        if ',' in street and (mentions_city or mentions_zip):
            # Street holds the full formatted address already.
            unit_missing = bool(unit) and unit not in street
            rec.address_display = f"{street}, {unit}" if unit_missing else street
            continue

        # Manual entry or legacy data: build from the separate fields.
        province = rec.address_state_id.name if rec.address_state_id else ''
        components = (street, unit, city, province, postal)
        rec.address_display = ', '.join(filter(None, components))
|
|
|
|
# ------------------------------------------------------------------
|
|
# ONCHANGE - Auto-fill address from client
|
|
# ------------------------------------------------------------------
|
|
|
|
@api.onchange('partner_id')
def _onchange_partner_id(self):
    """Auto-fill address fields from the selected client's address.

    Delegates to ``_fill_address_from_partner`` instead of repeating the
    same field-by-field copy, so this onchange cannot drift from the
    sale-order and purchase-order onchanges that use the same helper.
    The helper returns immediately when no partner is selected.
    """
    self._fill_address_from_partner(self.partner_id)
|
|
|
|
@api.onchange('sale_order_id')
def _onchange_sale_order_id(self):
    """Fill client and address from the chosen sale order.

    Selecting a sale order clears the purchase order link.  The client is
    taken from the order only when none is set yet; the address comes from
    the order's shipping partner, falling back to the order's partner.
    """
    sale_order = self.sale_order_id
    if not sale_order:
        return
    self.purchase_order_id = False
    if not self.partner_id:
        self.partner_id = sale_order.partner_id
    self._fill_address_from_partner(
        sale_order.partner_shipping_id or sale_order.partner_id)
|
|
|
|
@api.onchange('purchase_order_id')
def _onchange_purchase_order_id(self):
    """Fill client and address from the chosen purchase order.

    Selecting a purchase order clears the sale order link.  The client is
    taken from the order only when none is set yet; the address comes from
    the order's ``dest_address_id``, falling back to the order's partner.
    """
    purchase_order = self.purchase_order_id
    if not purchase_order:
        return
    self.sale_order_id = False
    if not self.partner_id:
        self.partner_id = purchase_order.partner_id
    self._fill_address_from_partner(
        purchase_order.dest_address_id or purchase_order.partner_id)
|
|
|
|
@api.onchange('facility_id')
def _onchange_facility_id(self):
    """Copy the LTC facility's address onto the task.

    Only applies when a facility is set and the task type is
    ``ltc_visit``.  An existing description is kept; an empty one becomes
    "LTC Visit at <facility name>".
    """
    facility = self.facility_id
    if not facility or self.task_type != 'ltc_visit':
        return
    state = facility.state_id
    self.address_street = facility.street or ''
    self.address_street2 = facility.street2 or ''
    self.address_city = facility.city or ''
    self.address_state_id = state.id if state else False
    self.address_zip = facility.zip or ''
    self.description = self.description or _(
        'LTC Visit at %s', facility.name
    )
|
|
|
|
@api.onchange('task_type')
def _onchange_task_type_ltc(self):
    """Clear the sale and purchase order links when the type is ``ltc_visit``."""
    if self.task_type != 'ltc_visit':
        return
    self.sale_order_id = False
    self.purchase_order_id = False
|
|
|
|
def _fill_address_from_partner(self, addr):
|
|
"""Populate address fields from a partner record."""
|
|
if not addr:
|
|
return
|
|
self.address_partner_id = addr.id
|
|
self.address_street = addr.street or ''
|
|
self.address_street2 = addr.street2 or ''
|
|
self.address_city = addr.city or ''
|
|
self.address_state_id = addr.state_id.id if addr.state_id else False
|
|
self.address_zip = addr.zip or ''
|
|
self.address_lat = addr.x_fc_latitude if hasattr(addr, 'x_fc_latitude') and addr.x_fc_latitude else 0
|
|
self.address_lng = addr.x_fc_longitude if hasattr(addr, 'x_fc_longitude') and addr.x_fc_longitude else 0
|
|
|
|
# ------------------------------------------------------------------
|
|
# CONSTRAINTS + VALIDATION
|
|
# ------------------------------------------------------------------
|
|
|
|
@api.constrains('sale_order_id', 'purchase_order_id')
def _check_order_link(self):
    """Require a sale order or a purchase order on each task.

    Tasks with ``x_fc_sync_source`` set and tasks of type ``ltc_visit``
    are exempt.

    :raises ValidationError: when a non-exempt task has neither order.
    """
    for record in self:
        exempt = record.x_fc_sync_source or record.task_type == 'ltc_visit'
        if exempt:
            continue
        if record.sale_order_id or record.purchase_order_id:
            continue
        raise ValidationError(_(
            "A task must be linked to either a Sale Order (Case) or a Purchase Order."
        ))
|
|
|
|
|
|
@api.constrains('technician_id', 'scheduled_date', 'time_start', 'time_end')
def _check_no_overlap(self):
    """Prevent overlapping bookings for the same technician on the same date.

    Cancelled tasks and tasks with ``x_fc_sync_source`` set are skipped.
    For every other task the checks run in this order:

    1. the start time is strictly before the end time;
    2. the time window lies within the configured store hours;
    3. a task in ``scheduled`` status is not dated before today;
    4. no other non-cancelled task of the same technician on the same date
       overlaps the window;
    5. the gap to the next task, and from the previous task, is at least
       the travel time returned by ``_quick_travel_time``.  This is only
       checked when both tasks have coordinates and the returned travel
       time is positive.

    :raises ValidationError: on the first failing check.
    """
    # Store hours are configuration, identical for every task: read once.
    s_open, s_close = self._get_store_hours()
    all_tasks = self.sudo()

    for task in self:
        if task.status == 'cancelled' or task.x_fc_sync_source:
            continue

        # Validate time range
        if task.time_start >= task.time_end:
            raise ValidationError(_("Start time must be before end time."))

        # Validate store hours
        if task.time_start < s_open or task.time_end > s_close:
            open_str = self._float_to_time_str(s_open)
            close_str = self._float_to_time_str(s_close)
            raise ValidationError(_(
                "Tasks must be scheduled within store hours (%s - %s)."
            ) % (open_str, close_str))

        # Validate not in the past (only for tasks in 'scheduled' status).
        # Tasks dated today are accepted even when their start time has
        # already passed, so tasks that started earlier today stay editable.
        if task.status == 'scheduled' and task.scheduled_date:
            if task.scheduled_date < fields.Date.context_today(self):
                raise ValidationError(_("Cannot schedule tasks in the past."))

        # Other non-cancelled tasks of this technician on this date.
        same_day_domain = [
            ('technician_id', '=', task.technician_id.id),
            ('scheduled_date', '=', task.scheduled_date),
            ('status', 'not in', ['cancelled']),
            ('id', '!=', task.id),
        ]

        # Check overlap with other tasks
        overlapping = all_tasks.search(same_day_domain + [
            ('time_start', '<', task.time_end),
            ('time_end', '>', task.time_start),
        ], limit=1)
        if overlapping:
            start_str = self._float_to_time_str(overlapping.time_start)
            end_str = self._float_to_time_str(overlapping.time_end)
            raise ValidationError(_(
                "Time slot overlaps with %(task)s (%(start)s - %(end)s). "
                "Please choose a different time.",
                task=overlapping.name,
                start=start_str,
                end=end_str,
            ))

        # Check travel time gap to the NEXT task on the same day
        next_task = all_tasks.search(same_day_domain + [
            ('time_start', '>=', task.time_end),
        ], order='time_start', limit=1)
        if next_task and task.address_lat and task.address_lng and \
                next_task.address_lat and next_task.address_lng:
            travel_min = self._quick_travel_time(
                task.address_lat, task.address_lng,
                next_task.address_lat, next_task.address_lng,
            )
            # A non-positive travel time is treated as "no estimate": skip.
            if travel_min > 0:
                gap_min = int((next_task.time_start - task.time_end) * 60)
                if gap_min < travel_min:
                    raise ValidationError(_(
                        "Not enough travel time to the next task!\n\n"
                        "This task ends at %(end)s, and %(next)s starts "
                        "at %(next_start)s (%(gap)d min gap).\n"
                        "Travel time is ~%(travel)d minutes.\n\n"
                        "Please allow at least %(travel)d minutes between tasks.",
                        end=self._float_to_time_str(task.time_end),
                        next=next_task.name,
                        next_start=self._float_to_time_str(next_task.time_start),
                        gap=gap_min,
                        travel=travel_min,
                    ))

        # Check travel time gap FROM the PREVIOUS task on the same day
        prev_task = all_tasks.search(same_day_domain + [
            ('time_end', '<=', task.time_start),
        ], order='time_end desc', limit=1)
        if prev_task and task.address_lat and task.address_lng and \
                prev_task.address_lat and prev_task.address_lng:
            travel_min = self._quick_travel_time(
                prev_task.address_lat, prev_task.address_lng,
                task.address_lat, task.address_lng,
            )
            if travel_min > 0:
                gap_min = int((task.time_start - prev_task.time_end) * 60)
                if gap_min < travel_min:
                    raise ValidationError(_(
                        "Not enough travel time from the previous task!\n\n"
                        "%(prev)s ends at %(prev_end)s, and this task starts "
                        "at %(start)s (%(gap)d min gap).\n"
                        "Travel time is ~%(travel)d minutes.\n\n"
                        "Please allow at least %(travel)d minutes between tasks.",
                        prev=prev_task.name,
                        prev_end=self._float_to_time_str(prev_task.time_end),
                        start=self._float_to_time_str(task.time_start),
                        gap=gap_min,
                        travel=travel_min,
                    ))
|
|
|
|
@api.onchange('technician_id', 'scheduled_date')
def _onchange_technician_date_autoset(self):
    """Move the task to the first available slot when technician or date change.

    The slot search starts at the current start time (or the store opening
    hour when none is set) and uses the current duration (1.0 when none is
    set).  When ``_find_next_available_slot`` returns ``False`` as start,
    a "Fully Booked" warning is returned and the times are left untouched.
    """
    technician = self.technician_id
    day = self.scheduled_date
    if not (technician and day):
        return

    opening_hour, _closing_hour = self._get_store_hours()
    slot_start, slot_end = self._find_next_available_slot(
        technician.id,
        day,
        preferred_start=self.time_start or opening_hour,
        duration=self.duration_hours or 1.0,
        exclude_task_id=self._origin.id if self._origin else False,
        dest_lat=self.address_lat or 0,
        dest_lng=self.address_lng or 0,
    )
    if slot_start is False:
        return {'warning': {
            'title': _('Fully Booked'),
            'message': _(
                '%s is fully booked on %s. No available slots.'
            ) % (technician.name,
                 day.strftime('%B %d, %Y')),
        }}

    self.time_start = slot_start
    self.time_end = slot_end
    self.duration_hours = slot_end - slot_start
|
|
|
|
def _snap_if_overlap(self):
|
|
"""Check if current time_start/time_end overlaps with another task.
|
|
If so, auto-snap to the next available slot and return a warning dict."""
|
|
if not self.technician_id or not self.scheduled_date or not self.time_start:
|
|
return None
|
|
exclude_id = self._origin.id if self._origin else 0
|
|
duration = max(self.duration_hours or 1.0, 0.25)
|
|
|
|
overlapping = self.sudo().search([
|
|
('technician_id', '=', self.technician_id.id),
|
|
('scheduled_date', '=', self.scheduled_date),
|
|
('status', 'not in', ['cancelled']),
|
|
('id', '!=', exclude_id),
|
|
('time_start', '<', self.time_end),
|
|
('time_end', '>', self.time_start),
|
|
], limit=1)
|
|
if overlapping:
|
|
conflict_name = overlapping.name
|
|
conflict_start = self._float_to_time_str(overlapping.time_start)
|
|
conflict_end = self._float_to_time_str(overlapping.time_end)
|
|
start, end = self._find_next_available_slot(
|
|
self.technician_id.id,
|
|
self.scheduled_date,
|
|
preferred_start=self.time_start,
|
|
duration=duration,
|
|
exclude_task_id=exclude_id,
|
|
dest_lat=self.address_lat or 0,
|
|
dest_lng=self.address_lng or 0,
|
|
)
|
|
if start is not False:
|
|
new_start_str = self._float_to_time_str(start)
|
|
new_end_str = self._float_to_time_str(end)
|
|
self.time_start = start
|
|
self.time_end = end
|
|
self.duration_hours = end - start
|
|
return {'warning': {
|
|
'title': _('Moved to Available Slot'),
|
|
'message': _(
|
|
'The selected time conflicts with %s (%s - %s).\n'
|
|
'Automatically moved to: %s - %s.'
|
|
) % (conflict_name, conflict_start, conflict_end,
|
|
new_start_str, new_end_str),
|
|
}}
|
|
else:
|
|
return {'warning': {
|
|
'title': _('No Available Slots'),
|
|
'message': _(
|
|
'The selected time conflicts with %s (%s - %s) '
|
|
'and no other slots are available on this day.'
|
|
) % (conflict_name, conflict_start, conflict_end),
|
|
}}
|
|
return None
|
|
|
|
# ------------------------------------------------------------------
|
|
# DEFAULT_GET - Calendar pre-fill
|
|
# ------------------------------------------------------------------
|
|
|
|
def _snap_to_quarter(self, hour_float):
|
|
"""Round a float hour to the nearest 15-minute slot and clamp to store hours."""
|
|
s_open, s_close = self._get_store_hours()
|
|
snapped = round(hour_float * 4) / 4
|
|
return max(s_open, min(s_close, snapped))
|
|
|
|
@api.model
def default_get(self, fields_list):
    """Pre-fill date, times and duration, e.g. from a calendar selection.

    * ``duration_hours`` defaults to the ``TASK_TYPE_DURATIONS`` entry for
      the task type (context ``default_task_type``, else the default
      ``task_type``, else ``delivery``), or 1.0 when the type is unknown.
    * Context keys ``default_datetime_start`` / ``default_datetime_end``
      (UTC) are converted to the user's timezone, snapped to quarter hours
      within store hours, and stored as date, start, end and duration.
    * When only start and duration are known, the end is start + duration
      capped at the store closing hour.
    """
    defaults = super().default_get(fields_list)
    context = self.env.context

    # Set duration default based on task type from context
    if not defaults.get('duration_hours'):
        task_type = context.get(
            'default_task_type', defaults.get('task_type', 'delivery'))
        defaults['duration_hours'] = self.TASK_TYPE_DURATIONS.get(task_type, 1.0)

    def _context_datetime(key):
        """Parse a context datetime; ``None`` when absent or unparsable."""
        raw = context.get(key)
        if not raw:
            return None
        try:
            return fields.Datetime.from_string(raw)
        except (ValueError, TypeError):
            return None

    # When user clicks a time range on the calendar, Odoo passes
    # default_datetime_start/end in UTC
    start_utc = _context_datetime('default_datetime_start')
    end_utc = _context_datetime('default_datetime_end')

    if start_utc or end_utc:
        import pytz
        user_tz = pytz.timezone(self.env.user.tz or 'UTC')

        def _local_quarter(naive_utc):
            """Return (local datetime, snapped float hour) for a naive UTC value."""
            local_dt = pytz.utc.localize(naive_utc).astimezone(user_tz)
            hour = self._snap_to_quarter(local_dt.hour + local_dt.minute / 60.0)
            return local_dt, hour

        if start_utc:
            start_local, start_hour = _local_quarter(start_utc)
            defaults['scheduled_date'] = start_local.date()
            defaults['time_start'] = start_hour

        if end_utc:
            _end_local, end_hour = _local_quarter(end_utc)
            has_start = 'time_start' in defaults
            # An end at or before the start is replaced by start + 1 hour.
            if has_start and end_hour <= defaults['time_start']:
                end_hour = defaults['time_start'] + 1.0
            defaults['time_end'] = end_hour
            # Compute duration from the calendar drag
            if has_start:
                defaults['duration_hours'] = end_hour - defaults['time_start']

    # Always compute end from start + duration if not already set
    can_derive_end = 'time_start' in defaults and 'duration_hours' in defaults
    if can_derive_end and 'time_end' not in defaults:
        _opening, closing = self._get_store_hours()
        defaults['time_end'] = min(
            defaults['time_start'] + defaults['duration_hours'], closing)

    return defaults
|
|
|
|
# ------------------------------------------------------------------
|
|
# CRUD OVERRIDES
|
|
# ------------------------------------------------------------------
|
|
|
|
@api.model_create_multi
def create(self, vals_list):
    """Create tasks and run the follow-up actions for the new records.

    Before creation each vals dict gets a sequence-based name (when it is
    missing or "New"), a sync UUID (when neither a UUID nor a sync source
    is given) and, when no street is provided, an address taken from the
    sale order, else the purchase order, else the partner.

    After creation, in this order: a notice is posted on the linked order,
    the sale order is marked ready for delivery (context
    ``mark_ready_for_delivery`` / ``mark_odsp_ready_for_delivery``), travel
    chains are recalculated (unless ``skip_travel_recalc``), the
    "scheduled" email is sent, and tasks without a sync source are pushed
    to remote instances (unless ``skip_task_sync``).
    """
    context = self.env.context
    new_label = _('New')

    for vals in vals_list:
        if vals.get('name', new_label) == new_label:
            sequence_name = self.env['ir.sequence'].next_by_code('fusion.technician.task')
            vals['name'] = sequence_name or new_label
        if not vals.get('x_fc_sync_uuid') and not vals.get('x_fc_sync_source'):
            vals['x_fc_sync_uuid'] = str(uuid.uuid4())

        if vals.get('address_street'):
            continue

        # No street given: pick the address source by priority.
        address_source = None
        order_partner = None
        if vals.get('sale_order_id'):
            order = self.env['sale.order'].browse(vals['sale_order_id'])
            address_source = order.partner_shipping_id or order.partner_id
            order_partner = order.partner_id
        elif vals.get('purchase_order_id'):
            po = self.env['purchase.order'].browse(vals['purchase_order_id'])
            address_source = po.dest_address_id or po.partner_id
            order_partner = po.partner_id
        elif vals.get('partner_id'):
            partner = self.env['res.partner'].browse(vals['partner_id'])
            if partner.street:
                address_source = partner

        if address_source:
            self._fill_address_vals(vals, address_source)
            # Only the order branches supply a client for the task.
            if order_partner is not None and not vals.get('partner_id'):
                vals['partner_id'] = order_partner.id

    records = super().create(vals_list)

    # Post creation notice to linked order chatter
    for rec in records:
        rec._post_task_created_to_linked_order()

    # If created from "Ready for Delivery" flow, mark the sale order
    if context.get('mark_ready_for_delivery'):
        records._mark_sale_order_ready_for_delivery()
    if context.get('mark_odsp_ready_for_delivery'):
        for rec in records:
            order = rec.sale_order_id
            if order and order.x_fc_is_odsp_sale and order._get_odsp_status() != 'ready_delivery':
                order._odsp_advance_status(
                    'ready_delivery',
                    "Order is ready for delivery. Delivery task scheduled.")

    # Auto-calculate travel times for the full day chain
    if not context.get('skip_travel_recalc'):
        records._recalculate_day_travel_chains()

    # Send "Appointment Scheduled" email
    for rec in records:
        rec._send_task_scheduled_email()

    # Push new local tasks to remote instances
    local_records = records.filtered(lambda r: not r.x_fc_sync_source)
    if local_records and not context.get('skip_task_sync'):
        self.env['fusion.task.sync.config']._push_tasks(local_records, 'create')
    return records
|
|
|
|
def write(self, vals):
    """Write values, keeping ``time_end`` consistent and running follow-ups.

    With ``skip_travel_recalc`` in the context the call goes straight to
    the parent implementation and nothing else in this method runs.

    Otherwise:

    * If ``time_start`` or ``duration_hours`` is written without
      ``time_end``, the end is derived as start + duration (duration
      defaults to 1.0) capped at the store closing hour.  When several
      records are written and one of the two values has to come from each
      record, the write is performed record by record so that every task
      gets an end time based on its own data.
    * Travel is recalculated for the technician/date combinations before
      and after the write when a travel-relevant field changed.
    * In ``reschedule_mode`` a status message and a reschedule email are
      sent for each task whose date or times actually changed.
    * Tasks without a sync source are pushed to remote instances when a
      synced field changed (unless ``skip_task_sync``).

    :param dict vals: field values to write; the dict is not modified.
    :return: result of the parent ``write`` (``True`` for the
        record-by-record path).
    """
    if self.env.context.get('skip_travel_recalc'):
        return super().write(vals)

    # Work on a copy so the caller's dict is never altered.
    vals = dict(vals)

    # Safety: ensure time_end is consistent when start/duration change
    # but time_end wasn't sent (readonly field in view may not save)
    has_start = 'time_start' in vals
    has_duration = 'duration_hours' in vals
    if (has_start or has_duration) and 'time_end' not in vals:
        if len(self) > 1 and not (has_start and has_duration):
            # The missing operand differs per record, so a single shared
            # time_end would be wrong: let each record derive its own.
            for task in self:
                task.write(vals)
            return True
        _open, close = self._get_store_hours()
        current = self[:1]
        start = vals['time_start'] if has_start else current.time_start
        dur = (vals['duration_hours'] if has_duration else current.duration_hours) or 1.0
        vals['time_end'] = min(start + dur, close)

    # Detect reschedule mode: capture old values BEFORE write
    reschedule_mode = self.env.context.get('reschedule_mode')
    old_schedule = {}
    schedule_fields = {'scheduled_date', 'time_start', 'time_end',
                       'duration_hours', 'technician_id'}
    schedule_changed = schedule_fields & set(vals.keys())
    if reschedule_mode and schedule_changed:
        for task in self:
            old_schedule[task.id] = {
                'date': task.scheduled_date,
                'time_start': task.time_start,
                'time_end': task.time_end,
            }

    # Capture old tech+date combos BEFORE write for travel recalc
    travel_fields = {'address_street', 'address_city', 'address_zip', 'address_lat', 'address_lng',
                     'scheduled_date', 'sequence', 'time_start', 'technician_id'}
    needs_travel_recalc = travel_fields & set(vals.keys())
    old_combos = set()
    if needs_travel_recalc:
        old_combos = {(t.technician_id.id, t.scheduled_date) for t in self}
    res = super().write(vals)
    if needs_travel_recalc:
        # Both the days the tasks left and the days they moved to change.
        new_combos = {(t.technician_id.id, t.scheduled_date) for t in self}
        all_combos = old_combos | new_combos
        self._recalculate_combos_travel(all_combos)

    # After write: send reschedule email if schedule actually changed
    if reschedule_mode and old_schedule:
        for task in self:
            old = old_schedule.get(task.id, {})
            if old and (
                old['date'] != task.scheduled_date
                or abs(old['time_start'] - task.time_start) > 0.01
                or abs(old['time_end'] - task.time_end) > 0.01
            ):
                task._post_status_message('rescheduled')
                task._send_task_rescheduled_email(
                    old_date=old['date'],
                    old_start=old['time_start'],
                    old_end=old['time_end'],
                )
    # Push updates to remote instances for local tasks
    sync_fields = {'technician_id', 'scheduled_date', 'time_start', 'time_end',
                   'duration_hours', 'status', 'task_type', 'address_street',
                   'address_city', 'address_zip', 'address_lat', 'address_lng',
                   'partner_id'}
    if sync_fields & set(vals.keys()) and not self.env.context.get('skip_task_sync'):
        local_records = self.filtered(lambda r: not r.x_fc_sync_source)
        if local_records:
            self.env['fusion.task.sync.config']._push_tasks(local_records, 'write')
    return res
|
|
|
|
@api.model
def _fill_address_vals(self, vals, partner):
    """Copy a partner's address and coordinates into ``vals`` (in place).

    Latitude and longitude are 0 when the partner has no
    ``x_fc_latitude`` / ``x_fc_longitude`` attribute.
    """
    state = partner.state_id
    vals['address_partner_id'] = partner.id
    vals['address_street'] = partner.street or ''
    vals['address_street2'] = partner.street2 or ''
    vals['address_city'] = partner.city or ''
    vals['address_state_id'] = state.id if state else False
    vals['address_zip'] = partner.zip or ''
    vals['address_lat'] = getattr(partner, 'x_fc_latitude', 0)
    vals['address_lng'] = getattr(partner, 'x_fc_longitude', 0)
|
|
|
|
def _post_task_created_to_linked_order(self):
    """Post a brief task creation notice to the linked order's chatter.

    The notice goes to the sale order, or to the purchase order when no
    sale order is linked; nothing is posted when neither is set.

    Record values are inserted through ``Markup.format`` so that they are
    HTML-escaped.  Interpolating them with an f-string inside ``Markup``
    would mark user-provided text as safe HTML.
    """
    self.ensure_one()
    order = self.sale_order_id or self.purchase_order_id
    if not order:
        return
    task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type)
    date_str = self.scheduled_date.strftime('%B %d, %Y') if self.scheduled_date else 'TBD'
    time_str = self._float_to_time_str(self.time_start)
    task_url = f'/web#id={self.id}&model=fusion.technician.task&view_type=form'
    body = Markup(
        '<div style="background:#e8f4fd;border-left:4px solid #17a2b8;padding:10px;border-radius:4px;">'
        '<strong><i class="fa fa-wrench"></i> Technician Task Scheduled</strong><br/>'
        '<strong>{name}</strong> ({task_type}) - {date} at {time}<br/>'
        'Technician: {technician}<br/>'
        '<a href="{url}">View Task</a>'
        '</div>'
    ).format(
        name=self.name,
        task_type=task_type_label,
        date=date_str,
        time=time_str,
        technician=self.technician_id.name,
        url=task_url,
    )
    order.message_post(
        body=body, message_type='notification', subtype_xmlid='mail.mt_note',
    )
|
|
|
|
def _mark_sale_order_ready_for_delivery(self):
    """Mark linked sale orders as Ready for Delivery.

    Called when a delivery task is created from the "Ready for Delivery"
    button on the sale order. This replaces the old wizard workflow.

    For each task with a sale order that is not already in
    ``ready_delivery``: the order status and delivery fields are written
    (the previous status is stored so it can be restored later), a chatter
    note is posted, and the ready-for-delivery email is sent.  An email
    failure is logged and does not interrupt the loop.

    All record values placed in the chatter note go through
    ``Markup.format`` so that they are HTML-escaped.
    """
    for task in self:
        order = task.sale_order_id
        if not order:
            continue
        # Only update if not already marked
        if order.x_fc_adp_application_status == 'ready_delivery':
            continue

        user_name = self.env.user.name
        tech_name = task.technician_id.name or ''

        # Save current status so we can revert if task is cancelled
        previous_status = order.x_fc_adp_application_status

        # Update the sale order status and delivery fields
        order.with_context(skip_status_validation=True).write({
            'x_fc_adp_application_status': 'ready_delivery',
            'x_fc_status_before_delivery': previous_status,
            'x_fc_delivery_technician_ids': [(4, task.technician_id.id)],
            'x_fc_ready_for_delivery_date': fields.Datetime.now(),
            'x_fc_scheduled_delivery_datetime': task.datetime_start,
        })

        # Post chatter message.  Optional fragments are Markup objects so
        # the final format() inserts them verbatim while escaping the rest.
        early_badge = Markup('')
        if order.x_fc_early_delivery:
            early_badge = Markup(' <span class="badge bg-warning text-dark">Early Delivery</span>')

        scheduled_str = Markup('')
        if task.scheduled_date:
            time_str = task._float_to_time_str(task.time_start) if task.time_start else ''
            date_str = task.scheduled_date.strftime('%B %d, %Y')
            scheduled_str = Markup(
                '<li><strong>Scheduled:</strong> {date} at {time}</li>'
            ).format(date=date_str, time=time_str)

        notes_str = Markup('')
        if task.description:
            notes_str = Markup(
                '<hr/><p class="mb-0"><strong>Delivery Notes:</strong> {notes}</p>'
            ).format(notes=task.description)

        chatter_body = Markup(
            '<div class="alert alert-success" role="alert">'
            '<h5 class="alert-heading"><i class="fa fa-truck"></i> Ready for Delivery{early_badge}</h5>'
            '<ul>'
            '<li><strong>Marked By:</strong> {user_name}</li>'
            '<li><strong>Technician:</strong> {tech_name}</li>'
            '{scheduled}'
            '<li><strong>Delivery Address:</strong> {address}</li>'
            '</ul>'
            '{notes}'
            '</div>'
        ).format(
            early_badge=early_badge,
            user_name=user_name,
            tech_name=tech_name,
            scheduled=scheduled_str,
            address=task.address_display or "N/A",
            notes=notes_str,
        )
        order.message_post(
            body=chatter_body,
            message_type='notification',
            subtype_xmlid='mail.mt_note',
        )

        # Send email notifications (best effort: failures are only logged)
        try:
            order._send_ready_for_delivery_email(
                technicians=task.technician_id,
                scheduled_datetime=task.datetime_start,
                notes=task.description,
            )
        except Exception as e:
            _logger.warning("Ready for delivery email failed for %s: %s", order.name, e)
|
|
|
|
def _recalculate_day_travel_chains(self):
|
|
"""Recalculate travel for all tech+date combos affected by these tasks."""
|
|
combos = {(t.technician_id.id, t.scheduled_date) for t in self if t.technician_id and t.scheduled_date}
|
|
self._recalculate_combos_travel(combos)
|
|
|
|
def _get_technician_start_address(self, tech_id):
|
|
"""Get the start address for a technician.
|
|
|
|
Priority:
|
|
1. Technician's personal x_fc_start_address (if set)
|
|
2. Company default HQ address (fusion_claims.technician_start_address)
|
|
Returns the address string or ''.
|
|
"""
|
|
tech_user = self.env['res.users'].sudo().browse(tech_id)
|
|
if tech_user.exists() and tech_user.x_fc_start_address:
|
|
return tech_user.x_fc_start_address.strip()
|
|
# Fallback to company default
|
|
return (self.env['ir.config_parameter'].sudo()
|
|
.get_param('fusion_claims.technician_start_address', '') or '').strip()
|
|
|
|
def _geocode_address_string(self, address, api_key):
|
|
"""Geocode an address string and return (lat, lng) or (0.0, 0.0)."""
|
|
if not address or not api_key:
|
|
return 0.0, 0.0
|
|
try:
|
|
url = 'https://maps.googleapis.com/maps/api/geocode/json'
|
|
params = {'address': address, 'key': api_key, 'region': 'ca'}
|
|
resp = requests.get(url, params=params, timeout=10)
|
|
data = resp.json()
|
|
if data.get('status') == 'OK' and data.get('results'):
|
|
loc = data['results'][0]['geometry']['location']
|
|
return loc['lat'], loc['lng']
|
|
except Exception as e:
|
|
_logger.warning("Address geocoding failed for '%s': %s", address, e)
|
|
return 0.0, 0.0
|
|
|
|
def _recalculate_combos_travel(self, combos):
|
|
"""Recalculate travel for a set of (tech_id, date) combinations."""
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
enabled = ICP.get_param('fusion_claims.google_distance_matrix_enabled', False)
|
|
if not enabled:
|
|
return
|
|
api_key = self._get_google_maps_api_key()
|
|
|
|
# Cache geocoded start addresses per technician to avoid repeated API calls
|
|
start_coords_cache = {}
|
|
|
|
for tech_id, date in combos:
|
|
if not tech_id or not date:
|
|
continue
|
|
all_day_tasks = self.sudo().search([
|
|
('technician_id', '=', tech_id),
|
|
('scheduled_date', '=', date),
|
|
('status', 'not in', ['cancelled']),
|
|
], order='time_start, sequence, id')
|
|
if not all_day_tasks:
|
|
continue
|
|
|
|
# Get this technician's start location (personal or company default)
|
|
if tech_id not in start_coords_cache:
|
|
addr = self._get_technician_start_address(tech_id)
|
|
start_coords_cache[tech_id] = self._geocode_address_string(addr, api_key)
|
|
|
|
prev_lat, prev_lng = start_coords_cache[tech_id]
|
|
for i, task in enumerate(all_day_tasks):
|
|
if not (task.address_lat and task.address_lng):
|
|
task._geocode_address()
|
|
travel_vals = {}
|
|
if prev_lat and prev_lng and task.address_lat and task.address_lng:
|
|
task.with_context(skip_travel_recalc=True)._calculate_travel_time(prev_lat, prev_lng)
|
|
travel_vals['previous_task_id'] = all_day_tasks[i - 1].id if i > 0 else False
|
|
travel_vals['travel_origin'] = 'Start Location' if i == 0 else f'Task {all_day_tasks[i - 1].name}'
|
|
if travel_vals:
|
|
task.with_context(skip_travel_recalc=True).write(travel_vals)
|
|
prev_lat = task.address_lat or prev_lat
|
|
prev_lng = task.address_lng or prev_lng
|
|
|
|
# ------------------------------------------------------------------
|
|
# STATUS ACTIONS
|
|
# ------------------------------------------------------------------
|
|
|
|
def _check_previous_tasks_completed(self):
|
|
"""Check that all earlier tasks for the same technician+date are completed."""
|
|
self.ensure_one()
|
|
earlier_incomplete = self.sudo().search([
|
|
('technician_id', '=', self.technician_id.id),
|
|
('scheduled_date', '=', self.scheduled_date),
|
|
('time_start', '<', self.time_start),
|
|
('status', 'not in', ['completed', 'cancelled']),
|
|
('id', '!=', self.id),
|
|
], limit=1)
|
|
if earlier_incomplete:
|
|
raise UserError(_(
|
|
"Please complete previous task %s first before starting this one."
|
|
) % earlier_incomplete.name)
|
|
|
|
def action_start_en_route(self):
    """Move each task from ``scheduled`` to ``en_route`` and post a status message.

    :raises UserError: when a task is not in ``scheduled`` status; the
        previous-task check may raise as well.
    """
    target_status = 'en_route'
    for record in self:
        if record.status != 'scheduled':
            raise UserError(_("Only scheduled tasks can be marked as En Route."))
        record._check_previous_tasks_completed()
        record.status = target_status
        record._post_status_message(target_status)
|
|
|
|
def action_start_task(self):
    """Move each scheduled or en-route task to ``in_progress`` and post a status message.

    :raises UserError: when a task is in any other status; the
        previous-task check may raise as well.
    """
    startable = ('scheduled', 'en_route')
    target_status = 'in_progress'
    for record in self:
        if record.status not in startable:
            raise UserError(_("Task must be scheduled or en route to start."))
        record._check_previous_tasks_completed()
        record.status = target_status
        record._post_status_message(target_status)
|
|
|
|
def action_view_sale_order(self):
    """Return a window action opening the linked sale order in form view.

    Returns ``None`` when the task has no sale order.
    """
    self.ensure_one()
    sale_order = self.sale_order_id
    if not sale_order:
        return
    return dict(
        name=sale_order.name,
        type='ir.actions.act_window',
        res_model='sale.order',
        view_mode='form',
        res_id=sale_order.id,
    )
|
|
|
|
def action_view_purchase_order(self):
    """Return a window action opening the linked purchase order in form view.

    Returns ``None`` when the task has no purchase order.
    """
    self.ensure_one()
    purchase_order = self.purchase_order_id
    if not purchase_order:
        return
    return dict(
        name=purchase_order.name,
        type='ir.actions.act_window',
        res_model='purchase.order',
        view_mode='form',
        res_id=purchase_order.id,
    )
|
|
|
|
def _is_rental_pickup_task(self):
|
|
"""Check if this is a pickup task for a rental order."""
|
|
self.ensure_one()
|
|
return (
|
|
self.task_type == 'pickup'
|
|
and self.sale_order_id
|
|
and self.sale_order_id.is_rental_order
|
|
)
|
|
|
|
def action_complete_task(self):
    """Mark each task as completed and run the completion follow-ups.

    A task can be completed from ``in_progress``, ``en_route`` or
    ``scheduled``.  Rental pickup tasks additionally need
    ``rental_inspection_completed``.  After the status write (done with
    ``skip_travel_recalc``): a status message is posted, completion notes
    are forwarded to the linked order, the scheduler is notified, an ODSP
    sale order in ``ready_delivery`` is advanced to ``delivered`` for
    delivery tasks, and rental inspection results are applied for rental
    pickups.

    :raises UserError: on a wrong status or a missing rental inspection.
    """
    completable = ('in_progress', 'en_route', 'scheduled')
    for record in self:
        if record.status not in completable:
            raise UserError(_("Task must be in progress to complete."))

        if record._is_rental_pickup_task() and not record.rental_inspection_completed:
            raise UserError(_(
                "Rental pickup tasks require a security inspection before "
                "completion. Please complete the inspection from the "
                "technician portal first."
            ))

        record.with_context(skip_travel_recalc=True).write({
            'status': 'completed',
            'completion_datetime': fields.Datetime.now(),
        })
        record._post_status_message('completed')

        has_linked_order = record.sale_order_id or record.purchase_order_id
        if record.completion_notes and has_linked_order:
            record._post_completion_to_linked_order()
        record._notify_scheduler_on_completion()

        sale_order = record.sale_order_id
        if (record.task_type == 'delivery'
                and sale_order
                and sale_order.x_fc_is_odsp_sale
                and sale_order._get_odsp_status() == 'ready_delivery'):
            sale_order._odsp_advance_status(
                'delivered',
                "Delivery task completed by technician. Order marked as delivered.",
            )

        if record._is_rental_pickup_task():
            record._apply_rental_inspection_results()
|
|
|
|
def _apply_rental_inspection_results(self):
|
|
"""Write inspection results from the task back to the rental order."""
|
|
self.ensure_one()
|
|
order = self.sale_order_id
|
|
if not order or not order.is_rental_order:
|
|
return
|
|
|
|
inspection_status = 'passed'
|
|
if self.rental_inspection_condition in ('fair', 'damaged'):
|
|
inspection_status = 'flagged'
|
|
|
|
vals = {
|
|
'rental_inspection_status': inspection_status,
|
|
'rental_inspection_notes': self.rental_inspection_notes or '',
|
|
}
|
|
if self.rental_inspection_photo_ids:
|
|
vals['rental_inspection_photo_ids'] = [(6, 0, self.rental_inspection_photo_ids.ids)]
|
|
order.write(vals)
|
|
|
|
if inspection_status == 'passed':
|
|
order._refund_security_deposit()
|
|
elif inspection_status == 'flagged':
|
|
order.activity_schedule(
|
|
'mail.mail_activity_data_todo',
|
|
date_deadline=fields.Date.today(),
|
|
summary=_("Review rental inspection: %s", order.name),
|
|
note=_(
|
|
"Technician flagged rental pickup for %s. "
|
|
"Condition: %s. Please review inspection photos and notes.",
|
|
order.partner_id.name,
|
|
self.rental_inspection_condition or 'Unknown',
|
|
),
|
|
user_id=order.user_id.id or self.env.uid,
|
|
)
|
|
|
|
def action_cancel_task(self):
    """Cancel each task and trigger the matching cancellation follow-up.

    Delivery tasks go through ``_revert_sale_order_on_cancel``; every
    other task type gets the cancellation email via
    ``_send_task_cancelled_email``.

    :raises UserError: when a task is already completed.
    """
    for record in self:
        if record.status == 'completed':
            raise UserError(_("Cannot cancel a completed task."))
        record.status = 'cancelled'
        record._post_status_message('cancelled')
        if record.task_type != 'delivery':
            # Non-delivery tasks: still send a cancellation email
            record._send_task_cancelled_email()
            continue
        # Delivery task: the linked sale order may have to leave
        # "Ready for Delivery" again; the revert helper handles that.
        record._revert_sale_order_on_cancel()
|
|
|
|
def _revert_sale_order_on_cancel(self):
    """Revert the linked sale order when its last delivery task is cancelled.

    The order is reverted only if all of the following hold:
    - this task is a delivery task with a linked sale order,
    - the order's ``x_fc_adp_application_status`` is ``ready_delivery``,
    - no other non-cancelled delivery task exists for the order.

    When reverted, the order goes back to ``x_fc_status_before_delivery``
    (``approved`` if that is empty), a note is posted on the order chatter,
    and the cancellation email is sent. In every other case the method
    returns without side effects.
    """
    self.ensure_one()
    if self.task_type != 'delivery' or not self.sale_order_id:
        return
    order = self.sale_order_id
    if order.x_fc_adp_application_status != 'ready_delivery':
        return

    # Check if any other non-cancelled delivery tasks exist for this order
    other_delivery_tasks = self.sudo().search([
        ('sale_order_id', '=', order.id),
        ('task_type', '=', 'delivery'),
        ('status', 'not in', ['cancelled']),
        ('id', '!=', self.id),
    ], limit=1)
    if other_delivery_tasks:
        return  # Other active delivery tasks still exist, don't revert

    # Revert to the status saved before Ready for Delivery
    prev_status = order.x_fc_status_before_delivery or 'approved'
    status_labels = dict(order._fields['x_fc_adp_application_status'].selection)
    prev_label = status_labels.get(prev_status, prev_status)

    # skip_status_emails prevents the "Approved" email from re-firing
    order.with_context(
        skip_status_validation=True,
        skip_status_emails=True,
    ).write({
        'x_fc_adp_application_status': prev_status,
        'x_fc_status_before_delivery': False,
    })

    # Post chatter message about the revert. Values are injected through
    # Markup.format() so that task/user names are HTML-escaped; wrapping an
    # already-interpolated f-string in Markup() would mark them as safe.
    body = Markup(
        '<div class="alert alert-warning" role="alert">'
        '<h5><i class="fa fa-undo"></i> Delivery Task Cancelled</h5>'
        '<p>Delivery task <strong>{task}</strong> was cancelled by '
        '{user}.</p>'
        '<p>Order status reverted to <strong>{status}</strong>.</p>'
        '</div>'
    ).format(task=self.name, user=self.env.user.name, status=prev_label)
    order.message_post(
        body=body,
        message_type='notification',
        subtype_xmlid='mail.mt_note',
    )

    # Send a "Delivery Cancelled" email instead
    self._send_task_cancelled_email()
|
|
|
|
def action_reschedule(self):
    """Return a window action that opens this task's form in a dialog.

    The current schedule (date as a string, start and end as float hours)
    is passed in the action context under ``old_date`` / ``old_time_start``
    / ``old_time_end`` together with ``reschedule_mode=True``.
    """
    self.ensure_one()
    previous_schedule = {
        'reschedule_mode': True,
        'old_date': str(self.scheduled_date) if self.scheduled_date else '',
        'old_time_start': self.time_start,
        'old_time_end': self.time_end,
    }
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fusion.technician.task',
        'res_id': self.id,
        'view_mode': 'form',
        'target': 'new',
        'context': previous_schedule,
    }
|
|
|
|
def action_reset_to_scheduled(self):
    """Put every task in the recordset back into the ``scheduled`` status."""
    # Assigned one record at a time, exactly as a user edit would be.
    for record in self:
        record.status = 'scheduled'
|
|
|
|
# ------------------------------------------------------------------
|
|
# CHATTER / NOTIFICATIONS
|
|
# ------------------------------------------------------------------
|
|
|
|
def _post_status_message(self, new_status):
    """Post a note on the task chatter announcing a status change.

    :param new_status: technical key of the new status; its label is looked
        up in the ``status`` selection and falls back to the raw key.
    """
    self.ensure_one()
    status_labels = dict(self._fields['status'].selection)
    label = status_labels.get(new_status, new_status)
    icons = {
        'en_route': 'fa-road',
        'in_progress': 'fa-wrench',
        'completed': 'fa-check-circle',
        'cancelled': 'fa-times-circle',
        'rescheduled': 'fa-calendar',
    }
    icon = icons.get(new_status, 'fa-info-circle')
    # Markup.format() escapes its arguments, so the user name (and an
    # unknown status key) cannot inject HTML into the chatter.
    body = Markup(
        '<p><i class="fa {icon}"></i> Task status changed to '
        '<strong>{label}</strong> by {user}</p>'
    ).format(icon=icon, label=label, user=self.env.user.name)
    self.message_post(body=body, message_type='notification', subtype_xmlid='mail.mt_note')
|
|
|
|
def _post_completion_to_linked_order(self):
    """Post the task's completion notes on the linked order's chatter.

    The linked order is the sale order if set, otherwise the purchase
    order. Nothing is posted when there is no linked order or no
    completion notes.
    """
    self.ensure_one()
    order = self.sale_order_id or self.purchase_order_id
    if not order or not self.completion_notes:
        return
    task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type)
    completed_str = (
        self.completion_datetime.strftime("%B %d, %Y at %I:%M %p")
        if self.completion_datetime else "N/A"
    )
    # Markup.format() escapes plain strings but leaves values that are
    # already Markup untouched, so names are escaped while notes that come
    # back from the ORM as Markup keep their formatting.
    body = Markup(
        '<div class="alert alert-info">'
        '<h5><i class="fa fa-wrench"></i> Technician Task Completed</h5>'
        '<ul>'
        '<li><strong>Task:</strong> {task} ({task_type})</li>'
        '<li><strong>Technician:</strong> {technician}</li>'
        '<li><strong>Completed:</strong> {completed}</li>'
        '</ul>'
        '<hr/>'
        '{notes}'
        '</div>'
    ).format(
        task=self.name,
        task_type=task_type_label,
        technician=self.technician_id.name,
        completed=completed_str,
        notes=self.completion_notes,
    )
    order.message_post(
        body=body,
        message_type='notification',
        subtype_xmlid='mail.mt_note',
    )
|
|
|
|
def _notify_scheduler_on_completion(self):
    """Notify the user who created the task that it has been completed.

    Nothing is sent when the task has no creator or the creator is the
    assigned technician. Otherwise an internal notification with client,
    case, task, technician and address details is sent to the creator's
    partner through ``message_notify``.
    """
    self.ensure_one()
    scheduler = self.create_uid
    # Notify the task creator (scheduler) only if they're not the technician
    if not scheduler or scheduler == self.technician_id:
        return

    task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type)
    task_url = f'/web#id={self.id}&model=fusion.technician.task&view_type=form'
    client_name = self.partner_id.name or 'N/A'
    order = self.sale_order_id or self.purchase_order_id
    case_ref = order.name if order else ''

    # Build address string from the non-empty address parts
    addr_parts = [p for p in [
        self.address_street,
        self.address_street2,
        self.address_city,
        self.address_state_id.name if self.address_state_id else '',
        self.address_zip,
    ] if p]
    address_str = ', '.join(addr_parts) or 'No address'

    subject = f'Task Completed: {client_name}'
    if case_ref:
        subject += f' ({case_ref})'

    # All dynamic values go through Markup.format()/join(), which escape
    # plain strings; interpolating them with an f-string first would mark
    # user-entered names and addresses as safe HTML.
    row_template = Markup(
        '<tr><td style="padding:3px 8px 3px 0;font-weight:bold;white-space:nowrap;vertical-align:top;">{label}:</td>'
        '<td style="padding:3px 0;">{value}</td></tr>'
    )
    detail_rows = Markup('').join(
        row_template.format(label=row_label, value=row_value)
        for row_label, row_value in (
            ('Client', client_name),
            ('Case', case_ref or "N/A"),
            ('Task', self.name),
            ('Technician', self.technician_id.name),
            ('Location', address_str),
        )
    )
    body = Markup(
        '<div style="background:#d4edda;border-left:4px solid #28a745;padding:12px;border-radius:4px;margin-bottom:8px;">'
        '<p style="margin:0 0 8px 0;"><i class="fa fa-check-circle" style="color:#28a745;"></i> '
        '<strong>{task_type} Completed</strong></p>'
        '<table style="width:100%;border-collapse:collapse;">'
        '{rows}'
        '</table>'
        '<p style="margin:8px 0 0 0;"><a href="{url}">View Task</a></p>'
        '</div>'
    ).format(task_type=task_type_label, rows=detail_rows, url=task_url)

    # Use Odoo's internal notification system
    self.env['mail.thread'].sudo().message_notify(
        partner_ids=[scheduler.partner_id.id],
        body=body,
        subject=subject,
    )
|
|
|
|
# ------------------------------------------------------------------
|
|
# TASK EMAIL NOTIFICATIONS
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_task_email_details(self):
    """Return the ``(label, value)`` rows shared by all task emails.

    Always contains ``Task`` and ``Client``; ``Case``, ``Purchase Order``,
    ``Scheduled``, ``Technician`` and ``Address`` are appended, in that
    order, only when the corresponding field is set.
    """
    self.ensure_one()
    selection = dict(self._fields['task_type'].selection)
    type_label = selection.get(self.task_type, self.task_type or '')

    details = [
        ('Task', f'{self.name} ({type_label})'),
        ('Client', self.partner_id.name or 'N/A'),
    ]
    if self.sale_order_id:
        details.append(('Case', self.sale_order_id.name))
    if self.purchase_order_id:
        details.append(('Purchase Order', self.purchase_order_id.name))
    if self.scheduled_date:
        day = self.scheduled_date.strftime('%B %d, %Y')
        window = ' - '.join(
            self._float_to_time_str(hour)
            for hour in (self.time_start, self.time_end)
        )
        details.append(('Scheduled', f'{day}, {window}'))
    if self.technician_id:
        details.append(('Technician', self.technician_id.name))
    if self.address_display:
        details.append(('Address', self.address_display))
    return details
|
|
|
|
def _get_task_email_recipients(self):
    """Collect the email recipients for task notifications.

    :return: dict with ``'to'`` (the client's email, if any) and ``'cc'``
        (technician, sale order salesperson, then the order's office CC
        list). CC addresses are de-duplicated while keeping that order, so
        callers that fall back to ``cc[0]`` as the main recipient get a
        deterministic address.
    """
    self.ensure_one()
    to_emails = []
    cc_emails = []

    # Client email
    if self.partner_id and self.partner_id.email:
        to_emails.append(self.partner_id.email)

    # Technician email
    if self.technician_id and self.technician_id.email:
        cc_emails.append(self.technician_id.email)

    order = self.sale_order_id
    if order:
        # Sales rep from the sale order
        if order.user_id and order.user_id.email:
            cc_emails.append(order.user_id.email)

        # Office notification recipients (best effort: a failure here must
        # not block the notification, but it is logged instead of hidden).
        try:
            office_cc = order._get_email_recipients(
                include_client=False).get('office_cc', [])
            cc_emails.extend(office_cc)
        except Exception:
            _logger.warning(
                "Could not resolve office CC recipients for task %s",
                self.name, exc_info=True)

    # dict.fromkeys() removes duplicates but, unlike set(), keeps order.
    return {'to': to_emails, 'cc': list(dict.fromkeys(cc_emails))}
|
|
|
|
def _send_task_cancelled_email(self):
    """Send the "cancelled" email for this task.

    The email is built and logged through the linked sale order, so nothing
    is sent when the task has no sale order, when the order reports email
    notifications as disabled, or when there is no recipient at all. If the
    client has no email, the first CC address becomes the main recipient.

    :return: True if the email was created and sent, False otherwise.
    """
    self.ensure_one()
    order = self.sale_order_id
    if not order:
        return False
    try:
        if not order._is_email_notifications_enabled():
            return False
    except Exception:
        # Best effort: if the setting cannot be read, do not send.
        return False

    recipients = self._get_task_email_recipients()
    to_emails = recipients.get('to', [])
    cc_emails = recipients.get('cc', [])
    if not to_emails and not cc_emails:
        return False

    client_name = self.partner_id.name or 'Client'
    type_label = dict(self._fields['task_type'].selection).get(
        self.task_type, self.task_type or 'Task')
    sender_name = self.env.user.name

    detail_rows = self._get_task_email_details()
    detail_rows.append(('Cancelled By', sender_name))

    # The summary is raw HTML, so user-entered text (the client name) is
    # escaped via Markup.format(); str() hands a plain string to the builder.
    summary = str(Markup(
        'The scheduled <strong>{task_type}</strong> for '
        '<strong>{client}</strong> has been <strong>cancelled</strong>.'
    ).format(task_type=type_label.lower(), client=client_name))

    body_html = order._email_build(
        title=f'{type_label.title()} Cancelled',
        summary=summary,
        email_type='urgent',
        sections=[('Cancellation Details', detail_rows)],
        note=(
            '<strong>What happens next:</strong> If you need to reschedule, '
            'please contact our office and we will arrange a new appointment.'
        ),
        note_color='#e53e3e',
        button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form',
        sender_name=sender_name,
    )

    # Without a client address, promote the first CC to "To".
    email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1])
    email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:])
    try:
        self.env['mail.mail'].sudo().create({
            'subject': f'{type_label.title()} Cancelled - {client_name} - {order.name}',
            'body_html': body_html,
            'email_to': email_to,
            'email_cc': email_cc,
            'model': 'sale.order',
            'res_id': order.id,
        }).send()
        order._email_chatter_log(
            f'{type_label.title()} Cancelled email sent', email_to, email_cc)
        return True
    except Exception as e:
        _logger.error("Failed to send task cancelled email for %s: %s", self.name, e)
        return False
|
|
|
|
def _send_task_scheduled_email(self):
    """Send the "scheduled" email to client, technician and sales rep.

    The email is built and logged through the linked sale order, so nothing
    is sent when the task has no sale order, when the order reports email
    notifications as disabled, or when there is no recipient at all. If the
    client has no email, the first CC address becomes the main recipient.

    :return: True if the email was created and sent, False otherwise.
    """
    self.ensure_one()
    order = self.sale_order_id
    if not order:
        return False
    try:
        if not order._is_email_notifications_enabled():
            return False
    except Exception:
        # Best effort: if the setting cannot be read, do not send.
        return False

    recipients = self._get_task_email_recipients()
    to_emails = recipients.get('to', [])
    cc_emails = recipients.get('cc', [])
    if not to_emails and not cc_emails:
        return False

    client_name = self.partner_id.name or 'Client'
    type_label = dict(self._fields['task_type'].selection).get(
        self.task_type, self.task_type or 'Task')
    sender_name = self.env.user.name

    detail_rows = self._get_task_email_details()
    if self.description:
        detail_rows.append(('Notes', self.description))

    # The summary is raw HTML, so user-entered text (the client name) is
    # escaped via Markup.format(); str() hands a plain string to the builder.
    summary = str(Markup(
        'A <strong>{task_type}</strong> has been scheduled for '
        '<strong>{client}</strong>.'
    ).format(task_type=type_label.lower(), client=client_name))

    body_html = order._email_build(
        title=f'{type_label.title()} Scheduled',
        summary=summary,
        email_type='success',
        sections=[('Appointment Details', detail_rows)],
        note=(
            '<strong>Please note:</strong> If you need to change this appointment, '
            'please contact our office as soon as possible so we can accommodate '
            'the change.'
        ),
        note_color='#38a169',
        button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form',
        sender_name=sender_name,
    )

    # Without a client address, promote the first CC to "To".
    email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1])
    email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:])
    try:
        self.env['mail.mail'].sudo().create({
            'subject': f'{type_label.title()} Scheduled - {client_name} - {order.name}',
            'body_html': body_html,
            'email_to': email_to,
            'email_cc': email_cc,
            'model': 'sale.order',
            'res_id': order.id,
        }).send()
        order._email_chatter_log(
            f'{type_label.title()} Scheduled email sent', email_to, email_cc)
        return True
    except Exception as e:
        _logger.error("Failed to send task scheduled email for %s: %s", self.name, e)
        return False
|
|
|
|
def _send_task_rescheduled_email(self, old_date=None, old_start=None, old_end=None):
    """Send the "rescheduled" email, showing the old schedule next to the new one.

    :param old_date: previous scheduled date; a ``date`` or a date string
        (the reschedule action stores it in the context as a string).
    :param old_start: previous start time in float hours, or None.
    :param old_end: previous end time in float hours; when falsy, one hour
        after ``old_start`` is shown.
    :return: True if the email was created and sent, False otherwise
        (no sale order, notifications disabled, no recipient, send error).
    """
    self.ensure_one()
    order = self.sale_order_id
    if not order:
        return False
    try:
        if not order._is_email_notifications_enabled():
            return False
    except Exception:
        # Best effort: if the setting cannot be read, do not send.
        return False

    recipients = self._get_task_email_recipients()
    to_emails = recipients.get('to', [])
    cc_emails = recipients.get('cc', [])
    if not to_emails and not cc_emails:
        return False

    client_name = self.partner_id.name or 'Client'
    type_label = dict(self._fields['task_type'].selection).get(
        self.task_type, self.task_type or 'Task')
    sender_name = self.env.user.name

    detail_rows = self._get_task_email_details()

    # Show old schedule if provided
    if old_date or old_start is not None:
        old_parts = []
        if old_date:
            # Accept both date objects and date strings.
            old_parts.append(fields.Date.to_date(old_date).strftime('%B %d, %Y'))
        if old_start is not None:
            old_parts.append(
                f'{self._float_to_time_str(old_start)} - '
                f'{self._float_to_time_str(old_end or old_start + 1.0)}')
        # Place the old schedule directly above the new "Scheduled" row;
        # fall back to the historical position when that row is absent.
        insert_at = next(
            (idx for idx, row in enumerate(detail_rows) if row[0] == 'Scheduled'),
            3,
        )
        detail_rows.insert(insert_at, ('Previous Schedule', ', '.join(old_parts)))

    # The summary is raw HTML, so user-entered text (the client name) is
    # escaped via Markup.format(); str() hands a plain string to the builder.
    summary = str(Markup(
        'The <strong>{task_type}</strong> for '
        '<strong>{client}</strong> has been <strong>rescheduled</strong>.'
    ).format(task_type=type_label.lower(), client=client_name))

    body_html = order._email_build(
        title=f'{type_label.title()} Rescheduled',
        summary=summary,
        email_type='attention',
        sections=[('Updated Appointment Details', detail_rows)],
        note=(
            '<strong>Please note:</strong> The appointment has been updated '
            'to the new date and time shown above. If you have any questions, '
            'please contact our office.'
        ),
        note_color='#d69e2e',
        button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form',
        sender_name=sender_name,
    )

    # Without a client address, promote the first CC to "To".
    email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1])
    email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:])
    try:
        self.env['mail.mail'].sudo().create({
            'subject': f'{type_label.title()} Rescheduled - {client_name} - {order.name}',
            'body_html': body_html,
            'email_to': email_to,
            'email_cc': email_cc,
            'model': 'sale.order',
            'res_id': order.id,
        }).send()
        order._email_chatter_log(
            f'{type_label.title()} Rescheduled email sent', email_to, email_cc)
        return True
    except Exception as e:
        _logger.error("Failed to send rescheduled email for %s: %s", self.name, e)
        return False
|
|
|
|
def get_next_task_for_technician(self):
    """Return the next open task of the same technician on the same date.

    Looks for a different task that starts at or after this task's start
    time and is still ``scheduled`` or ``en_route``; the earliest one (by
    start time, then sequence, then id) is returned, or an empty recordset.
    """
    self.ensure_one()
    following_domain = [
        ('technician_id', '=', self.technician_id.id),
        ('scheduled_date', '=', self.scheduled_date),
        ('time_start', '>=', self.time_start),
        ('status', 'in', ['scheduled', 'en_route']),
        ('id', '!=', self.id),
    ]
    return self.sudo().search(
        following_domain, order='time_start, sequence, id', limit=1)
|
|
|
|
# ------------------------------------------------------------------
|
|
# GOOGLE MAPS INTEGRATION
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_google_maps_api_key(self):
    """Return the configured Google Maps API key, or '' when it is not set."""
    config = self.env['ir.config_parameter'].sudo()
    return config.get_param('fusion_claims.google_maps_api_key', '')
|
|
|
|
@api.model
def get_map_data(self, domain=None):
    """Return the data bundle used by the task map view.

    Args:
        domain: optional extra domain from the search bar filters; it is
            AND-ed with the built-in "not cancelled" filter.

    Returns:
        dict with ``api_key`` (Google Maps key), ``tasks`` (up to 500 task
        dicts from ``search_read``, run with sudo), ``locations`` (result
        of ``get_latest_locations()``) and ``local_instance_id``.
    """
    config = self.env['ir.config_parameter'].sudo()
    maps_key = config.get_param('fusion_claims.google_maps_api_key', '')
    instance_id = config.get_param('fusion_claims.sync_instance_id', '')

    search_domain = [('status', 'not in', ['cancelled'])]
    if domain:
        search_domain = expression.AND([search_domain, domain])

    field_names = [
        'name', 'partner_id', 'technician_id', 'task_type',
        'address_lat', 'address_lng', 'address_display',
        'time_start', 'time_start_display', 'time_end_display',
        'status', 'scheduled_date', 'travel_time_minutes',
        'x_fc_sync_client_name', 'x_fc_is_shadow', 'x_fc_sync_source',
    ]
    task_rows = self.sudo().search_read(
        search_domain,
        field_names,
        order='scheduled_date asc NULLS LAST, time_start asc',
        limit=500,
    )
    technician_locations = self.env['fusion.technician.location'].get_latest_locations()

    return {
        'api_key': maps_key,
        'tasks': task_rows,
        'locations': technician_locations,
        'local_instance_id': instance_id,
    }
|
|
|
|
def _geocode_address(self):
    """Geocode ``address_display`` with the Google Geocoding API.

    On success the coordinates are written to ``address_lat`` and
    ``address_lng``.

    :return: True when coordinates were stored; False when the API key or
        address is missing, the request fails, or the API returns no
        usable result.
    """
    self.ensure_one()
    api_key = self._get_google_maps_api_key()
    if not api_key or not self.address_display:
        return False

    url = 'https://maps.googleapis.com/maps/api/geocode/json'
    params = {
        'address': self.address_display,
        'key': api_key,
        'region': 'ca',
    }
    # Only the remote call and response parsing are best-effort; the ORM
    # write below stays outside the try so database errors are not hidden.
    try:
        resp = requests.get(url, params=params, timeout=10)
        data = resp.json()
        api_status = data.get('status')
        if api_status != 'OK' or not data.get('results'):
            _logger.info(
                "Geocoding returned status %s for task %s",
                api_status, self.name)
            return False
        location = data['results'][0]['geometry']['location']
        coords = {
            'address_lat': location['lat'],
            'address_lng': location['lng'],
        }
    except Exception as e:
        _logger.warning("Geocoding failed for task %s: %s", self.name, e)
        return False

    self.write(coords)
    return True
|
|
|
|
def _calculate_travel_time(self, origin_lat, origin_lng):
    """Compute driving time and distance from an origin to this task.

    Uses the Google Distance Matrix API (driving, avoiding tolls, departing
    now) and stores ``travel_time_minutes`` and ``travel_distance_km``. The
    traffic-aware duration is preferred when the API returns it.

    :param origin_lat: latitude of the starting point.
    :param origin_lng: longitude of the starting point.
    :return: True when values were stored; False when the API key or any
        coordinate is missing, the request fails, or no route is returned.
    """
    self.ensure_one()
    api_key = self._get_google_maps_api_key()
    if not api_key:
        return False
    if not (origin_lat and origin_lng and self.address_lat and self.address_lng):
        return False

    url = 'https://maps.googleapis.com/maps/api/distancematrix/json'
    params = {
        'origins': f'{origin_lat},{origin_lng}',
        'destinations': f'{self.address_lat},{self.address_lng}',
        'key': api_key,
        'mode': 'driving',
        'avoid': 'tolls',
        'traffic_model': 'best_guess',
        'departure_time': 'now',
    }
    # Only the remote call and response parsing are best-effort; the ORM
    # write below stays outside the try so database errors are not hidden.
    try:
        resp = requests.get(url, params=params, timeout=10)
        data = resp.json()
        if data.get('status') != 'OK':
            _logger.info(
                "Distance Matrix returned status %s for task %s",
                data.get('status'), self.name)
            return False
        element = data['rows'][0]['elements'][0]
        if element.get('status') != 'OK':
            _logger.info(
                "Distance Matrix found no route (%s) for task %s",
                element.get('status'), self.name)
            return False
        duration = element.get('duration_in_traffic') or element['duration']
        travel_vals = {
            'travel_time_minutes': round(duration['value'] / 60),
            'travel_distance_km': round(element['distance']['value'] / 1000, 1),
        }
    except Exception as e:
        _logger.warning("Travel time calculation failed for task %s: %s", self.name, e)
        return False

    self.write(travel_vals)
    return True
|
|
|
|
def action_calculate_travel_times(self):
    """Button entry point: recompute travel times for the selected tasks.

    Delegates to ``_do_calculate_travel_times()`` and returns False so the
    client stays on the current view instead of navigating.
    """
    self._do_calculate_travel_times()
    return False
|
|
|
|
def _do_calculate_travel_times(self):
    """Internal: calculate travel times for tasks. Does not return an action.

    Tasks are grouped per (technician, scheduled date) and walked in
    (sequence, start time) order. The first leg starts from the
    technician's start address; each later leg starts from the previous
    task's coordinates. Tasks without coordinates are geocoded first.
    """
    Task = self.env['fusion.technician.task']

    # Group tasks by technician and date
    grouped = {}
    for record in self:
        bucket_key = (record.technician_id.id, record.scheduled_date)
        grouped[bucket_key] = grouped.get(bucket_key, Task) | record

    api_key = self._get_google_maps_api_key()
    start_coords_cache = {}

    for (tech_id, _day), day_tasks in grouped.items():
        ordered = day_tasks.sorted(lambda t: (t.sequence, t.time_start))

        # Get this technician's start location (personal or company default),
        # geocoding it only once per technician.
        if tech_id not in start_coords_cache:
            start_address = self._get_technician_start_address(tech_id)
            start_coords_cache[tech_id] = self._geocode_address_string(start_address, api_key)
        origin_lat, origin_lng = start_coords_cache[tech_id]

        for position, task in enumerate(ordered):
            # Geocode task if needed
            if not (task.address_lat and task.address_lng):
                task._geocode_address()

            have_origin = origin_lat and origin_lng
            have_destination = task.address_lat and task.address_lng
            if have_origin and have_destination:
                task._calculate_travel_time(origin_lat, origin_lng)
                if position == 0:
                    task.previous_task_id = False
                    task.travel_origin = 'Start Location'
                else:
                    predecessor = ordered[position - 1]
                    task.previous_task_id = predecessor.id
                    task.travel_origin = f'Task {predecessor.name}'

            # The next leg starts from this task, even if it has no
            # coordinates (in which case the next leg is skipped).
            origin_lat = task.address_lat
            origin_lng = task.address_lng
|
|
|
|
@api.model
def _cron_calculate_travel_times(self):
    """Cron job: Calculate travel times for today and tomorrow.

    Only tasks that are still ``scheduled`` or ``en_route`` are processed.
    """
    today = fields.Date.context_today(self)
    target_days = [today, today + timedelta(days=1)]
    open_tasks = self.search([
        ('scheduled_date', 'in', target_days),
        ('status', 'in', ['scheduled', 'en_route']),
    ])
    if not open_tasks:
        return
    open_tasks._do_calculate_travel_times()
    _logger.info(f"Calculated travel times for {len(open_tasks)} tasks")
|
|
|
|
# ------------------------------------------------------------------
|
|
# PORTAL HELPERS
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_technician_tasks_for_date(self, user_id, date):
    """Return a technician's non-cancelled tasks for one date.

    :param user_id: id of the technician.
    :param date: the scheduled date to match.
    :return: tasks ordered by sequence, start time, then id (sudo search).
    """
    day_domain = [
        ('technician_id', '=', user_id),
        ('scheduled_date', '=', date),
        ('status', '!=', 'cancelled'),
    ]
    return self.sudo().search(day_domain, order='sequence, time_start, id')
|
|
|
|
def get_next_task(self, user_id):
    """Return the technician's next task from today onwards.

    Only ``scheduled`` or ``en_route`` tasks are considered; the first one
    by date, sequence and start time is returned (sudo search), or an
    empty recordset.
    """
    upcoming_domain = [
        ('technician_id', '=', user_id),
        ('scheduled_date', '>=', fields.Date.context_today(self)),
        ('status', 'in', ['scheduled', 'en_route']),
    ]
    return self.sudo().search(
        upcoming_domain, order='scheduled_date, sequence, time_start', limit=1)
|
|
|
|
def get_current_task(self, user_id):
    """Return one of the technician's ``in_progress`` tasks scheduled today.

    Returns an empty recordset when there is none (sudo search).
    """
    active_domain = [
        ('technician_id', '=', user_id),
        ('scheduled_date', '=', fields.Date.context_today(self)),
        ('status', '=', 'in_progress'),
    ]
    return self.sudo().search(active_domain, limit=1)
|
|
|
|
# ------------------------------------------------------------------
|
|
# PUSH NOTIFICATIONS
|
|
# ------------------------------------------------------------------
|
|
|
|
def _send_push_notification(self, title, body_text, url=None):
    """Send a web push notification about this task to its technician.

    The notification goes to every active push subscription of the
    technician. Nothing happens when there is no subscription, the VAPID
    keys are not configured, or ``pywebpush`` is not installed.
    Subscriptions the push service reports as expired are deactivated.
    After the attempts the task is marked ``push_notified`` (even if some
    deliveries failed, as before).

    :param title: notification title.
    :param body_text: notification body.
    :param url: link opened by the notification; defaults to the
        technician portal page of this task.
    """
    self.ensure_one()
    PushSub = self.env['fusion.push.subscription'].sudo()
    subscriptions = PushSub.search([
        ('user_id', '=', self.technician_id.id),
        ('active', '=', True),
    ])
    if not subscriptions:
        return

    ICP = self.env['ir.config_parameter'].sudo()
    vapid_private = ICP.get_param('fusion_claims.vapid_private_key', '')
    vapid_public = ICP.get_param('fusion_claims.vapid_public_key', '')
    if not vapid_private or not vapid_public:
        _logger.warning("VAPID keys not configured, cannot send push notification")
        return

    try:
        from pywebpush import webpush
    except ImportError:
        _logger.warning("pywebpush not installed, cannot send push notifications")
        return

    payload = json.dumps({
        'title': title,
        'body': body_text,
        'url': url or f'/my/technician/task/{self.id}',
        'task_id': self.id,
        'task_type': self.task_type,
    })

    for sub in subscriptions:
        try:
            webpush(
                subscription_info={
                    'endpoint': sub.endpoint,
                    'keys': {
                        'p256dh': sub.p256dh_key,
                        'auth': sub.auth_key,
                    },
                },
                data=payload,
                vapid_private_key=vapid_private,
                vapid_claims={'sub': 'mailto:support@nexasystems.ca'},
            )
        except Exception as e:
            _logger.warning(
                "Push notification failed for subscription %s: %s", sub.id, e)
            # Deactivate invalid subscriptions. Prefer the HTTP status of
            # the push service response (404/410 = subscription removed);
            # matching "410" in the message text can hit unrelated text,
            # so it is only a fallback when no response is attached.
            response = getattr(e, 'response', None)
            status_code = getattr(response, 'status_code', None)
            if status_code is not None:
                expired = status_code in (404, 410)
            else:
                message = str(e)
                expired = 'gone' in message.lower() or '410' in message
            if expired:
                sub.active = False

    self.write({
        'push_notified': True,
        'push_notified_datetime': fields.Datetime.now(),
    })
|
|
|
|
@api.model
def _cron_send_push_notifications(self):
    """Cron: Send push notifications for upcoming tasks.

    Notifies the technician of every ``scheduled`` task of today that has
    not been notified yet and starts within the next
    ``fusion_claims.push_advance_minutes`` minutes (default 30). Does
    nothing unless ``fusion_claims.push_enabled`` is set.

    Task start times are float hours on the scheduled date, so "now" is
    converted to the context/user timezone before comparing, in line with
    the ``context_today`` date used by the other schedule helpers.
    """
    ICP = self.env['ir.config_parameter'].sudo()
    enabled = ICP.get_param('fusion_claims.push_enabled', False)
    # Parameters are stored as text: treat 'False'/'0' as disabled too.
    if not enabled or str(enabled).strip().lower() in ('false', '0'):
        return

    try:
        advance_minutes = int(ICP.get_param('fusion_claims.push_advance_minutes', '30'))
    except (TypeError, ValueError):
        # A malformed setting must not break the cron; use the default.
        advance_minutes = 30

    # Naive wall-clock "now" in the context/user timezone (UTC if unset).
    now_local = fields.Datetime.context_timestamp(
        self, fields.Datetime.now()).replace(tzinfo=None)
    day_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0)

    # Find today's scheduled tasks that haven't been notified
    tasks = self.search([
        ('scheduled_date', '=', now_local.date()),
        ('status', '=', 'scheduled'),
        ('push_notified', '=', False),
    ])

    type_labels = dict(self._fields['task_type'].selection)
    for task in tasks:
        # timedelta copes with any float hour value, whereas
        # datetime.replace(hour=...) raises for values of 24 or more.
        task_start_dt = day_start + timedelta(hours=task.time_start or 0.0)
        minutes_until = (task_start_dt - now_local).total_seconds() / 60
        if not 0 <= minutes_until <= advance_minutes:
            continue

        task_type_label = type_labels.get(task.task_type, task.task_type)
        title = f'Upcoming: {task_type_label}'
        body_text = f'{task.partner_id.name or "Task"} - {task.time_start_display}'
        if task.travel_time_minutes:
            body_text += f' ({task.travel_time_minutes} min drive)'
        task._send_push_notification(title, body_text)
|
|
|
|
# ------------------------------------------------------------------
|
|
# HELPERS
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
def _float_to_time_str(value):
    """Convert float hours to a 12-hour time string like '9:30 AM'.

    The value is rounded to the nearest minute before splitting into hours
    and minutes, so a value just below a full hour rolls over to the next
    hour instead of showing 60 minutes. Hours wrap around at 24.

    :param value: time of day in float hours (9.5 means 9:30).
    :return: formatted time, or '' for empty values other than zero.
    """
    if not value and value != 0:
        return ''
    total_minutes = int(round(value * 60))
    hours, minutes = divmod(total_minutes, 60)
    hours %= 24
    period = 'AM' if hours < 12 else 'PM'
    display_hour = hours % 12 or 12
    return f'{display_hour}:{minutes:02d} {period}'
|
|
|
|
def get_google_maps_url(self):
    """Return a Google Maps driving-directions URL for this task.

    Coordinates are preferred because they navigate to the exact location
    (text addresses cause Google to resolve to nearby business names
    instead). The URL-quoted display address is used only when coordinates
    are missing; '' is returned when neither is available.
    """
    self.ensure_one()
    if self.address_lat and self.address_lng:
        destination = f'{self.address_lat},{self.address_lng}'
    elif self.address_display:
        destination = urllib.parse.quote(self.address_display)
    else:
        return ''
    return f'https://www.google.com/maps/dir/?api=1&destination={destination}&travelmode=driving'
|