Files
Odoo-Modules/fusion_api/models/api_usage_daily.py
gsinghpal e56974d46f update
2026-03-16 08:14:56 -04:00

142 lines
5.0 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import logging
from datetime import timedelta
from odoo import models, fields, api
# Module-level logger used by the cron methods below; named after this module.
_logger = logging.getLogger(__name__)
class FusionApiUsageDaily(models.Model):
    """Daily summary of ``fusion.api.usage`` records.

    Each row holds the totals of one day for one combination of consumer,
    provider, user, feature and AI model. Rows are created or refreshed by
    :meth:`_cron_aggregate_daily`; :meth:`_cron_cleanup_old_logs` purges the
    raw ``fusion.api.usage`` records once they exceed the retention period.
    """

    _name = 'fusion.api.usage.daily'
    _description = 'API Usage Daily Summary'
    _order = 'date desc'
    # NOTE(review): ``_rec_name`` points at the computed ``display_name``
    # field itself -- confirm this is intended for the targeted Odoo version.
    _rec_name = 'display_name'

    # --- Grouping dimensions -------------------------------------------------
    date = fields.Date(required=True, index=True)
    consumer_id = fields.Many2one(
        'fusion.api.consumer', required=True, index=True, ondelete='cascade',
    )
    provider_id = fields.Many2one(
        'fusion.api.provider', required=True, index=True, ondelete='cascade',
    )
    user_id = fields.Many2one('res.users', index=True, ondelete='set null')
    feature = fields.Char(index=True)
    model_name = fields.Char(string='AI Model')

    # --- Aggregated figures --------------------------------------------------
    total_tokens_in = fields.Integer()
    total_tokens_out = fields.Integer()
    # Stored sum of the two token counters above.
    total_tokens = fields.Integer(
        compute='_compute_total_tokens', store=True,
    )
    # Sum of ``estimated_cost_usd`` over the aggregated usage records.
    total_cost_usd = fields.Float(digits=(10, 4))
    request_count = fields.Integer()
    # Number of aggregated usage records whose status is ``'error'``.
    error_count = fields.Integer()
    avg_response_time_ms = fields.Integer()

    company_id = fields.Many2one('res.company', default=lambda self: self.env.company)
    display_name = fields.Char(compute='_compute_display_name')

    @api.depends('total_tokens_in', 'total_tokens_out')
    def _compute_total_tokens(self):
        """Set ``total_tokens`` to input tokens plus output tokens."""
        for rec in self:
            rec.total_tokens = rec.total_tokens_in + rec.total_tokens_out

    # Declaring the dependencies lets the ORM invalidate the cached label
    # whenever the date or the consumer/provider names change.
    @api.depends('date', 'consumer_id.name', 'provider_id.name')
    def _compute_display_name(self):
        """Build the record label as ``<date> - <consumer> / <provider>``."""
        for rec in self:
            rec.display_name = (
                f"{rec.date} - {rec.consumer_id.name or ''} / "
                f"{rec.provider_id.name or ''}"
            )

    @api.model
    def _cron_aggregate_daily(self):
        """Aggregate yesterday's usage records into daily summary rows.

        Usage records whose ``create_date`` lies from yesterday up to (but
        excluding) today are grouped by consumer, provider, user, feature and
        AI model. For each group the token counts, estimated cost, request
        count, error count and the floor-averaged response time are computed.
        A summary row that already exists for the same day and group is
        overwritten, so running the cron again does not create duplicates.

        :return: ``None``
        """
        yesterday = fields.Date.today() - timedelta(days=1)
        _logger.info("Aggregating API usage for %s", yesterday)

        window_start = fields.Datetime.to_string(yesterday)
        window_end = fields.Datetime.to_string(yesterday + timedelta(days=1))
        usages = self.env['fusion.api.usage'].sudo().search([
            ('create_date', '>=', window_start),
            ('create_date', '<', window_end),
        ])
        if not usages:
            _logger.info("No usage records to aggregate for %s", yesterday)
            return

        groups = {}
        for usage in usages:
            # Missing optional dimensions are normalised so that all records
            # without a user/feature/model land in the same group.
            key = (
                usage.consumer_id.id,
                usage.provider_id.id,
                usage.user_id.id or 0,
                usage.feature or '',
                usage.model_name or '',
            )
            if key not in groups:
                groups[key] = {
                    'date': yesterday,
                    'consumer_id': usage.consumer_id.id,
                    'provider_id': usage.provider_id.id,
                    'user_id': usage.user_id.id or False,
                    'feature': usage.feature or False,
                    'model_name': usage.model_name or False,
                    'total_tokens_in': 0,
                    'total_tokens_out': 0,
                    'total_cost_usd': 0.0,
                    'request_count': 0,
                    'error_count': 0,
                    # Working accumulator only; removed before the write.
                    'total_response_time': 0,
                }
            grp = groups[key]
            grp['total_tokens_in'] += usage.tokens_in
            grp['total_tokens_out'] += usage.tokens_out
            grp['total_cost_usd'] += usage.estimated_cost_usd
            grp['request_count'] += 1
            grp['total_response_time'] += usage.response_time_ms
            if usage.status == 'error':
                grp['error_count'] += 1

        Daily = self.sudo()
        for grp in groups.values():
            total_rt = grp.pop('total_response_time')
            count = grp['request_count']
            grp['avg_response_time_ms'] = total_rt // count if count else 0

            # Upsert: reuse the row for this day/group when there is one.
            existing = Daily.search([
                ('date', '=', grp['date']),
                ('consumer_id', '=', grp['consumer_id']),
                ('provider_id', '=', grp['provider_id']),
                ('user_id', '=', grp['user_id']),
                ('feature', '=', grp['feature']),
                ('model_name', '=', grp['model_name']),
            ], limit=1)
            if existing:
                existing.write(grp)
            else:
                Daily.create(grp)

        _logger.info(
            "Aggregated %d usage records into %d daily summaries",
            len(usages), len(groups),
        )

    @api.model
    def _cron_cleanup_old_logs(self):
        """Delete raw usage records older than the configured retention.

        The retention (in days) is read from the system parameter
        ``fusion_api.log_retention_days`` and defaults to ``90``. A value of
        zero or less disables the cleanup. A value that cannot be parsed as
        an integer is reported with a warning and the cleanup is skipped, so
        a misconfiguration never deletes anything and never crashes the cron.

        :return: ``None``
        """
        ICP = self.env['ir.config_parameter'].sudo()
        raw_retention = ICP.get_param('fusion_api.log_retention_days', '90')
        try:
            retention_days = int(raw_retention)
        except (TypeError, ValueError):
            _logger.warning(
                "Invalid fusion_api.log_retention_days value %r; "
                "skipping usage log cleanup",
                raw_retention,
            )
            return
        if retention_days <= 0:
            return

        cutoff = fields.Date.today() - timedelta(days=retention_days)
        old_logs = self.env['fusion.api.usage'].sudo().search([
            ('create_date', '<', fields.Datetime.to_string(cutoff)),
        ])
        count = len(old_logs)
        if count:
            old_logs.unlink()
            _logger.info("Cleaned up %d usage logs older than %s", count, cutoff)