Files
gsinghpal e56974d46f update
2026-03-16 08:14:56 -04:00

324 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import json
import hashlib
import logging
from datetime import datetime, timedelta, date
from odoo import models, fields, api, _
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
try:
from openai import OpenAI
except ImportError:
OpenAI = None
_logger.warning("openai package not installed. Fusion Clock AI features will be unavailable.")
class FusionClockAIService(models.AbstractModel):
_name = 'fusion.clock.ai.service'
_description = 'Fusion Clock AI Service'
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
def _get_client(self):
    """Build an OpenAI client from the stored API key.

    The key is read from the ``fusion_clock_ai.openai_api_key`` system
    parameter using a sudo'ed parameter model.

    :return: a new ``OpenAI`` client instance.
    :raises UserError: when the ``openai`` package could not be imported, or
        when no API key is stored.
    """
    # ``OpenAI`` is set to None at import time when the package is missing.
    if OpenAI is None:
        raise UserError(_("The 'openai' Python package is not installed. Run: pip install openai"))

    params = self.env['ir.config_parameter'].sudo()
    api_key = params.get_param('fusion_clock_ai.openai_api_key', '')
    if api_key:
        return OpenAI(api_key=api_key)
    raise UserError(_("OpenAI API key not configured. Go to Fusion Clock > Configuration > AI Settings."))
def _get_model(self):
ICP = self.env['ir.config_parameter'].sudo()
return ICP.get_param('fusion_clock_ai.openai_model', 'gpt-4o-mini')
def _get_max_tokens(self):
ICP = self.env['ir.config_parameter'].sudo()
return int(ICP.get_param('fusion_clock_ai.max_response_tokens', '1024'))
# ------------------------------------------------------------------
# Budget enforcement
# ------------------------------------------------------------------
def _check_budget(self):
ICP = self.env['ir.config_parameter'].sudo()
monthly_budget = float(ICP.get_param('fusion_clock_ai.monthly_budget_usd', '50.0'))
if monthly_budget <= 0:
return True
today = date.today()
month_start = today.replace(day=1)
usage_records = self.env['fusion.clock.ai.usage'].sudo().search([
('date', '>=', month_start),
('date', '<=', today),
])
total_cost = sum(r.estimated_cost_usd for r in usage_records)
if total_cost >= monthly_budget:
raise UserError(_(
"Monthly AI budget of $%(budget).2f has been reached ($%(spent).2f spent). "
"Increase the budget in AI Settings or wait until next month.",
budget=monthly_budget, spent=total_cost,
))
return True
# ------------------------------------------------------------------
# Cache helpers
# ------------------------------------------------------------------
def _get_cache_key(self, prompt_key, context_hash):
return hashlib.sha256(f"{prompt_key}:{context_hash}".encode()).hexdigest()
def _check_cache(self, cache_key, ttl_minutes=15):
cutoff = datetime.now() - timedelta(minutes=ttl_minutes)
cached = self.env['fusion.clock.ai.cache'].sudo().search([
('cache_key', '=', cache_key),
('created_at', '>=', cutoff),
], limit=1)
if cached:
return cached.response_text
return False
def _store_cache(self, cache_key, response_text, prompt_key=''):
self.env['fusion.clock.ai.cache'].sudo().create({
'cache_key': cache_key,
'prompt_key': prompt_key,
'response_text': response_text,
})
# ------------------------------------------------------------------
# Usage logging with cost estimation
# ------------------------------------------------------------------
# Estimated price in USD per 1,000 prompt (input) tokens, keyed by model name.
# _log_usage() falls back to 0.001 for a model that is not listed here.
# NOTE(review): rates are hard-coded - verify against current provider pricing.
MODEL_COST_PER_1K_INPUT = {
    'gpt-4o': 0.0025,
    'gpt-4o-mini': 0.00015,
    'gpt-3.5-turbo': 0.0005,
}
# Estimated price in USD per 1,000 completion (output) tokens, keyed by model
# name. _log_usage() falls back to 0.002 for a model that is not listed here.
MODEL_COST_PER_1K_OUTPUT = {
    'gpt-4o': 0.01,
    'gpt-4o-mini': 0.0006,
    'gpt-3.5-turbo': 0.0015,
}
def _log_usage(self, feature, model_name, prompt_tokens, completion_tokens):
input_cost = (prompt_tokens / 1000) * self.MODEL_COST_PER_1K_INPUT.get(model_name, 0.001)
output_cost = (completion_tokens / 1000) * self.MODEL_COST_PER_1K_OUTPUT.get(model_name, 0.002)
today = date.today()
existing = self.env['fusion.clock.ai.usage'].sudo().search([
('date', '=', today),
('feature', '=', feature),
('model_name', '=', model_name),
], limit=1)
if existing:
existing.write({
'prompt_tokens': existing.prompt_tokens + prompt_tokens,
'completion_tokens': existing.completion_tokens + completion_tokens,
'estimated_cost_usd': existing.estimated_cost_usd + input_cost + output_cost,
'request_count': existing.request_count + 1,
})
else:
self.env['fusion.clock.ai.usage'].sudo().create({
'date': today,
'feature': feature,
'model_name': model_name,
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens,
'estimated_cost_usd': input_cost + output_cost,
'request_count': 1,
})
# ------------------------------------------------------------------
# Prompt template retrieval
# ------------------------------------------------------------------
def _get_system_prompt(self, prompt_key):
prompt = self.env['fusion.clock.ai.prompt'].sudo().search([
('key', '=', prompt_key),
('active', '=', True),
], limit=1)
if not prompt:
return "You are a helpful attendance management assistant."
return prompt.content
# ------------------------------------------------------------------
# Main chat completion method
# ------------------------------------------------------------------
def chat_completion(self, messages, feature='general', cache_key=None, cache_ttl=15):
    """Run a chat completion, with optional caching, budget check and usage log.

    Steps: return a fresh cached answer when ``cache_key`` is given and one
    exists; otherwise enforce the monthly budget, call the OpenAI chat
    completions endpoint (temperature 0.3, configured model and token
    limit), record token usage, and cache the answer.

    :param messages: chat messages passed through unchanged to the API.
    :param feature: label used for usage logging and as the cache entry's
        prompt key.
    :param cache_key: when truthy, enables cache lookup and storage.
    :param cache_ttl: maximum age in minutes of an accepted cache entry.
    :return: the content of the first completion choice.
    :raises UserError: when the budget is exhausted, the client cannot be
        built, the API call fails, or the API returns no choices.
    """
    if cache_key:
        cached = self._check_cache(cache_key, cache_ttl)
        if cached:
            return cached

    self._check_budget()
    client = self._get_client()
    model_name = self._get_model()
    max_tokens = self._get_max_tokens()

    try:
        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
        )
    except Exception as e:
        # Boundary with the external service: log the traceback and surface
        # a user-facing error, keeping the original exception as the cause.
        _logger.exception("OpenAI API error: %s", e)
        raise UserError(_("AI service error: %s", str(e))) from e

    # The usage block can be absent from a response; count the request with
    # zero tokens rather than failing after the call has already been made.
    usage = response.usage
    prompt_tokens = getattr(usage, 'prompt_tokens', 0) or 0
    completion_tokens = getattr(usage, 'completion_tokens', 0) or 0
    self._log_usage(feature, model_name, prompt_tokens, completion_tokens)

    if not response.choices:
        raise UserError(_("AI service returned an empty response."))
    result = response.choices[0].message.content

    # An empty answer is never served from the cache (``_check_cache`` results
    # are only used when truthy), so storing it would just add a dead row.
    if cache_key and result:
        self._store_cache(cache_key, result, feature)
    return result
