# Source: saap-plattform/backend/agent_manager_enhanced.py (27.6 kB)
# Commit 4343907 by Hwandji: "feat: initial HuggingFace Space deployment"
"""
Enhanced SAAP Agent Manager with OpenRouter Integration & Cost Efficiency Logging
Supports multi-provider architecture: colossus + OpenRouter with cost optimization
"""
import asyncio
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
import json
from dataclasses import dataclass, asdict
from enum import Enum
import aiohttp
import openai # For OpenRouter compatibility
from openai import AsyncOpenAI
from ..config.settings import get_settings
from ..models.agent import SaapAgent, AgentStatus, AgentCapability
from ..models.chat import ChatMessage, MessageRole
from ..database.service import DatabaseService
# Cost efficiency logging
# Two named loggers are used throughout this module:
#   - "saap.cost": per-request cost metrics, budget warnings and agent
#     lifecycle events (create / start / stop / delete).
#   - "saap.performance": response-time summaries emitted by send_message().
cost_logger = logging.getLogger("saap.cost")
performance_logger = logging.getLogger("saap.performance")
@dataclass
class CostMetrics:
    """Cost and timing record for a single provider request.

    One instance is created per request (successful or failed) and passed to
    the manager's cost log.
    """

    provider: str
    model: str
    input_tokens: int
    output_tokens: int
    total_tokens: int
    cost_usd: float
    response_time_seconds: float
    timestamp: datetime
    request_success: bool
    agent_id: str

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict of the record plus two derived ratios.

        The timestamp is rendered in ISO-8601 form.  ``cost_per_1k_tokens``
        and ``tokens_per_second`` fall back to ``0`` when their denominator
        is not positive.
        """
        payload = asdict(self)
        payload['timestamp'] = self.timestamp.isoformat()

        if self.total_tokens > 0:
            per_1k = round(self.cost_usd / (self.total_tokens / 1000), 6)
        else:
            per_1k = 0
        payload['cost_per_1k_tokens'] = per_1k

        if self.response_time_seconds > 0:
            throughput = round(self.total_tokens / self.response_time_seconds, 2)
        else:
            throughput = 0
        payload['tokens_per_second'] = throughput

        return payload
@dataclass
class PerformanceMetrics:
    """Performance summary for one provider/model combination.

    NOTE(review): nothing in the visible code populates this record, so the
    units of the numeric fields are not established here - confirm against
    the producer before relying on them.
    """

    # Which backend and model the figures describe.
    provider: str
    model: str

    # Aggregate request statistics.
    avg_response_time: float
    success_rate: float
    total_requests: int

    # Cost and throughput figures.
    avg_cost_per_request: float
    tokens_per_second: float
    last_24h_cost: float
class ProviderType(Enum):
    """Identifiers of the LLM backends an agent can be bound to.

    The string values are what gets stored on an agent's ``provider`` field.
    """

    COLOSSUS = "colossus"
    OPENROUTER = "openrouter"
    # Declared, but no start/send code path in this module handles it.
    HYBRID = "hybrid"
class EnhancedAgentManager:
    """Agent manager with multi-provider support and cost tracking.

    Agents are served either by the colossus endpoint (one ``aiohttp``
    session per agent) or by OpenRouter (a single ``AsyncOpenAI`` client that
    is shared by every OpenRouter agent).  Every request is recorded as a
    ``CostMetrics`` entry, and the running ``current_daily_cost`` is compared
    against ``daily_cost_budget`` to choose providers and to fall back to the
    free model.

    Project types are written as string annotations so they are resolved
    lazily rather than when the class body is executed.
    """

    def __init__(self, database_service: Optional["DatabaseService"] = None):
        """Load settings, initialise tracking state and configure providers.

        Args:
            database_service: Optional persistence backend.  When it is
                ``None`` agents are only kept in memory.
        """
        self.settings = get_settings()
        self.database = database_service
        self.agents: Dict[str, "SaapAgent"] = {}
        self.agent_clients: Dict[str, Any] = {}

        # Cost tracking
        self.cost_metrics: List["CostMetrics"] = []
        self.daily_cost_budget = self.settings.agents.daily_cost_budget
        self.current_daily_cost = 0.0
        self.cost_alert_threshold = self.settings.agents.warning_cost_threshold

        # Performance tracking
        self.performance_stats: Dict[str, "PerformanceMetrics"] = {}

        # Provider configurations
        self._setup_providers()

        # Initialize cost logger
        if self.settings.openrouter.enable_cost_tracking:
            cost_logger.info(f"💰 Cost tracking initialized - Daily budget: ${self.daily_cost_budget}")
            cost_logger.info(f"📊 Alert threshold: {self.cost_alert_threshold*100}% of budget")

    # ------------------------------------------------------------------
    # Provider configuration
    # ------------------------------------------------------------------

    def _setup_providers(self):
        """Populate ``self.providers`` from the settings.

        A provider is registered only when its credentials are configured
        (and, for OpenRouter, when it is enabled).
        """
        self.providers = {}

        # colossus Configuration
        if self.settings.colossus.api_key:
            self.providers[ProviderType.COLOSSUS] = {
                'client': None,  # Will be set up per agent
                'base_url': self.settings.colossus.api_base,
                'api_key': self.settings.colossus.api_key,
                'default_model': self.settings.colossus.default_model,
                'cost_per_1m_tokens': 0.0,  # colossus is free
                'timeout': self.settings.colossus.timeout,
            }

        # OpenRouter Configuration
        if self.settings.openrouter.enabled and self.settings.openrouter.api_key:
            # Build the model table once; it feeds both the config and the log line.
            openrouter_models = self._get_openrouter_models()
            self.providers[ProviderType.OPENROUTER] = {
                'client': AsyncOpenAI(
                    api_key=self.settings.openrouter.api_key,
                    base_url=self.settings.openrouter.base_url,
                ),
                'base_url': self.settings.openrouter.base_url,
                'api_key': self.settings.openrouter.api_key,
                'models': openrouter_models,
                # NOTE(review): this value is stored but not passed to the
                # AsyncOpenAI client above - confirm whether it should be.
                'timeout': 30,
            }
            cost_logger.info(f"🔗 OpenRouter provider initialized with {len(openrouter_models)} models")

    def _get_openrouter_models(self) -> Dict[str, Dict[str, Any]]:
        """Return the OpenRouter model table keyed by agent id.

        Each entry holds the model name, generation parameters and the
        per-million-token prices used for cost calculation.  The ``fallback``
        entry is priced at zero and is used when no agent-specific entry
        exists or when the daily budget would be exceeded.
        """
        openrouter = self.settings.openrouter
        return {
            # Jane Alesi - Coordination & Management (GPT-4o-mini)
            'jane_alesi': {
                'model': openrouter.jane_model,
                'max_tokens': openrouter.jane_max_tokens,
                'temperature': openrouter.jane_temperature,
                'cost_per_1m_input': 0.15,
                'cost_per_1m_output': 0.60,
                'description': 'General coordination and management tasks',
            },
            # John Alesi - Development & Code (Claude-3-Haiku)
            'john_alesi': {
                'model': openrouter.john_model,
                'max_tokens': openrouter.john_max_tokens,
                'temperature': openrouter.john_temperature,
                'cost_per_1m_input': 0.25,
                'cost_per_1m_output': 1.25,
                'description': 'Code generation and development tasks',
            },
            # Lara Alesi - Medical & Analysis (GPT-4o-mini)
            'lara_alesi': {
                'model': openrouter.lara_model,
                'max_tokens': openrouter.lara_max_tokens,
                'temperature': openrouter.lara_temperature,
                'cost_per_1m_input': 0.15,
                'cost_per_1m_output': 0.60,
                'description': 'Medical analysis and specialized queries',
            },
            # Fallback Model - Free (Meta Llama)
            'fallback': {
                'model': openrouter.fallback_model,
                'max_tokens': 600,
                'temperature': 0.7,
                'cost_per_1m_input': 0.0,
                'cost_per_1m_output': 0.0,
                'description': 'Cost-free fallback for budget limits',
            },
        }

    def _resolve_model_config(self, agent_id: str) -> Dict[str, Any]:
        """Return the OpenRouter model entry for ``agent_id`` or the fallback entry."""
        models = self._get_openrouter_models()
        return models.get(agent_id, models['fallback'])

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    def _next_agent_id(self) -> str:
        """Return an unused ``agent_<n>`` id.

        Counting starts at ``len(self.agents)`` and skips ids that are already
        taken, so an id freed by a deletion cannot make a new agent overwrite
        an existing one.
        """
        index = len(self.agents)
        while f"agent_{index}" in self.agents:
            index += 1
        return f"agent_{index}"

    @staticmethod
    def _new_message_id() -> str:
        """Return a message id based on the nanosecond clock.

        A one-second timestamp produces duplicate ids for replies generated
        within the same second.
        """
        return f"msg_{time.time_ns()}"

    def _budget_used_percent(self) -> float:
        """Return the share of the daily budget already spent, in percent.

        Returns ``0.0`` when no positive budget is configured, mirroring the
        zero fallback the ratio fields of ``CostMetrics.to_dict`` use.
        """
        if not self.daily_cost_budget or self.daily_cost_budget <= 0:
            return 0.0
        return self.current_daily_cost / self.daily_cost_budget * 100

    def _is_shared_client(self, client: Any) -> bool:
        """Tell whether ``client`` is a provider-level client shared between agents."""
        return any(config.get('client') is client for config in self.providers.values())

    async def _close_client(self, client: Any) -> None:
        """Close ``client`` if it exposes ``close()``, awaiting the result when needed."""
        close = getattr(client, 'close', None)
        if close is None:
            return
        outcome = close()
        if hasattr(outcome, '__await__'):
            await outcome

    def _log_failed_request(self, provider: str, model: str, start_time: float, agent_id: str) -> None:
        """Record a zero-token, zero-cost metrics entry for a failed request."""
        self._log_cost_metrics(CostMetrics(
            provider=provider,
            model=model,
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
            cost_usd=0.0,
            response_time_seconds=time.time() - start_time,
            timestamp=datetime.now(),
            request_success=False,
            agent_id=agent_id,
        ))

    # ------------------------------------------------------------------
    # Agent lifecycle
    # ------------------------------------------------------------------

    async def create_agent(self, agent_data: Dict[str, Any]) -> "SaapAgent":
        """Create an agent, bind it to a provider and register it.

        Args:
            agent_data: Agent attributes.  Recognised keys are ``agent_id``,
                ``name``, ``role``, ``capabilities``, ``llm_config`` and
                ``personality``; missing keys get defaults.

        Returns:
            The newly created agent (status ``CREATED``).

        Raises:
            ValueError: If no provider is configured.
        """
        if 'agent_id' in agent_data:
            agent_id = agent_data['agent_id']
        else:
            agent_id = self._next_agent_id()

        # Determine optimal provider based on agent type and cost considerations
        provider = self._select_optimal_provider(agent_id, agent_data)

        agent = SaapAgent(
            agent_id=agent_id,
            name=agent_data.get('name', f'Agent {agent_id}'),
            role=agent_data.get('role', 'General Assistant'),
            capabilities=agent_data.get('capabilities', [AgentCapability.CHAT]),
            status=AgentStatus.CREATED,
            llm_config=agent_data.get('llm_config', {}),
            personality=agent_data.get('personality', {}),
            provider=provider.value,
        )

        # Setup provider-specific client
        await self._setup_agent_client(agent)
        self.agents[agent_id] = agent

        # Log agent creation with cost implications
        cost_logger.info(f"🤖 Agent created: {agent_id} using {provider.value} provider")
        if provider == ProviderType.OPENROUTER:
            model_config = self._resolve_model_config(agent_id)
            cost_logger.info(f"💰 Model: {model_config['model']} - Cost: ${model_config['cost_per_1m_input']}/1M input tokens")

        # Database persistence
        if self.database:
            await self.database.create_agent(agent)

        return agent

    def _select_optimal_provider(self, agent_id: str, agent_data: Dict[str, Any]) -> "ProviderType":
        """Pick the provider for a new agent.

        Once the daily cost reaches the alert threshold colossus is preferred
        if it is configured.  Otherwise the configured primary provider is
        used when available, then colossus, then OpenRouter.

        Raises:
            ValueError: If neither provider is configured.
        """
        colossus_available = ProviderType.COLOSSUS in self.providers
        openrouter_available = ProviderType.OPENROUTER in self.providers

        # Check daily budget constraints
        if self.current_daily_cost >= (self.daily_cost_budget * self.cost_alert_threshold):
            cost_logger.warning(f"⚠️ Daily cost budget at {self.cost_alert_threshold*100}% - using free providers")
            if colossus_available:
                return ProviderType.COLOSSUS

        # Agent-specific provider selection
        primary_provider = getattr(self.settings.agents, 'primary_provider', 'colossus')
        if primary_provider == 'openrouter' and openrouter_available:
            return ProviderType.OPENROUTER

        # colossus is both the default primary and the first fallback.
        if colossus_available:
            return ProviderType.COLOSSUS
        if openrouter_available:
            return ProviderType.OPENROUTER
        raise ValueError("No providers available")

    async def _setup_agent_client(self, agent: "SaapAgent"):
        """Attach the provider-specific client to ``agent``.

        colossus agents get their own ``aiohttp.ClientSession``; OpenRouter
        agents reuse the shared client stored in ``self.providers``.  A
        previously attached per-agent client is closed first so it does not
        leak.
        """
        provider_type = ProviderType(agent.provider)

        previous = self.agent_clients.pop(agent.agent_id, None)
        if previous is not None and not self._is_shared_client(previous):
            await self._close_client(previous)

        if provider_type == ProviderType.COLOSSUS:
            self.agent_clients[agent.agent_id] = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.settings.colossus.timeout)
            )
        elif provider_type == ProviderType.OPENROUTER:
            # OpenRouter client already set up in providers
            self.agent_clients[agent.agent_id] = self.providers[ProviderType.OPENROUTER]['client']

    async def start_agent(self, agent_id: str) -> bool:
        """Run the provider connection test and mark the agent active.

        Returns:
            ``True`` when the connection test succeeded, ``False`` otherwise.
            Unexpected errors are logged, set the agent status to ``ERROR``
            and also yield ``False``.

        Raises:
            ValueError: If ``agent_id`` is unknown.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent {agent_id} not found")

        agent = self.agents[agent_id]
        try:
            # stop_agent() discards the client; recreate it so that a stopped
            # agent can be started again.
            if agent_id not in self.agent_clients:
                await self._setup_agent_client(agent)

            # Provider-specific startup
            if agent.provider == ProviderType.COLOSSUS.value:
                success = await self._start_colossus_agent(agent)
            elif agent.provider == ProviderType.OPENROUTER.value:
                success = await self._start_openrouter_agent(agent)
            else:
                success = False

            if success:
                agent.status = AgentStatus.ACTIVE
                agent.metrics.last_active = datetime.now()
                cost_logger.info(f"✅ Agent {agent_id} started successfully using {agent.provider}")

                # Database update
                if self.database:
                    await self.database.update_agent_status(agent_id, AgentStatus.ACTIVE)

            return success
        except Exception as e:
            cost_logger.error(f"❌ Failed to start agent {agent_id}: {e}")
            agent.status = AgentStatus.ERROR
            return False

    async def _start_colossus_agent(self, agent: "SaapAgent") -> bool:
        """Send a small test completion to colossus; ``True`` on HTTP 200."""
        try:
            client = self.agent_clients[agent.agent_id]
            # Test connection to colossus
            async with client.post(
                f"{self.settings.colossus.api_base}/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.settings.colossus.api_key}"},
                json={
                    "model": self.settings.colossus.default_model,
                    "messages": [{"role": "user", "content": "Test connection"}],
                    "max_tokens": 10,
                },
            ) as response:
                return response.status == 200
        except Exception as e:
            cost_logger.error(f"colossus connection failed for {agent.agent_id}: {e}")
            return False

    async def _start_openrouter_agent(self, agent: "SaapAgent") -> bool:
        """Send a small test completion to OpenRouter; ``True`` if it does not raise."""
        try:
            client = self.agent_clients[agent.agent_id]
            model_config = self._resolve_model_config(agent.agent_id)

            # Test connection to OpenRouter; only success/failure matters here.
            await client.chat.completions.create(
                model=model_config['model'],
                messages=[{"role": "user", "content": "Test connection"}],
                max_tokens=10,
                temperature=model_config['temperature'],
            )
            cost_logger.info(f"🔗 OpenRouter agent {agent.agent_id} connected - Model: {model_config['model']}")
            return True
        except Exception as e:
            cost_logger.error(f"OpenRouter connection failed for {agent.agent_id}: {e}")
            return False

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------

    async def send_message(self, agent_id: str, message: str, conversation_context: Optional[List[Dict]] = None) -> "ChatMessage":
        """Send ``message`` to an agent and update its response metrics.

        Args:
            agent_id: Id of a registered agent.
            message: User message text.
            conversation_context: Optional earlier messages (role/content
                dicts) that are sent ahead of ``message``.

        Returns:
            The assistant reply.

        Raises:
            ValueError: If the agent is unknown or its provider is unsupported.
            Exception: Any provider error is logged, sets the agent status to
                ``ERROR`` and is re-raised.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent {agent_id} not found")

        agent = self.agents[agent_id]
        start_time = time.time()
        try:
            if agent.provider == ProviderType.COLOSSUS.value:
                response = await self._send_colossus_message(agent, message, conversation_context)
            elif agent.provider == ProviderType.OPENROUTER.value:
                response = await self._send_openrouter_message(agent, message, conversation_context)
            else:
                raise ValueError(f"Unsupported provider: {agent.provider}")

            response_time = time.time() - start_time

            # Update agent metrics (incremental mean over all processed messages)
            agent.metrics.messages_processed += 1
            agent.metrics.last_active = datetime.now()
            agent.metrics.avg_response_time = (
                (agent.metrics.avg_response_time * (agent.metrics.messages_processed - 1) + response_time)
                / agent.metrics.messages_processed
            )

            # Log performance
            performance_logger.info(f"📊 Agent {agent_id} - Response time: {response_time:.2f}s, Total messages: {agent.metrics.messages_processed}")
            return response
        except Exception as e:
            cost_logger.error(f"❌ Message failed for agent {agent_id}: {e}")
            agent.status = AgentStatus.ERROR
            raise

    async def _send_colossus_message(self, agent: "SaapAgent", message: str, context: Optional[List[Dict]] = None) -> "ChatMessage":
        """Send a chat completion to colossus and record a zero-cost metric.

        The request goes to ``<api_base>/v1/chat/completions`` with
        ``max_tokens=800`` and ``temperature=0.7``.  A non-200 status or any
        other error is recorded as a failed request and re-raised.
        """
        client = self.agent_clients[agent.agent_id]
        model = self.settings.colossus.default_model

        # Prepare messages
        messages = list(context) if context else []
        messages.append({"role": "user", "content": message})

        start_time = time.time()
        try:
            async with client.post(
                f"{self.settings.colossus.api_base}/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.settings.colossus.api_key}"},
                json={
                    "model": model,
                    "messages": messages,
                    "max_tokens": 800,
                    "temperature": 0.7,
                },
            ) as response:
                if response.status != 200:
                    raise RuntimeError(f"colossus API error: {response.status}")
                data = await response.json()

            content = data['choices'][0]['message']['content']
            response_time = time.time() - start_time

            # "usage" may be missing or null; treat both as "no token counts".
            usage = data.get('usage') or {}

            # Cost tracking for colossus (free)
            cost_metrics = CostMetrics(
                provider="colossus",
                model=model,
                input_tokens=usage.get('prompt_tokens', 0),
                output_tokens=usage.get('completion_tokens', 0),
                total_tokens=usage.get('total_tokens', 0),
                cost_usd=0.0,  # colossus is free
                response_time_seconds=response_time,
                timestamp=datetime.now(),
                request_success=True,
                agent_id=agent.agent_id,
            )
            self._log_cost_metrics(cost_metrics)

            return ChatMessage(
                message_id=self._new_message_id(),
                role=MessageRole.ASSISTANT,
                content=content,
                timestamp=datetime.now(),
                agent_id=agent.agent_id,
                metadata={'provider': 'colossus', 'cost_usd': 0.0, 'tokens': cost_metrics.total_tokens},
            )
        except Exception:
            # Log failed request
            self._log_failed_request("colossus", model, start_time, agent.agent_id)
            raise

    async def _send_openrouter_message(self, agent: "SaapAgent", message: str, context: Optional[List[Dict]] = None) -> "ChatMessage":
        """Send a chat completion to OpenRouter and account for its cost.

        If the estimated cost would push the day's spend over the budget the
        free fallback model is used instead.  The actual cost is computed
        from the token usage reported by the API and added to
        ``current_daily_cost``.  Errors are recorded as failed requests and
        re-raised.
        """
        client = self.agent_clients[agent.agent_id]
        model_config = self._resolve_model_config(agent.agent_id)

        # Check budget before expensive request
        estimated_cost = self._estimate_request_cost(message, model_config)
        if self.current_daily_cost + estimated_cost > self.daily_cost_budget:
            cost_logger.warning("💸 Daily budget exceeded - switching to free fallback model")
            model_config = self._get_openrouter_models()['fallback']

        # Prepare messages
        messages = list(context) if context else []
        messages.append({"role": "user", "content": message})

        start_time = time.time()
        try:
            response = await client.chat.completions.create(
                model=model_config['model'],
                messages=messages,
                max_tokens=model_config['max_tokens'],
                temperature=model_config['temperature'],
            )
            response_time = time.time() - start_time
            content = response.choices[0].message.content

            # Calculate actual cost.  The usage object may be absent; missing
            # counts are treated as zero rather than failing a request that
            # has already succeeded.
            usage = getattr(response, 'usage', None)
            input_tokens = getattr(usage, 'prompt_tokens', 0) or 0
            output_tokens = getattr(usage, 'completion_tokens', 0) or 0
            total_tokens = getattr(usage, 'total_tokens', 0) or (input_tokens + output_tokens)
            cost_usd = (
                (input_tokens / 1_000_000) * model_config['cost_per_1m_input'] +
                (output_tokens / 1_000_000) * model_config['cost_per_1m_output']
            )

            # Update daily cost tracking
            self.current_daily_cost += cost_usd

            # Cost tracking
            cost_metrics = CostMetrics(
                provider="openrouter",
                model=model_config['model'],
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=total_tokens,
                cost_usd=cost_usd,
                response_time_seconds=response_time,
                timestamp=datetime.now(),
                request_success=True,
                agent_id=agent.agent_id,
            )
            self._log_cost_metrics(cost_metrics)

            # Budget alert check (percentage is guarded against a zero budget)
            if self.current_daily_cost >= (self.daily_cost_budget * self.cost_alert_threshold):
                cost_logger.warning(f"⚠️ Cost alert: ${self.current_daily_cost:.4f} / ${self.daily_cost_budget} ({self._budget_used_percent():.1f}%)")

            return ChatMessage(
                message_id=self._new_message_id(),
                role=MessageRole.ASSISTANT,
                content=content,
                timestamp=datetime.now(),
                agent_id=agent.agent_id,
                metadata={
                    'provider': 'openrouter',
                    'model': model_config['model'],
                    'cost_usd': cost_usd,
                    'tokens': total_tokens,
                    'cost_efficiency': f"${cost_usd:.6f} ({total_tokens} tokens, {response_time:.1f}s)",
                },
            )
        except Exception:
            # Log failed request
            self._log_failed_request("openrouter", model_config['model'], start_time, agent.agent_id)
            raise

    # ------------------------------------------------------------------
    # Cost accounting
    # ------------------------------------------------------------------

    def _estimate_request_cost(self, message: str, model_config: Dict[str, Any]) -> float:
        """Roughly estimate the USD cost of a request.

        Input tokens are approximated as one token per four characters of
        ``message``; output is assumed to use half of ``max_tokens``.
        """
        # Rough token estimation: ~4 chars per token
        estimated_input_tokens = len(message) / 4
        estimated_output_tokens = model_config['max_tokens'] * 0.5  # Assume 50% of max tokens
        return (
            (estimated_input_tokens / 1_000_000) * model_config['cost_per_1m_input'] +
            (estimated_output_tokens / 1_000_000) * model_config['cost_per_1m_output']
        )

    def _log_cost_metrics(self, metrics: "CostMetrics"):
        """Store ``metrics``, log it, and keep only the latest 1000 entries."""
        self.cost_metrics.append(metrics)

        # Detailed cost logging
        cost_logger.info(f"💰 Cost Metrics: {json.dumps(metrics.to_dict())}")

        # Performance summary
        if metrics.request_success:
            efficiency = f"{metrics.total_tokens/metrics.response_time_seconds:.1f}" if metrics.response_time_seconds > 0 else "N/A"
            cost_logger.info(f"📊 {metrics.agent_id} - ${metrics.cost_usd:.6f} | {metrics.total_tokens} tokens | {metrics.response_time_seconds:.2f}s | {efficiency} tokens/s")

        # Cleanup old metrics (keep last 1000)
        if len(self.cost_metrics) > 1000:
            self.cost_metrics = self.cost_metrics[-1000:]

    def get_cost_summary(self, hours: int = 24) -> Dict[str, Any]:
        """Summarise recorded requests from the last ``hours`` hours.

        The returned dict always has the same keys, whether or not any
        request falls inside the window.  When the window is empty the legacy
        ``total_cost`` key is included as well.  ``daily_budget_used`` is
        ``0.0`` when no positive budget is configured.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.cost_metrics if m.timestamp >= cutoff_time]

        total_requests = len(recent_metrics)
        total_cost = sum((m.cost_usd for m in recent_metrics), 0.0)
        successful_requests = sum(1 for m in recent_metrics if m.request_success)

        by_provider: Dict[str, Dict[str, Any]] = {}
        for entry in recent_metrics:
            bucket = by_provider.setdefault(entry.provider, {"cost": 0.0, "requests": 0, "tokens": 0})
            bucket["cost"] += entry.cost_usd
            bucket["requests"] += 1
            bucket["tokens"] += entry.total_tokens

        summary = {
            "total_cost_usd": round(total_cost, 4),
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": successful_requests / total_requests if total_requests else 0.0,
            "average_cost_per_request": round(total_cost / total_requests, 6) if total_requests else 0.0,
            "daily_budget_used": round(self._budget_used_percent(), 1),
            "by_provider": by_provider,
            "period_hours": hours,
            "budget_remaining_usd": max(0, self.daily_cost_budget - self.current_daily_cost),
        }
        if not recent_metrics:
            # Key returned by the previous empty-window result; kept for callers that read it.
            summary["total_cost"] = 0.0
        return summary

    def reset_daily_costs(self):
        """Reset daily cost tracking (called at midnight)"""
        yesterday_cost = self.current_daily_cost
        self.current_daily_cost = 0.0
        cost_logger.info(f"📅 Daily cost reset - Yesterday: ${yesterday_cost:.4f}")

    # ------------------------------------------------------------------
    # Lookup, shutdown and cleanup
    # ------------------------------------------------------------------

    async def get_all_agents(self) -> List["SaapAgent"]:
        """Return all registered agents."""
        return list(self.agents.values())

    async def get_agent(self, agent_id: str) -> Optional["SaapAgent"]:
        """Return the agent with ``agent_id`` or ``None`` if it is unknown."""
        return self.agents.get(agent_id)

    async def stop_agent(self, agent_id: str) -> bool:
        """Mark an agent inactive and release its client.

        Only per-agent clients are closed.  The OpenRouter client is shared
        by all OpenRouter agents, so closing it here would break the others;
        it is closed in ``__aexit__`` instead.

        Returns:
            ``False`` if the agent is unknown, ``True`` otherwise.
        """
        if agent_id not in self.agents:
            return False

        agent = self.agents[agent_id]
        agent.status = AgentStatus.INACTIVE

        # Cleanup client connection
        client = self.agent_clients.pop(agent_id, None)
        if client is not None and not self._is_shared_client(client):
            await self._close_client(client)

        cost_logger.info(f"🛑 Agent {agent_id} stopped")

        # Database update
        if self.database:
            await self.database.update_agent_status(agent_id, AgentStatus.INACTIVE)

        return True

    async def delete_agent(self, agent_id: str) -> bool:
        """Stop an agent, remove it from memory and from the database.

        Returns:
            ``False`` if the agent is unknown, ``True`` otherwise.
        """
        if agent_id not in self.agents:
            return False

        # Stop agent first
        await self.stop_agent(agent_id)

        # Remove from memory
        del self.agents[agent_id]
        cost_logger.info(f"🗑️ Agent {agent_id} deleted")

        # Database deletion
        if self.database:
            await self.database.delete_agent(agent_id)

        return True

    async def __aenter__(self):
        """Async context manager entry"""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop every agent that still has a client, then close shared clients."""
        for agent_id in list(self.agent_clients):
            await self.stop_agent(agent_id)

        # Shared provider clients are not owned by a single agent; close them once.
        for provider_config in self.providers.values():
            shared_client = provider_config.get('client')
            if shared_client is not None:
                await self._close_client(shared_client)

        cost_logger.info("🧹 Enhanced Agent Manager cleanup completed")