# -*- coding: utf-8 -*-
# Copyright 2024-2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

"""
Fusion Technician Task
Scheduling and task management for field technicians.
Replaces Monday.com for technician schedule tracking.
"""

from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
from odoo.osv import expression
from markupsafe import Markup
import logging
import json
import uuid
import requests
from datetime import datetime as dt_datetime, timedelta
import urllib.parse

_logger = logging.getLogger(__name__)


class FusionTechnicianTask(models.Model):
    _name = 'fusion.technician.task'
    _description = 'Technician Task'
    _order = 'scheduled_date, sequence, time_start, id'
    _inherit = ['mail.thread', 'mail.activity.mixin']
    _rec_name = 'name'

    # Without explicit dependencies the label is only tied to the base
    # `_rec_name` dependency and would not refresh when the client, task type
    # or times change.
    @api.depends(
        'name', 'task_type', 'time_start', 'time_end',
        'partner_id.name', 'x_fc_sync_source', 'x_fc_sync_client_name',
    )
    def _compute_display_name(self):
        """Richer display name: Client - Type | 9:00 AM - 10:00 AM.

        Shadow (synced) tasks use the client name copied from the remote
        instance; local tasks use the linked partner's name. Falls back to
        the task reference when nothing else is available.
        """
        type_labels = dict(self._fields['task_type'].selection)
        for task in self:
            if task.x_fc_sync_source:
                client = task.x_fc_sync_client_name
            else:
                client = task.partner_id.name or ''
            ttype = type_labels.get(task.task_type, task.task_type or '')
            start = self._float_to_time_str(task.time_start)
            end = self._float_to_time_str(task.time_end)
            label = ' - '.join(part for part in (client, ttype) if part)
            if start and end:
                label += f' | {start} - {end}'
            task.display_name = label or task.name

    # ------------------------------------------------------------------
    # STORE HOURS HELPER
    # ------------------------------------------------------------------

    def _get_store_hours(self):
        """Return (open_hour, close_hour) from settings. Defaults 9.0 / 18.0.

        Each value is read from its own config parameter; an empty or
        non-numeric value falls back to the default for that bound.
        """
        ICP = self.env['ir.config_parameter'].sudo()
        try:
            open_h = float(ICP.get_param('fusion_claims.store_open_hour', '9.0') or '9.0')
        except (ValueError, TypeError):
            open_h = 9.0
        try:
            close_h = float(ICP.get_param('fusion_claims.store_close_hour', '18.0') or '18.0')
        except (ValueError, TypeError):
            close_h = 18.0
        return (open_h, close_h)

    # ------------------------------------------------------------------
    # CORE FIELDS
    # ------------------------------------------------------------------

    name = fields.Char(
        string='Task Reference',
        required=True,
        copy=False,
        readonly=True,
        default=lambda self: _('New'),
    )
    active = fields.Boolean(default=True)

    # Cross-instance sync fields
    x_fc_sync_source = fields.Char(
        'Source Instance', readonly=True, index=True,
        help='Origin instance ID if this is a synced shadow task (e.g. westin, mobility)',
    )
    x_fc_sync_remote_id = fields.Integer(
        'Remote Task ID', readonly=True,
        help='ID of the task on the remote instance',
    )
    x_fc_sync_uuid = fields.Char(
        'Sync UUID', readonly=True, index=True, copy=False,
        help='Unique ID for cross-instance deduplication',
    )
    x_fc_is_shadow = fields.Boolean(
        'Shadow Task', compute='_compute_is_shadow', store=True,
        help='True if this task was synced from another instance',
    )
    x_fc_sync_client_name = fields.Char(
        'Synced Client Name', readonly=True,
        help='Client name from the remote instance (shadow tasks only)',
    )
    x_fc_source_label = fields.Char(
        'Source', compute='_compute_is_shadow', store=True,
    )

    @api.depends('x_fc_sync_source')
    def _compute_is_shadow(self):
        """Flag shadow tasks and expose a source label.

        The label is the remote instance id for shadow tasks, otherwise the
        local instance id from `fusion_claims.sync_instance_id`.
        """
        local_id = self.env['ir.config_parameter'].sudo().get_param(
            'fusion_claims.sync_instance_id', '')
        for task in self:
            task.x_fc_is_shadow = bool(task.x_fc_sync_source)
            task.x_fc_source_label = task.x_fc_sync_source or local_id

    technician_id = fields.Many2one(
        'res.users',
        string='Technician',
        required=True,
        tracking=True,
        domain="[('x_fc_is_field_staff', '=', True)]",
        help='Shows: users marked as Field Staff (technicians and sales reps)',
    )
