File size: 5,057 Bytes
8ff817c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""

Analyzer Agent: Analyzes data files and generates descriptions.



This agent runs once at the beginning to understand available data.

"""

import os
from pathlib import Path

from langchain_core.messages import AIMessage

from ..utils.code_execution import execute_with_debug
from ..utils.formatters import extract_code, gemini_text
from ..utils.state import DSStarState


def analyzer_node(state: DSStarState) -> dict:
    """Analyzer Agent Node: Analyzes all data files in the data/ directory.

    For each file, generates and executes Python code to:
    - Load the file
    - Print structure, types, and sample data
    - Capture essential information

    Args:
        state: Current DSStarState. Must contain an "llm" entry whose
            ``invoke`` method accepts a prompt string.

    Returns:
        Dictionary with updated state fields:
        - data_descriptions: Dict mapping filename to analysis result
          (or an "error" key when no data is available)
        - messages: Agent communication messages
        - next: Next node to visit ("planner" or "__end__")
    """
    print("=" * 60)
    print("DATA ANALYZER AGENT STARTING...")
    print("=" * 60)

    data_dir = "data/"
    descriptions: dict = {}

    # Fail fast when the data directory is missing entirely.
    if not os.path.exists(data_dir):
        print(f"Error: {data_dir} directory not found")
        return {
            "data_descriptions": {"error": "Data directory not found"},
            "messages": [AIMessage(content="Error: data/ directory not found")],
            "next": "__end__",
        }

    # List regular files only; subdirectories are ignored.
    files = [
        f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))
    ]

    if not files:
        print(f"Error: No files found in {data_dir}")
        return {
            "data_descriptions": {"error": "No data files found"},
            "messages": [AIMessage(content="Error: No files in data/ directory")],
            "next": "__end__",
        }

    print(f"Found {len(files)} files to analyze")

    # Analyze each file independently; a failure on one file must not
    # abort the remaining files.
    for filename in files:
        filepath = os.path.join(data_dir, filename)
        file_ext = Path(filepath).suffix.lower()

        # BUG FIX: this f-string previously contained the literal text
        # "(unknown)" and no placeholder, so the filename was never shown.
        print(f"\nAnalyzing: {filename}")

        # Generate analysis script
        analysis_prompt = f"""Generate a Python script to analyze the file: {filepath}



File type: {file_ext}



Requirements:

- Load the file using appropriate method for {file_ext} format

- Print essential information:

  * Data structure and types

  * Column names (for structured data like CSV, Excel)

  * First 3-5 rows/examples

  * Shape/size information

- Handle common formats: CSV, JSON, Excel, TXT, MD

- Use pandas for structured data

- No try-except blocks

- All files are in 'data/' directory

- Print output clearly



Provide ONLY the Python code in a markdown code block."""

        try:
            # Get LLM response
            response = state["llm"].invoke(analysis_prompt)

            # Handle different response formats (Gemini vs OpenAI):
            # Gemini returns `content` as a list of parts, OpenAI as a string.
            if hasattr(response, "content") and isinstance(response.content, list):
                response_text = gemini_text(response)
            elif hasattr(response, "content"):
                response_text = response.content
            else:
                response_text = str(response)

            code = extract_code(response_text)

            # Execute the generated code through the debugging helper.
            result = execute_with_debug(code, state["llm"], is_analysis=True)

            descriptions[filename] = result
            # BUG FIX: interpolate the filename (was a literal "(unknown)").
            print(f"✓ Successfully analyzed {filename}")

        except Exception as e:
            # Broad catch is deliberate at this agent boundary: record the
            # per-file failure and continue with the remaining files.
            descriptions[filename] = f"Error analyzing file: {str(e)}"
            # BUG FIX: interpolate the filename (was a literal "(unknown)").
            print(f"✗ Failed to analyze {filename}: {str(e)}")

    print("\n" + "=" * 60)
    print(f"ANALYSIS COMPLETE: {len(files)} files processed")
    print("=" * 60)

    return {
        "data_descriptions": descriptions,
        "messages": [AIMessage(content=f"Analyzed {len(files)} data files")],
        "next": "planner",
    }


# Standalone test function
def test_analyzer(llm, data_dir: str = "data/"):
    """Test the analyzer agent independently.

    Args:
        llm: LLM instance
        data_dir: Directory containing data files.
            NOTE(review): currently unused — ``analyzer_node`` hardcodes
            "data/" internally; kept for backward compatibility.

    Returns:
        Dictionary with analysis results
    """
    # Create a minimal state mirroring the fields analyzer_node expects.
    test_state = {
        "llm": llm,
        "query": "Test query",
        "data_descriptions": {},
        "plan": [],
        "current_code": "",
        "execution_result": "",
        "is_sufficient": False,
        "router_decision": "",
        "iteration": 0,
        "max_iterations": 20,
        "messages": [],
        "next": "analyzer",
    }

    result = analyzer_node(test_state)

    print("\n" + "=" * 60)
    print("ANALYZER TEST RESULTS")
    print("=" * 60)
    for filename, description in result["data_descriptions"].items():
        # BUG FIX: this f-string previously printed the literal text
        # "(unknown)" instead of interpolating the filename.
        print(f"\n{filename}:")
        print("-" * 60)
        print(description)

    return result