# -*- coding: utf-8 -*- # Copyright 2026 Nexa Systems Inc. # License OPL-1 (Odoo Proprietary License v1.0) import base64 import logging from datetime import timedelta from odoo import models, fields, api, _ from odoo.exceptions import UserError _logger = logging.getLogger(__name__) class FusionClockReport(models.Model): _name = 'fusion.clock.report' _description = 'Clock Period Report' _order = 'date_end desc, employee_id' _rec_name = 'name' name = fields.Char( string='Report Name', compute='_compute_name', store=True, ) date_start = fields.Date(string='Period Start', required=True, index=True) date_end = fields.Date(string='Period End', required=True, index=True) schedule_type = fields.Selection( [ ('weekly', 'Weekly'), ('biweekly', 'Bi-Weekly'), ('semi_monthly', 'Semi-Monthly'), ('monthly', 'Monthly'), ], string='Schedule Type', ) state = fields.Selection( [ ('draft', 'Draft'), ('generated', 'Generated'), ('sent', 'Sent'), ], string='Status', default='draft', index=True, ) employee_id = fields.Many2one( 'hr.employee', string='Employee', index=True, help="Empty = batch report for all employees.", ) is_batch = fields.Boolean( string='Batch Report', compute='_compute_is_batch', store=True, ) company_id = fields.Many2one( 'res.company', string='Company', default=lambda self: self.env.company, required=True, ) # Computed totals total_hours = fields.Float(string='Total Worked Hours', compute='_compute_totals', store=True) net_hours = fields.Float(string='Net Hours', compute='_compute_totals', store=True) total_breaks = fields.Float(string='Total Breaks (min)', compute='_compute_totals', store=True) total_penalties = fields.Integer(string='Penalty Count', compute='_compute_totals', store=True) days_worked = fields.Integer(string='Days Worked', compute='_compute_totals', store=True) attendance_ids = fields.Many2many( 'hr.attendance', 'fusion_clock_report_attendance_rel', 'report_id', 'attendance_id', string='Attendance Records', ) # PDF report_pdf = fields.Binary(string='Report PDF', attachment=True) report_pdf_filename = fields.Char(string='PDF Filename') @api.depends('employee_id', 'date_start', 'date_end') def _compute_name(self): for rec in self: if rec.employee_id: rec.name = f"{rec.employee_id.name} - {rec.date_start} to {rec.date_end}" else: rec.name = f"Batch Report - {rec.date_start} to {rec.date_end}" @api.depends('employee_id') def _compute_is_batch(self): for rec in self: rec.is_batch = not bool(rec.employee_id) @api.depends('attendance_ids', 'attendance_ids.worked_hours', 'attendance_ids.x_fclk_net_hours', 'attendance_ids.x_fclk_break_minutes') def _compute_totals(self): for rec in self: atts = rec.attendance_ids rec.total_hours = sum(a.worked_hours or 0.0 for a in atts) rec.net_hours = sum(a.x_fclk_net_hours or 0.0 for a in atts) rec.total_breaks = sum(a.x_fclk_break_minutes or 0.0 for a in atts) rec.total_penalties = self.env['fusion.clock.penalty'].search_count([ ('employee_id', '=', rec.employee_id.id), ('date', '>=', rec.date_start), ('date', '<=', rec.date_end), ]) if rec.employee_id else 0 # Count unique dates dates = set() for a in atts: if a.check_in: dates.add(a.check_in.date()) rec.days_worked = len(dates) def action_generate_report(self): """Generate the PDF report for this record.""" self.ensure_one() self._collect_attendance_records() self._generate_pdf() self.state = 'generated' def action_send_report(self): """Send the report via email.""" self.ensure_one() if self.state != 'generated': raise UserError(_("Please generate the report first.")) self._send_report_email() self.state = 'sent' def _collect_attendance_records(self): """Link attendance records for the period and employee.""" self.ensure_one() domain = [ ('check_in', '>=', fields.Datetime.to_datetime(self.date_start)), ('check_in', '<', fields.Datetime.to_datetime(self.date_end + timedelta(days=1))), ('check_out', '!=', False), ] if self.employee_id: domain.append(('employee_id', '=', self.employee_id.id)) else: domain.append(('employee_id.company_id', '=', self.company_id.id)) attendances = self.env['hr.attendance'].search(domain) self.attendance_ids = [(6, 0, attendances.ids)] def _generate_pdf(self): """Render the QWeb report to PDF and store it.""" self.ensure_one() if self.employee_id: report_ref = 'fusion_clock.action_report_clock_employee' else: report_ref = 'fusion_clock.action_report_clock_batch' report = self.env['ir.actions.report']._get_report_from_name(report_ref) if not report: # Fallback to employee report report_ref = 'fusion_clock.action_report_clock_employee' pdf_content, _ = self.env['ir.actions.report']._render_qweb_pdf( report_ref, [self.id] ) filename = f"clock_report_{self.date_start}_{self.date_end}" if self.employee_id: filename += f"_{self.employee_id.name.replace(' ', '_')}" filename += ".pdf" self.write({ 'report_pdf': base64.b64encode(pdf_content), 'report_pdf_filename': filename, }) def _send_report_email(self): """Send the report with the PDF attached.""" self.ensure_one() company_email = self.company_id.email or '' if self.employee_id: email_to = self.employee_id.work_email or '' subject = f"Your Attendance Report - {self.date_start} to {self.date_end}" body = ( '
' '' '
' '