technician_name = fields.Char( related='technician_id.name', string='Technician Name', store=True, ) sale_order_id = fields.Many2one( 'sale.order', string='Related Case', tracking=True, ondelete='restrict', help='Sale order / case linked to this task', ) sale_order_name = fields.Char( related='sale_order_id.name', string='Case Reference', store=True, ) purchase_order_id = fields.Many2one( 'purchase.order', string='Related Purchase Order', tracking=True, ondelete='restrict', help='Purchase order linked to this task (e.g. manufacturer pickup)', ) purchase_order_name = fields.Char( related='purchase_order_id.name', string='PO Reference', store=True, ) task_type = fields.Selection([ ('delivery', 'Delivery'), ('repair', 'Repair'), ('pickup', 'Pickup'), ('troubleshoot', 'Troubleshooting'), ('assessment', 'Assessment'), ('installation', 'Installation'), ('maintenance', 'Maintenance'), ('ltc_visit', 'LTC Visit'), ('other', 'Other'), ], string='Task Type', required=True, default='delivery', tracking=True) facility_id = fields.Many2one( 'fusion.ltc.facility', string='LTC Facility', tracking=True, help='LTC Home for this visit', ) # ------------------------------------------------------------------ # SCHEDULING # ------------------------------------------------------------------ scheduled_date = fields.Date( string='Scheduled Date', required=True, tracking=True, default=fields.Date.context_today, index=True, ) time_start = fields.Float( string='Start Time', help='Start time in hours (e.g. 9.5 = 9:30 AM)', default=9.0, ) time_end = fields.Float( string='End Time', help='End time in hours (e.g. 
10.5 = 10:30 AM)', default=10.0, ) time_start_display = fields.Char( string='Start', compute='_compute_time_displays', ) time_end_display = fields.Char( string='End', compute='_compute_time_displays', ) # Legacy 12h selection fields -- kept for DB compatibility, hidden on form time_start_12h = fields.Selection( selection='_get_time_selection', string='Start Time (12h)', compute='_compute_time_12h', inverse='_inverse_time_start_12h', store=True, ) time_end_12h = fields.Selection( selection='_get_time_selection', string='End Time (12h)', compute='_compute_time_12h', inverse='_inverse_time_end_12h', store=True, ) sequence = fields.Integer( string='Sequence', default=10, help='Order of task within the day', ) duration_hours = fields.Float( string='Duration', default=1.0, help='Task duration in hours. Auto-calculates end time.', ) # Task type -> default duration mapping TASK_TYPE_DURATIONS = { 'delivery': 1.0, 'repair': 2.0, 'pickup': 0.5, 'troubleshoot': 1.5, 'assessment': 1.5, 'installation': 2.0, 'maintenance': 1.5, 'ltc_visit': 3.0, 'other': 1.0, } # Previous task travel warning banner prev_task_summary_html = fields.Html( string='Previous Task', compute='_compute_prev_task_summary', sanitize=False, ) # Datetime fields for calendar view (computed from date + float time) datetime_start = fields.Datetime( string='Start', compute='_compute_datetimes', inverse='_inverse_datetime_start', store=True, help='Combined start datetime for calendar display', ) datetime_end = fields.Datetime( string='End', compute='_compute_datetimes', inverse='_inverse_datetime_end', store=True, help='Combined end datetime for calendar display', ) # Schedule info helper for the form schedule_info_html = fields.Html( string='Schedule Info', compute='_compute_schedule_info', sanitize=False, ) # ------------------------------------------------------------------ # STATUS # ------------------------------------------------------------------ status = fields.Selection([ ('scheduled', 'Scheduled'), 
('en_route', 'En Route'), ('in_progress', 'In Progress'), ('completed', 'Completed'), ('cancelled', 'Cancelled'), ('rescheduled', 'Rescheduled'), ], string='Status', default='scheduled', required=True, tracking=True, index=True) priority = fields.Selection([ ('0', 'Normal'), ('1', 'Urgent'), ('2', 'Emergency'), ], string='Priority', default='0') color = fields.Integer( string='Color Index', compute='_compute_color', ) # ------------------------------------------------------------------ # CLIENT / ADDRESS # ------------------------------------------------------------------ partner_id = fields.Many2one( 'res.partner', string='Client', tracking=True, help='Client for this task', ) partner_phone = fields.Char( related='partner_id.phone', string='Client Phone', ) # Address fields - computed from shipping address or manually set address_partner_id = fields.Many2one( 'res.partner', string='Task Address', help='Partner record containing the task address (usually shipping address)', ) address_street = fields.Char(string='Street') address_street2 = fields.Char(string='Unit/Suite #') address_city = fields.Char(string='City') address_state_id = fields.Many2one('res.country.state', string='Province') address_zip = fields.Char(string='Postal Code') address_buzz_code = fields.Char(string='Buzz Code', help='Building buzzer code for entry') address_display = fields.Text( string='Full Address', compute='_compute_address_display', ) # Geocoding address_lat = fields.Float(string='Latitude', digits=(10, 7)) address_lng = fields.Float(string='Longitude', digits=(10, 7)) # ------------------------------------------------------------------ # TASK DETAILS # ------------------------------------------------------------------ description = fields.Text( string='Task Description', help='What needs to be done', ) equipment_needed = fields.Text( string='Equipment / Materials Needed', help='Tools and materials the technician should bring', ) pod_required = fields.Boolean( string='POD Required', 
default=False, help='Proof of Delivery signature required', ) # ------------------------------------------------------------------ # COMPLETION # ------------------------------------------------------------------ completion_notes = fields.Html( string='Completion Notes', help='Notes from the technician about what was done', ) completion_datetime = fields.Datetime( string='Completed At', tracking=True, ) voice_note_audio = fields.Binary( string='Voice Recording', attachment=True, ) voice_note_transcription = fields.Text( string='Voice Transcription', ) # ------------------------------------------------------------------ # TRAVEL # ------------------------------------------------------------------ travel_time_minutes = fields.Integer( string='Travel Time (min)', help='Estimated travel time from previous task in minutes', ) travel_distance_km = fields.Float( string='Travel Distance (km)', digits=(8, 1), ) travel_origin = fields.Char( string='Travel From', help='Origin address for travel calculation', ) previous_task_id = fields.Many2one( 'fusion.technician.task', string='Previous Task', help='The task before this one in the schedule (for travel calculation)', ) # ------------------------------------------------------------------ # PUSH NOTIFICATION TRACKING # ------------------------------------------------------------------ push_notified = fields.Boolean( string='Push Notified', default=False, help='Whether a push notification was sent for this task', ) push_notified_datetime = fields.Datetime( string='Notified At', ) # ------------------------------------------------------------------ # COMPUTED FIELDS # ------------------------------------------------------------------ # ------------------------------------------------------------------ # SLOT AVAILABILITY HELPERS # ------------------------------------------------------------------ def _find_next_available_slot(self, tech_id, date, preferred_start=9.0, duration=1.0, exclude_task_id=False, dest_lat=0, dest_lng=0): 
"""Find the next available time slot for a technician on a given date. Scans all non-cancelled tasks for that tech+date, sorts them, and walks through the day (9 AM - 6 PM) looking for a gap that fits the requested duration PLUS travel time from the previous task. :param tech_id: res.users id of the technician :param date: date object for the day to check :param preferred_start: float hour to start looking from (default 9.0) :param duration: required slot length in hours (default 1.0) :param exclude_task_id: task id to exclude (when editing an existing task) :param dest_lat: latitude of the destination (new task location) :param dest_lng: longitude of the destination (new task location) :returns: (start_float, end_float) or (False, False) if fully booked """ STORE_OPEN, STORE_CLOSE = self._get_store_hours() if not tech_id or not date: return (preferred_start, preferred_start + duration) domain = [ ('technician_id', '=', tech_id), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ] if exclude_task_id: domain.append(('id', '!=', exclude_task_id)) booked = self.sudo().search(domain, order='time_start') # Build sorted list of (start, end, lat, lng) intervals intervals = [] for b in booked: intervals.append(( max(b.time_start, STORE_OPEN), min(b.time_end, STORE_CLOSE), b.address_lat or 0, b.address_lng or 0, )) def _travel_hours(from_lat, from_lng, to_lat, to_lng): """Calculate travel time in hours between two locations. Returns 0 if coordinates are missing. 
Rounds up to 15-min.""" if not from_lat or not from_lng or not to_lat or not to_lng: return 0 travel_min = self._quick_travel_time( from_lat, from_lng, to_lat, to_lng) if travel_min > 0: import math return math.ceil(travel_min / 15.0) * 0.25 return 0 def _travel_from_prev(iv_lat, iv_lng): """Travel from a previous booked task TO the new task.""" return _travel_hours(iv_lat, iv_lng, dest_lat, dest_lng) def _travel_to_next(next_lat, next_lng): """Travel FROM the new task TO the next booked task.""" return _travel_hours(dest_lat, dest_lng, next_lat, next_lng) def _check_gap_fits(cursor, dur, idx): """Check if a slot at 'cursor' for 'dur' hours fits before the interval at index 'idx' (accounting for travel TO that task).""" if idx >= len(intervals): return cursor + dur <= STORE_CLOSE next_start, _ne, next_lat, next_lng = intervals[idx] travel_fwd = _travel_to_next(next_lat, next_lng) return cursor + dur + travel_fwd <= next_start # Walk through gaps, starting from preferred_start cursor = max(preferred_start, STORE_OPEN) for i, (iv_start, iv_end, iv_lat, iv_lng) in enumerate(intervals): if cursor + duration <= iv_start: # Check travel time from new task end TO next booked task if _check_gap_fits(cursor, duration, i): return (cursor, cursor + duration) # Not enough travel time -- try pushing start earlier or skip # If we can't fit here, fall through to jump past this interval # Jump past this booked interval + travel buffer from prev to new new_cursor = max(cursor, iv_end) travel = _travel_from_prev(iv_lat, iv_lng) new_cursor += travel # Snap to nearest 15 min new_cursor = round(new_cursor * 4) / 4 cursor = new_cursor # Check gap after last interval (no next task, so no forward travel needed) if cursor + duration <= STORE_CLOSE: return (cursor, cursor + duration) # No gap found from preferred_start onward -- wrap and try from start if preferred_start > STORE_OPEN: cursor = STORE_OPEN for i, (iv_start, iv_end, iv_lat, iv_lng) in enumerate(intervals): if cursor + duration 
<= iv_start: if _check_gap_fits(cursor, duration, i): return (cursor, cursor + duration) new_cursor = max(cursor, iv_end) travel = _travel_from_prev(iv_lat, iv_lng) new_cursor += travel new_cursor = round(new_cursor * 4) / 4 cursor = new_cursor if cursor + duration <= STORE_CLOSE: return (cursor, cursor + duration) return (False, False) def _get_available_gaps(self, tech_id, date, exclude_task_id=False): """Return a list of available (start, end) gaps for a technician on a date. Used by schedule_info_html to show green "available" badges. """ STORE_OPEN, STORE_CLOSE = self._get_store_hours() if not tech_id or not date: return [(STORE_OPEN, STORE_CLOSE)] domain = [ ('technician_id', '=', tech_id), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ] if exclude_task_id: domain.append(('id', '!=', exclude_task_id)) booked = self.sudo().search(domain, order='time_start') intervals = [(max(b.time_start, STORE_OPEN), min(b.time_end, STORE_CLOSE)) for b in booked] gaps = [] cursor = STORE_OPEN for iv_start, iv_end in intervals: if cursor < iv_start: gaps.append((cursor, iv_start)) cursor = max(cursor, iv_end) if cursor < STORE_CLOSE: gaps.append((cursor, STORE_CLOSE)) return gaps @api.model def _get_time_selection(self): """Generate 12-hour time slots every 15 minutes, store hours only (9 AM - 6 PM).""" times = [] for hour in range(9, 18): # 9 AM to 5:45 PM for minute in (0, 15, 30, 45): float_val = hour + minute / 60.0 key = f'{float_val:.2f}' period = 'AM' if hour < 12 else 'PM' display_hour = hour % 12 or 12 label = f'{display_hour}:{minute:02d} {period}' times.append((key, label)) # Add 6:00 PM as end-time option times.append(('18.00', '6:00 PM')) return times @api.depends('time_start', 'time_end') def _compute_time_12h(self): """Sync the 12h selection fields from the raw float values.""" for task in self: task.time_start_12h = f'{(task.time_start or 9.0):.2f}' task.time_end_12h = f'{(task.time_end or 10.0):.2f}' def _inverse_time_start_12h(self): for 
task in self: if task.time_start_12h: task.time_start = float(task.time_start_12h) def _inverse_time_end_12h(self): for task in self: if task.time_end_12h: task.time_end = float(task.time_end_12h) @api.depends('time_start', 'time_end') def _compute_time_displays(self): """Convert float hours to readable time strings.""" for task in self: task.time_start_display = self._float_to_time_str(task.time_start) task.time_end_display = self._float_to_time_str(task.time_end) @api.onchange('task_type') def _onchange_task_type_duration(self): """Set default duration based on task type.""" if self.task_type: self.duration_hours = self.TASK_TYPE_DURATIONS.get(self.task_type, 1.0) # Also recalculate end time if self.time_start: _open, close = self._get_store_hours() self.time_end = min(self.time_start + self.duration_hours, close) @api.onchange('time_start', 'duration_hours') def _onchange_compute_end_time(self): """Auto-compute end time from start + duration. Also run overlap check.""" if self.time_start and self.duration_hours: _open, close = self._get_store_hours() new_end = min(self.time_start + self.duration_hours, close) self.time_end = new_end # Run overlap snap if we have enough data if self.technician_id and self.scheduled_date and self.time_start and self.time_end: result = self._snap_if_overlap() if result: return result @api.depends('scheduled_date', 'time_start', 'time_end') def _compute_datetimes(self): """Combine date + float time into proper Datetime fields for calendar. 
time_start/time_end are LOCAL hours; datetime_start/end must be UTC for Odoo.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.scheduled_date: # Build local datetime, then convert to UTC base = dt_datetime.combine(task.scheduled_date, dt_datetime.min.time()) store_open, _close = task._get_store_hours() local_start = user_tz.localize(base + timedelta(hours=task.time_start or store_open)) local_end = user_tz.localize(base + timedelta(hours=task.time_end or (store_open + 1.0))) task.datetime_start = local_start.astimezone(pytz.utc).replace(tzinfo=None) task.datetime_end = local_end.astimezone(pytz.utc).replace(tzinfo=None) else: task.datetime_start = False task.datetime_end = False def _inverse_datetime_start(self): """When datetime_start is changed (e.g. from calendar drag), update date + time.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.datetime_start: local_dt = pytz.utc.localize(task.datetime_start).astimezone(user_tz) task.scheduled_date = local_dt.date() task.time_start = local_dt.hour + local_dt.minute / 60.0 def _inverse_datetime_end(self): """When datetime_end is changed (e.g. 
from calendar resize), update time_end.""" import pytz user_tz = pytz.timezone(self.env.user.tz or 'UTC') for task in self: if task.datetime_end: local_dt = pytz.utc.localize(task.datetime_end).astimezone(user_tz) task.time_end = local_dt.hour + local_dt.minute / 60.0 @api.depends('technician_id', 'scheduled_date') def _compute_schedule_info(self): """Show booked + available time slots for the technician on the selected date.""" for task in self: if not task.technician_id or not task.scheduled_date: task.schedule_info_html = '' continue exclude_id = task.id if task.id else 0 # Find other tasks for the same technician+date others = self.sudo().search([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', exclude_id), ], order='time_start') if not others: s_open, s_close = self._get_store_hours() open_str = self._float_to_time_str(s_open) close_str = self._float_to_time_str(s_close) task.schedule_info_html = Markup( f'
' f' All slots available ({open_str} - {close_str})
' ) continue # Booked badges booked_lines = [] for o in others: start_str = self._float_to_time_str(o.time_start) end_str = self._float_to_time_str(o.time_end) type_label = dict(self._fields['task_type'].selection).get(o.task_type, o.task_type) client_name = o.partner_id.name or '' booked_lines.append( f'' f'{start_str} - {end_str} ({type_label}{" - " + client_name if client_name else ""})' f'' ) # Available gaps badges gaps = self._get_available_gaps( task.technician_id.id, task.scheduled_date, exclude_task_id=exclude_id, ) avail_lines = [] for g_start, g_end in gaps: # Only show gaps >= 15 min if g_end - g_start >= 0.25: avail_lines.append( f'' f'{self._float_to_time_str(g_start)} - {self._float_to_time_str(g_end)}' f'' ) html_parts = [ '
', ' Booked: ', ' '.join(booked_lines), ] if avail_lines: html_parts.append( '
' 'Available: ' + ' '.join(avail_lines) ) elif not avail_lines: html_parts.append( '
' 'Fully booked' ) html_parts.append('
') task.schedule_info_html = Markup(''.join(html_parts)) @api.depends('technician_id', 'scheduled_date', 'time_start', 'address_lat', 'address_lng', 'address_street') def _compute_prev_task_summary(self): """Show previous task info + travel time warning with color coding.""" for task in self: if not task.technician_id or not task.scheduled_date: task.prev_task_summary_html = '' continue exclude_id = task.id if task.id else 0 # Find the task that ends just before this one starts prev_tasks = self.sudo().search([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', exclude_id), ('time_end', '<=', task.time_start or 99.0), ], order='time_end desc', limit=1) if not prev_tasks: # Check if this is the first task of the day -- show start location info task.prev_task_summary_html = Markup( '
' ' First task of the day -- ' 'travel calculated from start location.
' ) continue prev = prev_tasks[0] prev_start = self._float_to_time_str(prev.time_start) prev_end = self._float_to_time_str(prev.time_end) type_label = dict(self._fields['task_type'].selection).get( prev.task_type, prev.task_type or '') client_name = prev.partner_id.name or '' prev_addr = prev.address_display or 'No address' # Calculate gap between prev task end and this task start s_open, _s_close = self._get_store_hours() gap_hours = (task.time_start or s_open) - (prev.time_end or s_open) gap_minutes = int(gap_hours * 60) # Try to get travel time if both have coordinates travel_minutes = 0 travel_text = '' if (prev.address_lat and prev.address_lng and task.address_lat and task.address_lng): travel_minutes = self._quick_travel_time( prev.address_lat, prev.address_lng, task.address_lat, task.address_lng, ) if travel_minutes > 0: travel_text = f'{travel_minutes} min drive' else: travel_text = 'Could not calculate travel time' elif prev.address_street and task.address_street: travel_text = 'Save to calculate travel time' else: travel_text = 'Address missing -- cannot calculate travel' # Determine color coding if travel_minutes > 0 and gap_minutes >= travel_minutes: bg_class = 'alert-success' # Green -- enough time icon = 'fa-check-circle' status_text = ( f'{gap_minutes} min gap -- enough travel time ' f'(~{travel_minutes} min drive)' ) elif travel_minutes > 0 and gap_minutes > 0: bg_class = 'alert-warning' # Yellow -- tight icon = 'fa-exclamation-triangle' status_text = ( f'{gap_minutes} min gap -- tight! ' f'Travel is ~{travel_minutes} min drive' ) elif travel_minutes > 0 and gap_minutes <= 0: bg_class = 'alert-danger' # Red -- impossible icon = 'fa-times-circle' status_text = ( f'No gap! Previous task ends at {prev_end}. ' f'Travel is ~{travel_minutes} min drive' ) else: bg_class = 'alert-info' # Blue -- no travel data yet icon = 'fa-info-circle' status_text = travel_text html = ( f'
' f' ' f'Previous: {prev.name} ' f'({type_label}) {prev_start} - {prev_end}' f'{" -- " + client_name if client_name else ""}' f'
' f' {prev_addr}' f'
' f' {status_text}' f'
' ) task.prev_task_summary_html = Markup(html) def _quick_travel_time(self, from_lat, from_lng, to_lat, to_lng): """Quick inline travel time calculation using Google Distance Matrix API. Returns travel time in minutes, or 0 if unavailable.""" try: api_key = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '') if not api_key: return 0 url = 'https://maps.googleapis.com/maps/api/distancematrix/json' params = { 'origins': f'{from_lat},{from_lng}', 'destinations': f'{to_lat},{to_lng}', 'mode': 'driving', 'avoid': 'tolls', 'departure_time': 'now', 'key': api_key, } resp = requests.get(url, params=params, timeout=5) data = resp.json() if data.get('status') == 'OK': elements = data['rows'][0]['elements'][0] if elements.get('status') == 'OK': # Use duration_in_traffic if available, else duration duration = elements.get( 'duration_in_traffic', elements.get('duration', {})) seconds = duration.get('value', 0) return max(1, int(seconds / 60)) except Exception: _logger.warning('Failed to calculate travel time', exc_info=True) return 0 @api.depends('status') def _compute_color(self): color_map = { 'scheduled': 0, # grey 'en_route': 4, # blue 'in_progress': 2, # orange 'completed': 10, # green 'cancelled': 1, # red 'rescheduled': 3, # yellow } for task in self: task.color = color_map.get(task.status, 0) @api.depends('address_street', 'address_street2', 'address_city', 'address_state_id', 'address_zip') def _compute_address_display(self): for task in self: street = task.address_street or '' # If the street field already contains a full address (has a comma), # use it directly -- Google Places stores the formatted address here. 
if ',' in street and ( (task.address_city and task.address_city in street) or (task.address_zip and task.address_zip in street) ): # Street already has full address; just append unit if separate if task.address_street2 and task.address_street2 not in street: task.address_display = f"{street}, {task.address_street2}" else: task.address_display = street else: # Build from components (manual entry or legacy data) parts = [ street, task.address_street2, task.address_city, task.address_state_id.name if task.address_state_id else '', task.address_zip, ] task.address_display = ', '.join([p for p in parts if p]) # ------------------------------------------------------------------ # ONCHANGE - Auto-fill address from client # ------------------------------------------------------------------ @api.onchange('partner_id') def _onchange_partner_id(self): """Auto-fill address fields from the selected client's address.""" if self.partner_id: addr = self.partner_id self.address_partner_id = addr.id self.address_street = addr.street or '' self.address_street2 = addr.street2 or '' self.address_city = addr.city or '' self.address_state_id = addr.state_id.id if addr.state_id else False self.address_zip = addr.zip or '' self.address_lat = addr.x_fc_latitude if hasattr(addr, 'x_fc_latitude') and addr.x_fc_latitude else 0 self.address_lng = addr.x_fc_longitude if hasattr(addr, 'x_fc_longitude') and addr.x_fc_longitude else 0 @api.onchange('sale_order_id') def _onchange_sale_order_id(self): """Auto-fill client and address from the sale order's shipping address.""" if self.sale_order_id: self.purchase_order_id = False order = self.sale_order_id if not self.partner_id: self.partner_id = order.partner_id addr = order.partner_shipping_id or order.partner_id self._fill_address_from_partner(addr) @api.onchange('purchase_order_id') def _onchange_purchase_order_id(self): """Auto-fill client and address from the purchase order's vendor.""" if self.purchase_order_id: self.sale_order_id = False order 
= self.purchase_order_id if not self.partner_id: self.partner_id = order.partner_id addr = order.dest_address_id or order.partner_id self._fill_address_from_partner(addr) @api.onchange('facility_id') def _onchange_facility_id(self): """Auto-fill address from the LTC facility.""" if self.facility_id and self.task_type == 'ltc_visit': fac = self.facility_id self.address_street = fac.street or '' self.address_street2 = fac.street2 or '' self.address_city = fac.city or '' self.address_state_id = fac.state_id.id if fac.state_id else False self.address_zip = fac.zip or '' self.description = self.description or _( 'LTC Visit at %s', fac.name ) @api.onchange('task_type') def _onchange_task_type_ltc(self): if self.task_type == 'ltc_visit': self.sale_order_id = False self.purchase_order_id = False def _fill_address_from_partner(self, addr): """Populate address fields from a partner record.""" if not addr: return self.address_partner_id = addr.id self.address_street = addr.street or '' self.address_street2 = addr.street2 or '' self.address_city = addr.city or '' self.address_state_id = addr.state_id.id if addr.state_id else False self.address_zip = addr.zip or '' self.address_lat = addr.x_fc_latitude if hasattr(addr, 'x_fc_latitude') and addr.x_fc_latitude else 0 self.address_lng = addr.x_fc_longitude if hasattr(addr, 'x_fc_longitude') and addr.x_fc_longitude else 0 # ------------------------------------------------------------------ # CONSTRAINTS + VALIDATION # ------------------------------------------------------------------ @api.constrains('sale_order_id', 'purchase_order_id') def _check_order_link(self): for task in self: if task.x_fc_sync_source: continue if task.task_type == 'ltc_visit': continue if not task.sale_order_id and not task.purchase_order_id: raise ValidationError(_( "A task must be linked to either a Sale Order (Case) or a Purchase Order." 
)) @api.constrains('technician_id', 'scheduled_date', 'time_start', 'time_end') def _check_no_overlap(self): """Prevent overlapping bookings for the same technician on the same date.""" for task in self: if task.status == 'cancelled': continue if task.x_fc_sync_source: continue # Validate time range if task.time_start >= task.time_end: raise ValidationError(_("Start time must be before end time.")) # Validate store hours s_open, s_close = self._get_store_hours() if task.time_start < s_open or task.time_end > s_close: open_str = self._float_to_time_str(s_open) close_str = self._float_to_time_str(s_close) raise ValidationError(_( "Tasks must be scheduled within store hours (%s - %s)." ) % (open_str, close_str)) # Validate not in the past (only for new/scheduled local tasks) if task.status == 'scheduled' and task.scheduled_date and not task.x_fc_sync_source: today = fields.Date.context_today(self) if task.scheduled_date < today: raise ValidationError(_("Cannot schedule tasks in the past.")) if task.scheduled_date == today: now = fields.Datetime.now() current_hour = now.hour + now.minute / 60.0 if task.time_start < current_hour: pass # Allow editing existing tasks that started earlier today # Check overlap with other tasks overlapping = self.sudo().search([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_start', '<', task.time_end), ('time_end', '>', task.time_start), ], limit=1) if overlapping: start_str = self._float_to_time_str(overlapping.time_start) end_str = self._float_to_time_str(overlapping.time_end) raise ValidationError(_( "Time slot overlaps with %(task)s (%(start)s - %(end)s). 
" "Please choose a different time.", task=overlapping.name, start=start_str, end=end_str, )) # Check travel time gap to the NEXT task on the same day next_task = self.sudo().search([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_start', '>=', task.time_end), ], order='time_start', limit=1) if next_task and task.address_lat and task.address_lng and \ next_task.address_lat and next_task.address_lng: travel_min = self._quick_travel_time( task.address_lat, task.address_lng, next_task.address_lat, next_task.address_lng, ) if travel_min > 0: gap_min = int((next_task.time_start - task.time_end) * 60) if gap_min < travel_min: raise ValidationError(_( "Not enough travel time to the next task!\n\n" "This task ends at %(end)s, and %(next)s starts " "at %(next_start)s (%(gap)d min gap).\n" "Travel time is ~%(travel)d minutes.\n\n" "Please allow at least %(travel)d minutes between tasks.", end=self._float_to_time_str(task.time_end), next=next_task.name, next_start=self._float_to_time_str(next_task.time_start), gap=gap_min, travel=travel_min, )) # Check travel time gap FROM the PREVIOUS task on the same day prev_task = self.sudo().search([ ('technician_id', '=', task.technician_id.id), ('scheduled_date', '=', task.scheduled_date), ('status', 'not in', ['cancelled']), ('id', '!=', task.id), ('time_end', '<=', task.time_start), ], order='time_end desc', limit=1) if prev_task and task.address_lat and task.address_lng and \ prev_task.address_lat and prev_task.address_lng: travel_min = self._quick_travel_time( prev_task.address_lat, prev_task.address_lng, task.address_lat, task.address_lng, ) if travel_min > 0: gap_min = int((task.time_start - prev_task.time_end) * 60) if gap_min < travel_min: raise ValidationError(_( "Not enough travel time from the previous task!\n\n" "%(prev)s ends at %(prev_end)s, and this task starts " "at %(start)s (%(gap)d min gap).\n" "Travel time 
# NOTE: every definition below is a method of FusionTechnicianTask.

@api.onchange('technician_id', 'scheduled_date')
def _onchange_technician_date_autoset(self):
    """Auto-set start/end time to the first available slot when tech+date change.

    Returns:
        None when a slot was applied (or technician/date is missing),
        otherwise an onchange ``warning`` dict saying the day is fully booked.
    """
    if not self.technician_id or not self.scheduled_date:
        return
    exclude_id = self._origin.id if self._origin else False
    duration = self.duration_hours or 1.0
    s_open, _s_close = self._get_store_hours()
    # Keep the user's current start as the preference; fall back to opening time.
    preferred = self.time_start or s_open
    start, end = self._find_next_available_slot(
        self.technician_id.id,
        self.scheduled_date,
        preferred_start=preferred,
        duration=duration,
        exclude_task_id=exclude_id,
        dest_lat=self.address_lat or 0,
        dest_lng=self.address_lng or 0,
    )
    if start is not False:
        self.time_start = start
        self.time_end = end
        self.duration_hours = end - start
    else:
        return {'warning': {
            'title': _('Fully Booked'),
            'message': _(
                '%s is fully booked on %s. No available slots.'
            ) % (self.technician_id.name, self.scheduled_date.strftime('%B %d, %Y')),
        }}


def _snap_if_overlap(self):
    """Check if current time_start/time_end overlaps with another task.

    If so, auto-snap to the next available slot and return a warning dict.

    Returns:
        None when there is nothing to check or no conflict, otherwise an
        onchange ``warning`` dict describing the move (or the lack of slots).
    """
    if not self.technician_id or not self.scheduled_date or not self.time_start:
        return None
    exclude_id = self._origin.id if self._origin else 0
    # Never search for a slot shorter than 15 minutes.
    duration = max(self.duration_hours or 1.0, 0.25)
    overlapping = self.sudo().search([
        ('technician_id', '=', self.technician_id.id),
        ('scheduled_date', '=', self.scheduled_date),
        ('status', 'not in', ['cancelled']),
        ('id', '!=', exclude_id),
        ('time_start', '<', self.time_end),
        ('time_end', '>', self.time_start),
    ], limit=1)
    if overlapping:
        conflict_name = overlapping.name
        conflict_start = self._float_to_time_str(overlapping.time_start)
        conflict_end = self._float_to_time_str(overlapping.time_end)
        start, end = self._find_next_available_slot(
            self.technician_id.id,
            self.scheduled_date,
            preferred_start=self.time_start,
            duration=duration,
            exclude_task_id=exclude_id,
            dest_lat=self.address_lat or 0,
            dest_lng=self.address_lng or 0,
        )
        if start is not False:
            new_start_str = self._float_to_time_str(start)
            new_end_str = self._float_to_time_str(end)
            self.time_start = start
            self.time_end = end
            self.duration_hours = end - start
            return {'warning': {
                'title': _('Moved to Available Slot'),
                'message': _(
                    'The selected time conflicts with %s (%s - %s).\n'
                    'Automatically moved to: %s - %s.'
                ) % (conflict_name, conflict_start, conflict_end,
                     new_start_str, new_end_str),
            }}
        else:
            return {'warning': {
                'title': _('No Available Slots'),
                'message': _(
                    'The selected time conflicts with %s (%s - %s) '
                    'and no other slots are available on this day.'
                ) % (conflict_name, conflict_start, conflict_end),
            }}
    return None


