Spaces:
Sleeping
Sleeping
File size: 45,204 Bytes
4343907 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 |
""" SAAP Agent Manager Service - LLMModelConfig.get() Error Resolution
Database-integrated agent lifecycle management with colossus integration
"""
import asyncio
import logging
import os
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from models.agent_schema import SaapAgent, AgentStatus, AgentType, AgentTemplates
from database.connection import db_manager
from database.models import DBAgent, DBChatMessage, DBAgentSession
from api.colossus_client import ColossusClient
from agents.openrouter_saap_agent import OpenRouterSAAPAgent
logger = logging.getLogger(__name__)
class AgentManagerService:
"""
🔧 FIXED: Production-ready Agent Manager with LLM config error resolution
Features:
- Database-backed agent storage and lifecycle
- Real-time agent status management
- colossus LLM integration with OpenRouter fallback
- Session tracking and performance metrics
- Health monitoring and error handling
- Multi-provider chat support (colossus + OpenRouter)
- ✅ Robust LLM config access preventing AttributeError
"""
def __init__(self):
    """Create an empty, not-yet-initialized manager.

    Only in-memory state is set up here; connecting to colossus and loading
    agents is done by ``initialize()``.
    """
    # colossus connectivity bookkeeping
    self.colossus_client: Optional[ColossusClient] = None
    self.colossus_connection_status = "unknown"
    self.last_colossus_test = None
    # agent_id -> agent (in-memory cache for fast access)
    self.agents: Dict[str, SaapAgent] = {}
    # agent_id -> session row of a started agent
    self.active_sessions: Dict[str, DBAgentSession] = {}
    # set to True at the end of a successful initialize()
    self.is_initialized = False
def _get_llm_config_value(self, agent: "SaapAgent", key: str, default=None):
    """Read one setting from ``agent.llm_config`` without assuming its type.

    The config may be a plain dict, an object exposing the setting as an
    attribute, a dict-like object with ``get()``, or an object that can be
    dumped to a dict via ``model_dump()`` / ``dict()``. The strategies are
    tried in that order and the first one that yields a value wins; a
    strategy that raises or does not know the key falls through to the next
    one instead of ending the search.

    Args:
        agent: Agent whose ``llm_config`` is inspected.
        key: Name of the setting (e.g. ``'max_tokens'``).
        default: Value returned when the setting cannot be found.

    Returns:
        The configured value, or ``default``. This method never raises.
    """
    try:
        llm_config = getattr(agent, 'llm_config', None)
        if not llm_config:
            logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}")
            return default

        # 1) Plain dictionary
        if isinstance(llm_config, dict):
            value = llm_config.get(key, default)
            logger.debug(f"✅ Dict config access: {key}={value}")
            return value

        # 2) Direct attribute on the config object
        if hasattr(llm_config, key):
            value = getattr(llm_config, key, default)
            logger.debug(f"✅ Attribute access: {key}={value}")
            return value

        # 3) dict-like get(); a failing get() must not end the search
        getter = getattr(llm_config, 'get', None)
        if callable(getter):
            try:
                value = getter(key, default)
            except Exception as get_error:
                logger.warning(f"β οΈ get() method failed: {get_error}, trying fallback")
            else:
                logger.debug(f"✅ Method get() access: {key}={value}")
                return value

        # 4) Instance __dict__ (absent on slotted objects)
        instance_dict = getattr(llm_config, '__dict__', None)
        if instance_dict is not None and key in instance_dict:
            value = instance_dict[key]
            logger.debug(f"✅ __dict__ access: {key}={value}")
            return value

        # 5) / 6) Dump-to-dict helpers: model_dump() first, then dict()
        for dump_name in ('model_dump', 'dict'):
            dump = getattr(llm_config, dump_name, None)
            if not callable(dump):
                continue
            try:
                dumped = dump()
            except Exception as dump_error:
                logger.debug(f"{dump_name}() failed for {key}: {dump_error}")
                continue
            if isinstance(dumped, dict) and key in dumped:
                value = dumped[key]
                logger.debug(f"✅ {dump_name}() access: {key}={value}")
                return value

        # Nothing knew the key
        logger.warning(f"β οΈ Unknown config type {type(llm_config)} for {key}, using default: {default}")
        return default
    except AttributeError as e:
        logger.warning(f"β οΈ AttributeError in LLM config access for {key}: {e}, using default: {default}")
        return default
    except Exception as e:
        logger.error(f"β Unexpected error in LLM config access for {key}: {e}, using default: {default}")
        return default
