File size: 4,567 Bytes
8ff817c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""

Planner Agent: Generates next plan step to answer the query.



This agent generates ONE step at a time based on:

- The original query

- Available data files

- Previously completed steps (if any)

"""

from langchain_core.messages import AIMessage

from ..utils.formatters import format_data_descriptions, format_plan, gemini_text
from ..utils.state import DSStarState, PlanStep


def planner_node(state: DSStarState) -> dict:
    """

    Planner Agent Node: Generates the next step in the plan.



    On first call: Generates initial step to start answering the query

    On subsequent calls: Generates next step based on progress so far



    Args:

        state: Current DSStarState



    Returns:

        Dictionary with updated state fields:

        - plan: Updated plan with new step appended

        - messages: Agent communication messages

        - next: Next node to visit ("coder")

    """
    print("=" * 60)
    print("PLANNER AGENT STARTING...")
    print("=" * 60)

    is_initial = len(state["plan"]) == 0
    data_context = format_data_descriptions(state["data_descriptions"])

    if is_initial:
        print("Generating INITIAL plan step...")
        prompt = f"""You are an expert data analyst.



Question to answer: {state["query"]}



Available Data Files:

{data_context}



Task: Generate list of simple, executable steps to start answering this question.

Examples of good steps:

- "Load the transactions.csv file"

- "Read and explore the sales data"



Provide ONLY the step description (one sentence) in one line in bullet points, no explanation."""
    else:
        print(f"Generating NEXT step (current plan has {len(state['plan'])} steps)...")
        plan_text = format_plan(state["plan"])

        prompt = f"""You are an expert data analyst.



Question to answer: {state["query"]}



Available Data Files:

{data_context}



Current Plan (completed steps):

{plan_text}



Last Execution Result:

{state["execution_result"][:500]}...



Task: Suggest the NEXT step to progress toward answering the question.

Make it simple and executable (one clear action).



Provide ONLY the next step description (one sentence), no explanation."""

    try:
        # Get LLM response
        response = state["llm"].invoke(prompt)

        # Handle different response formats
        if hasattr(response, "content") and isinstance(response.content, list):
            response_text = gemini_text(response)
        elif hasattr(response, "content"):
            response_text = response.content
        else:
            response_text = str(response)

        # Create new step
        new_step = PlanStep(
            step_number=len(state["plan"]), description=response_text.strip()
        )

        # Add new step to existing plan
        updated_plan = state["plan"] + [new_step]

        print(
            f"\n✓ Generated step {new_step['step_number'] + 1}: {new_step['description']}"
        )
        print("=" * 60)

        return {
            "plan": updated_plan,
            "messages": [
                AIMessage(content=f"Added step {new_step['step_number'] + 1}")
            ],
            "next": "coder",
        }

    except Exception as e:
        print(f"✗ Planner error: {str(e)}")
        return {
            "messages": [AIMessage(content=f"Planner error: {str(e)}")],
            "next": "__end__",
        }


# Standalone test function
def test_planner(llm, query: str, data_descriptions: dict, existing_plan: list = None):
    """

    Test the planner agent independently.



    Args:

        llm: LLM instance

        query: User query

        data_descriptions: Dict of filename -> description

        existing_plan: Optional existing plan steps



    Returns:

        Dictionary with planner results

    """
    # Create minimal test state
    test_state = {
        "llm": llm,
        "query": query,
        "data_descriptions": data_descriptions,
        "plan": existing_plan or [],
        "current_code": "",
        "execution_result": "",
        "is_sufficient": False,
        "router_decision": "",
        "iteration": 0,
        "max_iterations": 20,
        "messages": [],
        "next": "planner",
    }

    result = planner_node(test_state)

    print("\n" + "=" * 60)
    print("PLANNER TEST RESULTS")
    print("=" * 60)
    print(f"Updated Plan ({len(result.get('plan', []))} steps):")
    print(format_plan(result.get("plan", [])))

    return result