Spaces:
Running
Running
| """ | |
| DS-STAR Graph: Connects all agents into a workflow. | |
| This module builds the LangGraph StateGraph that orchestrates the multi-agent system. | |
| """ | |
| from typing import Literal | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langgraph.graph import END, StateGraph | |
| from .agents.analyzer_agent import analyzer_node | |
| from .agents.coder_agent import coder_node | |
| from .agents.finalyzer_agent import finalyzer_node | |
| from .agents.planner_agent import planner_node | |
| from .agents.router_agent import backtrack_node, router_node | |
| from .agents.verifier_agent import verifier_node | |
| from .utils.state import DSStarState | |
| # ==================== CONDITIONAL ROUTING FUNCTIONS ==================== | |
| def route_after_analyzer(state: DSStarState) -> Literal["planner", "__end__"]: | |
| """ | |
| Route after analyzer based on success. | |
| If analyzer found errors, end workflow. | |
| Otherwise, proceed to planner. | |
| """ | |
| if "error" in state.get("data_descriptions", {}): | |
| return "__end__" | |
| return state.get("next", "planner") | |
| def route_after_planner(state: DSStarState) -> Literal["coder", "__end__"]: | |
| """Route after planner to coder.""" | |
| return state.get("next", "coder") | |
| def route_after_coder(state: DSStarState) -> Literal["verifier", "__end__"]: | |
| """Route after coder to verifier.""" | |
| return state.get("next", "verifier") | |
| def route_after_verifier( | |
| state: DSStarState, | |
| ) -> Literal["router", "finalyzer", "__end__"]: | |
| """ | |
| Route after verifier based on sufficiency and iteration count. | |
| If max iterations reached, go to finalyzer. | |
| If sufficient, go to finalyzer. | |
| Otherwise, go to router to decide next action. | |
| """ | |
| # Check max iterations | |
| if state["iteration"] >= state["max_iterations"]: | |
| print(f"\n⚠ Max iterations ({state['max_iterations']}) reached, finalizing...") | |
| return "finalyzer" | |
| return state.get("next", "router") | |
| def route_after_router(state: DSStarState) -> Literal["planner", "backtrack"]: | |
| """ | |
| Route after router based on decision. | |
| If router says "Add Step", go to planner. | |
| If router says "Step N", go to backtrack. | |
| """ | |
| return state.get("next", "planner") | |
| # ==================== GRAPH BUILDER ==================== | |
| def build_ds_star_graph(llm, max_iterations: int = 20): | |
| """ | |
| Constructs the LangGraph workflow for DS-STAR. | |
| The workflow follows this pattern: | |
| 1. Analyzer: Analyze data files (runs once) | |
| 2. Planner: Generate next plan step | |
| 3. Coder: Implement plan as code | |
| 4. Verifier: Check if sufficient | |
| 5. If insufficient: | |
| a. Router: Decide to add step or backtrack | |
| b. Backtrack (optional): Remove wrong steps | |
| c. Go back to Planner | |
| 6. If sufficient: | |
| Finalyzer: Create polished final solution | |
| Args: | |
| llm: LLM instance (e.g., ChatOpenAI, ChatGoogleGenerativeAI) | |
| max_iterations: Maximum refinement iterations (default: 20) | |
| Returns: | |
| Compiled LangGraph application with checkpointing | |
| """ | |
| # Initialize graph with state schema | |
| workflow = StateGraph(DSStarState) | |
| # Add all agent nodes | |
| workflow.add_node("analyzer", analyzer_node) | |
| workflow.add_node("planner", planner_node) | |
| workflow.add_node("coder", coder_node) | |
| workflow.add_node("verifier", verifier_node) | |
| workflow.add_node("router", router_node) | |
| workflow.add_node("backtrack", backtrack_node) | |
| workflow.add_node("finalyzer", finalyzer_node) | |
| # Set entry point | |
| workflow.set_entry_point("analyzer") | |
| # Add conditional edges with proper routing | |
| workflow.add_conditional_edges( | |
| "analyzer", route_after_analyzer, {"planner": "planner", "__end__": END} | |
| ) | |
| workflow.add_conditional_edges( | |
| "planner", route_after_planner, {"coder": "coder", "__end__": END} | |
| ) | |
| workflow.add_conditional_edges( | |
| "coder", route_after_coder, {"verifier": "verifier", "__end__": END} | |
| ) | |
| workflow.add_conditional_edges( | |
| "verifier", | |
| route_after_verifier, | |
| {"router": "router", "finalyzer": "finalyzer", "__end__": END}, | |
| ) | |
| workflow.add_conditional_edges( | |
| "router", route_after_router, {"planner": "planner", "backtrack": "backtrack"} | |
| ) | |
| workflow.add_edge("backtrack", "planner") | |
| workflow.add_edge("finalyzer", END) | |
| # Add memory/checkpointing | |
| memory = MemorySaver() | |
| # Compile graph | |
| app = workflow.compile(checkpointer=memory) | |
| return app | |
| def create_initial_state(query: str, llm, max_iterations: int = 20) -> DSStarState: | |
| """ | |
| Create initial state for the DS-STAR workflow. | |
| Args: | |
| query: User's question to answer | |
| llm: LLM instance | |
| max_iterations: Maximum refinement iterations | |
| Returns: | |
| Initial DSStarState dictionary | |
| """ | |
| return { | |
| "query": query, | |
| "data_descriptions": {}, | |
| "plan": [], | |
| "current_code": "", | |
| "execution_result": "", | |
| "is_sufficient": False, | |
| "router_decision": "", | |
| "iteration": 0, | |
| "max_iterations": max_iterations, | |
| "messages": [], | |
| "next": "analyzer", | |
| "llm": llm, | |
| } | |
| def run_ds_star( | |
| query: str, llm, max_iterations: int = 20, thread_id: str = "ds-star-1" | |
| ): | |
| """ | |
| Run the complete DS-STAR workflow. | |
| Args: | |
| query: User's question to answer | |
| llm: LLM instance | |
| max_iterations: Maximum refinement iterations | |
| thread_id: Unique thread ID for checkpointing | |
| Returns: | |
| Final state after workflow completion | |
| """ | |
| print("=" * 60) | |
| print("DS-STAR MULTI-AGENT SYSTEM") | |
| print("=" * 60) | |
| print(f"Query: {query}") | |
| print(f"Max Iterations: {max_iterations}") | |
| print("=" * 60) | |
| # Build graph | |
| app = build_ds_star_graph(llm, max_iterations) | |
| # Create initial state | |
| initial_state = create_initial_state(query, llm, max_iterations) | |
| # Run with checkpointing | |
| config = {"configurable": {"thread_id": thread_id}} | |
| try: | |
| # Execute the workflow | |
| final_state = app.invoke(initial_state, config) | |
| # Display results | |
| print("\n" + "=" * 60) | |
| print("FINAL SOLUTION") | |
| print("=" * 60) | |
| print("\nGenerated Code:") | |
| print("-" * 60) | |
| print(final_state["current_code"]) | |
| print("\n" + "-" * 60) | |
| print("Execution Result:") | |
| print("-" * 60) | |
| print(final_state["execution_result"]) | |
| print("=" * 60) | |
| return final_state | |
| except Exception as e: | |
| print(f"\n✗ Error during execution: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| return None | |