# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import logging
import re
from datetime import timedelta
from odoo import api, fields, models, _
from odoo.exceptions import ValidationError
_logger = logging.getLogger(__name__)
class FusionClockSchedule(models.Model):
    """Planned shift, or explicit day off, for one employee on one date.

    Start and end times are stored as float hours and the break as minutes.
    A database constraint allows at most one entry per employee per date.
    """

    _name = 'fusion.clock.schedule'
    _description = 'Clock Shift Schedule Entry'
    _order = 'schedule_date, employee_id'
    _rec_name = 'display_name'

    # --- Who and when -------------------------------------------------------
    employee_id = fields.Many2one(
        comodel_name='hr.employee', string='Employee',
        required=True, index=True, ondelete='cascade',
    )
    schedule_date = fields.Date(string='Date', required=True, index=True)

    # --- What is planned ----------------------------------------------------
    # Optional template; the onchange on this field copies the template's
    # times and break onto the entry.
    shift_id = fields.Many2one(
        comodel_name='fusion.clock.shift', string='Shift Template',
        ondelete='set null',
    )
    # When set, the entry is a day off: planned hours are forced to zero and
    # the time-range validation is skipped.
    is_off = fields.Boolean(string='Off', default=False, index=True)
    start_time = fields.Float(string='Start Time', default=9.0)
    end_time = fields.Float(string='End Time', default=17.0)
    break_minutes = fields.Float(string='Break (min)', default=30.0)
    planned_hours = fields.Float(
        string='Hours', compute='_compute_planned_hours', store=True,
    )
    note = fields.Char(string='Note')

    # --- Stored copies of the employee's company / department ----------------
    company_id = fields.Many2one(
        comodel_name='res.company', string='Company',
        related='employee_id.company_id', store=True, readonly=True,
    )
    department_id = fields.Many2one(
        comodel_name='hr.department', string='Department',
        related='employee_id.department_id', store=True, readonly=True,
    )

    display_name = fields.Char(compute='_compute_display_name', store=True)

    # --- Posting workflow ---------------------------------------------------
    state = fields.Selection(
        selection=[('draft', 'Draft'), ('posted', 'Posted')],
        string='Status',
        default='draft',
        index=True,
        help="Only POSTED entries drive reminders, absence checks and penalties. "
             "Draft entries are ignored by automation until the team lead posts them.",
    )
    posted_date = fields.Datetime(string='Posted On', readonly=True)

    # One schedule entry per employee per calendar day.
    _employee_date_unique = models.Constraint(
        'UNIQUE(employee_id, schedule_date)',
        'Only one shift schedule is allowed per employee per day.',
    )
@api.depends('is_off', 'start_time', 'end_time', 'break_minutes')
def _compute_planned_hours(self):
for rec in self:
if rec.is_off:
rec.planned_hours = 0.0
continue
raw_hours = (rec.end_time or 0.0) - (rec.start_time or 0.0)
rec.planned_hours = round(max(raw_hours - ((rec.break_minutes or 0.0) / 60.0), 0.0), 2)
@api.depends('employee_id.name', 'schedule_date', 'is_off', 'start_time', 'end_time')
def _compute_display_name(self):
    """Build the stored label "<employee> - <date> - <shift or OFF>".

    The dependency is declared on ``employee_id.name`` rather than on
    ``employee_id`` alone: the field is stored and the label embeds the
    employee's name, so it has to be recomputed when the employee is
    renamed, not only when the entry is re-assigned to another employee.
    """
    for rec in self:
        emp = rec.employee_id.name or ''
        date_str = str(rec.schedule_date) if rec.schedule_date else ''
        rec.display_name = f"{emp} - {date_str} - {rec.fclk_display_value()}"