async def initialize(self):
    """Bring the manager up: colossus client, database agents, base templates.

    A colossus failure is recorded in ``colossus_connection_status`` and
    start-up continues without it. Agents are then loaded from the database
    and any of the seven base templates still missing from the cache are
    registered. ``is_initialized`` is set once those steps have run.

    Raises:
        Exception: any error outside the colossus step is logged and
            re-raised.
    """
    try:
        logger.info("π Initializing Agent Manager Service...")

        # colossus is optional: failure here degrades gracefully
        try:
            logger.info("π Connecting to colossus server...")
            client = ColossusClient()
            self.colossus_client = client
            await client.__aenter__()
            await self._test_colossus_connection()
        except Exception as colossus_error:
            logger.error(f"β colossus connection failed: {colossus_error}")
            self.colossus_connection_status = f"failed: {str(colossus_error)}"

        await self._load_agents_from_database()

        # Backfill whichever base templates the database did not provide
        base_template_ids = ['jane_alesi', 'john_alesi', 'lara_alesi', 'theo_alesi', 'justus_alesi', 'leon_alesi', 'luna_alesi']
        missing_templates = [tid for tid in base_template_ids if tid not in self.agents]
        if missing_templates:
            logger.info(f"π¦ Loading missing base templates: {missing_templates}")
            await self._load_missing_templates(missing_templates)

        self.is_initialized = True

        base_count = sum(1 for loaded_id in self.agents if loaded_id in base_template_ids)
        custom_count = sum(1 for loaded_id in self.agents if loaded_id not in base_template_ids)
        logger.info(f"✅ Agent Manager initialized: {len(self.agents)} agents loaded")
        logger.info(f" Base templates: {base_count}/7")
        logger.info(f" Custom agents: {custom_count}")
        logger.info(f"π colossus status: {self.colossus_connection_status}")
    except Exception as e:
        logger.error(f"β Agent Manager initialization failed: {e}")
        raise
async def _load_missing_templates(self, template_ids: List[str]):
    """Register the base templates named in ``template_ids``.

    Each id is looked up among the seven ``AgentTemplates`` factories; ids
    without a factory are skipped. A failure while building or registering
    one template is logged and does not stop the remaining ones.
    """
    known_templates = (
        'jane_alesi',
        'john_alesi',
        'lara_alesi',
        'theo_alesi',
        'justus_alesi',
        'leon_alesi',
        'luna_alesi',
    )
    # Resolved up front, so a missing factory fails before anything is registered
    factories = {name: getattr(AgentTemplates, name) for name in known_templates}

    for template_id in template_ids:
        try:
            factory = factories.get(template_id)
            if not factory:
                continue
            agent = factory()
            await self.register_agent(agent)
            logger.info(f"✅ Loaded missing template: {agent.name}")
        except Exception as e:
            logger.error(f"β Failed to load template {template_id}: {e}")
async def _test_colossus_connection(self):
    """Probe colossus with a tiny chat request and record the outcome.

    Updates ``colossus_connection_status`` in every case; on success it also
    stamps ``last_colossus_test``. Errors are logged, never raised.
    """
    try:
        if not self.colossus_client:
            self.colossus_connection_status = "client_not_initialized"
            return

        # Minimal prompt: only the success flag of the reply matters
        probe = [
            {"role": "system", "content": "You are a test assistant."},
            {"role": "user", "content": "Reply with just 'OK' to confirm connection."}
        ]
        logger.info("π§ͺ Testing colossus connection...")
        response = await self.colossus_client.chat_completion(
            messages=probe,
            agent_id="connection_test",
            max_tokens=10
        )

        succeeded = response and response.get("success")
        if not succeeded:
            if response:
                error_msg = response.get("error", "unknown error")
            else:
                error_msg = "no response"
            self.colossus_connection_status = f"test_failed: {error_msg}"
            logger.error(f"β colossus connection test failed: {error_msg}")
            return

        self.colossus_connection_status = "connected"
        self.last_colossus_test = datetime.utcnow()
        logger.info("✅ colossus connection test successful")
    except Exception as e:
        self.colossus_connection_status = f"test_error: {str(e)}"
        logger.error(f"β colossus connection test error: {e}")
async def _load_agents_from_database(self):
    """Fill the in-memory cache from every ``DBAgent`` row.

    Does nothing when the database is not initialized. A row that cannot be
    converted is logged and counted as failed; the rest are still loaded.
    Database errors are logged and swallowed so start-up can continue.
    """
    try:
        if not db_manager.is_initialized:
            logger.warning("β οΈ Database not yet initialized - will load default agents")
            return

        async with db_manager.get_async_session() as session:
            rows = (await session.execute(select(DBAgent))).scalars().all()

            outcome = {"loaded": 0, "failed": 0}
            for db_agent in rows:
                try:
                    saap_agent = db_agent.to_saap_agent()
                    self.agents[saap_agent.id] = saap_agent
                    outcome["loaded"] += 1
                    logger.info(f"✅ Loaded: {saap_agent.name} ({saap_agent.id})")
                except Exception as agent_error:
                    outcome["failed"] += 1
                    logger.error(f"β Failed to load agent {db_agent.id}: {agent_error}")

            loaded_count = outcome["loaded"]
            failed_count = outcome["failed"]
            logger.info(f"π Loaded {loaded_count} agents from database ({failed_count} failed)")
    except Exception as e:
        logger.error(f"β Failed to load agents from database: {e}")
        logger.info("π¦ Will proceed with in-memory agents only")
