Files
Odoo-Modules/fusion_clock/models/clock_report.py
gsinghpal e56974d46f update
2026-03-16 08:14:56 -04:00

587 lines
23 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import base64
import logging
import pytz
from datetime import timedelta
from odoo import models, fields, api, _
from odoo.exceptions import UserError
from .hr_attendance import _fclk_email_wrap, _fclk_utc_to_local_str
from .tz_utils import get_local_today, get_local_day_boundaries, _resolve_tz
_logger = logging.getLogger(__name__)
class FusionClockReport(models.Model):
_name = 'fusion.clock.report'
_description = 'Clock Period Report'
_order = 'date_end desc, employee_id'
_rec_name = 'name'
name = fields.Char(
string='Report Name',
compute='_compute_name',
store=True,
)
date_start = fields.Date(string='Period Start', required=True, index=True)
date_end = fields.Date(string='Period End', required=True, index=True)
schedule_type = fields.Selection(
[
('weekly', 'Weekly'),
('biweekly', 'Bi-Weekly'),
('semi_monthly', 'Semi-Monthly'),
('monthly', 'Monthly'),
],
string='Schedule Type',
)
state = fields.Selection(
[
('draft', 'Draft'),
('generated', 'Generated'),
('sent', 'Sent'),
],
string='Status',
default='draft',
index=True,
)
employee_id = fields.Many2one(
'hr.employee',
string='Employee',
index=True,
help="Empty = batch report for all employees.",
)
is_batch = fields.Boolean(
string='Batch Report',
compute='_compute_is_batch',
store=True,
)
company_id = fields.Many2one(
'res.company',
string='Company',
default=lambda self: self.env.company,
required=True,
)
# Computed totals
total_hours = fields.Float(string='Total Worked Hours', compute='_compute_totals', store=True)
net_hours = fields.Float(string='Net Hours', compute='_compute_totals', store=True)
total_breaks = fields.Float(string='Total Breaks (min)', compute='_compute_totals', store=True)
total_penalties = fields.Integer(string='Penalty Count', compute='_compute_totals', store=True)
days_worked = fields.Integer(string='Days Worked', compute='_compute_totals', store=True)
leave_days = fields.Integer(string='Leave Days', compute='_compute_totals', store=True)
attendance_ids = fields.Many2many(
'hr.attendance',
'fusion_clock_report_attendance_rel',
'report_id',
'attendance_id',
string='Attendance Records',
)
leave_request_ids = fields.Many2many(
'fusion.clock.leave.request',
'fusion_clock_report_leave_rel',
'report_id',
'leave_request_id',
string='Leave Requests',
)
# PDF
report_pdf = fields.Binary(string='Report PDF', attachment=True)
report_pdf_filename = fields.Char(string='PDF Filename')
@api.depends('employee_id', 'date_start', 'date_end')
def _compute_name(self):
for rec in self:
if rec.employee_id:
rec.name = f"{rec.employee_id.name} - {rec.date_start} to {rec.date_end}"
else:
rec.name = f"Batch Report - {rec.date_start} to {rec.date_end}"
@api.depends('employee_id')
def _compute_is_batch(self):
for rec in self:
rec.is_batch = not bool(rec.employee_id)
@api.depends('attendance_ids', 'attendance_ids.worked_hours',
'attendance_ids.x_fclk_net_hours', 'attendance_ids.x_fclk_break_minutes',
'leave_request_ids')
def _compute_totals(self):
for rec in self:
atts = rec.attendance_ids
rec.total_hours = sum(a.worked_hours or 0.0 for a in atts)
rec.net_hours = sum(a.x_fclk_net_hours or 0.0 for a in atts)
rec.total_breaks = sum(a.x_fclk_break_minutes or 0.0 for a in atts)
rec.total_penalties = self.env['fusion.clock.penalty'].search_count([
('employee_id', '=', rec.employee_id.id),
('date', '>=', rec.date_start),
('date', '<=', rec.date_end),
]) if rec.employee_id else 0
tz = _resolve_tz(rec.env, rec.employee_id)
dates = set()
for a in atts:
if a.check_in:
local_dt = pytz.UTC.localize(a.check_in).astimezone(tz)
dates.add(local_dt.date())
rec.days_worked = len(dates)
rec.leave_days = len(rec.leave_request_ids)
def action_generate_report(self):
"""Generate the PDF report for this record."""
self.ensure_one()
self._collect_attendance_records()
self._generate_pdf()
self.state = 'generated'
def action_reset_draft(self):
"""Reset the report back to draft so the user can make changes."""
for rec in self:
rec.state = 'draft'
def action_send_report(self):
"""Send the report via email."""
self.ensure_one()
if self.state != 'generated':
raise UserError(_("Please generate the report first."))
self._send_report_email()
self.state = 'sent'
def _collect_attendance_records(self):
"""Link attendance and leave records for the period and employee."""
self.ensure_one()
employee = self.employee_id or None
start_utc, _ = get_local_day_boundaries(self.env, self.date_start, employee)
_, end_utc = get_local_day_boundaries(self.env, self.date_end, employee)
domain = [
('check_in', '>=', start_utc),
('check_in', '<', end_utc),
('check_out', '!=', False),
]
if self.employee_id:
domain.append(('employee_id', '=', self.employee_id.id))
else:
domain.append(('employee_id.company_id', '=', self.company_id.id))
attendances = self.env['hr.attendance'].search(domain)
self.attendance_ids = [(6, 0, attendances.ids)]
leave_domain = [
('leave_date', '>=', self.date_start),
('leave_date', '<=', self.date_end),
]
if self.employee_id:
leave_domain.append(('employee_id', '=', self.employee_id.id))
else:
leave_domain.append(('company_id', '=', self.company_id.id))
leaves = self.env['fusion.clock.leave.request'].search(leave_domain)
self.leave_request_ids = [(6, 0, leaves.ids)]
def _generate_pdf(self):
"""Render the QWeb report to PDF and store it."""
self.ensure_one()
if self.employee_id:
report_ref = 'fusion_clock.action_report_clock_employee'
else:
report_ref = 'fusion_clock.action_report_clock_batch'
report = self.env['ir.actions.report']._get_report_from_name(report_ref)
if not report:
# Fallback to employee report
report_ref = 'fusion_clock.action_report_clock_employee'
pdf_content, _ = self.env['ir.actions.report']._render_qweb_pdf(
report_ref, [self.id]
)
filename = f"clock_report_{self.date_start}_{self.date_end}"
if self.employee_id:
filename += f"_{self.employee_id.name.replace(' ', '_')}"
filename += ".pdf"
self.write({
'report_pdf': base64.b64encode(pdf_content),
'report_pdf_filename': filename,
})
def _send_report_email(self):
"""Send the report with the PDF attached."""
self.ensure_one()
company = self.company_id or self.env.company
company_email = company.email or ''
company_name = company.name or ''
ds_fmt = self.date_start.strftime('%b %d, %Y') if self.date_start else ''
de_fmt = self.date_end.strftime('%b %d, %Y') if self.date_end else ''
if self.employee_id:
email_to = self.employee_id.work_email or ''
subject = f"Your Attendance Report - {ds_fmt} to {de_fmt}"
body = _fclk_email_wrap(
company_name=company_name,
title='Attendance Report',
summary=(
f'Hello <strong>{self.employee_id.name}</strong>, your attendance '
f'report for <strong>{ds_fmt}</strong> to '
f'<strong>{de_fmt}</strong> is ready.'
),
sections=[('Report Summary', [
('Pay Period', f'{ds_fmt} &mdash; {de_fmt}'),
('Days Worked', str(self.days_worked)),
('Leave Days', str(self.leave_days)),
('Total Hours', f'{self.total_hours:.1f}h'),
('Net Hours', f'{self.net_hours:.1f}h'),
('Total Breaks', f'{self.total_breaks:.0f} min'),
('Penalties', str(self.total_penalties)),
])],
note='The full PDF report is attached. You can also download '
'it from <a href="/my/clock" style="color:#10B981;">'
'your portal</a> at any time.',
attachments_note='Attendance Report (PDF)',
)
else:
ICP = self.env['ir.config_parameter'].sudo()
email_to = ICP.get_param('fusion_clock.report_recipient_emails', '')
user_ids_str = ICP.get_param('fusion_clock.report_recipient_user_ids', '')
if user_ids_str:
try:
user_ids = [int(x) for x in user_ids_str.split(',') if x.strip()]
users = self.env['res.users'].sudo().browse(user_ids).filtered('email')
user_emails = ','.join(u.email for u in users if u.email)
if email_to and user_emails:
email_to = f"{email_to},{user_emails}"
elif user_emails:
email_to = user_emails
except (ValueError, TypeError):
pass
subject = f"Employee Attendance Batch Report - {ds_fmt} to {de_fmt}"
body = _fclk_email_wrap(
company_name=company_name,
title='Batch Attendance Report',
summary=(
f'The attendance batch report for <strong>{ds_fmt}</strong> to '
f'<strong>{de_fmt}</strong> is attached.'
),
note='This report includes all employees\' attendance summaries '
'with daily breakdowns, total hours, and penalty information.',
attachments_note='Batch Attendance Report (PDF)',
)
if not email_to:
_logger.warning("Fusion Clock: No email recipient for report %s", self.id)
return
mail_vals = {
'subject': subject,
'email_from': company_email,
'email_to': email_to,
'body_html': body,
'auto_delete': True,
}
if self.report_pdf:
mail_vals['attachment_ids'] = [(0, 0, {
'name': self.report_pdf_filename or 'report.pdf',
'datas': self.report_pdf,
'mimetype': 'application/pdf',
})]
try:
self.env['mail.mail'].sudo().create(mail_vals).send()
except Exception as e:
_logger.error("Fusion Clock: Failed to send report email: %s", e)
def action_export_csv(self):
"""Export the report data as a CSV file for payroll."""
import csv
import io
self.ensure_one()
if not self.attendance_ids:
self._collect_attendance_records()
ICP = self.env['ir.config_parameter'].sudo()
mapping_raw = ICP.get_param('fusion_clock.csv_column_mapping', '')
import json as json_mod
try:
col_map = json_mod.loads(mapping_raw) if mapping_raw else {}
except Exception:
col_map = {}
default_cols = {
'employee': 'Employee',
'date': 'Date',
'clock_in': 'Clock In',
'clock_out': 'Clock Out',
'worked_hours': 'Worked Hours',
'net_hours': 'Net Hours',
'break_min': 'Break (min)',
'overtime': 'Overtime (h)',
'penalties': 'Penalties',
'location': 'Location',
}
for k in default_cols:
if k in col_map:
default_cols[k] = col_map[k]
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(list(default_cols.values()))
for att in self.attendance_ids.sorted(key=lambda a: a.check_in):
date_str = att.check_in.strftime('%Y-%m-%d') if att.check_in else ''
in_str = _fclk_utc_to_local_str(att.check_in, att.employee_id, '%H:%M') if att.check_in else ''
out_str = _fclk_utc_to_local_str(att.check_out, att.employee_id, '%H:%M') if att.check_out else ''
penalties = self.env['fusion.clock.penalty'].search_count([
('attendance_id', '=', att.id),
])
writer.writerow([
att.employee_id.name or '',
date_str,
in_str,
out_str,
round(att.worked_hours or 0, 2),
round(att.x_fclk_net_hours or 0, 2),
round(att.x_fclk_break_minutes or 0, 0),
round(att.x_fclk_overtime_hours or 0, 2),
penalties,
att.x_fclk_location_id.name or '',
])
for lv in self.leave_request_ids.sorted(key=lambda l: l.leave_date):
writer.writerow([
lv.employee_id.name or '',
str(lv.leave_date),
'LEAVE',
'LEAVE',
0,
0,
0,
0,
0,
lv.reason or '',
])
csv_data = output.getvalue().encode('utf-8')
output.close()
filename = f"clock_export_{self.date_start}_{self.date_end}"
if self.employee_id:
filename += f"_{self.employee_id.name.replace(' ', '_')}"
filename += ".csv"
attachment = self.env['ir.attachment'].create({
'name': filename,
'type': 'binary',
'datas': base64.b64encode(csv_data),
'mimetype': 'text/csv',
})
return {
'type': 'ir.actions.act_url',
'url': f'/web/content/{attachment.id}/{filename}?download=true',
'target': 'new',
}
@api.model
def _cron_generate_period_reports(self):
"""Cron: Generate reports when a pay period ends."""
ICP = self.env['ir.config_parameter'].sudo()
auto_generate = ICP.get_param('fusion_clock.auto_generate_reports', 'True')
if auto_generate != 'True':
return
schedule_type = ICP.get_param('fusion_clock.pay_period_type', 'biweekly')
period_start_str = ICP.get_param('fusion_clock.pay_period_start', '')
today = get_local_today(self.env)
yesterday = today - timedelta(days=1)
period_start, period_end = self._calculate_current_period(schedule_type, period_start_str, yesterday)
if period_end != yesterday:
return
_logger.info("Fusion Clock: Generating reports for period %s to %s", period_start, period_end)
companies = self.env['res.company'].search([])
for company in companies:
employees = self.env['hr.employee'].search([
('company_id', '=', company.id),
('x_fclk_enable_clock', '=', True),
])
# Generate individual reports
for employee in employees:
existing = self.search([
('employee_id', '=', employee.id),
('date_start', '=', period_start),
('date_end', '=', period_end),
], limit=1)
if not existing:
report = self.create({
'date_start': period_start,
'date_end': period_end,
'schedule_type': schedule_type,
'employee_id': employee.id,
'company_id': company.id,
})
try:
report.action_generate_report()
# Auto-send if configured
send_employee = ICP.get_param('fusion_clock.send_employee_reports', 'True')
if send_employee == 'True':
report.action_send_report()
except Exception as e:
_logger.error("Fusion Clock: Error generating report for %s: %s", employee.name, e)
# Generate batch report
existing_batch = self.search([
('employee_id', '=', False),
('date_start', '=', period_start),
('date_end', '=', period_end),
('company_id', '=', company.id),
], limit=1)
if not existing_batch:
batch = self.create({
'date_start': period_start,
'date_end': period_end,
'schedule_type': schedule_type,
'employee_id': False,
'company_id': company.id,
})
try:
batch.action_generate_report()
batch.action_send_report()
except Exception as e:
_logger.error("Fusion Clock: Error generating batch report: %s", e)
@api.model
def _calculate_current_period(self, schedule_type, period_start_str, reference_date):
"""Calculate the period start/end dates based on schedule type."""
from dateutil.relativedelta import relativedelta
import datetime
if period_start_str:
try:
anchor = fields.Date.from_string(period_start_str)
except Exception:
anchor = reference_date.replace(day=1)
else:
anchor = reference_date.replace(day=1)
if schedule_type == 'weekly':
days_diff = (reference_date - anchor).days
period_num = days_diff // 7
period_start = anchor + timedelta(days=period_num * 7)
period_end = period_start + timedelta(days=6)
elif schedule_type == 'biweekly':
days_diff = (reference_date - anchor).days
period_num = days_diff // 14
period_start = anchor + timedelta(days=period_num * 14)
period_end = period_start + timedelta(days=13)
elif schedule_type == 'semi_monthly':
if reference_date.day <= 15:
period_start = reference_date.replace(day=1)
period_end = reference_date.replace(day=15)
else:
period_start = reference_date.replace(day=16)
# Last day of month
next_month = reference_date.replace(day=28) + timedelta(days=4)
period_end = next_month - timedelta(days=next_month.day)
elif schedule_type == 'monthly':
period_start = reference_date.replace(day=1)
next_month = reference_date.replace(day=28) + timedelta(days=4)
period_end = next_month - timedelta(days=next_month.day)
else:
# Default biweekly
days_diff = (reference_date - anchor).days
period_num = days_diff // 14
period_start = anchor + timedelta(days=period_num * 14)
period_end = period_start + timedelta(days=13)
return period_start, period_end
@api.model
def action_generate_historical_reports(self):
"""Generate reports for all past pay periods from historical attendance data.
Iterates per-employee so each starts from their own earliest
attendance record rather than the global earliest. Includes ALL
employees who have completed attendance records, not only those
with ``x_fclk_enable_clock``.
"""
ICP = self.env['ir.config_parameter'].sudo()
schedule_type = ICP.get_param('fusion_clock.pay_period_type', 'biweekly')
period_start_str = ICP.get_param('fusion_clock.pay_period_start', '')
today = get_local_today(self.env)
created_count = 0
emp_groups = self.env['hr.attendance'].sudo().read_group(
[('check_out', '!=', False)],
['employee_id'],
['employee_id'],
)
for emp_data in emp_groups:
emp_id = emp_data['employee_id'][0]
employee = self.env['hr.employee'].sudo().browse(emp_id)
if not employee.exists():
continue
earliest = self.env['hr.attendance'].sudo().search(
[('employee_id', '=', emp_id), ('check_out', '!=', False)],
order='check_in asc',
limit=1,
)
if not earliest:
continue
current = earliest.check_in.date()
while current <= today:
period_start, period_end = self._calculate_current_period(
schedule_type, period_start_str, current,
)
if period_end > today:
break
existing = self.sudo().search([
('employee_id', '=', emp_id),
('date_start', '=', period_start),
('date_end', '=', period_end),
], limit=1)
if not existing:
att_count = self.env['hr.attendance'].sudo().search_count([
('employee_id', '=', emp_id),
('check_in', '>=', fields.Datetime.to_datetime(period_start)),
('check_in', '<', fields.Datetime.to_datetime(
period_end + timedelta(days=1),
)),
('check_out', '!=', False),
])
if att_count > 0:
report = self.sudo().create({
'date_start': period_start,
'date_end': period_end,
'schedule_type': schedule_type,
'employee_id': emp_id,
'company_id': employee.company_id.id,
})
report._collect_attendance_records()
created_count += 1
current = period_end + timedelta(days=1)
_logger.info("Fusion Clock: Generated %d historical reports", created_count)
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': 'Historical Reports',
'message': f'{created_count} reports generated from historical data.',
'type': 'success',
'sticky': False,
},
}