# -*- coding: utf-8 -*- # Copyright 2026 Nexa Systems Inc. # License OPL-1 (Odoo Proprietary License v1.0) import time import logging from datetime import timedelta from odoo import models, fields, api, _ from odoo.exceptions import UserError _logger = logging.getLogger(__name__) try: from openai import OpenAI except ImportError: OpenAI = None _logger.info("openai package not installed. OpenAI calls via Fusion API unavailable.") try: import anthropic as anthropic_sdk except ImportError: anthropic_sdk = None _logger.info("anthropic package not installed. Anthropic calls via Fusion API unavailable.") class FusionApiService(models.AbstractModel): _name = 'fusion.api.service' _description = 'Fusion API Service' COST_PER_1K = { 'openai': { 'gpt-4o': {'input': 0.0025, 'output': 0.01}, 'gpt-4o-mini': {'input': 0.00015, 'output': 0.0006}, 'gpt-4-turbo': {'input': 0.01, 'output': 0.03}, 'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015}, 'o1': {'input': 0.015, 'output': 0.06}, 'o1-mini': {'input': 0.003, 'output': 0.012}, }, 'anthropic': { 'claude-sonnet-4-20250514': {'input': 0.003, 'output': 0.015}, 'claude-3-5-haiku-20241022': {'input': 0.001, 'output': 0.005}, 'claude-3-opus-20240229': {'input': 0.015, 'output': 0.075}, }, } # ------------------------------------------------------------------ # Provider / key resolution # ------------------------------------------------------------------ def _get_provider(self, provider_type): provider = self.env['fusion.api.provider'].sudo().search([ ('provider_type', '=', provider_type), ('status', '=', 'active'), ('company_id', '=', self.env.company.id), ], limit=1) if not provider: provider = self.env['fusion.api.provider'].sudo().search([ ('provider_type', '=', provider_type), ('status', '=', 'active'), ], limit=1) if not provider: raise UserError(_( "No active %(type)s provider configured. " "Go to Fusion API > Providers to set one up.", type=provider_type, )) return provider def _get_default_key(self, provider): domain = [ ('provider_id', '=', provider.id), ('is_active', '=', True), ('environment', '=', 'production'), ] key = self.env['fusion.api.key'].sudo().search( domain + [('is_default', '=', True)], limit=1, ) if not key: key = self.env['fusion.api.key'].sudo().search(domain, limit=1) if not key: raise UserError(_( "No active API key for provider '%(provider)s'. " "Go to Fusion API > Providers to add one.", provider=provider.name, )) return key # ------------------------------------------------------------------ # Consumer auto-registration # ------------------------------------------------------------------ def _auto_register_consumer(self, consumer_name): consumer = self.env['fusion.api.consumer'].sudo().search([ ('technical_name', '=', consumer_name), ], limit=1) if consumer: return consumer ICP = self.env['ir.config_parameter'].sudo() if not ICP.get_param('fusion_api.auto_detect_consumers', 'True') == 'True': raise UserError(_( "Consumer '%(name)s' is not registered and auto-detection is disabled.", name=consumer_name, )) module = self.env['ir.module.module'].sudo().search([ ('name', '=', consumer_name), ], limit=1) display = consumer_name.replace('fusion_', 'Fusion ').replace('_', ' ').title() consumer = self.env['fusion.api.consumer'].sudo().create({ 'name': display, 'technical_name': consumer_name, 'module_id': module.id if module else False, 'is_active': True, 'auto_detected': True, 'first_seen_at': fields.Datetime.now(), }) _logger.info("Auto-registered API consumer: %s", consumer_name) return consumer # ------------------------------------------------------------------ # Access control checks # ------------------------------------------------------------------ def _check_access(self, consumer, provider, user=None): if not consumer.is_active: raise UserError(_( "API access is disabled for '%(module)s'. " "An administrator can re-enable it in Fusion API > Consumers.", module=consumer.name, )) access = self.env['fusion.api.access'].sudo().search([ ('consumer_id', '=', consumer.id), ('provider_id', '=', provider.id), ], limit=1) if access and not access.is_enabled: raise UserError(_( "'%(module)s' access to '%(provider)s' is disabled.", module=consumer.name, provider=provider.name, )) if access: self._check_budget(access, consumer, provider) self._check_rate_limit(access, consumer, provider) if user: self._check_user_limit(user, provider) return access def _check_budget(self, access, consumer, provider): if access.monthly_budget_usd > 0 and access.is_budget_exceeded: raise UserError(_( "Monthly budget of $%(budget).2f exceeded for " "'%(module)s' on '%(provider)s' ($%(spent).2f spent).", budget=access.monthly_budget_usd, module=consumer.name, provider=provider.name, spent=access.current_month_cost, )) if access.daily_budget_usd > 0: if access.current_day_cost >= access.daily_budget_usd: raise UserError(_( "Daily budget of $%(budget).2f exceeded for " "'%(module)s' on '%(provider)s'.", budget=access.daily_budget_usd, module=consumer.name, provider=provider.name, )) def _check_rate_limit(self, access, consumer, provider): if access.max_rpm > 0: one_min_ago = fields.Datetime.now() - timedelta(minutes=1) recent_count = self.env['fusion.api.usage'].sudo().search_count([ ('consumer_id', '=', consumer.id), ('provider_id', '=', provider.id), ('create_date', '>=', one_min_ago), ]) if recent_count >= access.max_rpm: raise UserError(_( "Rate limit of %(limit)d requests/minute exceeded for " "'%(module)s' on '%(provider)s'.", limit=access.max_rpm, module=consumer.name, provider=provider.name, )) if access.max_rpd > 0: today = fields.Date.today() day_count = self.env['fusion.api.usage'].sudo().search_count([ ('consumer_id', '=', consumer.id), ('provider_id', '=', provider.id), ('create_date', '>=', fields.Datetime.to_string(today)), ]) if day_count >= access.max_rpd: raise UserError(_( "Daily limit of %(limit)d requests exceeded for " "'%(module)s' on '%(provider)s'.", limit=access.max_rpd, module=consumer.name, provider=provider.name, )) def _check_user_limit(self, user, provider): limit = self.env['fusion.api.user.limit'].sudo().search([ ('user_id', '=', user.id), ('provider_id', '=', provider.id), ], limit=1) if not limit: return if limit.is_blocked: raise UserError(_( "API access to '%(provider)s' is blocked for user '%(user)s'.", provider=provider.name, user=user.name, )) if limit.monthly_budget_usd > 0: if limit.current_month_cost >= limit.monthly_budget_usd: raise UserError(_( "User monthly budget of $%(budget).2f exceeded for " "'%(provider)s' ($%(spent).2f spent).", budget=limit.monthly_budget_usd, provider=provider.name, spent=limit.current_month_cost, )) if limit.max_rpd > 0: if limit.current_day_requests >= limit.max_rpd: raise UserError(_( "User daily limit of %(limit)d requests exceeded for " "'%(provider)s'.", limit=limit.max_rpd, provider=provider.name, )) # ------------------------------------------------------------------ # Usage logging # ------------------------------------------------------------------ def _log_usage(self, consumer, provider, user, feature, model_name, tokens_in, tokens_out, cost, response_time_ms, status, error_message=None): self.env['fusion.api.usage'].sudo().create({ 'consumer_id': consumer.id, 'provider_id': provider.id, 'user_id': user.id if user else False, 'feature': feature, 'model_name': model_name, 'tokens_in': tokens_in, 'tokens_out': tokens_out, 'estimated_cost_usd': cost, 'response_time_ms': response_time_ms, 'status': status, 'error_message': error_message, }) def _estimate_cost(self, provider_type, model_name, tokens_in, tokens_out): provider_costs = self.COST_PER_1K.get(provider_type, {}) model_costs = provider_costs.get( model_name, {'input': 0.001, 'output': 0.002}, ) return ( (tokens_in / 1000) * model_costs['input'] + (tokens_out / 1000) * model_costs['output'] ) # ------------------------------------------------------------------ # Public API: OpenAI # ------------------------------------------------------------------ def call_openai(self, consumer, feature, messages, model=None, max_tokens=1024, user=None, **kwargs): if OpenAI is None: raise UserError(_( "The 'openai' Python package is not installed. " "Run: pip install openai" )) provider = self._get_provider('openai') consumer_rec = self._auto_register_consumer(consumer) user_rec = user or self.env.user self._check_access(consumer_rec, provider, user_rec) key = self._get_default_key(provider) model_name = model or 'gpt-4o-mini' start = time.time() try: client = OpenAI(api_key=key.api_key) response = client.chat.completions.create( model=model_name, messages=messages, max_tokens=max_tokens, **kwargs, ) elapsed_ms = int((time.time() - start) * 1000) usage = response.usage cost = self._estimate_cost( 'openai', model_name, usage.prompt_tokens, usage.completion_tokens, ) self._log_usage( consumer_rec, provider, user_rec, feature, model_name, usage.prompt_tokens, usage.completion_tokens, cost, elapsed_ms, 'success', ) return response.choices[0].message.content except UserError: raise except Exception as e: elapsed_ms = int((time.time() - start) * 1000) self._log_usage( consumer_rec, provider, user_rec, feature, model_name, 0, 0, 0, elapsed_ms, 'error', str(e), ) _logger.error("OpenAI API error for %s: %s", consumer, e) raise UserError(_("OpenAI API error: %s", str(e))) # ------------------------------------------------------------------ # Public API: Anthropic # ------------------------------------------------------------------ def call_anthropic(self, consumer, feature, messages, model=None, max_tokens=1024, system=None, user=None, **kwargs): if anthropic_sdk is None: raise UserError(_( "The 'anthropic' Python package is not installed. " "Run: pip install anthropic" )) provider = self._get_provider('anthropic') consumer_rec = self._auto_register_consumer(consumer) user_rec = user or self.env.user self._check_access(consumer_rec, provider, user_rec) key = self._get_default_key(provider) model_name = model or 'claude-sonnet-4-20250514' call_kwargs = { 'model': model_name, 'messages': messages, 'max_tokens': max_tokens, } if system: call_kwargs['system'] = system call_kwargs.update(kwargs) start = time.time() try: client = anthropic_sdk.Anthropic(api_key=key.api_key) response = client.messages.create(**call_kwargs) elapsed_ms = int((time.time() - start) * 1000) cost = self._estimate_cost( 'anthropic', model_name, response.usage.input_tokens, response.usage.output_tokens, ) self._log_usage( consumer_rec, provider, user_rec, feature, model_name, response.usage.input_tokens, response.usage.output_tokens, cost, elapsed_ms, 'success', ) text_blocks = [ b.text for b in response.content if b.type == 'text' ] return '\n'.join(text_blocks) except UserError: raise except Exception as e: elapsed_ms = int((time.time() - start) * 1000) self._log_usage( consumer_rec, provider, user_rec, feature, model_name, 0, 0, 0, elapsed_ms, 'error', str(e), ) _logger.error("Anthropic API error for %s: %s", consumer, e) raise UserError(_("Anthropic API error: %s", str(e))) # ------------------------------------------------------------------ # Public API: Raw key access (Google Maps, Twilio, etc.) # ------------------------------------------------------------------ def get_api_key(self, provider_type, consumer, feature=None, user=None): provider = self._get_provider(provider_type) consumer_rec = self._auto_register_consumer(consumer) user_rec = user or self.env.user self._check_access(consumer_rec, provider, user_rec) key = self._get_default_key(provider) self._log_usage( consumer_rec, provider, user_rec, feature or 'key_access', '', 0, 0, 0, 0, 'success', ) return key.api_key def get_oauth_credentials(self, provider_type, consumer, feature=None, user=None): provider = self._get_provider(provider_type) consumer_rec = self._auto_register_consumer(consumer) user_rec = user or self.env.user self._check_access(consumer_rec, provider, user_rec) key = self._get_default_key(provider) self._log_usage( consumer_rec, provider, user_rec, feature or 'oauth_access', '', 0, 0, 0, 0, 'success', ) return { 'client_id': key.client_id, 'client_secret': key.client_secret, 'access_token': key.access_token, 'refresh_token': key.refresh_token, 'token_expiry': key.token_expiry, 'redirect_uri': key.redirect_uri, } # ------------------------------------------------------------------ # Key validation # ------------------------------------------------------------------ def _validate_key(self, key_record): ptype = key_record.provider_id.provider_type if ptype == 'openai': if OpenAI is None: raise UserError(_("openai package not installed.")) try: from openai import AuthenticationError as OpenAIAuthError try: client = OpenAI(api_key=key_record.api_key) client.models.list() except OpenAIAuthError as e: raise UserError(_( "OpenAI key is invalid (authentication failed): %s", str(e), )) except Exception as e: err_str = str(e).lower() if 'billing' in err_str or 'quota' in err_str or 'insufficient' in err_str: return raise UserError(_( "OpenAI key validation failed: %s", str(e), )) except UserError: raise elif ptype == 'anthropic': if anthropic_sdk is None: raise UserError(_("anthropic package not installed.")) try: client = anthropic_sdk.Anthropic(api_key=key_record.api_key) client.messages.create( model='claude-3-5-haiku-20241022', max_tokens=1, messages=[{'role': 'user', 'content': 'hi'}], ) except anthropic_sdk.AuthenticationError as e: raise UserError(_( "Anthropic key is invalid (authentication failed): %s", str(e), )) except anthropic_sdk.BadRequestError as e: if 'credit balance' in str(e) or 'billing' in str(e).lower(): return raise UserError(_( "Anthropic key validation failed: %s", str(e), )) except anthropic_sdk.PermissionDeniedError: raise UserError(_( "Anthropic key lacks required permissions.", )) except Exception as e: raise UserError(_( "Anthropic key validation failed: %s", str(e), )) elif ptype in ('google_maps', 'twilio'): if not key_record.api_key: raise UserError(_("API key is empty.")) elif ptype in ('google_oauth', 'microsoft_oauth'): if not key_record.client_id or not key_record.client_secret: raise UserError(_("Client ID and Client Secret are required.")) else: if not key_record.api_key: raise UserError(_("API key is empty."))