saap-plattform / backend /agents /colossus_saap_agent.py
Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
raw
history blame
14.9 kB
#!/usr/bin/env python3
"""
SAAP Colossus Server Integration Agent
Integration von colossus Server (https://ai.adrian-schupp.de) in SAAP Multi-Agent System
Author: Hanan Wandji Danga & Jane Alesi
"""
import os
from dotenv import load_dotenv
import requests
import json
import time
import asyncio
import redis
from typing import Dict, List, Optional, Any
import logging
# Load environment variables
load_dotenv()
class ColossusSAAPAgent:
def __init__(self, agent_name: str, role: str, api_key: str, base_url: str = "https://ai.adrian-schupp.de"):
"""
Initialisierung des Colossus SAAP Agents
Args:
agent_name: Name des Agents (z.B. "agent_coordinator")
role: Rolle des Agents (z.B. "Coordinator", "Developer", "Analyst")
api_key: API Key für colossus Server
base_url: Base URL des colossus Servers
"""
self.agent_name = agent_name
self.role = role
self.api_key = api_key
self.base_url = base_url
self.model_name = "mistral-small3.2:24b-instruct-2506"
# Redis Configuration für SAAP Message Queue
self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
self.message_queue = f"saap_agent_{agent_name}"
# Agent Context für rollenspezifische Antworten
self.context = self._initialize_context()
# Logging Setup
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(f"ColossusSAAP.{agent_name}")
self.logger.info(f"🚀 {agent_name} ({role}) initiated with colossus Server")
def _initialize_context(self) -> str:
"""Initialisiert rollenspezifischen Kontext für den Agent"""
contexts = {
"Coordinator": """Du bist der SAAP Agent Coordinator. Deine Aufgabe ist es:
- Multi-Agent Workflows zu koordinieren
- Aufgaben an spezialisierte Agenten zu delegieren
- Systemüberblick zu behalten und Entscheidungen zu treffen
- Effizienz und Performance des gesamten SAAP-Systems zu optimieren""",
"Developer": """Du bist der SAAP Developer Agent. Deine Aufgabe ist es:
- Code zu schreiben und zu überprüfen
- Technische Implementierungen zu planen
- Architektur-Entscheidungen zu treffen
- Code-Qualität und Best Practices sicherzustellen""",
"Analyst": """Du bist der SAAP Analyst Agent. Deine Aufgabe ist es:
- Daten und Anforderungen zu analysieren
- Use Cases zu definieren und dokumentieren
- System-Performance zu bewerten
- Optimierungspotenziale zu identifizieren""",
"default": f"""Du bist ein SAAP Agent mit der Rolle '{self.role}'.
Beantworte Anfragen professionell und hilfsbereit basierend auf deiner Spezialisierung."""
}
return contexts.get(self.role, contexts["default"])
async def send_request_to_colossus(self, prompt: str, max_tokens: int = 500) -> Dict[str, Any]:
"""
Sendet Request an colossus Server und misst Performance
Args:
prompt: Der Eingabe-Prompt
max_tokens: Maximale Token-Anzahl für Response
Returns:
Dict mit Response, Performance-Metriken und Metadaten
"""
start_time = time.time()
# Request Headers
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# Request Body (OpenAI-kompatible API vermutlich)
payload = {
"model": self.model_name,
"messages": [
{"role": "system", "content": self.context},
{"role": "user", "content": prompt}
],
"max_tokens": max_tokens,
"temperature": 0.7
}
try:
# API Call an colossus Server
response = requests.post(
f"{self.base_url}/v1/chat/completions", # Standard OpenAI API Format
headers=headers,
json=payload,
timeout=30
)
response_time = time.time() - start_time
if response.status_code == 200:
data = response.json()
# Performance Metrics berechnen
response_text = data['choices'][0]['message']['content']
token_count = data.get('usage', {}).get('total_tokens', 0)
result = {
"success": True,
"response": response_text,
"response_time": round(response_time, 2),
"token_count": token_count,
"model": self.model_name,
"agent_role": self.role,
"timestamp": time.time()
}
self.logger.info(f"✅ colossus Response: {response_time:.2f}s, {token_count} tokens")
return result
else:
error_result = {
"success": False,
"error": f"HTTP {response.status_code}: {response.text}",
"response_time": round(response_time, 2),
"timestamp": time.time()
}
self.logger.error(f"❌ colossus Error: {response.status_code}")
return error_result
except requests.exceptions.RequestException as e:
error_result = {
"success": False,
"error": f"Request failed: {str(e)}",
"response_time": round(time.time() - start_time, 2),
"timestamp": time.time()
}
self.logger.error(f"❌ colossus Connection Error: {e}")
return error_result
async def process_message(self, message: str, sender: str = "system") -> Dict[str, Any]:
"""
Verarbeitet eingehende Nachricht und generiert intelligente Antwort
"""
self.logger.info(f"🔄 {self.agent_name} processing message from {sender}")
# Erweiterte Prompt-Konstruktion mit Sender-Kontext
enhanced_prompt = f"""[Nachricht von {sender}]
{message}
Bitte antworte als {self.role} Agent im SAAP System. Sei präzise und hilfreich."""
# colossus Server Request
result = await self.send_request_to_colossus(enhanced_prompt)
if result["success"]:
# Message in Redis Queue für andere Agenten
response_data = {
"agent_name": self.agent_name,
"agent_role": self.role,
"original_message": message,
"response": result["response"],
"sender": sender,
"performance": {
"response_time": result["response_time"],
"token_count": result["token_count"]
},
"timestamp": result["timestamp"],
"server": "colossus"
}
# Publish to Redis for other agents and dashboard
self.redis_client.lpush(f"saap_responses", json.dumps(response_data))
self.redis_client.publish(f"saap_agent_updates", json.dumps(response_data))
self.logger.info(f"✅ Response generated and published to Redis")
return result
else:
self.logger.error(f"❌ Failed to process message: {result.get('error')}")
return result
async def listen_for_messages(self):
"""
Lauscht auf eingehende Nachrichten in der Redis Queue
"""
self.logger.info(f"👂 {self.agent_name} listening for messages on {self.message_queue}")
while True:
try:
# Pop message from Redis queue
message_data = self.redis_client.brpop(self.message_queue, timeout=1)
if message_data:
_, message_json = message_data
message_obj = json.loads(message_json)
# Process the message
await self.process_message(
message=message_obj.get("content", ""),
sender=message_obj.get("sender", "unknown")
)
# Small delay to prevent excessive CPU usage
await asyncio.sleep(0.1)
except KeyboardInterrupt:
self.logger.info(f"🛑 {self.agent_name} shutting down")
break
except Exception as e:
self.logger.error(f"❌ Error in message listener: {e}")
await asyncio.sleep(1)
def send_message_to_agent(self, target_agent: str, message: str):
"""
Sendet Nachricht an einen anderen SAAP Agent
"""
message_data = {
"content": message,
"sender": self.agent_name,
"timestamp": time.time()
}
self.redis_client.lpush(f"saap_agent_{target_agent}", json.dumps(message_data))
self.logger.info(f"📤 Message sent to {target_agent}")
async def health_check(self) -> Dict[str, Any]:
"""
Überprüft Gesundheitsstatus des colossus Servers
"""
try:
test_prompt = "Hello, this is a connection test."
result = await self.send_request_to_colossus(test_prompt, max_tokens=50)
health_status = {
"agent_name": self.agent_name,
"colossus_status": "healthy" if result["success"] else "unhealthy",
"response_time": result.get("response_time", 0),
"error": result.get("error") if not result["success"] else None,
"timestamp": time.time()
}
return health_status
except Exception as e:
return {
"agent_name": self.agent_name,
"colossus_status": "error",
"error": str(e),
"timestamp": time.time()
}
# Performance Benchmarking Funktionen
class ColossusBenchmark:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://ai.adrian-schupp.de"
async def run_performance_benchmark(self, test_prompts: List[str]) -> Dict[str, Any]:
"""
Führt Performance-Benchmark mit verschiedenen Test-Prompts durch
"""
results = []
# Temporärer Agent für Benchmark
benchmark_agent = ColossusSAAPAgent("benchmark_agent", "Benchmark", self.api_key)
for i, prompt in enumerate(test_prompts):
print(f"🧪 Running test {i+1}/{len(test_prompts)}: {prompt[:50]}...")
result = await benchmark_agent.send_request_to_colossus(prompt)
results.append({
"test_id": i + 1,
"prompt": prompt,
"success": result["success"],
"response_time": result.get("response_time", 0),
"token_count": result.get("token_count", 0),
"error": result.get("error") if not result["success"] else None
})
# Pause zwischen Tests um Server nicht zu überlasten
await asyncio.sleep(1)
# Statistiken berechnen
successful_tests = [r for r in results if r["success"]]
if successful_tests:
avg_response_time = sum(r["response_time"] for r in successful_tests) / len(successful_tests)
avg_token_count = sum(r["token_count"] for r in successful_tests) / len(successful_tests)
benchmark_summary = {
"total_tests": len(test_prompts),
"successful_tests": len(successful_tests),
"success_rate": len(successful_tests) / len(test_prompts) * 100,
"average_response_time": round(avg_response_time, 2),
"average_token_count": round(avg_token_count, 0),
"performance_target_met": avg_response_time < 2.0, # < 2s Ziel
"results": results
}
else:
benchmark_summary = {
"total_tests": len(test_prompts),
"successful_tests": 0,
"success_rate": 0,
"error": "No successful tests completed",
"results": results
}
return benchmark_summary
# Utility Functions für SAAP Integration
def create_saap_colossus_agents(api_key: str) -> List[ColossusSAAPAgent]:
"""
Erstellt die 3 Basis-Agenten für SAAP Multi-Agent System wie im Plan
"""
agents = [
ColossusSAAPAgent("agent_coordinator", "Coordinator", api_key),
ColossusSAAPAgent("agent_developer", "Developer", api_key),
ColossusSAAPAgent("agent_analyst", "Analyst", api_key)
]
return agents
if __name__ == "__main__":
# Demo und Testing
import asyncio
# Load API key from environment variable
API_KEY = os.getenv("COLOSSUS_API_KEY")
if not API_KEY:
print("❌ Error: COLOSSUS_API_KEY not set in environment variables")
print("Please set it in backend/.env file:")
print("COLOSSUS_API_KEY=sk-your-actual-key-here")
exit(1)
async def demo_colossus_integration():
print("🚀 SAAP colossus Server Integration Demo")
# Create test agent
agent = ColossusSAAPAgent("demo_coordinator", "Coordinator", API_KEY)
# Health check
print("\n📊 Health Check...")
health = await agent.health_check()
print(f"Status: {health}")
# Test message processing
if health.get("colossus_status") == "healthy":
print("\n💬 Processing test message...")
result = await agent.process_message(
"Erkläre mir die Vorteile einer Multi-Agent-Architektur für SAAP.",
"test_user"
)
print(f"Response: {result.get('response', 'No response')}")
# Performance benchmark
print("\n🧪 Running mini performance benchmark...")
benchmark = ColossusBenchmark(API_KEY)
test_prompts = [
"Was ist SAAP?",
"Erkläre Multi-Agent Systeme in 3 Sätzen.",
"Vorteile von On-Premise vs Cloud AI?"
]
benchmark_results = await benchmark.run_performance_benchmark(test_prompts)
print(f"Benchmark Results: {benchmark_results}")
# Run demo
asyncio.run(demo_colossus_integration())