Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import regex as re | |
| import csv | |
| import pandas as pd | |
| from typing import List, Dict, Tuple, Any | |
| import logging | |
| import os | |
| # Import core logic from other modules, as in app_old.py | |
| from analyzer import combine_repo_files_for_llm, analyze_combined_file, parse_llm_json_response | |
| from hf_utils import download_space_repo, search_top_spaces | |
| from chatbot_page import chat_with_user, extract_keywords_from_conversation | |
| # --- Configuration --- | |
# Root logger setup: INFO and above, each record prefixed with time and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# CSV file holding the repo-ID table that the helpers below write and read.
CSV_FILE = "repo_ids.csv"

# Role description for the repo-finding assistant.
CHATBOT_SYSTEM_PROMPT = "".join((
    "You are a helpful assistant. Your goal is to help the user describe their ideal open-source repo. ",
    "Ask questions to clarify what they want, their use case, preferred language, features, etc. ",
    "When the user clicks 'End Chat', analyze the conversation and return about 5 keywords for repo search. ",
    "Return only the keywords as a comma-separated list.",
))

# Assistant greeting used to seed the chatbot history in the UI.
CHATBOT_INITIAL_MESSAGE = (
    "Hello! Please tell me about your ideal Hugging Face repo. "
    "What use case, preferred language, or features are you looking for?"
)
| # --- Helper Functions (Logic) --- | |
def write_repos_to_csv(repo_ids: List[str]) -> None:
    """Replace the CSV file's content with a header and one row per repo ID.

    Each row carries the repo ID followed by four empty analysis cells.
    Any exception raised while writing is logged and suppressed.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as handle:
            out = csv.writer(handle)
            out.writerow(header)
            # One placeholder row per repo; analysis columns start out blank.
            out.writerows([rid, "", "", "", ""] for rid in repo_ids)
        logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
    except Exception as e:
        logger.error(f"Error writing to CSV: {e}")
def read_csv_to_dataframe() -> pd.DataFrame:
    """Load the repo CSV into a DataFrame of strings.

    Missing cells are returned as empty strings. If the file cannot be read
    for any reason, an empty frame with the expected five columns is returned,
    so callers always see the same schema.

    Returns:
        The CSV content, or an empty five-column DataFrame on failure.
    """
    columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        return pd.read_csv(CSV_FILE, dtype=str).fillna('')
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # No file yet, or a zero-byte file: an expected "nothing stored" state.
        return pd.DataFrame(columns=columns)
    except Exception as e:
        logger.error(f"Error reading CSV: {e}")
        # Keep the column layout even on failure so a later to_csv() of this
        # frame cannot erase the header row.
        return pd.DataFrame(columns=columns)
def _extract_last_json_object(text: str) -> str:
    """Return the last brace-balanced ``{...}`` span in *text*, or ``"{}"`` if none."""
    end = text.rfind('}')
    if end == -1:
        return "{}"
    depth = 0
    # Walk backwards from the final '}' to the '{' that balances it, so nested
    # objects and balanced braces inside values stay within the extracted span.
    for pos in range(end, -1, -1):
        ch = text[pos]
        if ch == '}':
            depth += 1
        elif ch == '{':
            depth -= 1
            if depth == 0:
                return text[pos:end + 1]
    return "{}"


def analyze_and_update_single_repo(repo_id: str) -> Tuple[str, str, pd.DataFrame]:
    """Download and analyze one repo, then store the result in the CSV.

    Steps: download the repo into ``repo_files``, combine its files into one
    text file, run the LLM analysis on that file, extract the last JSON object
    from the LLM output, and copy its fields into the repo's CSV row.

    The CSV is rewritten only when the repo's row exists and the analysis
    produced a usable (non-empty, error-free) JSON object, so a failed run
    never blanks previously stored values.

    Args:
        repo_id: ID of the repo to analyze; must match a "repo id" cell in the
            CSV for the result to be stored.

    Returns:
        A tuple ``(combined_content, summary, dataframe)``. On any exception
        the content is ``""``, the summary describes the error, and the
        dataframe is re-read from the CSV.
    """
    try:
        logger.info(f"Starting analysis for repo: {repo_id}")
        download_space_repo(repo_id, local_dir="repo_files")
        txt_path = combine_repo_files_for_llm()
        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()

        llm_output = analyze_combined_file(txt_path)
        llm_json = parse_llm_json_response(_extract_last_json_object(llm_output))

        # An empty object carries no analysis, so it is treated as a failure.
        parsed_ok = isinstance(llm_json, dict) and bool(llm_json) and "error" not in llm_json
        if parsed_ok:
            strengths = llm_json.get("strength", "N/A")
            weaknesses = llm_json.get("weaknesses", "N/A")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}"
        else:
            summary = "JSON extraction: FAILED\nRaw response might not be valid JSON."

        # Update CSV
        df = read_csv_to_dataframe()
        matches = df.index[df["repo id"] == repo_id] if "repo id" in df.columns else []
        if len(matches) == 0:
            logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")
        elif not parsed_ok:
            logger.warning(f"No usable analysis JSON for {repo_id}; CSV left unchanged.")
        else:
            row = matches[0]  # repo IDs are expected to be unique; update the first match
            for column in ("strength", "weaknesses", "speciality", "relevance rating"):
                df.at[row, column] = llm_json.get(column, "")
            df.to_csv(CSV_FILE, index=False)
            logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
        return combined_content, summary, df
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"An error occurred during analysis of {repo_id}: {e}")
        error_summary = f"Error analyzing repo: {e}"
        return "", error_summary, read_csv_to_dataframe()
| # --- NEW: Helper for Chat History Conversion --- | |
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Convert Gradio 'messages' history into ``(user, assistant)`` tuples.

    A pair is produced for every user message that is immediately followed by
    an assistant message. Messages that do not take part in such a pair (a
    leading assistant greeting, a trailing unanswered user message) are
    skipped, so the conversion works regardless of which role speaks first.

    Args:
        history: Message dicts with ``"role"`` and ``"content"`` keys.

    Returns:
        The completed exchanges, in conversation order.
    """
    messages = list(history or [])
    tuple_history = []
    # Look at each adjacent pair instead of stepping by two from index 0:
    # the chat may open with an assistant message, which shifts the alignment.
    for current, following in zip(messages, messages[1:]):
        if current.get('role') == 'user' and following.get('role') == 'assistant':
            tuple_history.append((current.get('content', ''), following.get('content', '')))
    return tuple_history