async def load_default_agents(self):
    """Build and register all seven default Alesi agents.

    Every template is handled on its own: a missing factory, a failed
    registration or an exception is logged and the loop moves on. Nothing is
    raised to the caller.
    """
    try:
        logger.info("π€ Loading ALL default Alesi agents...")

        # (AgentTemplates factory name, label used in log output)
        template_methods = [
            ('jane_alesi', 'Jane Alesi - Coordinator'),
            ('john_alesi', 'John Alesi - Developer'),
            ('lara_alesi', 'Lara Alesi - Medical Specialist'),
            ('theo_alesi', 'Theo Alesi - Financial Specialist'),
            ('justus_alesi', 'Justus Alesi - Legal Specialist'),
            ('leon_alesi', 'Leon Alesi - System Specialist'),
            ('luna_alesi', 'Luna Alesi - Coaching Specialist')
        ]

        loaded_agents = []
        for method_name, display_name in template_methods:
            try:
                factory = getattr(AgentTemplates, method_name, None)
                if factory is None:
                    logger.error(f"β Template method not found: AgentTemplates.{method_name}")
                    continue

                registered = await self.register_agent(factory())
                if not registered:
                    logger.error(f"β Failed to register: {display_name}")
                    continue

                loaded_agents.append(display_name)
                logger.info(f"✅ Loaded: {display_name}")
            except Exception as template_error:
                logger.error(f"β Error loading {display_name}: {template_error}")

        if not loaded_agents:
            logger.error("β No agents could be loaded!")
        else:
            logger.info(f"✅ Successfully loaded agents: {loaded_agents}")
    except Exception as e:
        logger.error(f"β Agent loading failed: {e}")
async def register_agent(self, agent: SaapAgent) -> bool:
    """Put ``agent`` into the cache and, when possible, into the database.

    The in-memory cache is always updated first. Persistence is best effort:
    a database error is logged as a warning and the agent stays registered
    in memory. The row is written with ``merge`` so that registering an id
    that already has a row updates it; a plain insert would fail there and
    leave the database out of step with the cache.

    Args:
        agent: Agent to register; ``agent.id`` is the cache key.

    Returns:
        True when the agent is in the cache (even if persistence failed),
        False when registration itself failed.
    """
    try:
        # Cache first: later lookups must work even without a database
        self.agents[agent.id] = agent

        try:
            if db_manager.is_initialized:
                async with db_manager.get_async_session() as session:
                    db_agent = DBAgent.from_saap_agent(agent)
                    # merge = insert-or-update keyed on the primary key
                    await session.merge(db_agent)
                    await session.commit()
                logger.info(f"✅ Agent registered with database: {agent.name} ({agent.id})")
            else:
                logger.info(f"✅ Agent registered in-memory only: {agent.name} ({agent.id})")
        except Exception as db_error:
            logger.warning(f"β οΈ Database persistence failed for {agent.name}: {db_error}")
            # The agent deliberately stays in the cache

        return True
    except Exception as e:
        logger.error(f"β Agent registration failed: {e}")
        # Roll back the cache entry; tolerate an object without a usable id
        # so this handler cannot raise a second time.
        failed_id = getattr(agent, 'id', None)
        if failed_id is not None:
            try:
                self.agents.pop(failed_id, None)
            except TypeError:
                pass  # unhashable id: nothing could have been cached
        return False
def get_agent(self, agent_id: str) -> Optional[SaapAgent]:
    """Look up an agent in the in-memory cache.

    Returns the cached agent, or ``None`` when the id is unknown. A miss is
    logged as a warning together with the ids that are available.
    """
    agent = self.agents.get(agent_id)
    if not agent:
        logger.warning(f"β Agent not found: {agent_id}")
        logger.debug(f"π Available agents: {list(self.agents.keys())}")
        return agent

    logger.debug(f"π Agent found: {agent.name} ({agent_id}) - Status: {agent.status}")
    return agent
async def list_agents(self, status: Optional[AgentStatus] = None,
                      agent_type: Optional[AgentType] = None) -> List[SaapAgent]:
    """Return the cached agents, optionally narrowed by status and/or type.

    A filter that is left unset (falsy) is not applied.
    """
    return [
        candidate
        for candidate in self.agents.values()
        if (not status or candidate.status == status)
        and (not agent_type or candidate.type == agent_type)
    ]
async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]:
    """Return basic counters read from the cached agent object.

    Returns an empty dict for an unknown agent. Counters the agent object
    does not carry fall back to ``0`` (``None`` for ``last_active``).
    """
    agent = self.get_agent(agent_id)
    if not agent:
        return {}

    # (key in the result, attribute on the agent, fallback)
    counter_fields = (
        ("messages_processed", 'messages_processed', 0),
        ("total_tokens", 'total_tokens', 0),
        ("average_response_time", 'avg_response_time', 0),
    )
    stats = {
        out_key: getattr(agent, attr_name, fallback)
        for out_key, attr_name, fallback in counter_fields
    }
    stats["status"] = agent.status.value
    stats["last_active"] = getattr(agent, 'last_active', None)
    return stats
