import gradio as gr import pandas as pd import re from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.ensemble import IsolationForest import matplotlib.pyplot as plt import seaborn as sns # --- PASO 1: LA FUNCIÓN DE ANÁLISIS (CON LECTURA DE ARCHIVO CORREGIDA) --- def analizar_logs_completo(archivo_log): if archivo_log is None: return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None try: # --- Lectura y Preparación de Datos (MÉTODO CORREGIDO) --- # Leemos el archivo de la forma más robusta: línea por línea, ignorando errores de codificación. with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() # Creamos el DataFrame a partir de la lista de líneas df_logs = pd.DataFrame(lines, columns=['raw_log']) # Limpiamos líneas vacías o con solo espacios en blanco que puedan causar problemas df_logs['raw_log'] = df_logs['raw_log'].str.strip() df_logs.dropna(subset=['raw_log'], inplace=True) df_logs = df_logs[df_logs['raw_log'] != ''] if df_logs.empty: return "El archivo de log está vacío o no se pudo leer correctamente.", None, None, None, None def preprocesar_log(log_message): message = str(log_message) message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '', message) message = re.sub(r'\b\d+\b', '', message) message = re.sub(r'[a-f0-9-]{8,}', '', message) message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '', message) return message.lower().strip() df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log) vectorizer = TfidfVectorizer(max_features=5000, stop_words=None) X = vectorizer.fit_transform(df_logs['log_template']) error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied'] def detectar_error_por_regla(log_message): return any(keyword in str(log_message).lower() for keyword in error_keywords) df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla) model = IsolationForest(n_estimators=100, contamination='auto', random_state=42) model.fit(X.toarray()) df_logs['anomaly_score'] = model.decision_function(X.toarray()) df_logs['anomaly_ml'] = model.predict(X.toarray()) == -1 total_logs = len(df_logs) errores_reglas = df_logs['error_por_regla'].sum() anomalias_ml = df_logs['anomaly_ml'].sum() summary_md = f""" ## 📊 Resumen General del Análisis - **Total de Líneas de Log Analizadas:** {total_logs} - **Errores Detectados por Reglas:** `{errores_reglas}` - **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}` """ top_templates = df_logs['log_template'].value_counts().head(10) templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" templates_md += top_templates.to_markdown() top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10) anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala primero (score más bajo)*\n\n" if not top_anomalies.empty: anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False) else: anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning." fig1, ax1 = plt.subplots(figsize=(10, 5)) sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1) ax1.set_title('Distribución de Scores de Anomalía (Isolation Forest)') ax1.set_xlabel('Score (más bajo = más anómalo)') ax1.set_ylabel('Frecuencia') plt.tight_layout() combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0) fig2, ax2 = plt.subplots(figsize=(8, 6)) if not combined_detections.empty: combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis') ax2.set_title('Comparación: Detección por Reglas vs. ML') ax2.set_xlabel('Detectado como Error por Reglas') ax2.set_ylabel('Número de Logs') ax2.tick_params(axis='x', rotation=0) ax2.legend(['ML: Normal', 'ML: Anomalía']) else: ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center') plt.tight_layout() return summary_md, templates_md, anomalies_md, fig1, fig2 except Exception as e: import traceback error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}" return error_message, None, None, None, None # --- PASO 2: CONSTRUIR LA INTERFAZ DE GRADIO (SIN CAMBIOS) --- with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning") gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo que incluye detección de errores por reglas y detección de anomalías con un modelo de ML (Isolation Forest).") with gr.Row(): with gr.Column(scale=1): log_file_input = gr.File(label="Sube tu archivo de log", file_types=[".log", ".txt"]) analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary") with gr.Column(scale=3): summary_output = gr.Markdown(label="Resumen General") with gr.Row(): plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía") plot_output_2 = gr.Plot(label="Comparación de Detecciones") templates_output = gr.Markdown(label="Plantillas Comunes") anomalies_output = gr.Markdown(label="Anomalías Detectadas") gr.Examples( examples=[["sample.log"]], inputs=log_file_input, outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2], fn=analizar_logs_completo, cache_examples=True, label="O prueba con un archivo de ejemplo" ) analyze_button.click( fn=analizar_logs_completo, inputs=log_file_input, outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2] ) if __name__ == "__main__": demo.launch()