Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
raw
history blame
26.4 kB
"""
SAAP Agent Management API
FastAPI endpoints for agent CRUD operations
"""
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
import asyncio
from pathlib import Path
from ..models.agent_schema import (
SaapAgent, AgentRegistry, AgentStats, AgentStatus, AgentType,
AgentTemplates, validate_agent_json
)
from ..services.agent_manager import AgentManager
from ..services.message_queue import MessageQueueService
from ..utils.validators import validate_agent_id, validate_json_schema
# Router collecting all agent endpoints in this module; every route declared
# on it is served under the /api/v1/agents prefix and tagged "agents".
router = APIRouter(prefix="/api/v1/agents", tags=["agents"])
# Dependency injection
async def get_agent_manager() -> AgentManager:
    """Dependency provider: construct and return an AgentManager."""
    manager = AgentManager()
    return manager
async def get_message_queue() -> MessageQueueService:
    """Dependency provider: construct and return a MessageQueueService."""
    queue_service = MessageQueueService()
    return queue_service
# ===== AGENT LIFECYCLE ENDPOINTS =====
@router.get("/")
async def list_agents(
    status: Optional[AgentStatus] = None,
    agent_type: Optional[AgentType] = None,
    include_stats: bool = False,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Return the registered agents, optionally narrowed by filters.

    Query Parameters:
    - status: Only return agents with this status
    - agent_type: Only return agents of this type
    - include_stats: Attach runtime statistics to every returned agent

    Returns:
        A dict holding the agents under "agents", their count under "total",
        the applied "filters" and a UTC "timestamp".

    Raises:
        HTTPException: 500 if listing the agents or fetching stats fails.
    """
    try:
        matching = await agent_manager.list_agents(
            status=status,
            agent_type=agent_type
        )

        if include_stats:
            # One statistics lookup per agent, stored on the agent itself
            for entry in matching:
                entry.runtime_stats = await agent_manager.get_agent_stats(entry.id)

        applied_filters = {
            "status": status.value if status else None,
            "agent_type": agent_type.value if agent_type else None,
            "include_stats": include_stats
        }

        # The list is wrapped in an {"agents": [...]} envelope (original
        # author's note: this is the shape the frontend expects)
        return {
            "agents": matching,
            "total": len(matching),
            "filters": applied_filters,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list agents: {str(e)}")
@router.get("/{agent_id}")
async def get_agent(
    agent_id: str,
    include_stats: bool = True,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Look up a single agent by its ID.

    Path Parameters:
    - agent_id: Unique agent identifier

    Query Parameters:
    - include_stats: Attach runtime statistics to the agent (default: true)

    Returns:
        A dict with the agent under "agent" and a UTC "timestamp".

    Raises:
        HTTPException: 404 if the agent is unknown, 500 on any other failure.
    """
    # ID validation runs outside the try block, so whatever it raises is not
    # converted into a 500 here
    validate_agent_id(agent_id)

    try:
        found = await agent_manager.get_agent(agent_id)
        if not found:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        if include_stats:
            found.runtime_stats = await agent_manager.get_agent_stats(agent_id)

        payload = {
            "agent": found,
            "timestamp": datetime.utcnow().isoformat()
        }
        return payload
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 above) pass through untouched
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get agent: {str(e)}")
@router.post("/", status_code=201)
async def create_agent(
    agent_data: Dict[str, Any],
    background_tasks: BackgroundTasks,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Register a new agent from a full configuration document.

    Request Body: Complete agent configuration JSON

    Returns:
        A dict with "success", a human-readable "message", the created
        "agent" and a UTC "timestamp".

    Raises:
        HTTPException: 409 if an agent with the same ID already exists,
            400 if a ValueError is raised while processing the request,
            500 on any other failure.
    """
    try:
        # Turn the raw JSON into a validated agent object
        agent = validate_agent_json(agent_data)

        # Refuse to overwrite an agent that is already registered
        if await agent_manager.get_agent(agent.id):
            raise HTTPException(
                status_code=409,
                detail=f"Agent '{agent.id}' already exists"
            )

        created_agent = await agent_manager.register_agent(agent)

        # Queue setup is deferred to a background task so the response is
        # not held up by it
        background_tasks.add_task(
            _initialize_agent_queues,
            created_agent.id,
            created_agent.communication
        )

        response_body = {
            "success": True,
            "message": f"Agent '{created_agent.name}' created successfully",
            "agent": created_agent,
            "timestamp": datetime.utcnow().isoformat()
        }
        return response_body
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}")
@router.put("/{agent_id}")
async def update_agent(
    agent_id: str,
    agent_data: Dict[str, Any],
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Replace the configuration of an existing agent.

    Path Parameters:
    - agent_id: Agent to update

    Request Body: Updated agent configuration JSON

    Returns:
        A dict with "success", a "message", the updated "agent" and a UTC
        "timestamp".

    Raises:
        HTTPException: 400 if the body's ID differs from the path ID or a
            ValueError is raised, 404 if the agent does not exist, 500 on
            any other failure.
    """
    validate_agent_id(agent_id)

    try:
        updated_agent = validate_agent_json(agent_data)

        # The path parameter is authoritative; the body must agree with it
        ids_match = updated_agent.id == agent_id
        if not ids_match:
            raise HTTPException(
                status_code=400,
                detail="Agent ID in body must match path parameter"
            )

        current = await agent_manager.get_agent(agent_id)
        if not current:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        # Stamp the modification time before persisting
        updated_agent.metadata.updated = datetime.utcnow()
        stored = await agent_manager.update_agent(agent_id, updated_agent)

        return {
            "success": True,
            "message": f"Agent '{agent_id}' updated successfully",
            "agent": stored,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update agent: {str(e)}")
@router.delete("/{agent_id}", status_code=204)
async def delete_agent(
    agent_id: str,
    force: bool = False,
    agent_manager: AgentManager = Depends(get_agent_manager)
):
    """
    Delete (deregister) an agent.

    Path Parameters:
    - agent_id: Agent to delete

    Query Parameters:
    - force: Delete the agent even while it is active

    Returns:
        Nothing; the route is declared with status code 204.

    Raises:
        HTTPException: 404 if the agent does not exist, 400 if it is active
            and force is not set, 500 on any other failure.
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        # An active agent is only removed when the caller explicitly forces it
        is_active = target.status == AgentStatus.ACTIVE
        if is_active and not force:
            raise HTTPException(
                status_code=400,
                detail="Cannot delete active agent. Stop agent first or use force=true"
            )

        await agent_manager.delete_agent(agent_id)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(e)}")
# ===== AGENT CONTROL ENDPOINTS =====
@router.post("/{agent_id}/start")
async def start_agent(
    agent_id: str,
    background_tasks: BackgroundTasks,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Schedule the startup of an agent that is not yet active.

    Path Parameters:
    - agent_id: Agent to start

    Returns:
        A dict with a "message" and a "status" of "success" (already active)
        or "starting" (startup scheduled as a background task).

    Raises:
        HTTPException: 404 if the agent does not exist, 500 on any other
            failure.
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        already_running = target.status == AgentStatus.ACTIVE
        if already_running:
            # Nothing to do; report success without scheduling anything
            return {"message": f"Agent '{agent_id}' is already active", "status": "success"}

        # The actual startup happens after the response has been sent
        background_tasks.add_task(_start_agent_process, agent_id, agent_manager)

        outcome = {
            "message": f"Agent '{agent_id}' startup initiated",
            "status": "starting"
        }
        return outcome
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start agent: {str(e)}")
@router.post("/{agent_id}/stop")
async def stop_agent(
    agent_id: str,
    background_tasks: BackgroundTasks,
    graceful: bool = True,
    timeout: int = 30,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Schedule the shutdown of an active agent.

    Path Parameters:
    - agent_id: Agent to stop

    Query Parameters:
    - graceful: Perform graceful shutdown
    - timeout: Shutdown timeout in seconds

    Returns:
        A dict with a "message" and a "status" of "inactive" (agent was not
        active, nothing scheduled) or "stopping" (shutdown scheduled as a
        background task, together with the "graceful" and "timeout" values
        that were used).

    Raises:
        HTTPException: 404 if the agent does not exist, 500 on any other
            failure.
    """
    # NOTE: `background_tasks` has no default value, so it must be declared
    # before the defaulted `graceful`/`timeout` parameters; the previous order
    # was a SyntaxError ("non-default argument follows default argument").
    # FastAPI resolves parameters by name, so the HTTP interface is unchanged.
    validate_agent_id(agent_id)

    try:
        agent = await agent_manager.get_agent(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        if agent.status != AgentStatus.ACTIVE:
            return {"message": f"Agent '{agent_id}' is not active", "status": "inactive"}

        # Stop the agent in the background so the request returns immediately
        background_tasks.add_task(
            _stop_agent_process,
            agent_id,
            agent_manager,
            graceful,
            timeout
        )

        return {
            "message": f"Agent '{agent_id}' shutdown initiated",
            "status": "stopping",
            "graceful": graceful,
            "timeout": timeout
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to stop agent: {str(e)}")
@router.post("/{agent_id}/restart")
async def restart_agent(
    agent_id: str,
    background_tasks: BackgroundTasks,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Schedule a restart of an agent.

    Path Parameters:
    - agent_id: Agent to restart

    Returns:
        A dict with a "message" and the "status" value "restarting".

    Raises:
        HTTPException: 404 if the agent does not exist, 500 on any other
            failure.
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        # The restart itself runs as a background task after the response
        background_tasks.add_task(_restart_agent_process, agent_id, agent_manager)

        outcome = {
            "message": f"Agent '{agent_id}' restart initiated",
            "status": "restarting"
        }
        return outcome
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to restart agent: {str(e)}")
# ===== AGENT COMMUNICATION ENDPOINTS =====
@router.post("/{agent_id}/chat")
async def chat_with_agent(
    agent_id: str,
    message_data: Dict[str, str],
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Send a message to an agent and return its reply.

    Path Parameters:
    - agent_id: Agent to communicate with

    Request Body:
    - message: Message content

    Returns:
        On success a dict with "success": True, the original "message", the
        agent's "response" and the "response_time" / "tokens_used" /
        "timestamp" values taken from the reply (with defaults when absent).
        If the reply contains an "error" key, a dict with "success": False
        and that error instead.

    Raises:
        HTTPException: 404 if the agent does not exist, 400 if the message
            is empty or whitespace only, 500 on any other failure.
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        text = message_data.get("message", "")
        if not text.strip():
            raise HTTPException(status_code=400, detail="Message cannot be empty")

        reply = await agent_manager.send_message_to_agent(agent_id, text)

        # A reply carrying an "error" key is reported as a failed (but still
        # HTTP 200) result rather than raised
        failed = "error" in reply
        stamp = reply.get("timestamp", datetime.utcnow().isoformat())

        if failed:
            return {
                "success": False,
                "error": reply["error"],
                "agent_id": agent_id,
                "timestamp": stamp
            }

        return {
            "success": True,
            "agent_id": agent_id,
            "message": text,
            "response": reply.get("content", ""),
            "response_time": reply.get("response_time", 0),
            "tokens_used": reply.get("tokens_used", 0),
            "timestamp": stamp
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to chat with agent: {str(e)}")
# ===== 🚀 NEW: OPENROUTER COMMUNICATION ENDPOINTS =====
@router.post("/{agent_id}/chat/openrouter")
async def chat_with_agent_openrouter(
    agent_id: str,
    message_data: Dict[str, str],
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Send a message to an agent through the OpenRouter provider.

    Path Parameters:
    - agent_id: Agent to communicate with

    Request Body:
    - message: Message content

    Returns:
        On success a dict with "success": True, "provider": "openrouter",
        the original "message", the agent's "response" and the
        "response_time" / "tokens_used" / "cost_usd" / "model" / "timestamp"
        values taken from the reply (with defaults when absent). If the
        reply contains an "error" key, a dict with "success": False and that
        error instead.

    Raises:
        HTTPException: 404 if the agent does not exist, 400 if the message
            is empty or whitespace only, 500 on any other failure.
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        text = message_data.get("message", "")
        if not text.strip():
            raise HTTPException(status_code=400, detail="Message cannot be empty")

        # Same call as the plain chat endpoint, but pinned to OpenRouter
        reply = await agent_manager.send_message_to_agent(
            agent_id,
            text,
            provider="openrouter"
        )

        failed = "error" in reply
        stamp = reply.get("timestamp", datetime.utcnow().isoformat())

        if failed:
            return {
                "success": False,
                "error": reply["error"],
                "provider": "openrouter",
                "agent_id": agent_id,
                "timestamp": stamp
            }

        return {
            "success": True,
            "agent_id": agent_id,
            "provider": "openrouter",
            "message": text,
            "response": reply.get("content", ""),
            "response_time": reply.get("response_time", 0),
            "tokens_used": reply.get("tokens_used", 0),
            "cost_usd": reply.get("cost_usd", 0.0),
            "model": reply.get("model", ""),
            "timestamp": stamp
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to chat with agent via OpenRouter: {str(e)}")
@router.post("/{agent_id}/chat/compare")
async def compare_providers_chat(
    agent_id: str,
    message_data: Dict[str, str],
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Send the same message through both providers and return both results.

    Path Parameters:
    - agent_id: Agent to communicate with

    Request Body:
    - message: Message content

    Returns:
        A dict with "success": True, the original "message", a "comparison"
        dict holding one formatted result per provider ("colossus" and
        "openrouter"), the wall-clock "total_comparison_time" in seconds and
        a UTC "timestamp". A provider that failed is reported inside its own
        comparison entry with "success": False.

    Raises:
        HTTPException: 404 if the agent does not exist, 400 if the message
            is empty or whitespace only, 500 on any other failure.
    """
    validate_agent_id(agent_id)

    try:
        agent = await agent_manager.get_agent(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        message = message_data.get("message", "")
        if not message.strip():
            raise HTTPException(status_code=400, detail="Message cannot be empty")

        # `asyncio` and `datetime` come from the module-level imports; the
        # function no longer re-imports them locally.
        start_time = datetime.utcnow()

        # Run both providers in parallel
        tasks = [
            agent_manager.send_message_to_agent(agent_id, message, provider="colossus"),
            agent_manager.send_message_to_agent(agent_id, message, provider="openrouter")
        ]

        try:
            # return_exceptions=True hands a provider's exception back as its
            # result instead of raising, so one failure does not hide the
            # other provider's answer.
            colossus_response, openrouter_response = await asyncio.gather(
                *tasks, return_exceptions=True
            )
        except Exception:
            # Only reached if gather itself fails (provider errors are
            # returned as values above); retry the providers one by one.
            colossus_response = await agent_manager.send_message_to_agent(
                agent_id, message, provider="colossus"
            )
            openrouter_response = await agent_manager.send_message_to_agent(
                agent_id, message, provider="openrouter"
            )

        total_time = (datetime.utcnow() - start_time).total_seconds()

        def format_response(response, provider_name):
            """Normalize one provider result (exception or reply dict)."""
            if isinstance(response, Exception):
                return {
                    "success": False,
                    "error": str(response),
                    "provider": provider_name
                }
            elif "error" in response:
                return {
                    "success": False,
                    "error": response["error"],
                    "provider": provider_name
                }
            else:
                return {
                    "success": True,
                    "provider": provider_name,
                    "response": response.get("content", ""),
                    "response_time": response.get("response_time", 0),
                    "tokens_used": response.get("tokens_used", 0),
                    "cost_usd": response.get("cost_usd", 0.0),
                    "model": response.get("model", "")
                }

        return {
            "success": True,
            "agent_id": agent_id,
            "message": message,
            "comparison": {
                "colossus": format_response(colossus_response, "colossus"),
                "openrouter": format_response(openrouter_response, "openrouter")
            },
            "total_comparison_time": total_time,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to compare providers: {str(e)}")
# ===== AGENT STATISTICS ENDPOINTS =====
@router.get("/{agent_id}/stats")
async def get_agent_stats(
    agent_id: str,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Return the statistics the agent manager reports for one agent.

    Path Parameters:
    - agent_id: Agent identifier

    Returns:
        A dict with "agent_id", the "stats" object and a UTC "timestamp".

    Raises:
        HTTPException: 404 if the agent does not exist, 500 on any other
            failure.
    """
    validate_agent_id(agent_id)

    try:
        # Statistics are only fetched for agents that are actually registered
        known = await agent_manager.get_agent(agent_id)
        if not known:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        current_stats = await agent_manager.get_agent_stats(agent_id)

        payload = {
            "agent_id": agent_id,
            "stats": current_stats,
            "timestamp": datetime.utcnow().isoformat()
        }
        return payload
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get agent stats: {str(e)}")
@router.get("/{agent_id}/health")
async def agent_health_check(
    agent_id: str,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Run a health check for one agent and report the outcome.

    Path Parameters:
    - agent_id: Agent identifier

    Returns:
        A dict with "agent_id", the agent's current "status", the "healthy"
        and "checks" entries of the health-check result and a UTC
        "timestamp".

    Raises:
        HTTPException: 404 if the agent does not exist, 500 on any other
            failure (including a health result lacking the expected keys).
    """
    validate_agent_id(agent_id)

    try:
        target = await agent_manager.get_agent(agent_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        report = await agent_manager.health_check(agent_id)

        result = {
            "agent_id": agent_id,
            "status": target.status,
            "healthy": report["healthy"],
            "checks": report["checks"],
            "timestamp": datetime.utcnow().isoformat()
        }
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")
# ===== AGENT TEMPLATES ENDPOINTS =====
@router.get("/templates/")
async def list_agent_templates() -> Dict[str, Any]:
    """
    Return the names of the available agent templates.

    Returns:
        A dict with the template names under "templates", their count under
        "total" and a UTC "timestamp".

    Raises:
        HTTPException: 500 if building the response fails.
    """
    try:
        # Fixed list of template names offered by this endpoint
        available = ["jane_alesi", "john_alesi", "lara_alesi"]
        listing = {
            "templates": available,
            "total": len(available),
            "timestamp": datetime.utcnow().isoformat()
        }
        return listing
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list templates: {str(e)}")
@router.get("/templates/{template_name}")
async def get_agent_template(template_name: str) -> Dict[str, Any]:
    """
    Return the configuration produced by a named agent template.

    Path Parameters:
    - template_name: Template identifier

    Returns:
        A dict with "template_name", the built "template" and a UTC
        "timestamp".

    Raises:
        HTTPException: 404 if the template name is unknown, 500 on any other
            failure.
    """
    try:
        # Only these names map to an AgentTemplates factory; anything else is
        # rejected before touching the class
        if template_name not in ("jane_alesi", "john_alesi", "lara_alesi"):
            raise HTTPException(status_code=404, detail=f"Template '{template_name}' not found")

        build_template = getattr(AgentTemplates, template_name)
        template = build_template()

        return {
            "template_name": template_name,
            "template": template,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get template: {str(e)}")
@router.post("/templates/{template_name}/create", status_code=201)
async def create_agent_from_template(
    template_name: str,
    agent_id: str,
    background_tasks: BackgroundTasks,
    customizations: Optional[Dict[str, Any]] = None,
    agent_manager: AgentManager = Depends(get_agent_manager)
) -> Dict[str, Any]:
    """
    Create an agent from a template with optional customizations.

    Path Parameters:
    - template_name: Template to use

    Query Parameters:
    - agent_id: Unique ID for new agent

    Request Body: Optional customizations to apply (top-level fields of the
    template are overwritten by the given values)

    Returns:
        A dict with "success", a "message", the created "agent", the
        "template_name" and a UTC "timestamp".

    Raises:
        HTTPException: 404 if the template name is unknown, 409 if an agent
            with the given ID already exists, 500 on any other failure.
    """
    # NOTE: `background_tasks` has no default value, so it must be declared
    # before the defaulted `customizations` parameter; the previous order was
    # a SyntaxError ("non-default argument follows default argument").
    # FastAPI resolves parameters by name, so the HTTP interface is unchanged.
    validate_agent_id(agent_id)

    try:
        # Get template
        if template_name == "jane_alesi":
            template_agent = AgentTemplates.jane_alesi()
        elif template_name == "john_alesi":
            template_agent = AgentTemplates.john_alesi()
        elif template_name == "lara_alesi":
            template_agent = AgentTemplates.lara_alesi()
        else:
            raise HTTPException(status_code=404, detail=f"Template '{template_name}' not found")

        # Apply customizations (shallow merge over the template's fields)
        if customizations:
            agent_dict = template_agent.dict()
            agent_dict.update(customizations)
            template_agent = SaapAgent(**agent_dict)

        # Set custom ID; this happens after the merge, so the ID and queue
        # names always derive from agent_id even if customizations set them
        template_agent.id = agent_id
        template_agent.communication.input_queue = f"{agent_id}_input"
        template_agent.communication.output_queue = f"{agent_id}_output"

        # Check if agent already exists
        existing = await agent_manager.get_agent(agent_id)
        if existing:
            raise HTTPException(
                status_code=409,
                detail=f"Agent '{agent_id}' already exists"
            )

        # Create agent
        created_agent = await agent_manager.register_agent(template_agent)

        # Initialize queues in the background so the response is not delayed
        background_tasks.add_task(
            _initialize_agent_queues,
            created_agent.id,
            created_agent.communication
        )

        return {
            "success": True,
            "message": f"Agent '{created_agent.name}' created from template '{template_name}'",
            "agent": created_agent,
            "template_name": template_name,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create agent from template: {str(e)}")
# ===== BACKGROUND TASKS =====
async def _initialize_agent_queues(agent_id: str, comm_config):
    """
    Background task: create the message queues for an agent.

    Builds a MessageQueueService and asks it to create the agent's queues
    from the given communication config. Failures are printed, not raised.
    """
    try:
        service = MessageQueueService()
        await service.create_agent_queues(agent_id, comm_config)
    except Exception as e:
        # Best effort: report the failure on stdout and carry on
        print(f"Failed to initialize queues for {agent_id}: {e}")
async def _start_agent_process(agent_id: str, agent_manager: AgentManager):
    """
    Background task: ask the agent manager to start the given agent.

    Failures are printed, not raised.
    """
    try:
        await agent_manager.start_agent(agent_id)
    except Exception as e:
        # Best effort: report the failure on stdout and carry on
        print(f"Failed to start agent {agent_id}: {e}")
    return None
async def _stop_agent_process(agent_id: str, agent_manager: AgentManager, graceful: bool, timeout: int):
    """
    Background task: ask the agent manager to stop the given agent.

    The graceful flag and timeout are forwarded unchanged as keyword
    arguments. Failures are printed, not raised.
    """
    stop_options = {"graceful": graceful, "timeout": timeout}
    try:
        await agent_manager.stop_agent(agent_id, **stop_options)
    except Exception as e:
        # Best effort: report the failure on stdout and carry on
        print(f"Failed to stop agent {agent_id}: {e}")
async def _restart_agent_process(agent_id: str, agent_manager: AgentManager):
    """
    Background task: ask the agent manager to restart the given agent.

    Failures are printed, not raised.
    """
    try:
        await agent_manager.restart_agent(agent_id)
    except Exception as e:
        # Best effort: report the failure on stdout and carry on
        print(f"Failed to restart agent {agent_id}: {e}")
    return None
# ===== ERROR HANDLERS =====
async def value_error_handler(request, exc):
    """
    Convert a ValueError into a 400 JSON response.

    NOTE: APIRouter does not provide an ``exception_handler`` decorator (only
    the FastAPI application object does), so the former
    ``@router.exception_handler(ValueError)`` decoration raised
    AttributeError as soon as this module was imported. Register this handler
    on the application instead, e.g.
    ``app.add_exception_handler(ValueError, value_error_handler)``.

    Parameters:
    - request: The incoming request (unused)
    - exc: The ValueError being handled

    Returns:
        JSONResponse with status 400 and a "detail" message.
    """
    return JSONResponse(
        status_code=400,
        content={"detail": f"Validation error: {str(exc)}"}
    )