# -*- coding: utf-8 -*- # Copyright 2024-2026 Nexa Systems Inc. # License OPL-1 (Odoo Proprietary License v1.0) """ Fusion Technician Task Scheduling and task management for field technicians. Replaces Monday.com for technician schedule tracking. """ from odoo import models, fields, api, _ from odoo.exceptions import UserError, ValidationError from odoo.osv import expression from markupsafe import Markup import logging import json import uuid import requests from datetime import datetime as dt_datetime, timedelta import urllib.parse _logger = logging.getLogger(__name__) class FusionTechnicianTask(models.Model): _name = 'fusion.technician.task' _description = 'Technician Task' _order = 'scheduled_date, sequence, time_start, id' _inherit = ['mail.thread', 'mail.activity.mixin', 'fusion.email.builder.mixin'] _rec_name = 'name' def _compute_display_name(self): """Richer display name: Client - Type | 9:00 AM - 10:00 AM [+2 techs].""" type_labels = dict(self._fields['task_type'].selection) for task in self: client = task.x_fc_sync_client_name if task.x_fc_sync_source else (task.partner_id.name or '') ttype = type_labels.get(task.task_type, task.task_type or '') start = self._float_to_time_str(task.time_start) end = self._float_to_time_str(task.time_end) parts = [client, ttype] label = ' - '.join(p for p in parts if p) if start and end: label += f' | {start} - {end}' extra = len(task.additional_technician_ids) if extra: label += f' [+{extra} tech{"s" if extra > 1 else ""}]' task.display_name = label or task.name # ------------------------------------------------------------------ # STORE HOURS HELPER # ------------------------------------------------------------------ def _get_store_hours(self): """Return (open_hour, close_hour) from settings. Defaults 9.0 / 18.0.""" ICP = self.env['ir.config_parameter'].sudo() try: open_h = float(ICP.get_param('fusion_claims.store_open_hour', '9.0') or '9.0') except (ValueError, TypeError): open_h = 9.0 try: close_h = float(ICP.get_param('fusion_claims.store_close_hour', '18.0') or '18.0') except (ValueError, TypeError): close_h = 18.0 return (open_h, close_h) # ------------------------------------------------------------------ # CORE FIELDS # ------------------------------------------------------------------ name = fields.Char( string='Task Reference', required=True, copy=False, readonly=True, default=lambda self: _('New'), ) active = fields.Boolean(default=True) # Cross-instance sync fields x_fc_sync_source = fields.Char( 'Source Instance', readonly=True, index=True, help='Origin instance ID if this is a synced shadow task (e.g. westin, mobility)', ) x_fc_sync_remote_id = fields.Integer( 'Remote Task ID', readonly=True, help='ID of the task on the remote instance', ) x_fc_sync_uuid = fields.Char( 'Sync UUID', readonly=True, index=True, copy=False, help='Unique ID for cross-instance deduplication', ) x_fc_is_shadow = fields.Boolean( 'Shadow Task', compute='_compute_is_shadow', store=True, help='True if this task was synced from another instance', ) x_fc_sync_client_name = fields.Char( 'Synced Client Name', readonly=True, help='Client name from the remote instance (shadow tasks only)', ) x_fc_sync_client_phone = fields.Char( 'Synced Client Phone', readonly=True, help='Client phone from the remote instance (shadow tasks only)', ) client_display_name = fields.Char( compute='_compute_client_display', string='Client Name (Display)', ) client_display_phone = fields.Char( compute='_compute_client_display', string='Client Phone (Display)', ) x_fc_source_label = fields.Char( 'Source', compute='_compute_is_shadow', store=True, ) @api.depends('x_fc_sync_source') def _compute_is_shadow(self): local_id = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.sync_instance_id', '') for task in self: task.x_fc_is_shadow = bool(task.x_fc_sync_source) task.x_fc_source_label = task.x_fc_sync_source or local_id @api.depends('x_fc_sync_source', 'x_fc_sync_client_name', 'x_fc_sync_client_phone', 'partner_id') def _compute_client_display(self): for task in self: if task.x_fc_sync_source: task.client_display_name = task.x_fc_sync_client_name or task.name or '' task.client_display_phone = task.x_fc_sync_client_phone or '' else: task.client_display_name = task.partner_id.name if task.partner_id else '' task.client_display_phone = task.partner_id.phone if task.partner_id else '' technician_id = fields.Many2one( 'res.users', string='Technician', required=True, tracking=True, domain="[('x_fc_is_field_staff', '=', True)]", help='Lead technician responsible for this task', ) technician_name = fields.Char( related='technician_id.name', string='Technician Name', store=True, ) additional_technician_ids = fields.Many2many( 'res.users', 'technician_task_additional_tech_rel', 'task_id', 'user_id', string='Additional Technicians', domain="[('x_fc_is_field_staff', '=', True)]", tracking=True, help='Additional technicians assigned to assist on this task', ) all_technician_ids = fields.Many2many( 'res.users', compute='_compute_all_technician_ids', string='All Technicians', help='Lead + additional technicians combined', ) additional_tech_count = fields.Integer( compute='_compute_all_technician_ids', string='Extra Techs', ) all_technician_names = fields.Char( compute='_compute_all_technician_ids', string='All Technician Names', ) @api.depends('technician_id', 'additional_technician_ids') def _compute_all_technician_ids(self): for task in self: all_techs = task.technician_id | task.additional_technician_ids task.all_technician_ids = all_techs task.additional_tech_count = len(task.additional_technician_ids) task.all_technician_names = ', '.join(all_techs.mapped('name')) task_type = fields.Selection([ ('delivery', 'Delivery'), ('repair', 'Repair'), ('pickup', 'Pickup'), ('troubleshoot', 'Troubleshooting'), ('assessment', 'Assessment'), ('installation', 'Installation'), ('maintenance', 'Maintenance'), ('ltc_visit', 'LTC Visit'), ('other', 'Other'), ], string='Task Type', required=True, default='delivery', tracking=True) # ------------------------------------------------------------------ # SCHEDULING # ------------------------------------------------------------------ scheduled_date = fields.Date( string='Scheduled Date', tracking=True, default=fields.Date.context_today, index=True, ) time_start = fields.Float( string='Start Time', help='Start time in hours (e.g. 9.5 = 9:30 AM)', default=9.0, ) time_end = fields.Float( string='End Time', help='End time in hours (e.g. 10.5 = 10:30 AM)', default=10.0, ) time_start_display = fields.Char( string='Start', compute='_compute_time_displays', ) time_end_display = fields.Char( string='End', compute='_compute_time_displays', ) # Legacy 12h selection fields -- kept for DB compatibility, hidden on form time_start_12h = fields.Selection( selection='_get_time_selection', string='Start Time (12h)', compute='_compute_time_12h', inverse='_inverse_time_start_12h', store=True, ) time_end_12h = fields.Selection( selection='_get_time_selection', string='End Time (12h)', compute='_compute_time_12h', inverse='_inverse_time_end_12h', store=True, ) sequence = fields.Integer( string='Sequence', default=10, help='Order of task within the day', ) duration_hours = fields.Float( string='Duration', default=1.0, help='Task duration in hours. Auto-calculates end time.', ) # Task type -> default duration mapping TASK_TYPE_DURATIONS = { 'delivery': 1.0, 'repair': 2.0, 'pickup': 0.5, 'troubleshoot': 1.5, 'assessment': 1.5, 'installation': 2.0, 'maintenance': 1.5, 'ltc_visit': 3.0, 'other': 1.0, } # Previous task travel warning banner prev_task_summary_html = fields.Html( string='Previous Task', compute='_compute_prev_task_summary', sanitize=False, ) # Datetime fields for calendar view (computed from date + float time) datetime_start = fields.Datetime( string='Start', compute='_compute_datetimes', inverse='_inverse_datetime_start', store=True, help='Combined start datetime for calendar display', ) datetime_end = fields.Datetime( string='End', compute='_compute_datetimes', inverse='_inverse_datetime_end', store=True, help='Combined end datetime for calendar display', ) calendar_event_id = fields.Many2one( 'calendar.event', string='Calendar Event', copy=False, ondelete='set null', help='Linked calendar event for external calendar sync', ) # Schedule info helper for the form schedule_info_html = fields.Html( string='Schedule Info', compute='_compute_schedule_info', sanitize=False, ) # ------------------------------------------------------------------ # STATUS # ------------------------------------------------------------------ status = fields.Selection([ ('pending', 'Pending'), ('scheduled', 'Scheduled'), ('en_route', 'En Route'), ('in_progress', 'In Progress'), ('completed', 'Completed'), ('cancelled', 'Cancelled'), ('rescheduled', 'Rescheduled'), ], string='Status', default='scheduled', required=True, tracking=True, index=True) priority = fields.Selection([ ('0', 'Normal'), ('1', 'Urgent'), ('2', 'Emergency'), ], string='Priority', default='0') color = fields.Integer( string='Color Index', compute='_compute_color', ) # ------------------------------------------------------------------ # CLIENT / ADDRESS # ------------------------------------------------------------------ partner_id = fields.Many2one( 'res.partner', string='Client Name', tracking=True, help='Client for this task', ) partner_phone = fields.Char( related='partner_id.phone', string='Client Phone', ) # Address fields - computed from shipping address or manually set address_partner_id = fields.Many2one( 'res.partner', string='Task Address', help='Partner record containing the task address (usually shipping address)', ) address_street = fields.Char(string='Street') address_street2 = fields.Char(string='Unit/Suite #') address_city = fields.Char(string='City') address_state_id = fields.Many2one('res.country.state', string='Province') address_zip = fields.Char(string='Postal Code') address_buzz_code = fields.Char(string='Buzz Code', help='Building buzzer code for entry') address_display = fields.Text( string='Full Address', compute='_compute_address_display', ) # In-store flag -- uses company address instead of client address is_in_store = fields.Boolean( string='In Store', default=False, help='Task takes place at the store/office. Uses company address automatically.', ) # Geocoding address_lat = fields.Float(string='Latitude', digits=(10, 7)) address_lng = fields.Float(string='Longitude', digits=(10, 7)) # ------------------------------------------------------------------ # TASK DETAILS # ------------------------------------------------------------------ description = fields.Text( string='Task Description', required=True, help='What needs to be done', ) equipment_needed = fields.Text( string='Equipment / Materials Needed', help='Tools and materials the technician should bring', ) pod_required = fields.Boolean( string='POD Required', default=False, help='Proof of Delivery signature required', ) pod_signature = fields.Binary( string='POD Signature', attachment=True, ) pod_client_name = fields.Char(string='POD Signer Name') pod_signature_date = fields.Date(string='POD Signature Date') pod_signed_by_user_id = fields.Many2one( 'res.users', string='POD Collected By', readonly=True, ) pod_signed_datetime = fields.Datetime( string='POD Collected At', readonly=True, ) # ------------------------------------------------------------------ # CLIENT EMAIL / REVIEW OPTIONS # ------------------------------------------------------------------ x_fc_send_client_updates = fields.Boolean( string='Send Client Email Updates', default=True, help='Send automatic emails to the client when the technician is en route and when the task is completed', ) x_fc_ask_google_review = fields.Boolean( string='Request Google Review', default=True, help='Include a Google review request in the completion email to the client', ) x_fc_late_notified = fields.Boolean( string='Late Notification Sent', default=False, readonly=True, help='Internal flag: whether a late-arrival notification has already been sent for this task', ) # ------------------------------------------------------------------ # COMPLETION # ------------------------------------------------------------------ completion_notes = fields.Html( string='Completion Notes', help='Notes from the technician about what was done', ) completion_datetime = fields.Datetime( string='Completed At', tracking=True, ) # GPS location captured at task actions started_latitude = fields.Float( string='Started Latitude', digits=(10, 7), readonly=True, ) started_longitude = fields.Float( string='Started Longitude', digits=(10, 7), readonly=True, ) completed_latitude = fields.Float( string='Completed Latitude', digits=(10, 7), readonly=True, ) completed_longitude = fields.Float( string='Completed Longitude', digits=(10, 7), readonly=True, ) action_latitude = fields.Float( string='Last Action Latitude', digits=(10, 7), readonly=True, ) action_longitude = fields.Float( string='Last Action Longitude', digits=(10, 7), readonly=True, ) action_location_accuracy = fields.Float( string='Location Accuracy (m)', readonly=True, ) voice_note_audio = fields.Binary( string='Voice Recording', attachment=True, ) voice_note_transcription = fields.Text( string='Voice Transcription', ) # ------------------------------------------------------------------ # TRAVEL # ------------------------------------------------------------------ travel_time_minutes = fields.Integer( string='Travel Time (min)', help='Estimated travel time from previous task in minutes', ) travel_distance_km = fields.Float( string='Travel Distance (km)', digits=(8, 1), ) travel_origin = fields.Char( string='Travel From', help='Origin address for travel calculation', ) previous_task_id = fields.Many2one( 'fusion.technician.task', string='Previous Task', help='The task before this one in the schedule (for travel calculation)', ) # ------------------------------------------------------------------ # PUSH NOTIFICATION TRACKING # ------------------------------------------------------------------ push_notified = fields.Boolean( string='Push Notified', default=False, help='Whether a push notification was sent for this task', ) push_notified_datetime = fields.Datetime( string='Notified At', ) # ------------------------------------------------------------------ # COMPUTED FIELDS # ------------------------------------------------------------------ # ------------------------------------------------------------------ # SLOT AVAILABILITY HELPERS # ------------------------------------------------------------------ def _find_next_available_slot(self, tech_id, date, preferred_start=9.0, duration=1.0, exclude_task_id=False, dest_lat=0, dest_lng=0): """Find the next available time slot for a technician on a given date. Scans all non-cancelled tasks for that tech+date, sorts them, and walks through the day (9 AM - 6 PM) looking for a gap that fits the requested duration PLUS travel time from the previous task. :param tech_id: res.users id of the technician :param date: date object for the day to check :param preferred_start: float hour to start looking from (default 9.0) :param duration: required slot length in hours (default 1.0) :param exclude_task_id: task id to exclude (when editing an existing task) :param dest_lat: latitude of the destination (new task location) :param dest_lng: longitude of the destination (new task location) :returns: (start_float, end_float) or (False, False) if fully booked """ STORE_OPEN, STORE_CLOSE = self._get_store_hours() if not tech_id or not date: return (preferred_start, preferred_start + duration) domain = [ '|', ('technician_id', '=', tech_id), ('additional_technician_ids', 'in', [tech_id]), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ] if exclude_task_id: domain.append(('id', '!=', exclude_task_id)) booked = self.sudo().search(domain, order='time_start') # Build sorted list of (start, end, lat, lng) intervals intervals = [] for b in booked: intervals.append(( max(b.time_start, STORE_OPEN), min(b.time_end, STORE_CLOSE), b.address_lat or 0, b.address_lng or 0, )) def _travel_hours(from_lat, from_lng, to_lat, to_lng): """Calculate travel time in hours between two locations. Returns 0 if coordinates are missing. Rounds up to 15-min.""" if not from_lat or not from_lng or not to_lat or not to_lng: return 0 travel_min = self._quick_travel_time( from_lat, from_lng, to_lat, to_lng) if travel_min > 0: import math return math.ceil(travel_min / 15.0) * 0.25 return 0 def _travel_from_prev(iv_lat, iv_lng): """Travel from a previous booked task TO the new task.""" return _travel_hours(iv_lat, iv_lng, dest_lat, dest_lng) def _travel_to_next(next_lat, next_lng): """Travel FROM the new task TO the next booked task.""" return _travel_hours(dest_lat, dest_lng, next_lat, next_lng) def _check_gap_fits(cursor, dur, idx): """Check if a slot at 'cursor' for 'dur' hours fits before the interval at index 'idx' (accounting for travel TO that task).""" if idx >= len(intervals): return cursor + dur <= STORE_CLOSE next_start, _ne, next_lat, next_lng = intervals[idx] travel_fwd = _travel_to_next(next_lat, next_lng) return cursor + dur + travel_fwd <= next_start # Walk through gaps, starting from preferred_start cursor = max(preferred_start, STORE_OPEN) for i, (iv_start, iv_end, iv_lat, iv_lng) in enumerate(intervals): if cursor + duration <= iv_start: # Check travel time from new task end TO next booked task if _check_gap_fits(cursor, duration, i): return (cursor, cursor + duration) # Not enough travel time -- try pushing start earlier or skip # If we can't fit here, fall through to jump past this interval # Jump past this booked interval + travel buffer from prev to new new_cursor = max(cursor, iv_end) travel = _travel_from_prev(iv_lat, iv_lng) new_cursor += travel # Snap to nearest 15 min new_cursor = round(new_cursor * 4) / 4 cursor = new_cursor # Check gap after last interval (no next task, so no forward travel needed) if cursor + duration <= STORE_CLOSE: return (cursor, cursor + duration) # No gap found from preferred_start onward -- wrap and try from start if preferred_start > STORE_OPEN: cursor = STORE_OPEN for i, (iv_start, iv_end, iv_lat, iv_lng) in enumerate(intervals): if cursor + duration <= iv_start: if _check_gap_fits(cursor, duration, i): return (cursor, cursor + duration) new_cursor = max(cursor, iv_end) travel = _travel_from_prev(iv_lat, iv_lng) new_cursor += travel new_cursor = round(new_cursor * 4) / 4 cursor = new_cursor if cursor + duration <= STORE_CLOSE: return (cursor, cursor + duration) return (False, False) def _get_available_gaps(self, tech_id, date, exclude_task_id=False): """Return a list of available (start, end) gaps for a technician on a date. Used by schedule_info_html to show green "available" badges. Considers tasks where the tech is either lead or additional. """ STORE_OPEN, STORE_CLOSE = self._get_store_hours() if not tech_id or not date: return [(STORE_OPEN, STORE_CLOSE)] domain = [ '|', ('technician_id', '=', tech_id), ('additional_technician_ids', 'in', [tech_id]), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ] if exclude_task_id: domain.append(('id', '!=', exclude_task_id)) booked = self.sudo().search(domain, order='time_start') intervals = [(max(b.time_start, STORE_OPEN), min(b.time_end, STORE_CLOSE)) for b in booked] gaps = [] cursor = STORE_OPEN for iv_start, iv_end in intervals: if cursor < iv_start: gaps.append((cursor, iv_start)) cursor = max(cursor, iv_end) if cursor < STORE_CLOSE: gaps.append((cursor, STORE_CLOSE)) return gaps @api.model def _get_time_selection(self): """Generate 12-hour time slots every 15 minutes, store hours only (9 AM - 6 PM).""" times = [] for hour in range(9, 18): # 9 AM to 5:45 PM for minute in (0, 15, 30, 45): float_val = hour + minute / 60.0 key = f'{float_val:.2f}' period = 'AM' if hour < 12 else 'PM' display_hour = hour % 12 or 12 label = f'{display_hour}:{minute:02d} {period}' times.append((key, label)) # Add 6:00 PM as end-time option times.append(('18.00', '6:00 PM')) return times @api.depends('time_start', 'time_end') def _compute_time_12h(self): """Sync the 12h selection fields from the raw float values.""" for task in self: task.time_start_12h = f'{(task.time_start or 9.0):.2f}' task.time_end_12h = f'{(task.time_end or 10.0):.2f}' def _inverse_time_start_12h(self): for task in self: if task.time_start_12h: task.time_start = float(task.time_start_12h) def _inverse_time_end_12h(self): for task in self: if task.time_end_12h: task.time_end = float(task.time_end_12h) @api.depends('time_start', 'time_end') def _compute_time_displays(self): """Convert float hours to readable time strings.""" for task in self: task.time_start_display = self._float_to_time_str(task.time_start) task.time_end_display = self._float_to_time_str(task.time_end) @api.onchange('task_type') def _onchange_task_type_duration(self): """Set default duration based on task type.""" if self.task_type: self.duration_hours = self.TASK_TYPE_DURATIONS.get(self.task_type, 1.0) # Also recalculate end time if self.time_start: _open, close = self._get_store_hours() self.time_end = min(self.time_start + self.duration_hours, close) @api.onchange('time_start', 'duration_hours') def _onchange_compute_end_time(self): """Auto-compute end time from start + duration. Also run overlap check.""" if self.time_start and self.duration_hours: _open, close = self._get_store_hours() new_end = min(self.time_start + self.duration_hours, close) self.time_end = new_end # Run overlap snap if we have enough data if self.technician_id and self.scheduled_date and self.time_start and self.time_end: result = self._snap_if_overlap() if result: return result @api.depends('scheduled_date', 'time_start', 'time_end') def _compute_datetimes(self): """Combine date + float time into proper Datetime fields for calendar. time_start/time_end are LOCAL hours; datetime_start/end must be UTC for Odoo.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.scheduled_date: # Build local datetime, then convert to UTC base = dt_datetime.combine(task.scheduled_date, dt_datetime.min.time()) store_open, _close = task._get_store_hours() local_start = user_tz.localize(base + timedelta(hours=task.time_start or store_open)) local_end = user_tz.localize(base + timedelta(hours=task.time_end or (store_open + 1.0))) task.datetime_start = local_start.astimezone(pytz.utc).replace(tzinfo=None) task.datetime_end = local_end.astimezone(pytz.utc).replace(tzinfo=None) else: task.datetime_start = False task.datetime_end = False def _inverse_datetime_start(self): """When datetime_start is changed (e.g. from calendar drag), update date + time.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.datetime_start: local_dt = pytz.utc.localize(task.datetime_start).astimezone(user_tz) task.scheduled_date = local_dt.date() task.time_start = local_dt.hour + local_dt.minute / 60.0 def _inverse_datetime_end(self): """When datetime_end is changed (e.g. from calendar resize), update time_end.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.datetime_end: local_dt = pytz.utc.localize(task.datetime_end).astimezone(user_tz) task.time_end = local_dt.hour + local_dt.minute / 60.0 @api.depends('technician_id', 'scheduled_date') def _compute_schedule_info(self): """Show booked + available time slots for the technician on the selected date.""" for task in self: if not task.technician_id or not task.scheduled_date: task.schedule_info_html = '' continue exclude_id = task.id if task.id else 0 # Find other tasks for the same technician+date (lead or additional) others = self.sudo().search([ '|', ('technician_id', '=', task.technician_id.id), ('additional_technician_ids', 'in', [task.technician_id.id]), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', exclude_id), ], order='time_start') if not others: s_open, s_close = self._get_store_hours() open_str = self._float_to_time_str(s_open) close_str = self._float_to_time_str(s_close) task.schedule_info_html = Markup( f'
' f' All slots available ({open_str} - {close_str})
' ) continue # Booked badges booked_lines = [] for o in others: start_str = self._float_to_time_str(o.time_start) end_str = self._float_to_time_str(o.time_end) type_label = dict(self._fields['task_type'].selection).get(o.task_type, o.task_type) client_name = o.partner_id.name or '' booked_lines.append( f'' f'{start_str} - {end_str} ({type_label}{" - " + client_name if client_name else ""})' f'' ) # Available gaps badges gaps = self._get_available_gaps( task.technician_id.id, task.scheduled_date, exclude_task_id=exclude_id, ) avail_lines = [] for g_start, g_end in gaps: # Only show gaps >= 15 min if g_end - g_start >= 0.25: avail_lines.append( f'' f'{self._float_to_time_str(g_start)} - {self._float_to_time_str(g_end)}' f'' ) html_parts = [ '
', ' Booked: ', ' '.join(booked_lines), ] if avail_lines: html_parts.append( '
' 'Available: ' + ' '.join(avail_lines) ) elif not avail_lines: html_parts.append( '
' 'Fully booked' ) html_parts.append('
') task.schedule_info_html = Markup(''.join(html_parts)) @api.depends('technician_id', 'scheduled_date', 'time_start', 'address_lat', 'address_lng', 'address_street') def _compute_prev_task_summary(self): """Show previous task info + travel time warning with color coding.""" for task in self: if not task.technician_id or not task.scheduled_date: task.prev_task_summary_html = '' continue exclude_id = task.id if task.id else 0 # Find the task that ends just before this one starts (lead or additional) prev_tasks = self.sudo().search([ '|', ('technician_id', '=', task.technician_id.id), ('additional_technician_ids', 'in', [task.technician_id.id]), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', exclude_id), ('time_end', '<=', task.time_start or 99.0), ], order='time_end desc', limit=1) if not prev_tasks: # Check if this is the first task of the day -- show start location info task.prev_task_summary_html = Markup( '
' ' First task of the day -- ' 'travel calculated from start location.
' ) continue prev = prev_tasks[0] prev_start = self._float_to_time_str(prev.time_start) prev_end = self._float_to_time_str(prev.time_end) type_label = dict(self._fields['task_type'].selection).get( prev.task_type, prev.task_type or '') client_name = prev.partner_id.name or '' prev_addr = prev.address_display or 'No address' # Calculate gap between prev task end and this task start s_open, _s_close = self._get_store_hours() gap_hours = (task.time_start or s_open) - (prev.time_end or s_open) gap_minutes = int(gap_hours * 60) # Try to get travel time if both have coordinates travel_minutes = 0 travel_text = '' if (prev.address_lat and prev.address_lng and task.address_lat and task.address_lng): travel_minutes = self._quick_travel_time( prev.address_lat, prev.address_lng, task.address_lat, task.address_lng, ) if travel_minutes > 0: travel_text = f'{travel_minutes} min drive' else: travel_text = 'Could not calculate travel time' elif prev.address_street and task.address_street: travel_text = 'Save to calculate travel time' else: travel_text = 'Address missing -- cannot calculate travel' # Determine color coding if travel_minutes > 0 and gap_minutes >= travel_minutes: bg_class = 'alert-success' # Green -- enough time icon = 'fa-check-circle' status_text = ( f'{gap_minutes} min gap -- enough travel time ' f'(~{travel_minutes} min drive)' ) elif travel_minutes > 0 and gap_minutes > 0: bg_class = 'alert-warning' # Yellow -- tight icon = 'fa-exclamation-triangle' status_text = ( f'{gap_minutes} min gap -- tight! ' f'Travel is ~{travel_minutes} min drive' ) elif travel_minutes > 0 and gap_minutes <= 0: bg_class = 'alert-danger' # Red -- impossible icon = 'fa-times-circle' status_text = ( f'No gap! Previous task ends at {prev_end}. ' f'Travel is ~{travel_minutes} min drive' ) else: bg_class = 'alert-info' # Blue -- no travel data yet icon = 'fa-info-circle' status_text = travel_text html = ( f'
' f' ' f'Previous: {prev.name} ' f'({type_label}) {prev_start} - {prev_end}' f'{" -- " + client_name if client_name else ""}' f'
' f' {prev_addr}' f'
' f' {status_text}' f'
' ) task.prev_task_summary_html = Markup(html) def _quick_travel_time(self, from_lat, from_lng, to_lat, to_lng): """Quick inline travel time calculation using Google Distance Matrix API. Returns travel time in minutes, or 0 if unavailable.""" try: api_key = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '') if not api_key: return 0 url = 'https://maps.googleapis.com/maps/api/distancematrix/json' params = { 'origins': f'{from_lat},{from_lng}', 'destinations': f'{to_lat},{to_lng}', 'mode': 'driving', 'avoid': 'tolls', 'departure_time': 'now', 'key': api_key, } resp = requests.get(url, params=params, timeout=5) data = resp.json() if data.get('status') == 'OK': elements = data['rows'][0]['elements'][0] if elements.get('status') == 'OK': # Use duration_in_traffic if available, else duration duration = elements.get( 'duration_in_traffic', elements.get('duration', {})) seconds = duration.get('value', 0) return max(1, int(seconds / 60)) except Exception: _logger.warning('Failed to calculate travel time', exc_info=True) return 0 @api.depends('status') def _compute_color(self): color_map = { 'pending': 5, # purple 'scheduled': 0, # grey 'en_route': 4, # blue 'in_progress': 2, # orange 'completed': 10, # green 'cancelled': 1, # red 'rescheduled': 3, # yellow } for task in self: task.color = color_map.get(task.status, 0) @api.depends('address_street', 'address_street2', 'address_city', 'address_state_id', 'address_zip') def _compute_address_display(self): for task in self: street = task.address_street or '' # If the street field already contains a full address (has a comma), # use it directly -- Google Places stores the formatted address here. if ',' in street and ( (task.address_city and task.address_city in street) or (task.address_zip and task.address_zip in street) ): # Street already has full address; just append unit if separate if task.address_street2 and task.address_street2 not in street: task.address_display = f"{street}, {task.address_street2}" else: task.address_display = street else: # Build from components (manual entry or legacy data) parts = [ street, task.address_street2, task.address_city, task.address_state_id.name if task.address_state_id else '', task.address_zip, ] task.address_display = ', '.join([p for p in parts if p]) # ------------------------------------------------------------------ # ONCHANGE - Auto-fill address from client # ------------------------------------------------------------------ @api.onchange('is_in_store') def _onchange_is_in_store(self): """Auto-fill company address when task is marked as in-store.""" if self.is_in_store: company_partner = self.env.company.partner_id if company_partner and company_partner.street: self._fill_address_from_partner(company_partner) else: self.address_street = self.env.company.name or 'In Store' @api.onchange('partner_id') def _onchange_partner_id(self): """Auto-fill address fields from the selected client's address.""" if self.is_in_store: return if self.partner_id: addr = self.partner_id self.address_partner_id = addr.id self.address_street = addr.street or '' self.address_street2 = addr.street2 or '' self.address_city = addr.city or '' self.address_state_id = addr.state_id.id if addr.state_id else False self.address_zip = addr.zip or '' self.address_lat = addr.x_fc_latitude if hasattr(addr, 'x_fc_latitude') and addr.x_fc_latitude else 0 self.address_lng = addr.x_fc_longitude if hasattr(addr, 'x_fc_longitude') and addr.x_fc_longitude else 0 def _fill_address_from_partner(self, addr): """Populate address fields from a partner record.""" if not addr: return self.address_partner_id = addr.id self.address_street = addr.street or '' self.address_street2 = addr.street2 or '' self.address_city = addr.city or '' self.address_state_id = addr.state_id.id if addr.state_id else False self.address_zip = addr.zip or '' self.address_lat = addr.x_fc_latitude if hasattr(addr, 'x_fc_latitude') and addr.x_fc_latitude else 0 self.address_lng = addr.x_fc_longitude if hasattr(addr, 'x_fc_longitude') and addr.x_fc_longitude else 0 # ------------------------------------------------------------------ # CONSTRAINTS + VALIDATION # ------------------------------------------------------------------ @api.constrains('address_street', 'address_lat', 'address_lng', 'is_in_store') def _check_address_required(self): """Non-in-store tasks must have a geocoded address.""" for task in self: if task.x_fc_sync_source: continue if task.is_in_store: continue if not task.address_street: raise ValidationError(_( "A valid address is required. If this task is at the store, " "please check the 'In Store' option." )) @api.constrains('technician_id', 'additional_technician_ids', 'scheduled_date', 'time_start', 'time_end') def _check_no_overlap(self): """Prevent overlapping bookings for the same technician on the same date. Checks both the lead technician and all additional technicians. """ for task in self: if task.status == 'cancelled': continue if task.x_fc_sync_source: continue # Validate time range if task.time_start >= task.time_end: raise ValidationError(_("Start time must be before end time.")) # Validate store hours s_open, s_close = self._get_store_hours() if task.time_start < s_open or task.time_end > s_close: open_str = self._float_to_time_str(s_open) close_str = self._float_to_time_str(s_close) raise ValidationError(_( "Tasks must be scheduled within store hours (%s - %s)." ) % (open_str, close_str)) # Validate not in the past (only for new/scheduled local tasks) if task.status == 'scheduled' and task.scheduled_date and not task.x_fc_sync_source: local_now = self._local_now() today = local_now.date() if task.scheduled_date < today: raise ValidationError(_("Cannot schedule tasks in the past.")) if task.scheduled_date == today: current_hour = local_now.hour + local_now.minute / 60.0 if task.time_start < current_hour: pass # Allow editing existing tasks that started earlier today # Check overlap for lead + additional technicians all_tech_ids = (task.technician_id | task.additional_technician_ids).ids for tech_id in all_tech_ids: tech_name = self.env['res.users'].browse(tech_id).name overlapping = self.sudo().search([ '|', ('technician_id', '=', tech_id), ('additional_technician_ids', 'in', [tech_id]), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_start', '<', task.time_end), ('time_end', '>', task.time_start), ], limit=1) if overlapping: start_str = self._float_to_time_str(overlapping.time_start) end_str = self._float_to_time_str(overlapping.time_end) raise ValidationError(_( "%(tech)s has a time conflict with %(task)s " "(%(start)s - %(end)s). Please choose a different time.", tech=tech_name, task=overlapping.name, start=start_str, end=end_str, )) # Check travel time gaps for lead technician only # (additional techs travel with the lead, same destination) next_task = self.sudo().search([ '|', ('technician_id', '=', task.technician_id.id), ('additional_technician_ids', 'in', [task.technician_id.id]), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_start', '>=', task.time_end), ], order='time_start', limit=1) if next_task and task.address_lat and task.address_lng and \ next_task.address_lat and next_task.address_lng: travel_min = self._quick_travel_time( task.address_lat, task.address_lng, next_task.address_lat, next_task.address_lng, ) if travel_min > 0: gap_min = int((next_task.time_start - task.time_end) * 60) if gap_min < travel_min: raise ValidationError(_( "Not enough travel time to the next task!\n\n" "This task ends at %(end)s, and %(next)s starts " "at %(next_start)s (%(gap)d min gap).\n" "Travel time is ~%(travel)d minutes.\n\n" "Please allow at least %(travel)d minutes between tasks.", end=self._float_to_time_str(task.time_end), next=next_task.name, next_start=self._float_to_time_str(next_task.time_start), gap=gap_min, travel=travel_min, )) prev_task = self.sudo().search([ '|', ('technician_id', '=', task.technician_id.id), ('additional_technician_ids', 'in', [task.technician_id.id]), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_end', '<=', task.time_start), ], order='time_end desc', limit=1) if prev_task and task.address_lat and task.address_lng and \ prev_task.address_lat and prev_task.address_lng: travel_min = self._quick_travel_time( prev_task.address_lat, prev_task.address_lng, task.address_lat, task.address_lng, ) if travel_min > 0: gap_min = int((task.time_start - prev_task.time_end) * 60) if gap_min < travel_min: raise ValidationError(_( "Not enough travel time from the previous task!\n\n" "%(prev)s ends at %(prev_end)s, and this task starts " "at %(start)s (%(gap)d min gap).\n" "Travel time is ~%(travel)d minutes.\n\n" "Please allow at least %(travel)d minutes between tasks.", prev=prev_task.name, prev_end=self._float_to_time_str(prev_task.time_end), start=self._float_to_time_str(task.time_start), gap=gap_min, travel=travel_min, )) @api.onchange('technician_id', 'scheduled_date') def _onchange_technician_date_autoset(self): """Auto-set start/end time to the first available slot when tech+date change.""" if not self.technician_id or not self.scheduled_date: return exclude_id = self._origin.id if self._origin else False duration = self.duration_hours or 1.0 s_open, _s_close = self._get_store_hours() preferred = self.time_start or s_open start, end = self._find_next_available_slot( self.technician_id.id, self.scheduled_date, preferred_start=preferred, duration=duration, exclude_task_id=exclude_id, dest_lat=self.address_lat or 0, dest_lng=self.address_lng or 0, ) if start is not False: self.time_start = start self.time_end = end self.duration_hours = end - start else: return {'warning': { 'title': _('Fully Booked'), 'message': _( '%s is fully booked on %s. No available slots.' ) % (self.technician_id.name, self.scheduled_date.strftime('%B %d, %Y')), }} def _snap_if_overlap(self): """Check if current time_start/time_end overlaps with another task. If so, auto-snap to the next available slot and return a warning dict.""" if not self.technician_id or not self.scheduled_date or not self.time_start: return None exclude_id = self._origin.id if self._origin else 0 duration = max(self.duration_hours or 1.0, 0.25) all_tech_ids = (self.technician_id | self.additional_technician_ids).ids overlapping = self.sudo().search([ '|', ('technician_id', 'in', all_tech_ids), ('additional_technician_ids', 'in', all_tech_ids), ('scheduled_date', '=', self.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', exclude_id), ('time_start', '<', self.time_end), ('time_end', '>', self.time_start), ], limit=1) if overlapping: conflict_name = overlapping.name conflict_start = self._float_to_time_str(overlapping.time_start) conflict_end = self._float_to_time_str(overlapping.time_end) start, end = self._find_next_available_slot( self.technician_id.id, self.scheduled_date, preferred_start=self.time_start, duration=duration, exclude_task_id=exclude_id, dest_lat=self.address_lat or 0, dest_lng=self.address_lng or 0, ) if start is not False: new_start_str = self._float_to_time_str(start) new_end_str = self._float_to_time_str(end) self.time_start = start self.time_end = end self.duration_hours = end - start return {'warning': { 'title': _('Moved to Available Slot'), 'message': _( 'The selected time conflicts with %s (%s - %s).\n' 'Automatically moved to: %s - %s.' ) % (conflict_name, conflict_start, conflict_end, new_start_str, new_end_str), }} else: return {'warning': { 'title': _('No Available Slots'), 'message': _( 'The selected time conflicts with %s (%s - %s) ' 'and no other slots are available on this day.' ) % (conflict_name, conflict_start, conflict_end), }} return None # ------------------------------------------------------------------ # DEFAULT_GET - Calendar pre-fill # ------------------------------------------------------------------ def _snap_to_quarter(self, hour_float): """Round a float hour to the nearest 15-minute slot and clamp to store hours.""" s_open, s_close = self._get_store_hours() snapped = round(hour_float * 4) / 4 return max(s_open, min(s_close, snapped)) @api.model def default_get(self, fields_list): """Handle calendar time range selection: pre-fill date + times from context.""" res = super().default_get(fields_list) ctx = self.env.context # Set duration default based on task type from context task_type = ctx.get('default_task_type', res.get('task_type', 'delivery')) if 'duration_hours' not in res or not res.get('duration_hours'): res['duration_hours'] = self.TASK_TYPE_DURATIONS.get(task_type, 1.0) # When user clicks a time range on the calendar, Odoo passes # default_datetime_start/end in UTC dt_start_utc = None dt_end_utc = None if ctx.get('default_datetime_start'): try: dt_start_utc = fields.Datetime.from_string(ctx['default_datetime_start']) except (ValueError, TypeError): pass if ctx.get('default_datetime_end'): try: dt_end_utc = fields.Datetime.from_string(ctx['default_datetime_end']) except (ValueError, TypeError): pass if dt_start_utc or dt_end_utc: import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') if dt_start_utc: dt_start_local = pytz.utc.localize(dt_start_utc).astimezone(user_tz) res['scheduled_date'] = dt_start_local.date() start_float = self._snap_to_quarter( dt_start_local.hour + dt_start_local.minute / 60.0) res['time_start'] = start_float if dt_end_utc: dt_end_local = pytz.utc.localize(dt_end_utc).astimezone(user_tz) end_float = self._snap_to_quarter( dt_end_local.hour + dt_end_local.minute / 60.0) if 'time_start' in res and end_float <= res['time_start']: end_float = res['time_start'] + 1.0 res['time_end'] = end_float # Compute duration from the calendar drag if 'time_start' in res: res['duration_hours'] = end_float - res['time_start'] # Always compute end from start + duration if not already set if 'time_end' not in res and 'time_start' in res and 'duration_hours' in res: _open, close = self._get_store_hours() res['time_end'] = min( res['time_start'] + res['duration_hours'], close) return res # ------------------------------------------------------------------ # CRUD OVERRIDES # ------------------------------------------------------------------ @api.model_create_multi def create(self, vals_list): for vals in vals_list: if vals.get('name', _('New')) == _('New'): vals['name'] = self.env['ir.sequence'].next_by_code('fusion.technician.task') or _('New') if not vals.get('x_fc_sync_uuid') and not vals.get('x_fc_sync_source'): vals['x_fc_sync_uuid'] = str(uuid.uuid4()) # In-store tasks: auto-fill company address if vals.get('is_in_store') and not vals.get('address_street'): company_partner = self.env.company.partner_id if company_partner and company_partner.street: self._fill_address_vals(vals, company_partner) else: vals['address_street'] = self.env.company.name or 'In Store' # Hook: fill address from linked records (overridden by fusion_claims) self._create_vals_fill(vals) records = super().create(vals_list) # Hook: post-create actions for linked records records._on_create_post_actions() # Auto-calculate travel times for the full day chain if not self.env.context.get('skip_travel_recalc'): records._recalculate_day_travel_chains() # Send "Appointment Scheduled" email for rec in records: rec._send_task_scheduled_email() # Push new local tasks to remote instances local_records = records.filtered(lambda r: not r.x_fc_sync_source) if local_records and not self.env.context.get('skip_task_sync'): self.env['fusion.task.sync.config']._push_tasks(local_records, 'create') # Sync to calendar for external calendar integrations records._sync_calendar_event() return records def _create_vals_fill(self, vals): """Hook: fill address from linked records during create. Base implementation fills from partner_id. Override in fusion_claims to also fill from sale_order_id or purchase_order_id. """ if vals.get('partner_id') and not vals.get('address_street'): partner = self.env['res.partner'].browse(vals['partner_id']) if partner.street: self._fill_address_vals(vals, partner) def _on_create_post_actions(self): """Hook: post-create side-effects for linked records. Override in fusion_claims to post chatter messages to linked orders, mark sale orders as ready for delivery, etc. """ pass def write(self, vals): if self.env.context.get('skip_travel_recalc'): res = super().write(vals) if ('status' in vals and vals['status'] in ('completed', 'cancelled') and not self.env.context.get('skip_task_sync')): shadow_records = self.filtered(lambda r: r.x_fc_sync_source) if shadow_records: self.env['fusion.task.sync.config']._push_shadow_status(shadow_records) local_records = self.filtered(lambda r: not r.x_fc_sync_source) if local_records: self.env['fusion.task.sync.config']._push_tasks(local_records, 'write') return res # Safety: ensure time_end is consistent when start/duration change # but time_end wasn't sent (readonly field in view may not save) if ('time_start' in vals or 'duration_hours' in vals) and 'time_end' not in vals: _open, close = self._get_store_hours() start = vals.get('time_start', self[:1].time_start if len(self) == 1 else 9.0) dur = vals.get('duration_hours', self[:1].duration_hours if len(self) == 1 else 1.0) or 1.0 vals['time_end'] = min(start + dur, close) # Detect reschedule mode: capture old values BEFORE write reschedule_mode = self.env.context.get('reschedule_mode') old_schedule = {} schedule_fields = {'scheduled_date', 'time_start', 'time_end', 'duration_hours', 'technician_id'} schedule_changed = schedule_fields & set(vals.keys()) if reschedule_mode and schedule_changed: for task in self: old_schedule[task.id] = { 'date': task.scheduled_date, 'time_start': task.time_start, 'time_end': task.time_end, } # Capture old tech+date combos BEFORE write for travel recalc travel_fields = {'address_street', 'address_city', 'address_zip', 'address_lat', 'address_lng', 'scheduled_date', 'sequence', 'time_start', 'technician_id', 'additional_technician_ids'} needs_travel_recalc = travel_fields & set(vals.keys()) old_combos = set() if needs_travel_recalc: for t in self: old_combos.add((t.technician_id.id, t.scheduled_date)) for tech in t.additional_technician_ids: old_combos.add((tech.id, t.scheduled_date)) res = super().write(vals) if needs_travel_recalc: new_combos = set() for t in self: new_combos.add((t.technician_id.id, t.scheduled_date)) for tech in t.additional_technician_ids: new_combos.add((tech.id, t.scheduled_date)) all_combos = old_combos | new_combos self._recalculate_combos_travel(all_combos) # After write: send reschedule email if schedule actually changed if reschedule_mode and old_schedule: for task in self: old = old_schedule.get(task.id, {}) if old and ( old['date'] != task.scheduled_date or abs(old['time_start'] - task.time_start) > 0.01 or abs(old['time_end'] - task.time_end) > 0.01 ): task._post_status_message('rescheduled') task._send_task_rescheduled_email( old_date=old['date'], old_start=old['time_start'], old_end=old['time_end'], ) # Push updates to remote instances for local tasks sync_fields = {'technician_id', 'additional_technician_ids', 'scheduled_date', 'time_start', 'time_end', 'duration_hours', 'status', 'task_type', 'address_street', 'address_city', 'address_zip', 'address_lat', 'address_lng', 'partner_id'} if sync_fields & set(vals.keys()) and not self.env.context.get('skip_task_sync'): local_records = self.filtered(lambda r: not r.x_fc_sync_source) if local_records: self.env['fusion.task.sync.config']._push_tasks(local_records, 'write') if 'status' in vals and vals['status'] in ('completed', 'cancelled'): shadow_records = self.filtered(lambda r: r.x_fc_sync_source) if shadow_records: self.env['fusion.task.sync.config']._push_shadow_status(shadow_records) # Re-sync calendar event when schedule fields change cal_fields = {'scheduled_date', 'time_start', 'time_end', 'duration_hours', 'technician_id', 'task_type', 'partner_id', 'address_street', 'address_city', 'notes'} if cal_fields & set(vals.keys()): self._sync_calendar_event() return res def _sync_calendar_event(self): """Create or update a linked calendar.event for external calendar sync. Only syncs tasks that have a scheduled date and an assigned technician. Uses sudo() because portal users should not need calendar write access. Falls back gracefully if external calendar validation fails (e.g. Microsoft Calendar requires the organizer to have Outlook synced). """ CalendarEvent = self.env['calendar.event'].sudo() for task in self: if not task.datetime_start or not task.datetime_end or not task.technician_id: if task.calendar_event_id: task.calendar_event_id.unlink() task.with_context(skip_travel_recalc=True).write({'calendar_event_id': False}) continue order = task._get_linked_order() partner = task.partner_id or (order.partner_id if order else False) client_name = partner.name if partner else '' type_label = dict(self._fields['task_type'].selection).get(task.task_type, task.task_type or '') event_name = f"{type_label}: {client_name}" if client_name else f"{type_label} - {task.name}" location_parts = [task.address_street, task.address_city] location = ', '.join(p for p in location_parts if p) or '' description_parts = [] if order: description_parts.append(f"Ref: {order.name}") if task.description: description_parts.append(task.description) vals = { 'name': event_name, 'start': task.datetime_start, 'stop': task.datetime_end, 'user_id': task.technician_id.id, 'location': location, 'partner_ids': [(6, 0, [task.technician_id.partner_id.id])], 'show_as': 'busy', 'description': '\n'.join(description_parts), } try: if task.calendar_event_id: task.calendar_event_id.write(vals) else: event = CalendarEvent.create(vals) task.with_context(skip_travel_recalc=True).write({'calendar_event_id': event.id}) except Exception as e: _logger.warning( "Calendar sync skipped for task %s (tech=%s): %s", task.name, task.technician_id.name, e, ) if not task.calendar_event_id: try: vals['user_id'] = self.env.uid event = CalendarEvent.create(vals) task.with_context(skip_travel_recalc=True).write({'calendar_event_id': event.id}) except Exception: pass @api.model def _fill_address_vals(self, vals, partner): """Helper to fill address vals dict from a partner record.""" vals.update({ 'address_partner_id': partner.id, 'address_street': partner.street or '', 'address_street2': partner.street2 or '', 'address_city': partner.city or '', 'address_state_id': partner.state_id.id if partner.state_id else False, 'address_zip': partner.zip or '', 'address_lat': partner.x_fc_latitude if hasattr(partner, 'x_fc_latitude') else 0, 'address_lng': partner.x_fc_longitude if hasattr(partner, 'x_fc_longitude') else 0, }) def _post_task_created_to_linked_order(self): """Hook: post task creation notice to linked order chatter. Override in fusion_claims.""" pass def _mark_sale_order_ready_for_delivery(self): """Hook: mark linked sale orders as ready for delivery. Override in fusion_claims.""" pass def _recalculate_day_travel_chains(self): """Recalculate travel for all tech+date combos affected by these tasks. Includes combos for additional technicians so their schedules update too. """ combos = set() for t in self: if not t.scheduled_date: continue if t.technician_id: combos.add((t.technician_id.id, t.scheduled_date)) for tech in t.additional_technician_ids: combos.add((tech.id, t.scheduled_date)) self._recalculate_combos_travel(combos) def _get_technician_start_address(self, tech_id): """Get the start address for a technician. Priority: 1. Technician's personal x_fc_start_address (if set) 2. Company default HQ address (fusion_claims.technician_start_address) Returns the address string or ''. """ tech_user = self.env['res.users'].sudo().browse(tech_id) if tech_user.exists() and tech_user.x_fc_start_address: return tech_user.x_fc_start_address.strip() # Fallback to company default return (self.env['ir.config_parameter'].sudo() .get_param('fusion_claims.technician_start_address', '') or '').strip() def _geocode_address_string(self, address, api_key): """Geocode an address string and return (lat, lng) or (0.0, 0.0).""" if not address or not api_key: return 0.0, 0.0 try: url = 'https://maps.googleapis.com/maps/api/geocode/json' params = {'address': address, 'key': api_key, 'region': 'ca'} resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK' and data.get('results'): loc = data['results'][0]['geometry']['location'] return loc['lat'], loc['lng'] except Exception as e: _logger.warning("Address geocoding failed for '%s': %s", address, e) return 0.0, 0.0 def _recalculate_combos_travel(self, combos): """Recalculate travel for a set of (tech_id, date) combinations. Start-point priority per technician (for today only): 1. Latest GPS location (from periodic tracking or task actions) 2. Actual GPS from today's fusion_clock check-in 3. Personal start address (x_fc_start_address) 4. Company default HQ address For future dates, only 3 and 4 apply. """ ICP = self.env['ir.config_parameter'].sudo() enabled = ICP.get_param('fusion_claims.google_distance_matrix_enabled', False) if not enabled: return api_key = self._get_google_maps_api_key() start_coords_cache = {} today = self._local_now().date() today_str = str(today) today_tech_ids = {tid for tid, d in combos if tid and str(d) == today_str} clock_locations = {} if today_tech_ids: clock_locations = self._get_clock_in_locations(today_tech_ids, today) for tech_id, date in combos: if not tech_id or not date: continue cache_key = (tech_id, str(date)) if cache_key not in start_coords_cache: if str(date) == today_str: # Try latest GPS first (most accurate real-time position) lat, lng = self._get_tech_current_location(tech_id) if lat and lng: start_coords_cache[cache_key] = (lat, lng) elif tech_id in clock_locations: cl = clock_locations[tech_id] start_coords_cache[cache_key] = (cl['lat'], cl['lng']) else: addr = self._get_technician_start_address(tech_id) start_coords_cache[cache_key] = self._geocode_address_string(addr, api_key) else: addr = self._get_technician_start_address(tech_id) start_coords_cache[cache_key] = self._geocode_address_string(addr, api_key) all_day_tasks = self.sudo().search([ '|', ('technician_id', '=', tech_id), ('additional_technician_ids', 'in', [tech_id]), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ], order='time_start, sequence, id') if not all_day_tasks: continue prev_lat, prev_lng = start_coords_cache[cache_key] for i, task in enumerate(all_day_tasks): if not (task.address_lat and task.address_lng): task._geocode_address() travel_vals = {} if prev_lat and prev_lng and task.address_lat and task.address_lng: task.with_context(skip_travel_recalc=True)._calculate_travel_time(prev_lat, prev_lng) travel_vals['previous_task_id'] = all_day_tasks[i - 1].id if i > 0 else False travel_vals['travel_origin'] = 'Clock-In Location' if i == 0 and str(date) == today_str and tech_id in clock_locations else ('Start Location' if i == 0 else f'Task {all_day_tasks[i - 1].name}') if travel_vals: task.with_context(skip_travel_recalc=True).write(travel_vals) prev_lat = task.address_lat or prev_lat prev_lng = task.address_lng or prev_lng # ------------------------------------------------------------------ # LIVE TRAVEL RECALCULATION (uses tech's current GPS position) # ------------------------------------------------------------------ def _get_tech_current_location(self, tech_id): """Get the technician's most recent GPS location. Priority: 1. Latest fusion.technician.location record from last 30 min 2. Latest action_latitude/longitude from today's tasks 3. Clock-in location 4. None (caller falls back to start address) """ Location = self.env['fusion.technician.location'].sudo() cutoff = fields.Datetime.subtract(fields.Datetime.now(), minutes=30) latest = Location.search([ ('user_id', '=', tech_id), ('logged_at', '>', cutoff), ('source', '!=', 'sync'), ], order='logged_at desc', limit=1) if latest and latest.latitude and latest.longitude: return latest.latitude, latest.longitude # Fallback: last completed task's location today today = self._local_now().date() last_completed = self.sudo().search([ ('technician_id', '=', tech_id), ('scheduled_date', '=', today), ('status', '=', 'completed'), ('completed_latitude', '!=', 0), ('completed_longitude', '!=', 0), ], order='completion_datetime desc', limit=1) if last_completed: return last_completed.completed_latitude, last_completed.completed_longitude # Fallback: clock-in location clock_locs = self._get_clock_in_locations({tech_id}, today) if tech_id in clock_locs: cl = clock_locs[tech_id] return cl['lat'], cl['lng'] return None, None def _recalculate_travel_from_current_location(self): """Recalculate travel time for THIS task from the tech's current GPS. Called when tech starts en_route to get a live ETA. """ self.ensure_one() ICP = self.env['ir.config_parameter'].sudo() if not ICP.get_param('fusion_claims.google_distance_matrix_enabled', False): return tech_id = self.technician_id.id if not tech_id: return lat, lng = self._get_tech_current_location(tech_id) if lat and lng and self.address_lat and self.address_lng: self.with_context(skip_travel_recalc=True)._calculate_travel_time(lat, lng) self.with_context(skip_travel_recalc=True).write({ 'travel_origin': 'Current Location (Live)', }) def _recalculate_remaining_tasks_travel(self): """After completing a task, recalculate travel for all remaining tasks in the chain using the completion location as the new origin. This ensures ETAs update in real-time as the tech progresses through their schedule, and the route reflects their actual position. """ self.ensure_one() ICP = self.env['ir.config_parameter'].sudo() if not ICP.get_param('fusion_claims.google_distance_matrix_enabled', False): return tech_id = self.technician_id.id if not tech_id or not self.scheduled_date: return # Use completion GPS as origin for next task origin_lat = self.completed_latitude or self.action_latitude origin_lng = self.completed_longitude or self.action_longitude # If no GPS from completion, try task address (tech was physically there) if not origin_lat or not origin_lng: origin_lat = self.address_lat origin_lng = self.address_lng if not origin_lat or not origin_lng: return remaining = self.sudo().search([ '|', ('technician_id', '=', tech_id), ('additional_technician_ids', 'in', [tech_id]), ('scheduled_date', '=', self.scheduled_date), ('status', 'not in', ['completed', 'cancelled']), ('time_start', '>=', self.time_start), ], order='time_start, sequence, id') if not remaining: return prev_lat, prev_lng = origin_lat, origin_lng for i, task in enumerate(remaining): if not (task.address_lat and task.address_lng): task._geocode_address() if prev_lat and prev_lng and task.address_lat and task.address_lng: task.with_context(skip_travel_recalc=True)._calculate_travel_time( prev_lat, prev_lng) origin_label = (f'Completed: {self.name}' if i == 0 else f'Task {remaining[i - 1].name}') task.with_context(skip_travel_recalc=True).write({ 'previous_task_id': self.id if i == 0 else remaining[i - 1].id, 'travel_origin': origin_label, }) prev_lat = task.address_lat or prev_lat prev_lng = task.address_lng or prev_lng # ------------------------------------------------------------------ # STATUS ACTIONS # ------------------------------------------------------------------ def _check_previous_tasks_completed(self): """Check that all earlier tasks for the same technician+date are completed. Considers tasks where the technician is either lead or additional. """ self.ensure_one() earlier_incomplete = self.sudo().search([ '|', ('technician_id', '=', self.technician_id.id), ('additional_technician_ids', 'in', [self.technician_id.id]), ('scheduled_date', '=', self.scheduled_date), ('time_start', '<', self.time_start), ('status', 'not in', ['completed', 'cancelled']), ('id', '!=', self.id), ], limit=1) if earlier_incomplete: raise UserError(_( "Please complete previous task %s first before starting this one." ) % earlier_incomplete.name) def _write_action_location(self, extra_vals=None): """Write GPS coordinates from context onto the task record.""" ctx = self.env.context lat = ctx.get('action_latitude', 0) lng = ctx.get('action_longitude', 0) acc = ctx.get('action_accuracy', 0) vals = { 'action_latitude': lat, 'action_longitude': lng, 'action_location_accuracy': acc, } if extra_vals: vals.update(extra_vals) if lat and lng: self.with_context(skip_travel_recalc=True).write(vals) def action_start_en_route(self): """Mark task as En Route.""" for task in self: if task.status != 'scheduled': raise UserError(_("Only scheduled tasks can be marked as En Route.")) task._check_previous_tasks_completed() task.status = 'en_route' task._write_action_location() task._post_status_message('en_route') task._send_task_en_route_email() # Recalculate travel from tech's current location to THIS task task._recalculate_travel_from_current_location() if task.x_fc_sync_source: try: self.env['fusion.task.sync.config']._push_shadow_status(task) except Exception: _logger.exception( "Failed to push en_route for shadow %s", task.name) try: remaining = self.sudo().search_count([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'in', ['scheduled', 'en_route']), ('id', '!=', task.id), ]) client = task.client_display_name or 'your next client' ttype = dict(self._fields['task_type'].selection).get( task.task_type, task.task_type or 'Task') task._send_push_notification( f'En Route to {client}', f'{ttype} at {task.address_display or "scheduled location"}. ' f'{remaining} more task(s) today.', ) except Exception: pass def action_start_task(self): """Mark task as In Progress.""" for task in self: if task.status not in ('scheduled', 'en_route'): raise UserError(_("Task must be scheduled or en route to start.")) task._check_previous_tasks_completed() task.status = 'in_progress' ctx = self.env.context task._write_action_location({ 'started_latitude': ctx.get('action_latitude', 0), 'started_longitude': ctx.get('action_longitude', 0), }) task._post_status_message('in_progress') def action_complete_task(self): """Mark task as Completed.""" for task in self: if task.status not in ('in_progress', 'en_route', 'scheduled'): raise UserError(_("Task must be in progress to complete.")) task._check_completion_requirements() ctx = self.env.context task.with_context(skip_travel_recalc=True).write({ 'status': 'completed', 'completion_datetime': fields.Datetime.now(), 'completed_latitude': ctx.get('action_latitude', 0), 'completed_longitude': ctx.get('action_longitude', 0), 'action_latitude': ctx.get('action_latitude', 0), 'action_longitude': ctx.get('action_longitude', 0), 'action_location_accuracy': ctx.get('action_accuracy', 0), }) task._post_status_message('completed') task._post_completion_to_linked_order() task._notify_scheduler_on_completion() task._send_task_completion_email() # Recalculate travel for remaining tasks from this completion location task._recalculate_remaining_tasks_travel() task._on_complete_extra() def _check_completion_requirements(self): """Hook: check additional requirements before task completion. Override in fusion_claims for rental inspection checks.""" pass def _on_complete_extra(self): """Hook: additional side-effects after task completion. Override in fusion_claims for ODSP advancement and rental inspection.""" pass def action_cancel_task(self): """Cancel the task. Sends cancellation email and runs cancel hooks.""" for task in self: if task.status == 'completed': raise UserError(_("Cannot cancel a completed task.")) task.status = 'cancelled' task._write_action_location() task._post_status_message('cancelled') if task.x_fc_sync_source: try: self.env['fusion.task.sync.config']._push_shadow_status(task) except Exception: _logger.exception( "Failed to push cancel for shadow %s", task.name) else: task._on_cancel_extra() def _on_cancel_extra(self): """Hook: additional side-effects after task cancellation. Override in fusion_claims for sale order revert and email.""" self._send_task_cancelled_email() def action_reschedule(self): """Open the reschedule form for this task. Saves old schedule info, then opens the same task form for editing. On save, the write() method detects the reschedule and sends emails.""" self.ensure_one() return { 'type': 'ir.actions.act_window', 'res_model': 'fusion.technician.task', 'res_id': self.id, 'view_mode': 'form', 'target': 'new', 'context': { 'reschedule_mode': True, 'old_date': str(self.scheduled_date) if self.scheduled_date else '', 'old_time_start': self.time_start, 'old_time_end': self.time_end, }, } def action_reset_to_scheduled(self): """Reset task back to scheduled.""" for task in self: task.status = 'scheduled' # ------------------------------------------------------------------ # CHATTER / NOTIFICATIONS # ------------------------------------------------------------------ def _post_status_message(self, new_status): """Post a status change message to the task chatter.""" self.ensure_one() status_labels = dict(self._fields['status'].selection) label = status_labels.get(new_status, new_status) icons = { 'en_route': 'fa-road', 'in_progress': 'fa-wrench', 'completed': 'fa-check-circle', 'cancelled': 'fa-times-circle', 'rescheduled': 'fa-calendar', } icon = icons.get(new_status, 'fa-info-circle') body = Markup( f'

Task status changed to ' f'{label} by {self.env.user.name}

' ) self.message_post(body=body, message_type='notification', subtype_xmlid='mail.mt_note') def _post_completion_to_linked_order(self): """Hook: post completion notes to linked order chatter. Override in fusion_claims.""" pass def _notify_scheduler_on_completion(self): """Send an Odoo notification to the person who scheduled the task. Shadow tasks skip this -- the push-back to the source instance triggers the notification there where the real scheduler exists. """ self.ensure_one() if self.x_fc_sync_source: return recipient = None order = self._get_linked_order() if order and order.user_id: recipient = order.user_id elif self.create_uid: recipient = self.create_uid if not recipient or recipient in self.all_technician_ids: return task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type) task_url = f'/web#id={self.id}&model=fusion.technician.task&view_type=form' client_name = self.client_display_name or 'N/A' order = self._get_linked_order() case_ref = order.name if order else '' addr_parts = [p for p in [ self.address_street, self.address_street2, self.address_city, self.address_state_id.name if self.address_state_id else '', self.address_zip, ] if p] address_str = ', '.join(addr_parts) or 'No address' subject = f'Task Completed: {client_name}' if case_ref: subject += f' ({case_ref})' body = Markup( f'
' f'

' f'{task_type_label} Completed

' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'
Client:{client_name}
Case:{case_ref or "N/A"}
Task:{self.name}
Technician(s):{self.all_technician_names or self.technician_id.name}
Location:{address_str}
' f'