async def health_check(self, agent_id: str) -> Dict[str, Any]:
    """Report whether an agent exists and is active.

    ``healthy`` is True only for an agent whose status is ACTIVE. The
    ``checks`` entry also reports whether colossus is currently marked as
    connected; that flag does not influence ``healthy``.
    """
    agent = self.get_agent(agent_id)
    if not agent:
        return {"healthy": False, "checks": {"agent_exists": False}}

    is_active = agent.status == AgentStatus.ACTIVE
    checks = {
        "agent_exists": True,
        "status": agent.status.value,
        "colossus_connection": self.colossus_connection_status == "connected"
    }
    return {"healthy": is_active, "checks": checks}
async def update_agent(self, agent_id: str, updated_data) -> SaapAgent:
    """Apply an update to an agent in the cache and, best effort, the database.

    Args:
        agent_id: Id of the agent to update.
        updated_data: Either a complete ``SaapAgent`` that replaces the
            current one, or a dict of changes. In a dict, the legacy
            top-level ``color`` / ``avatar`` keys are moved under
            ``appearance`` (unless the dict already carries ``appearance``),
            nested dicts are merged into the current values, and keys the
            agent does not have are ignored. The dict passed in is not
            modified.

    Returns:
        The updated agent.

    Raises:
        ValueError: unknown ``agent_id`` or unsupported ``updated_data`` type.
        Exception: validation errors from building the new ``SaapAgent``.
    """
    try:
        current_agent = self.get_agent(agent_id)
        if not current_agent:
            raise ValueError(f"Agent {agent_id} not found")

        if isinstance(updated_data, dict):
            # Work on a copy: the migration below removes keys, and the
            # caller's dict must not change as a side effect.
            changes = dict(updated_data)
            current_dict = current_agent.dict()

            # Old frontend schema: top-level 'color'/'avatar' -> appearance.*
            for legacy_key in ('color', 'avatar'):
                if legacy_key in changes and 'appearance' not in changes:
                    # Also covers an 'appearance' that is present but not a dict
                    if not isinstance(current_dict.get('appearance'), dict):
                        current_dict['appearance'] = {}
                    current_dict['appearance'][legacy_key] = changes.pop(legacy_key)

            # Merge the remaining changes; only fields the agent already has
            for key, value in changes.items():
                if key not in current_dict:
                    continue
                if isinstance(value, dict) and isinstance(current_dict[key], dict):
                    current_dict[key].update(value)
                else:
                    current_dict[key] = value

            updated_agent = SaapAgent(**current_dict)
        elif isinstance(updated_data, SaapAgent):
            updated_agent = updated_data
        else:
            raise ValueError(f"Invalid update data type: {type(updated_data)}")

        self.agents[agent_id] = updated_agent

        if db_manager.is_initialized:
            try:
                async with db_manager.get_async_session() as session:
                    # Replace the row: delete the old one, insert the new one
                    await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
                    db_agent = DBAgent.from_saap_agent(updated_agent)
                    session.add(db_agent)
                    await session.commit()
            except Exception as db_error:
                logger.warning(f"β οΈ Database update failed for {agent_id}: {db_error}")

        logger.info(f"✅ Agent updated: {agent_id}")
        return updated_agent
    except Exception as e:
        logger.error(f"β Agent update failed: {e}")
        raise
async def delete_agent(self, agent_id: str) -> bool:
    """Stop an agent, then drop it from the cache and the database.

    Database removal is best effort (a failure is only logged).

    Returns:
        True unless an unexpected error occurred; False in that case.
    """
    try:
        # Stopping first closes any open session for this agent
        await self.stop_agent(agent_id)
        self.agents.pop(agent_id, None)

        if db_manager.is_initialized:
            try:
                async with db_manager.get_async_session() as session:
                    removal = delete(DBAgent).where(DBAgent.id == agent_id)
                    await session.execute(removal)
                    await session.commit()
            except Exception as db_error:
                logger.warning(f"β οΈ Database deletion failed for {agent_id}: {db_error}")

        logger.info(f"✅ Agent deleted: {agent_id}")
    except Exception as e:
        logger.error(f"β Agent deletion failed: {e}")
        return False
    return True