# ------------------------------------------------------------------
# DEFAULT_GET - Calendar pre-fill
# ------------------------------------------------------------------

def _snap_to_quarter(self, hour_float):
    """Round a float hour to the nearest 15-minute slot and clamp to store hours."""
    s_open, s_close = self._get_store_hours()
    snapped = round(hour_float * 4) / 4
    return max(s_open, min(s_close, snapped))


@api.model
def default_get(self, fields_list):
    """Handle calendar time range selection: pre-fill date + times from context.

    Reads ``default_task_type``, ``default_datetime_start`` and
    ``default_datetime_end`` from the context and fills ``scheduled_date``,
    ``time_start``, ``time_end`` and ``duration_hours`` in the defaults.
    """
    res = super().default_get(fields_list)
    ctx = self.env.context
    # Set duration default based on task type from context
    task_type = ctx.get('default_task_type', res.get('task_type', 'delivery'))
    if 'duration_hours' not in res or not res.get('duration_hours'):
        res['duration_hours'] = self.TASK_TYPE_DURATIONS.get(task_type, 1.0)

    # When user clicks a time range on the calendar, Odoo passes
    # default_datetime_start/end in UTC
    dt_start_utc = None
    dt_end_utc = None
    if ctx.get('default_datetime_start'):
        try:
            dt_start_utc = fields.Datetime.from_string(ctx['default_datetime_start'])
        except (ValueError, TypeError):
            pass
    if ctx.get('default_datetime_end'):
        try:
            dt_end_utc = fields.Datetime.from_string(ctx['default_datetime_end'])
        except (ValueError, TypeError):
            pass

    if dt_start_utc or dt_end_utc:
        import pytz
        user_tz = pytz.timezone(self.env.user.tz or 'UTC')
        if dt_start_utc:
            dt_start_local = pytz.utc.localize(dt_start_utc).astimezone(user_tz)
            res['scheduled_date'] = dt_start_local.date()
            start_float = self._snap_to_quarter(
                dt_start_local.hour + dt_start_local.minute / 60.0)
            res['time_start'] = start_float
        if dt_end_utc:
            dt_end_local = pytz.utc.localize(dt_end_utc).astimezone(user_tz)
            end_float = self._snap_to_quarter(
                dt_end_local.hour + dt_end_local.minute / 60.0)
            # A non-positive range (after snapping/clamping) becomes one hour.
            if 'time_start' in res and end_float <= res['time_start']:
                end_float = res['time_start'] + 1.0
            res['time_end'] = end_float
            # Compute duration from the calendar drag
            if 'time_start' in res:
                res['duration_hours'] = end_float - res['time_start']

    # Always compute end from start + duration if not already set
    if 'time_end' not in res and 'time_start' in res and 'duration_hours' in res:
        _open, close = self._get_store_hours()
        res['time_end'] = min(
            res['time_start'] + res['duration_hours'], close)
    return res