View Task

' f'
' ) self.env['mail.thread'].sudo().message_notify( partner_ids=[recipient.partner_id.id], body=body, subject=subject, ) # ------------------------------------------------------------------ # TASK EMAIL NOTIFICATIONS # ------------------------------------------------------------------ def _get_task_email_details(self): """Build common detail rows for task emails.""" self.ensure_one() type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or '') rows = [ ('Task', f'{self.name} ({type_label})'), ('Client', self.partner_id.name or 'N/A'), ] if self.scheduled_date: date_str = self.scheduled_date.strftime('%B %d, %Y') start_str = self._float_to_time_str(self.time_start) end_str = self._float_to_time_str(self.time_end) rows.append(('Scheduled', f'{date_str}, {start_str} - {end_str}')) if self.technician_id: rows.append(('Technician', self.all_technician_names or self.technician_id.name)) if self.address_display: rows.append(('Address', self.address_display)) return rows def _get_task_email_recipients(self): """Get email recipients for task notifications. Returns dict with 'to' (client), 'cc' (technician, sales rep, office).""" self.ensure_one() to_emails = [] cc_emails = [] # Client email if self.partner_id and self.partner_id.email: to_emails.append(self.partner_id.email) # Technician emails (lead + additional) for tech in (self.technician_id | self.additional_technician_ids): if tech.email: cc_emails.append(tech.email) return {'to': to_emails, 'cc': list(set(cc_emails))} def _send_task_cancelled_email(self): """Send cancellation email. Base: no-op. Override in fusion_claims.""" return False def _send_task_scheduled_email(self): """Send scheduled email. Base: no-op. Override in fusion_claims.""" return False def _send_task_rescheduled_email(self, old_date=None, old_start=None, old_end=None): """Send rescheduled email. Base: no-op. Override in fusion_claims.""" return False # ------------------------------------------------------------------ # CLIENT UPDATE EMAILS (en-route + completion) # ------------------------------------------------------------------ def _get_email_builder(self): """Return a record that has the _email_build mixin. Base: returns self (task model inherits mixin). Override in fusion_claims to prefer linked sale order. """ return self def _is_email_notifications_enabled(self): """Check if email notifications are enabled. Base: always True. Override in fusion_claims to check linked sale order's notification settings. """ return True def _get_linked_order(self): """Return the linked order record (SO or PO), or False. Base: always False. Override in fusion_claims to return sale_order_id or purchase_order_id. """ return False def _send_task_en_route_email(self): """Email the client that the technician is on the way.""" self.ensure_one() if self.x_fc_sync_source or not self.x_fc_send_client_updates: return False if not self.partner_id or not self.partner_id.email: return False if not self._is_email_notifications_enabled(): return False client_name = self.client_display_name or self.partner_id.name or 'Client' tech_name = self.all_technician_names or (self.technician_id.name if self.technician_id else 'Our technician') type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or 'service') company = self.env.company detail_rows = self._get_task_email_details() builder = self._get_email_builder() time_range = '' if self.scheduled_date and self.time_start is not None: time_range = ( f'{self.scheduled_date.strftime("%B %d, %Y")}, ' f'{self._float_to_time_str(self.time_start)} - ' f'{self._float_to_time_str(self.time_end or self.time_start + 1.0)}' ) preparation_note = ( 'Please prepare for the visit:
' '
    ' '
  • Ensure the area where service is needed is accessible and clear.
  • ' '
  • If applicable, secure pets away from the work area.
  • ' '
  • Have any relevant documents or information ready for the technician.
  • ' '
  • An adult (18+) must be present during the visit.
  • ' '
' 'Important: Our technicians will not ask for credit card ' 'details or request payment directly unless payment on arrival has been ' 'previously agreed upon. If a cash payment was arranged, please have ' 'the payment ready in an envelope.' ) body_html = builder._email_build( title='Your Technician Is On The Way', summary=( f'Our technician {tech_name} is heading to your ' f'location and is expected to arrive between ' f'{time_range or "the scheduled time"}.' ), email_type='info', sections=[('Visit Details', detail_rows)], note=preparation_note, note_color='#2B6CB0', sender_name=f'The {company.name} Team', ) recipients = self._get_task_email_recipients() to_emails = recipients.get('to', []) cc_emails = recipients.get('cc', []) if not to_emails: return False email_to = ', '.join(to_emails) email_cc = ', '.join(cc_emails) order = self._get_linked_order() case_ref = order.name if order else self.name try: mail_vals = { 'subject': f'Your Technician Is On The Way - {client_name} - {case_ref}', 'body_html': body_html, 'email_to': email_to, 'email_cc': email_cc, } if order: mail_vals['model'] = order._name mail_vals['res_id'] = order.id self.env['mail.mail'].sudo().create(mail_vals).send() _logger.info("Sent en-route email for task %s to %s", self.name, email_to) return True except Exception as e: _logger.error("Failed to send en-route email for %s: %s", self.name, e) return False def _send_task_completion_email(self): """Email the client that the visit is complete. Sends one of two variants depending on x_fc_ask_google_review: - With Google review request (default) - Standard thank-you without review request """ self.ensure_one() if self.x_fc_sync_source or not self.x_fc_send_client_updates: return False if not self.partner_id or not self.partner_id.email: return False if not self._is_email_notifications_enabled(): return False client_name = self.client_display_name or self.partner_id.name or 'Client' tech_name = self.all_technician_names or (self.technician_id.name if self.technician_id else 'Our technician') type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or 'service') company = self.env.company builder = self._get_email_builder() summary_rows = [ ('Client', client_name), ('Service', type_label.title()), ] if self.scheduled_date: summary_rows.append(('Date', self.scheduled_date.strftime('%B %d, %Y'))) if self.technician_id: summary_rows.append(('Technician', tech_name)) contact_note = ( 'If you have any questions or concerns about the service provided, ' f'please don\'t hesitate to contact us at ' f'{company.phone or company.email or "our office"}.' ) google_url = company.x_fc_google_review_url or '' include_review = self.x_fc_ask_google_review and google_url extra_html = '' button_url = '' button_text = '' if include_review: extra_html = builder._email_note( 'We Value Your Feedback

' 'We hope you had a great experience! Your feedback helps us ' 'improve and serve you better. We would truly appreciate it ' 'if you could take a moment to share your experience.', '#38a169', ) button_url = google_url button_text = 'Leave a Review' else: company_website = company.website or '' if company_website: button_url = company_website button_text = 'Visit Our Website' body_html = builder._email_build( title='Service Visit Completed', summary=( f'Our technician {tech_name} has completed the ' f'{type_label.lower()} at your location. ' f'Thank you for choosing {company.name}.' ), email_type='success', sections=[('Visit Summary', summary_rows)], extra_html=extra_html, note=contact_note, note_color='#38a169', button_url=button_url, button_text=button_text, sender_name=f'The {company.name} Team', ) recipients = self._get_task_email_recipients() to_emails = recipients.get('to', []) cc_emails = recipients.get('cc', []) if not to_emails: return False email_to = ', '.join(to_emails) email_cc = ', '.join(cc_emails) order = self._get_linked_order() case_ref = order.name if order else self.name try: mail_vals = { 'subject': f'Service Visit Completed - {client_name} - {case_ref}', 'body_html': body_html, 'email_to': email_to, 'email_cc': email_cc, } if order: mail_vals['model'] = order._name mail_vals['res_id'] = order.id self.env['mail.mail'].sudo().create(mail_vals).send() _logger.info("Sent completion email for task %s to %s", self.name, email_to) return True except Exception as e: _logger.error("Failed to send completion email for %s: %s", self.name, e) return False def get_next_task_for_technician(self): """Get the next task in sequence for the same technician+date after this one. Considers tasks where the technician is either lead or additional. """ self.ensure_one() return self.sudo().search([ '|', ('technician_id', '=', self.technician_id.id), ('additional_technician_ids', 'in', [self.technician_id.id]), ('scheduled_date', '=', self.scheduled_date), ('time_start', '>=', self.time_start), ('status', 'in', ['scheduled', 'en_route']), ('id', '!=', self.id), ], order='time_start, sequence, id', limit=1) # ------------------------------------------------------------------ # GOOGLE MAPS INTEGRATION # ------------------------------------------------------------------ def _get_google_maps_api_key(self): """Get the Google Maps API key from config.""" return self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '' ) @api.model def get_map_data(self, domain=None): """Return task data, technician locations, and Google Maps API key. Args: domain: optional extra domain from the search bar filters. """ api_key = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '') local_instance = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.sync_instance_id', '') base_domain = [ ('status', 'not in', ['cancelled']), ] if domain: base_domain = expression.AND([base_domain, domain]) tasks = self.sudo().search_read( base_domain, ['name', 'partner_id', 'technician_id', 'task_type', 'address_lat', 'address_lng', 'address_display', 'time_start', 'time_end', 'time_start_display', 'time_end_display', 'status', 'scheduled_date', 'travel_time_minutes', 'x_fc_sync_client_name', 'x_fc_is_shadow', 'x_fc_sync_source'], order='scheduled_date asc NULLS LAST, time_start asc', limit=500, ) locations = self.env['fusion.technician.location'].get_latest_locations() tech_starts = self._get_tech_start_locations(tasks, api_key) return { 'api_key': api_key, 'tasks': tasks, 'locations': locations, 'local_instance_id': local_instance, 'tech_start_locations': tech_starts, } @api.model def _get_tech_start_locations(self, tasks, api_key): """Build a dict of technician start locations for route origins. Priority per technician: 1. Today's fusion_clock check-in location (if module installed) 2. Personal start address (x_fc_start_address with cached lat/lng) 3. Company default HQ address """ tech_ids = { t['technician_id'][0] for t in tasks if t.get('technician_id') } if not tech_ids: return {} result = {} today = self._local_now().date() clock_locations = self._get_clock_in_locations(tech_ids, today) hq_address = ( self.env['ir.config_parameter'].sudo() .get_param('fusion_claims.technician_start_address', '') or '' ).strip() hq_lat, hq_lng = 0.0, 0.0 for uid in tech_ids: if uid in clock_locations: result[uid] = clock_locations[uid] continue user = self.env['res.users'].sudo().browse(uid) if not user.exists(): continue partner = user.partner_id if partner.x_fc_start_address and partner.x_fc_start_address.strip(): lat = partner.x_fc_start_address_lat lng = partner.x_fc_start_address_lng if not lat or not lng: lat, lng = self._geocode_address_string( partner.x_fc_start_address, api_key) if lat and lng: partner.sudo().write({ 'x_fc_start_address_lat': lat, 'x_fc_start_address_lng': lng, }) if lat and lng: result[uid] = { 'lat': lat, 'lng': lng, 'address': partner.x_fc_start_address.strip(), 'source': 'start_address', } continue if hq_address: if not hq_lat and not hq_lng: hq_lat, hq_lng = self._geocode_address_string( hq_address, api_key) if hq_lat and hq_lng: result[uid] = { 'lat': hq_lat, 'lng': hq_lng, 'address': hq_address, 'source': 'company_hq', } return result @api.model def _get_clock_in_locations(self, tech_ids, today): """Get today's clock-in lat/lng from fusion_clock if installed. Uses the technician's actual GPS position at the moment they clocked in (from the activity log), not the geofenced location's fixed coordinates. Falls back to the geofence center if no activity-log GPS is available. """ result = {} try: module = self.env['ir.module.module'].sudo().search([ ('name', '=', 'fusion_clock'), ('state', '=', 'installed'), ], limit=1) if not module: return result except Exception: return result try: Attendance = self.env['hr.attendance'].sudo() Employee = self.env['hr.employee'].sudo() ActivityLog = self.env['fusion.clock.activity.log'].sudo() except KeyError: return result employees = Employee.search([ ('user_id', 'in', list(tech_ids)), ]) emp_to_user = {e.id: e.user_id.id for e in employees} if not employees: return result today_start = dt_datetime.combine(today, dt_datetime.min.time()) today_end = today_start + timedelta(days=1) attendances = Attendance.search([ ('employee_id', 'in', employees.ids), ('check_in', '>=', today_start), ('check_in', '<', today_end), ], order='check_in asc') for att in attendances: uid = emp_to_user.get(att.employee_id.id) if not uid or uid in result: continue lat, lng, address = 0, 0, '' log = ActivityLog.search([ ('attendance_id', '=', att.id), ('log_type', '=', 'clock_in'), ('latitude', '!=', 0), ('longitude', '!=', 0), ], limit=1) if log: lat, lng = log.latitude, log.longitude loc = att.x_fclk_location_id if hasattr(att, 'x_fclk_location_id') else False address = (loc.address or loc.name) if loc else '' if not lat or not lng: loc = att.x_fclk_location_id if hasattr(att, 'x_fclk_location_id') else False if loc and loc.latitude and loc.longitude: lat, lng = loc.latitude, loc.longitude address = loc.address or loc.name or '' if lat and lng: result[uid] = { 'lat': lat, 'lng': lng, 'address': address, 'source': 'clock_in', } return result def _geocode_address(self): """Geocode the task address using Google Geocoding API.""" self.ensure_one() api_key = self._get_google_maps_api_key() if not api_key or not self.address_display: return False try: url = 'https://maps.googleapis.com/maps/api/geocode/json' params = { 'address': self.address_display, 'key': api_key, 'region': 'ca', } resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK' and data.get('results'): location = data['results'][0]['geometry']['location'] self.write({ 'address_lat': location['lat'], 'address_lng': location['lng'], }) return True except Exception as e: _logger.warning(f"Geocoding failed for task {self.name}: {e}") return False def _calculate_travel_time(self, origin_lat, origin_lng): """Calculate travel time from origin to this task using Distance Matrix API.""" self.ensure_one() api_key = self._get_google_maps_api_key() if not api_key: return False if not (origin_lat and origin_lng and self.address_lat and self.address_lng): return False try: url = 'https://maps.googleapis.com/maps/api/distancematrix/json' params = { 'origins': f'{origin_lat},{origin_lng}', 'destinations': f'{self.address_lat},{self.address_lng}', 'key': api_key, 'mode': 'driving', 'avoid': 'tolls', 'traffic_model': 'best_guess', 'departure_time': 'now', } resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK': element = data['rows'][0]['elements'][0] if element.get('status') == 'OK': duration_seconds = element['duration_in_traffic']['value'] if 'duration_in_traffic' in element else element['duration']['value'] distance_meters = element['distance']['value'] self.write({ 'travel_time_minutes': round(duration_seconds / 60), 'travel_distance_km': round(distance_meters / 1000, 1), }) return True except Exception as e: _logger.warning(f"Travel time calculation failed for task {self.name}: {e}") return False def action_calculate_travel_times(self): """Calculate travel times for a day's schedule. Called from backend button or cron.""" self._do_calculate_travel_times() # Return False to stay on the current form without navigation return False def _do_calculate_travel_times(self): """Internal: calculate travel times for tasks. Does not return an action. For today's tasks: uses the tech's current GPS location as origin for the first non-completed task, so ETAs reflect reality. For future tasks: uses personal start address or company HQ. """ # Group tasks by technician and date task_groups = {} for task in self: key = (task.technician_id.id, task.scheduled_date) if key not in task_groups: task_groups[key] = self.env['fusion.technician.task'] task_groups[key] |= task api_key = self._get_google_maps_api_key() today = self._local_now().date() for (tech_id, date), tasks in task_groups.items(): sorted_tasks = tasks.sorted(lambda t: (t.sequence, t.time_start)) # For today: try current GPS, then clock-in, then start address # For future: use start address if date == today: lat, lng = self._get_tech_current_location(tech_id) if lat and lng: prev_lat, prev_lng = lat, lng origin_label = 'Current Location' else: clock_locs = self._get_clock_in_locations({tech_id}, today) if tech_id in clock_locs: cl = clock_locs[tech_id] prev_lat, prev_lng = cl['lat'], cl['lng'] origin_label = 'Clock-In Location' else: addr = self._get_technician_start_address(tech_id) prev_lat, prev_lng = self._geocode_address_string(addr, api_key) origin_label = 'Start Location' else: addr = self._get_technician_start_address(tech_id) prev_lat, prev_lng = self._geocode_address_string(addr, api_key) origin_label = 'Start Location' # Skip already-completed tasks for today (chain starts from # last completed task's location instead) first_pending_idx = 0 if date == today: for idx, task in enumerate(sorted_tasks): if task.status == 'completed': if task.completed_latitude and task.completed_longitude: prev_lat = task.completed_latitude prev_lng = task.completed_longitude elif task.address_lat and task.address_lng: prev_lat = task.address_lat prev_lng = task.address_lng origin_label = f'Completed: {task.name}' first_pending_idx = idx + 1 else: break for i, task in enumerate(sorted_tasks): if i < first_pending_idx: continue # Geocode task if needed if not (task.address_lat and task.address_lng): task._geocode_address() if prev_lat and prev_lng and task.address_lat and task.address_lng: task._calculate_travel_time(prev_lat, prev_lng) task.previous_task_id = sorted_tasks[i - 1].id if i > 0 else False task.travel_origin = origin_label if i == first_pending_idx else f'Task {sorted_tasks[i - 1].name}' prev_lat = task.address_lat or prev_lat prev_lng = task.address_lng or prev_lng @api.model def _cron_calculate_travel_times(self): """Cron job: Calculate travel times for today and tomorrow. Runs every 15 minutes. For today's tasks, uses the tech's latest GPS location so ETAs stay accurate as technicians move. Includes completed tasks in the search so the chain can skip them and use their completion location as origin. """ today = fields.Date.context_today(self) tomorrow = today + timedelta(days=1) tasks = self.search([ ('scheduled_date', 'in', [today, tomorrow]), ('status', 'not in', ['cancelled']), ]) if tasks: tasks._do_calculate_travel_times() _logger.info(f"Calculated travel times for {len(tasks)} tasks") @api.model def _cron_check_late_arrivals(self): """Cron: detect tasks where the technician hasn't started and the scheduled start time has passed. Sends a push notification to the tech and an in-app notification to the office (once per task). """ ICP = self.env['ir.config_parameter'].sudo() push_enabled = ICP.get_param('fusion_claims.push_enabled', 'False') if push_enabled.lower() not in ('true', '1', 'yes'): return local_now = self._local_now() today = local_now.date() current_hour = local_now.hour + local_now.minute / 60.0 late_tasks = self.sudo().search([ ('scheduled_date', '=', today), ('status', '=', 'scheduled'), ('time_start', '<', current_hour), ('x_fc_late_notified', '=', False), ('x_fc_sync_source', '=', False), ('technician_id', '!=', False), ]) for task in late_tasks: minutes_late = int((current_hour - task.time_start) * 60) if minutes_late < 5: continue client = task.client_display_name or 'Client' try: task._send_push_notification( f'Running Late - {client}', f'Your {task._float_to_time_str(task.time_start)} ' f'{dict(self._fields["task_type"].selection).get(task.task_type, "task")} ' f'is {minutes_late} min overdue. Please update your status.', ) except Exception: pass try: order = task._get_linked_order() if order and order.user_id: recipient = order.user_id elif task.create_uid: recipient = task.create_uid else: recipient = None if recipient: self.env['mail.thread'].sudo().message_notify( partner_ids=[recipient.partner_id.id], subject=f'Late: {task.name} - {client}', body=Markup( f'