async def start_agent(self, agent_id: str) -> bool:
    """Mark an agent ACTIVE and open a session row for it.

    The session row and the status update in the database are best effort;
    the in-memory status is changed regardless.

    Returns:
        True when the agent was started, False when it is unknown or an
        unexpected error occurred.
    """
    try:
        agent = self.get_agent(agent_id)
        if not agent:
            logger.error(f"β Cannot start agent: {agent_id} not found")
            return False

        agent.status = AgentStatus.ACTIVE
        metrics = getattr(agent, 'metrics', None) if hasattr(agent, 'metrics') else None
        if metrics:
            agent.metrics.last_active = datetime.utcnow()

        if db_manager.is_initialized:
            try:
                async with db_manager.get_async_session() as session:
                    new_session_row = DBAgentSession(agent_id=agent_id)
                    session.add(new_session_row)
                    await session.commit()
                    await session.refresh(new_session_row)
                    # Remembered so stop_agent() can close it later
                    self.active_sessions[agent_id] = new_session_row
            except Exception as db_error:
                logger.warning(f"β οΈ Database session creation failed for {agent_id}: {db_error}")

        await self._update_agent_status(agent_id, AgentStatus.ACTIVE)
        logger.info(f"✅ Agent started: {agent.name} ({agent_id})")
        return True
    except Exception as e:
        logger.error(f"β Agent start failed: {e}")
        return False
async def stop_agent(self, agent_id: str) -> bool:
    """Mark an agent INACTIVE and close its open session, if any.

    The session is finalized in memory, written back to the database on a
    best-effort basis, and removed from ``active_sessions``.

    Returns:
        True when the agent was stopped, False when it is unknown or an
        unexpected error occurred.
    """
    try:
        agent = self.get_agent(agent_id)
        if not agent:
            return False

        agent.status = AgentStatus.INACTIVE

        if agent_id in self.active_sessions:
            closing = self.active_sessions[agent_id]
            closing.session_end = datetime.utcnow()
            closing.status = "completed"
            closing.end_reason = "graceful"
            closing.calculate_duration()

            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        await session.merge(closing)
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"β οΈ Database session update failed for {agent_id}: {db_error}")

            del self.active_sessions[agent_id]

        await self._update_agent_status(agent_id, AgentStatus.INACTIVE)
        logger.info(f"π§ Agent stopped: {agent_id}")
        return True
    except Exception as e:
        logger.error(f"β Agent stop failed: {e}")
        return False
async def restart_agent(self, agent_id: str) -> bool:
    """Stop an agent, wait one second, and start it again.

    Returns the result of the start step, or False on an unexpected error.
    """
    try:
        await self.stop_agent(agent_id)
        # Short pause between stop and start
        await asyncio.sleep(1)
        started = await self.start_agent(agent_id)
        return started
    except Exception as e:
        logger.error(f"β Agent restart failed: {e}")
        return False
async def _update_agent_status(self, agent_id: str, status: AgentStatus):
    """Write an agent's status and a fresh ``last_active`` to the database.

    No-op when the database is not initialized; failures are only logged.
    """
    if db_manager.is_initialized:
        try:
            async with db_manager.get_async_session() as session:
                statement = (
                    update(DBAgent)
                    .where(DBAgent.id == agent_id)
                    .values(status=status.value, last_active=datetime.utcnow())
                )
                await session.execute(statement)
                await session.commit()
        except Exception as e:
            logger.warning(f"β οΈ Failed to update agent status in database: {e}")
# π NEW: Multi-Provider Chat Support
async def send_message_to_agent(self, agent_id: str, message: str,
                                provider: Optional[str] = None) -> Dict[str, Any]:
    """Send a message to an agent through the chosen or best available provider.

    Args:
        agent_id: Target agent ID.
        message: Message content.
        provider: ``"openrouter"`` or ``"colossus"`` to force a provider; any
            other value selects automatically: colossus when it is marked
            connected (falling back to OpenRouter if that attempt returns a
            colossus error), otherwise OpenRouter.

    Returns:
        The provider's response dict, or a dict with ``"error"``,
        ``"timestamp"`` and ``"debug_info"`` when the agent is unknown, not
        ACTIVE, or an unexpected error occurred. This method does not raise.
    """
    try:
        agent = self.get_agent(agent_id)

        # Guard 1: the agent must be loaded
        if not agent:
            error_msg = f"Agent {agent_id} not found in loaded agents"
            logger.error(f"β {error_msg}")
            logger.debug(f"π Available agents: {list(self.agents.keys())}")
            return {
                "error": error_msg,
                "timestamp": datetime.utcnow().isoformat(),
                "debug_info": {
                    "available_agents": list(self.agents.keys()),
                    "agent_manager_initialized": self.is_initialized
                }
            }

        # Guard 2: only ACTIVE agents accept messages
        if agent.status != AgentStatus.ACTIVE:
            error_msg = f"Agent {agent_id} not available (status: {agent.status.value})"
            logger.error(f"β {error_msg}")
            return {
                "error": error_msg,
                "timestamp": datetime.utcnow().isoformat(),
                "debug_info": {
                    "agent_status": agent.status.value,
                    "agent_id": agent_id
                }
            }

        # Explicit provider override
        if provider == "openrouter":
            return await self._send_via_openrouter(agent_id, message, agent)
        if provider == "colossus":
            return await self._send_via_colossus(agent_id, message, agent)

        # Automatic selection
        if self.colossus_connection_status != "connected":
            logger.info(f"π colossus unavailable, using OpenRouter as primary for {agent_id}")
            return await self._send_via_openrouter(agent_id, message, agent)

        logger.info(f"π Using colossus as primary provider for {agent_id}")
        result = await self._send_via_colossus(agent_id, message, agent)
        colossus_failed = "error" in result and "colossus" in result["error"].lower()
        if not colossus_failed:
            return result

        logger.info("π colossus failed, trying OpenRouter fallback...")
        return await self._send_via_openrouter(agent_id, message, agent)
    except Exception as e:
        error_msg = str(e)
        logger.error(f"β Message to agent failed: {error_msg}")
        return {
            "error": error_msg,
            "timestamp": datetime.utcnow().isoformat(),
            "debug_info": {
                "agent_id": agent_id,
                "provider": provider,
                "colossus_status": self.colossus_connection_status,
                "agent_found": agent_id in self.agents,
                "colossus_client_exists": self.colossus_client is not None
            }
        }
