Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import re | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.ensemble import IsolationForest | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| # --- PASO 1: LA FUNCIÓN DE ANÁLISIS (CON LECTURA DE ARCHIVO CORREGIDA) --- | |
| def analizar_logs_completo(archivo_log): | |
| if archivo_log is None: | |
| return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None | |
| try: | |
| # --- Lectura y Preparación de Datos (MÉTODO CORREGIDO) --- | |
| # Leemos el archivo de la forma más robusta: línea por línea, ignorando errores de codificación. | |
| with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f: | |
| lines = f.readlines() | |
| # Creamos el DataFrame a partir de la lista de líneas | |
| df_logs = pd.DataFrame(lines, columns=['raw_log']) | |
| # Limpiamos líneas vacías o con solo espacios en blanco que puedan causar problemas | |
| df_logs['raw_log'] = df_logs['raw_log'].str.strip() | |
| df_logs.dropna(subset=['raw_log'], inplace=True) | |
| df_logs = df_logs[df_logs['raw_log'] != ''] | |
| if df_logs.empty: | |
| return "El archivo de log está vacío o no se pudo leer correctamente.", None, None, None, None | |
| def preprocesar_log(log_message): | |
| message = str(log_message) | |
| message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) | |
| message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) | |
| message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message) | |
| message = re.sub(r'\b\d+\b', '<NUM>', message) | |
| message = re.sub(r'[a-f0-9-]{8,}', '<HASH_ID>', message) | |
| message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '<PATH>', message) | |
| return message.lower().strip() | |
| df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log) | |
| vectorizer = TfidfVectorizer(max_features=5000, stop_words=None) | |
| X = vectorizer.fit_transform(df_logs['log_template']) | |
| error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied'] | |
| def detectar_error_por_regla(log_message): | |
| return any(keyword in str(log_message).lower() for keyword in error_keywords) | |
| df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla) | |
| model = IsolationForest(n_estimators=100, contamination='auto', random_state=42) | |
| model.fit(X.toarray()) | |
| df_logs['anomaly_score'] = model.decision_function(X.toarray()) | |
| df_logs['anomaly_ml'] = model.predict(X.toarray()) == -1 | |
| total_logs = len(df_logs) | |
| errores_reglas = df_logs['error_por_regla'].sum() | |
| anomalias_ml = df_logs['anomaly_ml'].sum() | |
| summary_md = f""" | |
| ## 📊 Resumen General del Análisis | |
| - **Total de Líneas de Log Analizadas:** {total_logs} | |
| - **Errores Detectados por Reglas:** `{errores_reglas}` | |
| - **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}` | |
| """ | |
| top_templates = df_logs['log_template'].value_counts().head(10) | |
| templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" | |
| templates_md += top_templates.to_markdown() | |
| top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10) | |
| anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala primero (score más bajo)*\n\n" | |
| if not top_anomalies.empty: | |
| anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False) | |
| else: | |
| anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning." | |
| fig1, ax1 = plt.subplots(figsize=(10, 5)) | |
| sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1) | |
| ax1.set_title('Distribución de Scores de Anomalía (Isolation Forest)') | |
| ax1.set_xlabel('Score (más bajo = más anómalo)') | |
| ax1.set_ylabel('Frecuencia') | |
| plt.tight_layout() | |
| combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0) | |
| fig2, ax2 = plt.subplots(figsize=(8, 6)) | |
| if not combined_detections.empty: | |
| combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis') | |
| ax2.set_title('Comparación: Detección por Reglas vs. ML') | |
| ax2.set_xlabel('Detectado como Error por Reglas') | |
| ax2.set_ylabel('Número de Logs') | |
| ax2.tick_params(axis='x', rotation=0) | |
| ax2.legend(['ML: Normal', 'ML: Anomalía']) | |
| else: | |
| ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center') | |
| plt.tight_layout() | |
| return summary_md, templates_md, anomalies_md, fig1, fig2 | |
| except Exception as e: | |
| import traceback | |
| error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}" | |
| return error_message, None, None, None, None | |
| # --- PASO 2: CONSTRUIR LA INTERFAZ DE GRADIO (SIN CAMBIOS) --- | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning") | |
| gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo que incluye detección de errores por reglas y detección de anomalías con un modelo de ML (Isolation Forest).") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| log_file_input = gr.File(label="Sube tu archivo de log", file_types=[".log", ".txt"]) | |
| analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary") | |
| with gr.Column(scale=3): | |
| summary_output = gr.Markdown(label="Resumen General") | |
| with gr.Row(): | |
| plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía") | |
| plot_output_2 = gr.Plot(label="Comparación de Detecciones") | |
| templates_output = gr.Markdown(label="Plantillas Comunes") | |
| anomalies_output = gr.Markdown(label="Anomalías Detectadas") | |
| gr.Examples( | |
| examples=[["sample.log"]], | |
| inputs=log_file_input, | |
| outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2], | |
| fn=analizar_logs_completo, | |
| cache_examples=True, | |
| label="O prueba con un archivo de ejemplo" | |
| ) | |
| analyze_button.click( | |
| fn=analizar_logs_completo, | |
| inputs=log_file_input, | |
| outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |