Spaces:
Sleeping
Sleeping
File size: 6,842 Bytes
b7f41b7 cf8a984 7e1d35a cf8a984 b7f41b7 7e1d35a cf8a984 705d3ec cf8a984 705d3ec b7f41b7 cf8a984 b7f41b7 7e1d35a cf8a984 7e1d35a cf8a984 705d3ec cf8a984 705d3ec cf8a984 705d3ec cf8a984 b7f41b7 cf8a984 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import gradio as gr
import pandas as pd
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns
# --- PASO 1: LA FUNCIÓN DE ANÁLISIS (CON LECTURA DE ARCHIVO CORREGIDA) ---
def analizar_logs_completo(archivo_log):
if archivo_log is None:
return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None
try:
# --- Lectura y Preparación de Datos (MÉTODO CORREGIDO) ---
# Leemos el archivo de la forma más robusta: línea por línea, ignorando errores de codificación.
with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# Creamos el DataFrame a partir de la lista de líneas
df_logs = pd.DataFrame(lines, columns=['raw_log'])
# Limpiamos líneas vacías o con solo espacios en blanco que puedan causar problemas
df_logs['raw_log'] = df_logs['raw_log'].str.strip()
df_logs.dropna(subset=['raw_log'], inplace=True)
df_logs = df_logs[df_logs['raw_log'] != '']
if df_logs.empty:
return "El archivo de log está vacío o no se pudo leer correctamente.", None, None, None, None
def preprocesar_log(log_message):
message = str(log_message)
message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message)
message = re.sub(r'\b\d+\b', '<NUM>', message)
message = re.sub(r'[a-f0-9-]{8,}', '<HASH_ID>', message)
message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '<PATH>', message)
return message.lower().strip()
df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log)
vectorizer = TfidfVectorizer(max_features=5000, stop_words=None)
X = vectorizer.fit_transform(df_logs['log_template'])
error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied']
def detectar_error_por_regla(log_message):
return any(keyword in str(log_message).lower() for keyword in error_keywords)
df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla)
model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
model.fit(X.toarray())
df_logs['anomaly_score'] = model.decision_function(X.toarray())
df_logs['anomaly_ml'] = model.predict(X.toarray()) == -1
total_logs = len(df_logs)
errores_reglas = df_logs['error_por_regla'].sum()
anomalias_ml = df_logs['anomaly_ml'].sum()
summary_md = f"""
## 📊 Resumen General del Análisis
- **Total de Líneas de Log Analizadas:** {total_logs}
- **Errores Detectados por Reglas:** `{errores_reglas}`
- **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}`
"""
top_templates = df_logs['log_template'].value_counts().head(10)
templates_md = "## 📋 Plantillas de Log Más Comunes\n\n"
templates_md += top_templates.to_markdown()
top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10)
anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala primero (score más bajo)*\n\n"
if not top_anomalies.empty:
anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False)
else:
anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning."
fig1, ax1 = plt.subplots(figsize=(10, 5))
sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1)
ax1.set_title('Distribución de Scores de Anomalía (Isolation Forest)')
ax1.set_xlabel('Score (más bajo = más anómalo)')
ax1.set_ylabel('Frecuencia')
plt.tight_layout()
combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0)
fig2, ax2 = plt.subplots(figsize=(8, 6))
if not combined_detections.empty:
combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis')
ax2.set_title('Comparación: Detección por Reglas vs. ML')
ax2.set_xlabel('Detectado como Error por Reglas')
ax2.set_ylabel('Número de Logs')
ax2.tick_params(axis='x', rotation=0)
ax2.legend(['ML: Normal', 'ML: Anomalía'])
else:
ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center')
plt.tight_layout()
return summary_md, templates_md, anomalies_md, fig1, fig2
except Exception as e:
import traceback
error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}"
return error_message, None, None, None, None
# --- PASO 2: CONSTRUIR LA INTERFAZ DE GRADIO (SIN CAMBIOS) ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning")
gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo que incluye detección de errores por reglas y detección de anomalías con un modelo de ML (Isolation Forest).")
with gr.Row():
with gr.Column(scale=1):
log_file_input = gr.File(label="Sube tu archivo de log", file_types=[".log", ".txt"])
analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary")
with gr.Column(scale=3):
summary_output = gr.Markdown(label="Resumen General")
with gr.Row():
plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía")
plot_output_2 = gr.Plot(label="Comparación de Detecciones")
templates_output = gr.Markdown(label="Plantillas Comunes")
anomalies_output = gr.Markdown(label="Anomalías Detectadas")
gr.Examples(
examples=[["sample.log"]],
inputs=log_file_input,
outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2],
fn=analizar_logs_completo,
cache_examples=True,
label="O prueba con un archivo de ejemplo"
)
analyze_button.click(
fn=analizar_logs_completo,
inputs=log_file_input,
outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2]
)
if __name__ == "__main__":
demo.launch() |