async def _send_via_openrouter(self, agent_id: str, message: str,
                               agent: SaapAgent) -> Dict[str, Any]:
    """Send one message through OpenRouter and return a normalized result.

    A fresh ``OpenRouterSAAPAgent`` is created per request. The model comes
    from the ``<AGENT_ID>_MODEL`` environment variable for the seven base
    agents (each with its own default) and is a fixed free model for every
    other agent. On success the exchange is stored in the database (best
    effort) and the agent's active session counters are updated, matching
    the colossus path.

    Returns:
        On success a dict with ``content``, ``response_time``,
        ``tokens_used``, ``cost_usd``, ``provider``, ``model`` and
        ``timestamp``; otherwise a dict with ``error``, ``provider`` and
        ``timestamp``. This method does not raise.
    """
    try:
        logger.info(f"π {agent_id} (coordinator) initialized with OpenRouter FREE")

        openrouter_agent = OpenRouterSAAPAgent(
            agent_id,
            agent.type.value if agent.type else "Assistant",
            os.getenv("OPENROUTER_API_KEY")
        )

        # Per-agent default model; overridable through <AGENT_ID>_MODEL
        default_models = {
            "jane_alesi": "openai/gpt-4o-mini",
            "john_alesi": "deepseek/deepseek-coder",
            "lara_alesi": "anthropic/claude-3-haiku",
            "theo_alesi": "openai/gpt-4o-mini",
            "justus_alesi": "anthropic/claude-3-haiku",
            "leon_alesi": "deepseek/deepseek-coder",
            "luna_alesi": "openai/gpt-4o-mini",
        }
        if agent_id in default_models:
            preferred_model = os.getenv(f"{agent_id.upper()}_MODEL", default_models[agent_id])
        else:
            preferred_model = "meta-llama/llama-3.2-3b-instruct:free"
        openrouter_agent.model_name = preferred_model

        start_time = datetime.utcnow()
        logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...")

        max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)
        response = await openrouter_agent.send_request_to_openrouter(
            message,
            max_tokens=max_tokens_value
        )

        end_time = datetime.utcnow()
        response_time = (end_time - start_time).total_seconds()

        if not response.get("success"):
            error_msg = response.get("error", "Unknown OpenRouter error")
            logger.error(f"β OpenRouter fallback failed: {error_msg}")
            return {
                "error": f"OpenRouter error: {error_msg}",
                "provider": "OpenRouter",
                "timestamp": end_time.isoformat()
            }

        logger.info(f"✅ OpenRouter response successful in {response_time:.2f}s")
        response_content = response.get("response", "")
        tokens_used = response.get("token_count", 0)
        cost_usd = response.get("cost_usd", 0.0)

        # Best-effort persistence of the exchange
        if db_manager.is_initialized:
            try:
                async with db_manager.get_async_session() as session:
                    chat_message = DBChatMessage(
                        agent_id=agent_id,
                        user_message=message,
                        agent_response=response_content,
                        response_time=response_time,
                        tokens_used=tokens_used,
                        metadata={
                            "model": preferred_model,
                            "provider": "OpenRouter",
                            "cost_usd": cost_usd,
                            "temperature": 0.7
                        }
                    )
                    session.add(chat_message)
                    await session.commit()
            except Exception as db_error:
                logger.warning(f"β οΈ Failed to save OpenRouter chat to database: {db_error}")

        # Keep session counters in step with the colossus path, so messages
        # answered by OpenRouter are not missing from the session totals.
        if agent_id in self.active_sessions:
            session_obj = self.active_sessions[agent_id]
            session_obj.messages_processed += 1
            session_obj.total_tokens_used += tokens_used or 0

        return {
            "content": response_content,
            "response_time": response_time,
            "tokens_used": tokens_used,
            "cost_usd": cost_usd,
            "provider": "OpenRouter",
            "model": preferred_model,
            "timestamp": end_time.isoformat()
        }
    except Exception as e:
        logger.error(f"β OpenRouter fallback failed: {str(e)}")
        return {
            "error": f"OpenRouter error: {str(e)}",
            "provider": "OpenRouter",
            "timestamp": datetime.utcnow().isoformat()
        }
