| """ | |
| OpenRouter API Client for SAAP | |
| Provides OpenAI-compatible interface with cost tracking and performance metrics | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| import os | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from datetime import datetime | |
| import aiohttp | |
| import json | |
| from dataclasses import dataclass | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
@dataclass
class OpenRouterResponse:
    """OpenRouter API response with cost tracking"""
    success: bool
    content: Optional[str] = None
    error: Optional[str] = None
    response_time: float = 0.0
    tokens_used: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    model: str = ""
    provider: str = "openrouter"
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for logging and API responses"""
        return {
            "success": self.success,
            "content": self.content,
            "error": self.error,
            "response_time": self.response_time,
            "tokens_used": self.tokens_used,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cost_usd": self.cost_usd,
            "model": self.model,
            "provider": self.provider,
            "timestamp": self.timestamp.isoformat(),
            "cost_efficiency": (
                f"${self.cost_usd:.6f} ({self.tokens_used} tokens, {self.response_time:.1f}s)"
                if self.success else "N/A"
            )
        }
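
# Illustrative example of to_dict() output for a successful gpt-4o-mini call.
# The values below are made up for illustration (they are consistent with the
# pricing configured in OpenRouterClient, but they are not measured results):
# {
#     "success": True, "content": "...", "error": None,
#     "response_time": 1.8, "tokens_used": 350, "input_tokens": 120,
#     "output_tokens": 230, "cost_usd": 0.000156, "model": "openai/gpt-4o-mini",
#     "provider": "openrouter", "timestamp": "2025-01-01T12:00:00",
#     "cost_efficiency": "$0.000156 (350 tokens, 1.8s)"
# }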


class OpenRouterClient:
    """
    OpenRouter API Client with Cost Optimization for SAAP

    Features:
    - OpenAI-compatible API interface
    - Agent-specific model selection (Jane: GPT-4o-mini, John: Claude-3.5-Sonnet, etc.)
    - Cost tracking and budget management
    - Performance monitoring and fallback models
    - Async/await support for high-performance integration
    """

    def __init__(self, api_key: str, base_url: str = "https://openrouter.ai/api/v1"):
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.session: Optional[aiohttp.ClientSession] = None
        self.daily_budget = 10.0  # $10/day default
        self.current_daily_cost = 0.0
        self.cost_alert_threshold = 0.8  # 80% of budget

        # PERFORMANCE OPTIMIZATION: Reduced token limits for faster responses
        # Phase 1.3 Quick Win: 40-50% token reduction = 0.5-1s faster per request
        # Agent-specific model configurations with cost data
        self.agent_models = {
            "jane_alesi": {
                "model": "openai/gpt-4o-mini",
                "max_tokens": 400,  # Reduced from 800 (-50%)
                "temperature": 0.7,
                "cost_per_1m_input": 0.15,   # $0.15/1M tokens
                "cost_per_1m_output": 0.60,  # $0.60/1M tokens
                "description": "Efficient coordination and management"
            },
            "john_alesi": {
                "model": "anthropic/claude-3-5-sonnet-20241022",
                "max_tokens": 600,  # Reduced from 1200 (-50%)
                "temperature": 0.5,
                "cost_per_1m_input": 3.00,    # $3.00/1M tokens
                "cost_per_1m_output": 15.00,  # $15.00/1M tokens
                "description": "Advanced code generation and development"
            },
            "lara_alesi": {
                "model": "openai/gpt-4o-mini",
                "max_tokens": 500,  # Reduced from 1000 (-50%)
                "temperature": 0.3,
                "cost_per_1m_input": 0.15,   # $0.15/1M tokens
                "cost_per_1m_output": 0.60,  # $0.60/1M tokens
                "description": "Precise medical and analytical tasks"
            },
            "fallback": {
                "model": "meta-llama/llama-3.2-3b-instruct:free",
                "max_tokens": 600,
                "temperature": 0.7,
                "cost_per_1m_input": 0.0,   # FREE
                "cost_per_1m_output": 0.0,  # FREE
                "description": "Free backup model for budget protection"
            }
        }

        logger.info(f"OpenRouter Client initialized with {len(self.agent_models)} agent models")
        logger.info(f"Daily budget: ${self.daily_budget}, Alert threshold: {self.cost_alert_threshold * 100}%")

    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://saap.satware.ai",  # Optional: your app URL
                "X-Title": "SAAP Agent Platform"  # Optional: app title
            }
        )
        logger.info("OpenRouter session created")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
            logger.info("OpenRouter session closed")

    def get_model_config(self, agent_id: str) -> Dict[str, Any]:
        """Get model configuration for specific agent"""
        return self.agent_models.get(agent_id, self.agent_models["fallback"])

    def estimate_cost(self, message: str, agent_id: str) -> float:
        """Estimate request cost before sending"""
        config = self.get_model_config(agent_id)

        # Rough token estimation: ~4 characters per token
        estimated_input_tokens = len(message) / 4
        estimated_output_tokens = config["max_tokens"] * 0.5  # Assume 50% of max tokens

        cost_usd = (
            (estimated_input_tokens / 1_000_000) * config["cost_per_1m_input"] +
            (estimated_output_tokens / 1_000_000) * config["cost_per_1m_output"]
        )
        return cost_usd
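
    # Worked example of the estimate (illustrative numbers, not measured data):
    # a 400-character prompt for "jane_alesi" (gpt-4o-mini, max_tokens=400)
    # yields ~100 estimated input tokens and 200 estimated output tokens
    # (50% of max_tokens), so:
    #     (100 / 1_000_000) * 0.15 + (200 / 1_000_000) * 0.60 ≈ $0.000135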

    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        agent_id: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None
    ) -> OpenRouterResponse:
        """
        Send chat completion request to OpenRouter with cost tracking

        Args:
            messages: List of message dicts with 'role' and 'content'
            agent_id: Agent identifier for model selection
            max_tokens: Override max tokens
            temperature: Override temperature

        Returns:
            OpenRouterResponse with content, cost, and performance data
        """
        if not self.session:
            return OpenRouterResponse(
                success=False,
                error="OpenRouter client session not initialized - call async context manager",
                model="",
                provider="openrouter"
            )

        # Get agent-specific model config
        config = self.get_model_config(agent_id)
        model = config["model"]

        # Budget check before expensive request
        estimated_cost = self.estimate_cost(str(messages), agent_id)
        if self.current_daily_cost + estimated_cost > self.daily_budget:
            logger.warning("Daily budget would be exceeded - switching to free fallback")
            config = self.agent_models["fallback"]
            model = config["model"]

        start_time = time.time()

        # Prepare request payload (explicit None checks so 0 / 0.0 overrides are honored)
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens if max_tokens is not None else config["max_tokens"],
            "temperature": temperature if temperature is not None else config["temperature"],
            "stream": False  # We want complete responses for cost calculation
        }

        logger.info(f"OpenRouter request: {agent_id} -> {model}")

        try:
            async with self.session.post(
                f"{self.base_url}/chat/completions",
                json=payload
            ) as response:
                response_time = time.time() - start_time

                if response.status == 200:
                    data = await response.json()

                    # Extract response content
                    content = ""
                    if "choices" in data and len(data["choices"]) > 0:
                        choice = data["choices"][0]
                        if "message" in choice and "content" in choice["message"]:
                            content = choice["message"]["content"]

                    # Extract token usage and calculate cost
                    usage = data.get("usage", {})
                    input_tokens = usage.get("prompt_tokens", 0)
                    output_tokens = usage.get("completion_tokens", 0)
                    total_tokens = usage.get("total_tokens", 0)

                    # Calculate actual cost
                    cost_usd = (
                        (input_tokens / 1_000_000) * config["cost_per_1m_input"] +
                        (output_tokens / 1_000_000) * config["cost_per_1m_output"]
                    )

                    # Update daily cost tracking
                    self.current_daily_cost += cost_usd

                    # Log cost alert if needed
                    if self.current_daily_cost >= (self.daily_budget * self.cost_alert_threshold):
                        logger.warning(
                            f"OpenRouter cost alert: ${self.current_daily_cost:.4f} / ${self.daily_budget} "
                            f"({self.current_daily_cost / self.daily_budget * 100:.1f}%)"
                        )

                    logger.info(f"OpenRouter success: {response_time:.2f}s, ${cost_usd:.6f}, {total_tokens} tokens")

                    return OpenRouterResponse(
                        success=True,
                        content=content,
                        response_time=response_time,
                        tokens_used=total_tokens,
                        input_tokens=input_tokens,
                        output_tokens=output_tokens,
                        cost_usd=cost_usd,
                        model=model,
                        provider="openrouter"
                    )
                else:
                    error_text = await response.text()
                    error_msg = f"HTTP {response.status}: {error_text}"

                    # Handle rate limiting and payment errors with fallback
                    if response.status in [429, 402]:
                        logger.warning(f"OpenRouter limit reached: {error_msg}")
                        if config != self.agent_models["fallback"]:
                            logger.info("Attempting fallback to free model...")
                            return await self.chat_completion(messages, "fallback", max_tokens, temperature)

                    logger.error(f"OpenRouter API error: {error_msg}")
                    return OpenRouterResponse(
                        success=False,
                        error=error_msg,
                        response_time=response_time,
                        model=model,
                        provider="openrouter"
                    )
        except asyncio.TimeoutError:
            error_msg = f"Request timeout after {time.time() - start_time:.1f}s"
            logger.error(f"OpenRouter timeout: {error_msg}")
            return OpenRouterResponse(
                success=False,
                error=error_msg,
                response_time=time.time() - start_time,
                model=model,
                provider="openrouter"
            )
        except Exception as e:
            error_msg = f"OpenRouter request failed: {str(e)}"
            logger.error(f"OpenRouter error: {error_msg}")
            return OpenRouterResponse(
                success=False,
                error=error_msg,
                response_time=time.time() - start_time,
                model=model,
                provider="openrouter"
            )

    async def health_check(self) -> Dict[str, Any]:
        """Check OpenRouter API health and available models"""
        if not self.session:
            return {
                "status": "unhealthy",
                "error": "Session not initialized"
            }

        try:
            # Test with a simple completion using the free fallback model
            test_messages = [{"role": "user", "content": "Reply with just 'OK' to confirm API connection."}]
            result = await self.chat_completion(test_messages, "fallback")

            return {
                "status": "healthy" if result.success else "unhealthy",
                "response_time": result.response_time,
                "error": result.error if not result.success else None,
                "daily_cost": self.current_daily_cost,
                "budget_remaining": max(0, self.daily_budget - self.current_daily_cost),
                "available_models": len(self.agent_models),
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }

    def get_cost_summary(self) -> Dict[str, Any]:
        """Get current cost and budget status"""
        return {
            "daily_cost_usd": round(self.current_daily_cost, 4),
            "daily_budget_usd": self.daily_budget,
            "budget_used_percent": round(self.current_daily_cost / self.daily_budget * 100, 1),
            "budget_remaining_usd": max(0, self.daily_budget - self.current_daily_cost),
            "alert_threshold_percent": self.cost_alert_threshold * 100,
            "cost_alert_active": self.current_daily_cost >= (self.daily_budget * self.cost_alert_threshold),
            "agent_models_available": list(self.agent_models.keys()),
            "timestamp": datetime.utcnow().isoformat()
        }

    def reset_daily_costs(self):
        """Reset daily cost tracking (call at midnight)"""
        yesterday_cost = self.current_daily_cost
        self.current_daily_cost = 0.0
        logger.info(f"OpenRouter daily cost reset - Yesterday: ${yesterday_cost:.4f}")
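

# Minimal scheduling sketch (not part of the original client): one way to call
# reset_daily_costs() at UTC midnight with plain asyncio. The helper name and
# the always-on loop are assumptions for illustration, not SAAP requirements.
async def _daily_cost_reset_loop(client: "OpenRouterClient") -> None:
    """Sleep until the next UTC midnight, then reset the client's daily cost tracking."""
    from datetime import timedelta  # local import keeps the sketch self-contained

    while True:
        now = datetime.utcnow()
        next_midnight = datetime(now.year, now.month, now.day) + timedelta(days=1)
        await asyncio.sleep((next_midnight - now).total_seconds())
        client.reset_daily_costs()
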

if __name__ == "__main__":
    # Demo OpenRouter integration
    async def demo_openrouter():
        api_key = os.getenv("OPENROUTER_API_KEY", "")
        if not api_key:
            print("Error: OPENROUTER_API_KEY environment variable not set")
            print("Please add OPENROUTER_API_KEY to backend/.env file")
            return

        async with OpenRouterClient(api_key) as client:
            print("OpenRouter Client Demo")

            # Health check
            health = await client.health_check()
            print(f"Health: {health['status']}")

            if health["status"] == "healthy":
                # Test different agents
                for agent in ["jane_alesi", "john_alesi", "fallback"]:
                    config = client.get_model_config(agent)
                    print(f"\nTesting {agent} - Model: {config['model']}")

                    messages = [
                        {"role": "user", "content": f"Hello! I'm testing the {agent} agent. Please respond briefly."}
                    ]

                    result = await client.chat_completion(messages, agent)
                    if result.success:
                        print(f"Response: {result.content[:100]}...")
                        print(f"Cost: ${result.cost_usd:.6f}")
                        print(f"Time: {result.response_time:.2f}s")
                        print(f"Tokens: {result.tokens_used}")
                    else:
                        print(f"Error: {result.error}")

            # Cost summary
            print(f"\nCost Summary: {client.get_cost_summary()}")

    asyncio.run(demo_openrouter())