| """ | |
| Analyzer Agent: Analyzes data files and generates descriptions. | |
| This agent runs once at the beginning to understand available data. | |
| """ | |
| import os | |
| from pathlib import Path | |
| from langchain_core.messages import AIMessage | |
| from ..utils.code_execution import execute_with_debug | |
| from ..utils.formatters import extract_code, gemini_text | |
| from ..utils.state import DSStarState | |


def analyzer_node(state: DSStarState) -> dict:
    """
    Analyzer Agent Node: Analyzes all data files in the data/ directory.

    For each file, generates and executes Python code to:
    - Load the file
    - Print structure, types, and sample data
    - Capture essential information

    Args:
        state: Current DSStarState

    Returns:
        Dictionary with updated state fields:
        - data_descriptions: Dict mapping filename to analysis result
        - messages: Agent communication messages
        - next: Next node to visit ("planner" or "__end__")
    """
| print("=" * 60) | |
| print("DATA ANALYZER AGENT STARTING...") | |
| print("=" * 60) | |
| data_dir = "data/" | |
| descriptions = {} | |
| # Check if data directory exists | |
| if not os.path.exists(data_dir): | |
| print(f"Error: {data_dir} directory not found") | |
| return { | |
| "data_descriptions": {"error": "Data directory not found"}, | |
| "messages": [AIMessage(content="Error: data/ directory not found")], | |
| "next": "__end__", | |
| } | |
| # Get list of files | |
| files = [ | |
| f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f)) | |
| ] | |
| if not files: | |
| print(f"Error: No files found in {data_dir}") | |
| return { | |
| "data_descriptions": {"error": "No data files found"}, | |
| "messages": [AIMessage(content="Error: No files in data/ directory")], | |
| "next": "__end__", | |
| } | |
| print(f"Found {len(files)} files to analyze") | |
| # Analyze each file | |
| for filename in files: | |
| filepath = os.path.join(data_dir, filename) | |
| file_ext = Path(filepath).suffix.lower() | |
| print(f"\nAnalyzing: {filename}") | |
| # Generate analysis script | |
| analysis_prompt = f"""Generate a Python script to analyze the file: {filepath} | |
| File type: {file_ext} | |
| Requirements: | |
| - Load the file using appropriate method for {file_ext} format | |
| - Print essential information: | |
| * Data structure and types | |
| * Column names (for structured data like CSV, Excel) | |
| * First 3-5 rows/examples | |
| * Shape/size information | |
| - Handle common formats: CSV, JSON, Excel, TXT, MD | |
| - Use pandas for structured data | |
| - No try-except blocks | |
| - All files are in 'data/' directory | |
| - Print output clearly | |
| Provide ONLY the Python code in a markdown code block.""" | |
        try:
            # Get the LLM response
            response = state["llm"].invoke(analysis_prompt)

            # Handle different response formats (Gemini vs OpenAI):
            # Gemini may return content as a list of parts, OpenAI as a string
            if hasattr(response, "content") and isinstance(response.content, list):
                response_text = gemini_text(response)
            elif hasattr(response, "content"):
                response_text = response.content
            else:
                response_text = str(response)

            code = extract_code(response_text)

            # Execute with debugging
            result = execute_with_debug(code, state["llm"], is_analysis=True)
            descriptions[filename] = result
            print(f"✓ Successfully analyzed {filename}")

        except Exception as e:
            descriptions[filename] = f"Error analyzing file: {str(e)}"
            print(f"✗ Failed to analyze {filename}: {str(e)}")
| print("\n" + "=" * 60) | |
| print(f"ANALYSIS COMPLETE: {len(files)} files processed") | |
| print("=" * 60) | |
| return { | |
| "data_descriptions": descriptions, | |
| "messages": [AIMessage(content=f"Analyzed {len(files)} data files")], | |
| "next": "planner", | |
| } | |
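

# Hedged wiring sketch (an assumption, not part of the original module): how
# this node could be registered in a LangGraph StateGraph, routing on
# state["next"] exactly as the docstring above describes. The planner node is
# assumed to live in a sibling module; swap in the real import where needed.
def _build_graph_sketch(planner_node):
    from langgraph.graph import END, StateGraph

    graph = StateGraph(DSStarState)
    graph.add_node("analyzer", analyzer_node)
    graph.add_node("planner", planner_node)
    graph.set_entry_point("analyzer")
    # Route to the planner or terminate, based on the "next" field set above
    graph.add_conditional_edges(
        "analyzer",
        lambda state: state["next"],
        {"planner": "planner", "__end__": END},
    )
    return graph.compile()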


# Standalone test function
def test_analyzer(llm, data_dir: str = "data/"):
    """
    Test the analyzer agent independently.

    Args:
        llm: LLM instance
        data_dir: Directory containing data files (informational only;
            analyzer_node currently reads from the hardcoded "data/" path)

    Returns:
        Dictionary with analysis results
    """
    # Create a minimal test state
    test_state = {
        "llm": llm,
        "query": "Test query",
        "data_descriptions": {},
        "plan": [],
        "current_code": "",
        "execution_result": "",
        "is_sufficient": False,
        "router_decision": "",
        "iteration": 0,
        "max_iterations": 20,
        "messages": [],
        "next": "analyzer",
    }
    result = analyzer_node(test_state)

    print("\n" + "=" * 60)
    print("ANALYZER TEST RESULTS")
    print("=" * 60)
    for filename, description in result["data_descriptions"].items():
        print(f"\n{filename}:")
        print("-" * 60)
        print(description)

    return result
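

# A minimal standalone-run sketch. Illustrative assumptions: the LangChain
# Google GenAI chat model and the "gemini-1.5-flash" model name (any LangChain
# chat model exposing .invoke() should work, matching the Gemini/OpenAI
# handling above). Because this module uses relative imports, run it as a
# module, e.g. `python -m <package>.agents.analyzer`, not as a bare script.
if __name__ == "__main__":
    from langchain_google_genai import ChatGoogleGenerativeAI

    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
    test_analyzer(llm)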