# saap-plattform / backend / services / agent_manager.py
# Commit 4343907 by Hwandji: "feat: initial HuggingFace Space deployment"
""" SAAP Agent Manager Service - LLMModelConfig.get() Error Resolution
Database-integrated agent lifecycle management with colossus integration
"""
import asyncio
import logging
import os
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from models.agent_schema import SaapAgent, AgentStatus, AgentType, AgentTemplates
from database.connection import db_manager
from database.models import DBAgent, DBChatMessage, DBAgentSession
from api.colossus_client import ColossusClient
from agents.openrouter_saap_agent import OpenRouterSAAPAgent
# Module-level logger; every log line emitted by this module goes through it.
logger = logging.getLogger(__name__)
class AgentManagerService:
    """
    Production-ready agent manager for the SAAP platform.

    Features:
    - Database-backed agent storage and lifecycle, fronted by an in-memory
      cache (``self.agents``) that keeps working when the DB is unavailable
    - Real-time agent status management (start / stop / restart)
    - colossus LLM integration with OpenRouter fallback
    - Session tracking and performance metrics
    - Health monitoring and error handling
    - Robust LLM config access (dict, attribute, or Pydantic style), avoiding
      "'LLMModelConfig' object has no attribute 'get'" errors

    All database work is best-effort: when ``db_manager`` is not initialized
    or a DB call fails, the in-memory state is still updated and a warning is
    logged.
    """

    # Base Alesi templates: AgentTemplates method / agent id -> display name.
    BASE_TEMPLATES = {
        'jane_alesi': 'Jane Alesi - Coordinator',
        'john_alesi': 'John Alesi - Developer',
        'lara_alesi': 'Lara Alesi - Medical Specialist',
        'theo_alesi': 'Theo Alesi - Financial Specialist',
        'justus_alesi': 'Justus Alesi - Legal Specialist',
        'leon_alesi': 'Leon Alesi - System Specialist',
        'luna_alesi': 'Luna Alesi - Coaching Specialist',
    }
    BASE_TEMPLATE_IDS = tuple(BASE_TEMPLATES)

    # OpenRouter model per agent: agent id -> (env var override, default model).
    OPENROUTER_MODELS = {
        "jane_alesi": ("JANE_ALESI_MODEL", "openai/gpt-4o-mini"),
        "john_alesi": ("JOHN_ALESI_MODEL", "deepseek/deepseek-coder"),
        "lara_alesi": ("LARA_ALESI_MODEL", "anthropic/claude-3-haiku"),
        "theo_alesi": ("THEO_ALESI_MODEL", "openai/gpt-4o-mini"),  # Financial
        "justus_alesi": ("JUSTUS_ALESI_MODEL", "anthropic/claude-3-haiku"),  # Legal
        "leon_alesi": ("LEON_ALESI_MODEL", "deepseek/deepseek-coder"),  # System
        "luna_alesi": ("LUNA_ALESI_MODEL", "openai/gpt-4o-mini"),  # Coaching
    }
    # Model used for agents that have no entry in OPENROUTER_MODELS.
    OPENROUTER_FALLBACK_MODEL = "meta-llama/llama-3.2-3b-instruct:free"

    # Model name recorded for colossus replies (it is not sent to the server).
    COLOSSUS_MODEL = "mistral-small3.2:24b-instruct-2506"
    # Re-test the colossus connection once the last success is older than this.
    COLOSSUS_RETEST_INTERVAL_SECONDS = 300

    def __init__(self):
        # agent id -> agent; in-memory cache used for all lookups
        self.agents: Dict[str, SaapAgent] = {}
        # agent id -> open DBAgentSession created by start_agent()
        self.active_sessions: Dict[str, DBAgentSession] = {}
        self.colossus_client: Optional[ColossusClient] = None
        self.is_initialized = False
        # "unknown", "connected", or a failure description such as "test_failed: ..."
        self.colossus_connection_status = "unknown"
        # UTC time of the last *successful* colossus connection test
        self.last_colossus_test: Optional[datetime] = None

    # ------------------------------------------------------------------ #
    # LLM config access
    # ------------------------------------------------------------------ #

    def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None):
        """
        Safely read ``key`` from ``agent.llm_config`` whatever its representation.

        Prevents "'LLMModelConfig' object has no attribute 'get'" by trying, in
        order: plain dict lookup, attribute access, a ``get()`` method, the
        instance ``__dict__``, ``model_dump()`` (Pydantic v2) and ``dict()``
        (Pydantic v1). Each strategy falls through to the next one when it
        cannot supply the key, so a failing ``get()`` really does reach the
        remaining fallbacks.

        Args:
            agent: Agent whose ``llm_config`` is read.
            key: Config key, e.g. ``'max_tokens'`` or ``'temperature'``.
            default: Returned when the config is missing/empty or lacks ``key``.

        Returns:
            The configured value, otherwise ``default``. Errors are logged and
            ``default`` is returned instead of raising.
        """
        try:
            llm_config = getattr(agent, 'llm_config', None)
            if not llm_config:
                logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}")
                return default

            # 1. Plain dict (frontend JSON payloads)
            if isinstance(llm_config, dict):
                value = llm_config.get(key, default)
                logger.debug(f"βœ… Dict config access: {key}={value}")
                return value

            # 2. Direct attribute access (Pydantic models, plain objects)
            if hasattr(llm_config, key):
                value = getattr(llm_config, key, default)
                logger.debug(f"βœ… Attribute access: {key}={value}")
                return value

            # 3. Dict-like object exposing get()
            getter = getattr(llm_config, 'get', None)
            if callable(getter):
                try:
                    value = getter(key, default)
                    logger.debug(f"βœ… Method get() access: {key}={value}")
                    return value
                except Exception as get_error:
                    logger.warning(f"⚠️ get() method failed: {get_error}, trying fallback")

            # 4. Raw instance __dict__
            instance_dict = getattr(llm_config, '__dict__', None)
            if isinstance(instance_dict, dict) and key in instance_dict:
                value = instance_dict[key]
                logger.debug(f"βœ… __dict__ access: {key}={value}")
                return value

            # 5./6. Serialize via model_dump() (Pydantic v2) or dict() (Pydantic v1)
            for dump_name in ('model_dump', 'dict'):
                dumper = getattr(llm_config, dump_name, None)
                if not callable(dumper):
                    continue
                try:
                    dumped = dumper()
                except Exception:
                    continue
                if isinstance(dumped, dict) and key in dumped:
                    value = dumped[key]
                    logger.debug(f"βœ… {dump_name}() access: {key}={value}")
                    return value

            logger.warning(f"⚠️ Unknown config type {type(llm_config)} for {key}, using default: {default}")
            return default

        except AttributeError as e:
            logger.warning(f"⚠️ AttributeError in LLM config access for {key}: {e}, using default: {default}")
            return default
        except Exception as e:
            logger.error(f"❌ Unexpected error in LLM config access for {key}: {e}, using default: {default}")
            return default

    # ------------------------------------------------------------------ #
    # Initialization
    # ------------------------------------------------------------------ #

    async def initialize(self):
        """Connect to colossus, load agents from the DB and seed missing base templates.

        A colossus failure is recorded in ``colossus_connection_status`` but
        does not abort initialization (graceful degradation). Any other failure
        is logged and re-raised.
        """
        try:
            logger.info("πŸš€ Initializing Agent Manager Service...")
            await self._connect_colossus()

            # Load DB agents, then make sure every base template exists
            await self._load_agents_from_database()

            base_template_ids = self.BASE_TEMPLATE_IDS
            missing_templates = [tid for tid in base_template_ids if tid not in self.agents]
            if missing_templates:
                logger.info(f"πŸ“¦ Loading missing base templates: {missing_templates}")
                await self._load_missing_templates(missing_templates)

            self.is_initialized = True

            base_count = sum(1 for agent_id in self.agents if agent_id in base_template_ids)
            logger.info(f"βœ… Agent Manager initialized: {len(self.agents)} agents loaded")
            logger.info(f" Base templates: {base_count}/{len(base_template_ids)}")
            logger.info(f" Custom agents: {len(self.agents) - base_count}")
            logger.info(f"πŸ”Œ colossus status: {self.colossus_connection_status}")

        except Exception as e:
            logger.error(f"❌ Agent Manager initialization failed: {e}")
            raise

    async def _connect_colossus(self):
        """Create and enter the colossus client, then run a connection test.

        If construction or ``__aenter__`` fails, the failure is recorded in
        ``colossus_connection_status`` and the client reference is dropped, so
        ``__aexit__`` is never called later on a client that was never entered.
        """
        try:
            logger.info("πŸ”Œ Connecting to colossus server...")
            client = ColossusClient()
            await client.__aenter__()
            self.colossus_client = client
        except Exception as colossus_error:
            logger.error(f"❌ colossus connection failed: {colossus_error}")
            self.colossus_connection_status = f"failed: {str(colossus_error)}"
            self.colossus_client = None
            return

        await self._test_colossus_connection()

    async def _load_missing_templates(self, template_ids: List[str]):
        """Instantiate and register the given base templates.

        Only ids listed in ``BASE_TEMPLATES`` are accepted. Each template is
        handled independently, so one missing or broken template does not
        prevent the others from loading.
        """
        for template_id in template_ids:
            try:
                template_method = (
                    getattr(AgentTemplates, template_id, None)
                    if template_id in self.BASE_TEMPLATES
                    else None
                )
                if template_method is None:
                    logger.error(f"❌ Template method not found: AgentTemplates.{template_id}")
                    continue

                agent = template_method()
                if await self.register_agent(agent):
                    logger.info(f"βœ… Loaded missing template: {agent.name}")
                else:
                    logger.error(f"❌ Failed to load template {template_id}: registration failed")
            except Exception as e:
                logger.error(f"❌ Failed to load template {template_id}: {e}")

    async def _test_colossus_connection(self):
        """Send a tiny prompt to colossus and record the outcome.

        On success sets ``colossus_connection_status`` to ``"connected"`` and
        stamps ``last_colossus_test``; otherwise sets it to
        ``"client_not_initialized"``, ``"test_failed: ..."`` or
        ``"test_error: ..."``. Exceptions are caught and recorded.
        """
        if not self.colossus_client:
            self.colossus_connection_status = "client_not_initialized"
            return

        try:
            test_messages = [
                {"role": "system", "content": "You are a test assistant."},
                {"role": "user", "content": "Reply with just 'OK' to confirm connection."}
            ]
            logger.info("πŸ§ͺ Testing colossus connection...")
            response = await self.colossus_client.chat_completion(
                messages=test_messages,
                agent_id="connection_test",
                max_tokens=10
            )

            if response and response.get("success"):
                self.colossus_connection_status = "connected"
                self.last_colossus_test = datetime.utcnow()
                logger.info("βœ… colossus connection test successful")
            else:
                error_msg = response.get("error", "unknown error") if response else "no response"
                self.colossus_connection_status = f"test_failed: {error_msg}"
                logger.error(f"❌ colossus connection test failed: {error_msg}")

        except Exception as e:
            self.colossus_connection_status = f"test_error: {str(e)}"
            logger.error(f"❌ colossus connection test error: {e}")

    async def _load_agents_from_database(self):
        """Populate the in-memory cache from the DBAgent table.

        Rows whose ``to_saap_agent()`` conversion fails are logged and skipped.
        Returns early (with a warning) when the database is not initialized and
        logs instead of raising if the query fails.
        """
        try:
            if not db_manager.is_initialized:
                logger.warning("⚠️ Database not yet initialized - will load default agents")
                return

            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()

                loaded_count = 0
                failed_count = 0
                # Convert while the session is still open
                for db_agent in db_agents:
                    try:
                        saap_agent = db_agent.to_saap_agent()
                    except Exception as agent_error:
                        failed_count += 1
                        logger.error(f"❌ Failed to load agent {db_agent.id}: {agent_error}")
                        continue
                    self.agents[saap_agent.id] = saap_agent
                    loaded_count += 1
                    logger.info(f"βœ… Loaded: {saap_agent.name} ({saap_agent.id})")

                logger.info(f"πŸ“š Loaded {loaded_count} agents from database ({failed_count} failed)")

        except Exception as e:
            logger.error(f"❌ Failed to load agents from database: {e}")
            logger.info("πŸ“¦ Will proceed with in-memory agents only")

    async def load_default_agents(self):
        """Instantiate and register every base Alesi template.

        Each template is handled independently; failures are logged and the
        remaining templates are still attempted.
        """
        try:
            logger.info("πŸ€– Loading ALL default Alesi agents...")
            loaded_agents = []

            for method_name, display_name in self.BASE_TEMPLATES.items():
                try:
                    template_method = getattr(AgentTemplates, method_name, None)
                    if template_method is None:
                        logger.error(f"❌ Template method not found: AgentTemplates.{method_name}")
                        continue

                    agent = template_method()
                    if await self.register_agent(agent):
                        loaded_agents.append(display_name)
                        logger.info(f"βœ… Loaded: {display_name}")
                    else:
                        logger.error(f"❌ Failed to register: {display_name}")
                except Exception as template_error:
                    logger.error(f"❌ Error loading {display_name}: {template_error}")

            if loaded_agents:
                logger.info(f"βœ… Successfully loaded agents: {loaded_agents}")
            else:
                logger.error("❌ No agents could be loaded!")

        except Exception as e:
            logger.error(f"❌ Agent loading failed: {e}")

    # ------------------------------------------------------------------ #
    # CRUD
    # ------------------------------------------------------------------ #

    async def register_agent(self, agent: SaapAgent) -> bool:
        """Add an agent to the cache and persist it to the DB when available.

        A DB failure (e.g. a duplicate row) is logged and the agent stays
        registered in memory.

        Returns:
            True if the agent is cached, False if registration failed entirely.
        """
        try:
            self.agents[agent.id] = agent

            try:
                if db_manager.is_initialized:
                    async with db_manager.get_async_session() as session:
                        session.add(DBAgent.from_saap_agent(agent))
                        await session.commit()
                        logger.info(f"βœ… Agent registered with database: {agent.name} ({agent.id})")
                else:
                    logger.info(f"βœ… Agent registered in-memory only: {agent.name} ({agent.id})")
            except Exception as db_error:
                # Keep the in-memory registration even if persistence fails
                logger.warning(f"⚠️ Database persistence failed for {agent.name}: {db_error}")

            return True

        except Exception as e:
            logger.error(f"❌ Agent registration failed: {e}")
            # getattr: agent.id itself may be what failed above
            self.agents.pop(getattr(agent, 'id', None), None)
            return False

    def get_agent(self, agent_id: str) -> Optional[SaapAgent]:
        """Return the cached agent for ``agent_id``, or None (logged as a warning)."""
        agent = self.agents.get(agent_id)
        if agent:
            logger.debug(f"πŸ” Agent found: {agent.name} ({agent_id}) - Status: {agent.status}")
        else:
            logger.warning(f"❌ Agent not found: {agent_id}")
            logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}")
        return agent

    async def list_agents(self, status: Optional[AgentStatus] = None,
                          agent_type: Optional[AgentType] = None) -> List[SaapAgent]:
        """List cached agents, optionally filtered by status and/or type."""
        agents = list(self.agents.values())
        if status is not None:
            agents = [a for a in agents if a.status == status]
        if agent_type is not None:
            agents = [a for a in agents if a.type == agent_type]
        return agents

    async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]:
        """Return basic stats read from the agent object ({} if the agent is unknown).

        Attributes missing on the agent are reported as 0 / None.
        """
        agent = self.get_agent(agent_id)
        if not agent:
            return {}
        return {
            "messages_processed": getattr(agent, 'messages_processed', 0),
            "total_tokens": getattr(agent, 'total_tokens', 0),
            "average_response_time": getattr(agent, 'avg_response_time', 0),
            "status": agent.status.value,
            "last_active": getattr(agent, 'last_active', None)
        }

    async def health_check(self, agent_id: str) -> Dict[str, Any]:
        """Report whether the agent exists, is ACTIVE, and whether colossus is connected."""
        agent = self.get_agent(agent_id)
        if not agent:
            return {"healthy": False, "checks": {"agent_exists": False}}
        return {
            "healthy": agent.status == AgentStatus.ACTIVE,
            "checks": {
                "agent_exists": True,
                "status": agent.status.value,
                "colossus_connection": self.colossus_connection_status == "connected"
            }
        }

    async def update_agent(self, agent_id: str, updated_data) -> SaapAgent:
        """
        Update an agent's configuration in memory and (best-effort) in the DB.

        Args:
            agent_id: ID of the agent to update.
            updated_data: Either a full ``SaapAgent`` replacement or a dict of
                partial updates. For dicts, legacy top-level ``color``/``avatar``
                keys are migrated into ``appearance`` (unless the dict also
                contains ``appearance``), nested dicts are shallow-merged into
                the current values, and unknown keys are ignored. The caller's
                dict is never modified.

        Returns:
            The updated agent.

        Raises:
            ValueError: If the agent does not exist or ``updated_data`` has an
                unsupported type. Errors raised by ``SaapAgent(**...)`` also
                propagate.
        """
        try:
            current_agent = self.get_agent(agent_id)
            if not current_agent:
                raise ValueError(f"Agent {agent_id} not found")

            if isinstance(updated_data, dict):
                merged = self._merge_agent_updates(current_agent.dict(), updated_data)
                updated_agent = SaapAgent(**merged)
            elif isinstance(updated_data, SaapAgent):
                updated_agent = updated_data
            else:
                raise ValueError(f"Invalid update data type: {type(updated_data)}")

            self.agents[agent_id] = updated_agent

            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        # Delete + insert is simpler than a column-by-column UPDATE
                        await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
                        session.add(DBAgent.from_saap_agent(updated_agent))
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"⚠️ Database update failed for {agent_id}: {db_error}")

            logger.info(f"βœ… Agent updated: {agent_id}")
            return updated_agent

        except Exception as e:
            logger.error(f"❌ Agent update failed: {e}")
            raise

    @staticmethod
    def _merge_agent_updates(current: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
        """Apply partial ``updates`` onto the agent dict ``current`` and return it.

        ``updates`` is copied first so the legacy-key migration never mutates
        the caller's dict; ``current`` is modified in place.
        """
        pending = dict(updates)

        # Migrate old frontend schema: top-level color/avatar -> appearance.*
        if 'appearance' not in pending:
            for legacy_key in ('color', 'avatar'):
                if legacy_key in pending:
                    # Also covers an existing appearance of None
                    if not isinstance(current.get('appearance'), dict):
                        current['appearance'] = {}
                    current['appearance'][legacy_key] = pending.pop(legacy_key)

        for key, value in pending.items():
            if key not in current:
                continue
            if isinstance(value, dict) and isinstance(current[key], dict):
                current[key].update(value)  # shallow merge of nested dicts
            else:
                current[key] = value
        return current

    async def delete_agent(self, agent_id: str) -> bool:
        """Stop an agent, drop it from the cache and delete its DB row.

        Returns True even if the agent did not exist; False only on an
        unexpected error. DB failures are logged, not raised.
        """
        try:
            await self.stop_agent(agent_id)
            self.agents.pop(agent_id, None)

            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"⚠️ Database deletion failed for {agent_id}: {db_error}")

            logger.info(f"βœ… Agent deleted: {agent_id}")
            return True

        except Exception as e:
            logger.error(f"❌ Agent deletion failed: {e}")
            return False

    # ------------------------------------------------------------------ #
    # Lifecycle
    # ------------------------------------------------------------------ #

    async def start_agent(self, agent_id: str) -> bool:
        """Mark an agent ACTIVE and open a DB session record for it.

        If the agent already has an open session (e.g. ``start_agent`` called
        twice), that session is reused instead of being replaced, so no session
        row is left behind without an end time. DB failures are logged.

        Returns:
            True on success, False if the agent is unknown or an error occurred.
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                logger.error(f"❌ Cannot start agent: {agent_id} not found")
                return False

            agent.status = AgentStatus.ACTIVE
            if getattr(agent, 'metrics', None):
                agent.metrics.last_active = datetime.utcnow()

            if db_manager.is_initialized and agent_id not in self.active_sessions:
                try:
                    async with db_manager.get_async_session() as session:
                        db_session = DBAgentSession(agent_id=agent_id)
                        session.add(db_session)
                        await session.commit()
                        await session.refresh(db_session)
                        self.active_sessions[agent_id] = db_session
                except Exception as db_error:
                    logger.warning(f"⚠️ Database session creation failed for {agent_id}: {db_error}")

            await self._update_agent_status(agent_id, AgentStatus.ACTIVE)

            logger.info(f"βœ… Agent started: {agent.name} ({agent_id})")
            return True

        except Exception as e:
            logger.error(f"❌ Agent start failed: {e}")
            return False

    async def stop_agent(self, agent_id: str) -> bool:
        """Mark an agent INACTIVE and close (and persist) its open session.

        Returns:
            True on success, False if the agent is unknown or an error occurred.
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                return False

            agent.status = AgentStatus.INACTIVE

            session_obj = self.active_sessions.get(agent_id)
            if session_obj is not None:
                session_obj.session_end = datetime.utcnow()
                session_obj.status = "completed"
                session_obj.end_reason = "graceful"
                session_obj.calculate_duration()

                if db_manager.is_initialized:
                    try:
                        async with db_manager.get_async_session() as session:
                            # merge(): the object was loaded by another (closed) session
                            await session.merge(session_obj)
                            await session.commit()
                    except Exception as db_error:
                        logger.warning(f"⚠️ Database session update failed for {agent_id}: {db_error}")

                del self.active_sessions[agent_id]

            await self._update_agent_status(agent_id, AgentStatus.INACTIVE)

            logger.info(f"πŸ”§ Agent stopped: {agent_id}")
            return True

        except Exception as e:
            logger.error(f"❌ Agent stop failed: {e}")
            return False

    async def restart_agent(self, agent_id: str) -> bool:
        """Stop the agent, pause one second, then start it again."""
        try:
            await self.stop_agent(agent_id)
            await asyncio.sleep(1)
            return await self.start_agent(agent_id)
        except Exception as e:
            logger.error(f"❌ Agent restart failed: {e}")
            return False

    async def _update_agent_status(self, agent_id: str, status: AgentStatus):
        """Write ``status`` and a fresh ``last_active`` to the agent's DB row (best-effort)."""
        if not db_manager.is_initialized:
            return
        try:
            async with db_manager.get_async_session() as session:
                await session.execute(
                    update(DBAgent)
                    .where(DBAgent.id == agent_id)
                    .values(status=status.value, last_active=datetime.utcnow())
                )
                await session.commit()
        except Exception as e:
            logger.warning(f"⚠️ Failed to update agent status in database: {e}")

    # ------------------------------------------------------------------ #
    # Multi-provider chat
    # ------------------------------------------------------------------ #

    async def send_message_to_agent(self, agent_id: str, message: str,
                                    provider: Optional[str] = None) -> Dict[str, Any]:
        """
        Send a message to an agent via the requested provider, or auto-select one.

        Args:
            agent_id: Target agent ID.
            message: Message content.
            provider: ``"colossus"`` or ``"openrouter"`` to force a provider. Any
                other value (including None) auto-selects: colossus when it is
                connected, with OpenRouter as fallback on a colossus error, and
                OpenRouter otherwise.

        Returns:
            A chat response dict (``content``, ``provider``, ``model``, ...) or an
            error dict with ``error`` and ``timestamp`` (plus ``debug_info`` for
            lookup/status failures).
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                error_msg = f"Agent {agent_id} not found in loaded agents"
                logger.error(f"❌ {error_msg}")
                logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}")
                return {
                    "error": error_msg,
                    "timestamp": datetime.utcnow().isoformat(),
                    "debug_info": {
                        "available_agents": list(self.agents.keys()),
                        "agent_manager_initialized": self.is_initialized
                    }
                }

            if agent.status != AgentStatus.ACTIVE:
                error_msg = f"Agent {agent_id} not available (status: {agent.status.value})"
                logger.error(f"❌ {error_msg}")
                return {
                    "error": error_msg,
                    "timestamp": datetime.utcnow().isoformat(),
                    "debug_info": {
                        "agent_status": agent.status.value,
                        "agent_id": agent_id
                    }
                }

            if provider == "openrouter":
                return await self._send_via_openrouter(agent_id, message, agent)
            if provider == "colossus":
                return await self._send_via_colossus(agent_id, message, agent)

            # Auto-selection: colossus first, OpenRouter as fallback
            if self.colossus_connection_status == "connected":
                logger.info(f"πŸ”„ Using colossus as primary provider for {agent_id}")
                result = await self._send_via_colossus(agent_id, message, agent)
                if "error" in result and "colossus" in result["error"].lower():
                    logger.info(f"πŸ”„ colossus failed, trying OpenRouter fallback...")
                    return await self._send_via_openrouter(agent_id, message, agent)
                return result

            logger.info(f"πŸ”„ colossus unavailable, using OpenRouter as primary for {agent_id}")
            return await self._send_via_openrouter(agent_id, message, agent)

        except Exception as e:
            error_msg = str(e)
            logger.error(f"❌ Message to agent failed: {error_msg}")
            return {
                "error": error_msg,
                "timestamp": datetime.utcnow().isoformat(),
                "debug_info": {
                    "agent_id": agent_id,
                    "provider": provider,
                    "colossus_status": self.colossus_connection_status,
                    "agent_found": agent_id in self.agents,
                    "colossus_client_exists": self.colossus_client is not None
                }
            }

    def _resolve_openrouter_model(self, agent_id: str) -> str:
        """Return the OpenRouter model for ``agent_id`` (env override, then default)."""
        entry = self.OPENROUTER_MODELS.get(agent_id)
        if entry is None:
            return self.OPENROUTER_FALLBACK_MODEL
        env_var, default_model = entry
        return os.getenv(env_var, default_model)

    async def _save_chat_message(self, agent_id: str, user_message: str, agent_response: Any,
                                 response_time: float, tokens_used: int,
                                 metadata: Dict[str, Any], failure_message: str) -> None:
        """Persist one chat exchange as a DBChatMessage (best-effort).

        No-op when the database is not initialized. On failure, logs a warning
        that starts with ``failure_message`` and returns normally.
        """
        if not db_manager.is_initialized:
            return
        try:
            async with db_manager.get_async_session() as session:
                session.add(DBChatMessage(
                    agent_id=agent_id,
                    user_message=user_message,
                    agent_response=agent_response,
                    response_time=response_time,
                    tokens_used=tokens_used,
                    metadata=metadata
                ))
                await session.commit()
        except Exception as db_error:
            logger.warning(f"⚠️ {failure_message}: {db_error}")

    def _record_session_metrics(self, agent_id: str, tokens_used: int) -> None:
        """Increment the in-memory counters of the agent's active session, if any.

        The counters are written to the DB when stop_agent() merges the session.
        """
        session_obj = self.active_sessions.get(agent_id)
        if session_obj is None:
            return
        session_obj.messages_processed = (session_obj.messages_processed or 0) + 1
        session_obj.total_tokens_used = (session_obj.total_tokens_used or 0) + (tokens_used or 0)

    async def _send_via_openrouter(self, agent_id: str, message: str,
                                   agent: SaapAgent) -> Dict[str, Any]:
        """Send a message through a per-request OpenRouterSAAPAgent.

        The model comes from ``_resolve_openrouter_model``; ``max_tokens`` comes
        from the agent's LLM config (default 1000). Successful exchanges are
        persisted best-effort and counted in the active session. Failures are
        returned as ``{"error": "OpenRouter error: ...", ...}`` and never raised.
        """
        try:
            # agent.type may be an Enum, a plain string, or empty
            if hasattr(agent.type, 'value'):
                agent_type_str = agent.type.value
            elif agent.type:
                agent_type_str = str(agent.type)
            else:
                agent_type_str = "Assistant"

            logger.info(f"🌐 {agent_id} ({agent_type_str}) initialized with OpenRouter")

            openrouter_agent = OpenRouterSAAPAgent(
                agent_id,
                agent_type_str,
                os.getenv("OPENROUTER_API_KEY")
            )
            preferred_model = self._resolve_openrouter_model(agent_id)
            openrouter_agent.model_name = preferred_model

            start_time = datetime.utcnow()
            logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...")

            max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)
            response = await openrouter_agent.send_request_to_openrouter(
                message,
                max_tokens=max_tokens_value
            )

            end_time = datetime.utcnow()
            response_time = (end_time - start_time).total_seconds()

            # Treat a missing / non-dict reply as an unknown error instead of crashing on .get()
            if not isinstance(response, dict):
                response = {}

            if not response.get("success"):
                error_msg = response.get("error", "Unknown OpenRouter error")
                logger.error(f"❌ OpenRouter fallback failed: {error_msg}")
                return {
                    "error": f"OpenRouter error: {error_msg}",
                    "provider": "OpenRouter",
                    "timestamp": end_time.isoformat()
                }

            logger.info(f"βœ… OpenRouter response successful in {response_time:.2f}s")
            response_content = response.get("response", "")
            tokens_used = response.get("token_count") or 0
            cost_usd = response.get("cost_usd") or 0.0

            await self._save_chat_message(
                agent_id, message, response_content, response_time, tokens_used,
                metadata={
                    "model": preferred_model,
                    "provider": "OpenRouter",
                    "cost_usd": cost_usd,
                    "temperature": 0.7
                },
                failure_message="Failed to save OpenRouter chat to database"
            )
            self._record_session_metrics(agent_id, tokens_used)

            return {
                "content": response_content,
                "response_time": response_time,
                "tokens_used": tokens_used,
                "cost_usd": cost_usd,
                "provider": "OpenRouter",
                "model": preferred_model,
                "timestamp": end_time.isoformat()
            }

        except Exception as e:
            logger.error(f"❌ OpenRouter fallback failed: {str(e)}")
            return {
                "error": f"OpenRouter error: {str(e)}",
                "provider": "OpenRouter",
                "timestamp": datetime.utcnow().isoformat()
            }

    @staticmethod
    def _openai_content_and_tokens(payload: Dict[str, Any]):
        """Return ``(content, total_tokens)`` from an OpenAI-style chat payload.

        Missing or malformed ``choices`` / ``message`` / ``usage`` entries yield
        ``""`` / ``0`` instead of raising.
        """
        content = ""
        choices = payload.get("choices")
        if isinstance(choices, list) and choices:
            first = choices[0]
            message = first.get("message") if isinstance(first, dict) else None
            if isinstance(message, dict):
                content = message.get("content") or ""
        usage = payload.get("usage")
        tokens = (usage.get("total_tokens") or 0) if isinstance(usage, dict) else 0
        return content, tokens

    @classmethod
    def _parse_colossus_response(cls, response: Any):
        """Normalize a colossus reply into ``(content, tokens_used, error)``.

        Accepted shapes:
        - SAAP envelope ``{"success": True, "response": <OpenAI dict or text>}``
        - explicit failure: ``"success"`` present and falsy, or ``"error"``
          present without any ``"success"`` key
        - bare OpenAI payload ``{"choices": [...], "usage": {...}}``
        - ``{"response": "text"}`` or ``{"content": "text"}``
        - a plain string

        ``error`` is None unless the server reported a failure; ``content`` is
        ``""`` when nothing could be extracted.
        """
        if isinstance(response, str):
            return response, 0, None
        if not isinstance(response, dict) or not response:
            return "", 0, None

        if response.get("success") and "response" in response:
            payload = response["response"]
            if isinstance(payload, str):
                return payload, 0, None
            if isinstance(payload, dict):
                content, tokens = cls._openai_content_and_tokens(payload)
                return content, tokens, None
            return "", 0, None

        # Only an explicit failure counts as an error: a payload without a
        # "success" key (e.g. bare OpenAI format) must reach the branches below.
        reported_failure = (
            ("success" in response and not response["success"])
            or ("success" not in response and "error" in response)
        )
        if reported_failure:
            return "", 0, response.get("error") or "Unknown colossus error"

        if response.get("choices"):
            content, tokens = cls._openai_content_and_tokens(response)
            return content, tokens, None
        if "response" in response:
            return response["response"], 0, None
        if "content" in response:
            return response["content"], 0, None
        return "", 0, None

    async def _send_via_colossus(self, agent_id: str, message: str,
                                 agent: SaapAgent) -> Dict[str, Any]:
        """Send a message via the colossus client.

        Re-tests the connection when the last successful test is older than
        ``COLOSSUS_RETEST_INTERVAL_SECONDS``. ``temperature`` (default 0.7) and
        ``max_tokens`` (default 1000) come from the agent's LLM config. Every
        error dict mentions "colossus", which send_message_to_agent relies on
        for its OpenRouter fallback. Never raises.
        """
        try:
            if not self.colossus_client:
                return {
                    "error": "colossus client not initialized",
                    "provider": "colossus",
                    "timestamp": datetime.utcnow().isoformat()
                }

            # total_seconds(): timedelta.seconds is only the 0-86399 seconds
            # component and would wrap after a day without a successful test.
            last_test = self.last_colossus_test
            if last_test is None or (
                (datetime.utcnow() - last_test).total_seconds() > self.COLOSSUS_RETEST_INTERVAL_SECONDS
            ):
                await self._test_colossus_connection()

            if self.colossus_connection_status != "connected":
                return {
                    "error": f"colossus connection not healthy: {self.colossus_connection_status}",
                    "provider": "colossus",
                    "timestamp": datetime.utcnow().isoformat()
                }

            start_time = datetime.utcnow()
            logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via colossus...")

            temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7)
            max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)

            response = await self.colossus_client.chat_completion(
                messages=[
                    {"role": "system", "content": agent.description or f"You are {agent.name}"},
                    {"role": "user", "content": message}
                ],
                agent_id=agent_id,
                temperature=temperature_value,
                max_tokens=max_tokens_value
            )

            end_time = datetime.utcnow()
            response_time = (end_time - start_time).total_seconds()
            logger.info(f"πŸ“₯ Received response from colossus in {response_time:.2f}s")

            if response:
                logger.debug(f"πŸ” Raw colossus response: {response}")

            response_content, tokens_used, server_error = self._parse_colossus_response(response)

            if server_error is not None:
                logger.error(f"❌ colossus error: {server_error}")
                return {
                    "error": f"colossus server error: {server_error}",
                    "provider": "colossus",
                    "timestamp": end_time.isoformat()
                }

            if not response_content:
                logger.error(f"❌ Unable to extract content from colossus response: {response}")
                return {
                    "error": "Failed to parse colossus response",
                    "provider": "colossus",
                    "timestamp": end_time.isoformat()
                }

            await self._save_chat_message(
                agent_id, message, response_content, response_time, tokens_used,
                metadata={
                    "model": self.COLOSSUS_MODEL,
                    "provider": "colossus",
                    # record the temperature actually sent, not a hard-coded value
                    "temperature": temperature_value
                },
                failure_message="Failed to save chat message to database"
            )
            self._record_session_metrics(agent_id, tokens_used)

            logger.info(f"βœ… Message processed successfully for {agent.name}")
            return {
                "content": response_content,
                "response_time": response_time,
                "tokens_used": tokens_used,
                "provider": "colossus",
                "model": self.COLOSSUS_MODEL,
                "timestamp": end_time.isoformat()
            }

        except Exception as e:
            logger.error(f"❌ colossus communication failed: {str(e)}")
            return {
                "error": f"colossus error: {str(e)}",
                "provider": "colossus",
                "timestamp": datetime.utcnow().isoformat()
            }

    # ------------------------------------------------------------------ #
    # Metrics / status / shutdown
    # ------------------------------------------------------------------ #

    async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]:
        """Aggregate chat and session statistics for one agent from the DB.

        Returns:
            A metrics dict; ``{"warning": ...}`` when the DB is unavailable, or
            ``{}`` if the queries fail.
        """
        if not db_manager.is_initialized:
            return {"warning": "Database not available - no metrics"}

        try:
            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    select(DBChatMessage).where(DBChatMessage.agent_id == agent_id)
                )
                messages = result.scalars().all()

                # Average only over messages that actually recorded a response time
                timed = [m.response_time for m in messages if m.response_time is not None]
                avg_response_time = sum(timed) / len(timed) if timed else 0
                total_tokens = sum(m.tokens_used for m in messages if m.tokens_used)

                session_result = await session.execute(
                    select(DBAgentSession).where(DBAgentSession.agent_id == agent_id)
                )
                sessions = session_result.scalars().all()
                # Skip missing start times so max() cannot compare None with datetime
                session_starts = [s.session_start for s in sessions if s.session_start is not None]

                return {
                    "total_messages": len(messages),
                    "total_tokens_used": total_tokens,
                    "average_response_time": avg_response_time,
                    "total_sessions": len(sessions),
                    "last_activity": max(session_starts, default=None),
                }

        except Exception as e:
            logger.error(f"❌ Failed to get agent metrics: {e}")
            return {}

    async def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of manager, colossus, agent and DB state for debugging."""
        return {
            "agent_manager_initialized": self.is_initialized,
            "colossus_connection_status": self.colossus_connection_status,
            "colossus_last_test": self.last_colossus_test.isoformat() if self.last_colossus_test else None,
            "loaded_agents": len(self.agents),
            "active_sessions": len(self.active_sessions),
            "agent_list": [{"id": aid, "name": agent.name, "status": agent.status.value}
                           for aid, agent in self.agents.items()],
            "database_initialized": getattr(db_manager, 'is_initialized', False)
        }

    async def shutdown_all_agents(self):
        """Stop every cached agent and close the colossus client.

        The client reference is cleared before closing it, and the status is
        set to ``"disconnected"``, so a closed client is never reused.
        """
        try:
            logger.info("πŸ”§ Shutting down all agents...")
            for agent_id in list(self.agents):
                await self.stop_agent(agent_id)

            if self.colossus_client:
                client, self.colossus_client = self.colossus_client, None
                self.colossus_connection_status = "disconnected"
                await client.__aexit__(None, None, None)

            logger.info("βœ… All agents shut down successfully")
        except Exception as e:
            logger.error(f"❌ Agent shutdown failed: {e}")
# Alias so the service class can also be imported as ``AgentManager``.
AgentManager = AgentManagerService

# Shared module-level instance used for dependency injection.
agent_manager = AgentManager()
if __name__ == "__main__":
    async def test_agent_manager():
        """Smoke-test the manager: initialize, list agents, start the first one.

        Shutdown runs in ``finally`` so the colossus client and any agent
        sessions are released even if initialization or start-up raises.
        """
        manager = AgentManagerService()
        try:
            await manager.initialize()

            agents = list(manager.agents.values())
            print(f"πŸ“‹ Agents loaded: {[a.name for a in agents]}")

            if agents:
                agent = agents[0]
                success = await manager.start_agent(agent.id)
                print(f"πŸš€ Start agent {agent.name}: {'βœ…' if success else '❌'}")
        finally:
            await manager.shutdown_all_agents()

    asyncio.run(test_agent_manager())