@api.constrains('is_off', 'start_time', 'end_time', 'break_minutes')
def _check_schedule_times(self):
for rec in self:
if rec.break_minutes < 0:
raise ValidationError(_("Break minutes cannot be negative."))
if rec.is_off:
continue
if rec.start_time < 0 or rec.start_time >= 24:
raise ValidationError(_("Start time must be between 00:00 and 23:59."))
if rec.end_time <= 0 or rec.end_time > 24:
raise ValidationError(_("End time must be between 00:01 and 24:00."))
if rec.end_time <= rec.start_time:
raise ValidationError(_("End time must be after start time. Overnight shifts are not supported yet."))
shift_minutes = (rec.end_time - rec.start_time) * 60.0
if rec.break_minutes >= shift_minutes:
raise ValidationError(_("Break duration must be shorter than the scheduled shift."))
@api.onchange('shift_id')
def _onchange_shift_id(self):
for rec in self:
if rec.shift_id:
rec.is_off = False
rec.start_time = rec.shift_id.start_time
rec.end_time = rec.shift_id.end_time
rec.break_minutes = rec.shift_id.break_minutes
@api.model
def fclk_float_to_display(self, value):
    """Format float hours as a 12-hour clock string such as "5:30 pm".

    Falsy input is treated as 0.0 ("12:00 am"); 24.0 is rendered as
    "12:00 am" as well.
    """
    as_float = float(value or 0.0)
    hour = int(as_float)
    minute = int(round((as_float - hour) * 60))
    if minute == 60:
        # Rounding carried over the hour boundary (e.g. 11.999 -> 12:00).
        hour, minute = hour + 1, 0
    suffix = 'pm' if hour >= 12 and hour != 24 else 'am'
    display_hour = hour % 12 or 12
    return f"{display_hour}:{minute:02d} {suffix}"
def fclk_display_value(self):
self.ensure_one()
if self.is_off:
return 'OFF'
return (
f"{self.env['fusion.clock.schedule'].fclk_float_to_display(self.start_time)} - "
f"{self.env['fusion.clock.schedule'].fclk_float_to_display(self.end_time)}"
)
@api.model
def fclk_hours_display(self, hours):
    """Format a float duration in hours as "H:MM" (7.5 -> "7:30").

    Falsy input is treated as 0.0.
    """
    duration = float(hours or 0.0)
    whole_hours = int(duration)
    minutes = int(round((duration - whole_hours) * 60))
    if minutes == 60:
        # Rounding carried over into the next full hour.
        whole_hours, minutes = whole_hours + 1, 0
    return f"{whole_hours}:{minutes:02d}"
@api.model
def _fclk_parse_time_part(self, raw):
text = (raw or '').strip().lower().replace('.', '')
match = re.match(r'^(\d{1,2})(?::(\d{1,2}))?\s*(am|pm)?$', text)
if not match:
raise ValidationError(_("Could not understand time '%s'.") % raw)
hour = int(match.group(1))
minute = int(match.group(2) or 0)
meridiem = match.group(3)
if minute < 0 or minute > 59:
raise ValidationError(_("Minutes must be between 00 and 59."))
if meridiem:
if hour < 1 or hour > 12:
raise ValidationError(_("12-hour times must use hours from 1 to 12."))
if meridiem == 'am':
hour = 0 if hour == 12 else hour
else:
hour = 12 if hour == 12 else hour + 12
elif hour > 24:
raise ValidationError(_("Hours must be between 0 and 24."))
return hour + (minute / 60.0)
@api.model
def fclk_parse_planner_input(self, input_value, default_break_minutes=30.0):
    """Turn the free text typed in a planner cell into schedule values.

    Args:
        input_value: Cell text. Empty text clears the cell, "OFF" (any
            case) marks a day off, anything else must be a
            "<start>-<end>" range. A hyphen, an en/em dash or the word
            "to" (any case, surrounded by whitespace) separates the times.
        default_break_minutes: Break applied to a parsed time range.

    Returns:
        ``{'clear': True}`` for empty input, otherwise a dict with the keys
        ``clear``, ``is_off``, ``shift_id``, ``start_time``, ``end_time``
        and ``break_minutes``.

    Raises:
        ValidationError: if the text is not a valid range or the end is
            not after the start.
    """
    text = (input_value or '').strip()
    if not text:
        return {'clear': True}
    if text.upper() == 'OFF':
        return {
            'clear': False,
            'is_off': True,
            'shift_id': False,
            'start_time': 0.0,
            'end_time': 0.0,
            'break_minutes': 0.0,
        }
    normalized = text.replace('–', '-').replace('—', '-')
    # Accept the word "to" in any letter case ("9 To 5"), not only the
    # exact " to " / " TO " spellings.
    normalized = re.sub(r'\s+to\s+', '-', normalized, flags=re.IGNORECASE)
    parts = [p.strip() for p in normalized.split('-', 1)]
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValidationError(_("Enter a shift as '9-5', '9:00-5:30', '9:00 am - 5:30 pm', or OFF."))
    start = self._fclk_parse_time_part(parts[0])
    end = self._fclk_parse_time_part(parts[1])
    # "9-5" means 9:00 to 17:00: an end without am/pm that is not after the
    # start is read as an afternoon time. An explicit am/pm is taken at face
    # value, so "9 am - 5 am" is rejected below instead of silently turning
    # into 5 pm. Dots are removed the same way the time parser does it.
    end_has_meridiem = parts[1].lower().replace('.', '').endswith(('am', 'pm'))
    if end <= start and not end_has_meridiem and end + 12 <= 24:
        end += 12
    if end <= start:
        raise ValidationError(_("End time must be after start time. Overnight shifts are not supported yet."))
    return {
        'clear': False,
        'is_off': False,
        'shift_id': False,
        'start_time': start,
        'end_time': end,
        'break_minutes': float(default_break_minutes or 0.0),
    }
