# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

import json
import hashlib
import logging
from datetime import datetime, timedelta, date

from odoo import http, fields as odoo_fields, _
from odoo.http import request

# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


class FusionClockAIAPI(http.Controller):
    """JSON endpoints backing the Fusion Clock AI manager features.

    Every route requires a logged-in user (``auth='user'``) who belongs to
    the ``fusion_clock.group_fusion_clock_manager`` group.  Access,
    validation and AI-call failures are reported to the client as
    ``{'error': <message>}`` instead of being raised.
    """

    # XML id of the group allowed to call the routes below.
    _MANAGER_GROUP = 'fusion_clock.group_fusion_clock_manager'
    # Length (in days) of the default reporting window when no dates are sent.
    _DEFAULT_WINDOW_DAYS = 7
    # Returned when a client-supplied date cannot be parsed.
    _INVALID_DATE_ERROR = 'Invalid date. Expected ISO format (YYYY-MM-DD).'

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _get_employee(self):
        """Return the ``hr.employee`` linked to the current user.

        The search is limited to one record; the result is an empty
        recordset when no employee has ``user_id`` equal to the current uid.
        """
        return request.env['hr.employee'].sudo().search([
            ('user_id', '=', request.env.uid),
        ], limit=1)

    @classmethod
    def _parse_date_range(cls, date_from, date_to):
        """Turn two optional ISO date strings into a ``(start, end)`` pair.

        A missing ``date_from`` defaults to ``_DEFAULT_WINDOW_DAYS`` days
        before today; a missing ``date_to`` defaults to today.

        Raises:
            ValueError: if a supplied value is not a valid ISO date string
                (non-string values are reported the same way).
        """
        today = date.today()
        try:
            if date_from:
                start = date.fromisoformat(date_from)
            else:
                start = today - timedelta(days=cls._DEFAULT_WINDOW_DAYS)
            end = date.fromisoformat(date_to) if date_to else today
        except TypeError as exc:
            # fromisoformat() raises TypeError for non-str input; normalise
            # so callers only have one exception type to handle.
            raise ValueError(str(exc)) from exc
        return start, end

    def _get_clock_employees(self, employee_id=None):
        """Search clock-enabled employees of the current company.

        Args:
            employee_id: optional integer id; when given, the search is
                additionally restricted to that single employee.
        """
        domain = [
            ('x_fclk_enable_clock', '=', True),
            ('company_id', '=', request.env.company.id),
        ]
        if employee_id is not None:
            domain.append(('id', '=', employee_id))
        return request.env['hr.employee'].sudo().search(domain)

    def _get_or_create_conversation(self, user, conversation_id):
        """Return the user's existing conversation or create a new one.

        The conversation identified by ``conversation_id`` is reused only
        when it exists and belongs to ``user``.  In every other case
        (no id, malformed id, unknown id, someone else's conversation) a
        fresh ``manager_query`` conversation owned by ``user`` is created.
        """
        Conversation = request.env['fusion.clock.ai.conversation'].sudo()
        if conversation_id:
            try:
                candidate = Conversation.browse(int(conversation_id))
            except (TypeError, ValueError):
                # Malformed id: treat it like an unknown conversation.
                candidate = Conversation
            if candidate.exists() and candidate.user_id.id == user.id:
                return candidate
        return Conversation.create({
            'user_id': user.id,
            'conversation_type': 'manager_query',
        })

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------

    @http.route('/fusion_clock_ai/manager_chat', type='json', auth='user')
    def manager_chat(self, message, conversation_id=None, date_from=None, date_to=None):
        """Answer a manager's chat message using team attendance context.

        Args:
            message: the user's message; stored and sent to the AI service.
            conversation_id: optional id of a conversation to continue.
            date_from: optional ISO date string, start of the context window.
            date_to: optional ISO date string, end of the context window.

        Returns:
            ``{'response': ..., 'conversation_id': ...}`` on success, or
            ``{'error': ...}`` when access is denied, the feature is
            disabled, a date is invalid, or the AI call fails.
        """
        user = request.env.user
        if not user.has_group(self._MANAGER_GROUP):
            return {'error': 'Access denied. Manager role required.'}

        ICP = request.env['ir.config_parameter'].sudo()
        if ICP.get_param('fusion_clock_ai.enable_manager_chat', 'True') != 'True':
            return {'error': 'Manager AI Chat is disabled in settings.'}

        try:
            d_from, d_to = self._parse_date_range(date_from, date_to)
        except ValueError:
            return {'error': self._INVALID_DATE_ERROR}

        AI = request.env['fusion.clock.ai.service'].sudo()
        employees = self._get_clock_employees()
        team_context = AI._build_team_context(employees, d_from, d_to)

        conv = self._get_or_create_conversation(user, conversation_id)

        # Store the user message first so it is part of conv.message_ids
        # when the history is replayed to the AI service below.
        Message = request.env['fusion.clock.ai.message'].sudo()
        Message.create({
            'conversation_id': conv.id,
            'role': 'user',
            'content': message,
        })

        system_prompt = AI._get_system_prompt('manager_query')
        messages = [
            {'role': 'system', 'content': f"{system_prompt}\n\n--- ATTENDANCE DATA ---\n{team_context}"},
        ]
        messages.extend(
            {'role': msg.role, 'content': msg.content}
            for msg in conv.message_ids
        )

        try:
            response = AI.chat_completion(messages, feature='manager_query')
        except Exception as e:
            # Boundary handler: keep the JSON error contract for the client
            # but leave a traceback in the server log.
            _logger.exception(
                "Manager chat completion failed (conversation %s)", conv.id)
            return {'error': str(e)}

        Message.create({
            'conversation_id': conv.id,
            'role': 'assistant',
            'content': response,
        })

        return {
            'response': response,
            'conversation_id': conv.id,
        }

    @http.route('/fusion_clock_ai/run_analysis', type='json', auth='user')
    def run_analysis(self, analysis_type, date_from=None, date_to=None, employee_id=None):
        """Run one of the predefined AI analyses over attendance data.

        Args:
            analysis_type: one of ``anomaly``, ``understaffing``, ``shift``,
                ``compliance`` or ``geofence``.
            date_from: optional ISO date string, start of the window.
            date_to: optional ISO date string, end of the window.
            employee_id: optional employee id; when given, the attendance
                context is limited to that employee.

        Returns:
            ``{'response': ...}`` on success, or ``{'error': ...}`` when
            access is denied, an argument is invalid, or the AI call fails.
        """
        user = request.env.user
        if not user.has_group(self._MANAGER_GROUP):
            return {'error': 'Access denied.'}

        try:
            d_from, d_to = self._parse_date_range(date_from, date_to)
        except ValueError:
            return {'error': self._INVALID_DATE_ERROR}

        prompt_map = {
            'anomaly': 'anomaly_detection',
            'understaffing': 'understaffing_prediction',
            'shift': 'shift_optimization',
            'compliance': 'compliance_check',
            'geofence': 'geofence_tuning',
        }
        prompt_key = prompt_map.get(analysis_type)
        if not prompt_key:
            return {'error': f'Unknown analysis type: {analysis_type}'}

        emp_id = None
        if employee_id:
            try:
                emp_id = int(employee_id)
            except (TypeError, ValueError):
                return {'error': 'Invalid employee_id.'}

        AI = request.env['fusion.clock.ai.service'].sudo()
        employees = self._get_clock_employees(emp_id)
        context_data = AI._build_team_context(employees, d_from, d_to)

        if analysis_type == 'geofence':
            context_data += "\n\n" + self._build_geofence_context(d_from, d_to)

        system_prompt = AI._get_system_prompt(prompt_key)

        # The employee id is appended only when supplied, so the key for a
        # team-wide analysis stays the same as before and a per-employee
        # result can never be served from a team-wide cache entry.
        key_parts = [str(d_from), str(d_to), str(request.env.company.id)]
        if emp_id is not None:
            key_parts.append(str(emp_id))
        digest = hashlib.sha256(":".join(key_parts).encode()).hexdigest()
        cache_key = AI._get_cache_key(prompt_key, digest)

        try:
            response = AI.chat_completion(
                [
                    {'role': 'system', 'content': system_prompt},
                    {'role': 'user', 'content': context_data},
                ],
                feature=analysis_type,
                cache_key=cache_key,
            )
        except Exception as e:
            _logger.exception("AI analysis %r failed", analysis_type)
            return {'error': str(e)}

        return {'response': response}

    def _build_geofence_context(self, date_from, date_to):
        """Build a text summary of geofence activity per active location.

        For every active location of the current company, counts the
        ``clock_in`` and ``outside_geofence`` activity logs whose
        ``log_date`` falls between the start of ``date_from`` and the end of
        ``date_to``, and averages their non-zero ``distance`` values.

        Args:
            date_from: first day of the window (``datetime.date``).
            date_to: last day of the window (``datetime.date``).

        Returns:
            A newline-joined string starting with ``"Geofence Data:"``.
        """
        Log = request.env['fusion.clock.activity.log'].sudo()
        locations = request.env['fusion.clock.location'].sudo().search([
            ('active', '=', True),
            ('company_id', '=', request.env.company.id),
        ])

        # The date bounds are the same for every location: build them once.
        window = [
            ('log_date', '>=', datetime.combine(date_from, datetime.min.time())),
            ('log_date', '<=', datetime.combine(date_to, datetime.max.time())),
        ]

        def _average_distance(logs):
            # Falsy distances (0 / unset) are left out of the average.
            distances = [log.distance for log in logs if log.distance]
            return sum(distances) / len(distances) if distances else 0

        lines = ["Geofence Data:"]
        for loc in locations:
            clock_ins = Log.search([
                ('location_id', '=', loc.id),
                ('log_type', '=', 'clock_in'),
            ] + window)
            outside = Log.search([
                ('location_id', '=', loc.id),
                ('log_type', '=', 'outside_geofence'),
            ] + window)
            avg_dist = _average_distance(clock_ins)
            avg_overshoot = _average_distance(outside)

            lines.append(
                f"\n{loc.name} (radius: {loc.radius}m):\n"
                f" Successful clock-ins: {len(clock_ins)}, avg distance: {avg_dist:.0f}m\n"
                f" Blocked (outside): {len(outside)}, avg overshoot: {avg_overshoot:.0f}m"
            )
        return "\n".join(lines)

    @http.route('/fusion_clock_ai/smart_config', type='json', auth='user')
    def smart_config(self, description):
        """Send a free-text configuration request to the AI service.

        Args:
            description: text forwarded as the user message.

        Returns:
            ``{'response': ...}`` on success, or ``{'error': ...}`` when
            access is denied or the AI call fails.
        """
        user = request.env.user
        if not user.has_group(self._MANAGER_GROUP):
            return {'error': 'Access denied.'}

        AI = request.env['fusion.clock.ai.service'].sudo()
        system_prompt = AI._get_system_prompt('smart_config')

        try:
            response = AI.chat_completion(
                [
                    {'role': 'system', 'content': system_prompt},
                    {'role': 'user', 'content': description},
                ],
                feature='smart_config',
            )
        except Exception as e:
            _logger.exception("Smart config completion failed")
            return {'error': str(e)}

        return {'response': response}

    @http.route('/fusion_clock_ai/apply_config', type='json', auth='user')
    def apply_config(self, changes):
        """Write the given ``fusion_clock.*`` system parameters.

        Args:
            changes: list of dicts with a ``parameter`` key (parameter name)
                and a ``value`` key (stored as ``str(value)``).  Entries
                that are not dicts, or whose parameter is not a string
                starting with ``fusion_clock.``, are skipped.

        Returns:
            ``{'applied': [names...], 'count': n}``, or ``{'error': ...}``
            when access is denied or ``changes`` is not a list.
        """
        user = request.env.user
        if not user.has_group(self._MANAGER_GROUP):
            return {'error': 'Access denied.'}

        if not isinstance(changes, (list, tuple)):
            return {'error': 'Invalid changes payload: expected a list.'}

        # NOTE(review): any parameter under the "fusion_clock." prefix can be
        # written here with sudo(); consider an explicit allow-list.
        ICP = request.env['ir.config_parameter'].sudo()
        applied = []
        for change in changes:
            if not isinstance(change, dict):
                continue
            param = change.get('parameter', '')
            if not isinstance(param, str) or not param.startswith('fusion_clock.'):
                continue
            ICP.set_param(param, str(change.get('value', '')))
            applied.append(param)

        if applied:
            _logger.info(
                "User %s updated config parameters: %s", user.id, applied)

        return {'applied': applied, 'count': len(applied)}