# ------------------------------------------------------------------
# Data aggregation helpers
# ------------------------------------------------------------------
def _build_employee_context(self, employee, date_from=None, date_to=None):
if not date_from:
date_from = date.today() - timedelta(days=7)
if not date_to:
date_to = date.today()
Attendance = self.env['hr.attendance'].sudo()
attendances = Attendance.search([
('employee_id', '=', employee.id),
('check_in', '>=', datetime.combine(date_from, datetime.min.time())),
('check_in', '<=', datetime.combine(date_to, datetime.max.time())),
], order='check_in asc')
shift = employee.x_fclk_shift_id
shift_info = f"Shift: {shift.name} ({shift.start_time:.1f}-{shift.end_time:.1f})" if shift else "No assigned shift"
lines = [
f"Employee: {employee.name}",
f"Department: {employee.department_id.name or 'N/A'}",
shift_info,
f"On-time streak: {employee.x_fclk_ontime_streak}",
f"Absences this month: {employee.x_fclk_absences_this_month}",
f"Absences this year: {employee.x_fclk_absences_this_year}",
f"Overtime this week: {employee.x_fclk_overtime_this_week:.1f}h",
f"Overtime this month: {employee.x_fclk_overtime_this_month:.1f}h",
"",
f"Attendance records ({date_from} to {date_to}):",
]
for att in attendances:
check_out_str = att.check_out.strftime('%Y-%m-%d %H:%M') if att.check_out else 'MISSING'
lines.append(
f" {att.check_in.strftime('%Y-%m-%d %H:%M')} -> {check_out_str} | "
f"Net: {att.x_fclk_net_hours:.1f}h | Break: {att.x_fclk_break_minutes:.0f}m | "
f"OT: {att.x_fclk_overtime_hours:.1f}h | Source: {att.x_fclk_clock_source or 'N/A'}"
)
Log = self.env['fusion.clock.activity.log'].sudo()
logs = Log.search([
('employee_id', '=', employee.id),
('log_date', '>=', datetime.combine(date_from, datetime.min.time())),
('log_date', '<=', datetime.combine(date_to, datetime.max.time())),
('log_type', 'in', ['late_clock_in', 'early_clock_out', 'auto_clock_out',
'missed_clock_out', 'absent', 'outside_geofence']),
], order='log_date asc')
if logs:
lines.append("")
lines.append("Incidents:")
for log in logs:
lines.append(f" [{log.log_type}] {log.log_date.strftime('%Y-%m-%d %H:%M')}: {log.description or ''}")
Penalty = self.env['fusion.clock.penalty'].sudo()
penalties = Penalty.search([
('employee_id', '=', employee.id),
('date', '>=', date_from),
('date', '<=', date_to),
])
if penalties:
lines.append("")
lines.append("Penalties:")
for p in penalties:
lines.append(f" [{p.penalty_type}] {p.date}: {p.penalty_minutes:.0f}m deducted")
return "\n".join(lines)
def _build_team_context(self, employees, date_from=None, date_to=None):
if not date_from:
date_from = date.today() - timedelta(days=7)
if not date_to:
date_to = date.today()
lines = [f"Team Attendance Summary ({date_from} to {date_to})", ""]
for emp in employees:
Attendance = self.env['hr.attendance'].sudo()
attendances = Attendance.search([
('employee_id', '=', emp.id),
('check_in', '>=', datetime.combine(date_from, datetime.min.time())),
('check_in', '<=', datetime.combine(date_to, datetime.max.time())),
])
total_net = sum(a.x_fclk_net_hours for a in attendances)
total_ot = sum(a.x_fclk_overtime_hours for a in attendances)
days_worked = len(set(a.check_in.date() for a in attendances))
Penalty = self.env['fusion.clock.penalty'].sudo()
penalty_count = Penalty.search_count([
('employee_id', '=', emp.id),
('date', '>=', date_from),
('date', '<=', date_to),
])
Log = self.env['fusion.clock.activity.log'].sudo()
absence_count = Log.search_count([
('employee_id', '=', emp.id),
('log_type', '=', 'absent'),
('log_date', '>=', datetime.combine(date_from, datetime.min.time())),
('log_date', '<=', datetime.combine(date_to, datetime.max.time())),
])
late_count = Log.search_count([
('employee_id', '=', emp.id),
('log_type', '=', 'late_clock_in'),
('log_date', '>=', datetime.combine(date_from, datetime.min.time())),
('log_date', '<=', datetime.combine(date_to, datetime.max.time())),
])
shift_name = emp.x_fclk_shift_id.name if emp.x_fclk_shift_id else 'No shift'
lines.append(
f"- {emp.name} ({emp.department_id.name or 'N/A'}, {shift_name}): "
f"{days_worked} days, {total_net:.1f}h net, {total_ot:.1f}h OT, "
f"{penalty_count} penalties, {late_count} late, {absence_count} absences, "
f"streak: {emp.x_fclk_ontime_streak}"
)
return "\n".join(lines)
def _build_payroll_context(self, report):
lines = [
f"Payroll Report: {report.name}",
f"Period: {report.date_start} to {report.date_end}",
f"Employee: {report.employee_id.name if report.employee_id else 'Batch (all employees)'}",
f"Total hours: {report.total_hours:.1f}",
f"Net hours: {report.net_hours:.1f}",
f"Total breaks: {report.total_breaks:.0f} minutes",
f"Total penalties: {report.total_penalties}",
f"Days worked: {report.days_worked}",
"",
"Daily breakdown:",
]
for att in report.attendance_ids:
check_out_str = att.check_out.strftime('%H:%M') if att.check_out else 'MISSING'
auto = ' [AUTO-CLOCKED-OUT]' if att.x_fclk_auto_clocked_out else ''
lines.append(
f" {att.check_in.strftime('%Y-%m-%d')} | "
f"{att.check_in.strftime('%H:%M')}-{check_out_str} | "
f"Net: {att.x_fclk_net_hours:.1f}h | OT: {att.x_fclk_overtime_hours:.1f}h{auto}"
)
return "\n".join(lines)