async def _send_via_colossus(self, agent_id: str, message: str,
                             agent: SaapAgent) -> Dict[str, Any]:
    """Send one message through colossus and return a normalized result.

    The connection is re-tested when the last successful test is more than
    five minutes old (or there has been none). The reply is accepted in any
    of these shapes: the client wrapper ``{"success": True, "response": ...}``
    (inner value OpenAI-style or a plain string), a bare OpenAI-style
    ``{"choices": [...]}``, ``{"response": text}``, ``{"content": text}``,
    or a plain string. On success the exchange is stored in the database
    (best effort) and the agent's active session counters are updated.

    Returns:
        On success a dict with ``content``, ``response_time``,
        ``tokens_used``, ``provider``, ``model`` and ``timestamp``;
        otherwise a dict with ``error``, ``provider`` and ``timestamp``.
        This method does not raise.
    """
    try:
        if not self.colossus_client:
            return {
                "error": "colossus client not initialized",
                "provider": "colossus",
                "timestamp": datetime.utcnow().isoformat()
            }

        # Re-test a stale connection. total_seconds() is required here:
        # timedelta.seconds drops whole days and would wrap around.
        if (not self.last_colossus_test or
                (datetime.utcnow() - self.last_colossus_test).total_seconds() > 300):  # 5 minutes
            await self._test_colossus_connection()
            if self.colossus_connection_status != "connected":
                return {
                    "error": f"colossus connection not healthy: {self.colossus_connection_status}",
                    "provider": "colossus",
                    "timestamp": datetime.utcnow().isoformat()
                }

        start_time = datetime.utcnow()
        logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via colossus...")

        temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7)
        max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)

        response = await self.colossus_client.chat_completion(
            messages=[
                {"role": "system", "content": agent.description or f"You are {agent.name}"},
                {"role": "user", "content": message}
            ],
            agent_id=agent_id,
            temperature=temperature_value,
            max_tokens=max_tokens_value
        )

        end_time = datetime.utcnow()
        response_time = (end_time - start_time).total_seconds()
        logger.info(f"π₯ Received response from colossus in {response_time:.2f}s")

        response_content = ""
        tokens_used = 0

        if response:
            logger.debug(f"π Raw colossus response: {response}")

            if isinstance(response, dict):
                # An explicit failure is a present-but-falsy "success" flag,
                # or an "error" entry without any flag. A reply that simply
                # lacks "success" is NOT a failure; it may be one of the
                # direct formats handled further down.
                reported_failure = (
                    ("success" in response and not response["success"])
                    or ("success" not in response and "error" in response)
                )

                # Client wrapper format: {"success": true, "response": {...}}
                if response.get("success") and "response" in response:
                    colossus_response = response["response"]

                    if isinstance(colossus_response, dict) and "choices" in colossus_response:
                        # OpenAI-compatible payload inside the wrapper
                        if len(colossus_response["choices"]) > 0:
                            choice = colossus_response["choices"][0]
                            if "message" in choice and "content" in choice["message"]:
                                response_content = choice["message"]["content"]
                    elif isinstance(colossus_response, str):
                        response_content = colossus_response

                    if isinstance(colossus_response, dict) and "usage" in colossus_response:
                        tokens_used = colossus_response["usage"].get("total_tokens", 0)

                elif reported_failure:
                    error_msg = response.get("error", "Unknown colossus error")
                    logger.error(f"β colossus error: {error_msg}")
                    return {
                        "error": f"colossus server error: {error_msg}",
                        "provider": "colossus",
                        "timestamp": end_time.isoformat()
                    }

                # Direct OpenAI format: {"choices": [...]}
                elif "choices" in response and len(response["choices"]) > 0:
                    choice = response["choices"][0]
                    if "message" in choice and "content" in choice["message"]:
                        response_content = choice["message"]["content"]
                    if "usage" in response:
                        tokens_used = response["usage"].get("total_tokens", 0)

                # Simple formats: {"response": "text"} or {"content": "text"}
                elif "response" in response:
                    response_content = response["response"]
                elif "content" in response:
                    response_content = response["content"]

            elif isinstance(response, str):
                response_content = response

        if not response_content:
            logger.error(f"β Unable to extract content from colossus response: {response}")
            return {
                "error": "Failed to parse colossus response",
                "provider": "colossus",
                "timestamp": end_time.isoformat()
            }

        # Best-effort persistence of the exchange
        if db_manager.is_initialized:
            try:
                async with db_manager.get_async_session() as session:
                    chat_message = DBChatMessage(
                        agent_id=agent_id,
                        user_message=message,
                        agent_response=response_content,
                        response_time=response_time,
                        tokens_used=tokens_used,
                        metadata={
                            "model": "mistral-small3.2:24b-instruct-2506",
                            "provider": "colossus",
                            # record the temperature that was actually sent
                            "temperature": temperature_value
                        }
                    )
                    session.add(chat_message)
                    await session.commit()
            except Exception as db_error:
                logger.warning(f"β οΈ Failed to save chat message to database: {db_error}")

        if agent_id in self.active_sessions:
            session_obj = self.active_sessions[agent_id]
            session_obj.messages_processed += 1
            session_obj.total_tokens_used += tokens_used

        logger.info(f"✅ Message processed successfully for {agent.name}")
        return {
            "content": response_content,
            "response_time": response_time,
            "tokens_used": tokens_used,
            "provider": "colossus",
            "model": "mistral-small3.2:24b-instruct-2506",
            "timestamp": end_time.isoformat()
        }
    except Exception as e:
        logger.error(f"β colossus communication failed: {str(e)}")
        return {
            "error": f"colossus error: {str(e)}",
            "provider": "colossus",
            "timestamp": datetime.utcnow().isoformat()
        }