@api.model
def fclk_values_from_planner_payload(self, payload, employee):
    """Resolve a planner payload into schedule values.

    Sources are tried in this order:

    1. explicit ``start_time`` / ``end_time`` keys (only when no
       ``shift_id`` is given), honouring ``is_off``;
    2. a shift template referenced by ``shift_id``;
    3. the free-text ``input`` key, parsed with the employee's default
       break (30 minutes when no employee is given).

    Returns:
        A dict in the format produced by ``fclk_parse_planner_input``.

    Raises:
        ValidationError: if the referenced shift template does not exist,
            or the free text cannot be parsed.
    """
    payload = payload or {}

    def as_hours(key):
        return float(payload.get(key) or 0.0)

    has_explicit_times = 'start_time' in payload and 'end_time' in payload
    if has_explicit_times and not payload.get('shift_id'):
        if payload.get('is_off'):
            return {
                'clear': False,
                'is_off': True,
                'shift_id': False,
                'start_time': 0.0,
                'end_time': 0.0,
                'break_minutes': 0.0,
            }
        return {
            'clear': False,
            'is_off': False,
            'shift_id': False,
            'start_time': as_hours('start_time'),
            'end_time': as_hours('end_time'),
            'break_minutes': as_hours('break_minutes'),
        }

    template_id = int(payload.get('shift_id') or 0)
    if not template_id:
        # No template either: fall back to the typed text.
        default_break = employee._get_fclk_break_minutes() if employee else 30.0
        return self.fclk_parse_planner_input(payload.get('input', ''), default_break)

    template = self.env['fusion.clock.shift'].sudo().browse(template_id)
    if not template.exists():
        raise ValidationError(_("Selected shift template no longer exists."))
    return {
        'clear': False,
        'shift_id': template.id,
        'is_off': False,
        'start_time': template.start_time,
        'end_time': template.end_time,
        'break_minutes': template.break_minutes,
    }
@api.model
def fclk_snapshot(self, schedule):
    """Return the display value of ``schedule``, or '' when it is empty."""
    return schedule.fclk_display_value() if schedule else ''
