# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

import base64
import csv
import io
import json
import logging
from datetime import timedelta

import pytz

from odoo import models, fields, api, _
from odoo.exceptions import UserError

from .hr_attendance import _fclk_email_wrap, _fclk_utc_to_local_str
from .tz_utils import get_local_today, get_local_day_boundaries, _resolve_tz

_logger = logging.getLogger(__name__)


def _last_day_of_month(day):
    """Return the last calendar day of the month containing ``day``."""
    # Day 28 exists in every month; adding 4 days always lands in the next
    # month, and stepping back by that day-of-month yields the month end.
    in_next_month = day.replace(day=28) + timedelta(days=4)
    return in_next_month - timedelta(days=in_next_month.day)


class FusionClockReport(models.Model):
    """Attendance report for one pay period.

    A record with ``employee_id`` set is an individual report; a record
    without an employee is a company-wide batch report.
    """

    _name = 'fusion.clock.report'
    _description = 'Clock Period Report'
    _order = 'date_end desc, employee_id'
    _rec_name = 'name'

    name = fields.Char(
        string='Report Name',
        compute='_compute_name',
        store=True,
    )
    date_start = fields.Date(string='Period Start', required=True, index=True)
    date_end = fields.Date(string='Period End', required=True, index=True)
    schedule_type = fields.Selection(
        [
            ('weekly', 'Weekly'),
            ('biweekly', 'Bi-Weekly'),
            ('semi_monthly', 'Semi-Monthly'),
            ('monthly', 'Monthly'),
        ],
        string='Schedule Type',
    )
    state = fields.Selection(
        [
            ('draft', 'Draft'),
            ('generated', 'Generated'),
            ('sent', 'Sent'),
        ],
        string='Status',
        default='draft',
        index=True,
    )
    employee_id = fields.Many2one(
        'hr.employee',
        string='Employee',
        index=True,
        help="Empty = batch report for all employees.",
    )
    is_batch = fields.Boolean(
        string='Batch Report',
        compute='_compute_is_batch',
        store=True,
    )
    company_id = fields.Many2one(
        'res.company',
        string='Company',
        default=lambda self: self.env.company,
        required=True,
    )

    # Computed totals
    total_hours = fields.Float(string='Total Worked Hours', compute='_compute_totals', store=True)
    net_hours = fields.Float(string='Net Hours', compute='_compute_totals', store=True)
    total_breaks = fields.Float(string='Total Breaks (min)', compute='_compute_totals', store=True)
    total_penalties = fields.Integer(string='Penalty Count', compute='_compute_totals', store=True)
    days_worked = fields.Integer(string='Days Worked', compute='_compute_totals', store=True)
    leave_days = fields.Integer(string='Leave Days', compute='_compute_totals', store=True)

    attendance_ids = fields.Many2many(
        'hr.attendance',
        'fusion_clock_report_attendance_rel',
        'report_id',
        'attendance_id',
        string='Attendance Records',
    )
    leave_request_ids = fields.Many2many(
        'fusion.clock.leave.request',
        'fusion_clock_report_leave_rel',
        'report_id',
        'leave_request_id',
        string='Leave Requests',
    )

    # PDF
    report_pdf = fields.Binary(string='Report PDF', attachment=True)
    report_pdf_filename = fields.Char(string='PDF Filename')

    # ------------------------------------------------------------------
    # Computes
    # ------------------------------------------------------------------

    @api.depends('employee_id', 'date_start', 'date_end')
    def _compute_name(self):
        """Build the display name from the employee (or "Batch Report") and period."""
        for rec in self:
            if rec.employee_id:
                rec.name = f"{rec.employee_id.name} - {rec.date_start} to {rec.date_end}"
            else:
                rec.name = f"Batch Report - {rec.date_start} to {rec.date_end}"

    @api.depends('employee_id')
    def _compute_is_batch(self):
        """Flag the report as a batch report when no employee is set."""
        for rec in self:
            rec.is_batch = not bool(rec.employee_id)

    # employee_id / date_start / date_end drive the penalty count and
    # attendance_ids.check_in drives days_worked, so they must be declared
    # as dependencies or the stored values go stale when they change.
    # NOTE: penalties created after the fact still do not trigger a
    # recompute because there is no relational path to them from here.
    @api.depends(
        'attendance_ids',
        'attendance_ids.check_in',
        'attendance_ids.worked_hours',
        'attendance_ids.x_fclk_net_hours',
        'attendance_ids.x_fclk_break_minutes',
        'leave_request_ids',
        'employee_id',
        'date_start',
        'date_end',
    )
    def _compute_totals(self):
        """Aggregate hours, breaks, penalties, worked days and leave days.

        ``days_worked`` counts distinct local calendar dates of the linked
        attendances' check-ins (timezone resolved via ``_resolve_tz``).
        ``total_penalties`` is only counted for individual reports; batch
        reports get 0.
        """
        Penalty = self.env['fusion.clock.penalty']
        for rec in self:
            atts = rec.attendance_ids
            rec.total_hours = sum(a.worked_hours or 0.0 for a in atts)
            rec.net_hours = sum(a.x_fclk_net_hours or 0.0 for a in atts)
            rec.total_breaks = sum(a.x_fclk_break_minutes or 0.0 for a in atts)

            # Guard on the dates as well: during form edition they can be
            # unset, and a domain comparing against False is meaningless.
            if rec.employee_id and rec.date_start and rec.date_end:
                rec.total_penalties = Penalty.search_count([
                    ('employee_id', '=', rec.employee_id.id),
                    ('date', '>=', rec.date_start),
                    ('date', '<=', rec.date_end),
                ])
            else:
                rec.total_penalties = 0

            tz = _resolve_tz(rec.env, rec.employee_id)
            # check_in is stored as naive UTC; localize before taking the date.
            local_dates = {
                pytz.UTC.localize(a.check_in).astimezone(tz).date()
                for a in atts
                if a.check_in
            }
            rec.days_worked = len(local_dates)
            rec.leave_days = len(rec.leave_request_ids)

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def action_generate_report(self):
        """Generate the PDF report for this record."""
        self.ensure_one()
        self._collect_attendance_records()
        self._generate_pdf()
        self.state = 'generated'

    def action_reset_draft(self):
        """Reset the report back to draft so the user can make changes."""
        for rec in self:
            rec.state = 'draft'

    def action_send_report(self):
        """Send the report via email.

        Raises:
            UserError: if the report has not been generated yet.
        """
        self.ensure_one()
        if self.state != 'generated':
            raise UserError(_("Please generate the report first."))
        # Only an explicit False means "nothing was sent"; a None return
        # (e.g. from an override written against the old contract) is still
        # treated as success so existing extensions keep working.
        if self._send_report_email() is False:
            return
        self.state = 'sent'

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _collect_attendance_records(self):
        """Link attendance and leave records for the period and employee.

        Attendances are selected by check-in between the local start of
        ``date_start`` and the local end of ``date_end`` and must have a
        check-out. Batch reports take every matching record of the company.
        """
        self.ensure_one()
        employee = self.employee_id or None
        # Index the tuples instead of unpacking into ``_`` so the imported
        # translation function is not shadowed inside this method.
        start_utc = get_local_day_boundaries(self.env, self.date_start, employee)[0]
        end_utc = get_local_day_boundaries(self.env, self.date_end, employee)[1]

        domain = [
            ('check_in', '>=', start_utc),
            ('check_in', '<', end_utc),
            ('check_out', '!=', False),
        ]
        if self.employee_id:
            domain.append(('employee_id', '=', self.employee_id.id))
        else:
            domain.append(('employee_id.company_id', '=', self.company_id.id))
        attendances = self.env['hr.attendance'].search(domain)
        self.attendance_ids = [(6, 0, attendances.ids)]

        leave_domain = [
            ('leave_date', '>=', self.date_start),
            ('leave_date', '<=', self.date_end),
        ]
        if self.employee_id:
            leave_domain.append(('employee_id', '=', self.employee_id.id))
        else:
            leave_domain.append(('company_id', '=', self.company_id.id))
        leaves = self.env['fusion.clock.leave.request'].search(leave_domain)
        self.leave_request_ids = [(6, 0, leaves.ids)]

    def _generate_pdf(self):
        """Render the QWeb report to PDF and store it."""
        self.ensure_one()
        if self.employee_id:
            report_ref = 'fusion_clock.action_report_clock_employee'
        else:
            report_ref = 'fusion_clock.action_report_clock_batch'

        # NOTE(review): report_ref looks like an action XML id while
        # _get_report_from_name presumably matches on report_name; if so the
        # lookup never succeeds and batch reports always use the fallback.
        # Confirm against the Odoo version in use before changing.
        report = self.env['ir.actions.report']._get_report_from_name(report_ref)
        if not report:
            # Fallback to employee report
            report_ref = 'fusion_clock.action_report_clock_employee'

        pdf_content, _content_type = self.env['ir.actions.report']._render_qweb_pdf(
            report_ref, [self.id]
        )

        filename = f"clock_report_{self.date_start}_{self.date_end}"
        if self.employee_id:
            filename += f"_{self.employee_id.name.replace(' ', '_')}"
        filename += ".pdf"

        self.write({
            'report_pdf': base64.b64encode(pdf_content),
            'report_pdf_filename': filename,
        })

    def _get_batch_recipient_emails(self):
        """Return the comma-separated recipients configured for batch reports.

        Combines the ``fusion_clock.report_recipient_emails`` parameter with
        the emails of the users listed in
        ``fusion_clock.report_recipient_user_ids``.
        """
        ICP = self.env['ir.config_parameter'].sudo()
        email_to = ICP.get_param('fusion_clock.report_recipient_emails', '')
        user_ids_str = ICP.get_param('fusion_clock.report_recipient_user_ids', '')
        if not user_ids_str:
            return email_to

        try:
            user_ids = [int(x) for x in user_ids_str.split(',') if x.strip()]
        except (ValueError, TypeError):
            # Keep the best-effort behaviour (fall back to the plain email
            # list) but make the misconfiguration visible in the logs.
            _logger.warning(
                "Fusion Clock: Invalid report_recipient_user_ids value %r",
                user_ids_str,
            )
            return email_to

        users = self.env['res.users'].sudo().browse(user_ids).filtered('email')
        user_emails = ','.join(users.mapped('email'))
        if email_to and user_emails:
            return f"{email_to},{user_emails}"
        return user_emails or email_to

    def _send_report_email(self):
        """Send the report with the PDF attached.

        Returns:
            bool: True when the mail was created and handed to ``send()``,
            False when there is no recipient or sending raised.
        """
        self.ensure_one()
        company = self.company_id or self.env.company
        company_email = company.email or ''
        company_name = company.name or ''

        ds_fmt = self.date_start.strftime('%b %d, %Y') if self.date_start else ''
        de_fmt = self.date_end.strftime('%b %d, %Y') if self.date_end else ''

        if self.employee_id:
            email_to = self.employee_id.work_email or ''
            subject = f"Your Attendance Report - {ds_fmt} to {de_fmt}"
            body = _fclk_email_wrap(
                company_name=company_name,
                title='Attendance Report',
                summary=(
                    f'Hello {self.employee_id.name}, your attendance '
                    f'report for {ds_fmt} to '
                    f'{de_fmt} is ready.'
                ),
                sections=[('Report Summary', [
                    ('Pay Period', f'{ds_fmt} — {de_fmt}'),
                    ('Days Worked', str(self.days_worked)),
                    ('Leave Days', str(self.leave_days)),
                    ('Total Hours', f'{self.total_hours:.1f}h'),
                    ('Net Hours', f'{self.net_hours:.1f}h'),
                    ('Total Breaks', f'{self.total_breaks:.0f} min'),
                    ('Penalties', str(self.total_penalties)),
                ])],
                note='The full PDF report is attached. You can also download '
                     'it from '
                     'your portal at any time.',
                attachments_note='Attendance Report (PDF)',
            )
        else:
            email_to = self._get_batch_recipient_emails()
            subject = f"Employee Attendance Batch Report - {ds_fmt} to {de_fmt}"
            body = _fclk_email_wrap(
                company_name=company_name,
                title='Batch Attendance Report',
                summary=(
                    f'The attendance batch report for {ds_fmt} to '
                    f'{de_fmt} is attached.'
                ),
                note='This report includes all employees\' attendance summaries '
                     'with daily breakdowns, total hours, and penalty information.',
                attachments_note='Batch Attendance Report (PDF)',
            )

        if not email_to:
            _logger.warning("Fusion Clock: No email recipient for report %s", self.id)
            return False

        mail_vals = {
            'subject': subject,
            'email_from': company_email,
            'email_to': email_to,
            'body_html': body,
            'auto_delete': True,
        }
        if self.report_pdf:
            mail_vals['attachment_ids'] = [(0, 0, {
                'name': self.report_pdf_filename or 'report.pdf',
                'datas': self.report_pdf,
                'mimetype': 'application/pdf',
            })]

        try:
            self.env['mail.mail'].sudo().create(mail_vals).send()
        except Exception as e:
            # Deliberately best-effort: a mail failure must not abort the
            # caller (cron), but the caller needs to know nothing was sent.
            _logger.error(
                "Fusion Clock: Failed to send report email: %s", e, exc_info=True,
            )
            return False
        return True

    def action_export_csv(self):
        """Export the report data as a CSV file for payroll.

        Column headers can be overridden through the JSON object stored in
        the ``fusion_clock.csv_column_mapping`` parameter.

        Returns:
            dict: an ``ir.actions.act_url`` action downloading the attachment.
        """
        self.ensure_one()
        if not self.attendance_ids:
            self._collect_attendance_records()

        ICP = self.env['ir.config_parameter'].sudo()
        mapping_raw = ICP.get_param('fusion_clock.csv_column_mapping', '')
        try:
            col_map = json.loads(mapping_raw) if mapping_raw else {}
        except (ValueError, TypeError):
            col_map = {}
        if not isinstance(col_map, dict):
            # Valid JSON that is not an object (list, string, number) cannot
            # map column keys to labels; ignore it instead of crashing.
            col_map = {}

        default_cols = {
            'employee': 'Employee',
            'date': 'Date',
            'clock_in': 'Clock In',
            'clock_out': 'Clock Out',
            'worked_hours': 'Worked Hours',
            'net_hours': 'Net Hours',
            'break_min': 'Break (min)',
            'overtime': 'Overtime (h)',
            'penalties': 'Penalties',
            'location': 'Location',
        }
        headers = [col_map.get(key, label) for key, label in default_cols.items()]

        Penalty = self.env['fusion.clock.penalty']
        with io.StringIO() as output:
            writer = csv.writer(output)
            writer.writerow(headers)

            for att in self.attendance_ids.sorted(key=lambda a: a.check_in):
                # Use the employee-local date so it agrees with the local
                # clock-in/out times written on the same row.
                date_str = _fclk_utc_to_local_str(att.check_in, att.employee_id, '%Y-%m-%d') if att.check_in else ''
                in_str = _fclk_utc_to_local_str(att.check_in, att.employee_id, '%H:%M') if att.check_in else ''
                out_str = _fclk_utc_to_local_str(att.check_out, att.employee_id, '%H:%M') if att.check_out else ''
                penalties = Penalty.search_count([
                    ('attendance_id', '=', att.id),
                ])
                writer.writerow([
                    att.employee_id.name or '',
                    date_str,
                    in_str,
                    out_str,
                    round(att.worked_hours or 0, 2),
                    round(att.x_fclk_net_hours or 0, 2),
                    round(att.x_fclk_break_minutes or 0, 0),
                    round(att.x_fclk_overtime_hours or 0, 2),
                    penalties,
                    att.x_fclk_location_id.name or '',
                ])

            for lv in self.leave_request_ids.sorted(key=lambda leave: leave.leave_date):
                writer.writerow([
                    lv.employee_id.name or '',
                    str(lv.leave_date),
                    'LEAVE',
                    'LEAVE',
                    0, 0, 0, 0, 0,
                    lv.reason or '',
                ])

            csv_data = output.getvalue().encode('utf-8')

        filename = f"clock_export_{self.date_start}_{self.date_end}"
        if self.employee_id:
            filename += f"_{self.employee_id.name.replace(' ', '_')}"
        filename += ".csv"

        attachment = self.env['ir.attachment'].create({
            'name': filename,
            'type': 'binary',
            'datas': base64.b64encode(csv_data),
            'mimetype': 'text/csv',
        })
        return {
            'type': 'ir.actions.act_url',
            'url': f'/web/content/{attachment.id}/{filename}?download=true',
            'target': 'new',
        }

    # ------------------------------------------------------------------
    # Scheduled / bulk generation
    # ------------------------------------------------------------------

    @api.model
    def _cron_generate_period_reports(self):
        """Cron: Generate reports when a pay period ends.

        Runs only when ``fusion_clock.auto_generate_reports`` is 'True' and
        yesterday (local) was the last day of the configured pay period.
        For each company it creates one report per clock-enabled employee
        plus one batch report, skipping periods that already have a report.
        """
        ICP = self.env['ir.config_parameter'].sudo()
        auto_generate = ICP.get_param('fusion_clock.auto_generate_reports', 'True')
        if auto_generate != 'True':
            return

        schedule_type = ICP.get_param('fusion_clock.pay_period_type', 'biweekly')
        period_start_str = ICP.get_param('fusion_clock.pay_period_start', '')
        # Loop-invariant: read the auto-send switch once, not per employee.
        send_employee = ICP.get_param('fusion_clock.send_employee_reports', 'True')

        today = get_local_today(self.env)
        yesterday = today - timedelta(days=1)
        period_start, period_end = self._calculate_current_period(
            schedule_type, period_start_str, yesterday,
        )
        if period_end != yesterday:
            return

        _logger.info(
            "Fusion Clock: Generating reports for period %s to %s",
            period_start, period_end,
        )

        companies = self.env['res.company'].search([])
        for company in companies:
            employees = self.env['hr.employee'].search([
                ('company_id', '=', company.id),
                ('x_fclk_enable_clock', '=', True),
            ])

            # Generate individual reports
            for employee in employees:
                existing = self.search([
                    ('employee_id', '=', employee.id),
                    ('date_start', '=', period_start),
                    ('date_end', '=', period_end),
                ], limit=1)
                if existing:
                    continue
                report = self.create({
                    'date_start': period_start,
                    'date_end': period_end,
                    'schedule_type': schedule_type,
                    'employee_id': employee.id,
                    'company_id': company.id,
                })
                try:
                    report.action_generate_report()
                    # Auto-send if configured
                    if send_employee == 'True':
                        report.action_send_report()
                except Exception as e:
                    # One failing employee must not stop the whole run.
                    _logger.error(
                        "Fusion Clock: Error generating report for %s: %s",
                        employee.name, e, exc_info=True,
                    )

            # Generate batch report
            existing_batch = self.search([
                ('employee_id', '=', False),
                ('date_start', '=', period_start),
                ('date_end', '=', period_end),
                ('company_id', '=', company.id),
            ], limit=1)
            if existing_batch:
                continue
            batch = self.create({
                'date_start': period_start,
                'date_end': period_end,
                'schedule_type': schedule_type,
                'employee_id': False,
                'company_id': company.id,
            })
            try:
                batch.action_generate_report()
                batch.action_send_report()
            except Exception as e:
                _logger.error(
                    "Fusion Clock: Error generating batch report: %s",
                    e, exc_info=True,
                )

    @api.model
    def _calculate_current_period(self, schedule_type, period_start_str, reference_date):
        """Calculate the period start/end dates based on schedule type.

        Args:
            schedule_type: 'weekly', 'biweekly', 'semi_monthly' or 'monthly';
                any other value is treated as 'biweekly'.
            period_start_str: anchor date string for weekly/biweekly periods;
                when empty or unparsable the first day of the reference
                month is used as the anchor.
            reference_date: date that must fall inside the returned period.

        Returns:
            tuple: ``(period_start, period_end)``, both inclusive dates.
        """
        anchor = None
        if period_start_str:
            try:
                anchor = fields.Date.from_string(period_start_str)
            except (ValueError, TypeError):
                anchor = None
        if not anchor:
            anchor = reference_date.replace(day=1)

        if schedule_type == 'semi_monthly':
            if reference_date.day <= 15:
                return reference_date.replace(day=1), reference_date.replace(day=15)
            return reference_date.replace(day=16), _last_day_of_month(reference_date)

        if schedule_type == 'monthly':
            return reference_date.replace(day=1), _last_day_of_month(reference_date)

        # Fixed-length periods counted from the anchor. Unknown schedule
        # types default to biweekly. Floor division keeps this correct for
        # reference dates that precede the anchor.
        length = 7 if schedule_type == 'weekly' else 14
        period_num = (reference_date - anchor).days // length
        period_start = anchor + timedelta(days=period_num * length)
        period_end = period_start + timedelta(days=length - 1)
        return period_start, period_end

    @api.model
    def action_generate_historical_reports(self):
        """Generate reports for all past pay periods from historical attendance data.

        Iterates per-employee so each starts from their own earliest
        attendance record rather than the global earliest. Includes ALL
        employees who have completed attendance records, not only those
        with ``x_fclk_enable_clock``.

        Only fully elapsed periods are considered, and a report is created
        only when the period holds at least one completed attendance.

        Returns:
            dict: a ``display_notification`` client action with the number
            of reports created.
        """
        ICP = self.env['ir.config_parameter'].sudo()
        schedule_type = ICP.get_param('fusion_clock.pay_period_type', 'biweekly')
        period_start_str = ICP.get_param('fusion_clock.pay_period_start', '')

        today = get_local_today(self.env)
        created_count = 0
        Attendance = self.env['hr.attendance'].sudo()

        emp_groups = Attendance.read_group(
            [('check_out', '!=', False)],
            ['employee_id'],
            ['employee_id'],
        )

        for emp_data in emp_groups:
            if not emp_data['employee_id']:
                # Group of attendances without an employee: nothing to report.
                continue
            emp_id = emp_data['employee_id'][0]
            employee = self.env['hr.employee'].sudo().browse(emp_id)
            if not employee.exists():
                continue

            earliest = Attendance.search(
                [('employee_id', '=', emp_id), ('check_out', '!=', False)],
                order='check_in asc',
                limit=1,
            )
            if not earliest:
                continue

            # Start from the local calendar date of the first check-in, the
            # same convention used when counting worked days.
            tz = _resolve_tz(self.env, employee)
            current = pytz.UTC.localize(earliest.check_in).astimezone(tz).date()

            while current <= today:
                period_start, period_end = self._calculate_current_period(
                    schedule_type, period_start_str, current,
                )
                if period_end > today:
                    break

                existing = self.sudo().search([
                    ('employee_id', '=', emp_id),
                    ('date_start', '=', period_start),
                    ('date_end', '=', period_end),
                ], limit=1)

                if not existing:
                    # Use the same local-day boundaries as
                    # _collect_attendance_records so the pre-check and the
                    # records actually linked to the report agree.
                    start_utc = get_local_day_boundaries(self.env, period_start, employee)[0]
                    end_utc = get_local_day_boundaries(self.env, period_end, employee)[1]
                    att_count = Attendance.search_count([
                        ('employee_id', '=', emp_id),
                        ('check_in', '>=', start_utc),
                        ('check_in', '<', end_utc),
                        ('check_out', '!=', False),
                    ])
                    if att_count > 0:
                        report = self.sudo().create({
                            'date_start': period_start,
                            'date_end': period_end,
                            'schedule_type': schedule_type,
                            'employee_id': emp_id,
                            'company_id': employee.company_id.id,
                        })
                        report._collect_attendance_records()
                        created_count += 1

                current = period_end + timedelta(days=1)

        _logger.info("Fusion Clock: Generated %d historical reports", created_count)
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': 'Historical Reports',
                'message': f'{created_count} reports generated from historical data.',
                'type': 'success',
                'sticky': False,
            },
        }