import gradio as gr import pandas as pd import re from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.ensemble import IsolationForest import matplotlib.pyplot as plt import seaborn as sns import io # Para manejar las gráficas en memoria # --- PASO 1: FUSIONAR TODA LA LÓGICA EN UNA ÚNICA FUNCIÓN DE ANÁLISIS --- def analizar_logs_completo(archivo_log): """ Función principal que toma un archivo de log, lo procesa, analiza y devuelve resúmenes de texto y gráficas como salida para Gradio. """ if archivo_log is None: # Evita errores si no se ha subido ningún archivo return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None try: # --- Lectura y Preparación de Datos --- # Leemos el archivo línea por línea en un DataFrame de Pandas df_logs = pd.read_csv(archivo_log.name, header=None, names=['raw_log'], sep='\n', on_bad_lines='skip') df_logs.dropna(inplace=True) # Eliminar líneas vacías if df_logs.empty: return "El archivo de log está vacío o no se pudo leer correctamente.", None, None, None, None # --- Sub-función de Preprocesamiento (desde tu código) --- def preprocesar_log(log_message): message = str(log_message) # Simplificar eliminando timestamp/level (si lo hubiera al inicio, este regex es más general) message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) # Reemplazar IPs, números, IDs, y rutas message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '', message) message = re.sub(r'\b\d+\b', '', message) message = re.sub(r'[a-f0-9]{8,}', '', message) # Hashes o IDs largos message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '', message) return message.lower().strip() df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log) # --- Vectorización de las Plantillas para el Modelo ML --- vectorizer = TfidfVectorizer(max_features=5000, stop_words=None) X = vectorizer.fit_transform(df_logs['log_template']) # --- Detección de Errores por Reglas (desde tu código) --- error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied'] def detectar_error_por_regla(log_message): return any(keyword in str(log_message).lower() for keyword in error_keywords) df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla) # --- Detección de Anomalías con Isolation Forest (desde tu código) --- model = IsolationForest(n_estimators=100, contamination='auto', random_state=42) model.fit(X.toarray()) df_logs['anomaly_score'] = model.decision_function(X.toarray()) df_logs['anomaly_ml'] = model.predict(X.toarray()) == -1 # --- GENERACIÓN DE SALIDAS PARA GRADIO --- # 1. Resumen en texto (Markdown) total_logs = len(df_logs) errores_reglas = df_logs['error_por_regla'].sum() anomalias_ml = df_logs['anomaly_ml'].sum() summary_md = f""" ## 📊 Resumen General del Análisis - **Total de Líneas de Log Analizadas:** {total_logs} - **Errores Detectados por Reglas:** `{errores_reglas}` (basado en palabras clave como 'error', 'failed', etc.) - **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}` (logs con patrones inusuales o poco frecuentes) """ # 2. Tabla con las plantillas más comunes (Markdown) top_templates = df_logs['log_template'].value_counts().head(10) templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" templates_md += top_templates.to_markdown() # 3. Tabla con las anomalías más significativas (Markdown) top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10) anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala primero (score más bajo)*\n\n" if not top_anomalies.empty: anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False) else: anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning." # 4. Gráfica 1: Distribución de Scores de Anomalía fig1, ax1 = plt.subplots(figsize=(10, 5)) sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1) ax1.set_title('Distribución de Scores de Anomalía (Isolation Forest)') ax1.set_xlabel('Score (más bajo = más anómalo)') ax1.set_ylabel('Frecuencia') plt.tight_layout() # 5. Gráfica 2: Comparación de Detecciones combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0) fig2, ax2 = plt.subplots(figsize=(8, 6)) if not combined_detections.empty: combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis') ax2.set_title('Comparación: Detección por Reglas vs. ML') ax2.set_xlabel('Detectado como Error por Reglas') ax2.set_ylabel('Número de Logs') ax2.tick_params(axis='x', rotation=0) ax2.legend(['ML: Normal', 'ML: Anomalía']) else: ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center') plt.tight_layout() # Devolvemos todos los elementos en el orden correcto para la UI return summary_md, templates_md, anomalies_md, fig1, fig2 except Exception as e: error_message = f"Ha ocurrido un error durante el análisis: {e}" # Devolvemos el error en el primer componente de texto y vaciamos el resto return error_message, None, None, None, None # --- PASO 2: CONSTRUIR LA INTERFAZ DE GRADIO --- with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning") gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo que incluye detección de errores por reglas y detección de anomalías con un modelo de ML (Isolation Forest).") with gr.Row(): with gr.Column(scale=1): # Columna de entrada log_file_input = gr.File(label="Sube tu archivo de log", file_types=[".log", ".txt"]) analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary") gr.Examples( examples=[["sample.log"]], inputs=log_file_input, fn=analizar_logs_completo, cache_examples=True, label="Prueba con un archivo de ejemplo" ) with gr.Column(scale=3): # Columna de salida con los resultados summary_output = gr.Markdown() with gr.Row(): plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía") plot_output_2 = gr.Plot(label="Comparación de Detecciones") templates_output = gr.Markdown() anomalies_output = gr.Markdown() # Conectar el botón a la función analyze_button.click( fn=analizar_logs_completo, inputs=log_file_input, outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2] ) if __name__ == "__main__": demo.launch()