@api.model
def fclk_apply_planner_cell(self, employee, schedule_date, payload, user=None):
    """Apply one planner cell edit and record it in the audit trail.

    Runs with sudo. Depending on the payload the employee's entry for the
    date is deleted, updated or created. Every update or creation puts the
    entry back into draft. An audit row is written only when the displayed
    value of the cell actually changed.

    Args:
        employee: The employee whose cell is edited.
        schedule_date: Date of the cell, in any form accepted by
            ``fields.Date.to_date``.
        payload: Planner payload, resolved through
            ``fclk_values_from_planner_payload``.
        user: User recorded as the author of the change; defaults to the
            environment's user.

    Returns:
        The resulting schedule record, or an empty recordset when the cell
        was cleared.

    Raises:
        ValidationError: if the employee does not exist, the date is
            empty, or the payload is invalid.
    """
    self = self.sudo()
    employee = employee.sudo()
    date_obj = fields.Date.to_date(schedule_date)
    if not (employee.exists() and date_obj):
        raise ValidationError(_("Invalid employee or schedule date."))

    current = self.search(
        [('employee_id', '=', employee.id), ('schedule_date', '=', date_obj)],
        limit=1,
    )
    old_value = self.fclk_snapshot(current)
    # Parsed before anything is written, so invalid input changes nothing.
    parsed = self.fclk_values_from_planner_payload(payload, employee)

    if parsed.get('clear'):
        if current:
            current.unlink()
        new_schedule, new_value = self.browse(), ''
    else:
        vals = dict(
            employee_id=employee.id,
            schedule_date=date_obj,
            shift_id=parsed.get('shift_id') or False,
            is_off=bool(parsed.get('is_off')),
            start_time=parsed.get('start_time') or 0.0,
            end_time=parsed.get('end_time') or 0.0,
            break_minutes=parsed.get('break_minutes') or 0.0,
            note=payload.get('note') or False,
            # Any planner edit returns the cell to draft; it must be
            # re-posted before automation acts on it.
            state='draft',
            posted_date=False,
        )
        if current:
            current.write(vals)
            new_schedule = current
        else:
            new_schedule = self.create(vals)
        new_value = new_schedule.fclk_display_value()

    if old_value == new_value:
        return new_schedule

    audit_vals = {
        'schedule_id': new_schedule.id if new_schedule else False,
        'employee_id': employee.id,
        'schedule_date': date_obj,
        'old_value': old_value,
        'new_value': new_value,
        'changed_by_id': (user or self.env.user).id,
        'changed_at': fields.Datetime.now(),
        'company_id': employee.company_id.id,
        'department_id': employee.department_id.id,
    }
    self.env['fusion.clock.schedule.audit'].sudo().create(audit_vals)
    return new_schedule
@api.model
def fclk_cell_payload(self, employee, date_obj, schedule=None):
    """Describe one planner cell as a plain dict.

    When the employee has a schedule entry for the date (passed in or
    found by search) the dict reflects that entry with source 'schedule'.
    Otherwise it is filled from ``employee._get_fclk_day_plan(date_obj)``,
    with ``schedule_id`` and ``state`` set to False.
    """
    if not schedule:
        schedule = self.search(
            [('employee_id', '=', employee.id), ('schedule_date', '=', date_obj)],
            limit=1,
        )
    format_hours = self.env['fusion.clock.schedule'].fclk_hours_display

    if not schedule:
        # No explicit entry: describe whatever plan the employee record
        # provides for that day.
        plan = employee._get_fclk_day_plan(date_obj)
        plan_label = plan.get('label') or ''
        plan_hours = plan.get('hours') or 0.0
        return {
            'schedule_id': False,
            'source': plan.get('source') or 'none',
            'state': False,
            'input': plan_label,
            'label': plan_label,
            'is_off': plan.get('is_off', False),
            'shift_id': False,
            'start_time': plan.get('start_time') or 0.0,
            'end_time': plan.get('end_time') or 0.0,
            'break_minutes': plan.get('break_minutes') or 0.0,
            'hours': plan_hours,
            'hours_display': format_hours(plan_hours),
            'note': '',
        }

    cell_label = schedule.fclk_display_value()
    return {
        'schedule_id': schedule.id,
        'source': 'schedule',
        'state': schedule.state,
        'input': cell_label,
        'label': cell_label,
        'is_off': schedule.is_off,
        'shift_id': schedule.shift_id.id or False,
        'start_time': schedule.start_time,
        'end_time': schedule.end_time,
        'break_minutes': schedule.break_minutes,
        'hours': schedule.planned_hours,
        'hours_display': format_hours(schedule.planned_hours),
        'note': schedule.note or '',
    }