# ------------------------------------------------------------------
# CRUD OVERRIDES
# ------------------------------------------------------------------

@api.model_create_multi
def create(self, vals_list):
    """Create tasks, then run the post-creation side effects.

    Before creation: assigns the sequence reference, a sync UUID for local
    tasks, and an address copied from the sale order, purchase order or
    partner when none was given. After creation: posts to the linked order,
    optionally marks orders ready for delivery (context flags), recalculates
    travel chains, sends the scheduled email and pushes local tasks to
    remote instances.
    """
    for vals in vals_list:
        if vals.get('name', _('New')) == _('New'):
            vals['name'] = self.env['ir.sequence'].next_by_code(
                'fusion.technician.task') or _('New')
        # Shadow tasks (with a sync source) keep the UUID they arrived with.
        if not vals.get('x_fc_sync_uuid') and not vals.get('x_fc_sync_source'):
            vals['x_fc_sync_uuid'] = str(uuid.uuid4())
        # Auto-populate address from sale order if not provided
        if vals.get('sale_order_id') and not vals.get('address_street'):
            order = self.env['sale.order'].browse(vals['sale_order_id'])
            addr = order.partner_shipping_id or order.partner_id
            if addr:
                self._fill_address_vals(vals, addr)
            if not vals.get('partner_id'):
                vals['partner_id'] = order.partner_id.id
        # Auto-populate address from purchase order if not provided
        elif vals.get('purchase_order_id') and not vals.get('address_street'):
            po = self.env['purchase.order'].browse(vals['purchase_order_id'])
            addr = po.dest_address_id or po.partner_id
            if addr:
                self._fill_address_vals(vals, addr)
            if not vals.get('partner_id'):
                vals['partner_id'] = po.partner_id.id
        # Auto-populate address from partner if no order set
        elif vals.get('partner_id') and not vals.get('address_street'):
            partner = self.env['res.partner'].browse(vals['partner_id'])
            if partner.street:
                self._fill_address_vals(vals, partner)

    records = super().create(vals_list)

    # Post creation notice to linked order chatter
    for rec in records:
        rec._post_task_created_to_linked_order()

    # If created from "Ready for Delivery" flow, mark the sale order
    if self.env.context.get('mark_ready_for_delivery'):
        records._mark_sale_order_ready_for_delivery()
    if self.env.context.get('mark_odsp_ready_for_delivery'):
        for rec in records:
            order = rec.sale_order_id
            if order and order.x_fc_is_odsp_sale and order._get_odsp_status() != 'ready_delivery':
                order._odsp_advance_status(
                    'ready_delivery',
                    "Order is ready for delivery. Delivery task scheduled.")

    # Auto-calculate travel times for the full day chain
    if not self.env.context.get('skip_travel_recalc'):
        records._recalculate_day_travel_chains()

    # Send "Appointment Scheduled" email
    for rec in records:
        rec._send_task_scheduled_email()

    # Push new local tasks to remote instances
    local_records = records.filtered(lambda r: not r.x_fc_sync_source)
    if local_records and not self.env.context.get('skip_task_sync'):
        self.env['fusion.task.sync.config']._push_tasks(local_records, 'create')
    return records


def write(self, vals):
    """Write values, keeping time_end, travel chains, emails and sync in step.

    With ``skip_travel_recalc`` in the context the call goes straight to the
    parent ``write`` and none of the logic below runs.

    Otherwise:
      * ``time_end`` is derived from start + duration (capped at closing
        time) when it was not sent;
      * travel chains are recalculated for the old and new technician/date
        combinations when an address or scheduling field changed;
      * in ``reschedule_mode`` a chatter note and reschedule email are sent
        for tasks whose date or times actually changed;
      * local tasks are pushed to remote instances when a synced field
        changed, unless ``skip_task_sync`` is set.
    """
    if self.env.context.get('skip_travel_recalc'):
        return super().write(vals)

    # Safety: ensure time_end is consistent when start/duration change
    # but time_end wasn't sent (readonly field in view may not save)
    if ('time_start' in vals or 'duration_hours' in vals) and 'time_end' not in vals:
        both_given = 'time_start' in vals and 'duration_hours' in vals
        if len(self) > 1 and not both_given:
            # The missing half (start or duration) differs per record, so a
            # single shared time_end would be wrong for most of them. Write
            # each record on its own with a private copy of the values.
            result = True
            for task in self:
                result = task.write(dict(vals)) and result
            return result
        _open, close = self._get_store_hours()
        start = vals.get('time_start', self[:1].time_start if len(self) == 1 else 9.0)
        dur = vals.get('duration_hours', self[:1].duration_hours if len(self) == 1 else 1.0) or 1.0
        vals['time_end'] = min(start + dur, close)

    # Detect reschedule mode: capture old values BEFORE write
    reschedule_mode = self.env.context.get('reschedule_mode')
    old_schedule = {}
    schedule_fields = {'scheduled_date', 'time_start', 'time_end',
                       'duration_hours', 'technician_id'}
    schedule_changed = schedule_fields & set(vals.keys())
    if reschedule_mode and schedule_changed:
        for task in self:
            old_schedule[task.id] = {
                'date': task.scheduled_date,
                'time_start': task.time_start,
                'time_end': task.time_end,
            }

    # Capture old tech+date combos BEFORE write for travel recalc
    travel_fields = {'address_street', 'address_city', 'address_zip',
                     'address_lat', 'address_lng', 'scheduled_date',
                     'sequence', 'time_start', 'technician_id'}
    needs_travel_recalc = travel_fields & set(vals.keys())
    old_combos = set()
    if needs_travel_recalc:
        old_combos = {(t.technician_id.id, t.scheduled_date) for t in self}

    res = super().write(vals)

    if needs_travel_recalc:
        new_combos = {(t.technician_id.id, t.scheduled_date) for t in self}
        all_combos = old_combos | new_combos
        self._recalculate_combos_travel(all_combos)

    # After write: send reschedule email if schedule actually changed
    if reschedule_mode and old_schedule:
        for task in self:
            old = old_schedule.get(task.id, {})
            if old and (
                old['date'] != task.scheduled_date
                or abs(old['time_start'] - task.time_start) > 0.01
                or abs(old['time_end'] - task.time_end) > 0.01
            ):
                task._post_status_message('rescheduled')
                task._send_task_rescheduled_email(
                    old_date=old['date'],
                    old_start=old['time_start'],
                    old_end=old['time_end'],
                )

    # Push updates to remote instances for local tasks
    sync_fields = {'technician_id', 'scheduled_date', 'time_start', 'time_end',
                   'duration_hours', 'status', 'task_type', 'address_street',
                   'address_city', 'address_zip', 'address_lat', 'address_lng',
                   'partner_id'}
    if sync_fields & set(vals.keys()) and not self.env.context.get('skip_task_sync'):
        local_records = self.filtered(lambda r: not r.x_fc_sync_source)
        if local_records:
            self.env['fusion.task.sync.config']._push_tasks(local_records, 'write')
    return res


@api.model
def _fill_address_vals(self, vals, partner):
    """Helper to fill address vals dict from a partner record.

    Mutates ``vals`` in place. Latitude/longitude fall back to 0 when the
    partner has no ``x_fc_latitude`` / ``x_fc_longitude`` attribute.
    """
    vals.update({
        'address_partner_id': partner.id,
        'address_street': partner.street or '',
        'address_street2': partner.street2 or '',
        'address_city': partner.city or '',
        'address_state_id': partner.state_id.id if partner.state_id else False,
        'address_zip': partner.zip or '',
        'address_lat': getattr(partner, 'x_fc_latitude', 0),
        'address_lng': getattr(partner, 'x_fc_longitude', 0),
    })
