# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

import base64
import io
from collections import defaultdict
from datetime import timedelta

from odoo import fields, http, _
from odoo.exceptions import ValidationError
from odoo.http import request


class FusionClockShiftPlanner(http.Controller):
    """Backend JSON-RPC API for the Excel-style weekly shift planner."""

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _check_manager(self):
        """Return True when the current user is in the Fusion Clock manager group."""
        return request.env.user.has_group('fusion_clock.group_fusion_clock_manager')

    def _week_start(self, week_start=None):
        """Return the Monday of the week containing ``week_start`` (or today)."""
        date_obj = fields.Date.to_date(week_start) if week_start else fields.Date.today()
        # date.weekday() is 0 for Monday, so this snaps back to Monday.
        return date_obj - timedelta(days=date_obj.weekday())

    def _manager_employees(self):
        """Return clock-enabled employees of the current user's companies.

        The search runs with sudo; the company filter is what scopes the
        result to the requesting user.
        """
        return request.env['hr.employee'].sudo().search([
            ('x_fclk_enable_clock', '=', True),
            ('company_id', 'in', request.env.user.company_ids.ids),
        ], order='department_id, name')

    def _exc_message(self, exc):
        """Return the user-facing text of a ValidationError."""
        return str(exc.args[0] if exc.args else exc)

    def _find_managed_schedule(self, employee_id, date):
        """Return the schedule row at (employee, date), limited to planner scope.

        The lookup runs with sudo, so the employee is explicitly constrained
        to ``_manager_employees()``; otherwise a client-supplied id could
        reach schedules of employees outside the user's companies.
        Returns an empty recordset when nothing matches.
        """
        Schedule = request.env['fusion.clock.schedule'].sudo()
        return Schedule.search([
            ('employee_id', '=', int(employee_id or 0)),
            ('employee_id', 'in', self._manager_employees().ids),
            ('schedule_date', '=', date),
        ], limit=1)

    def _load_week_data(self, week_start=None):
        """Build the full planner payload for the week containing ``week_start``.

        Returns a dict with the week bounds, the seven day headers, the
        departments, one row per employee (with one cell per day), the
        open shifts grouped by day and the active shift templates.
        """
        start = self._week_start(week_start)
        days = [start + timedelta(days=i) for i in range(7)]
        employees = self._manager_employees()
        company_ids = request.env.user.company_ids.ids
        Schedule = request.env['fusion.clock.schedule'].sudo()

        schedules = Schedule.search([
            ('employee_id', 'in', employees.ids),
            ('schedule_date', '>=', start),
            ('schedule_date', '<=', days[-1]),
        ])
        schedule_map = {
            (schedule.employee_id.id, schedule.schedule_date): schedule
            for schedule in schedules
        }

        # Group employees by department; 0 stands for "no department".
        grouped = defaultdict(list)
        for employee in employees:
            grouped[employee.department_id.id or 0].append(employee)

        departments = []
        employee_rows = []
        for department_id, department_employees in grouped.items():
            department = department_employees[0].department_id
            department_name = department.name if department else _('No Department')
            departments.append({
                'id': department_id,
                'name': department_name,
                'employee_ids': [emp.id for emp in department_employees],
            })
            for employee in department_employees:
                cells = {
                    str(day): Schedule.fclk_cell_payload(
                        employee,
                        day,
                        schedule_map.get((employee.id, day)),
                    )
                    for day in days
                }
                employee_rows.append({
                    'id': employee.id,
                    'name': employee.name,
                    'department_id': department_id,
                    'department_name': department_name,
                    'job_title': employee.job_title or '',
                    'cells': cells,
                })

        shifts = request.env['fusion.clock.shift'].sudo().search([
            ('active', '=', True),
            ('company_id', 'in', company_ids),
        ], order='sequence, name')

        open_rows = Schedule.search([
            ('is_open', '=', True),
            ('company_id', 'in', company_ids),
            ('schedule_date', '>=', start),
            ('schedule_date', '<=', days[-1]),
        ], order='schedule_date, start_time')
        open_by_day = {}
        for row in open_rows:
            open_by_day.setdefault(str(row.schedule_date), []).append({
                'id': row.id,
                'label': row.fclk_display_value(),
                'role_name': row.role_id.name or '',
                'role_color': row.role_id._get_color_from_code(True) if row.role_id else '',
                'hours_display': Schedule.fclk_hours_display(row.planned_hours),
            })

        return {
            'week_start': str(start),
            'week_end': str(days[-1]),
            'open_shifts': open_by_day,
            'days': [{
                'date': str(day),
                'weekday': day.strftime('%a').upper(),
                'label': day.strftime('%d-%b'),
            } for day in days],
            'departments': departments,
            'employees': employee_rows,
            'shifts': [{
                'id': shift.id,
                'name': shift.name,
                'start_time': shift.start_time,
                'end_time': shift.end_time,
                'break_minutes': shift.break_minutes,
                'hours': shift.scheduled_hours,
                'hours_display': Schedule.fclk_hours_display(shift.scheduled_hours),
                'label': '%s - %s' % (
                    Schedule.fclk_float_to_display(shift.start_time),
                    Schedule.fclk_float_to_display(shift.end_time),
                ),
                'option_label': '%s (%s - %s)' % (
                    shift.name,
                    Schedule.fclk_float_to_display(shift.start_time),
                    Schedule.fclk_float_to_display(shift.end_time),
                ),
            } for shift in shifts],
        }

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------

    @http.route('/fusion_clock/shift_planner/load', type='jsonrpc', auth='user', methods=['POST'])
    def load(self, week_start=None, **kw):
        """Return the planner payload for the requested week (managers only)."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        return self._load_week_data(week_start)

    @http.route('/fusion_clock/shift_planner/save', type='jsonrpc', auth='user', methods=['POST'])
    def save(self, week_start=None, changes=None, **kw):
        """Apply a list of cell changes and return the refreshed week.

        Each change is a dict carrying at least ``employee_id`` and ``date``.
        Changes for employees outside ``_manager_employees()`` and changes
        rejected with a ValidationError are collected in ``errors``; when
        any error occurred the refreshed week data is not returned.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}
        employees = self._manager_employees()
        employee_map = {employee.id: employee for employee in employees}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        errors = []
        saved = 0
        for change in changes or []:
            employee_id = int(change.get('employee_id') or 0)
            employee = employee_map.get(employee_id)
            date_str = change.get('date')
            if not employee:
                errors.append({
                    'employee_id': employee_id,
                    'date': date_str,
                    'message': 'Employee not found or not allowed.',
                })
                continue
            try:
                Schedule.fclk_apply_planner_cell(employee, date_str, change, request.env.user)
                saved += 1
            except ValidationError as exc:
                errors.append({
                    'employee_id': employee_id,
                    'date': date_str,
                    'message': self._exc_message(exc),
                })
        if errors:
            return {'success': False, 'saved': saved, 'errors': errors}
        return {
            'success': True,
            'saved': saved,
            'data': self._load_week_data(week_start),
        }

    @http.route('/fusion_clock/shift_planner/post_week', type='jsonrpc', auth='user', methods=['POST'])
    def post_week(self, week_start=None, **kw):
        """Publish (post) the viewed week's draft entries so automation acts on
        them, and email each newly-affected employee their posted shifts."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        start = self._week_start(week_start)
        end = start + timedelta(days=6)
        employees = self._manager_employees()
        Schedule = request.env['fusion.clock.schedule'].sudo()
        posted_count, notified = Schedule.fclk_publish_range(employees, start, end)
        return {
            'success': True,
            'posted': posted_count,
            'notified': notified,
            'data': self._load_week_data(start),
        }

    @http.route('/fusion_clock/shift_planner/publish_range', type='jsonrpc', auth='user', methods=['POST'])
    def publish_range(self, date_from=None, date_to=None, employee_ids=None,
                      message=None, week_start=None, **kw):
        """Publish & Notify over an arbitrary date range, optionally limited to a
        subset of employees, with an optional custom message in the email."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        # Missing bounds fall back to the viewed week (Monday .. Sunday).
        start = fields.Date.to_date(date_from) or self._week_start(week_start)
        end = fields.Date.to_date(date_to) or (start + timedelta(days=6))
        if end < start:
            return {'success': False, 'message': 'End date must be on or after the start date.'}
        employees = self._manager_employees()
        if employee_ids:
            wanted = {int(eid) for eid in employee_ids}
            employees = employees.filtered(lambda e: e.id in wanted)
        Schedule = request.env['fusion.clock.schedule'].sudo()
        posted_count, notified = Schedule.fclk_publish_range(employees, start, end, message=message)
        return {
            'success': True,
            'posted': posted_count,
            'notified': notified,
            'data': self._load_week_data(week_start),
        }

    @http.route('/fusion_clock/shift_planner/copy_previous_week', type='jsonrpc', auth='user', methods=['POST'])
    def copy_previous_week(self, week_start=None, **kw):
        """Copy every employee's previous-week cells onto the viewed week.

        A day with no source schedule is applied as an empty input. The
        ``changed`` figure is the growth of the audit table during the copy.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}
        start = self._week_start(week_start)
        prev_start = start - timedelta(days=7)
        employees = self._manager_employees()
        Schedule = request.env['fusion.clock.schedule'].sudo()
        Audit = request.env['fusion.clock.schedule.audit'].sudo()
        prev_schedules = Schedule.search([
            ('employee_id', 'in', employees.ids),
            ('schedule_date', '>=', prev_start),
            ('schedule_date', '<=', prev_start + timedelta(days=6)),
        ])
        prev_map = {
            (schedule.employee_id.id, schedule.schedule_date): schedule
            for schedule in prev_schedules
        }
        # NOTE(review): this counts ALL audit rows, so audit entries written
        # concurrently by other requests would be included in 'changed'.
        before_count = Audit.search_count([])
        for employee in employees:
            for offset in range(7):
                source_date = prev_start + timedelta(days=offset)
                target_date = start + timedelta(days=offset)
                source = prev_map.get((employee.id, source_date))
                if not source:
                    payload = {'input': ''}
                elif source.is_off:
                    payload = {'input': 'OFF'}
                elif source.shift_id:
                    payload = {
                        'shift_id': source.shift_id.id,
                        'input': source.fclk_display_value(),
                    }
                else:
                    payload = {
                        'input': source.fclk_display_value(),
                        'start_time': source.start_time,
                        'end_time': source.end_time,
                        'break_minutes': source.break_minutes,
                    }
                Schedule.fclk_apply_planner_cell(employee, target_date, payload, request.env.user)
        after_count = Audit.search_count([])
        return {
            'success': True,
            'changed': after_count - before_count,
            'data': self._load_week_data(start),
        }

    @http.route('/fusion_clock/shift_planner/set_recurrence', type='jsonrpc', auth='user', methods=['POST'])
    def set_recurrence(self, employee_id=None, date=None, repeat=None, week_start=None, **kw):
        """Make the shift at (employee, date) recurring and generate it forward."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        # Scoped lookup: only employees the manager may plan for.
        schedule = self._find_managed_schedule(employee_id, date)
        if not schedule:
            return {'success': False, 'message': 'Save this shift before repeating it.'}
        try:
            Schedule.fclk_attach_recurrence(schedule, repeat or {})
        except ValidationError as exc:
            return {'success': False, 'message': self._exc_message(exc)}
        return {'success': True, 'data': self._load_week_data(week_start)}

    @http.route('/fusion_clock/shift_planner/clear_recurrence', type='jsonrpc', auth='user', methods=['POST'])
    def clear_recurrence(self, employee_id=None, date=None, week_start=None, **kw):
        """Stop the recurrence seeded at (employee, date); keep posted rows."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        # Scoped lookup: only employees the manager may plan for.
        schedule = self._find_managed_schedule(employee_id, date)
        if schedule:
            Schedule.fclk_clear_recurrence(schedule)
        return {'success': True, 'data': self._load_week_data(week_start)}

    @http.route('/fusion_clock/shift_planner/create_open_shift', type='jsonrpc', auth='user', methods=['POST'])
    def create_open_shift(self, date=None, start_time=None, end_time=None, role_id=None,
                          count=1, break_minutes=0.0, week_start=None, **kw):
        """Create one or more open (unassigned) shifts for a day."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        company = request.env.company
        try:
            Schedule.fclk_create_open_shifts(
                company, date, start_time, end_time,
                role_id=role_id, count=count, break_minutes=break_minutes)
        except ValidationError as exc:
            return {'success': False, 'message': self._exc_message(exc)}
        return {'success': True, 'data': self._load_week_data(week_start)}

    @http.route('/fusion_clock/shift_planner/delete_open_shift', type='jsonrpc', auth='user', methods=['POST'])
    def delete_open_shift(self, schedule_id=None, week_start=None, **kw):
        """Delete an open shift that belongs to one of the user's companies."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        row = Schedule.browse(int(schedule_id or 0))
        # The record is browsed with sudo from a client-supplied id, so the
        # company is checked here, mirroring the open-shift filter used when
        # loading the week.
        deletable = (
            row.exists()
            and row.is_open
            and row.company_id.id in request.env.user.company_ids.ids
        )
        if not deletable:
            return {'success': False, 'message': 'That open shift is no longer available (it may have just been claimed).'}
        row.unlink()
        return {'success': True, 'data': self._load_week_data(week_start)}

    @http.route('/fusion_clock/shift_planner/bulk_apply', type='jsonrpc', auth='user', methods=['POST'])
    def bulk_apply(self, employee_ids=None, date=None, payload=None, week_start=None, **kw):
        """Apply one shift to several employees at once (Apply Also To)."""
        if not self._check_manager():
            return {'error': 'Access denied.'}
        employees = self._manager_employees()
        wanted = {int(eid) for eid in (employee_ids or [])}
        employees = employees.filtered(lambda e: e.id in wanted)
        if not employees:
            return {'success': False, 'message': 'Pick at least one employee.'}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        try:
            Schedule.fclk_bulk_apply(employees, date, payload or {}, request.env.user)
        except ValidationError as exc:
            return {'success': False, 'message': self._exc_message(exc)}
        return {'success': True, 'data': self._load_week_data(week_start)}

    @http.route('/fusion_clock/shift_planner/export_xlsx', type='jsonrpc', auth='user', methods=['POST'])
    def export_xlsx(self, week_start=None, **kw):
        """Render the week as an XLSX workbook and return a download URL.

        The workbook is stored as a new ``ir.attachment`` on every call; the
        response carries its id, file name and ``/web/content`` URL.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}
        data = self._load_week_data(week_start)
        output = io.BytesIO()
        # Imported here, as in the original, so the module loads even when
        # xlsxwriter is only needed for this endpoint.
        import xlsxwriter
        workbook = xlsxwriter.Workbook(output, {'in_memory': True})
        sheet = workbook.add_worksheet('Shift Planner')
        fmt_day = workbook.add_format({'bold': True, 'align': 'center', 'bg_color': '#b7dff5', 'border': 1})
        fmt_sub = workbook.add_format({'bold': True, 'align': 'center', 'bg_color': '#d8e9bd', 'border': 1})
        fmt_employee = workbook.add_format({'bold': True, 'border': 1})
        fmt_shift = workbook.add_format({'border': 1})
        fmt_hours = workbook.add_format({'border': 1, 'align': 'center', 'bg_color': '#f5d39b'})
        fmt_department = workbook.add_format({'bold': True, 'bg_color': '#eeeeee', 'border': 1})

        # Column 0 holds the employee name; each day then takes two columns
        # (shift label + hours), i.e. columns 1..14 for seven days.
        sheet.set_column(0, 0, 22)
        for col in range(1, 15, 2):
            sheet.set_column(col, col, 24)
            sheet.set_column(col + 1, col + 1, 9)

        # Three header rows: weekday, date label, Shift/Hours sub-headers.
        sheet.write(0, 0, 'EMPLOYEE', fmt_day)
        col = 1
        for day in data['days']:
            sheet.merge_range(0, col, 0, col + 1, day['weekday'], fmt_day)
            sheet.merge_range(1, col, 1, col + 1, day['label'], fmt_day)
            sheet.write(2, col, 'Shift', fmt_sub)
            sheet.write(2, col + 1, 'Hours', fmt_sub)
            col += 2
        sheet.write(2, 0, 'EMPLOYEE', fmt_sub)

        row = 3
        employee_by_id = {emp['id']: emp for emp in data['employees']}
        for department in data['departments']:
            sheet.merge_range(row, 0, row, 14, department['name'], fmt_department)
            row += 1
            for employee_id in department['employee_ids']:
                employee = employee_by_id[employee_id]
                sheet.write(row, 0, employee['name'], fmt_employee)
                col = 1
                for day in data['days']:
                    cell = employee['cells'][day['date']]
                    sheet.write(row, col, cell.get('label') or '', fmt_shift)
                    sheet.write(row, col + 1, cell.get('hours_display') or '0:00', fmt_hours)
                    col += 2
                row += 1

        workbook.close()
        output.seek(0)
        filename = 'shift_planner_%s.xlsx' % data['week_start']
        attachment = request.env['ir.attachment'].sudo().create({
            'name': filename,
            'type': 'binary',
            'datas': base64.b64encode(output.read()),
            'mimetype': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        })
        return {
            'success': True,
            'attachment_id': attachment.id,
            'filename': filename,
            'url': '/web/content/%s?download=true' % attachment.id,
        }