async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]:
    """Aggregate an agent's stored chat messages and sessions.

    Returns:
        ``{"warning": ...}`` when the database is not initialized, ``{}``
        when the query fails, otherwise a dict with ``total_messages``,
        ``total_tokens_used``, ``average_response_time`` (mean over the
        messages that recorded a response time, ``0`` if none did),
        ``total_sessions`` and ``last_activity`` (latest recorded session
        start, or ``None``).
    """
    if not db_manager.is_initialized:
        return {"warning": "Database not available - no metrics"}

    try:
        async with db_manager.get_async_session() as session:
            result = await session.execute(
                select(DBChatMessage).where(DBChatMessage.agent_id == agent_id)
            )
            messages = result.scalars().all()

            # Average only over messages that have a response time; dividing
            # by the count of all messages would understate the mean.
            response_times = [m.response_time for m in messages if m.response_time is not None]
            if response_times:
                avg_response_time = sum(response_times) / len(response_times)
            else:
                avg_response_time = 0
            total_tokens = sum(m.tokens_used for m in messages if m.tokens_used)

            session_result = await session.execute(
                select(DBAgentSession).where(DBAgentSession.agent_id == agent_id)
            )
            sessions = session_result.scalars().all()

            # Skip rows without a start time: max() cannot compare None
            session_starts = [s.session_start for s in sessions if s.session_start is not None]

            return {
                "total_messages": len(messages),
                "total_tokens_used": total_tokens,
                "average_response_time": avg_response_time,
                "total_sessions": len(sessions),
                "last_activity": max(session_starts, default=None),
            }
    except Exception as e:
        logger.error(f"β Failed to get agent metrics: {e}")
        return {}
async def get_system_status(self) -> Dict[str, Any]:
    """Return a snapshot of manager, colossus, agent and database state."""
    last_test = self.last_colossus_test
    agent_summaries = [
        {"id": aid, "name": agent.name, "status": agent.status.value}
        for aid, agent in self.agents.items()
    ]

    snapshot = {
        "agent_manager_initialized": self.is_initialized,
        "colossus_connection_status": self.colossus_connection_status,
        "colossus_last_test": last_test.isoformat() if last_test else None,
        "loaded_agents": len(self.agents),
        "active_sessions": len(self.active_sessions),
        "agent_list": agent_summaries,
        "database_initialized": getattr(db_manager, 'is_initialized', False)
    }
    return snapshot
async def shutdown_all_agents(self):
    """Stop every cached agent, then close the colossus client.

    Errors are logged and not raised.
    """
    try:
        logger.info("π§ Shutting down all agents...")

        # Snapshot of the ids, taken before the loop starts
        agent_ids = list(self.agents.keys())
        for agent_id in agent_ids:
            await self.stop_agent(agent_id)

        client = self.colossus_client
        if client:
            await client.__aexit__(None, None, None)

        logger.info("✅ All agents shut down successfully")
    except Exception as e:
        logger.error(f"β Agent shutdown failed: {e}")
# Module-level instance created at import time (used for dependency injection).
# Constructing it performs no I/O; initialize() must still be awaited.
agent_manager = AgentManagerService()
# Alias so the class can also be imported under the name `AgentManager`.
AgentManager = AgentManagerService
if __name__ == "__main__":
    async def test_agent_manager():
        """Manual smoke test: initialize, start the first agent, shut down."""
        manager = AgentManagerService()
        await manager.initialize()

        loaded = list(manager.agents.values())
        print(f"π Agents loaded: {[a.name for a in loaded]}")

        # Start the first agent only, if there is one
        if loaded:
            first = loaded[0]
            success = await manager.start_agent(first.id)
            outcome = '✅' if success else 'β'
            print(f"π Start agent {first.name}: {outcome}")

        await manager.shutdown_all_agents()

    asyncio.run(test_agent_manager())
|