Spaces:
Running
Running
| """ | |
| Planner Agent: Generates next plan step to answer the query. | |
| This agent generates ONE step at a time based on: | |
| - The original query | |
| - Available data files | |
| - Previously completed steps (if any) | |
| """ | |
| from langchain_core.messages import AIMessage | |
| from ..utils.formatters import format_data_descriptions, format_plan, gemini_text | |
| from ..utils.state import DSStarState, PlanStep | |
| def planner_node(state: DSStarState) -> dict: | |
| """ | |
| Planner Agent Node: Generates the next step in the plan. | |
| On first call: Generates initial step to start answering the query | |
| On subsequent calls: Generates next step based on progress so far | |
| Args: | |
| state: Current DSStarState | |
| Returns: | |
| Dictionary with updated state fields: | |
| - plan: Updated plan with new step appended | |
| - messages: Agent communication messages | |
| - next: Next node to visit ("coder") | |
| """ | |
| print("=" * 60) | |
| print("PLANNER AGENT STARTING...") | |
| print("=" * 60) | |
| is_initial = len(state["plan"]) == 0 | |
| data_context = format_data_descriptions(state["data_descriptions"]) | |
| if is_initial: | |
| print("Generating INITIAL plan step...") | |
| prompt = f"""You are an expert data analyst. | |
| Question to answer: {state["query"]} | |
| Available Data Files: | |
| {data_context} | |
| Task: Generate list of simple, executable steps to start answering this question. | |
| Examples of good steps: | |
| - "Load the transactions.csv file" | |
| - "Read and explore the sales data" | |
| Provide ONLY the step description (one sentence) in one line in bullet points, no explanation.""" | |
| else: | |
| print(f"Generating NEXT step (current plan has {len(state['plan'])} steps)...") | |
| plan_text = format_plan(state["plan"]) | |
| prompt = f"""You are an expert data analyst. | |
| Question to answer: {state["query"]} | |
| Available Data Files: | |
| {data_context} | |
| Current Plan (completed steps): | |
| {plan_text} | |
| Last Execution Result: | |
| {state["execution_result"][:500]}... | |
| Task: Suggest the NEXT step to progress toward answering the question. | |
| Make it simple and executable (one clear action). | |
| Provide ONLY the next step description (one sentence), no explanation.""" | |
| try: | |
| # Get LLM response | |
| response = state["llm"].invoke(prompt) | |
| # Handle different response formats | |
| if hasattr(response, "content") and isinstance(response.content, list): | |
| response_text = gemini_text(response) | |
| elif hasattr(response, "content"): | |
| response_text = response.content | |
| else: | |
| response_text = str(response) | |
| # Create new step | |
| new_step = PlanStep( | |
| step_number=len(state["plan"]), description=response_text.strip() | |
| ) | |
| # Add new step to existing plan | |
| updated_plan = state["plan"] + [new_step] | |
| print( | |
| f"\n✓ Generated step {new_step['step_number'] + 1}: {new_step['description']}" | |
| ) | |
| print("=" * 60) | |
| return { | |
| "plan": updated_plan, | |
| "messages": [ | |
| AIMessage(content=f"Added step {new_step['step_number'] + 1}") | |
| ], | |
| "next": "coder", | |
| } | |
| except Exception as e: | |
| print(f"✗ Planner error: {str(e)}") | |
| return { | |
| "messages": [AIMessage(content=f"Planner error: {str(e)}")], | |
| "next": "__end__", | |
| } | |
| # Standalone test function | |
| def test_planner(llm, query: str, data_descriptions: dict, existing_plan: list = None): | |
| """ | |
| Test the planner agent independently. | |
| Args: | |
| llm: LLM instance | |
| query: User query | |
| data_descriptions: Dict of filename -> description | |
| existing_plan: Optional existing plan steps | |
| Returns: | |
| Dictionary with planner results | |
| """ | |
| # Create minimal test state | |
| test_state = { | |
| "llm": llm, | |
| "query": query, | |
| "data_descriptions": data_descriptions, | |
| "plan": existing_plan or [], | |
| "current_code": "", | |
| "execution_result": "", | |
| "is_sufficient": False, | |
| "router_decision": "", | |
| "iteration": 0, | |
| "max_iterations": 20, | |
| "messages": [], | |
| "next": "planner", | |
| } | |
| result = planner_node(test_state) | |
| print("\n" + "=" * 60) | |
| print("PLANNER TEST RESULTS") | |
| print("=" * 60) | |
| print(f"Updated Plan ({len(result.get('plan', []))} steps):") | |
| print(format_plan(result.get("plan", []))) | |
| return result | |