# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

import logging
from datetime import timedelta

from odoo import models, fields, api

_logger = logging.getLogger(__name__)


class FusionApiUsageDaily(models.Model):
    """Daily roll-up of ``fusion.api.usage`` records.

    The aggregation cron writes one summary per combination of date,
    consumer, provider, user, feature and AI model. A second cron purges
    raw ``fusion.api.usage`` rows older than a configurable retention
    period.
    """

    _name = 'fusion.api.usage.daily'
    _description = 'API Usage Daily Summary'
    _order = 'date desc'
    _rec_name = 'display_name'

    # Day the summarized usage records were created.
    date = fields.Date(required=True, index=True)
    consumer_id = fields.Many2one(
        'fusion.api.consumer', required=True, index=True, ondelete='cascade',
    )
    provider_id = fields.Many2one(
        'fusion.api.provider', required=True, index=True, ondelete='cascade',
    )
    user_id = fields.Many2one('res.users', index=True, ondelete='set null')
    feature = fields.Char(index=True)
    model_name = fields.Char(string='AI Model')
    total_tokens_in = fields.Integer()
    total_tokens_out = fields.Integer()
    # Stored sum of total_tokens_in and total_tokens_out.
    total_tokens = fields.Integer(
        compute='_compute_total_tokens', store=True,
    )
    total_cost_usd = fields.Float(digits=(10, 4))
    request_count = fields.Integer()
    # Number of summarized usage records whose status is 'error'.
    error_count = fields.Integer()
    # Floor of the mean response_time_ms over the summarized records.
    avg_response_time_ms = fields.Integer()
    company_id = fields.Many2one('res.company', default=lambda self: self.env.company)
    display_name = fields.Char(compute='_compute_display_name')

    @api.depends('total_tokens_in', 'total_tokens_out')
    def _compute_total_tokens(self):
        """Set ``total_tokens`` to the sum of input and output tokens."""
        for rec in self:
            rec.total_tokens = rec.total_tokens_in + rec.total_tokens_out

    # Dependencies are declared so the cached label is recomputed when the
    # date or either related name changes.
    @api.depends('date', 'consumer_id.name', 'provider_id.name')
    def _compute_display_name(self):
        """Build the label ``"<date> - <consumer name> / <provider name>"``."""
        for rec in self:
            rec.display_name = (
                f"{rec.date} - {rec.consumer_id.name or ''} / "
                f"{rec.provider_id.name or ''}"
            )

    def _aggregation_key(self, record):
        """Return the tuple identifying the summary bucket of ``record``.

        Works for both ``fusion.api.usage`` rows and daily summary rows,
        which expose the same five grouping fields. Empty user, feature
        and model values are normalized (``0`` / ``''``) so that unset
        values land in the same bucket.
        """
        return (
            record.consumer_id.id,
            record.provider_id.id,
            record.user_id.id or 0,
            record.feature or '',
            record.model_name or '',
        )

    def _group_usages_for_day(self, usages, day):
        """Accumulate ``usages`` into per-bucket value dicts for ``day``.

        :param usages: ``fusion.api.usage`` recordset to summarize.
        :param day: date stored in the ``date`` key of every bucket.
        :return: dict mapping aggregation key -> values dict. Each values
            dict additionally carries a temporary ``total_response_time``
            entry that the caller must pop before writing.
        """
        groups = {}
        for usage in usages:
            key = self._aggregation_key(usage)
            grp = groups.get(key)
            if grp is None:
                grp = groups[key] = {
                    'date': day,
                    'consumer_id': usage.consumer_id.id,
                    'provider_id': usage.provider_id.id,
                    'user_id': usage.user_id.id or False,
                    'feature': usage.feature or False,
                    'model_name': usage.model_name or False,
                    'total_tokens_in': 0,
                    'total_tokens_out': 0,
                    'total_cost_usd': 0.0,
                    'request_count': 0,
                    'error_count': 0,
                    'total_response_time': 0,
                }
            grp['total_tokens_in'] += usage.tokens_in
            grp['total_tokens_out'] += usage.tokens_out
            grp['total_cost_usd'] += usage.estimated_cost_usd
            grp['request_count'] += 1
            grp['total_response_time'] += usage.response_time_ms
            if usage.status == 'error':
                grp['error_count'] += 1
        return groups

    @api.model
    def _cron_aggregate_daily(self):
        """Summarize yesterday's ``fusion.api.usage`` rows into daily records.

        Usage rows whose ``create_date`` falls within yesterday are grouped
        by consumer, provider, user, feature and model. An existing summary
        for the same bucket and date is overwritten, otherwise a new one is
        created, so re-running the cron for the same day does not duplicate
        rows.

        NOTE(review): "yesterday" comes from ``fields.Date.today()`` while
        the window is compared against ``create_date``; confirm that both
        use the intended timezone.
        """
        yesterday = fields.Date.today() - timedelta(days=1)
        _logger.info("Aggregating API usage for %s", yesterday)

        usages = self.env['fusion.api.usage'].sudo().search([
            ('create_date', '>=', fields.Datetime.to_string(yesterday)),
            ('create_date', '<', fields.Datetime.to_string(
                yesterday + timedelta(days=1)
            )),
        ])
        if not usages:
            _logger.info("No usage records to aggregate for %s", yesterday)
            return

        groups = self._group_usages_for_day(usages, yesterday)

        # Load the day's existing summaries once instead of searching per
        # group; the first record found for a bucket wins.
        summaries = self.sudo()
        existing_by_key = {}
        for summary in summaries.search([('date', '=', yesterday)]):
            existing_by_key.setdefault(self._aggregation_key(summary), summary)

        to_create = []
        for key, vals in groups.items():
            total_rt = vals.pop('total_response_time')
            count = vals['request_count']
            vals['avg_response_time_ms'] = total_rt // count if count else 0

            existing = existing_by_key.get(key)
            if existing:
                existing.write(vals)
            else:
                to_create.append(vals)

        if to_create:
            # One batched create for all new buckets.
            summaries.create(to_create)

        _logger.info(
            "Aggregated %d usage records into %d daily summaries",
            len(usages), len(groups),
        )

    @api.model
    def _cron_cleanup_old_logs(self):
        """Delete ``fusion.api.usage`` rows older than the retention period.

        The period is read from the ``fusion_api.log_retention_days`` system
        parameter (default ``'90'``). A value of zero or less disables the
        cleanup. A value that is not an integer is logged and the cleanup is
        skipped, so that a configuration typo never deletes data or crashes
        the cron.
        """
        ICP = self.env['ir.config_parameter'].sudo()
        raw_retention = ICP.get_param('fusion_api.log_retention_days', '90')
        try:
            retention_days = int(raw_retention)
        except (TypeError, ValueError):
            _logger.warning(
                "Invalid fusion_api.log_retention_days value %r; "
                "skipping usage log cleanup",
                raw_retention,
            )
            return
        if retention_days <= 0:
            return

        cutoff = fields.Date.today() - timedelta(days=retention_days)
        old_logs = self.env['fusion.api.usage'].sudo().search([
            ('create_date', '<', fields.Datetime.to_string(cutoff)),
        ])
        count = len(old_logs)
        if count:
            old_logs.unlink()
            _logger.info("Cleaned up %d usage logs older than %s", count, cutoff)