' f' Technician Task Scheduled
' f'{self.name} ({task_type_label}) - {date_str} at {time_str}
' f'Technician: {self.technician_id.name}
' f'View Task' f'
' ) order.message_post( body=body, message_type='notification', subtype_xmlid='mail.mt_note', ) def _mark_sale_order_ready_for_delivery(self): """Mark linked sale orders as Ready for Delivery. Called when a delivery task is created from the "Ready for Delivery" button on the sale order. This replaces the old wizard workflow. """ for task in self: order = task.sale_order_id if not order: continue # Only update if not already marked if order.x_fc_adp_application_status == 'ready_delivery': continue user_name = self.env.user.name tech_name = task.technician_id.name or '' # Save current status so we can revert if task is cancelled previous_status = order.x_fc_adp_application_status # Update the sale order status and delivery fields order.with_context(skip_status_validation=True).write({ 'x_fc_adp_application_status': 'ready_delivery', 'x_fc_status_before_delivery': previous_status, 'x_fc_delivery_technician_ids': [(4, task.technician_id.id)], 'x_fc_ready_for_delivery_date': fields.Datetime.now(), 'x_fc_scheduled_delivery_datetime': task.datetime_start, }) # Post chatter message early_badge = '' if order.x_fc_early_delivery: early_badge = ' Early Delivery' scheduled_str = '' if task.scheduled_date: time_str = task._float_to_time_str(task.time_start) if task.time_start else '' date_str = task.scheduled_date.strftime('%B %d, %Y') scheduled_str = f'
  • Scheduled: {date_str} at {time_str}
  • ' notes_str = '' if task.description: notes_str = f'

    Delivery Notes: {task.description}

    ' chatter_body = Markup( f'' ) order.message_post( body=chatter_body, message_type='notification', subtype_xmlid='mail.mt_note', ) # Send email notifications try: order._send_ready_for_delivery_email( technicians=task.technician_id, scheduled_datetime=task.datetime_start, notes=task.description, ) except Exception as e: _logger.warning("Ready for delivery email failed for %s: %s", order.name, e) def _recalculate_day_travel_chains(self): """Recalculate travel for all tech+date combos affected by these tasks.""" combos = {(t.technician_id.id, t.scheduled_date) for t in self if t.technician_id and t.scheduled_date} self._recalculate_combos_travel(combos) def _get_technician_start_address(self, tech_id): """Get the start address for a technician. Priority: 1. Technician's personal x_fc_start_address (if set) 2. Company default HQ address (fusion_claims.technician_start_address) Returns the address string or ''. """ tech_user = self.env['res.users'].sudo().browse(tech_id) if tech_user.exists() and tech_user.x_fc_start_address: return tech_user.x_fc_start_address.strip() # Fallback to company default return (self.env['ir.config_parameter'].sudo() .get_param('fusion_claims.technician_start_address', '') or '').strip() def _geocode_address_string(self, address, api_key): """Geocode an address string and return (lat, lng) or (0.0, 0.0).""" if not address or not api_key: return 0.0, 0.0 try: url = 'https://maps.googleapis.com/maps/api/geocode/json' params = {'address': address, 'key': api_key, 'region': 'ca'} resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK' and data.get('results'): loc = data['results'][0]['geometry']['location'] return loc['lat'], loc['lng'] except Exception as e: _logger.warning("Address geocoding failed for '%s': %s", address, e) return 0.0, 0.0 def _recalculate_combos_travel(self, combos): """Recalculate travel for a set of (tech_id, date) combinations.""" ICP = 
self.env['ir.config_parameter'].sudo() enabled = ICP.get_param('fusion_claims.google_distance_matrix_enabled', False) if not enabled: return api_key = self._get_google_maps_api_key() # Cache geocoded start addresses per technician to avoid repeated API calls start_coords_cache = {} for tech_id, date in combos: if not tech_id or not date: continue all_day_tasks = self.sudo().search([ ('technician_id', '=', tech_id), ('scheduled_date', '=', date), ('status', 'not in', ['cancelled']), ], order='time_start, sequence, id') if not all_day_tasks: continue # Get this technician's start location (personal or company default) if tech_id not in start_coords_cache: addr = self._get_technician_start_address(tech_id) start_coords_cache[tech_id] = self._geocode_address_string(addr, api_key) prev_lat, prev_lng = start_coords_cache[tech_id] for i, task in enumerate(all_day_tasks): if not (task.address_lat and task.address_lng): task._geocode_address() travel_vals = {} if prev_lat and prev_lng and task.address_lat and task.address_lng: task.with_context(skip_travel_recalc=True)._calculate_travel_time(prev_lat, prev_lng) travel_vals['previous_task_id'] = all_day_tasks[i - 1].id if i > 0 else False travel_vals['travel_origin'] = 'Start Location' if i == 0 else f'Task {all_day_tasks[i - 1].name}' if travel_vals: task.with_context(skip_travel_recalc=True).write(travel_vals) prev_lat = task.address_lat or prev_lat prev_lng = task.address_lng or prev_lng # ------------------------------------------------------------------ # STATUS ACTIONS # ------------------------------------------------------------------ def _check_previous_tasks_completed(self): """Check that all earlier tasks for the same technician+date are completed.""" self.ensure_one() earlier_incomplete = self.sudo().search([ ('technician_id', '=', self.technician_id.id), ('scheduled_date', '=', self.scheduled_date), ('time_start', '<', self.time_start), ('status', 'not in', ['completed', 'cancelled']), ('id', '!=', self.id), ], 
limit=1) if earlier_incomplete: raise UserError(_( "Please complete previous task %s first before starting this one." ) % earlier_incomplete.name) def action_start_en_route(self): """Mark task as En Route.""" for task in self: if task.status != 'scheduled': raise UserError(_("Only scheduled tasks can be marked as En Route.")) task._check_previous_tasks_completed() task.status = 'en_route' task._post_status_message('en_route') def action_start_task(self): """Mark task as In Progress.""" for task in self: if task.status not in ('scheduled', 'en_route'): raise UserError(_("Task must be scheduled or en route to start.")) task._check_previous_tasks_completed() task.status = 'in_progress' task._post_status_message('in_progress') def action_view_sale_order(self): """Open the linked sale order / case.""" self.ensure_one() if not self.sale_order_id: return return { 'name': self.sale_order_id.name, 'type': 'ir.actions.act_window', 'res_model': 'sale.order', 'view_mode': 'form', 'res_id': self.sale_order_id.id, } def action_view_purchase_order(self): """Open the linked purchase order.""" self.ensure_one() if not self.purchase_order_id: return return { 'name': self.purchase_order_id.name, 'type': 'ir.actions.act_window', 'res_model': 'purchase.order', 'view_mode': 'form', 'res_id': self.purchase_order_id.id, } def action_complete_task(self): """Mark task as Completed.""" for task in self: if task.status not in ('in_progress', 'en_route', 'scheduled'): raise UserError(_("Task must be in progress to complete.")) task.with_context(skip_travel_recalc=True).write({ 'status': 'completed', 'completion_datetime': fields.Datetime.now(), }) task._post_status_message('completed') # Post completion notes to linked order chatter if task.completion_notes and (task.sale_order_id or task.purchase_order_id): task._post_completion_to_linked_order() # Notify the person who scheduled the task task._notify_scheduler_on_completion() # Auto-advance ODSP status for delivery tasks if (task.task_type 
== 'delivery' and task.sale_order_id and task.sale_order_id.x_fc_is_odsp_sale and task.sale_order_id._get_odsp_status() == 'ready_delivery'): task.sale_order_id._odsp_advance_status( 'delivered', "Delivery task completed by technician. Order marked as delivered.", ) def action_cancel_task(self): """Cancel the task. Sends cancellation email and reverts sale order if delivery.""" for task in self: if task.status == 'completed': raise UserError(_("Cannot cancel a completed task.")) task.status = 'cancelled' task._post_status_message('cancelled') # If this was a delivery task linked to a sale order that is # currently in "Ready for Delivery" -- revert the order back. # _revert_sale_order_on_cancel also sends the cancellation email # for delivery tasks. if task.task_type == 'delivery': task._revert_sale_order_on_cancel() else: # Non-delivery tasks: still send a cancellation email task._send_task_cancelled_email() def _revert_sale_order_on_cancel(self): """When a delivery task is cancelled, check if the linked sale order should revert to its previous status. 
Only reverts if: - Task is a delivery type - Sale order is currently 'ready_delivery' - No other active (non-cancelled) delivery tasks exist for this order """ self.ensure_one() if self.task_type != 'delivery' or not self.sale_order_id: return order = self.sale_order_id if order.x_fc_adp_application_status != 'ready_delivery': return # Check if any other non-cancelled delivery tasks exist for this order other_delivery_tasks = self.sudo().search([ ('sale_order_id', '=', order.id), ('task_type', '=', 'delivery'), ('status', 'not in', ['cancelled']), ('id', '!=', self.id), ], limit=1) if other_delivery_tasks: return # Other active delivery tasks still exist, don't revert # Revert to the status saved before Ready for Delivery prev_status = order.x_fc_status_before_delivery or 'approved' status_labels = dict(order._fields['x_fc_adp_application_status'].selection) prev_label = status_labels.get(prev_status, prev_status) # skip_status_emails prevents the "Approved" email from re-firing order.with_context( skip_status_validation=True, skip_status_emails=True, ).write({ 'x_fc_adp_application_status': prev_status, 'x_fc_status_before_delivery': False, }) # Post chatter message about the revert body = Markup( f'' ) order.message_post( body=body, message_type='notification', subtype_xmlid='mail.mt_note', ) # Send a "Delivery Cancelled" email instead self._send_task_cancelled_email() def action_reschedule(self): """Open the reschedule form for this task. Saves old schedule info, then opens the same task form for editing. 
On save, the write() method detects the reschedule and sends emails.""" self.ensure_one() return { 'type': 'ir.actions.act_window', 'res_model': 'fusion.technician.task', 'res_id': self.id, 'view_mode': 'form', 'target': 'new', 'context': { 'reschedule_mode': True, 'old_date': str(self.scheduled_date) if self.scheduled_date else '', 'old_time_start': self.time_start, 'old_time_end': self.time_end, }, } def action_reset_to_scheduled(self): """Reset task back to scheduled.""" for task in self: task.status = 'scheduled' # ------------------------------------------------------------------ # CHATTER / NOTIFICATIONS # ------------------------------------------------------------------ def _post_status_message(self, new_status): """Post a status change message to the task chatter.""" self.ensure_one() status_labels = dict(self._fields['status'].selection) label = status_labels.get(new_status, new_status) icons = { 'en_route': 'fa-road', 'in_progress': 'fa-wrench', 'completed': 'fa-check-circle', 'cancelled': 'fa-times-circle', 'rescheduled': 'fa-calendar', } icon = icons.get(new_status, 'fa-info-circle') body = Markup( f'

    Task status changed to ' f'{label} by {self.env.user.name}

    ' ) self.message_post(body=body, message_type='notification', subtype_xmlid='mail.mt_note') def _post_completion_to_linked_order(self): """Post the completion notes to the linked order's chatter.""" self.ensure_one() order = self.sale_order_id or self.purchase_order_id if not order or not self.completion_notes: return task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type) body = Markup( f'
    ' f'
    Technician Task Completed
    ' f'
      ' f'
    • Task: {self.name} ({task_type_label})
    • ' f'
    • Technician: {self.technician_id.name}
    • ' f'
    • Completed: {self.completion_datetime.strftime("%B %d, %Y at %I:%M %p") if self.completion_datetime else "N/A"}
    • ' f'
    ' f'
    ' f'{self.completion_notes}' f'
    ' ) order.message_post( body=body, message_type='notification', subtype_xmlid='mail.mt_note', ) def _notify_scheduler_on_completion(self): """Send an Odoo notification to whoever created/scheduled the task.""" self.ensure_one() # Notify the task creator (scheduler) if they're not the technician if self.create_uid and self.create_uid != self.technician_id: task_type_label = dict(self._fields['task_type'].selection).get(self.task_type, self.task_type) task_url = f'/web#id={self.id}&model=fusion.technician.task&view_type=form' client_name = self.partner_id.name or 'N/A' order = self.sale_order_id or self.purchase_order_id case_ref = order.name if order else '' # Build address string addr_parts = [p for p in [ self.address_street, self.address_street2, self.address_city, self.address_state_id.name if self.address_state_id else '', self.address_zip, ] if p] address_str = ', '.join(addr_parts) or 'No address' # Build subject subject = f'Task Completed: {client_name}' if case_ref: subject += f' ({case_ref})' body = Markup( f'
    ' f'

    ' f'{task_type_label} Completed

    ' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'' f'
    Client:{client_name}
    Case:{case_ref or "N/A"}
    Task:{self.name}
    Technician:{self.technician_id.name}
    Location:{address_str}
    ' f'

    View Task

    ' f'
    ' ) # Use Odoo's internal notification system self.env['mail.thread'].sudo().message_notify( partner_ids=[self.create_uid.partner_id.id], body=body, subject=subject, ) # ------------------------------------------------------------------ # TASK EMAIL NOTIFICATIONS # ------------------------------------------------------------------ def _get_task_email_details(self): """Build common detail rows for task emails.""" self.ensure_one() type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or '') rows = [ ('Task', f'{self.name} ({type_label})'), ('Client', self.partner_id.name or 'N/A'), ] if self.sale_order_id: rows.append(('Case', self.sale_order_id.name)) if self.purchase_order_id: rows.append(('Purchase Order', self.purchase_order_id.name)) if self.scheduled_date: date_str = self.scheduled_date.strftime('%B %d, %Y') start_str = self._float_to_time_str(self.time_start) end_str = self._float_to_time_str(self.time_end) rows.append(('Scheduled', f'{date_str}, {start_str} - {end_str}')) if self.technician_id: rows.append(('Technician', self.technician_id.name)) if self.address_display: rows.append(('Address', self.address_display)) return rows def _get_task_email_recipients(self): """Get email recipients for task notifications. 