' f'{task.technician_id.name} has not started ' f'{task.name} for {client}, ' f'scheduled at {task._float_to_time_str(task.time_start)}. ' f'Currently {minutes_late} min overdue.

' ), ) except Exception: _logger.warning("Failed to notify office about late task %s", task.name) task.with_context(skip_travel_recalc=True).write({ 'x_fc_late_notified': True, }) if late_tasks: _logger.info("Late arrival notifications sent for %d tasks", len(late_tasks)) # ------------------------------------------------------------------ # PORTAL HELPERS # ------------------------------------------------------------------ def get_technician_tasks_for_date(self, user_id, date): """Get all tasks for a technician on a given date, ordered by sequence.""" return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '=', date), ('status', '!=', 'cancelled'), ], order='sequence, time_start, id') def get_next_task(self, user_id): """Get the next upcoming task for a technician.""" today = fields.Date.context_today(self) return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '>=', today), ('status', 'in', ['scheduled', 'en_route']), ], order='scheduled_date, sequence, time_start', limit=1) def get_current_task(self, user_id): """Get the current in-progress task for a technician.""" today = fields.Date.context_today(self) return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '=', today), ('status', '=', 'in_progress'), ], limit=1) # ------------------------------------------------------------------ # PUSH NOTIFICATIONS # ------------------------------------------------------------------ def _send_push_notification(self, title, body_text, url=None): """Send a web push notification for this task.""" self.ensure_one() PushSub = self.env['fusion.push.subscription'].sudo() subscriptions = PushSub.search([ ('user_id', '=', self.technician_id.id), ('active', '=', True), ]) if not subscriptions: return ICP = self.env['ir.config_parameter'].sudo() vapid_private = ICP.get_param('fusion_claims.vapid_private_key', '') vapid_public = ICP.get_param('fusion_claims.vapid_public_key', '') if not vapid_private or not vapid_public: _logger.warning("VAPID keys not configured, cannot send push notification") return try: from pywebpush import webpush, WebPushException except ImportError: _logger.warning("pywebpush not installed, cannot send push notifications") return payload = json.dumps({ 'title': title, 'body': body_text, 'url': url or f'/my/technician/task/{self.id}', 'task_id': self.id, 'task_type': self.task_type, }) for sub in subscriptions: try: webpush( subscription_info={ 'endpoint': sub.endpoint, 'keys': { 'p256dh': sub.p256dh_key, 'auth': sub.auth_key, }, }, data=payload, vapid_private_key=vapid_private, vapid_claims={'sub': 'mailto:support@nexasystems.ca'}, ) except Exception as e: _logger.warning(f"Push notification failed for subscription {sub.id}: {e}") # Deactivate invalid subscriptions if 'gone' in str(e).lower() or '410' in str(e): sub.active = False self.write({ 'push_notified': True, 'push_notified_datetime': fields.Datetime.now(), }) @api.model def _cron_send_push_notifications(self): """Cron: Send push notifications for upcoming tasks.""" ICP = self.env['ir.config_parameter'].sudo() if not ICP.get_param('fusion_claims.push_enabled', False): return advance_minutes = int(ICP.get_param('fusion_claims.push_advance_minutes', '30')) local_now = self._local_now() tasks = self.search([ ('scheduled_date', '=', local_now.date()), ('status', '=', 'scheduled'), ('push_notified', '=', False), ]) for task in tasks: task_start_hour = int(task.time_start) task_start_min = int((task.time_start % 1) * 60) task_start_dt = local_now.replace( hour=task_start_hour, minute=task_start_min, second=0, microsecond=0) minutes_until = (task_start_dt - local_now).total_seconds() / 60 if 0 <= minutes_until <= advance_minutes: task_type_label = dict(self._fields['task_type'].selection).get(task.task_type, task.task_type) title = f'Upcoming: {task_type_label}' body_text = f'{task.partner_id.name or "Task"} - {task.time_start_display}' if task.travel_time_minutes: body_text += f' ({task.travel_time_minutes} min drive)' task._send_push_notification(title, body_text) # ------------------------------------------------------------------ # HELPERS # ------------------------------------------------------------------ def _get_local_tz(self): """Return the pytz timezone for local time calculations. Prefers company resource calendar, then user tz, then Eastern.""" import pytz tz_name = ( self.env.company.resource_calendar_id.tz or self.env.user.tz or 'America/Toronto' ) try: return pytz.timezone(tz_name) except pytz.UnknownTimeZoneError: return pytz.timezone('America/Toronto') def _utc_to_local(self, dt_utc): """Convert a naive UTC datetime to a timezone-aware local datetime.""" import pytz if not dt_utc: return None return pytz.utc.localize(dt_utc).astimezone(self._get_local_tz()) def _local_now(self): """Current datetime in the local (company) timezone.""" return self._utc_to_local(fields.Datetime.now()) @staticmethod def _float_to_time_str(value): """Convert float hours to time string like '9:30 AM'.""" if not value and value != 0: return '' hours = int(value) minutes = int(round((value % 1) * 60)) period = 'AM' if hours < 12 else 'PM' display_hour = hours % 12 or 12 return f'{display_hour}:{minutes:02d} {period}' def get_google_maps_url(self): """Get Google Maps navigation URL using the text address so the destination shows a proper street name instead of raw coordinates. Returns a google.com/maps URL that Android auto-opens in the app; iOS handling is done client-side via JS to launch comgooglemaps://.""" self.ensure_one() if self.address_display: addr = urllib.parse.quote(self.address_display) return f'https://www.google.com/maps/dir/?api=1&destination={addr}&travelmode=driving' if self.address_lat and self.address_lng: return f'https://www.google.com/maps/dir/?api=1&destination={self.address_lat},{self.address_lng}&travelmode=driving' return ''