Spaces:
Sleeping
Sleeping
| """ | |
| π§ FIXED: Hybrid SAAP Agent Manager Service - Critical Errors Resolved | |
| Fixes for: | |
| 1. _send_colossus_message() method name issue (Line 145) | |
| 2. LLMModelConfig.get() AttributeError (Line 44+) | |
| Production-ready agent management with multi-provider support and cost optimization | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| import uuid | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, update, delete | |
| from models.agent import SaapAgent, AgentStatus, AgentType, AgentTemplates | |
| from database.connection import db_manager | |
| from database.models import DBAgent, DBChatMessage, DBAgentSession | |
| from api.colossus_client import ColossusClient | |
| from api.openrouter_client import OpenRouterClient, OpenRouterResponse | |
| from services.agent_manager import AgentManagerService # Extend existing service | |
| logger = logging.getLogger(__name__) | |
| class HybridAgentManagerService(AgentManagerService): | |
| """ | |
| Hybrid Agent Manager extending the original with OpenRouter support | |
| π§ FIXES IMPLEMENTED: | |
| 1. β _send_colossus_message() β _send_via_colossus() method name correction | |
| 2. β LLMModelConfig safe access to prevent 'get' attribute errors | |
| 3. β Robust config handling for both dict and object-based configurations | |
| Features: | |
| - Inherits all colossus functionality from AgentManagerService | |
| - Adds OpenRouter integration with cost tracking | |
| - Provider switching and failover logic | |
| - Performance comparison between providers | |
| - Backward compatible with existing SAAP API | |
| """ | |
| def __init__(self, openrouter_api_key: Optional[str] = None): | |
| # Initialize base AgentManagerService | |
| super().__init__() | |
| # OpenRouter integration | |
| self.openrouter_client: Optional[OpenRouterClient] = None | |
| self.openrouter_api_key = openrouter_api_key or "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" | |
| # Hybrid configuration | |
| self.primary_provider = "colossus" # Default: colossus first | |
| self.enable_cost_comparison = True | |
| self.enable_failover = True | |
| # Performance tracking | |
| self.provider_stats = { | |
| "colossus": {"requests": 0, "successes": 0, "total_time": 0.0, "total_cost": 0.0}, | |
| "openrouter": {"requests": 0, "successes": 0, "total_time": 0.0, "total_cost": 0.0} | |
| } | |
| # Cost comparison data | |
| self.cost_comparisons: List[Dict[str, Any]] = [] | |
| logger.info("π Hybrid Agent Manager initialized - colossus + OpenRouter support") | |
| def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None): | |
| """ | |
| π§ CRITICAL FIX: Safe LLM config access to prevent 'get' attribute errors | |
| Handles multiple config formats robustly: | |
| - Dictionary-based config: llm_config.get(key) | |
| - Object-based config: llm_config.key or getattr(llm_config, key) | |
| - Pydantic model config: automatic attribute access | |
| - Mixed formats from Frontend/Backend mismatches | |
| This fixes the LLMModelConfig.get() AttributeError completely. | |
| """ | |
| try: | |
| if not hasattr(agent, 'llm_config') or not agent.llm_config: | |
| logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}") | |
| return default | |
| llm_config = agent.llm_config | |
| # Case 1: Dictionary-based config (most common from frontend) | |
| if isinstance(llm_config, dict): | |
| value = llm_config.get(key, default) | |
| logger.debug(f"Dict config access: {key}={value}") | |
| return value | |
| # Case 2: Object with direct attribute access (Pydantic models) | |
| elif hasattr(llm_config, key): | |
| value = getattr(llm_config, key, default) | |
| logger.debug(f"Attribute access: {key}={value}") | |
| return value | |
| # Case 3: Object with get() method (dict-like objects) | |
| elif hasattr(llm_config, 'get') and callable(getattr(llm_config, 'get')): | |
| value = llm_config.get(key, default) | |
| logger.debug(f"Method get() access: {key}={value}") | |
| return value | |
| # Case 4: Try converting object to dict first | |
| elif hasattr(llm_config, '__dict__'): | |
| config_dict = llm_config.__dict__ | |
| if key in config_dict: | |
| value = config_dict[key] | |
| logger.debug(f"__dict__ access: {key}={value}") | |
| return value | |
| # Case 5: Last resort - try str() representation parsing | |
| else: | |
| logger.warning(f"Unknown config type {type(llm_config)} for {key}, using default: {default}") | |
| return default | |
| except AttributeError as e: | |
| logger.warning(f"β οΈ AttributeError in LLM config access for {key}: {e}") | |
| return default | |
| except Exception as e: | |
| logger.error(f"β Unexpected error in LLM config access for {key}: {e}") | |
| return default | |
| async def initialize(self): | |
| """Initialize both colossus and OpenRouter clients""" | |
| # Initialize base service (colossus + database) | |
| await super().initialize() | |
| # Initialize OpenRouter client | |
| if self.openrouter_api_key: | |
| try: | |
| logger.info("π Initializing OpenRouter client...") | |
| self.openrouter_client = OpenRouterClient(self.openrouter_api_key) | |
| await self.openrouter_client.__aenter__() | |
| # Test OpenRouter connection | |
| health = await self.openrouter_client.health_check() | |
| if health["status"] == "healthy": | |
| logger.info("β OpenRouter client initialized successfully") | |
| else: | |
| logger.warning(f"β οΈ OpenRouter health check failed: {health.get('error')}") | |
| except Exception as e: | |
| logger.error(f"β OpenRouter initialization failed: {e}") | |
| self.openrouter_client = None | |
| logger.info(f"π Hybrid initialization complete - Providers: colossus={self.colossus_client is not None}, OpenRouter={self.openrouter_client is not None}") | |
| async def send_message_to_agent(self, agent_id: str, message: str, provider: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Enhanced message sending with multi-provider support | |
| Args: | |
| agent_id: Target agent identifier | |
| message: Message content | |
| provider: Force specific provider ("colossus", "openrouter", None=auto) | |
| Returns: | |
| Response with provider info and cost data | |
| """ | |
| # Validate agent exists | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| return { | |
| "error": f"Agent {agent_id} not found in loaded agents", | |
| "available_agents": list(self.agents.keys()), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Provider selection logic | |
| selected_provider = provider or self.primary_provider | |
| # Try primary provider first | |
| if selected_provider == "colossus" and self.colossus_client: | |
| result = await self._send_via_colossus(agent.id, message, agent) | |
| # If colossus fails and failover enabled, try OpenRouter | |
| if "error" in result and self.enable_failover and self.openrouter_client: | |
| logger.info(f"π colossus failed, attempting OpenRouter failover for {agent_id}") | |
| openrouter_result = await self._send_openrouter_message(agent, message) | |
| # Add failover info to response | |
| if "error" not in openrouter_result: | |
| openrouter_result["failover_used"] = True | |
| openrouter_result["primary_provider_error"] = result.get("error") | |
| return openrouter_result | |
| return result | |
| elif selected_provider == "openrouter" and self.openrouter_client: | |
| result = await self._send_openrouter_message(agent, message) | |
| # If OpenRouter fails and failover enabled, try colossus | |
| if "error" in result and self.enable_failover and self.colossus_client: | |
| logger.info(f"π OpenRouter failed, attempting colossus failover for {agent_id}") | |
| colossus_result = await super().send_message_to_agent(agent_id, message) | |
| # Add failover info | |
| if "error" not in colossus_result: | |
| colossus_result["failover_used"] = True | |
| colossus_result["primary_provider_error"] = result.get("error") | |
| colossus_result["provider"] = "colossus" | |
| return colossus_result | |
| return result | |
| else: | |
| # No provider available or provider not found | |
| available_providers = [] | |
| if self.colossus_client: | |
| available_providers.append("colossus") | |
| if self.openrouter_client: | |
| available_providers.append("openrouter") | |
| return { | |
| "error": f"Provider '{selected_provider}' not available", | |
| "available_providers": available_providers, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| async def _send_openrouter_message(self, agent: SaapAgent, message: str) -> Dict[str, Any]: | |
| """Send message via OpenRouter with cost tracking""" | |
| start_time = datetime.utcnow() | |
| try: | |
| # Prepare messages for OpenRouter | |
| messages = [ | |
| {"role": "system", "content": agent.description or f"You are {agent.name}"}, | |
| {"role": "user", "content": message} | |
| ] | |
| logger.info(f"π€ Sending to OpenRouter: {agent.name} ({agent.id})") | |
| # Send to OpenRouter | |
| response: OpenRouterResponse = await self.openrouter_client.chat_completion( | |
| messages=messages, | |
| agent_id=agent.id | |
| ) | |
| response_time = (datetime.utcnow() - start_time).total_seconds() | |
| # Update provider stats | |
| self.provider_stats["openrouter"]["requests"] += 1 | |
| self.provider_stats["openrouter"]["total_time"] += response_time | |
| if response.success: | |
| self.provider_stats["openrouter"]["successes"] += 1 | |
| self.provider_stats["openrouter"]["total_cost"] += response.cost_usd | |
| # Update agent metrics | |
| if agent.metrics: | |
| agent.metrics.messages_processed += 1 | |
| agent.metrics.last_active = datetime.utcnow() | |
| agent.metrics.avg_response_time = ( | |
| (agent.metrics.avg_response_time * (agent.metrics.messages_processed - 1) + response_time) | |
| / agent.metrics.messages_processed | |
| ) | |
| # Try to save to database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| chat_message = DBChatMessage( | |
| agent_id=agent.id, | |
| user_message=message, | |
| agent_response=response.content, | |
| response_time=response_time, | |
| tokens_used=response.tokens_used, | |
| metadata={ | |
| "provider": "openrouter", | |
| "model": response.model, | |
| "cost_usd": response.cost_usd, | |
| "input_tokens": response.input_tokens, | |
| "output_tokens": response.output_tokens | |
| } | |
| ) | |
| session.add(chat_message) | |
| await session.commit() | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Failed to save OpenRouter chat to database: {db_error}") | |
| # Log successful response | |
| logger.info(f"β OpenRouter success: {agent.name} - {response_time:.2f}s, ${response.cost_usd:.6f}, {response.tokens_used} tokens") | |
| return { | |
| "content": response.content, | |
| "response_time": response_time, | |
| "tokens_used": response.tokens_used, | |
| "cost_usd": response.cost_usd, | |
| "provider": "openrouter", | |
| "model": response.model, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "cost_efficiency": response.to_dict()["cost_efficiency"] | |
| } | |
| else: | |
| logger.error(f"β OpenRouter error for {agent.name}: {response.error}") | |
| return { | |
| "error": f"OpenRouter API error: {response.error}", | |
| "provider": "openrouter", | |
| "model": response.model, | |
| "response_time": response_time, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| error_msg = f"OpenRouter request failed: {str(e)}" | |
| logger.error(f"β {error_msg}") | |
| return { | |
| "error": error_msg, | |
| "provider": "openrouter", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| async def compare_providers(self, agent_id: str, message: str) -> Dict[str, Any]: | |
| """ | |
| Send same message to both providers for performance comparison | |
| Useful for benchmarking and cost analysis | |
| """ | |
| if not (self.colossus_client and self.openrouter_client): | |
| return {"error": "Both providers required for comparison"} | |
| logger.info(f"π Starting provider comparison for {agent_id}") | |
| # Send to both providers simultaneously | |
| tasks = [ | |
| self.send_message_to_agent(agent_id, message, "colossus"), | |
| self.send_message_to_agent(agent_id, message, "openrouter") | |
| ] | |
| try: | |
| colossus_result, openrouter_result = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Handle exceptions | |
| if isinstance(colossus_result, Exception): | |
| colossus_result = {"error": str(colossus_result), "provider": "colossus"} | |
| if isinstance(openrouter_result, Exception): | |
| openrouter_result = {"error": str(openrouter_result), "provider": "openrouter"} | |
| # Create comparison report | |
| comparison = { | |
| "agent_id": agent_id, | |
| "message": message[:100] + "..." if len(message) > 100 else message, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "colossus": colossus_result, | |
| "openrouter": openrouter_result, | |
| "comparison": {} | |
| } | |
| # Calculate comparison metrics if both succeeded | |
| if "error" not in colossus_result and "error" not in openrouter_result: | |
| colossus_time = colossus_result.get("response_time", 0) | |
| openrouter_time = openrouter_result.get("response_time", 0) | |
| openrouter_cost = openrouter_result.get("cost_usd", 0) | |
| comparison["comparison"] = { | |
| "speed_winner": "colossus" if colossus_time < openrouter_time else "openrouter", | |
| "speed_difference": abs(colossus_time - openrouter_time), | |
| "cost_openrouter": openrouter_cost, | |
| "cost_colossus": 0.0, # colossus is free | |
| "quality_comparison": "Both responses available for manual review" | |
| } | |
| logger.info(f"π Comparison complete: colossus {colossus_time:.2f}s vs OpenRouter {openrouter_time:.2f}s (${openrouter_cost:.6f})") | |
| # Store comparison data | |
| self.cost_comparisons.append(comparison) | |
| # Keep only last 100 comparisons | |
| if len(self.cost_comparisons) > 100: | |
| self.cost_comparisons = self.cost_comparisons[-100:] | |
| return comparison | |
| except Exception as e: | |
| logger.error(f"β Provider comparison failed: {e}") | |
| return { | |
| "error": f"Comparison failed: {str(e)}", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| def get_provider_stats(self) -> Dict[str, Any]: | |
| """Get comprehensive provider performance statistics""" | |
| stats = {} | |
| for provider, data in self.provider_stats.items(): | |
| if data["requests"] > 0: | |
| avg_response_time = data["total_time"] / data["requests"] | |
| success_rate = (data["successes"] / data["requests"]) * 100 | |
| avg_cost = data["total_cost"] / data["successes"] if data["successes"] > 0 else 0 | |
| else: | |
| avg_response_time = 0 | |
| success_rate = 0 | |
| avg_cost = 0 | |
| stats[provider] = { | |
| "total_requests": data["requests"], | |
| "successful_requests": data["successes"], | |
| "success_rate_percent": round(success_rate, 1), | |
| "avg_response_time_seconds": round(avg_response_time, 2), | |
| "total_cost_usd": round(data["total_cost"], 4), | |
| "avg_cost_per_request": round(avg_cost, 6) | |
| } | |
| # Add OpenRouter budget info if available | |
| if self.openrouter_client: | |
| openrouter_budget = self.openrouter_client.get_cost_summary() | |
| stats["openrouter"]["budget_info"] = openrouter_budget | |
| return { | |
| "provider_stats": stats, | |
| "primary_provider": self.primary_provider, | |
| "failover_enabled": self.enable_failover, | |
| "comparison_enabled": self.enable_cost_comparison, | |
| "total_comparisons": len(self.cost_comparisons), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| async def set_primary_provider(self, provider: str) -> bool: | |
| """Switch primary provider (colossus/openrouter)""" | |
| if provider not in ["colossus", "openrouter"]: | |
| logger.error(f"β Invalid provider: {provider}") | |
| return False | |
| if provider == "colossus" and not self.colossus_client: | |
| logger.error("β colossus client not available") | |
| return False | |
| if provider == "openrouter" and not self.openrouter_client: | |
| logger.error("β OpenRouter client not available") | |
| return False | |
| old_provider = self.primary_provider | |
| self.primary_provider = provider | |
| logger.info(f"π Primary provider switched: {old_provider} β {provider}") | |
| return True | |
| async def shutdown_all_agents(self): | |
| """Enhanced shutdown with OpenRouter cleanup""" | |
| # Shutdown base service | |
| await super().shutdown_all_agents() | |
| # Cleanup OpenRouter client | |
| if self.openrouter_client: | |
| await self.openrouter_client.__aexit__(None, None, None) | |
| logger.info("π OpenRouter client closed") | |
| logger.info("β Hybrid Agent Manager shutdown complete") | |
| # Example usage and testing | |
| if __name__ == "__main__": | |
| async def test_hybrid_manager(): | |
| """Test hybrid agent manager functionality""" | |
| manager = HybridAgentManagerService() | |
| await manager.initialize() | |
| # List agents | |
| agents = list(manager.agents.values()) | |
| print(f"π Agents loaded: {[a.name for a in agents]}") | |
| if agents: | |
| agent = agents[0] | |
| # Test both providers | |
| print(f"\nπ§ͺ Testing {agent.name} with both providers") | |
| # colossus test | |
| result1 = await manager.send_message_to_agent(agent.id, "Hello from colossus test", "colossus") | |
| print(f"colossus: {'β ' if 'error' not in result1 else 'β'} - {result1.get('response_time', 'N/A')}s") | |
| # OpenRouter test | |
| result2 = await manager.send_message_to_agent(agent.id, "Hello from OpenRouter test", "openrouter") | |
| print(f"OpenRouter: {'β ' if 'error' not in result2 else 'β'} - {result2.get('response_time', 'N/A')}s") | |
| # Provider comparison | |
| comparison = await manager.compare_providers(agent.id, "Tell me a joke") | |
| print(f"\nπ Comparison: {comparison.get('comparison', {})}") | |
| # Provider stats | |
| stats = manager.get_provider_stats() | |
| print(f"\nπ Provider Stats: {stats}") | |
| await manager.shutdown_all_agents() | |
| asyncio.run(test_hybrid_manager()) |