Returns dict with 'to' (client), 'cc' (technician, sales rep, office).""" self.ensure_one() to_emails = [] cc_emails = [] # Client email if self.partner_id and self.partner_id.email: to_emails.append(self.partner_id.email) # Technician email if self.technician_id and self.technician_id.email: cc_emails.append(self.technician_id.email) # Sales rep from the sale order if self.sale_order_id and self.sale_order_id.user_id and \ self.sale_order_id.user_id.email: cc_emails.append(self.sale_order_id.user_id.email) # Office notification recipients if self.sale_order_id: try: office_cc = self.sale_order_id._get_email_recipients( include_client=False).get('office_cc', []) cc_emails.extend(office_cc) except Exception: pass return {'to': to_emails, 'cc': list(set(cc_emails))} def _send_task_cancelled_email(self): """Send cancellation email for a task/delivery/appointment.""" self.ensure_one() order = self.sale_order_id if not order: return False try: if not order._is_email_notifications_enabled(): return False except Exception: return False recipients = self._get_task_email_recipients() to_emails = recipients.get('to', []) cc_emails = recipients.get('cc', []) if not to_emails and not cc_emails: return False client_name = self.partner_id.name or 'Client' type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or 'Task') sender_name = self.env.user.name detail_rows = self._get_task_email_details() detail_rows.append(('Cancelled By', sender_name)) body_html = order._email_build( title=f'{type_label.title()} Cancelled', summary=( f'The scheduled {type_label.lower()} for ' f'{client_name} has been cancelled.' ), email_type='urgent', sections=[('Cancellation Details', detail_rows)], note=( 'What happens next: If you need to reschedule, ' 'please contact our office and we will arrange a new appointment.' 
), note_color='#e53e3e', button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form', sender_name=sender_name, ) email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1]) email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:]) try: self.env['mail.mail'].sudo().create({ 'subject': f'{type_label.title()} Cancelled - {client_name} - {order.name}', 'body_html': body_html, 'email_to': email_to, 'email_cc': email_cc, 'model': 'sale.order', 'res_id': order.id, }).send() order._email_chatter_log( f'{type_label.title()} Cancelled email sent', email_to, email_cc) return True except Exception as e: _logger.error("Failed to send task cancelled email for %s: %s", self.name, e) return False def _send_task_scheduled_email(self): """Send appointment scheduled email to client, technician, and sales rep.""" self.ensure_one() order = self.sale_order_id if not order: return False try: if not order._is_email_notifications_enabled(): return False except Exception: return False recipients = self._get_task_email_recipients() to_emails = recipients.get('to', []) cc_emails = recipients.get('cc', []) if not to_emails and not cc_emails: return False client_name = self.partner_id.name or 'Client' type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or 'Task') sender_name = self.env.user.name detail_rows = self._get_task_email_details() if self.description: detail_rows.append(('Notes', self.description)) body_html = order._email_build( title=f'{type_label.title()} Scheduled', summary=( f'A {type_label.lower()} has been scheduled for ' f'{client_name}.' ), email_type='success', sections=[('Appointment Details', detail_rows)], note=( 'Please note: If you need to change this appointment, ' 'please contact our office as soon as possible so we can accommodate ' 'the change.' 
), note_color='#38a169', button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form', sender_name=sender_name, ) email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1]) email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:]) try: self.env['mail.mail'].sudo().create({ 'subject': f'{type_label.title()} Scheduled - {client_name} - {order.name}', 'body_html': body_html, 'email_to': email_to, 'email_cc': email_cc, 'model': 'sale.order', 'res_id': order.id, }).send() order._email_chatter_log( f'{type_label.title()} Scheduled email sent', email_to, email_cc) return True except Exception as e: _logger.error("Failed to send task scheduled email for %s: %s", self.name, e) return False def _send_task_rescheduled_email(self, old_date=None, old_start=None, old_end=None): """Send reschedule email to client, technician, and sales rep. Shows old vs new schedule for clarity.""" self.ensure_one() order = self.sale_order_id if not order: return False try: if not order._is_email_notifications_enabled(): return False except Exception: return False recipients = self._get_task_email_recipients() to_emails = recipients.get('to', []) cc_emails = recipients.get('cc', []) if not to_emails and not cc_emails: return False client_name = self.partner_id.name or 'Client' type_label = dict(self._fields['task_type'].selection).get( self.task_type, self.task_type or 'Task') sender_name = self.env.user.name detail_rows = self._get_task_email_details() # Show old schedule if provided if old_date or old_start is not None: old_parts = [] if old_date: old_parts.append(old_date.strftime('%B %d, %Y')) if old_start is not None: old_parts.append( f'{self._float_to_time_str(old_start)} - ' f'{self._float_to_time_str(old_end or old_start + 1.0)}') detail_rows.insert(3, ('Previous Schedule', ', '.join(old_parts))) body_html = order._email_build( title=f'{type_label.title()} Rescheduled', summary=( f'The {type_label.lower()} for ' f'{client_name} 
has been rescheduled.' ), email_type='attention', sections=[('Updated Appointment Details', detail_rows)], note=( 'Please note: The appointment has been updated ' 'to the new date and time shown above. If you have any questions, ' 'please contact our office.' ), note_color='#d69e2e', button_url=f'{order.get_base_url()}/web#id={order.id}&model=sale.order&view_type=form', sender_name=sender_name, ) email_to = ', '.join(to_emails) if to_emails else ', '.join(cc_emails[:1]) email_cc = ', '.join(cc_emails) if to_emails else ', '.join(cc_emails[1:]) try: self.env['mail.mail'].sudo().create({ 'subject': f'{type_label.title()} Rescheduled - {client_name} - {order.name}', 'body_html': body_html, 'email_to': email_to, 'email_cc': email_cc, 'model': 'sale.order', 'res_id': order.id, }).send() order._email_chatter_log( f'{type_label.title()} Rescheduled email sent', email_to, email_cc) return True except Exception as e: _logger.error("Failed to send rescheduled email for %s: %s", self.name, e) return False def get_next_task_for_technician(self): """Get the next task in sequence for the same technician+date after this one.""" self.ensure_one() return self.sudo().search([ ('technician_id', '=', self.technician_id.id), ('scheduled_date', '=', self.scheduled_date), ('time_start', '>=', self.time_start), ('status', 'in', ['scheduled', 'en_route']), ('id', '!=', self.id), ], order='time_start, sequence, id', limit=1) # ------------------------------------------------------------------ # GOOGLE MAPS INTEGRATION # ------------------------------------------------------------------ def _get_google_maps_api_key(self): """Get the Google Maps API key from config.""" return self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '' ) @api.model def get_map_data(self, domain=None): """Return task data, technician locations, and Google Maps API key. Args: domain: optional extra domain from the search bar filters. 
""" api_key = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.google_maps_api_key', '') local_instance = self.env['ir.config_parameter'].sudo().get_param( 'fusion_claims.sync_instance_id', '') base_domain = [ ('status', 'not in', ['cancelled']), ] if domain: base_domain = expression.AND([base_domain, domain]) tasks = self.sudo().search_read( base_domain, ['name', 'partner_id', 'technician_id', 'task_type', 'address_lat', 'address_lng', 'address_display', 'time_start', 'time_start_display', 'time_end_display', 'status', 'scheduled_date', 'travel_time_minutes', 'x_fc_sync_client_name', 'x_fc_is_shadow', 'x_fc_sync_source'], order='scheduled_date asc, time_start asc', limit=500, ) locations = self.env['fusion.technician.location'].get_latest_locations() return { 'api_key': api_key, 'tasks': tasks, 'locations': locations, 'local_instance_id': local_instance, } def _geocode_address(self): """Geocode the task address using Google Geocoding API.""" self.ensure_one() api_key = self._get_google_maps_api_key() if not api_key or not self.address_display: return False try: url = 'https://maps.googleapis.com/maps/api/geocode/json' params = { 'address': self.address_display, 'key': api_key, 'region': 'ca', } resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK' and data.get('results'): location = data['results'][0]['geometry']['location'] self.write({ 'address_lat': location['lat'], 'address_lng': location['lng'], }) return True except Exception as e: _logger.warning(f"Geocoding failed for task {self.name}: {e}") return False def _calculate_travel_time(self, origin_lat, origin_lng): """Calculate travel time from origin to this task using Distance Matrix API.""" self.ensure_one() api_key = self._get_google_maps_api_key() if not api_key: return False if not (origin_lat and origin_lng and self.address_lat and self.address_lng): return False try: url = 'https://maps.googleapis.com/maps/api/distancematrix/json' params = 
{ 'origins': f'{origin_lat},{origin_lng}', 'destinations': f'{self.address_lat},{self.address_lng}', 'key': api_key, 'mode': 'driving', 'avoid': 'tolls', 'traffic_model': 'best_guess', 'departure_time': 'now', } resp = requests.get(url, params=params, timeout=10) data = resp.json() if data.get('status') == 'OK': element = data['rows'][0]['elements'][0] if element.get('status') == 'OK': duration_seconds = element['duration_in_traffic']['value'] if 'duration_in_traffic' in element else element['duration']['value'] distance_meters = element['distance']['value'] self.write({ 'travel_time_minutes': round(duration_seconds / 60), 'travel_distance_km': round(distance_meters / 1000, 1), }) return True except Exception as e: _logger.warning(f"Travel time calculation failed for task {self.name}: {e}") return False def action_calculate_travel_times(self): """Calculate travel times for a day's schedule. Called from backend button or cron.""" self._do_calculate_travel_times() # Return False to stay on the current form without navigation return False def _do_calculate_travel_times(self): """Internal: calculate travel times for tasks. 
Does not return an action.""" # Group tasks by technician and date task_groups = {} for task in self: key = (task.technician_id.id, task.scheduled_date) if key not in task_groups: task_groups[key] = self.env['fusion.technician.task'] task_groups[key] |= task api_key = self._get_google_maps_api_key() start_coords_cache = {} for (tech_id, date), tasks in task_groups.items(): sorted_tasks = tasks.sorted(lambda t: (t.sequence, t.time_start)) # Get this technician's start location (personal or company default) if tech_id not in start_coords_cache: addr = self._get_technician_start_address(tech_id) start_coords_cache[tech_id] = self._geocode_address_string(addr, api_key) prev_lat, prev_lng = start_coords_cache[tech_id] for i, task in enumerate(sorted_tasks): # Geocode task if needed if not (task.address_lat and task.address_lng): task._geocode_address() if prev_lat and prev_lng and task.address_lat and task.address_lng: task._calculate_travel_time(prev_lat, prev_lng) task.previous_task_id = sorted_tasks[i - 1].id if i > 0 else False task.travel_origin = 'Start Location' if i == 0 else f'Task {sorted_tasks[i - 1].name}' prev_lat = task.address_lat prev_lng = task.address_lng @api.model def _cron_calculate_travel_times(self): """Cron job: Calculate travel times for today and tomorrow.""" today = fields.Date.context_today(self) tomorrow = today + timedelta(days=1) tasks = self.search([ ('scheduled_date', 'in', [today, tomorrow]), ('status', 'in', ['scheduled', 'en_route']), ]) if tasks: tasks._do_calculate_travel_times() _logger.info(f"Calculated travel times for {len(tasks)} tasks") # ------------------------------------------------------------------ # PORTAL HELPERS # ------------------------------------------------------------------ def get_technician_tasks_for_date(self, user_id, date): """Get all tasks for a technician on a given date, ordered by sequence.""" return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '=', date), ('status', '!=', 
'cancelled'), ], order='sequence, time_start, id') def get_next_task(self, user_id): """Get the next upcoming task for a technician.""" today = fields.Date.context_today(self) return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '>=', today), ('status', 'in', ['scheduled', 'en_route']), ], order='scheduled_date, sequence, time_start', limit=1) def get_current_task(self, user_id): """Get the current in-progress task for a technician.""" today = fields.Date.context_today(self) return self.sudo().search([ ('technician_id', '=', user_id), ('scheduled_date', '=', today), ('status', '=', 'in_progress'), ], limit=1) # ------------------------------------------------------------------ # PUSH NOTIFICATIONS # ------------------------------------------------------------------ def _send_push_notification(self, title, body_text, url=None): """Send a web push notification for this task.""" self.ensure_one() PushSub = self.env['fusion.push.subscription'].sudo() subscriptions = PushSub.search([ ('user_id', '=', self.technician_id.id), ('active', '=', True), ]) if not subscriptions: return ICP = self.env['ir.config_parameter'].sudo() vapid_private = ICP.get_param('fusion_claims.vapid_private_key', '') vapid_public = ICP.get_param('fusion_claims.vapid_public_key', '') if not vapid_private or not vapid_public: _logger.warning("VAPID keys not configured, cannot send push notification") return try: from pywebpush import webpush, WebPushException except ImportError: _logger.warning("pywebpush not installed, cannot send push notifications") return payload = json.dumps({ 'title': title, 'body': body_text, 'url': url or f'/my/technician/task/{self.id}', 'task_id': self.id, 'task_type': self.task_type, }) for sub in subscriptions: try: webpush( subscription_info={ 'endpoint': sub.endpoint, 'keys': { 'p256dh': sub.p256dh_key, 'auth': sub.auth_key, }, }, data=payload, vapid_private_key=vapid_private, vapid_claims={'sub': 'mailto:support@nexasystems.ca'}, ) except 
Exception as e: _logger.warning(f"Push notification failed for subscription {sub.id}: {e}") # Deactivate invalid subscriptions if 'gone' in str(e).lower() or '410' in str(e): sub.active = False self.write({ 'push_notified': True, 'push_notified_datetime': fields.Datetime.now(), }) @api.model def _cron_send_push_notifications(self): """Cron: Send push notifications for upcoming tasks.""" ICP = self.env['ir.config_parameter'].sudo() if not ICP.get_param('fusion_claims.push_enabled', False): return advance_minutes = int(ICP.get_param('fusion_claims.push_advance_minutes', '30')) now = fields.Datetime.now() # Find tasks starting within advance_minutes that haven't been notified tasks = self.search([ ('scheduled_date', '=', now.date()), ('status', '=', 'scheduled'), ('push_notified', '=', False), ]) for task in tasks: # Check if task is within the notification window task_start_hour = int(task.time_start) task_start_min = int((task.time_start % 1) * 60) task_start_dt = now.replace(hour=task_start_hour, minute=task_start_min, second=0) minutes_until = (task_start_dt - now).total_seconds() / 60 if 0 <= minutes_until <= advance_minutes: task_type_label = dict(self._fields['task_type'].selection).get(task.task_type, task.task_type) title = f'Upcoming: {task_type_label}' body_text = f'{task.partner_id.name or "Task"} - {task.time_start_display}' if task.travel_time_minutes: body_text += f' ({task.travel_time_minutes} min drive)' task._send_push_notification(title, body_text) # ------------------------------------------------------------------ # HELPERS # ------------------------------------------------------------------ @staticmethod def _float_to_time_str(value): """Convert float hours to time string like '9:30 AM'.""" if not value and value != 0: return '' hours = int(value) minutes = int(round((value % 1) * 60)) period = 'AM' if hours < 12 else 'PM' display_hour = hours % 12 or 12 return f'{display_hour}:{minutes:02d} {period}' def get_google_maps_url(self): """Get Google 
Maps navigation URL. Uses lat/lng coordinates to navigate to the exact location (text addresses cause Google to resolve to nearby business names instead).""" self.ensure_one() if self.address_lat and self.address_lng: return f'https://www.google.com/maps/dir/?api=1&destination={self.address_lat},{self.address_lng}&travelmode=driving' elif self.address_display: return f'https://www.google.com/maps/dir/?api=1&destination={urllib.parse.quote(self.address_display)}&travelmode=driving' return ''