Hwandji's picture
fix(backend): Correct SPA static files mounting order
582752d
raw
history blame
67.5 kB
"""
SAAP FastAPI Backend - Main Application with OpenRouter Integration
satware AI Autonomous Agent Platform API Server
πŸš€ IMPORTS FIXED:
- Circular dependency resolved
- Proper fallback from Hybrid to Basic mode
- OpenRouter Integration working
- Chat endpoints functional
Features:
- OpenRouter Integration (Primary Provider - Fast & Cost-Efficient)
- colossus Server Integration (Fallback Provider - Free but Slower)
- Hybrid Multi-Provider Support with Auto-Failover
- Real-time Agent Status & Cost Tracking
- WebSocket Live Updates
- Modular Agent Configuration
"""
import os
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from typing import List, Dict, Optional
import asyncio
import json
import logging
from datetime import datetime
import inspect
# SAAP Components - Core imports (relative imports for Docker)
from models.agent_schema import SaapAgent, AgentTemplates, AgentStatus, AgentType
from api.colossus_client import ColossusClient
from services.websocket_manager import WebSocketManager
from config.settings import settings
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# πŸš€ FIXED: Try Hybrid imports with proper error handling
HYBRID_MODE = False
OPENROUTER_ENDPOINTS = False
AgentManagerService = None
hybrid_router = None
try:
# Try to import HybridAgentManagerService first
from services.agent_manager_hybrid import HybridAgentManagerService
AgentManagerService = HybridAgentManagerService
HYBRID_MODE = True
logger.info("βœ… HybridAgentManagerService imported successfully")
# Try to import hybrid endpoints
try:
from api.hybrid_endpoints import hybrid_router
OPENROUTER_ENDPOINTS = True
logger.info("βœ… hybrid_endpoints imported successfully")
except ImportError as e:
logger.warning(f"⚠️ hybrid_endpoints import failed: {e}")
hybrid_router = None
OPENROUTER_ENDPOINTS = False
except ImportError as e:
logger.warning(f"⚠️ HybridAgentManagerService import failed: {e}")
# Fallback to basic AgentManagerService
try:
from services.agent_manager import AgentManagerService
HYBRID_MODE = False
logger.info("βœ… Fallback to basic AgentManagerService")
except ImportError as e2:
logger.error(f"❌ Critical: Cannot import any AgentManagerService: {e2}")
raise
# Log the final mode
logger_msg = "πŸš€ SAAP Backend loaded with HYBRID SUPPORT (OpenRouter + colossus)" if HYBRID_MODE else "⚠️ SAAP Backend loaded in BASIC MODE (colossus only)"
class SaapApplication:
"""SAAP Application State Management with Hybrid Support"""
def __init__(self):
self.agent_manager: Optional[AgentManagerService] = None
self.websocket_manager: WebSocketManager = WebSocketManager()
self.colossus_client: Optional[ColossusClient] = None
self.hybrid_mode = HYBRID_MODE
async def initialize(self):
"""Initialize SAAP services with hybrid database setup"""
if self.hybrid_mode:
logger.info("πŸš€ Initializing SAAP Platform with HYBRID SUPPORT...")
logger.info("πŸ”„ Multi-Provider: OpenRouter (Primary) + colossus (Fallback)")
else:
logger.info("πŸš€ Initializing SAAP Platform (Basic Mode)...")
# πŸ”§ FIX: Initialize database manager FIRST
try:
from database.connection import db_manager
if not db_manager.is_initialized:
logger.info("πŸ“Š Initializing database manager...")
await db_manager.initialize()
logger.info("βœ… Database manager initialized")
except ImportError as e:
logger.warning(f"⚠️ Database manager import failed: {e}")
# Initialize Agent Manager with or without Hybrid Support
if self.hybrid_mode:
# πŸš€ Initialize with OpenRouter API Key
openrouter_key = os.getenv("OPENROUTER_API_KEY", "")
if openrouter_key:
self.agent_manager = AgentManagerService(openrouter_api_key=openrouter_key)
logger.info(f"🌐 OpenRouter API Key configured: {openrouter_key[:20]}...")
else:
logger.warning("⚠️ OPENROUTER_API_KEY not set - hybrid mode may not work")
self.agent_manager = AgentManagerService()
else:
self.agent_manager = AgentManagerService()
await self.agent_manager.initialize()
if self.hybrid_mode:
logger.info("βœ… SAAP Hybrid Platform initialized successfully")
if hasattr(self.agent_manager, 'primary_provider'):
logger.info(f"πŸ”„ Primary Provider: {self.agent_manager.primary_provider}")
if hasattr(self.agent_manager, 'openrouter_client') and self.agent_manager.openrouter_client:
logger.info("🌐 OpenRouter: βœ…")
if hasattr(self.agent_manager, 'colossus_client') and self.agent_manager.colossus_client:
logger.info("πŸ€– colossus: βœ…")
else:
logger.info("βœ… SAAP Platform initialized successfully")
async def shutdown(self):
"""Cleanup SAAP services"""
logger.info("πŸ”§ Shutting down SAAP Platform...")
if self.agent_manager:
await self.agent_manager.shutdown_all_agents()
if self.colossus_client:
await self.colossus_client.__aexit__(None, None, None)
# Close database connections
try:
from database.connection import db_manager
await db_manager.close()
except ImportError:
pass
logger.info("βœ… SAAP Platform shutdown complete")
# Global application state
saap_app = SaapApplication()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI lifespan management"""
# Startup
await saap_app.initialize()
yield
# Shutdown
await saap_app.shutdown()
# Create FastAPI application with dynamic title
app_title = "SAAP - satware AI Autonomous Agent Platform"
app_description = "Local autonomous multi-agent platform for specialized AI agents"
app_version = "1.2.3" # Bumped for template fix
if HYBRID_MODE:
app_title += " (Hybrid)"
app_description += " with OpenRouter + colossus Multi-Provider Support"
app = FastAPI(
title=app_title,
description=app_description,
version=app_version,
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
# CORS middleware for Vue.js frontend - using settings + HuggingFace Spaces regex
app.add_middleware(
CORSMiddleware,
allow_origins=settings.get_cors_origins(),
allow_origin_regex=r"https://.*\.hf\.space", # Allow all HuggingFace Spaces
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# πŸš€ FIXED: Include OpenRouter endpoints with proper dependency injection
if OPENROUTER_ENDPOINTS and hybrid_router and HYBRID_MODE:
# Define proper dependency injection
def get_hybrid_manager() -> AgentManagerService:
"""Dependency for hybrid endpoints"""
if not saap_app.agent_manager:
raise HTTPException(status_code=503, detail="Agent Manager not initialized")
return saap_app.agent_manager
# Replace the placeholder dependency in hybrid_endpoints
try:
import api.hybrid_endpoints
api.hybrid_endpoints.get_hybrid_manager = get_hybrid_manager
app.include_router(hybrid_router)
logger.info("βœ… OpenRouter endpoints included with proper dependencies")
except Exception as e:
logger.warning(f"⚠️ OpenRouter endpoints registration failed: {e}")
OPENROUTER_ENDPOINTS = False
else:
logger.info("⚠️ OpenRouter endpoints not available")
# Dependency injection
def get_agent_manager() -> AgentManagerService:
"""Get agent manager instance"""
if not saap_app.agent_manager:
raise HTTPException(status_code=503, detail="Agent Manager not initialized")
return saap_app.agent_manager
def get_websocket_manager() -> WebSocketManager:
"""Get WebSocket manager instance"""
return saap_app.websocket_manager
# =====================================================
# ENHANCED ROOT ENDPOINT WITH HYBRID INFO (API)
# =====================================================
@app.get("/api")
async def root():
"""Root endpoint with SAAP status (hybrid-aware)"""
agent_manager = get_agent_manager()
agents_count = len(agent_manager.agents)
active_agents = len([a for a in agent_manager.agents.values() if a.is_active()])
base_info = {
"message": f"πŸš€ SAAP - satware AI Autonomous Agent Platform {'(Hybrid Mode)' if HYBRID_MODE else ''}",
"status": "active",
"version": app_version,
"agents_total": agents_count,
"agents_active": active_agents,
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE and hasattr(agent_manager, 'get_provider_stats'):
# Add hybrid-specific information
try:
provider_stats = agent_manager.get_provider_stats()
base_info.update({
"mode": "hybrid",
"primary_provider": getattr(agent_manager, 'primary_provider', 'hybrid'),
"providers": {
"colossus": {
"available": hasattr(agent_manager, 'colossus_client') and agent_manager.colossus_client is not None,
"url": "https://ai.adrian-schupp.de"
},
"openrouter": {
"available": hasattr(agent_manager, 'openrouter_client') and agent_manager.openrouter_client is not None,
"daily_cost": provider_stats.get("provider_stats", {}).get("openrouter", {}).get("total_cost_usd", 0)
}
},
"performance": {
"hybrid_comparisons": provider_stats.get("total_comparisons", 0),
"failover_enabled": provider_stats.get("failover_enabled", True)
}
})
except Exception as e:
logger.warning(f"⚠️ Could not get provider stats: {e}")
base_info["mode"] = "hybrid"
else:
# Basic mode information
base_info.update({
"mode": "basic",
"colossus_server": "https://ai.adrian-schupp.de"
})
return base_info
@app.get("/health")
async def health():
"""Simple health check for Docker/Kubernetes"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.get("/api/v1/health")
async def health_check():
"""Enhanced health check with provider status"""
agent_manager = get_agent_manager()
base_health = {
"status": "healthy",
"services": {
"agent_manager": "active",
"websocket": "active"
},
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE:
# Enhanced health check for hybrid mode
base_health.update({
"mode": "hybrid",
"primary_provider": getattr(agent_manager, 'primary_provider', 'hybrid')
})
# Check colossus status
if hasattr(agent_manager, 'colossus_connection_status'):
base_health["services"]["colossus_api"] = agent_manager.colossus_connection_status
else:
base_health["services"]["colossus_api"] = "unknown"
# Check OpenRouter status
if hasattr(agent_manager, 'openrouter_client') and agent_manager.openrouter_client:
try:
or_health = await agent_manager.openrouter_client.health_check()
base_health["services"]["openrouter_api"] = or_health.get("status", "unknown")
except Exception as e:
base_health["services"]["openrouter_api"] = f"error: {e}"
else:
base_health["services"]["openrouter_api"] = "unavailable"
else:
# Basic mode health check
base_health.update({
"mode": "basic"
})
base_health["services"]["colossus_api"] = "connected"
return base_health
# =====================================================
# DEBUG ENDPOINTS - Chat Modal Troubleshooting
# =====================================================
@app.get("/api/v1/debug/status")
async def debug_system_status(
agent_manager: AgentManagerService = Depends(get_agent_manager)
):
"""
πŸ” DEBUG: Comprehensive system status for Chat Modal troubleshooting
"""
try:
system_status = await agent_manager.get_system_status() if hasattr(agent_manager, 'get_system_status') else {"agents": len(agent_manager.agents)}
debug_info = {
"timestamp": datetime.utcnow().isoformat(),
"mode": "hybrid" if HYBRID_MODE else "basic",
"system_status": system_status,
"api_endpoints": {
"health": "/api/v1/health",
"agents_list": "/api/v1/agents",
"chat": "/api/v1/agents/{agent_id}/chat",
"websocket": "/ws"
},
"frontend_compatibility": {
"cors_origins": settings.get_cors_origins(),
"expected_chat_url_format": "POST /api/v1/agents/{agent_id}/chat",
"chat_request_format": {"message": "string", "timestamp": "number"},
"chat_response_format": {"success": "bool", "response": "object", "error": "string"}
}
}
if HYBRID_MODE and OPENROUTER_ENDPOINTS:
debug_info["api_endpoints"]["chat_openrouter"] = "/api/v1/agents/{agent_id}/chat/openrouter"
debug_info["api_endpoints"]["cost_summary"] = "/api/v1/cost/summary"
return {"debug_info": debug_info}
except Exception as e:
logger.error(f"❌ Debug status error: {e}")
return {
"error": f"Debug status failed: {str(e)}",
"timestamp": datetime.utcnow().isoformat()
}
# =====================================================
# AGENT MANAGEMENT ENDPOINTS
# =====================================================
@app.post("/api/v1/agents", response_model=Dict)
async def create_agent(
agent_data: Dict,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
UC-001: Agent registrieren (hybrid-enabled)
Create new agent from JSON data
"""
try:
logger.info(f"πŸ“ Received agent data: {agent_data}")
# Create agent from JSON data with error handling
try:
agent = SaapAgent(**agent_data)
except Exception as validation_error:
logger.error(f"❌ Agent validation error: {validation_error}")
# Return detailed validation error
raise HTTPException(
status_code=422,
detail=f"Validation error: {str(validation_error)}"
)
# Register with agent manager
success = await agent_manager.register_agent(agent)
if success:
# Notify WebSocket clients
await websocket_manager.broadcast_agent_update(agent)
logger.info(f"βœ… Agent registered: {agent.name} ({agent.id})")
response = {
"success": True,
"message": f"Agent '{agent.name}' registered successfully",
"agent": agent.dict(),
"agent_id": agent.id
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
return response
else:
logger.error(f"❌ Agent registration failed for {agent_data.get('name', 'unknown')}")
raise HTTPException(status_code=400, detail="Agent registration failed")
except HTTPException:
raise # Re-raise HTTP exceptions
except ValueError as e:
logger.error(f"❌ ValueError in agent creation: {e}")
raise HTTPException(status_code=422, detail=f"Invalid agent data: {str(e)}")
except Exception as e:
logger.error(f"❌ Unexpected agent registration error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")
@app.get("/api/v1/agents")
async def list_agents(
agent_type: Optional[AgentType] = None,
status: Optional[AgentStatus] = None,
agent_manager: AgentManagerService = Depends(get_agent_manager)
):
"""
Get list of all agents with optional filtering (hybrid-aware)
πŸš€ FIX: Return format compatible with frontend expectations
"""
agents = list(agent_manager.agents.values())
# Apply filters
if agent_type:
agents = [a for a in agents if a.type == agent_type]
if status:
agents = [a for a in agents if a.status == status]
# πŸš€ FIX: Handle both SaapAgent objects and dicts
agents_list = []
for agent in agents:
if isinstance(agent, dict):
agents_list.append(agent)
elif hasattr(agent, 'dict'):
agents_list.append(agent.dict())
elif hasattr(agent, 'model_dump'):
agents_list.append(agent.model_dump())
else:
logger.warning(f"⚠️ Unknown agent type: {type(agent)}")
response = {
"agents": agents_list,
"total": len(agents_list),
"active": len([a for a in agents if a.status == AgentStatus.ACTIVE]),
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
response["primary_provider"] = getattr(agent_manager, 'primary_provider', 'hybrid')
return response
@app.get("/api/v1/agents/{agent_id}", response_model=Dict)
async def get_agent(
agent_id: str,
agent_manager: AgentManagerService = Depends(get_agent_manager)
):
"""
Get specific agent details (hybrid-aware)
"""
agent = agent_manager.get_agent(agent_id)
if not agent:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
agent_dict = agent.dict()
if HYBRID_MODE:
agent_dict["hybrid_enabled"] = True
agent_dict["primary_provider"] = getattr(agent_manager, 'primary_provider', 'hybrid')
return agent_dict
@app.put("/api/v1/agents/{agent_id}", response_model=Dict)
async def update_agent(
agent_id: str,
agent_data: Dict,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
Update agent configuration
"""
try:
success = await agent_manager.update_agent(agent_id, agent_data)
if success:
agent = agent_manager.get_agent(agent_id)
await websocket_manager.broadcast_agent_update(agent)
response = {
"success": True,
"message": f"Agent '{agent_id}' updated successfully",
"agent": agent.dict()
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
return response
else:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
except Exception as e:
logger.error(f"❌ Agent update error: {e}")
raise HTTPException(status_code=500, detail=f"Update failed: {str(e)}")
@app.delete("/api/v1/agents/{agent_id}")
async def delete_agent(
agent_id: str,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
Delete agent
"""
try:
success = await agent_manager.delete_agent(agent_id)
if success:
await websocket_manager.broadcast_agent_deleted(agent_id)
return {
"success": True,
"message": f"Agent '{agent_id}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")
except Exception as e:
logger.error(f"❌ Agent deletion error: {e}")
raise HTTPException(status_code=500, detail=f"Deletion failed: {str(e)}")
# =====================================================
# AGENT LIFECYCLE OPERATIONS - UC-002: Agent starten
# =====================================================
@app.post("/api/v1/agents/{agent_id}/start")
async def start_agent(
agent_id: str,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
UC-002: Agent starten (hybrid-enabled)
Start agent and activate LLM communication
"""
try:
success = await agent_manager.start_agent(agent_id)
if success:
agent = agent_manager.get_agent(agent_id)
await websocket_manager.broadcast_agent_update(agent)
logger.info(f"βœ… Agent started: {agent.name} ({agent_id})")
response = {
"success": True,
"message": f"Agent '{agent_id}' started successfully",
"status": "active",
"agent": agent.dict()
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
response["primary_provider"] = getattr(agent_manager, 'primary_provider', 'hybrid')
return response
else:
raise HTTPException(status_code=400, detail=f"Failed to start agent '{agent_id}'")
except Exception as e:
logger.error(f"❌ Agent start error: {e}")
raise HTTPException(status_code=500, detail=f"Start failed: {str(e)}")
@app.post("/api/v1/agents/{agent_id}/stop")
async def stop_agent(
agent_id: str,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
Stop agent gracefully (hybrid-enabled)
"""
try:
success = await agent_manager.stop_agent(agent_id)
if success:
agent = agent_manager.get_agent(agent_id)
await websocket_manager.broadcast_agent_update(agent)
logger.info(f"πŸ”§ Agent stopped: {agent_id}")
response = {
"success": True,
"message": f"Agent '{agent_id}' stopped successfully",
"status": "inactive"
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
return response
else:
raise HTTPException(status_code=400, detail=f"Failed to stop agent '{agent_id}'")
except Exception as e:
logger.error(f"❌ Agent stop error: {e}")
raise HTTPException(status_code=500, detail=f"Stop failed: {str(e)}")
# =====================================================
# AGENT COMMUNICATION - UC-003: Multi-Agent Communication
# =====================================================
@app.post("/api/v1/agents/{agent_id}/chat")
async def chat_with_agent(
agent_id: str,
message_data: Dict,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
πŸš€ ENHANCED: Send message to agent and get response with hybrid support
Automatically uses primary provider with failover support
"""
try:
message = message_data.get("message", "")
provider = message_data.get("provider", None) # Optional provider override
if not message:
raise HTTPException(status_code=400, detail="Message content required")
if HYBRID_MODE:
logger.info(f"πŸ“€ Hybrid chat request: Agent {agent_id} - Provider: {provider or 'auto'}")
else:
logger.info(f"πŸ“€ Chat request: Agent {agent_id} - Message: {message[:50]}...")
# Send message with hybrid support if available
if HYBRID_MODE and hasattr(agent_manager, 'send_message_to_agent'):
# Check if the hybrid version supports provider parameter
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
response = await agent_manager.send_message_to_agent(agent_id, message, provider)
else:
response = await agent_manager.send_message_to_agent(agent_id, message)
else:
response = await agent_manager.send_message_to_agent(agent_id, message)
# Check if response contains error
if "error" in response:
logger.error(f"❌ Agent Manager Error: {response['error']}")
# Return structured error response for frontend
error_response = {
"success": False,
"agent_id": agent_id,
"message": message,
"error": response["error"],
"debug_info": response.get("debug_info", {}),
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE:
error_response.update({
"provider": response.get("provider", "unknown"),
"failover_used": response.get("failover_used", False)
})
return JSONResponse(
status_code=200, # Don't use HTTP error codes for agent errors
content=error_response
)
# Successful response
if HYBRID_MODE:
logger.info(f"βœ… Hybrid chat successful: Agent {agent_id} via {response.get('provider', 'unknown')}")
else:
logger.info(f"βœ… Chat successful: Agent {agent_id} responded")
# Broadcast to WebSocket clients
websocket_data = {
"agent_id": agent_id,
"message": message,
"response": response.get("content", ""),
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE:
websocket_data.update({
"provider": response.get("provider", "unknown"),
"response_time": response.get("response_time", 0),
"cost_usd": response.get("cost_usd", 0),
"failover_used": response.get("failover_used", False)
})
await websocket_manager.broadcast_message_update(websocket_data)
# Prepare success response
success_response = {
"success": True,
"agent_id": agent_id,
"message": message,
"response": response,
"timestamp": datetime.utcnow().isoformat()
}
if HYBRID_MODE:
success_response.update({
"provider": response.get("provider", "unknown"),
"performance": {
"response_time": response.get("response_time", 0),
"tokens_used": response.get("tokens_used", 0),
"cost_usd": response.get("cost_usd", 0),
"cost_efficiency": response.get("cost_efficiency", "N/A")
},
"failover_used": response.get("failover_used", False)
})
return success_response
except HTTPException:
raise # Re-raise HTTP exceptions as-is
except Exception as e:
logger.error(f"❌ Chat endpoint error: {e}")
# Return structured error for frontend instead of HTTP exception
return JSONResponse(
status_code=200,
content={
"success": False,
"agent_id": agent_id,
"message": message_data.get("message", ""),
"error": f"Chat system error: {str(e)}",
"timestamp": datetime.utcnow().isoformat()
}
)
# =====================================================
# 🌐 OPENROUTER DIRECT CHAT ENDPOINT (if hybrid mode)
# =====================================================
if HYBRID_MODE:
@app.post("/api/v1/agents/{agent_id}/chat/openrouter")
async def chat_with_agent_openrouter(
agent_id: str,
message_data: Dict,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
🌐 Direct chat with agent via OpenRouter (fast & cost-efficient)
Bypasses primary provider setting and forces OpenRouter
"""
try:
message = message_data.get("message", "")
if not message:
raise HTTPException(status_code=400, detail="Message content required")
logger.info(f"πŸ“€ OpenRouter direct chat: Agent {agent_id}")
# Force OpenRouter provider if supported
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
response = await agent_manager.send_message_to_agent(agent_id, message, "openrouter")
else:
# Fallback if provider parameter not available
response = await agent_manager.send_message_to_agent(agent_id, message)
# Handle error response
if "error" in response:
logger.error(f"❌ OpenRouter Error: {response['error']}")
return JSONResponse(
status_code=200,
content={
"success": False,
"agent_id": agent_id,
"message": message,
"error": response["error"],
"provider": "openrouter",
"timestamp": datetime.utcnow().isoformat()
}
)
# Successful response
logger.info(f"βœ… OpenRouter direct chat successful: Agent {agent_id}")
# Broadcast to WebSocket
websocket_data = {
"agent_id": agent_id,
"message": message,
"response": response.get("content", ""),
"provider": "openrouter",
"timestamp": datetime.utcnow().isoformat()
}
if "response_time" in response:
websocket_data["response_time"] = response["response_time"]
if "cost_usd" in response:
websocket_data["cost_usd"] = response["cost_usd"]
await websocket_manager.broadcast_message_update(websocket_data)
return {
"success": True,
"agent_id": agent_id,
"message": message,
"response": response,
"provider": "openrouter",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ OpenRouter chat endpoint error: {e}")
return JSONResponse(
status_code=200,
content={
"success": False,
"agent_id": agent_id,
"message": message_data.get("message", ""),
"error": f"OpenRouter chat error: {str(e)}",
"provider": "openrouter",
"timestamp": datetime.utcnow().isoformat()
}
)
# =====================================================
# WEBSOCKET ENDPOINT - Real-time Updates
# =====================================================
# =====================================================
# πŸ€– MULTI-AGENT COMMUNICATION SYSTEM - UC-004: Multi-Agent Coordination
# =====================================================
@app.post("/api/v1/multi-agent/chat")
async def multi_agent_chat(
request_data: Dict,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
πŸ€–πŸ”’ Multi-Agent Communication System with Privacy Protection
Jane Alesi acts as Master Coordinator:
1. Analyzes user intent and task complexity
2. Delegates to appropriate specialist agents
3. Coordinates responses and provides unified output
4. **Privacy Protection:** Automatically detects sensitive data and routes to secure provider
Specialist Agents:
- John Alesi: Development, coding, technical implementation
- Lara Alesi: Medical, health, diagnosis, treatment
- Justus Alesi: Legal, compliance, contracts, regulations
- Theo Alesi: Finance, investment, budgeting, analysis
- Leon Alesi: System integration, architecture, orchestration
- Luna Alesi: Coaching, strategy, organizational development
Privacy Features:
- Automatic sensitive data detection
- Provider selection based on privacy level
- User override support for explicit provider choice
"""
try:
# πŸ”§ FIX: Ensure user_message is always a string
user_message = request_data.get("user_message", request_data.get("message", ""))
# Type safety check - critical for .lower() calls
if isinstance(user_message, dict):
logger.error(f"❌ user_message is dict, not string: {user_message}")
user_message = user_message.get("message", user_message.get("text", ""))
if not isinstance(user_message, str):
logger.error(f"❌ user_message has invalid type: {type(user_message)}")
raise HTTPException(status_code=400, detail=f"Invalid message type: expected string, got {type(user_message).__name__}")
user_context = request_data.get("user_context", {})
preferred_agent = request_data.get("preferred_agent", None)
task_priority = request_data.get("task_priority", "normal")
provider = request_data.get("provider", None) # User provider preference
privacy_mode = request_data.get("privacy_mode", "auto") # auto, colossus, openrouter
if not user_message:
raise HTTPException(status_code=400, detail="User message is required")
# πŸ”’ PRIVACY DETECTION & PROVIDER SELECTION
selected_provider = None
privacy_level = "public"
privacy_details = {}
if HYBRID_MODE and hasattr(agent_manager, 'openrouter_client') and agent_manager.openrouter_client:
try:
from services.privacy_detector import detect_privacy_level, should_use_local
# User explicit provider choice takes precedence
if privacy_mode == "colossus":
selected_provider = "colossus"
privacy_level = "private"
logger.info(f"πŸ”’ User selected Colossus (internal server) for data protection")
elif privacy_mode == "openrouter":
selected_provider = "openrouter"
privacy_level = "public"
logger.info(f"🌐 User selected OpenRouter (external) - bypassing privacy check")
else: # auto mode - privacy detection
level, details = detect_privacy_level(user_message, preferred_agent)
privacy_level = level.value
privacy_details = details
# Determine if local provider should be used
use_local = should_use_local(user_message, preferred_agent, mode="balanced")
selected_provider = "colossus" if use_local else "openrouter"
if use_local:
logger.info(f"πŸ”’ Privacy Detection: {level.value} β†’ Routing to Colossus (internal)")
logger.info(f" Reason: {details.get('reason', 'privacy protection')}")
if details.get('keyword_matches'):
logger.info(f" Keywords: {[k for c, k in details['keyword_matches']]}")
else:
logger.info(f"🌐 Privacy Detection: {level.value} β†’ Routing to OpenRouter (external)")
except ImportError as e:
logger.warning(f"⚠️ Privacy detector not available: {e}")
selected_provider = None # Let agent_manager decide
logger.info(f"πŸ€– Multi-Agent Chat: message='{user_message[:50]}...', agent={preferred_agent}, priority={task_priority}, provider={selected_provider}, privacy={privacy_level}")
# πŸš€ Step 1: Jane's ULTRA-COMPACT Intent Analysis with Privacy-Aware Provider
jane_agent_id = "jane_alesi"
# Prepare provider parameter for agent communication
provider_param = selected_provider if selected_provider else provider
# Ultra-compact prompt - just ask for agent name
intent_prompt = f"""Which specialist should answer this?
Question: "{user_message}"
Respond with ONLY the agent name:
- john_alesi (code/development)
- theo_alesi (finance/money)
- lara_alesi (medical/health)
- justus_alesi (legal/law)
- leon_alesi (system/infrastructure)
- luna_alesi (coaching/strategy)
- jane_alesi (general/other)"""
# Send to Jane with privacy-aware provider selection
if HYBRID_MODE and provider_param and hasattr(agent_manager, 'send_message_to_agent'):
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
jane_intent_response = await agent_manager.send_message_to_agent(
jane_agent_id, intent_prompt, provider_param
)
else:
jane_intent_response = await agent_manager.send_message_to_agent(jane_agent_id, intent_prompt)
else:
jane_intent_response = await agent_manager.send_message_to_agent(jane_agent_id, intent_prompt)
if "error" in jane_intent_response:
logger.error(f"❌ Jane intent analysis failed: {jane_intent_response['error']}")
return {
"success": False,
"error": f"Intent analysis failed: {jane_intent_response['error']}",
"timestamp": datetime.utcnow().isoformat()
}
# Extract agent name from Jane's response (simple text parsing)
intent_content = jane_intent_response.get("content", "").lower()
specialist_agent_id = "jane_alesi" # Default
# Find agent name in response
for agent_name in ["john_alesi", "theo_alesi", "lara_alesi", "justus_alesi", "leon_alesi", "luna_alesi"]:
if agent_name in intent_content:
specialist_agent_id = agent_name
break
logger.info(f"🧠 Jane selected: {specialist_agent_id}")
# If Jane selected herself β†’ direct response (no coordination needed)
if specialist_agent_id == "jane_alesi":
logger.info(f"πŸ€” Jane handles general question directly")
# Send with privacy-aware provider
if HYBRID_MODE and provider_param and hasattr(agent_manager, 'send_message_to_agent'):
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
jane_direct_response = await agent_manager.send_message_to_agent(
"jane_alesi", user_message, provider_param
)
else:
jane_direct_response = await agent_manager.send_message_to_agent("jane_alesi", user_message)
else:
jane_direct_response = await agent_manager.send_message_to_agent("jane_alesi", user_message)
if "error" in jane_direct_response:
return {
"success": False,
"error": f"Jane response failed: {jane_direct_response['error']}",
"timestamp": datetime.utcnow().isoformat()
}
return {
"success": True,
"coordinator": "jane_alesi",
"specialist_agent": "jane_alesi",
"delegation_used": False,
"response": {
"content": jane_direct_response.get("content", ""),
"specialist_content": None,
"provider": jane_direct_response.get("provider", selected_provider or "unknown"),
"response_time": jane_direct_response.get("response_time", 0),
"cost_usd": jane_direct_response.get("cost_usd", 0)
},
"privacy_protection": {
"privacy_level": privacy_level,
"selected_provider": selected_provider or jane_direct_response.get("provider", "unknown"),
"user_override": privacy_mode != "auto",
"details": privacy_details
},
"timestamp": datetime.utcnow().isoformat()
}
# Step 2: Send to specialist agent
specialist_agent = agent_manager.get_agent(specialist_agent_id)
if not specialist_agent:
logger.error(f"❌ Specialist agent {specialist_agent_id} not found")
return {
"success": False,
"error": f"Specialist agent '{specialist_agent_id}' not available",
"timestamp": datetime.utcnow().isoformat()
}
# Send to specialist with privacy-aware provider
logger.info(f"🎯 Sending to specialist: {specialist_agent_id} (provider: {selected_provider or 'auto'})")
if HYBRID_MODE and provider_param and hasattr(agent_manager, 'send_message_to_agent'):
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
specialist_response = await agent_manager.send_message_to_agent(
specialist_agent_id, user_message, provider_param
)
else:
specialist_response = await agent_manager.send_message_to_agent(specialist_agent_id, user_message)
else:
specialist_response = await agent_manager.send_message_to_agent(specialist_agent_id, user_message)
if "error" in specialist_response:
logger.error(f"❌ Specialist agent {specialist_agent_id} failed: {specialist_response['error']}")
return {
"success": False,
"error": f"Specialist response failed: {specialist_response['error']}",
"timestamp": datetime.utcnow().isoformat()
}
# Step 3: Jane's Final Coordination (for specialist responses)
jane_agent_id = "jane_alesi"
coordination_prompt = f"""As SAAP Coordinator, review and enhance this specialist response:
User Question: "{user_message}"
Specialist ({specialist_agent_id}): "{specialist_response.get('content', '')[:500]}..."
Provide a coordinated final response that ensures quality and completeness."""
# Jane coordination with privacy-aware provider
if HYBRID_MODE and provider_param and hasattr(agent_manager, 'send_message_to_agent'):
sig = inspect.signature(agent_manager.send_message_to_agent)
if 'provider' in sig.parameters:
jane_coordination = await agent_manager.send_message_to_agent(
jane_agent_id, coordination_prompt, provider_param
)
else:
jane_coordination = await agent_manager.send_message_to_agent(jane_agent_id, coordination_prompt)
else:
jane_coordination = await agent_manager.send_message_to_agent(jane_agent_id, coordination_prompt)
if "error" not in jane_coordination:
coordination_content = jane_coordination.get("content", "")
else:
# Fallback to specialist response if Jane fails
logger.warning(f"⚠️ Jane coordination failed, using specialist response")
coordination_content = specialist_response.get("content", "")
final_response = {
"success": True,
"coordinator": "jane_alesi",
"specialist_agent": specialist_agent_id,
"delegation_used": True, # Specialist was used
"response": {
"content": coordination_content,
"specialist_content": specialist_response.get("content", ""),
"provider": specialist_response.get("provider", selected_provider or "unknown"),
"response_time": specialist_response.get("response_time", 0),
"cost_usd": specialist_response.get("cost_usd", 0)
},
"privacy_protection": {
"privacy_level": privacy_level,
"selected_provider": selected_provider or specialist_response.get("provider", "unknown"),
"user_override": privacy_mode != "auto",
"details": privacy_details,
"data_protected": selected_provider == "colossus" or privacy_level in ["private", "confidential"]
},
"timestamp": datetime.utcnow().isoformat()
}
# Broadcast to WebSocket clients
websocket_data = {
"type": "multi_agent_response",
"user_message": user_message,
"coordinator": final_response.get("coordinator"),
"specialist_agent": final_response.get("specialist_agent"),
"delegation_used": final_response.get("delegation_used"),
"response_content": final_response["response"]["content"][:200] + "...",
"timestamp": datetime.utcnow().isoformat()
}
await websocket_manager.broadcast_message_update(websocket_data)
logger.info(f"βœ… Multi-Agent Chat successful: {'Delegated to ' + final_response.get('specialist_agent', '') if final_response.get('delegation_used') else 'Direct response from Jane'}")
return final_response
except Exception as e:
logger.error(f"❌ Multi-Agent Chat error: {e}")
return {
"success": False,
"error": f"Multi-Agent Communication failed: {str(e)}",
"timestamp": datetime.utcnow().isoformat()
}
def detect_direct_routing(user_message: str, preferred_agent: str = None) -> Optional[str]:
"""
πŸš€ PHASE 1.2: Smart Routing without Jane for clear intent patterns
Performance optimization: Skip Jane analysis for obvious requests
Returns:
agent_id if direct routing possible, None if Jane analysis needed
"""
# πŸ”§ FIX: Type safety check
if not isinstance(user_message, str):
logger.warning(f"⚠️ detect_direct_routing received non-string: {type(user_message)}")
return None
user_lower = user_message.lower()
# 🎯 High-confidence keyword patterns for direct routing
direct_routing_patterns = {
"john_alesi": {
"keywords": ["code", "coding", "bug", "fehler", "implement", "implementier",
"python", "javascript", "typescript", "react", "vue", "api",
"function", "funktion", "debug", "software", "program"],
"threshold": 1 # Single strong keyword enough
},
"theo_alesi": {
"keywords": ["kosten", "cost", "budget", "finanz", "finance", "geld", "money",
"preis", "price", "euro", "dollar", "investition", "investment",
"ausgaben", "expenses", "gewinn", "profit", "spar", "spare",
"anlage", "vermΓΆgen", "schulden"],
"threshold": 1 # Single strong keyword enough
},
"lara_alesi": {
"keywords": ["gesundheit", "health", "krank", "sick", "symptom", "diagnose",
"medizin", "medicine", "arzt", "doctor", "patient", "behandlung",
"treatment", "krankheit", "disease"],
"threshold": 2
},
"justus_alesi": {
"keywords": ["recht", "legal", "law", "vertrag", "contract", "gesetz",
"anwalt", "attorney", "gericht", "court", "klage", "lawsuit",
"gdpr", "dsgvo", "compliance", "vorschrift", "regulation"],
"threshold": 2
},
"leon_alesi": {
"keywords": ["system", "server", "infrastructure", "deploy", "docker",
"kubernetes", "integration", "architektur", "architecture",
"network", "netzwerk", "cloud", "devops"],
"threshold": 2
},
"luna_alesi": {
"keywords": ["coaching", "strategie", "strategy", "management", "fΓΌhrung",
"leadership", "team", "organisation", "organization", "entwicklung",
"development", "change", "verΓ€nderung"],
"threshold": 2
}
}
# Check preferred agent first
if preferred_agent and preferred_agent in direct_routing_patterns:
return preferred_agent
# Count keyword matches per agent
agent_scores = {}
for agent_id, config in direct_routing_patterns.items():
matches = sum(1 for kw in config["keywords"] if kw in user_lower)
if matches >= config["threshold"]:
agent_scores[agent_id] = matches
# Return agent with highest score (if clear winner)
if agent_scores:
best_agent = max(agent_scores.items(), key=lambda x: x[1])
if best_agent[1] >= 1: # Single strong keyword is enough
logger.info(f"🎯 Smart Routing: Direct to {best_agent[0]} (confidence: {best_agent[1]} matches)")
return best_agent[0]
# No clear match - fallback to keyword analysis
logger.info(f"πŸ€” Smart Routing: No direct match found")
return None
def analyze_intent_fallback(user_message: str, preferred_agent: str = None) -> Dict:
"""Fallback intent analysis using keyword matching"""
# πŸ”§ FIX: Type safety check
if not isinstance(user_message, str):
logger.warning(f"⚠️ analyze_intent_fallback received non-string: {type(user_message)}")
user_message = str(user_message) # Fallback conversion
user_lower = user_message.lower()
# Keyword mappings for agent specialization
intent_keywords = {
"development": ["code", "coding", "programming", "software", "python", "javascript", "typescript", "development", "bug", "implement", "function", "api", "app", "application", "website", "frontend", "backend", "database"],
"medical": ["health", "medical", "diagnosis", "treatment", "symptoms", "doctor", "medicine", "patient", "disease", "healthcare", "clinical", "hospital"],
"legal": ["legal", "law", "contract", "compliance", "regulation", "rights", "attorney", "court", "sue", "lawsuit", "gdpr", "dsgvo"],
"financial": ["money", "finance", "finanz", "investment", "budget", "cost", "price", "financial", "economic", "profit", "bank", "payment", "euro", "dollar"],
"system": ["system", "architecture", "integration", "infrastructure", "deploy", "deployment", "server", "network", "cloud", "docker", "kubernetes"],
"coaching": ["coaching", "strategy", "strategie", "management", "leadership", "team", "organization", "growth", "improvement", "change"]
}
# Agent mapping
agent_mapping = {
"development": "john_alesi",
"medical": "lara_alesi",
"legal": "justus_alesi",
"financial": "theo_alesi",
"system": "leon_alesi",
"coaching": "luna_alesi"
}
# Check preferred agent first
if preferred_agent and preferred_agent in ["jane_alesi", "john_alesi", "lara_alesi", "justus_alesi", "theo_alesi", "leon_alesi", "luna_alesi"]:
return {
"task_type": "preferred",
"complexity": "moderate",
"best_agent": preferred_agent,
"delegation_needed": preferred_agent != "jane_alesi",
"specialist_prompt": user_message,
"direct_response": None
}
# Keyword analysis - count matches per category
category_scores = {}
for category, keywords in intent_keywords.items():
matches = sum(1 for keyword in keywords if keyword in user_lower)
if matches > 0:
category_scores[category] = matches
# Determine best match
if not category_scores:
# No specific keywords found - Jane handles general questions
return {
"task_type": "general",
"complexity": "simple",
"best_agent": "jane_alesi",
"delegation_needed": False,
"specialist_prompt": user_message,
"direct_response": None
}
# Get category with most matches
best_category = max(category_scores.items(), key=lambda x: x[1])[0]
max_matches = category_scores[best_category]
# Only delegate if we have strong evidence (2+ keyword matches)
if max_matches >= 2:
best_agent = agent_mapping[best_category]
delegation_needed = (best_agent != "jane_alesi") # Don't delegate to Jane herself
else:
# Weak match - let Jane handle it
best_agent = "jane_alesi"
delegation_needed = False
return {
"task_type": best_category,
"complexity": "complex" if max_matches >= 3 else "moderate" if max_matches >= 2 else "simple",
"best_agent": best_agent,
"delegation_needed": delegation_needed,
"specialist_prompt": user_message if delegation_needed else None,
"direct_response": None
}
@app.get("/api/v1/multi-agent/status")
async def multi_agent_status(
agent_manager: AgentManagerService = Depends(get_agent_manager)
):
"""Get Multi-Agent System status and capabilities"""
try:
# Get available agents
agents = list(agent_manager.agents.values())
# Agent capabilities mapping
agent_capabilities = {
"jane_alesi": {
"name": "Jane Alesi",
"role": "Master Coordinator",
"specialization": "AI Architecture & Multi-Agent Coordination",
"capabilities": ["coordination", "architecture", "strategy", "analysis"],
"color": "#8B5CF6"
},
"john_alesi": {
"name": "John Alesi",
"role": "Developer Specialist",
"specialization": "Software Development & AGI Architecture",
"capabilities": ["coding", "debugging", "architecture", "development"],
"color": "#14B8A6"
},
"lara_alesi": {
"name": "Lara Alesi",
"role": "Medical Specialist",
"specialization": "Medical Analysis & Health Systems",
"capabilities": ["medical_advice", "diagnosis", "treatment", "health"],
"color": "#EC4899"
},
"justus_alesi": {
"name": "Justus Alesi",
"role": "Legal Specialist",
"specialization": "Legal Compliance & Contracts",
"capabilities": ["legal_advice", "compliance", "contracts", "regulations"],
"color": "#F59E0B"
},
"theo_alesi": {
"name": "Theo Alesi",
"role": "Financial Specialist",
"specialization": "Financial Analysis & Investment",
"capabilities": ["financial_analysis", "investment", "budgeting", "economics"],
"color": "#8B5CF6"
},
"leon_alesi": {
"name": "Leon Alesi",
"role": "System Specialist",
"specialization": "IT System Integration & Architecture",
"capabilities": ["orchestration", "integration", "architecture", "systems"],
"color": "#059669"
},
"luna_alesi": {
"name": "Luna Alesi",
"role": "Coaching Specialist",
"specialization": "Coaching & Organizational Development",
"capabilities": ["strategy", "coaching", "development", "management"],
"color": "#EC4899"
}
}
# Build status response
available_agents = []
for agent in agents:
agent_info = agent_capabilities.get(agent.id, {
"name": agent.name,
"role": "Specialist",
"specialization": "General AI Assistant",
"capabilities": ["analysis"],
"color": "#6B7280"
})
available_agents.append({
"id": agent.id,
"status": agent.status.value if hasattr(agent.status, 'value') else str(agent.status),
**agent_info
})
return {
"multi_agent_system": "active",
"coordinator": "jane_alesi",
"total_agents": len(agents),
"active_agents": len([a for a in agents if a.status == AgentStatus.ACTIVE]),
"available_agents": available_agents,
"capabilities": list(agent_capabilities.keys()),
"delegation_enabled": True,
"hybrid_mode": HYBRID_MODE,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ Multi-Agent status error: {e}")
raise HTTPException(status_code=500, detail=f"Status retrieval failed: {str(e)}")
@app.get("/api/v1/multi-agent/capabilities")
async def multi_agent_capabilities():
"""Get detailed Multi-Agent System capabilities and use cases"""
return {
"system": "SAAP Multi-Agent Communication Platform",
"coordinator": {
"name": "Jane Alesi",
"role": "Master Coordinator",
"responsibilities": [
"Intent analysis and task routing",
"Agent selection and delegation",
"Response coordination and quality assurance",
"System orchestration and monitoring"
]
},
"specialists": {
"john_alesi": {
"expertise": "Software Development & Programming",
"use_cases": [
"Code generation and debugging",
"Software architecture design",
"Technical implementation guidance",
"Development best practices"
]
},
"lara_alesi": {
"expertise": "Medical & Health Analysis",
"use_cases": [
"Medical information and guidance",
"Health system analysis",
"Clinical decision support",
"Healthcare compliance"
]
},
"justus_alesi": {
"expertise": "Legal & Compliance",
"use_cases": [
"Legal document analysis",
"Compliance checking",
"Contract review",
"Regulatory guidance"
]
},
"theo_alesi": {
"expertise": "Financial Analysis & Investment",
"use_cases": [
"Financial planning and analysis",
"Investment strategy",
"Budget optimization",
"Economic modeling"
]
},
"leon_alesi": {
"expertise": "System Integration & Architecture",
"use_cases": [
"System architecture design",
"Integration planning",
"Infrastructure optimization",
"Technical coordination"
]
},
"luna_alesi": {
"expertise": "Coaching & Organizational Development",
"use_cases": [
"Strategic planning",
"Team coaching and development",
"Change management",
"Leadership guidance"
]
}
},
"workflow": [
"1. User submits request via /api/v1/multi-agent/chat",
"2. Jane Alesi analyzes intent and complexity",
"3. Task delegation to appropriate specialist agent",
"4. Specialist provides expert response",
"5. Jane coordinates and enhances final response",
"6. Unified response delivered to user"
],
"performance": {
"coordination_time": "1-3 seconds",
"specialist_response": "2-15 seconds",
"total_response": "3-18 seconds"
},
"hybrid_mode": HYBRID_MODE
}
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
WebSocket endpoint for real-time SAAP updates
"""
await websocket_manager.connect(websocket)
try:
while True:
# Keep connection alive and handle client messages
data = await websocket.receive_text()
# Echo back for connection testing
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
# =====================================================
# AGENT TEMPLATES - Quick Setup
# =====================================================
@app.get("/api/v1/templates/agents")
async def get_agent_templates():
"""
Get predefined agent templates for quick setup
πŸ”§ FIXED: Added all 7 agent templates including missing ones
"""
templates = {
"jane_alesi": AgentTemplates.jane_alesi().dict(),
"john_alesi": AgentTemplates.john_alesi().dict(),
"lara_alesi": AgentTemplates.lara_alesi().dict(),
"theo_alesi": AgentTemplates.theo_alesi().dict(), # βœ… ADDED
"justus_alesi": AgentTemplates.justus_alesi().dict(), # βœ… ADDED
"leon_alesi": AgentTemplates.leon_alesi().dict(), # βœ… ADDED
"luna_alesi": AgentTemplates.luna_alesi().dict() # βœ… ADDED
}
response = {
"templates": templates,
"count": len(templates)
}
if HYBRID_MODE:
response["hybrid_enabled"] = True
return response
@app.post("/api/v1/templates/agents/{template_name}")
async def create_agent_from_template(
template_name: str,
agent_manager: AgentManagerService = Depends(get_agent_manager),
websocket_manager: WebSocketManager = Depends(get_websocket_manager)
):
"""
Create agent from predefined template (hybrid-enabled)
πŸ”§ FIXED: Added all 7 agent templates including missing ones
"""
try:
template_map = {
"jane_alesi": AgentTemplates.jane_alesi,
"john_alesi": AgentTemplates.john_alesi,
"lara_alesi": AgentTemplates.lara_alesi,
"theo_alesi": AgentTemplates.theo_alesi, # βœ… ADDED
"justus_alesi": AgentTemplates.justus_alesi, # βœ… ADDED
"leon_alesi": AgentTemplates.leon_alesi, # βœ… ADDED
"luna_alesi": AgentTemplates.luna_alesi # βœ… ADDED
}
if template_name not in template_map:
raise HTTPException(status_code=404, detail=f"Template '{template_name}' not found")
# Create agent from template
agent = template_map[template_name]()
# Register agent
success = await agent_manager.register_agent(agent)
if success:
await websocket_manager.broadcast_agent_update(agent)
response = {
"success": True,
"message": f"Agent created from template '{template_name}'",
"agent": agent.dict()
}
if HYBRID_MODE:
response.update({
"hybrid_enabled": True,
"primary_provider": getattr(agent_manager, 'primary_provider', 'hybrid')
})
return response
else:
raise HTTPException(status_code=400, detail="Template agent creation failed")
except Exception as e:
logger.error(f"❌ Template creation error: {e}")
raise HTTPException(status_code=500, detail=f"Template creation failed: {str(e)}")
# =====================================================
# STARTUP MESSAGE
# =====================================================
@app.on_event("startup")
async def startup_message():
"""Log startup message with mode information"""
logger.info(logger_msg)
if HYBRID_MODE:
logger.info("🌐 OpenRouter: Cost-efficient models (GPT-4o-mini, Claude-3-Haiku)")
logger.info("πŸ€– colossus: FREE fallback provider")
logger.info("⚑ Expected Performance: OpenRouter 1-3s, colossus 15-30s")
if OPENROUTER_ENDPOINTS:
logger.info("βœ… OpenRouter endpoints enabled")
else:
logger.warning("⚠️ OpenRouter endpoints not available")
# ==========================================
# NOTE: No if __name__ == "__main__" block needed
# Supervisor/Uvicorn will start the app directly
# ==========================================
# =====================================================
# STATIC FILES - Serve Vue.js Frontend (SPA Mode)
# ⚠️ CRITICAL: Must be AFTER all API routes!
# =====================================================
# Import custom SPA Static Files handler
from spa_static_files import SPAStaticFiles
# Mount static files for frontend (must be AFTER all API routes)
# SPAStaticFiles preserves API routes while serving Vue.js SPA
frontend_dist = "/app/frontend/dist"
if os.path.exists(frontend_dist):
app.mount("/", SPAStaticFiles(directory=frontend_dist, html=True), name="static")
logger.info(f"βœ… SPA Static files mounted: {frontend_dist}")
else:
logger.warning(f"⚠️ Frontend dist directory not found: {frontend_dist}")