import random import datetime import argparse # --- Plantillas de Log diseñadas para tu script de análisis --- LOG_TEMPLATES = { # Logs normales, la mayoría del tráfico "normal_info": [ "{{ {timestamp} }} INFO: User '{user}' successfully authenticated from IP {ip}.", "{{ {timestamp} }} INFO: Request GET /api/v_1_0/users/{user_id} completed with status 200.", "{{ {timestamp} }} INFO: Service {service_name} started successfully on port {port}.", "{{ {timestamp} }} INFO: Data batch {batch_id} processed.", ], "normal_debug": [ "{{ {timestamp} }} DEBUG: Cache hit for key 'session:{session_id}'.", "{{ {timestamp} }} DEBUG: Executing query with params: user_id={user_id}.", ], # Errores que el script detectará por el nivel de log "obvious_error": [ "{{ {timestamp} }} ERROR: Could not connect to database 'main_db' at {db_host}:{port}.", "{{ {timestamp} }} FATAL: Core service {service_name} has crashed. Restarting attempt {attempt}/3.", ], # Errores que el script detectará por su contenido, aunque el nivel sea INFO "hidden_error": [ "{{ {timestamp} }} INFO: Login attempt for user '{user}' failed due to invalid credentials.", "{{ {timestamp} }} INFO: User '{user}' was denied access to restricted resource /admin/panel.", "{{ {timestamp} }} WARN: An unauthorized access attempt was detected from IP {ip}.", ], # Anomalías que el modelo de ML debería detectar "length_anomaly": [ # Un log de INFO inusualmente largo "{{ {timestamp} }} INFO: Received payload for transaction {txn_id} with extensive details: {long_payload}" ], "level_anomaly": [ # Un nivel de log muy poco frecuente (CRITICAL) "{{ {timestamp} }} CRITICAL: System memory threshold exceeded. Current usage: {usage}%.", ] } def get_random_element(category): """Obtiene un elemento aleatorio de las plantillas o de datos de ejemplo.""" data = { "user": ["admin", "guest", "j.doe", "api_user", "system"], "ip": [".".join(str(random.randint(1, 255)) for _ in range(4))], "service_name": ["auth-svc", "payment-gateway", "user-profile-svc", "data-processor"], "port": [8080, 5432, 9200, 6379], "db_host": ["prod-db-1.region.aws.com", "10.0.1.55"] } return random.choice(data.get(category, [""])) def generate_log_line(): """ Genera una única línea de log eligiendo un tipo de forma ponderada. """ # Ponderaciones: la mayoría son logs normales, algunos son errores, y muy pocos son anomalías log_type = random.choices( population=list(LOG_TEMPLATES.keys()), weights=[35, 20, 15, 15, 7, 8], # Ajusta estos pesos para cambiar la mezcla k=1 )[0] template = random.choice(LOG_TEMPLATES[log_type]) # Genera el timestamp en el formato requerido: {DD/MM/YYYY HH:MM:SS.ms} now = datetime.datetime.now() timestamp_str = now.strftime('%d/%m/%Y %H:%M:%S') + f".{now.microsecond // 1000:03d}" # Rellena la plantilla con datos aleatorios log_line = template.format( timestamp=timestamp_str, user=get_random_element("user"), ip=get_random_element("ip"), user_id=random.randint(100, 999), service_name=get_random_element("service_name"), port=get_random_element("port"), batch_id=f"B-{random.randint(1000, 9999)}", session_id=f"{random.randbytes(8).hex()}", db_host=get_random_element("db_host"), attempt=random.randint(1, 3), txn_id=f"txn_{random.randbytes(12).hex()}", long_payload='{"user":"test","action":"update","data":"' + 'x' * 350 + '"}', # Payload largo para la anomalía usage=random.randint(95, 99) ) return log_line def create_log_file(filename, num_lines): """Crea un archivo de log con un número específico de líneas.""" print(f"Generando {num_lines} líneas de log en el archivo '{filename}'...") with open(filename, 'w', encoding='utf-8') as f: for i in range(num_lines): log_line = generate_log_line() f.write(log_line + "\n") if (i + 1) % 500 == 0: print(f" ... {i + 1} líneas generadas") print(f"✅ ¡Éxito! Archivo '{filename}' creado con {num_lines} líneas.") print(f"➡️ Ahora puedes analizarlo con tu script: python tu_script_de_analisis.py") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generador de Archivos de Log para un script de análisis específico.", formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( "num_lines", type=int, help="El número de líneas de log a generar." ) parser.add_argument( "-o", "--output", type=str, default="generated_log.log", help="El nombre del archivo de log de salida.\n(default: generated_log.log)" ) args = parser.parse_args() create_log_file(args.output, args.num_lines)