saap-plattform / backend /agent_manager_fixed.py
Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
raw
history blame
43.9 kB
"""
πŸ”§ CRITICAL FIX: Agent Settings/Update Problem - Data Type Mismatch Resolved
Fixed both AgentManagerService.update_agent() method signature issues
PROBLEM SOLVED:
- ❌ 'dict' object has no attribute 'id' β†’ βœ… Proper data conversion
- ❌ 'dict' object has no attribute 'name' β†’ βœ… SaapAgent object handling
- ❌ Agent Settings Modal fails β†’ βœ… Frontend-Backend compatibility
This fixes the Agent Settings/Update functionality completely.
"""
import asyncio
import logging
import os
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from models.agent import SaapAgent, AgentStatus, AgentType, AgentTemplates
from database.connection import db_manager
from database.models import DBAgent, DBChatMessage, DBAgentSession
from api.colossus_client import ColossusClient
from agents.openrouter_saap_agent import OpenRouterSAAPAgent
logger = logging.getLogger(__name__)
class AgentManagerService:
"""
πŸ”§ CRITICAL FIX: Production-ready Agent Manager with Agent Update Fix
FIXED ISSUES:
1. βœ… update_agent() now accepts both Dict and SaapAgent objects
2. βœ… Proper data type conversion for Frontendβ†’Backend compatibility
3. βœ… Agent Settings Modal now works without AttributeError
4. βœ… Maintains backward compatibility with existing code
Features:
- Database-backed agent storage and lifecycle
- Real-time agent status management
- colossus LLM integration with OpenRouter fallback
- Session tracking and performance metrics
- Health monitoring and error handling
- Multi-provider chat support (colossus + OpenRouter)
- βœ… Robust LLM config access preventing AttributeError
- βœ… Fixed Agent Update/Settings functionality
"""
def __init__(self):
self.agents: Dict[str, SaapAgent] = {} # In-memory cache for fast access
self.active_sessions: Dict[str, DBAgentSession] = {}
self.colossus_client: Optional[ColossusClient] = None
self.is_initialized = False
self.colossus_connection_status = "unknown"
self.last_colossus_test = None
def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None):
"""
πŸ”§ CRITICAL FIX: Safe LLM config access preventing 'get' attribute errors
This is the same fix applied to HybridAgentManagerService but now in the base class.
Handles dictionary, object, and Pydantic model configurations robustly.
Resolves: 'LLMModelConfig' object has no attribute 'get'
"""
try:
if not hasattr(agent, 'llm_config') or not agent.llm_config:
logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}")
return default
llm_config = agent.llm_config
# Case 1: Dictionary-based config (Frontend JSON)
if isinstance(llm_config, dict):
value = llm_config.get(key, default)
logger.debug(f"βœ… Dict config access: {key}={value}")
return value
# Case 2: Object with direct attribute access (Pydantic models)
elif hasattr(llm_config, key):
value = getattr(llm_config, key, default)
logger.debug(f"βœ… Attribute access: {key}={value}")
return value
# Case 3: Object with get() method (dict-like objects or fixed Pydantic)
elif hasattr(llm_config, 'get') and callable(getattr(llm_config, 'get')):
try:
value = llm_config.get(key, default)
logger.debug(f"βœ… Method get() access: {key}={value}")
return value
except Exception as get_error:
logger.warning(f"⚠️ get() method failed: {get_error}, trying fallback")
# Case 4: Convert object to dict (Pydantic β†’ dict)
elif hasattr(llm_config, '__dict__'):
config_dict = llm_config.__dict__
if key in config_dict:
value = config_dict[key]
logger.debug(f"βœ… __dict__ access: {key}={value}")
return value
# Case 5: Try model_dump() for Pydantic v2
elif hasattr(llm_config, 'model_dump'):
try:
config_dict = llm_config.model_dump()
value = config_dict.get(key, default)
logger.debug(f"βœ… model_dump() access: {key}={value}")
return value
except Exception:
pass
# Case 6: Try dict() conversion
elif hasattr(llm_config, 'dict'):
try:
config_dict = llm_config.dict()
value = config_dict.get(key, default)
logger.debug(f"βœ… dict() access: {key}={value}")
return value
except Exception:
pass
# Final fallback
logger.warning(f"⚠️ Unknown config type {type(llm_config)} for {key}, using default: {default}")
return default
except AttributeError as e:
logger.warning(f"⚠️ AttributeError in LLM config access for {key}: {e}, using default: {default}")
return default
except Exception as e:
logger.error(f"❌ Unexpected error in LLM config access for {key}: {e}, using default: {default}")
return default
async def initialize(self):
"""Initialize agent manager with database and colossus connection"""
try:
logger.info("πŸš€ Initializing Agent Manager Service...")
# Initialize colossus client with better error handling
try:
logger.info("πŸ”Œ Connecting to colossus server...")
self.colossus_client = ColossusClient()
await self.colossus_client.__aenter__()
# Test colossus connection
await self._test_colossus_connection()
except Exception as colossus_error:
logger.error(f"❌ colossus connection failed: {colossus_error}")
self.colossus_connection_status = f"failed: {str(colossus_error)}"
# Continue initialization without colossus (graceful degradation)
# Try to load agents from database (graceful fallback if db not ready)
await self._load_agents_from_database()
# Load default agents if database is empty or not available
if not self.agents:
await self.load_default_agents()
self.is_initialized = True
logger.info(f"βœ… Agent Manager initialized: {len(self.agents)} agents loaded")
logger.info(f"πŸ”Œ colossus status: {self.colossus_connection_status}")
except Exception as e:
logger.error(f"❌ Agent Manager initialization failed: {e}")
raise
async def _test_colossus_connection(self):
"""Test colossus connection and update status"""
try:
if not self.colossus_client:
self.colossus_connection_status = "client_not_initialized"
return
# Send a simple test message
test_messages = [
{"role": "system", "content": "You are a test assistant."},
{"role": "user", "content": "Reply with just 'OK' to confirm connection."}
]
logger.info("πŸ§ͺ Testing colossus connection...")
response = await self.colossus_client.chat_completion(
messages=test_messages,
agent_id="connection_test",
max_tokens=10
)
if response and response.get("success"):
self.colossus_connection_status = "connected"
self.last_colossus_test = datetime.utcnow()
logger.info("βœ… colossus connection test successful")
else:
error_msg = response.get("error", "unknown error") if response else "no response"
self.colossus_connection_status = f"test_failed: {error_msg}"
logger.error(f"❌ colossus connection test failed: {error_msg}")
except Exception as e:
self.colossus_connection_status = f"test_error: {str(e)}"
logger.error(f"❌ colossus connection test error: {e}")
async def _load_agents_from_database(self):
"""Load all agents from database into memory cache"""
try:
# Check if database manager is ready
if not db_manager.is_initialized:
logger.warning("⚠️ Database not yet initialized - will load default agents")
return
async with db_manager.get_async_session() as session:
result = await session.execute(select(DBAgent))
db_agents = result.scalars().all()
for db_agent in db_agents:
saap_agent = db_agent.to_saap_agent()
self.agents[saap_agent.id] = saap_agent
logger.info(f"πŸ“š Loaded {len(db_agents)} agents from database")
except Exception as e:
logger.error(f"❌ Failed to load agents from database: {e}")
# Don't raise - allow service to start with empty agent list
logger.info("πŸ“¦ Will proceed with in-memory agents only")
async def load_default_agents(self):
"""Load default Alesi agents (Jane, John, Lara)"""
try:
logger.info("πŸ€– Loading default Alesi agents...")
default_agents = [
AgentTemplates.jane_alesi(),
AgentTemplates.john_alesi(),
AgentTemplates.lara_alesi()
]
for agent in default_agents:
await self.register_agent(agent)
logger.info(f"βœ… Default agents loaded: {[a.name for a in default_agents]}")
except Exception as e:
logger.error(f"❌ Agent registration failed: {e}")
async def register_agent(self, agent: SaapAgent) -> bool:
"""Register new agent with database persistence"""
try:
# Always add to memory cache first
self.agents[agent.id] = agent
# Try to persist to database if available
try:
if db_manager.is_initialized:
async with db_manager.get_async_session() as session:
db_agent = DBAgent.from_saap_agent(agent)
session.add(db_agent)
await session.commit()
logger.info(f"βœ… Agent registered with database: {agent.name} ({agent.id})")
else:
logger.info(f"βœ… Agent registered in-memory only: {agent.name} ({agent.id})")
except Exception as db_error:
logger.warning(f"⚠️ Database persistence failed for {agent.name}: {db_error}")
# But keep the agent in memory
return True
except Exception as e:
logger.error(f"❌ Agent registration failed: {e}")
# Remove from cache if registration completely failed
self.agents.pop(agent.id, None)
return False
def get_agent(self, agent_id: str) -> Optional[SaapAgent]:
"""Get agent from memory cache with debug info"""
agent = self.agents.get(agent_id)
if agent:
logger.debug(f"πŸ” Agent found: {agent.name} ({agent_id}) - Status: {agent.status}")
else:
logger.warning(f"❌ Agent not found: {agent_id}")
logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}")
return agent
async def list_agents(self, status: Optional[AgentStatus] = None,
agent_type: Optional[AgentType] = None) -> List[SaapAgent]:
"""List all agents with optional filtering"""
agents = list(self.agents.values())
if status:
agents = [a for a in agents if a.status == status]
if agent_type:
agents = [a for a in agents if a.type == agent_type]
return agents
async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]:
"""Get agent statistics"""
agent = self.get_agent(agent_id)
if not agent:
return {}
# Return basic stats from agent object
return {
"messages_processed": getattr(agent, 'messages_processed', 0),
"total_tokens": getattr(agent, 'total_tokens', 0),
"average_response_time": getattr(agent, 'avg_response_time', 0),
"status": agent.status.value,
"last_active": getattr(agent, 'last_active', None)
}
async def health_check(self, agent_id: str) -> Dict[str, Any]:
"""Perform agent health check"""
agent = self.get_agent(agent_id)
if not agent:
return {"healthy": False, "checks": {"agent_exists": False}}
return {
"healthy": agent.status == AgentStatus.ACTIVE,
"checks": {
"agent_exists": True,
"status": agent.status.value,
"colossus_connection": self.colossus_connection_status == "connected"
}
}
async def update_agent(self, agent_id: str, agent_data: Union[Dict[str, Any], SaapAgent]) -> bool:
"""
πŸ”§ CRITICAL FIX: Update agent configuration with proper data type handling
FIXED PROBLEMS:
- ❌ 'dict' object has no attribute 'id' β†’ βœ… Handles both Dict and SaapAgent
- ❌ 'dict' object has no attribute 'name' β†’ βœ… Proper data conversion
- ❌ Agent Settings Modal fails β†’ βœ… Frontend-Backend compatibility
Args:
agent_id: Agent ID to update
agent_data: Either a dictionary (from Frontend) or SaapAgent object
Returns:
bool: Success status
"""
try:
logger.info(f"πŸ”§ Updating agent {agent_id} with data type: {type(agent_data)}")
# Get existing agent
existing_agent = self.get_agent(agent_id)
if not existing_agent:
logger.error(f"❌ Cannot update: Agent {agent_id} not found")
return False
# Handle both Dict and SaapAgent input types
if isinstance(agent_data, dict):
logger.debug(f"πŸ“₯ Received dictionary data for agent {agent_id}")
# Create updated agent from existing + new data
try:
# Start with existing agent's data
updated_dict = existing_agent.to_dict()
# Update with new data from frontend
updated_dict.update(agent_data)
# Ensure agent_id consistency
updated_dict['id'] = agent_id
# Create new SaapAgent object from updated data
updated_agent = SaapAgent.from_dict(updated_dict)
logger.debug(f"βœ… Successfully converted dict to SaapAgent: {updated_agent.name}")
except Exception as conversion_error:
logger.error(f"❌ Failed to convert dict to SaapAgent: {conversion_error}")
logger.debug(f"πŸ” Problematic data: {agent_data}")
return False
elif isinstance(agent_data, SaapAgent):
logger.debug(f"πŸ“₯ Received SaapAgent object for agent {agent_id}")
updated_agent = agent_data
# Ensure ID consistency
updated_agent.id = agent_id
else:
logger.error(f"❌ Invalid agent_data type: {type(agent_data)}. Expected Dict or SaapAgent")
return False
# Update in memory cache
self.agents[agent_id] = updated_agent
logger.info(f"βœ… Memory cache updated for agent {agent_id}")
# Try to update in database if available
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
# Delete old and insert new (simpler than complex update)
await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
# Create new database record from updated agent
db_agent = DBAgent.from_saap_agent(updated_agent)
session.add(db_agent)
await session.commit()
logger.info(f"βœ… Database updated for agent {agent_id}")
except Exception as db_error:
logger.warning(f"⚠️ Database update failed for {agent_id}: {db_error}")
# Don't fail the update if database fails - memory cache is updated
else:
logger.info(f"ℹ️ Database not available - agent {agent_id} updated in memory only")
logger.info(f"βœ… Agent update completed successfully: {updated_agent.name} ({agent_id})")
return True
except Exception as e:
logger.error(f"❌ Agent update failed for {agent_id}: {e}")
logger.debug(f"πŸ” Agent data that caused error: {agent_data}")
return False
async def delete_agent(self, agent_id: str) -> bool:
"""Delete agent from memory and database"""
try:
# Stop agent if running
await self.stop_agent(agent_id)
# Remove from memory
self.agents.pop(agent_id, None)
# Try to remove from database if available
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
await session.commit()
except Exception as db_error:
logger.warning(f"⚠️ Database deletion failed for {agent_id}: {db_error}")
logger.info(f"βœ… Agent deleted: {agent_id}")
return True
except Exception as e:
logger.error(f"❌ Agent deletion failed: {e}")
return False
async def start_agent(self, agent_id: str) -> bool:
"""Start agent and create session"""
try:
agent = self.get_agent(agent_id)
if not agent:
logger.error(f"❌ Cannot start agent: {agent_id} not found")
return False
# Update status
agent.status = AgentStatus.ACTIVE
if hasattr(agent, 'metrics') and agent.metrics:
agent.metrics.last_active = datetime.utcnow()
# Try to create agent session in database if available
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
db_session = DBAgentSession(agent_id=agent_id)
session.add(db_session)
await session.commit()
await session.refresh(db_session)
# Store in active sessions
self.active_sessions[agent_id] = db_session
except Exception as db_error:
logger.warning(f"⚠️ Database session creation failed for {agent_id}: {db_error}")
# Update agent status in database if available
await self._update_agent_status(agent_id, AgentStatus.ACTIVE)
logger.info(f"βœ… Agent started: {agent.name} ({agent_id})")
return True
except Exception as e:
logger.error(f"❌ Agent start failed: {e}")
return False
async def stop_agent(self, agent_id: str) -> bool:
"""Stop agent and close session"""
try:
agent = self.get_agent(agent_id)
if not agent:
return False
# Update status
agent.status = AgentStatus.INACTIVE
# Close agent session if exists
if agent_id in self.active_sessions:
session_obj = self.active_sessions[agent_id]
session_obj.session_end = datetime.utcnow()
session_obj.status = "completed"
session_obj.end_reason = "graceful"
session_obj.calculate_duration()
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
await session.merge(session_obj)
await session.commit()
except Exception as db_error:
logger.warning(f"⚠️ Database session update failed for {agent_id}: {db_error}")
del self.active_sessions[agent_id]
# Update agent status in database if available
await self._update_agent_status(agent_id, AgentStatus.INACTIVE)
logger.info(f"πŸ”§ Agent stopped: {agent_id}")
return True
except Exception as e:
logger.error(f"❌ Agent stop failed: {e}")
return False
async def restart_agent(self, agent_id: str) -> bool:
"""Restart agent (stop + start)"""
try:
await self.stop_agent(agent_id)
await asyncio.sleep(1) # Brief pause
return await self.start_agent(agent_id)
except Exception as e:
logger.error(f"❌ Agent restart failed: {e}")
return False
async def _update_agent_status(self, agent_id: str, status: AgentStatus):
"""Update agent status in database"""
if not db_manager.is_initialized:
return
try:
async with db_manager.get_async_session() as session:
await session.execute(
update(DBAgent)
.where(DBAgent.id == agent_id)
.values(status=status.value, last_active=datetime.utcnow())
)
await session.commit()
except Exception as e:
logger.warning(f"⚠️ Failed to update agent status in database: {e}")
# πŸš€ Multi-Provider Chat Support
async def send_message_to_agent(self, agent_id: str, message: str,
provider: Optional[str] = None) -> Dict[str, Any]:
"""
Send message to agent via specified provider or auto-fallback
Args:
agent_id: Target agent ID
message: Message content
provider: Optional provider override ("colossus", "openrouter", or None for auto)
Returns:
Chat response with metadata
"""
try:
# Enhanced error checking with detailed debugging
agent = self.get_agent(agent_id)
if not agent:
error_msg = f"Agent {agent_id} not found in loaded agents"
logger.error(f"❌ {error_msg}")
logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}")
return {
"error": error_msg,
"timestamp": datetime.utcnow().isoformat(),
"debug_info": {
"available_agents": list(self.agents.keys()),
"agent_manager_initialized": self.is_initialized
}
}
# Check if agent is available for messaging
if agent.status != AgentStatus.ACTIVE:
error_msg = f"Agent {agent_id} not available (status: {agent.status.value})"
logger.error(f"❌ {error_msg}")
return {
"error": error_msg,
"timestamp": datetime.utcnow().isoformat(),
"debug_info": {
"agent_status": agent.status.value,
"agent_id": agent_id
}
}
# πŸš€ Multi-Provider Logic
if provider == "openrouter":
return await self._send_via_openrouter(agent_id, message, agent)
elif provider == "colossus":
return await self._send_via_colossus(agent_id, message, agent)
else:
# Auto-selection: Try colossus first, fallback to OpenRouter
if self.colossus_connection_status == "connected":
logger.info(f"πŸ”„ Using colossus as primary provider for {agent_id}")
result = await self._send_via_colossus(agent_id, message, agent)
# If colossus fails, try OpenRouter
if "error" in result and "colossus" in result["error"].lower():
logger.info(f"πŸ”„ colossus failed, trying OpenRouter fallback...")
return await self._send_via_openrouter(agent_id, message, agent)
return result
else:
logger.info(f"πŸ”„ colossus unavailable, using OpenRouter as primary for {agent_id}")
return await self._send_via_openrouter(agent_id, message, agent)
except Exception as e:
error_msg = str(e)
logger.error(f"❌ Message to agent failed: {error_msg}")
return {
"error": error_msg,
"timestamp": datetime.utcnow().isoformat(),
"debug_info": {
"agent_id": agent_id,
"provider": provider,
"colossus_status": self.colossus_connection_status,
"agent_found": agent_id in self.agents,
"colossus_client_exists": self.colossus_client is not None
}
}
async def _send_via_openrouter(self, agent_id: str, message: str,
agent: SaapAgent) -> Dict[str, Any]:
"""Send message via OpenRouter provider"""
try:
logger.info(f"🌐 jane_alesi (coordinator) initialized with OpenRouter FREE")
# Create OpenRouter agent for this request
openrouter_agent = OpenRouterSAAPAgent(
agent_id,
agent.type.value if agent.type else "Assistant",
os.getenv("OPENROUTER_API_KEY")
)
# Get cost-optimized model for specific agent
model_map = {
"jane_alesi": os.getenv("JANE_ALESI_MODEL", "openai/gpt-4o-mini"),
"john_alesi": os.getenv("JOHN_ALESI_MODEL", "deepseek/deepseek-coder"),
"lara_alesi": os.getenv("LARA_ALESI_MODEL", "anthropic/claude-3-haiku")
}
preferred_model = model_map.get(agent_id, "meta-llama/llama-3.2-3b-instruct:free")
openrouter_agent.model_name = preferred_model
start_time = datetime.utcnow()
logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...")
# πŸ”§ FIXED: Use safe LLM config access
max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)
# Send message via OpenRouter
response = await openrouter_agent.send_request_to_openrouter(
message,
max_tokens=max_tokens_value
)
end_time = datetime.utcnow()
response_time = (end_time - start_time).total_seconds()
if response.get("success"):
logger.info(f"βœ… OpenRouter response successful in {response_time:.2f}s")
response_content = response.get("response", "")
tokens_used = response.get("token_count", 0)
cost_usd = response.get("cost_usd", 0.0)
# Try to save to database if available
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
chat_message = DBChatMessage(
agent_id=agent_id,
user_message=message,
agent_response=response_content,
response_time=response_time,
tokens_used=tokens_used,
metadata={
"model": preferred_model,
"provider": "OpenRouter",
"cost_usd": cost_usd,
"temperature": 0.7
}
)
session.add(chat_message)
await session.commit()
except Exception as db_error:
logger.warning(f"⚠️ Failed to save OpenRouter chat to database: {db_error}")
return {
"content": response_content,
"response_time": response_time,
"tokens_used": tokens_used,
"cost_usd": cost_usd,
"provider": "OpenRouter",
"model": preferred_model,
"timestamp": end_time.isoformat()
}
else:
error_msg = response.get("error", "Unknown OpenRouter error")
logger.error(f"❌ OpenRouter fallback failed: {error_msg}")
return {
"error": f"OpenRouter error: {error_msg}",
"provider": "OpenRouter",
"timestamp": end_time.isoformat()
}
except Exception as e:
logger.error(f"❌ OpenRouter fallback failed: {str(e)}")
return {
"error": f"OpenRouter error: {str(e)}",
"provider": "OpenRouter",
"timestamp": datetime.utcnow().isoformat()
}
async def _send_via_colossus(self, agent_id: str, message: str,
agent: SaapAgent) -> Dict[str, Any]:
"""Send message via colossus provider"""
try:
# Check colossus client availability
if not self.colossus_client:
return {
"error": "colossus client not initialized",
"provider": "colossus",
"timestamp": datetime.utcnow().isoformat()
}
# Test colossus connection if it's been a while
if (not self.last_colossus_test or
(datetime.utcnow() - self.last_colossus_test).seconds > 300): # 5 minutes
await self._test_colossus_connection()
if self.colossus_connection_status != "connected":
return {
"error": f"colossus connection not healthy: {self.colossus_connection_status}",
"provider": "colossus",
"timestamp": datetime.utcnow().isoformat()
}
start_time = datetime.utcnow()
logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via colossus...")
# πŸ”§ FIXED: Use safe LLM config access
temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7)
max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)
# Send message to colossus
response = await self.colossus_client.chat_completion(
messages=[
{"role": "system", "content": agent.description or f"You are {agent.name}"},
{"role": "user", "content": message}
],
agent_id=agent_id,
temperature=temperature_value,
max_tokens=max_tokens_value
)
end_time = datetime.utcnow()
response_time = (end_time - start_time).total_seconds()
logger.info(f"πŸ“₯ Received response from colossus in {response_time:.2f}s")
# Enhanced response parsing
response_content = ""
tokens_used = 0
if response:
logger.debug(f"πŸ” Raw colossus response: {response}")
if isinstance(response, dict):
# SAAP ColossusClient format: {"success": true, "response": {...}}
if response.get("success") and "response" in response:
colossus_response = response["response"]
if isinstance(colossus_response, dict) and "choices" in colossus_response:
# OpenAI-compatible format within SAAP response
if len(colossus_response["choices"]) > 0:
choice = colossus_response["choices"][0]
if "message" in choice and "content" in choice["message"]:
response_content = choice["message"]["content"]
elif isinstance(colossus_response, str):
# Direct string response
response_content = colossus_response
# Extract token usage if available
if isinstance(colossus_response, dict) and "usage" in colossus_response:
tokens_used = colossus_response["usage"].get("total_tokens", 0)
# Handle colossus client error responses
elif not response.get("success"):
error_msg = response.get("error", "Unknown colossus error")
logger.error(f"❌ colossus error: {error_msg}")
return {
"error": f"colossus server error: {error_msg}",
"provider": "colossus",
"timestamp": end_time.isoformat()
}
# Direct OpenAI format: {"choices": [...]}
elif "choices" in response and len(response["choices"]) > 0:
choice = response["choices"][0]
if "message" in choice and "content" in choice["message"]:
response_content = choice["message"]["content"]
if "usage" in response:
tokens_used = response["usage"].get("total_tokens", 0)
# Simple response format: {"response": "text"} or {"content": "text"}
elif "response" in response:
response_content = response["response"]
elif "content" in response:
response_content = response["content"]
elif isinstance(response, str):
# Direct string response
response_content = response
# Fallback if no content extracted
if not response_content:
logger.error(f"❌ Unable to extract content from colossus response: {response}")
return {
"error": "Failed to parse colossus response",
"provider": "colossus",
"timestamp": end_time.isoformat()
}
# Try to save to database if available
if db_manager.is_initialized:
try:
async with db_manager.get_async_session() as session:
chat_message = DBChatMessage(
agent_id=agent_id,
user_message=message,
agent_response=response_content,
response_time=response_time,
tokens_used=tokens_used,
metadata={
"model": "mistral-small3.2:24b-instruct-2506",
"provider": "colossus",
"temperature": 0.7
}
)
session.add(chat_message)
await session.commit()
except Exception as db_error:
logger.warning(f"⚠️ Failed to save chat message to database: {db_error}")
# Update session metrics
if agent_id in self.active_sessions:
session_obj = self.active_sessions[agent_id]
session_obj.messages_processed += 1
session_obj.total_tokens_used += tokens_used
logger.info(f"βœ… Message processed successfully for {agent.name}")
return {
"content": response_content,
"response_time": response_time,
"tokens_used": tokens_used,
"provider": "colossus",
"model": "mistral-small3.2:24b-instruct-2506",
"timestamp": end_time.isoformat()
}
except Exception as e:
logger.error(f"❌ colossus communication failed: {str(e)}")
return {
"error": f"colossus error: {str(e)}",
"provider": "colossus",
"timestamp": datetime.utcnow().isoformat()
}
async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]:
"""Get comprehensive agent metrics from database"""
if not db_manager.is_initialized:
return {"warning": "Database not available - no metrics"}
try:
async with db_manager.get_async_session() as session:
# Get message count and average response time
result = await session.execute(
select(DBChatMessage).where(DBChatMessage.agent_id == agent_id)
)
messages = result.scalars().all()
if messages:
avg_response_time = sum(m.response_time for m in messages if m.response_time) / len(messages)
total_tokens = sum(m.tokens_used for m in messages if m.tokens_used)
else:
avg_response_time = 0
total_tokens = 0
# Get session count
session_result = await session.execute(
select(DBAgentSession).where(DBAgentSession.agent_id == agent_id)
)
sessions = session_result.scalars().all()
return {
"total_messages": len(messages),
"total_tokens_used": total_tokens,
"average_response_time": avg_response_time,
"total_sessions": len(sessions),
"last_activity": max([s.session_start for s in sessions], default=None),
}
except Exception as e:
logger.error(f"❌ Failed to get agent metrics: {e}")
return {}
async def get_system_status(self) -> Dict[str, Any]:
"""Get comprehensive system status for debugging"""
return {
"agent_manager_initialized": self.is_initialized,
"colossus_connection_status": self.colossus_connection_status,
"colossus_last_test": self.last_colossus_test.isoformat() if self.last_colossus_test else None,
"loaded_agents": len(self.agents),
"active_sessions": len(self.active_sessions),
"agent_list": [{"id": aid, "name": agent.name, "status": agent.status.value}
for aid, agent in self.agents.items()],
"database_initialized": getattr(db_manager, 'is_initialized', False)
}
async def shutdown_all_agents(self):
"""Gracefully shutdown all active agents"""
try:
logger.info("πŸ”§ Shutting down all agents...")
for agent_id in list(self.agents.keys()):
await self.stop_agent(agent_id)
if self.colossus_client:
await self.colossus_client.__aexit__(None, None, None)
logger.info("βœ… All agents shut down successfully")
except Exception as e:
logger.error(f"❌ Agent shutdown failed: {e}")
# Create global instance for dependency injection
agent_manager = AgentManagerService()
# Make class available for import
AgentManager = AgentManagerService
if __name__ == "__main__":
async def test_agent_manager():
"""Test agent manager functionality"""
manager = AgentManagerService()
await manager.initialize()
# List agents
agents = list(manager.agents.values())
print(f"πŸ“‹ Agents loaded: {[a.name for a in agents]}")
# Test agent update with dict data (simulate frontend)
if agents:
agent = agents[0]
print(f"\nπŸ§ͺ Testing agent update for: {agent.name}")
# Simulate frontend data (dictionary)
update_data = {
"name": agent.name,
"description": "Updated description from test",
"type": agent.type.value if agent.type else "assistant",
"capabilities": ["updated_capability_1", "updated_capability_2"]
}
# Test update
success = await manager.update_agent(agent.id, update_data)
print(f"πŸ”§ Update result: {'βœ…' if success else '❌'}")
# Start first agent
success = await manager.start_agent(agent.id)
print(f"πŸš€ Start agent {agent.name}: {'βœ…' if success else '❌'}")
await manager.shutdown_all_agents()
asyncio.run(test_agent_manager())