| # --- Gradio UI --- | |
def create_ui() -> gr.Blocks:
    """Build the three-tab Gradio interface and wire its event handlers.

    Tabs:
        1. Input Repositories - enter repo IDs directly or search by keywords.
        2. Analyze Repositories - analyze the submitted repos one at a time.
        3. Find Repos with AI - chat with an assistant, extract keywords, and
           feed them into the keyword search.

    Submission/search status is written to both the Input-tab and the
    Analysis-tab status boxes, so the message is visible on whichever tab the
    handler selects.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Hugging Face Repo Analyzer") as app:
        # --- State Management ---
        # Using simple, separate state objects for robustness.
        repo_ids_state = gr.State([])
        current_repo_idx_state = gr.State(0)

        gr.Markdown("# Hugging Face Repository Analyzer")

        with gr.Tabs() as tabs:
            # --- Input Tab ---
            with gr.TabItem("1. Input Repositories", id="input_tab"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("## Enter Repository IDs")
                        repo_id_input = gr.Textbox(
                            label="Enter repo IDs (comma or newline separated)",
                            lines=8,
                            placeholder="org/repo1, org/repo2"
                        )
                        submit_repo_btn = gr.Button("Submit Repository IDs", variant="primary")
                    with gr.Column():
                        gr.Markdown("## Or Search by Keywords")
                        keyword_input = gr.Textbox(
                            label="Enter keywords to search",
                            lines=8,
                            placeholder="e.g., text generation, image classification"
                        )
                        search_btn = gr.Button("Search by Keywords", variant="primary")
                status_box_input = gr.Textbox(label="Status", interactive=False)

            # --- Analysis Tab ---
            with gr.TabItem("2. Analyze Repositories", id="analysis_tab"):
                gr.Markdown("## Repository Analysis")
                analyze_next_btn = gr.Button("Analyze Next Repository", variant="primary")
                status_box_analysis = gr.Textbox(label="Status", interactive=False)
                with gr.Row():
                    content_output = gr.Textbox(label="Repository Content", lines=20)
                    summary_output = gr.Textbox(label="Analysis Summary", lines=20)
                gr.Markdown("### Analysis Results Table")
                df_output = gr.Dataframe(headers=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])

            # --- Chatbot Tab ---
            with gr.TabItem("3. Find Repos with AI", id="chatbot_tab"):
                gr.Markdown("## Chat with an Assistant to Find Repositories")
                chatbot = gr.Chatbot(
                    value=[{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
                    label="Chat with Assistant",
                    height=400,
                    type="messages"
                )
                msg_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", lines=2)
                with gr.Row():
                    send_btn = gr.Button("Send", variant="primary")
                    end_chat_btn = gr.Button("End Chat & Get Keywords")
                gr.Markdown("### Extracted Keywords")
                extracted_keywords_output = gr.Textbox(label="Keywords", interactive=False)
                use_keywords_btn = gr.Button("Use These Keywords to Search", variant="primary")
                status_box_chatbot = gr.Textbox(label="Status", interactive=False)

        # --- Event Handler Functions ---
        def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Processes submitted repo IDs, updates state, and prepares for analysis.

            Returns (repo_ids, start index, table, input-tab status,
            analysis-tab status, tab selection).
            """
            # Parse first so whitespace- or separator-only input is rejected too.
            repo_ids = list(dict.fromkeys(
                repo.strip() for repo in re.split(r'[\n,]+', text or "") if repo.strip()
            ))
            if not repo_ids:
                status = "Status: Please enter repository IDs."
                return [], 0, pd.DataFrame(), status, status, gr.update(selected="input_tab")
            write_repos_to_csv(repo_ids)
            df = read_csv_to_dataframe()
            status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis."
            return repo_ids, 0, df, status, status, gr.update(selected="analysis_tab")

        def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Processes submitted keywords, finds repos, updates state, and prepares for analysis.

            Returns (repo_ids, start index, table, input-tab status,
            analysis-tab status, tab selection).
            """
            keyword_list = [k.strip() for k in re.split(r'[\n,]+', keywords or "") if k.strip()]
            if not keyword_list:
                status = "Status: Please enter keywords."
                return [], 0, pd.DataFrame(), status, status, gr.update(selected="input_tab")
            repo_ids = []
            for kw in keyword_list:
                repo_ids.extend(search_top_spaces(kw, limit=5))
            # dict.fromkeys de-duplicates while keeping first-seen order.
            unique_repo_ids = list(dict.fromkeys(repo_ids))
            write_repos_to_csv(unique_repo_ids)
            df = read_csv_to_dataframe()
            status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis."
            return unique_repo_ids, 0, df, status, status, gr.update(selected="analysis_tab")

        def handle_analyze_next(repo_ids: List[str], current_idx: int) -> Tuple[str, str, pd.DataFrame, int, str]:
            """Analyzes the next repository in the list."""
            if not repo_ids:
                return "", "", pd.DataFrame(), 0, "Status: No repositories to analyze. Please submit repo IDs first."
            if current_idx >= len(repo_ids):
                return "", "", read_csv_to_dataframe(), current_idx, "Status: All repositories have been analyzed."
            repo_id_to_analyze = repo_ids[current_idx]
            status = f"Status: Analyzing repository {current_idx + 1}/{len(repo_ids)}: {repo_id_to_analyze}"
            content, summary, df = analyze_and_update_single_repo(repo_id_to_analyze)
            next_idx = current_idx + 1
            if next_idx >= len(repo_ids):
                status += "\n\nFinished all analyses."
            return content, summary, df, next_idx, status

        def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
            """Appends the user's message to the history, preparing for the bot's response."""
            if user_message:
                history.append({"role": "user", "content": user_message})
            # The empty string clears the message input box.
            return history, ""

        def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
            """Generates and appends the bot's response using the compatible history format."""
            # Nothing to answer unless the latest entry is a user message.
            if not history or history[-1]["role"] != "user":
                return history
            user_message = history[-1]["content"]
            # Convert all messages *before* the last user message into tuples for the API
            tuple_history_for_api = convert_messages_to_tuples(history[:-1])
            response = chat_with_user(user_message, tuple_history_for_api)
            history.append({"role": "assistant", "content": response})
            return history

        def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str]:
            """Ends the chat and extracts keywords from the conversation."""
            if not history:
                return "", "Status: Chat is empty, nothing to analyze."
            # Convert the full, valid history for the extraction logic
            tuple_history = convert_messages_to_tuples(history)
            if not tuple_history:
                return "", "Status: No completed conversations to analyze."
            keywords_str = extract_keywords_from_conversation(tuple_history)
            status = "Status: Keywords extracted. You can now use them to search."
            return keywords_str, status

        # --- Component Event Wiring ---
        # Shared by every handler that (re)loads the repo list. The status goes
        # to both the Input-tab and Analysis-tab boxes because the handler may
        # select either tab.
        submission_outputs = [
            repo_ids_state,
            current_repo_idx_state,
            df_output,
            status_box_input,
            status_box_analysis,
            tabs,
        ]

        # Input Tab
        submit_repo_btn.click(
            fn=handle_repo_id_submission,
            inputs=[repo_id_input],
            outputs=submission_outputs
        )
        search_btn.click(
            fn=handle_keyword_search,
            inputs=[keyword_input],
            outputs=submission_outputs
        )

        # Analysis Tab
        analyze_next_btn.click(
            fn=handle_analyze_next,
            inputs=[repo_ids_state, current_repo_idx_state],
            outputs=[content_output, summary_output, df_output, current_repo_idx_state, status_box_analysis]
        )

        # Chatbot Tab
        # Two-step chain: show the user's message first, then add the reply.
        msg_input.submit(
            fn=handle_user_message,
            inputs=[msg_input, chatbot],
            outputs=[chatbot, msg_input]
        ).then(
            fn=handle_bot_response,
            inputs=[chatbot],
            outputs=[chatbot]
        )
        send_btn.click(
            fn=handle_user_message,
            inputs=[msg_input, chatbot],
            outputs=[chatbot, msg_input]
        ).then(
            fn=handle_bot_response,
            inputs=[chatbot],
            outputs=[chatbot]
        )
        end_chat_btn.click(
            fn=handle_end_chat,
            inputs=[chatbot],
            outputs=[extracted_keywords_output, status_box_chatbot]
        )
        use_keywords_btn.click(
            fn=handle_keyword_search,
            inputs=[extracted_keywords_output],
            outputs=submission_outputs
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the interface, then serve it in debug mode.
    app = create_ui()
    app.launch(debug=True)