Files
Odoo-Modules/fusion_api/models/api_service.py
gsinghpal e56974d46f update
2026-03-16 08:14:56 -04:00

500 lines
19 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
import time
import logging
from datetime import timedelta
from odoo import models, fields, api, _
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
try:
from openai import OpenAI
except ImportError:
OpenAI = None
_logger.info("openai package not installed. OpenAI calls via Fusion API unavailable.")
try:
import anthropic as anthropic_sdk
except ImportError:
anthropic_sdk = None
_logger.info("anthropic package not installed. Anthropic calls via Fusion API unavailable.")
class FusionApiService(models.AbstractModel):
_name = 'fusion.api.service'
_description = 'Fusion API Service'
COST_PER_1K = {
'openai': {
'gpt-4o': {'input': 0.0025, 'output': 0.01},
'gpt-4o-mini': {'input': 0.00015, 'output': 0.0006},
'gpt-4-turbo': {'input': 0.01, 'output': 0.03},
'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015},
'o1': {'input': 0.015, 'output': 0.06},
'o1-mini': {'input': 0.003, 'output': 0.012},
},
'anthropic': {
'claude-sonnet-4-20250514': {'input': 0.003, 'output': 0.015},
'claude-3-5-haiku-20241022': {'input': 0.001, 'output': 0.005},
'claude-3-opus-20240229': {'input': 0.015, 'output': 0.075},
},
}
# ------------------------------------------------------------------
# Provider / key resolution
# ------------------------------------------------------------------
def _get_provider(self, provider_type):
provider = self.env['fusion.api.provider'].sudo().search([
('provider_type', '=', provider_type),
('status', '=', 'active'),
('company_id', '=', self.env.company.id),
], limit=1)
if not provider:
provider = self.env['fusion.api.provider'].sudo().search([
('provider_type', '=', provider_type),
('status', '=', 'active'),
], limit=1)
if not provider:
raise UserError(_(
"No active %(type)s provider configured. "
"Go to Fusion API > Providers to set one up.",
type=provider_type,
))
return provider
def _get_default_key(self, provider):
domain = [
('provider_id', '=', provider.id),
('is_active', '=', True),
('environment', '=', 'production'),
]
key = self.env['fusion.api.key'].sudo().search(
domain + [('is_default', '=', True)], limit=1,
)
if not key:
key = self.env['fusion.api.key'].sudo().search(domain, limit=1)
if not key:
raise UserError(_(
"No active API key for provider '%(provider)s'. "
"Go to Fusion API > Providers to add one.",
provider=provider.name,
))
return key
# ------------------------------------------------------------------
# Consumer auto-registration
# ------------------------------------------------------------------
def _auto_register_consumer(self, consumer_name):
consumer = self.env['fusion.api.consumer'].sudo().search([
('technical_name', '=', consumer_name),
], limit=1)
if consumer:
return consumer
ICP = self.env['ir.config_parameter'].sudo()
if not ICP.get_param('fusion_api.auto_detect_consumers', 'True') == 'True':
raise UserError(_(
"Consumer '%(name)s' is not registered and auto-detection is disabled.",
name=consumer_name,
))
module = self.env['ir.module.module'].sudo().search([
('name', '=', consumer_name),
], limit=1)
display = consumer_name.replace('fusion_', 'Fusion ').replace('_', ' ').title()
consumer = self.env['fusion.api.consumer'].sudo().create({
'name': display,
'technical_name': consumer_name,
'module_id': module.id if module else False,
'is_active': True,
'auto_detected': True,
'first_seen_at': fields.Datetime.now(),
})
_logger.info("Auto-registered API consumer: %s", consumer_name)
return consumer
# ------------------------------------------------------------------
# Access control checks
# ------------------------------------------------------------------
def _check_access(self, consumer, provider, user=None):
if not consumer.is_active:
raise UserError(_(
"API access is disabled for '%(module)s'. "
"An administrator can re-enable it in Fusion API > Consumers.",
module=consumer.name,
))
access = self.env['fusion.api.access'].sudo().search([
('consumer_id', '=', consumer.id),
('provider_id', '=', provider.id),
], limit=1)
if access and not access.is_enabled:
raise UserError(_(
"'%(module)s' access to '%(provider)s' is disabled.",
module=consumer.name, provider=provider.name,
))
if access:
self._check_budget(access, consumer, provider)
self._check_rate_limit(access, consumer, provider)
if user:
self._check_user_limit(user, provider)
return access
def _check_budget(self, access, consumer, provider):
if access.monthly_budget_usd > 0 and access.is_budget_exceeded:
raise UserError(_(
"Monthly budget of $%(budget).2f exceeded for "
"'%(module)s' on '%(provider)s' ($%(spent).2f spent).",
budget=access.monthly_budget_usd,
module=consumer.name,
provider=provider.name,
spent=access.current_month_cost,
))
if access.daily_budget_usd > 0:
if access.current_day_cost >= access.daily_budget_usd:
raise UserError(_(
"Daily budget of $%(budget).2f exceeded for "
"'%(module)s' on '%(provider)s'.",
budget=access.daily_budget_usd,
module=consumer.name,
provider=provider.name,
))
def _check_rate_limit(self, access, consumer, provider):
if access.max_rpm > 0:
one_min_ago = fields.Datetime.now() - timedelta(minutes=1)
recent_count = self.env['fusion.api.usage'].sudo().search_count([
('consumer_id', '=', consumer.id),
('provider_id', '=', provider.id),
('create_date', '>=', one_min_ago),
])
if recent_count >= access.max_rpm:
raise UserError(_(
"Rate limit of %(limit)d requests/minute exceeded for "
"'%(module)s' on '%(provider)s'.",
limit=access.max_rpm,
module=consumer.name,
provider=provider.name,
))
if access.max_rpd > 0:
today = fields.Date.today()
day_count = self.env['fusion.api.usage'].sudo().search_count([
('consumer_id', '=', consumer.id),
('provider_id', '=', provider.id),
('create_date', '>=', fields.Datetime.to_string(today)),
])
if day_count >= access.max_rpd:
raise UserError(_(
"Daily limit of %(limit)d requests exceeded for "
"'%(module)s' on '%(provider)s'.",
limit=access.max_rpd,
module=consumer.name,
provider=provider.name,
))
def _check_user_limit(self, user, provider):
limit = self.env['fusion.api.user.limit'].sudo().search([
('user_id', '=', user.id),
('provider_id', '=', provider.id),
], limit=1)
if not limit:
return
if limit.is_blocked:
raise UserError(_(
"API access to '%(provider)s' is blocked for user '%(user)s'.",
provider=provider.name, user=user.name,
))
if limit.monthly_budget_usd > 0:
if limit.current_month_cost >= limit.monthly_budget_usd:
raise UserError(_(
"User monthly budget of $%(budget).2f exceeded for "
"'%(provider)s' ($%(spent).2f spent).",
budget=limit.monthly_budget_usd,
provider=provider.name,
spent=limit.current_month_cost,
))
if limit.max_rpd > 0:
if limit.current_day_requests >= limit.max_rpd:
raise UserError(_(
"User daily limit of %(limit)d requests exceeded for "
"'%(provider)s'.",
limit=limit.max_rpd,
provider=provider.name,
))
# ------------------------------------------------------------------
# Usage logging
# ------------------------------------------------------------------
def _log_usage(self, consumer, provider, user, feature, model_name,
tokens_in, tokens_out, cost, response_time_ms,
status, error_message=None):
self.env['fusion.api.usage'].sudo().create({
'consumer_id': consumer.id,
'provider_id': provider.id,
'user_id': user.id if user else False,
'feature': feature,
'model_name': model_name,
'tokens_in': tokens_in,
'tokens_out': tokens_out,
'estimated_cost_usd': cost,
'response_time_ms': response_time_ms,
'status': status,
'error_message': error_message,
})
def _estimate_cost(self, provider_type, model_name, tokens_in, tokens_out):
provider_costs = self.COST_PER_1K.get(provider_type, {})
model_costs = provider_costs.get(
model_name, {'input': 0.001, 'output': 0.002},
)
return (
(tokens_in / 1000) * model_costs['input']
+ (tokens_out / 1000) * model_costs['output']
)
# ------------------------------------------------------------------
# Public API: OpenAI
# ------------------------------------------------------------------
def call_openai(self, consumer, feature, messages,
model=None, max_tokens=1024, user=None, **kwargs):
if OpenAI is None:
raise UserError(_(
"The 'openai' Python package is not installed. "
"Run: pip install openai"
))
provider = self._get_provider('openai')
consumer_rec = self._auto_register_consumer(consumer)
user_rec = user or self.env.user
self._check_access(consumer_rec, provider, user_rec)
key = self._get_default_key(provider)
model_name = model or 'gpt-4o-mini'
start = time.time()
try:
client = OpenAI(api_key=key.api_key)
response = client.chat.completions.create(
model=model_name,
messages=messages,
max_tokens=max_tokens,
**kwargs,
)
elapsed_ms = int((time.time() - start) * 1000)
usage = response.usage
cost = self._estimate_cost(
'openai', model_name,
usage.prompt_tokens, usage.completion_tokens,
)
self._log_usage(
consumer_rec, provider, user_rec, feature, model_name,
usage.prompt_tokens, usage.completion_tokens,
cost, elapsed_ms, 'success',
)
return response.choices[0].message.content
except UserError:
raise
except Exception as e:
elapsed_ms = int((time.time() - start) * 1000)
self._log_usage(
consumer_rec, provider, user_rec, feature, model_name,
0, 0, 0, elapsed_ms, 'error', str(e),
)
_logger.error("OpenAI API error for %s: %s", consumer, e)
raise UserError(_("OpenAI API error: %s", str(e)))
# ------------------------------------------------------------------
# Public API: Anthropic
# ------------------------------------------------------------------
def call_anthropic(self, consumer, feature, messages,
model=None, max_tokens=1024, system=None,
user=None, **kwargs):
if anthropic_sdk is None:
raise UserError(_(
"The 'anthropic' Python package is not installed. "
"Run: pip install anthropic"
))
provider = self._get_provider('anthropic')
consumer_rec = self._auto_register_consumer(consumer)
user_rec = user or self.env.user
self._check_access(consumer_rec, provider, user_rec)
key = self._get_default_key(provider)
model_name = model or 'claude-sonnet-4-20250514'
call_kwargs = {
'model': model_name,
'messages': messages,
'max_tokens': max_tokens,
}
if system:
call_kwargs['system'] = system
call_kwargs.update(kwargs)
start = time.time()
try:
client = anthropic_sdk.Anthropic(api_key=key.api_key)
response = client.messages.create(**call_kwargs)
elapsed_ms = int((time.time() - start) * 1000)
cost = self._estimate_cost(
'anthropic', model_name,
response.usage.input_tokens, response.usage.output_tokens,
)
self._log_usage(
consumer_rec, provider, user_rec, feature, model_name,
response.usage.input_tokens, response.usage.output_tokens,
cost, elapsed_ms, 'success',
)
text_blocks = [
b.text for b in response.content if b.type == 'text'
]
return '\n'.join(text_blocks)
except UserError:
raise
except Exception as e:
elapsed_ms = int((time.time() - start) * 1000)
self._log_usage(
consumer_rec, provider, user_rec, feature, model_name,
0, 0, 0, elapsed_ms, 'error', str(e),
)
_logger.error("Anthropic API error for %s: %s", consumer, e)
raise UserError(_("Anthropic API error: %s", str(e)))
# ------------------------------------------------------------------
# Public API: Raw key access (Google Maps, Twilio, etc.)
# ------------------------------------------------------------------
def get_api_key(self, provider_type, consumer,
feature=None, user=None):
provider = self._get_provider(provider_type)
consumer_rec = self._auto_register_consumer(consumer)
user_rec = user or self.env.user
self._check_access(consumer_rec, provider, user_rec)
key = self._get_default_key(provider)
self._log_usage(
consumer_rec, provider, user_rec,
feature or 'key_access', '', 0, 0, 0, 0, 'success',
)
return key.api_key
def get_oauth_credentials(self, provider_type, consumer,
feature=None, user=None):
provider = self._get_provider(provider_type)
consumer_rec = self._auto_register_consumer(consumer)
user_rec = user or self.env.user
self._check_access(consumer_rec, provider, user_rec)
key = self._get_default_key(provider)
self._log_usage(
consumer_rec, provider, user_rec,
feature or 'oauth_access', '', 0, 0, 0, 0, 'success',
)
return {
'client_id': key.client_id,
'client_secret': key.client_secret,
'access_token': key.access_token,
'refresh_token': key.refresh_token,
'token_expiry': key.token_expiry,
'redirect_uri': key.redirect_uri,
}
# ------------------------------------------------------------------
# Key validation
# ------------------------------------------------------------------
def _validate_key(self, key_record):
ptype = key_record.provider_id.provider_type
if ptype == 'openai':
if OpenAI is None:
raise UserError(_("openai package not installed."))
try:
from openai import AuthenticationError as OpenAIAuthError
try:
client = OpenAI(api_key=key_record.api_key)
client.models.list()
except OpenAIAuthError as e:
raise UserError(_(
"OpenAI key is invalid (authentication failed): %s", str(e),
))
except Exception as e:
err_str = str(e).lower()
if 'billing' in err_str or 'quota' in err_str or 'insufficient' in err_str:
return
raise UserError(_(
"OpenAI key validation failed: %s", str(e),
))
except UserError:
raise
elif ptype == 'anthropic':
if anthropic_sdk is None:
raise UserError(_("anthropic package not installed."))
try:
client = anthropic_sdk.Anthropic(api_key=key_record.api_key)
client.messages.create(
model='claude-3-5-haiku-20241022',
max_tokens=1,
messages=[{'role': 'user', 'content': 'hi'}],
)
except anthropic_sdk.AuthenticationError as e:
raise UserError(_(
"Anthropic key is invalid (authentication failed): %s", str(e),
))
except anthropic_sdk.BadRequestError as e:
if 'credit balance' in str(e) or 'billing' in str(e).lower():
return
raise UserError(_(
"Anthropic key validation failed: %s", str(e),
))
except anthropic_sdk.PermissionDeniedError:
raise UserError(_(
"Anthropic key lacks required permissions.",
))
except Exception as e:
raise UserError(_(
"Anthropic key validation failed: %s", str(e),
))
elif ptype in ('google_maps', 'twilio'):
if not key_record.api_key:
raise UserError(_("API key is empty."))
elif ptype in ('google_oauth', 'microsoft_oauth'):
if not key_record.client_id or not key_record.client_secret:
raise UserError(_("Client ID and Client Secret are required."))
else:
if not key_record.api_key:
raise UserError(_("API key is empty."))