Spaces:
Running
Running
File size: 4,567 Bytes
8ff817c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
"""
Planner Agent: Generates next plan step to answer the query.
This agent generates ONE step at a time based on:
- The original query
- Available data files
- Previously completed steps (if any)
"""
from langchain_core.messages import AIMessage
from ..utils.formatters import format_data_descriptions, format_plan, gemini_text
from ..utils.state import DSStarState, PlanStep
def planner_node(state: DSStarState) -> dict:
    """
    Planner Agent Node: generate the next step of the plan.

    On the first call (empty plan) the LLM is asked for an initial step.
    On subsequent calls it is asked for the next step, given the plan so
    far and an excerpt of the last execution result.

    Args:
        state: Current DSStarState. Reads "plan", "data_descriptions",
            "query" and "llm"; additionally reads "execution_result"
            when the plan is not empty.

    Returns:
        Dictionary with updated state fields:
        - plan: Existing plan with the new step appended (success only)
        - messages: A single AIMessage describing the outcome
        - next: "coder" on success, "__end__" if the LLM call or the
          handling of its response raised an exception
    """
    print("=" * 60)
    print("PLANNER AGENT STARTING...")
    print("=" * 60)

    # Treat a missing-valued (None) plan the same as an empty one instead of
    # crashing on len(None).
    plan = state["plan"] or []
    is_initial = not plan
    query = state["query"]
    data_context = format_data_descriptions(state["data_descriptions"])

    if is_initial:
        print("Generating INITIAL plan step...")
        # NOTE(review): this prompt asks for a "list of ... steps" in "bullet
        # points", but the whole reply is stored below as ONE plan step.
        # Wording is left untouched here; confirm the intended behavior.
        prompt = f"""You are an expert data analyst.
Question to answer: {query}
Available Data Files:
{data_context}
Task: Generate list of simple, executable steps to start answering this question.
Examples of good steps:
- "Load the transactions.csv file"
- "Read and explore the sales data"
Provide ONLY the step description (one sentence) in one line in bullet points, no explanation."""
    else:
        print(f"Generating NEXT step (current plan has {len(plan)} steps)...")
        plan_text = format_plan(plan)

        # Cap the execution output included in the prompt. The "..." marker is
        # only added when text was actually cut, so a short (or empty) result
        # is not presented to the LLM as if it were truncated. A None result
        # is rendered as empty text rather than raising on the slice.
        max_result_chars = 500
        last_result = state["execution_result"] or ""
        if len(last_result) > max_result_chars:
            result_excerpt = f"{last_result[:max_result_chars]}..."
        else:
            result_excerpt = last_result

        prompt = f"""You are an expert data analyst.
Question to answer: {query}
Available Data Files:
{data_context}
Current Plan (completed steps):
{plan_text}
Last Execution Result:
{result_excerpt}
Task: Suggest the NEXT step to progress toward answering the question.
Make it simple and executable (one clear action).
Provide ONLY the next step description (one sentence), no explanation."""

    try:
        # Get LLM response
        response = state["llm"].invoke(prompt)

        # Handle different response formats: list-valued content goes through
        # gemini_text, any other content is used as-is, and objects without a
        # content attribute are stringified.
        if hasattr(response, "content") and isinstance(response.content, list):
            response_text = gemini_text(response)
        elif hasattr(response, "content"):
            response_text = response.content
        else:
            response_text = str(response)

        # step_number is zero-based: it equals the number of existing steps.
        new_step = PlanStep(
            step_number=len(plan), description=response_text.strip()
        )

        # Build a new list rather than mutating the plan held in state.
        updated_plan = plan + [new_step]

        print(
            f"\n✓ Generated step {new_step['step_number'] + 1}: {new_step['description']}"
        )
        print("=" * 60)

        return {
            "plan": updated_plan,
            "messages": [
                AIMessage(content=f"Added step {new_step['step_number'] + 1}")
            ],
            "next": "coder",
        }
    except Exception as e:
        # Node boundary: report the failure and route to the end of the graph
        # instead of propagating the exception.
        print(f"✗ Planner error: {e}")
        return {
            "messages": [AIMessage(content=f"Planner error: {e}")],
            "next": "__end__",
        }
# Standalone test function
def test_planner(llm, query: str, data_descriptions: dict, existing_plan: list = None):
    """
    Run the planner node on its own against a hand-built state.

    Args:
        llm: LLM instance
        query: User query
        data_descriptions: Dict of filename -> description
        existing_plan: Optional existing plan steps; a falsy value means
            the planner starts from an empty plan

    Returns:
        The dictionary returned by planner_node
    """
    separator = "=" * 60
    starting_plan = existing_plan if existing_plan else []

    # Minimal state: only the fields a full run would carry, with neutral
    # defaults for everything the caller does not supply.
    test_state = dict(
        llm=llm,
        query=query,
        data_descriptions=data_descriptions,
        plan=starting_plan,
        current_code="",
        execution_result="",
        is_sufficient=False,
        router_decision="",
        iteration=0,
        max_iterations=20,
        messages=[],
        next="planner",
    )

    outcome = planner_node(test_state)

    # On the planner's error path there is no "plan" key, so fall back to an
    # empty plan for the report.
    produced_plan = outcome.get("plan", [])

    print("\n" + separator)
    print("PLANNER TEST RESULTS")
    print(separator)
    print(f"Updated Plan ({len(produced_plan)} steps):")
    print(format_plan(produced_plan))

    return outcome
|