""" Enhanced SAAP Agent Manager with OpenRouter Integration & Cost Efficiency Logging Supports multi-provider architecture: colossus + OpenRouter with cost optimization """ import asyncio import logging import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple import json from dataclasses import dataclass, asdict from enum import Enum import aiohttp import openai # For OpenRouter compatibility from openai import AsyncOpenAI from ..config.settings import get_settings from ..models.agent import SaapAgent, AgentStatus, AgentCapability from ..models.chat import ChatMessage, MessageRole from ..database.service import DatabaseService # Cost efficiency logging cost_logger = logging.getLogger("saap.cost") performance_logger = logging.getLogger("saap.performance") @dataclass class CostMetrics: """Cost tracking metrics for OpenRouter requests""" provider: str model: str input_tokens: int output_tokens: int total_tokens: int cost_usd: float response_time_seconds: float timestamp: datetime request_success: bool agent_id: str def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for logging""" return { **asdict(self), 'timestamp': self.timestamp.isoformat(), 'cost_per_1k_tokens': round(self.cost_usd / (self.total_tokens / 1000), 6) if self.total_tokens > 0 else 0, 'tokens_per_second': round(self.total_tokens / self.response_time_seconds, 2) if self.response_time_seconds > 0 else 0 } @dataclass class PerformanceMetrics: """Performance tracking for different providers""" provider: str model: str avg_response_time: float success_rate: float total_requests: int avg_cost_per_request: float tokens_per_second: float last_24h_cost: float class ProviderType(Enum): COLOSSUS = "colossus" OPENROUTER = "openrouter" HYBRID = "hybrid" class EnhancedAgentManager: """Enhanced Agent Manager with Multi-Provider Support & Cost Optimization""" def __init__(self, database_service: Optional[DatabaseService] = None): self.settings = get_settings() self.database = database_service self.agents: Dict[str, SaapAgent] = {} self.agent_clients: Dict[str, Any] = {} # Cost tracking self.cost_metrics: List[CostMetrics] = [] self.daily_cost_budget = self.settings.agents.daily_cost_budget self.current_daily_cost = 0.0 self.cost_alert_threshold = self.settings.agents.warning_cost_threshold # Performance tracking self.performance_stats: Dict[str, PerformanceMetrics] = {} # Provider configurations self._setup_providers() # Initialize cost logger if self.settings.openrouter.enable_cost_tracking: cost_logger.info(f"๐Ÿ’ฐ Cost tracking initialized - Daily budget: ${self.daily_cost_budget}") cost_logger.info(f"๐Ÿ“Š Alert threshold: {self.cost_alert_threshold*100}% of budget") def _setup_providers(self): """Setup provider configurations""" self.providers = {} # colossus Configuration if self.settings.colossus.api_key: self.providers[ProviderType.COLOSSUS] = { 'client': None, # Will be set up per agent 'base_url': self.settings.colossus.api_base, 'api_key': self.settings.colossus.api_key, 'default_model': self.settings.colossus.default_model, 'cost_per_1m_tokens': 0.0, # colossus is free 'timeout': self.settings.colossus.timeout } # OpenRouter Configuration if self.settings.openrouter.enabled and self.settings.openrouter.api_key: self.providers[ProviderType.OPENROUTER] = { 'client': AsyncOpenAI( api_key=self.settings.openrouter.api_key, base_url=self.settings.openrouter.base_url, ), 'base_url': self.settings.openrouter.base_url, 'api_key': self.settings.openrouter.api_key, 'models': self._get_openrouter_models(), 'timeout': 30 } cost_logger.info(f"๐Ÿ”— OpenRouter provider initialized with {len(self._get_openrouter_models())} models") def _get_openrouter_models(self) -> Dict[str, Dict[str, Any]]: """Get OpenRouter model configurations with cost data""" models = { # Jane Alesi - Coordination & Management (GPT-4o-mini) 'jane_alesi': { 'model': self.settings.openrouter.jane_model, 'max_tokens': self.settings.openrouter.jane_max_tokens, 'temperature': self.settings.openrouter.jane_temperature, 'cost_per_1m_input': 0.15, 'cost_per_1m_output': 0.60, 'description': 'General coordination and management tasks' }, # John Alesi - Development & Code (Claude-3-Haiku) 'john_alesi': { 'model': self.settings.openrouter.john_model, 'max_tokens': self.settings.openrouter.john_max_tokens, 'temperature': self.settings.openrouter.john_temperature, 'cost_per_1m_input': 0.25, 'cost_per_1m_output': 1.25, 'description': 'Code generation and development tasks' }, # Lara Alesi - Medical & Analysis (GPT-4o-mini) 'lara_alesi': { 'model': self.settings.openrouter.lara_model, 'max_tokens': self.settings.openrouter.lara_max_tokens, 'temperature': self.settings.openrouter.lara_temperature, 'cost_per_1m_input': 0.15, 'cost_per_1m_output': 0.60, 'description': 'Medical analysis and specialized queries' }, # Fallback Model - Free (Meta Llama) 'fallback': { 'model': self.settings.openrouter.fallback_model, 'max_tokens': 600, 'temperature': 0.7, 'cost_per_1m_input': 0.0, 'cost_per_1m_output': 0.0, 'description': 'Cost-free fallback for budget limits' } } return models async def create_agent(self, agent_data: Dict[str, Any]) -> SaapAgent: """Create new agent with provider selection""" agent_id = agent_data.get('agent_id', f"agent_{len(self.agents)}") # Determine optimal provider based on agent type and cost considerations provider = self._select_optimal_provider(agent_id, agent_data) agent = SaapAgent( agent_id=agent_id, name=agent_data.get('name', f'Agent {agent_id}'), role=agent_data.get('role', 'General Assistant'), capabilities=agent_data.get('capabilities', [AgentCapability.CHAT]), status=AgentStatus.CREATED, llm_config=agent_data.get('llm_config', {}), personality=agent_data.get('personality', {}), provider=provider.value ) # Setup provider-specific client await self._setup_agent_client(agent) self.agents[agent_id] = agent # Log agent creation with cost implications cost_logger.info(f"๐Ÿค– Agent created: {agent_id} using {provider.value} provider") if provider == ProviderType.OPENROUTER: model_config = self._get_openrouter_models().get(agent_id, self._get_openrouter_models()['fallback']) cost_logger.info(f"๐Ÿ’ฐ Model: {model_config['model']} - Cost: ${model_config['cost_per_1m_input']}/1M input tokens") # Database persistence if self.database: await self.database.create_agent(agent) return agent def _select_optimal_provider(self, agent_id: str, agent_data: Dict[str, Any]) -> ProviderType: """Select optimal provider based on cost and performance requirements""" # Check daily budget constraints if self.current_daily_cost >= (self.daily_cost_budget * self.cost_alert_threshold): cost_logger.warning(f"โš ๏ธ Daily cost budget at {self.cost_alert_threshold*100}% - using free providers") if ProviderType.COLOSSUS in self.providers: return ProviderType.COLOSSUS # Agent-specific provider selection primary_provider = getattr(self.settings.agents, 'primary_provider', 'colossus') if primary_provider == 'openrouter' and ProviderType.OPENROUTER in self.providers: # Use OpenRouter with cost-optimized models return ProviderType.OPENROUTER elif primary_provider == 'colossus' and ProviderType.COLOSSUS in self.providers: # Use colossus as primary (free) return ProviderType.COLOSSUS else: # Fallback to available provider if ProviderType.COLOSSUS in self.providers: return ProviderType.COLOSSUS elif ProviderType.OPENROUTER in self.providers: return ProviderType.OPENROUTER else: raise ValueError("No providers available") async def _setup_agent_client(self, agent: SaapAgent): """Setup provider-specific client for agent""" provider_type = ProviderType(agent.provider) if provider_type == ProviderType.COLOSSUS: # colossus HTTP client setup (existing logic) self.agent_clients[agent.agent_id] = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.settings.colossus.timeout) ) elif provider_type == ProviderType.OPENROUTER: # OpenRouter client already set up in providers self.agent_clients[agent.agent_id] = self.providers[ProviderType.OPENROUTER]['client'] async def start_agent(self, agent_id: str) -> bool: """Start agent with provider-specific initialization""" if agent_id not in self.agents: raise ValueError(f"Agent {agent_id} not found") agent = self.agents[agent_id] try: # Provider-specific startup if agent.provider == ProviderType.COLOSSUS.value: success = await self._start_colossus_agent(agent) elif agent.provider == ProviderType.OPENROUTER.value: success = await self._start_openrouter_agent(agent) else: success = False if success: agent.status = AgentStatus.ACTIVE agent.metrics.last_active = datetime.now() cost_logger.info(f"โœ… Agent {agent_id} started successfully using {agent.provider}") # Database update if self.database: await self.database.update_agent_status(agent_id, AgentStatus.ACTIVE) return success except Exception as e: cost_logger.error(f"โŒ Failed to start agent {agent_id}: {str(e)}") agent.status = AgentStatus.ERROR return False async def _start_colossus_agent(self, agent: SaapAgent) -> bool: """Start colossus agent (existing logic)""" try: client = self.agent_clients[agent.agent_id] # Test connection to colossus async with client.post( f"{self.settings.colossus.api_base}/v1/chat/completions", headers={"Authorization": f"Bearer {self.settings.colossus.api_key}"}, json={ "model": self.settings.colossus.default_model, "messages": [{"role": "user", "content": "Test connection"}], "max_tokens": 10 } ) as response: return response.status == 200 except Exception as e: cost_logger.error(f"colossus connection failed for {agent.agent_id}: {str(e)}") return False async def _start_openrouter_agent(self, agent: SaapAgent) -> bool: """Start OpenRouter agent""" try: client = self.agent_clients[agent.agent_id] model_config = self._get_openrouter_models().get(agent.agent_id, self._get_openrouter_models()['fallback']) # Test connection to OpenRouter response = await client.chat.completions.create( model=model_config['model'], messages=[{"role": "user", "content": "Test connection"}], max_tokens=10, temperature=model_config['temperature'] ) cost_logger.info(f"๐Ÿ”— OpenRouter agent {agent.agent_id} connected - Model: {model_config['model']}") return True except Exception as e: cost_logger.error(f"OpenRouter connection failed for {agent.agent_id}: {str(e)}") return False async def send_message(self, agent_id: str, message: str, conversation_context: Optional[List[Dict]] = None) -> ChatMessage: """Send message to agent with cost tracking""" if agent_id not in self.agents: raise ValueError(f"Agent {agent_id} not found") agent = self.agents[agent_id] start_time = time.time() try: if agent.provider == ProviderType.COLOSSUS.value: response = await self._send_colossus_message(agent, message, conversation_context) elif agent.provider == ProviderType.OPENROUTER.value: response = await self._send_openrouter_message(agent, message, conversation_context) else: raise ValueError(f"Unsupported provider: {agent.provider}") response_time = time.time() - start_time # Update agent metrics agent.metrics.messages_processed += 1 agent.metrics.last_active = datetime.now() agent.metrics.avg_response_time = ( (agent.metrics.avg_response_time * (agent.metrics.messages_processed - 1) + response_time) / agent.metrics.messages_processed ) # Log performance performance_logger.info(f"๐Ÿ“Š Agent {agent_id} - Response time: {response_time:.2f}s, Total messages: {agent.metrics.messages_processed}") return response except Exception as e: cost_logger.error(f"โŒ Message failed for agent {agent_id}: {str(e)}") agent.status = AgentStatus.ERROR raise async def _send_colossus_message(self, agent: SaapAgent, message: str, context: Optional[List[Dict]] = None) -> ChatMessage: """Send message via colossus (existing logic with cost tracking)""" client = self.agent_clients[agent.agent_id] # Prepare messages messages = [] if context: messages.extend(context) messages.append({"role": "user", "content": message}) start_time = time.time() try: async with client.post( f"{self.settings.colossus.api_base}/v1/chat/completions", headers={"Authorization": f"Bearer {self.settings.colossus.api_key}"}, json={ "model": self.settings.colossus.default_model, "messages": messages, "max_tokens": 800, "temperature": 0.7 } ) as response: if response.status != 200: raise Exception(f"colossus API error: {response.status}") data = await response.json() content = data['choices'][0]['message']['content'] response_time = time.time() - start_time # Cost tracking for colossus (free) cost_metrics = CostMetrics( provider="colossus", model=self.settings.colossus.default_model, input_tokens=data.get('usage', {}).get('prompt_tokens', 0), output_tokens=data.get('usage', {}).get('completion_tokens', 0), total_tokens=data.get('usage', {}).get('total_tokens', 0), cost_usd=0.0, # colossus is free response_time_seconds=response_time, timestamp=datetime.now(), request_success=True, agent_id=agent.agent_id ) self._log_cost_metrics(cost_metrics) return ChatMessage( message_id=f"msg_{int(time.time())}", role=MessageRole.ASSISTANT, content=content, timestamp=datetime.now(), agent_id=agent.agent_id, metadata={'provider': 'colossus', 'cost_usd': 0.0, 'tokens': cost_metrics.total_tokens} ) except Exception as e: # Log failed request cost_metrics = CostMetrics( provider="colossus", model=self.settings.colossus.default_model, input_tokens=0, output_tokens=0, total_tokens=0, cost_usd=0.0, response_time_seconds=time.time() - start_time, timestamp=datetime.now(), request_success=False, agent_id=agent.agent_id ) self._log_cost_metrics(cost_metrics) raise async def _send_openrouter_message(self, agent: SaapAgent, message: str, context: Optional[List[Dict]] = None) -> ChatMessage: """Send message via OpenRouter with cost tracking""" client = self.agent_clients[agent.agent_id] model_config = self._get_openrouter_models().get(agent.agent_id, self._get_openrouter_models()['fallback']) # Check budget before expensive request estimated_cost = self._estimate_request_cost(message, model_config) if self.current_daily_cost + estimated_cost > self.daily_cost_budget: cost_logger.warning(f"๐Ÿ’ธ Daily budget exceeded - switching to free fallback model") model_config = self._get_openrouter_models()['fallback'] # Prepare messages messages = [] if context: messages.extend(context) messages.append({"role": "user", "content": message}) start_time = time.time() try: response = await client.chat.completions.create( model=model_config['model'], messages=messages, max_tokens=model_config['max_tokens'], temperature=model_config['temperature'] ) response_time = time.time() - start_time content = response.choices[0].message.content # Calculate actual cost input_tokens = response.usage.prompt_tokens output_tokens = response.usage.completion_tokens total_tokens = response.usage.total_tokens cost_usd = ( (input_tokens / 1_000_000) * model_config['cost_per_1m_input'] + (output_tokens / 1_000_000) * model_config['cost_per_1m_output'] ) # Update daily cost tracking self.current_daily_cost += cost_usd # Cost tracking cost_metrics = CostMetrics( provider="openrouter", model=model_config['model'], input_tokens=input_tokens, output_tokens=output_tokens, total_tokens=total_tokens, cost_usd=cost_usd, response_time_seconds=response_time, timestamp=datetime.now(), request_success=True, agent_id=agent.agent_id ) self._log_cost_metrics(cost_metrics) # Budget alert check if self.current_daily_cost >= (self.daily_cost_budget * self.cost_alert_threshold): cost_logger.warning(f"โš ๏ธ Cost alert: ${self.current_daily_cost:.4f} / ${self.daily_cost_budget} ({self.current_daily_cost/self.daily_cost_budget*100:.1f}%)") return ChatMessage( message_id=f"msg_{int(time.time())}", role=MessageRole.ASSISTANT, content=content, timestamp=datetime.now(), agent_id=agent.agent_id, metadata={ 'provider': 'openrouter', 'model': model_config['model'], 'cost_usd': cost_usd, 'tokens': total_tokens, 'cost_efficiency': f"${cost_usd:.6f} ({total_tokens} tokens, {response_time:.1f}s)" } ) except Exception as e: # Log failed request cost_metrics = CostMetrics( provider="openrouter", model=model_config['model'], input_tokens=0, output_tokens=0, total_tokens=0, cost_usd=0.0, response_time_seconds=time.time() - start_time, timestamp=datetime.now(), request_success=False, agent_id=agent.agent_id ) self._log_cost_metrics(cost_metrics) raise def _estimate_request_cost(self, message: str, model_config: Dict[str, Any]) -> float: """Estimate cost for request (rough approximation)""" # Rough token estimation: ~4 chars per token estimated_input_tokens = len(message) / 4 estimated_output_tokens = model_config['max_tokens'] * 0.5 # Assume 50% of max tokens cost_usd = ( (estimated_input_tokens / 1_000_000) * model_config['cost_per_1m_input'] + (estimated_output_tokens / 1_000_000) * model_config['cost_per_1m_output'] ) return cost_usd def _log_cost_metrics(self, metrics: CostMetrics): """Log cost metrics for analysis""" self.cost_metrics.append(metrics) # Detailed cost logging cost_logger.info(f"๐Ÿ’ฐ Cost Metrics: {json.dumps(metrics.to_dict())}") # Performance summary if metrics.request_success: efficiency = f"{metrics.total_tokens/metrics.response_time_seconds:.1f}" if metrics.response_time_seconds > 0 else "N/A" cost_logger.info(f"๐Ÿ“Š {metrics.agent_id} - ${metrics.cost_usd:.6f} | {metrics.total_tokens} tokens | {metrics.response_time_seconds:.2f}s | {efficiency} tokens/s") # Cleanup old metrics (keep last 1000) if len(self.cost_metrics) > 1000: self.cost_metrics = self.cost_metrics[-1000:] def get_cost_summary(self, hours: int = 24) -> Dict[str, Any]: """Get cost summary for specified time period""" cutoff_time = datetime.now() - timedelta(hours=hours) recent_metrics = [m for m in self.cost_metrics if m.timestamp >= cutoff_time] if not recent_metrics: return {"total_cost": 0.0, "total_requests": 0, "average_cost_per_request": 0.0} total_cost = sum(m.cost_usd for m in recent_metrics) successful_requests = [m for m in recent_metrics if m.request_success] by_provider = {} for metrics in recent_metrics: if metrics.provider not in by_provider: by_provider[metrics.provider] = {"cost": 0.0, "requests": 0, "tokens": 0} by_provider[metrics.provider]["cost"] += metrics.cost_usd by_provider[metrics.provider]["requests"] += 1 by_provider[metrics.provider]["tokens"] += metrics.total_tokens return { "total_cost_usd": round(total_cost, 4), "total_requests": len(recent_metrics), "successful_requests": len(successful_requests), "success_rate": len(successful_requests) / len(recent_metrics) if recent_metrics else 0, "average_cost_per_request": round(total_cost / len(recent_metrics), 6) if recent_metrics else 0, "daily_budget_used": round(self.current_daily_cost / self.daily_cost_budget * 100, 1), "by_provider": by_provider, "period_hours": hours, "budget_remaining_usd": max(0, self.daily_cost_budget - self.current_daily_cost) } async def get_all_agents(self) -> List[SaapAgent]: """Get all agents with current status""" return list(self.agents.values()) async def get_agent(self, agent_id: str) -> Optional[SaapAgent]: """Get specific agent""" return self.agents.get(agent_id) async def stop_agent(self, agent_id: str) -> bool: """Stop agent and cleanup resources""" if agent_id not in self.agents: return False agent = self.agents[agent_id] agent.status = AgentStatus.INACTIVE # Cleanup client connection if agent_id in self.agent_clients: client = self.agent_clients[agent_id] if hasattr(client, 'close'): if asyncio.iscoroutinefunction(client.close): await client.close() else: client.close() del self.agent_clients[agent_id] cost_logger.info(f"๐Ÿ›‘ Agent {agent_id} stopped") # Database update if self.database: await self.database.update_agent_status(agent_id, AgentStatus.INACTIVE) return True async def delete_agent(self, agent_id: str) -> bool: """Delete agent completely""" if agent_id not in self.agents: return False # Stop agent first await self.stop_agent(agent_id) # Remove from memory del self.agents[agent_id] cost_logger.info(f"๐Ÿ—‘๏ธ Agent {agent_id} deleted") # Database deletion if self.database: await self.database.delete_agent(agent_id) return True def reset_daily_costs(self): """Reset daily cost tracking (called at midnight)""" yesterday_cost = self.current_daily_cost self.current_daily_cost = 0.0 cost_logger.info(f"๐Ÿ“… Daily cost reset - Yesterday: ${yesterday_cost:.4f}") async def __aenter__(self): """Async context manager entry""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit - cleanup all resources""" for agent_id in list(self.agent_clients.keys()): await self.stop_agent(agent_id) cost_logger.info("๐Ÿงน Enhanced Agent Manager cleanup completed")