DS-STAR / src /agents /planner_agent.py
anurag-deo's picture
Upload folder using huggingface_hub
8ff817c verified
raw
history blame
4.57 kB
"""
Planner Agent: Generates next plan step to answer the query.
This agent generates ONE step at a time based on:
- The original query
- Available data files
- Previously completed steps (if any)
"""
from langchain_core.messages import AIMessage
from ..utils.formatters import format_data_descriptions, format_plan, gemini_text
from ..utils.state import DSStarState, PlanStep
def planner_node(state: DSStarState) -> dict:
"""
Planner Agent Node: Generates the next step in the plan.
On first call: Generates initial step to start answering the query
On subsequent calls: Generates next step based on progress so far
Args:
state: Current DSStarState
Returns:
Dictionary with updated state fields:
- plan: Updated plan with new step appended
- messages: Agent communication messages
- next: Next node to visit ("coder")
"""
print("=" * 60)
print("PLANNER AGENT STARTING...")
print("=" * 60)
is_initial = len(state["plan"]) == 0
data_context = format_data_descriptions(state["data_descriptions"])
if is_initial:
print("Generating INITIAL plan step...")
prompt = f"""You are an expert data analyst.
Question to answer: {state["query"]}
Available Data Files:
{data_context}
Task: Generate list of simple, executable steps to start answering this question.
Examples of good steps:
- "Load the transactions.csv file"
- "Read and explore the sales data"
Provide ONLY the step description (one sentence) in one line in bullet points, no explanation."""
else:
print(f"Generating NEXT step (current plan has {len(state['plan'])} steps)...")
plan_text = format_plan(state["plan"])
prompt = f"""You are an expert data analyst.
Question to answer: {state["query"]}
Available Data Files:
{data_context}
Current Plan (completed steps):
{plan_text}
Last Execution Result:
{state["execution_result"][:500]}...
Task: Suggest the NEXT step to progress toward answering the question.
Make it simple and executable (one clear action).
Provide ONLY the next step description (one sentence), no explanation."""
try:
# Get LLM response
response = state["llm"].invoke(prompt)
# Handle different response formats
if hasattr(response, "content") and isinstance(response.content, list):
response_text = gemini_text(response)
elif hasattr(response, "content"):
response_text = response.content
else:
response_text = str(response)
# Create new step
new_step = PlanStep(
step_number=len(state["plan"]), description=response_text.strip()
)
# Add new step to existing plan
updated_plan = state["plan"] + [new_step]
print(
f"\n✓ Generated step {new_step['step_number'] + 1}: {new_step['description']}"
)
print("=" * 60)
return {
"plan": updated_plan,
"messages": [
AIMessage(content=f"Added step {new_step['step_number'] + 1}")
],
"next": "coder",
}
except Exception as e:
print(f"✗ Planner error: {str(e)}")
return {
"messages": [AIMessage(content=f"Planner error: {str(e)}")],
"next": "__end__",
}
# Standalone test function
def test_planner(llm, query: str, data_descriptions: dict, existing_plan: list = None):
"""
Test the planner agent independently.
Args:
llm: LLM instance
query: User query
data_descriptions: Dict of filename -> description
existing_plan: Optional existing plan steps
Returns:
Dictionary with planner results
"""
# Create minimal test state
test_state = {
"llm": llm,
"query": query,
"data_descriptions": data_descriptions,
"plan": existing_plan or [],
"current_code": "",
"execution_result": "",
"is_sufficient": False,
"router_decision": "",
"iteration": 0,
"max_iterations": 20,
"messages": [],
"next": "planner",
}
result = planner_node(test_state)
print("\n" + "=" * 60)
print("PLANNER TEST RESULTS")
print("=" * 60)
print(f"Updated Plan ({len(result.get('plan', []))} steps):")
print(format_plan(result.get("plan", [])))
return result