270 lines
11 KiB
Python
270 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2026 Nexa Systems Inc.
|
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
|
|
|
import base64
|
|
import io
|
|
from collections import defaultdict
|
|
from datetime import timedelta
|
|
|
|
from odoo import fields, http, _
|
|
from odoo.exceptions import ValidationError
|
|
from odoo.http import request
|
|
|
|
|
|
class FusionClockShiftPlanner(http.Controller):
    """Backend JSON-RPC API for the Excel-style weekly shift planner.

    Every route first checks that the caller belongs to the
    ``fusion_clock.group_fusion_clock_manager`` group and answers with
    ``{'error': 'Access denied.'}`` otherwise. Records are then read and
    written through ``sudo()``, restricted to employees and shifts of the
    companies the current user is allowed on.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _check_manager(self):
        """Return True when the current user is in the clock manager group."""
        return request.env.user.has_group('fusion_clock.group_fusion_clock_manager')

    def _week_start(self, week_start=None):
        """Return the Monday of the week that contains ``week_start``.

        :param week_start: anything ``fields.Date.to_date`` accepts (the
            routes pass an ISO date string or a ``date``). When falsy, the
            current date is used.
        :return: ``datetime.date`` of that week's Monday.
        """
        if week_start:
            date_obj = fields.Date.to_date(week_start)
        else:
            # "Today" must be the user's today, not the server's: around
            # midnight the two can fall in different weeks.
            date_obj = fields.Date.context_today(request.env.user)
        return date_obj - timedelta(days=date_obj.weekday())

    def _manager_employees(self):
        """Return clock-enabled employees of the current user's companies.

        The recordset is fetched with ``sudo()`` and ordered by department,
        then by name.
        """
        return request.env['hr.employee'].sudo().search([
            ('x_fclk_enable_clock', '=', True),
            ('company_id', 'in', request.env.user.company_ids.ids),
        ], order='department_id, name')

    @staticmethod
    def _copy_payload(source):
        """Build the planner-cell payload that reproduces ``source``.

        :param source: a ``fusion.clock.schedule`` record, or a falsy value
            when the source day has no schedule.
        :return: dict handed to ``fclk_apply_planner_cell``. An empty
            ``input`` is sent when there is nothing to copy.
        """
        if not source:
            return {'input': ''}
        if source.is_off:
            return {'input': 'OFF'}
        if source.shift_id:
            return {
                'shift_id': source.shift_id.id,
                'input': source.fclk_display_value(),
            }
        return {
            'input': source.fclk_display_value(),
            'start_time': source.start_time,
            'end_time': source.end_time,
            'break_minutes': source.break_minutes,
        }

    def _load_week_data(self, week_start=None):
        """Assemble the full planner payload for one week.

        :param week_start: any date inside the wanted week (see
            :meth:`_week_start`); defaults to the current week.
        :return: dict with ``week_start``, ``week_end``, ``days`` (7
            entries), ``departments``, ``employees`` (one row per employee
            with a ``cells`` dict keyed by ISO date) and ``shifts``.
        """
        start = self._week_start(week_start)
        days = [start + timedelta(days=offset) for offset in range(7)]
        employees = self._manager_employees()
        Schedule = request.env['fusion.clock.schedule'].sudo()

        schedules = Schedule.search([
            ('employee_id', 'in', employees.ids),
            ('schedule_date', '>=', start),
            ('schedule_date', '<=', days[-1]),
        ])
        # (employee id, date) -> schedule; a later record for the same key
        # replaces an earlier one.
        schedule_map = {
            (schedule.employee_id.id, schedule.schedule_date): schedule
            for schedule in schedules
        }

        # Group employees per department, using 0 for "no department".
        # Insertion order follows the employee search order.
        grouped = defaultdict(list)
        for employee in employees:
            grouped[employee.department_id.id or 0].append(employee)

        departments = []
        employee_rows = []
        for department_id, department_employees in grouped.items():
            department = department_employees[0].department_id
            department_name = department.name if department else _('No Department')
            departments.append({
                'id': department_id,
                'name': department_name,
                'employee_ids': [emp.id for emp in department_employees],
            })
            for employee in department_employees:
                cells = {
                    str(day): Schedule.fclk_cell_payload(
                        employee,
                        day,
                        schedule_map.get((employee.id, day)),
                    )
                    for day in days
                }
                employee_rows.append({
                    'id': employee.id,
                    'name': employee.name,
                    'department_id': department_id,
                    'department_name': department_name,
                    'job_title': employee.job_title or '',
                    'cells': cells,
                })

        shifts = request.env['fusion.clock.shift'].sudo().search([
            ('active', '=', True),
            ('company_id', 'in', request.env.user.company_ids.ids),
        ], order='sequence, name')

        shift_rows = []
        for shift in shifts:
            start_label = Schedule.fclk_float_to_display(shift.start_time)
            end_label = Schedule.fclk_float_to_display(shift.end_time)
            shift_rows.append({
                'id': shift.id,
                'name': shift.name,
                'start_time': shift.start_time,
                'end_time': shift.end_time,
                'break_minutes': shift.break_minutes,
                'hours': shift.scheduled_hours,
                'hours_display': Schedule.fclk_hours_display(shift.scheduled_hours),
                'label': '%s - %s' % (start_label, end_label),
                'option_label': '%s (%s - %s)' % (shift.name, start_label, end_label),
            })

        return {
            'week_start': str(start),
            'week_end': str(days[-1]),
            'days': [{
                'date': str(day),
                'weekday': day.strftime('%a').upper(),
                'label': day.strftime('%d-%b'),
            } for day in days],
            'departments': departments,
            'employees': employee_rows,
            'shifts': shift_rows,
        }

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------

    @http.route('/fusion_clock/shift_planner/load', type='jsonrpc', auth='user', methods=['POST'])
    def load(self, week_start=None, **kw):
        """Return the planner payload for the requested week.

        :param week_start: any date inside the wanted week; defaults to the
            current week.
        :return: the :meth:`_load_week_data` dict, or an ``error`` dict for
            non-managers.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}
        return self._load_week_data(week_start)

    @http.route('/fusion_clock/shift_planner/save', type='jsonrpc', auth='user', methods=['POST'])
    def save(self, week_start=None, changes=None, **kw):
        """Apply a batch of planner cell edits.

        :param week_start: week to reload after a fully successful save.
        :param changes: list of dicts, each carrying at least
            ``employee_id`` and ``date``; the whole dict is forwarded to
            ``fclk_apply_planner_cell``.
        :return: ``{'success': True, 'saved': n, 'data': ...}`` when every
            change applied, otherwise ``{'success': False, 'saved': n,
            'errors': [...]}`` with one entry per rejected change. Changes
            that applied cleanly are kept even when others fail.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}

        employee_map = {employee.id: employee for employee in self._manager_employees()}
        Schedule = request.env['fusion.clock.schedule'].sudo()
        errors = []
        saved = 0

        for change in changes or []:
            # A malformed entry must become a per-cell error, not abort the
            # whole request with an unhandled exception.
            if not isinstance(change, dict):
                errors.append({
                    'employee_id': 0,
                    'date': None,
                    'message': 'Invalid change payload.',
                })
                continue

            date_str = change.get('date')
            raw_employee_id = change.get('employee_id')
            try:
                employee_id = int(raw_employee_id or 0)
            except (TypeError, ValueError):
                errors.append({
                    'employee_id': raw_employee_id,
                    'date': date_str,
                    'message': 'Employee not found or not allowed.',
                })
                continue

            employee = employee_map.get(employee_id)
            if not employee:
                errors.append({
                    'employee_id': employee_id,
                    'date': date_str,
                    'message': 'Employee not found or not allowed.',
                })
                continue

            try:
                # The savepoint rolls back whatever this cell wrote before
                # it was rejected; without it those writes would be
                # committed together with the valid cells.
                with request.env.cr.savepoint():
                    Schedule.fclk_apply_planner_cell(employee, date_str, change, request.env.user)
            except ValidationError as exc:
                errors.append({
                    'employee_id': employee_id,
                    'date': date_str,
                    'message': str(exc.args[0] if exc.args else exc),
                })
            else:
                saved += 1

        if errors:
            return {'success': False, 'saved': saved, 'errors': errors}
        return {
            'success': True,
            'saved': saved,
            'data': self._load_week_data(week_start),
        }

    @http.route('/fusion_clock/shift_planner/copy_previous_week', type='jsonrpc', auth='user', methods=['POST'])
    def copy_previous_week(self, week_start=None, **kw):
        """Copy every employee's schedule from the previous week.

        Each of the 7 target days receives the payload derived from the
        same weekday one week earlier; days without a source schedule are
        sent an empty ``input``.

        :param week_start: any date inside the target week; defaults to the
            current week.
        :return: ``{'success': True, 'changed': n, 'data': ...}`` where
            ``changed`` is the growth of the ``fusion.clock.schedule.audit``
            table during the copy, or an ``error`` dict for non-managers.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}

        start = self._week_start(week_start)
        prev_start = start - timedelta(days=7)
        employees = self._manager_employees()
        Schedule = request.env['fusion.clock.schedule'].sudo()
        Audit = request.env['fusion.clock.schedule.audit'].sudo()

        prev_schedules = Schedule.search([
            ('employee_id', 'in', employees.ids),
            ('schedule_date', '>=', prev_start),
            ('schedule_date', '<=', prev_start + timedelta(days=6)),
        ])
        prev_map = {
            (schedule.employee_id.id, schedule.schedule_date): schedule
            for schedule in prev_schedules
        }

        # NOTE(review): presumably fclk_apply_planner_cell writes one audit
        # row per effective change, which is why the row-count delta is
        # reported as "changed" -- confirm against the model.
        before_count = Audit.search_count([])
        for employee in employees:
            for offset in range(7):
                source_date = prev_start + timedelta(days=offset)
                target_date = start + timedelta(days=offset)
                payload = self._copy_payload(prev_map.get((employee.id, source_date)))
                Schedule.fclk_apply_planner_cell(employee, target_date, payload, request.env.user)
        after_count = Audit.search_count([])

        return {
            'success': True,
            'changed': after_count - before_count,
            'data': self._load_week_data(start),
        }

    @http.route('/fusion_clock/shift_planner/export_xlsx', type='jsonrpc', auth='user', methods=['POST'])
    def export_xlsx(self, week_start=None, **kw):
        """Export the week's planner grid as an XLSX attachment.

        The sheet has one "Shift"/"Hours" column pair per day, a merged
        banner row per department and one row per employee.

        :param week_start: any date inside the wanted week; defaults to the
            current week.
        :return: dict with ``success``, ``attachment_id``, ``filename`` and
            a ``/web/content`` download ``url``, or an ``error`` dict for
            non-managers.
        """
        if not self._check_manager():
            return {'error': 'Access denied.'}

        data = self._load_week_data(week_start)
        days = data['days']
        # Column 0 holds the employee name; each day then takes two columns.
        last_col = 2 * len(days)
        output = io.BytesIO()
        # Imported lazily so the dependency is only needed when exporting.
        import xlsxwriter

        workbook = xlsxwriter.Workbook(output, {'in_memory': True})
        sheet = workbook.add_worksheet('Shift Planner')

        fmt_day = workbook.add_format({'bold': True, 'align': 'center', 'bg_color': '#b7dff5', 'border': 1})
        fmt_sub = workbook.add_format({'bold': True, 'align': 'center', 'bg_color': '#d8e9bd', 'border': 1})
        fmt_employee = workbook.add_format({'bold': True, 'border': 1})
        fmt_shift = workbook.add_format({'border': 1})
        fmt_hours = workbook.add_format({'border': 1, 'align': 'center', 'bg_color': '#f5d39b'})
        fmt_department = workbook.add_format({'bold': True, 'bg_color': '#eeeeee', 'border': 1})

        sheet.set_column(0, 0, 22)
        for col in range(1, last_col + 1, 2):
            sheet.set_column(col, col, 24)
            sheet.set_column(col + 1, col + 1, 9)

        # Header: weekday (row 0), date label (row 1), column titles (row 2).
        sheet.write(0, 0, 'EMPLOYEE', fmt_day)
        for index, day in enumerate(days):
            col = 1 + 2 * index
            sheet.merge_range(0, col, 0, col + 1, day['weekday'], fmt_day)
            sheet.merge_range(1, col, 1, col + 1, day['label'], fmt_day)
            sheet.write(2, col, 'Shift', fmt_sub)
            sheet.write(2, col + 1, 'Hours', fmt_sub)
        sheet.write(2, 0, 'EMPLOYEE', fmt_sub)

        row = 3
        employee_by_id = {emp['id']: emp for emp in data['employees']}
        for department in data['departments']:
            sheet.merge_range(row, 0, row, last_col, department['name'], fmt_department)
            row += 1
            for employee_id in department['employee_ids']:
                employee = employee_by_id[employee_id]
                sheet.write(row, 0, employee['name'], fmt_employee)
                for index, day in enumerate(days):
                    col = 1 + 2 * index
                    cell = employee['cells'][day['date']]
                    sheet.write(row, col, cell.get('label') or '', fmt_shift)
                    sheet.write(row, col + 1, cell.get('hours_display') or '0:00', fmt_hours)
                row += 1

        workbook.close()
        filename = 'shift_planner_%s.xlsx' % data['week_start']
        attachment = request.env['ir.attachment'].sudo().create({
            'name': filename,
            'type': 'binary',
            'datas': base64.b64encode(output.getvalue()),
            'mimetype': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        })
        return {
            'success': True,
            'attachment_id': attachment.id,
            'filename': filename,
            'url': '/web/content/%s?download=true' % attachment.id,
        }
|