Fusion Clock

' '

Attendance Report

' '
' f'

Hello {self.employee_id.name},

' f'

Your attendance report for {self.date_start} to ' f'{self.date_end} is ready.

' '' f'' f'' f'' f'' f'' f'' f'' f'' '
Days Worked{self.days_worked}
Total Hours{self.total_hours:.1f}h
Net Hours{self.net_hours:.1f}h
Total Breaks{self.total_breaks:.0f} min
' '

The full PDF report is attached.

' '

This is an automated message from Fusion Clock.

' '
' ) else: ICP = self.env['ir.config_parameter'].sudo() email_to = ICP.get_param('fusion_clock.report_recipient_emails', '') subject = f"Employee Attendance Batch Report - {self.date_start} to {self.date_end}" body = ( '
' '' '
' '

Fusion Clock

' '

Batch Attendance Report

' '
' f'

The attendance batch report for {self.date_start} to ' f'{self.date_end} is attached.

' '

This report includes all employees\' attendance summaries with daily breakdowns, ' 'total hours, and penalty information.

' '

This is an automated message from Fusion Clock.

' '
' ) if not email_to: _logger.warning("Fusion Clock: No email recipient for report %s", self.id) return mail_vals = { 'subject': subject, 'email_from': company_email, 'email_to': email_to, 'body_html': body, 'auto_delete': True, } if self.report_pdf: mail_vals['attachment_ids'] = [(0, 0, { 'name': self.report_pdf_filename or 'report.pdf', 'datas': self.report_pdf, 'mimetype': 'application/pdf', })] try: self.env['mail.mail'].sudo().create(mail_vals).send() except Exception as e: _logger.error("Fusion Clock: Failed to send report email: %s", e) def action_export_csv(self): """Export the report data as a CSV file for payroll.""" import csv import io self.ensure_one() if not self.attendance_ids: self._collect_attendance_records() ICP = self.env['ir.config_parameter'].sudo() mapping_raw = ICP.get_param('fusion_clock.csv_column_mapping', '') import json as json_mod try: col_map = json_mod.loads(mapping_raw) if mapping_raw else {} except Exception: col_map = {} default_cols = { 'employee': 'Employee', 'date': 'Date', 'clock_in': 'Clock In', 'clock_out': 'Clock Out', 'worked_hours': 'Worked Hours', 'net_hours': 'Net Hours', 'break_min': 'Break (min)', 'overtime': 'Overtime (h)', 'penalties': 'Penalties', 'location': 'Location', } for k in default_cols: if k in col_map: default_cols[k] = col_map[k] output = io.StringIO() writer = csv.writer(output) writer.writerow(list(default_cols.values())) for att in self.attendance_ids.sorted(key=lambda a: a.check_in): date_str = att.check_in.strftime('%Y-%m-%d') if att.check_in else '' in_str = att.check_in.strftime('%H:%M') if att.check_in else '' out_str = att.check_out.strftime('%H:%M') if att.check_out else '' penalties = self.env['fusion.clock.penalty'].search_count([ ('attendance_id', '=', att.id), ]) writer.writerow([ att.employee_id.name or '', date_str, in_str, out_str, round(att.worked_hours or 0, 2), round(att.x_fclk_net_hours or 0, 2), round(att.x_fclk_break_minutes or 0, 0), round(att.x_fclk_overtime_hours or 0, 2), penalties, att.x_fclk_location_id.name or '', ]) csv_data = output.getvalue().encode('utf-8') output.close() filename = f"clock_export_{self.date_start}_{self.date_end}" if self.employee_id: filename += f"_{self.employee_id.name.replace(' ', '_')}" filename += ".csv" attachment = self.env['ir.attachment'].create({ 'name': filename, 'type': 'binary', 'datas': base64.b64encode(csv_data), 'mimetype': 'text/csv', }) return { 'type': 'ir.actions.act_url', 'url': f'/web/content/{attachment.id}/{filename}?download=true', 'target': 'new', } @api.model def _cron_generate_period_reports(self): """Cron: Generate reports when a pay period ends.""" ICP = self.env['ir.config_parameter'].sudo() auto_generate = ICP.get_param('fusion_clock.auto_generate_reports', 'True') if auto_generate != 'True': return schedule_type = ICP.get_param('fusion_clock.pay_period_type', 'biweekly') period_start_str = ICP.get_param('fusion_clock.pay_period_start', '') today = fields.Date.today() period_start, period_end = self._calculate_current_period(schedule_type, period_start_str, today) # Only generate if yesterday was the end of a period if period_end != today - timedelta(days=1): return _logger.info("Fusion Clock: Generating reports for period %s to %s", period_start, period_end) companies = self.env['res.company'].search([]) for company in companies: employees = self.env['hr.employee'].search([ ('company_id', '=', company.id), ('x_fclk_enable_clock', '=', True), ]) # Generate individual reports for employee in employees: existing = self.search([ ('employee_id', '=', employee.id), ('date_start', '=', period_start), ('date_end', '=', period_end), ], limit=1) if not existing: report = self.create({ 'date_start': period_start, 'date_end': period_end, 'schedule_type': schedule_type, 'employee_id': employee.id, 'company_id': company.id, }) try: report.action_generate_report() # Auto-send if configured send_employee = ICP.get_param('fusion_clock.send_employee_reports', 'True') if send_employee == 'True': report.action_send_report() except Exception as e: _logger.error("Fusion Clock: Error generating report for %s: %s", employee.name, e) # Generate batch report existing_batch = self.search([ ('employee_id', '=', False), ('date_start', '=', period_start), ('date_end', '=', period_end), ('company_id', '=', company.id), ], limit=1) if not existing_batch: batch = self.create({ 'date_start': period_start, 'date_end': period_end, 'schedule_type': schedule_type, 'employee_id': False, 'company_id': company.id, }) try: batch.action_generate_report() batch.action_send_report() except Exception as e: _logger.error("Fusion Clock: Error generating batch report: %s", e) @api.model def _calculate_current_period(self, schedule_type, period_start_str, reference_date): """Calculate the period start/end dates based on schedule type.""" from dateutil.relativedelta import relativedelta import datetime if period_start_str: try: anchor = fields.Date.from_string(period_start_str) except Exception: anchor = reference_date.replace(day=1) else: anchor = reference_date.replace(day=1) if schedule_type == 'weekly': days_diff = (reference_date - anchor).days period_num = days_diff // 7 period_start = anchor + timedelta(days=period_num * 7) period_end = period_start + timedelta(days=6) elif schedule_type == 'biweekly': days_diff = (reference_date - anchor).days period_num = days_diff // 14 period_start = anchor + timedelta(days=period_num * 14) period_end = period_start + timedelta(days=13) elif schedule_type == 'semi_monthly': if reference_date.day <= 15: period_start = reference_date.replace(day=1) period_end = reference_date.replace(day=15) else: period_start = reference_date.replace(day=16) # Last day of month next_month = reference_date.replace(day=28) + timedelta(days=4) period_end = next_month - timedelta(days=next_month.day) elif schedule_type == 'monthly': period_start = reference_date.replace(day=1) next_month = reference_date.replace(day=28) + timedelta(days=4) period_end = next_month - timedelta(days=next_month.day) else: # Default biweekly days_diff = (reference_date - anchor).days period_num = days_diff // 14 period_start = anchor + timedelta(days=period_num * 14) period_end = period_start + timedelta(days=13) return period_start, period_end