Spaces:
Sleeping
Sleeping
| """ | |
| SAAP Agent Manager - FastAPI Backend | |
| Modular agent management with JSON Schema validation | |
| """ | |
| from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime, timezone | |
| import json | |
| import asyncio | |
| import logging | |
| from pathlib import Path | |
| # Import agent templates and schema | |
| import sys | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| app = FastAPI( | |
| title="SAAP Agent Manager API", | |
| description="Modular Multi-Agent Management System", | |
| version="1.0.0" | |
| ) | |
| # CORS middleware for Vue.js frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["http://localhost:5173", "http://localhost:3000"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Logging setup | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Pydantic Models based on JSON Schema | |
| class AgentModel(BaseModel): | |
| provider: str = Field(..., description="Model provider") | |
| name: str = Field(..., description="Model name") | |
| endpoint: Optional[str] = Field(None, description="API endpoint") | |
| api_key: Optional[str] = Field(None, description="API key") | |
| class AgentPersonality(BaseModel): | |
| system_prompt: str = Field(..., description="System prompt") | |
| temperature: float = Field(0.7, ge=0.0, le=2.0) | |
| max_tokens: int = Field(1000, ge=1, le=4096) | |
| class AgentMetrics(BaseModel): | |
| messages_processed: int = Field(0, ge=0) | |
| average_response_time: float = Field(0.0, ge=0.0) | |
| uptime: str = Field("0m") | |
| error_count: int = Field(0, ge=0) | |
| class Agent(BaseModel): | |
| id: str = Field(..., regex="^[a-z_]+$") | |
| name: str | |
| type: str = Field(..., regex="^(coordinator|specialist|developer|analyst|utility)$") | |
| color: str = Field(..., regex="^#[0-9A-Fa-f]{6}$") | |
| avatar: Optional[str] = None | |
| status: str = Field("inactive", regex="^(inactive|starting|active|stopping|error)$") | |
| capabilities: List[str] = Field(default_factory=list) | |
| model: AgentModel | |
| personality: AgentPersonality | |
| metrics: AgentMetrics = Field(default_factory=AgentMetrics) | |
| created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) | |
| updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) | |
| class MessageRequest(BaseModel): | |
| sender_id: str | |
| receiver_id: str | |
| content: str | |
| message_type: str = "request" | |
| metadata: Dict[str, Any] = Field(default_factory=dict) | |
| class MessageResponse(BaseModel): | |
| id: str | |
| sender_id: str | |
| receiver_id: str | |
| content: str | |
| response: Optional[str] = None | |
| timestamp: datetime | |
| status: str = "sent" | |
| # Global agent storage (in production, use database) | |
| agents: Dict[str, Agent] = {} | |
| messages: List[MessageResponse] = [] | |
| websocket_connections: List[WebSocket] = [] | |
| # Load agent templates | |
| def load_agent_templates(): | |
| """Load agent templates from JSON file""" | |
| try: | |
| template_path = Path(__file__).parent.parent / "models" / "agent_templates.json" | |
| with open(template_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return data["templates"] | |
| except Exception as e: | |
| logger.error(f"Failed to load agent templates: {e}") | |
| return {} | |
| agent_templates = load_agent_templates() | |
| # WebSocket connection manager | |
| class ConnectionManager: | |
| def __init__(self): | |
| self.active_connections: List[WebSocket] = [] | |
| async def connect(self, websocket: WebSocket): | |
| await websocket.accept() | |
| self.active_connections.append(websocket) | |
| logger.info(f"WebSocket connected. Total connections: {len(self.active_connections)}") | |
| def disconnect(self, websocket: WebSocket): | |
| if websocket in self.active_connections: | |
| self.active_connections.remove(websocket) | |
| logger.info(f"WebSocket disconnected. Total connections: {len(self.active_connections)}") | |
| async def broadcast(self, data: dict): | |
| """Broadcast data to all connected clients""" | |
| for connection in self.active_connections: | |
| try: | |
| await connection.send_json(data) | |
| except Exception as e: | |
| logger.error(f"Error broadcasting to WebSocket: {e}") | |
| manager = ConnectionManager() | |
| # Agent Management Endpoints | |
| async def list_agents(): | |
| """Get all registered agents""" | |
| return list(agents.values()) | |
| async def get_agent_templates(): | |
| """Get available agent templates""" | |
| return agent_templates | |
| async def create_agent(agent_data: dict): | |
| """Create a new agent from template or custom data""" | |
| try: | |
| # If template_id provided, use template as base | |
| if "template_id" in agent_data and agent_data["template_id"] in agent_templates: | |
| template = agent_templates[agent_data["template_id"]].copy() | |
| template.update(agent_data) | |
| agent_data = template | |
| # Add default metrics and timestamps | |
| agent_data.setdefault("metrics", { | |
| "messages_processed": 0, | |
| "average_response_time": 0.0, | |
| "uptime": "0m", | |
| "error_count": 0 | |
| }) | |
| agent = Agent(**agent_data) | |
| agents[agent.id] = agent | |
| # Broadcast to WebSocket clients | |
| await manager.broadcast({ | |
| "type": "agent_created", | |
| "agent": agent.dict() | |
| }) | |
| logger.info(f"Agent created: {agent.id}") | |
| return agent | |
| except Exception as e: | |
| logger.error(f"Error creating agent: {e}") | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def get_agent(agent_id: str): | |
| """Get specific agent by ID""" | |
| if agent_id not in agents: | |
| raise HTTPException(status_code=404, detail="Agent not found") | |
| return agents[agent_id] | |
| async def update_agent(agent_id: str, agent_data: dict): | |
| """Update existing agent""" | |
| if agent_id not in agents: | |
| raise HTTPException(status_code=404, detail="Agent not found") | |
| try: | |
| # Preserve existing data and update with new data | |
| existing_agent = agents[agent_id].dict() | |
| existing_agent.update(agent_data) | |
| existing_agent["updated_at"] = datetime.now(timezone.utc) | |
| agent = Agent(**existing_agent) | |
| agents[agent_id] = agent | |
| # Broadcast update | |
| await manager.broadcast({ | |
| "type": "agent_updated", | |
| "agent": agent.dict() | |
| }) | |
| logger.info(f"Agent updated: {agent_id}") | |
| return agent | |
| except Exception as e: | |
| logger.error(f"Error updating agent: {e}") | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def delete_agent(agent_id: str): | |
| """Delete agent""" | |
| if agent_id not in agents: | |
| raise HTTPException(status_code=404, detail="Agent not found") | |
| del agents[agent_id] | |
| # Broadcast deletion | |
| await manager.broadcast({ | |
| "type": "agent_deleted", | |
| "agent_id": agent_id | |
| }) | |
| logger.info(f"Agent deleted: {agent_id}") | |
| return {"message": "Agent deleted successfully"} | |
| # Agent Control Endpoints | |
| async def start_agent(agent_id: str): | |
| """Start an agent""" | |
| if agent_id not in agents: | |
| raise HTTPException(status_code=404, detail="Agent not found") | |
| agent = agents[agent_id] | |
| # Simulate agent startup process | |
| agent.status = "starting" | |
| agent.updated_at = datetime.now(timezone.utc) | |
| # Broadcast status change | |
| await manager.broadcast({ | |
| "type": "agent_status_changed", | |
| "agent_id": agent_id, | |
| "status": "starting" | |
| }) | |
| # Simulate startup delay | |
| await asyncio.sleep(2) | |
| # Mark as active | |
| agent.status = "active" | |
| agent.metrics.uptime = "0m" | |
| await manager.broadcast({ | |
| "type": "agent_status_changed", | |
| "agent_id": agent_id, | |
| "status": "active" | |
| }) | |
| logger.info(f"Agent started: {agent_id}") | |
| return {"message": f"Agent {agent_id} started successfully"} | |
| async def stop_agent(agent_id: str): | |
| """Stop an agent""" | |
| if agent_id not in agents: | |
| raise HTTPException(status_code=404, detail="Agent not found") | |
| agent = agents[agent_id] | |
| agent.status = "stopping" | |
| agent.updated_at = datetime.now(timezone.utc) | |
| # Broadcast status change | |
| await manager.broadcast({ | |
| "type": "agent_status_changed", | |
| "agent_id": agent_id, | |
| "status": "stopping" | |
| }) | |
| # Simulate shutdown delay | |
| await asyncio.sleep(1) | |
| agent.status = "inactive" | |
| await manager.broadcast({ | |
| "type": "agent_status_changed", | |
| "agent_id": agent_id, | |
| "status": "inactive" | |
| }) | |
| logger.info(f"Agent stopped: {agent_id}") | |
| return {"message": f"Agent {agent_id} stopped successfully"} | |
| # Message Management | |
| async def send_message(message_req: MessageRequest): | |
| """Send message between agents""" | |
| if message_req.sender_id not in agents: | |
| raise HTTPException(status_code=404, detail="Sender agent not found") | |
| if message_req.receiver_id not in agents: | |
| raise HTTPException(status_code=404, detail="Receiver agent not found") | |
| # Create message | |
| message = MessageResponse( | |
| id=f"msg_{len(messages)}_{datetime.now().timestamp()}", | |
| sender_id=message_req.sender_id, | |
| receiver_id=message_req.receiver_id, | |
| content=message_req.content, | |
| timestamp=datetime.now(timezone.utc), | |
| status="sent" | |
| ) | |
| messages.append(message) | |
| # Update sender metrics | |
| agents[message_req.sender_id].metrics.messages_processed += 1 | |
| # Broadcast message | |
| await manager.broadcast({ | |
| "type": "message_sent", | |
| "message": message.dict() | |
| }) | |
| # TODO: Implement actual AI response via colossus API | |
| # For now, simulate a response | |
| await asyncio.sleep(0.5) | |
| response_content = f"Response from {agents[message_req.receiver_id].name}: I received your message '{message_req.content}'" | |
| response_message = MessageResponse( | |
| id=f"msg_{len(messages)}_{datetime.now().timestamp()}", | |
| sender_id=message_req.receiver_id, | |
| receiver_id=message_req.sender_id, | |
| content=response_content, | |
| timestamp=datetime.now(timezone.utc), | |
| status="sent" | |
| ) | |
| messages.append(response_message) | |
| # Update receiver metrics | |
| agents[message_req.receiver_id].metrics.messages_processed += 1 | |
| await manager.broadcast({ | |
| "type": "message_received", | |
| "message": response_message.dict() | |
| }) | |
| logger.info(f"Message sent: {message_req.sender_id} -> {message_req.receiver_id}") | |
| return message | |
| async def list_messages(limit: int = 100, agent_id: Optional[str] = None): | |
| """Get messages with optional filtering""" | |
| filtered_messages = messages | |
| if agent_id: | |
| filtered_messages = [ | |
| msg for msg in messages | |
| if msg.sender_id == agent_id or msg.receiver_id == agent_id | |
| ] | |
| return filtered_messages[-limit:] | |
| # System Status | |
| async def get_system_status(): | |
| """Get overall system status""" | |
| active_agents = len([a for a in agents.values() if a.status == "active"]) | |
| total_messages = len(messages) | |
| return { | |
| "status": "healthy", | |
| "agents": { | |
| "total": len(agents), | |
| "active": active_agents, | |
| "inactive": len(agents) - active_agents | |
| }, | |
| "messages": { | |
| "total": total_messages | |
| }, | |
| "websocket_connections": len(manager.active_connections), | |
| "timestamp": datetime.now(timezone.utc) | |
| } | |
| # WebSocket endpoint for real-time updates | |
| async def websocket_endpoint(websocket: WebSocket): | |
| await manager.connect(websocket) | |
| try: | |
| while True: | |
| # Keep connection alive | |
| await websocket.receive_text() | |
| except WebSocketDisconnect: | |
| manager.disconnect(websocket) | |
| # Initialize with templates on startup | |
| async def startup_event(): | |
| """Initialize agents from templates""" | |
| logger.info("SAAP Agent Manager starting up...") | |
| # Create default agents from templates | |
| for template_id, template_data in agent_templates.items(): | |
| try: | |
| template_data["created_at"] = datetime.now(timezone.utc) | |
| template_data["updated_at"] = datetime.now(timezone.utc) | |
| agent = Agent(**template_data) | |
| agents[agent.id] = agent | |
| logger.info(f"Loaded agent template: {agent.id}") | |
| except Exception as e: | |
| logger.error(f"Error loading template {template_id}: {e}") | |
| logger.info(f"Agent Manager ready with {len(agents)} agents") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) |