"""
Planner Agent: Generates next plan step to answer the query.

This agent generates ONE step at a time based on:
- The original query
- Available data files
- Previously completed steps (if any)
"""

from typing import Optional

from langchain_core.messages import AIMessage

from ..utils.formatters import format_data_descriptions, format_plan, gemini_text
from ..utils.state import DSStarState, PlanStep

# Maximum number of characters of the last execution result echoed into the
# follow-up prompt.
_MAX_RESULT_CHARS = 500


def _truncate_result(raw_result, limit: int = _MAX_RESULT_CHARS) -> str:
    """
    Return ``raw_result`` as text, cut to at most ``limit`` characters.

    ``None`` is treated as an empty result. The "..." marker is appended only
    when text was actually cut, so a short result is not presented to the
    model as if it had been truncated.
    """
    text = "" if raw_result is None else str(raw_result)
    if len(text) <= limit:
        return text
    return f"{text[:limit]}..."


def _build_prompt(state: DSStarState, data_context: str, is_initial: bool) -> str:
    """
    Build the planner prompt for either the first step or a follow-up step.

    Args:
        state: Current DSStarState (reads "query"; for follow-up steps also
            "plan" and "execution_result")
        data_context: Pre-formatted description of the available data files
        is_initial: True when the plan is still empty

    Returns:
        The prompt text to send to the LLM
    """
    lines = [
        "You are an expert data analyst.",
        "",
        f"Question to answer: {state['query']}",
        "",
        "Available Data Files:",
        data_context,
        "",
    ]

    if is_initial:
        # Ask for exactly ONE step: the whole reply is stored as a single
        # PlanStep description, so a bulleted list would corrupt the plan.
        lines += [
            "Task: Generate ONE simple, executable step to start answering "
            "this question.",
            "",
            "Examples of good steps:",
            '- "Load the transactions.csv file"',
            '- "Read and explore the sales data"',
            "",
            "Provide ONLY the step description (one sentence), no explanation.",
        ]
    else:
        lines += [
            "Current Plan (completed steps):",
            format_plan(state["plan"]),
            "",
            "Last Execution Result:",
            _truncate_result(state["execution_result"]),
            "",
            "Task: Suggest the NEXT step to progress toward answering the question.",
            "Make it simple and executable (one clear action).",
            "",
            "Provide ONLY the next step description (one sentence), no explanation.",
        ]

    return "\n".join(lines)


def _response_text(response):
    """
    Extract the reply text from an LLM response.

    A list-valued ``content`` is delegated to ``gemini_text``; any other
    ``content`` is returned as-is; objects without ``content`` are
    stringified.
    """
    if not hasattr(response, "content"):
        return str(response)
    if isinstance(response.content, list):
        return gemini_text(response)
    return response.content


def planner_node(state: DSStarState) -> dict:
    """
    Planner Agent Node: Generates the next step in the plan.

    On first call: Generates initial step to start answering the query
    On subsequent calls: Generates next step based on progress so far

    Args:
        state: Current DSStarState

    Returns:
        Dictionary with updated state fields. On success:
        - plan: Updated plan with new step appended
        - messages: Agent communication messages
        - next: Next node to visit ("coder")
        On failure of the LLM call or response handling:
        - messages: A single message describing the error
        - next: "__end__"
    """
    banner = "=" * 60
    print(banner)
    print("PLANNER AGENT STARTING...")
    print(banner)

    plan = state["plan"]
    is_initial = not plan
    data_context = format_data_descriptions(state["data_descriptions"])

    if is_initial:
        print("Generating INITIAL plan step...")
    else:
        print(f"Generating NEXT step (current plan has {len(plan)} steps)...")

    prompt = _build_prompt(state, data_context, is_initial)

    try:
        # Get LLM response and normalise it to text
        response = state["llm"].invoke(prompt)
        response_text = _response_text(response)

        # Step numbers are 0-based internally and shown 1-based to the user
        new_step = PlanStep(
            step_number=len(plan), description=response_text.strip()
        )

        # Build a new list rather than mutating the plan held in the state
        updated_plan = plan + [new_step]
        display_number = new_step["step_number"] + 1

        print(f"\n✓ Generated step {display_number}: {new_step['description']}")
        print(banner)

        return {
            "plan": updated_plan,
            "messages": [AIMessage(content=f"Added step {display_number}")],
            "next": "coder",
        }

    except Exception as e:
        # Node boundary: report the failure and end the run instead of raising
        print(f"✗ Planner error: {str(e)}")
        return {
            "messages": [AIMessage(content=f"Planner error: {str(e)}")],
            "next": "__end__",
        }


# Standalone test function
def test_planner(
    llm, query: str, data_descriptions: dict, existing_plan: Optional[list] = None
):
    """
    Test the planner agent independently.

    Args:
        llm: LLM instance
        query: User query
        data_descriptions: Dict of filename -> description
        existing_plan: Optional existing plan steps

    Returns:
        Dictionary with planner results
    """
    # Create minimal test state
    test_state = {
        "llm": llm,
        "query": query,
        "data_descriptions": data_descriptions,
        "plan": existing_plan or [],
        "current_code": "",
        "execution_result": "",
        "is_sufficient": False,
        "router_decision": "",
        "iteration": 0,
        "max_iterations": 20,
        "messages": [],
        "next": "planner",
    }

    result = planner_node(test_state)
    resulting_plan = result.get("plan", [])

    print("\n" + "=" * 60)
    print("PLANNER TEST RESULTS")
    print("=" * 60)
    print(f"Updated Plan ({len(resulting_plan)} steps):")
    print(format_plan(resulting_plan))

    return result