@api.model
def fclk_email_posted_week(self, employee, week_start, week_end):
    """Email one employee a summary of their POSTED shifts for the week.

    Args:
        employee: Recipient; nothing is sent when they have no work email.
        week_start: First day of the range (inclusive).
        week_end: Last day of the range (inclusive).
            NOTE(review): both bounds are used with ``strftime`` and
            ``timedelta`` arithmetic, so date objects are assumed — confirm
            against callers.

    Returns:
        True when the mail was created and handed to ``send()``; False
        when there is no work email or creating/sending raised.
    """
    employee = employee.sudo()
    if not employee.work_email:
        return False

    # Imported here rather than at module level, as in the original code.
    from .hr_attendance import _fclk_email_wrap

    posted = self.sudo().search([
        ('employee_id', '=', employee.id),
        ('schedule_date', '>=', week_start),
        ('schedule_date', '<=', week_end),
        ('state', '=', 'posted'),
    ])
    entry_by_date = {entry.schedule_date: entry for entry in posted}

    # One row per calendar day, including days without a posted entry.
    rows = []
    current_day = week_start
    while current_day <= week_end:
        entry = entry_by_date.get(current_day)
        shift_label = entry.fclk_display_value() if entry else 'Not scheduled'
        rows.append((current_day.strftime('%a %b %d'), shift_label))
        current_day += timedelta(days=1)

    company = employee.company_id or self.env.company
    range_start = week_start.strftime("%b %d")
    summary = (
        f'Hello {employee.name}, your shifts for '
        f'{range_start} - {week_end.strftime("%b %d, %Y")} '
        f'have been posted.'
    )
    body = _fclk_email_wrap(
        company_name=company.name or '',
        title='Your Posted Schedule',
        summary=summary,
        sections=[('This Week', rows)],
        note='Log in to your portal for details.',
    )

    try:
        mail = self.env['mail.mail'].sudo().create({
            'subject': f'Your schedule: {range_start} - {week_end.strftime("%b %d")}',
            'email_from': company.email or '',
            'email_to': employee.work_email,
            'body_html': body,
            'auto_delete': True,
        })
        mail.send()
    except Exception as exc:
        # Best effort: a failed notification is logged and reported to the
        # caller through the return value instead of being raised.
        _logger.error(
            "Fusion Clock: failed to email posted schedule to %s: %s", employee.name, exc
        )
        return False
    return True
class FusionClockScheduleAudit(models.Model):
    """Audit row recording one change to an employee's schedule cell.

    Each row keeps the displayed value before and after the change, who made
    it and when. The link to the schedule entry is optional and is set to
    null when that entry is deleted.
    """

    _name = 'fusion.clock.schedule.audit'
    _description = 'Clock Schedule Change Audit'
    _order = 'changed_at desc, id desc'
    _rec_name = 'display_name'

    # Schedule entry the change applied to; empty when the cell was cleared.
    schedule_id = fields.Many2one(
        'fusion.clock.schedule',
        string='Schedule',
        ondelete='set null',
        index=True,
    )
    employee_id = fields.Many2one(
        'hr.employee',
        string='Employee',
        required=True,
        index=True,
        ondelete='cascade',
    )
    schedule_date = fields.Date(
        string='Schedule Date',
        required=True,
        index=True,
    )
    # Displayed cell value before / after the change ('' when there was none).
    old_value = fields.Char(string='Old Value')
    new_value = fields.Char(string='New Value')
    changed_by_id = fields.Many2one(
        'res.users',
        string='Changed By',
        required=True,
        ondelete='restrict',
    )
    changed_at = fields.Datetime(
        string='Changed At',
        default=fields.Datetime.now,
        required=True,
        index=True,
    )
    # Plain (non-related) fields, so they hold whatever value the row was
    # created with.
    company_id = fields.Many2one(
        'res.company',
        string='Company',
        index=True,
    )
    department_id = fields.Many2one(
        'hr.department',
        string='Department',
        index=True,
    )
    display_name = fields.Char(
        compute='_compute_display_name',
        store=True,
    )

    @api.depends('employee_id.name', 'schedule_date', 'old_value', 'new_value')
    def _compute_display_name(self):
        """Build the stored label "<employee> - <date>: <old> -> <new>".

        Empty old/new values are shown as "blank". The dependency is
        declared on ``employee_id.name`` because the label is stored and
        embeds the employee's name; depending on ``employee_id`` alone
        would leave the label stale after an employee is renamed.
        """
        for rec in self:
            rec.display_name = "%s - %s: %s -> %s" % (
                rec.employee_id.name or '',
                rec.schedule_date or '',
                rec.old_value or 'blank',
                rec.new_value or 'blank',
            )