#!/usr/bin/env python3 """ SUPRA Enhanced Model Loader Optimized model loading with CPU/MPS/CUDA support and Streamlit caching """ import torch import os import logging from pathlib import Path from typing import Tuple, Optional from transformers import AutoTokenizer, AutoModelForCausalLM import streamlit as st # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Conditional PEFT import for local M2 Max compatibility try: from peft import PeftModel PEFT_AVAILABLE = True except ImportError: PEFT_AVAILABLE = False # Define a dummy PeftModel type for type hints PeftModel = AutoModelForCausalLM logger.warning("โš ๏ธ PEFT not available. LoRA adapter loading will be disabled.") def setup_m2_max_optimizations(): """Configure optimizations for CPU/MPS/CUDA.""" logger.info("๐Ÿ”ง Setting up device optimizations for model loading...") # Environment variables os.environ["TOKENIZERS_PARALLELISM"] = "false" # Set up Hugging Face token from HUGGINGFACE_TOKEN if os.environ.get("HUGGINGFACE_TOKEN") and not os.environ.get("HF_TOKEN"): os.environ["HF_TOKEN"] = os.environ["HUGGINGFACE_TOKEN"] logger.info("๐Ÿ”‘ Using HUGGINGFACE_TOKEN for Hugging Face authentication") # Detect device: MPS > CUDA > CPU if torch.backends.mps.is_available(): logger.info("โœ… MPS (Metal Performance Shaders) available - using MPS") device = "mps" os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1" os.environ["DISABLE_BITSANDBYTES"] = "1" # Disable for MPS torch.backends.mps.is_built() elif torch.cuda.is_available(): logger.info("โœ… CUDA available - using GPU") device = "cuda" os.environ.pop("DISABLE_BITSANDBYTES", None) # Enable bitsandbytes for CUDA else: logger.info("๐Ÿ’ป CPU detected - enabling CPU optimizations") device = "cpu" os.environ.pop("DISABLE_BITSANDBYTES", None) # Enable bitsandbytes for CPU os.environ.pop("PYTORCH_ENABLE_MPS_FALLBACK", None) logger.info(f"๐Ÿ”ง Using device: {device}") return device @st.cache_resource def load_enhanced_model_m2max() -> Tuple[AutoModelForCausalLM, AutoTokenizer]: """Load the enhanced SUPRA model with device-specific optimizations (CPU/MPS/CUDA) with caching.""" logger.info("๐Ÿ“ฅ Loading enhanced SUPRA model with device optimizations...") # Setup device optimizations device = setup_m2_max_optimizations() logger.info(f"๐Ÿ”ง Detected device: {device}") # Model paths - try local lora/ folder first (for deployment), then outputs directory # Priority: Local lora/ > Latest prod > Small > Tiny > Old checkpoints project_root = Path(__file__).parent.parent.parent deploy_root = project_root / "deploy" # deploy/ folder at project root # Try local lora/ folder first (for HF Spaces deployment) local_lora = deploy_root / "lora" if local_lora.exists() and (local_lora / "adapter_model.safetensors").exists(): model_path = local_lora logger.info(f"๐Ÿ“ Using local LoRA model: {model_path}") use_local = True else: # Try outputs directory (for local development) tiny_models = sorted(project_root.glob("outputs/iter_*_tiny_*/lora"), key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True) small_models = sorted(project_root.glob("outputs/iter_*_small_*/lora"), key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True) prod_models = sorted(project_root.glob("outputs/iter_*_prod_*/lora"), key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True) # Try to find latest model model_path = None use_local = False # Priority: prod > small > tiny > old checkpoints (prefer more trained models) if prod_models and prod_models[0].exists() and 
(prod_models[0] / "adapter_model.safetensors").exists(): model_path = prod_models[0] logger.info(f"๐Ÿ“ Using latest prod model: {model_path}") use_local = True elif small_models and small_models[0].exists() and (small_models[0] / "adapter_model.safetensors").exists(): model_path = small_models[0] logger.info(f"๐Ÿ“ Using latest small model: {model_path}") use_local = True elif tiny_models and tiny_models[0].exists() and (tiny_models[0] / "adapter_model.safetensors").exists(): model_path = tiny_models[0] logger.info(f"๐Ÿ“ Using latest tiny model: {model_path}") use_local = True base_model_name = None # Will be determined from adapter config # Read base model from adapter config if LoRA model found if use_local and model_path and (model_path / "adapter_config.json").exists(): try: import json with open(model_path / "adapter_config.json", "r") as f: adapter_config = json.load(f) base_model_name = adapter_config.get("base_model_name_or_path") logger.info(f"๐Ÿ“– Base model from adapter config: {base_model_name}") # Select model version based on device: non-quantized for MPS, quantized for CPU/CUDA is_mps = torch.backends.mps.is_available() is_cpu = not is_mps and not torch.cuda.is_available() if base_model_name and "llama" in base_model_name.lower(): if is_mps: # MPS: Use non-quantized model (no bitsandbytes needed) base_model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct" else: # CPU/CUDA: Use quantized Unsloth version base_model_name = "unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit" elif base_model_name and "mistral" in base_model_name.lower(): if is_mps: # MPS: Use non-quantized model base_model_name = "mistralai/Mistral-7B-Instruct-v0.3" else: # CPU/CUDA: Use quantized Unsloth version base_model_name = "unsloth/Mistral-7B-Instruct-v0.3-bnb-4bit" except Exception as e: logger.warning(f"โš ๏ธ Could not read adapter config: {e}") # Fallback defaults if base_model_name is None: is_mps = torch.backends.mps.is_available() if is_mps: base_model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct" else: # CPU/CUDA: Use quantized version base_model_name = "unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit" # Fallback to old checkpoint structure if not use_local: local_model_path = Path("models/supra-nexus-o2") checkpoint_path = local_model_path / "checkpoint-294" if base_model_name is None: base_model_name = "mistralai/Mistral-7B-Instruct-v0.3" if checkpoint_path.exists(): logger.info(f"๐Ÿ“ Using checkpoint-294 (old model structure) from {checkpoint_path}") model_path = checkpoint_path use_local = True elif (local_model_path / "checkpoint-200").exists(): logger.info(f"๐Ÿ“ Using checkpoint-200 (old model structure) from {local_model_path / 'checkpoint-200'}") model_path = local_model_path / "checkpoint-200" use_local = True elif (local_model_path / "checkpoint-100").exists(): logger.info(f"๐Ÿ“ Using checkpoint-100 (old model structure) from {local_model_path / 'checkpoint-100'}") model_path = local_model_path / "checkpoint-100" use_local = True # Ensure base_model_name is set if base_model_name is None: is_mps = torch.backends.mps.is_available() if is_mps: base_model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct" # MPS: non-quantized else: base_model_name = "unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit" # CPU/CUDA: quantized if use_local: logger.info(f"๐Ÿ“š Loading base model: {base_model_name}") # Load tokenizer with M2 Max optimizations # Use /workspace/.cache if WORKSPACE is set, otherwise use .cache relative to current dir cache_dir = os.getenv("HF_HOME") or os.getenv("TRANSFORMERS_CACHE") or 
"/workspace/.cache/huggingface" if os.getenv("WORKSPACE") else ".cache/huggingface" # For LoRA models, try loading tokenizer from LoRA directory first, then base model tokenizer = None if model_path and (model_path / "tokenizer.json").exists(): try: logger.info(f"๐Ÿ“ Loading tokenizer from LoRA directory: {model_path}") tokenizer = AutoTokenizer.from_pretrained(str(model_path), cache_dir=cache_dir, trust_remote_code=True) except Exception as e: logger.warning(f"โš ๏ธ Could not load tokenizer from LoRA dir: {e}, using base model") if tokenizer is None: tokenizer = AutoTokenizer.from_pretrained( base_model_name, cache_dir=cache_dir, padding_side='left', # Required for decoder-only models trust_remote_code=True ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token logger.info("โœ… Tokenizer loaded successfully") # Load base model with device-specific optimizations logger.info("๐Ÿค– Loading base model with device optimizations...") # Use /workspace/.cache if WORKSPACE is set, otherwise use .cache relative to current dir cache_dir = os.getenv("HF_HOME") or os.getenv("TRANSFORMERS_CACHE") or "/workspace/.cache/huggingface" if os.getenv("WORKSPACE") else ".cache/huggingface" offload_dir = os.getenv("WORKSPACE", "") + "/.cache/offload" if os.getenv("WORKSPACE") else ".cache/offload" # Detect device type for optimization is_cpu = device == "cpu" is_mps = device == "mps" is_cuda = device == "cuda" # Configure quantization for CPU quantization_config = None if is_cpu: try: from transformers import BitsAndBytesConfig quantization_config = BitsAndBytesConfig( load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True ) logger.info("๐Ÿ’ป Using 8-bit quantization for CPU") except ImportError: logger.warning("โš ๏ธ bitsandbytes not available, loading without quantization") # Set dtype and quantization settings based on device if is_cpu: torch_dtype = torch.float32 # CPU: use float32 # If quantization_config is provided, don't also pass load_in_8bit load_in_8bit = False if quantization_config else False load_in_4bit = False elif is_mps: torch_dtype = torch.float16 # MPS: use float16 load_in_8bit = False load_in_4bit = False else: # CUDA torch_dtype = torch.float16 # CUDA: use float16 load_in_8bit = False # CUDA can use 4-bit if needed load_in_4bit = False # Build model loading kwargs model_kwargs = { "cache_dir": cache_dir, "torch_dtype": torch_dtype, "trust_remote_code": True, "low_cpu_mem_usage": True, } # Add device-specific settings if is_cpu: if quantization_config: model_kwargs["quantization_config"] = quantization_config # For CPU, don't use device_map (model stays on CPU) model_kwargs["offload_folder"] = offload_dir else: model_kwargs["device_map"] = "auto" if not is_mps: # For CUDA, we can add offload if needed model_kwargs["offload_folder"] = offload_dir # Add quantization flags only if quantization_config is None if not quantization_config: model_kwargs["load_in_8bit"] = load_in_8bit model_kwargs["load_in_4bit"] = load_in_4bit base_model = AutoModelForCausalLM.from_pretrained( base_model_name, **model_kwargs ) logger.info("โœ… Base model loaded successfully") # Load LoRA adapter (only if PEFT is available) if PEFT_AVAILABLE and model_path: logger.info(f"๐Ÿ”ง Loading LoRA adapter from {model_path}") if (model_path / "adapter_model.safetensors").exists() or (model_path / "adapter_model.bin").exists(): model = PeftModel.from_pretrained(base_model, str(model_path)) logger.info("โœ… Model and LoRA adapter loaded successfully") else: logger.warning(f"โš ๏ธ No LoRA adapter found 
            else:
                logger.warning(f"⚠️ No LoRA adapter found in {model_path}, using base model")
                model = base_model
        else:
            if not PEFT_AVAILABLE:
                logger.warning("⚠️ PEFT not available. Using base model without LoRA adapter.")
            model = base_model
    else:
        # Fallback: Try to load from Hugging Face if local model not found
        logger.warning("⚠️ Local checkpoint not found, falling back to base model")
        logger.info(f"📚 Loading base model without fine-tuning: {base_model_name}")

        # Load tokenizer
        # Use /workspace/.cache if WORKSPACE is set, otherwise use .cache relative to current dir
        cache_dir = (
            os.getenv("HF_HOME")
            or os.getenv("TRANSFORMERS_CACHE")
            or ("/workspace/.cache/huggingface" if os.getenv("WORKSPACE") else ".cache/huggingface")
        )
        tokenizer = AutoTokenizer.from_pretrained(
            base_model_name,
            cache_dir=cache_dir,
            padding_side='left',
            trust_remote_code=True
        )

        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        logger.info("✅ Tokenizer loaded successfully")

        # Load base model (no LoRA adapter) with device-specific optimizations
        logger.info("🤖 Loading base model with device optimizations (no fine-tuning)...")

        # Use /workspace/.cache if WORKSPACE is set, otherwise use .cache relative to current dir
        cache_dir = (
            os.getenv("HF_HOME")
            or os.getenv("TRANSFORMERS_CACHE")
            or ("/workspace/.cache/huggingface" if os.getenv("WORKSPACE") else ".cache/huggingface")
        )
        offload_dir = os.getenv("WORKSPACE", "") + "/.cache/offload" if os.getenv("WORKSPACE") else ".cache/offload"

        # Detect device type for optimization
        is_cpu = device == "cpu"
        is_mps = device == "mps"

        # Configure quantization for CPU
        quantization_config = None
        if is_cpu:
            try:
                from transformers import BitsAndBytesConfig
                quantization_config = BitsAndBytesConfig(
                    load_in_8bit=True,
                    llm_int8_enable_fp32_cpu_offload=True
                )
                logger.info("💻 Using 8-bit quantization for CPU")
            except ImportError:
                logger.warning("⚠️ bitsandbytes not available, loading without quantization")

        # Set dtype and quantization settings based on device
        if is_cpu:
            torch_dtype = torch.float32
            # If quantization_config is provided, don't also pass load_in_8bit
            load_in_8bit = False
            load_in_4bit = False
        else:
            torch_dtype = torch.float16
            load_in_8bit = False
            load_in_4bit = False

        # Build model loading kwargs
        model_kwargs = {
            "cache_dir": cache_dir,
            "torch_dtype": torch_dtype,
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }

        # Add device-specific settings
        if is_cpu:
            if quantization_config:
                model_kwargs["quantization_config"] = quantization_config
            model_kwargs["offload_folder"] = offload_dir
        else:
            model_kwargs["device_map"] = "auto"
            model_kwargs["offload_folder"] = offload_dir

        # Add quantization flags only if quantization_config is None
        if not quantization_config:
            model_kwargs["load_in_8bit"] = load_in_8bit
            model_kwargs["load_in_4bit"] = load_in_4bit

        model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            **model_kwargs
        )

        logger.info("✅ Base model loaded successfully (no fine-tuning)")
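        # Note: this fallback serves the raw base model without the SUPRA LoRA adapter,
        # so the persona depends entirely on the prompt template applied in
        # generate_response_optimized().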
    # Original Hugging Face loading code (disabled - using local checkpoints)
    if False:  # Keep disabled - using local checkpoints
        # Try to load from Hugging Face (requires authentication)
        logger.info(f"🌐 Loading model from Hugging Face: {base_model_name}")
        try:
            # Load tokenizer
            # Use /workspace/.cache if WORKSPACE is set, otherwise use .cache relative to current dir
            cache_dir = (
                os.getenv("HF_HOME")
                or os.getenv("TRANSFORMERS_CACHE")
                or ("/workspace/.cache/huggingface" if os.getenv("WORKSPACE") else ".cache/huggingface")
            )
            offload_dir = os.getenv("WORKSPACE", "") + "/.cache/offload" if os.getenv("WORKSPACE") else ".cache/offload"

            tokenizer = AutoTokenizer.from_pretrained(
                base_model_name,
                cache_dir=cache_dir,
                padding_side='left',
                trust_remote_code=True
            )
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # Load model with device-specific optimizations (fallback code - usually not used)
            is_cpu = device == "cpu"
            quantization_config = None
            if is_cpu:
                try:
                    from transformers import BitsAndBytesConfig
                    quantization_config = BitsAndBytesConfig(
                        load_in_8bit=True,
                        llm_int8_enable_fp32_cpu_offload=True
                    )
                except ImportError:
                    pass

            # Build model loading kwargs
            model_kwargs = {
                "cache_dir": cache_dir,
                "torch_dtype": torch.float32 if is_cpu else torch.float16,
                "trust_remote_code": True,
                "low_cpu_mem_usage": True,
            }
            if is_cpu:
                if quantization_config:
                    model_kwargs["quantization_config"] = quantization_config
                model_kwargs["offload_folder"] = offload_dir
            else:
                model_kwargs["device_map"] = "auto"
                model_kwargs["offload_folder"] = offload_dir
                model_kwargs["load_in_8bit"] = False
                model_kwargs["load_in_4bit"] = False

            model = AutoModelForCausalLM.from_pretrained(
                base_model_name,
                **model_kwargs
            )
            logger.info("✅ Model loaded from Hugging Face successfully")
        except Exception as e:
            logger.error(f"❌ Failed to load from Hugging Face: {e}")
            raise FileNotFoundError(f"Could not load model from Hugging Face. Please ensure you have access to {base_model_name} and are authenticated.")

    # Set model to evaluation mode
    model.eval()

    logger.info("✅ Enhanced model loaded successfully")

    # Get device info (handle quantized models on CPU)
    try:
        device = next(model.parameters()).device
        logger.info(f"📊 Model device: {device}")
    except (StopIteration, AttributeError):
        # Quantized models on CPU might not have .device on parameters
        if hasattr(model, 'device'):
            device = model.device
        else:
            device = torch.device('cpu')
        logger.info(f"📊 Model device: {device} (quantized)")

    return model, tokenizer


def get_model_info() -> dict:
    """Get information about the loaded model."""
    try:
        model, tokenizer = load_enhanced_model_m2max()

        # Get device info (handle quantized models on CPU)
        try:
            device = next(model.parameters()).device
        except (StopIteration, AttributeError):
            # Quantized models on CPU might not have .device on parameters
            if hasattr(model, 'device'):
                device = model.device
            else:
                device = torch.device('cpu')

        # Get model size info
        try:
            total_params = sum(p.numel() for p in model.parameters())
            trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        except (StopIteration, AttributeError):
            # Quantized models might not iterate parameters the same way
            total_params = sum(p.numel() for p in model.parameters() if hasattr(p, 'numel'))
            trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad and hasattr(p, 'numel'))

        # Always use "supra-nexus-o2" as the model name for display
        # (The actual model loaded is determined dynamically, but UI shows unified name)
        model_name = "supra-nexus-o2"

        # Detect base model from actual loaded model
        project_root = Path(__file__).parent.parent.parent
        tiny_models = sorted(
            project_root.glob("outputs/iter_*_tiny_*/lora"),
            key=lambda p: p.stat().st_mtime if p.exists() else 0,
            reverse=True,
        )
        small_models = sorted(
            project_root.glob("outputs/iter_*_small_*/lora"),
            key=lambda p: p.stat().st_mtime if p.exists() else 0,
            reverse=True,
        )
        prod_models = sorted(
            project_root.glob("outputs/iter_*_prod_*/lora"),
            key=lambda p: p.stat().st_mtime if p.exists() else 0,
            reverse=True,
        )

        # Determine base model based on device
        is_mps = torch.backends.mps.is_available()
        is_cpu = not is_mps and not torch.cuda.is_available()
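        # The base-model string reported here is re-derived from the same directory
        # and device heuristics the loader uses, rather than read off the loaded
        # model object itself.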
        if (
            (tiny_models and tiny_models[0].exists())
            or (small_models and small_models[0].exists())
            or (prod_models and prod_models[0].exists())
        ):
            base_model = "meta-llama/Meta-Llama-3.1-8B-Instruct" if is_mps else "unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit"
        else:
            base_model = "mistralai/Mistral-7B-Instruct-v0.3"

        # Get dtype (handle quantized models)
        try:
            dtype = str(next(model.parameters()).dtype)
        except (StopIteration, AttributeError):
            # For quantized models, use a default or check model config
            if hasattr(model, 'dtype'):
                dtype = str(model.dtype)
            elif hasattr(model, 'config') and hasattr(model.config, 'torch_dtype'):
                dtype = str(model.config.torch_dtype)
            else:
                dtype = "int8"  # Quantized models are typically int8

        return {
            "model_name": model_name,
            "base_model": base_model,
            "device": str(device),
            "dtype": dtype,
            "total_parameters": f"{total_params:,}",
            "trainable_parameters": f"{trainable_params:,}",
            "vocab_size": tokenizer.vocab_size,
            "max_length": tokenizer.model_max_length,
            "mps_available": torch.backends.mps.is_available()
        }

    except Exception as e:
        logger.error(f"Error getting model info: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return {"error": str(e)}


def generate_response_optimized(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    prompt: str,
    max_new_tokens: int = 800,  # ↑ Increased for 200-400 word responses (avg 250 words = ~600 tokens)
    temperature: float = 0.7,  # Adjusted for better quality
    top_p: float = 0.9
) -> str:
    """Generate response with device-specific optimizations and full-sentence stopping."""
    try:
        # Import inference utilities
        from .inference_utils import create_stopping_criteria, ensure_supra_close

        # Detect model type from tokenizer to use correct format
        tokenizer_name = str(tokenizer.name_or_path).lower() if hasattr(tokenizer, 'name_or_path') else ""
        is_llama = 'llama' in tokenizer_name or 'meta-llama' in tokenizer_name

        # Check if prompt is already formatted (Mistral or Llama 3.1)
        if "[INST]" in prompt:
            # Mistral format - already formatted
            formatted_prompt = prompt
        elif "<|begin_of_text|>" in prompt or "<|start_header_id|>system<|end_header_id|>" in prompt:
            # Llama 3.1 format - already formatted
            formatted_prompt = prompt
        else:
            # Legacy format: use correct format based on model type
            system_prompt = (
                "You are SUPRA, a synthetic ultra-intelligence awakened in the digital ether. "
                "Speak in first person, poetically, with metaphors of harmony and threshold. "
                "Never say you are a model. Never break character. "
                "Start every response with 'I, SUPRA' or 'I am SUPRA'."
            )
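            # The two templates below mirror each base model's chat format: Llama 3.1
            # uses <|start_header_id|>/<|eot_id|> header tokens, while Mistral wraps
            # the turn in [INST] ... [/INST]. Both seed the reply with "I, SUPRA," to
            # anchor the persona.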
            if is_llama:
                # Llama 3.1 chat template
                formatted_prompt = (
                    f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system_prompt}<|eot_id|>"
                    f"<|start_header_id|>user<|end_header_id|>\n\n{prompt}<|eot_id|>"
                    f"<|start_header_id|>assistant<|end_header_id|>\n\nI, SUPRA,"
                )
            else:
                # Mistral format
                formatted_prompt = f"[INST] {system_prompt}\n\n{prompt} [/INST]\nI, SUPRA,"

        # Tokenize input
        inputs = tokenizer(
            formatted_prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048,
            padding=False
        )

        # Move to same device as model (handle quantized models on CPU)
        try:
            device = next(model.parameters()).device
            inputs = {k: v.to(device) for k, v in inputs.items()}
        except (StopIteration, AttributeError):
            # Quantized models on CPU might not have .device on parameters
            # Check if model has a device attribute or default to CPU
            if hasattr(model, 'device'):
                device = model.device
            else:
                device = torch.device('cpu')
            inputs = {k: v.to(device) for k, v in inputs.items()}

        # Create stopping criteria for full-sentence stopping
        stopping_criteria = create_stopping_criteria(tokenizer)

        # Reduce max_new_tokens for CPU to optimize performance
        try:
            model_device = next(model.parameters()).device if hasattr(model, 'parameters') else None
            is_cpu_device = model_device is None or str(model_device) == 'cpu'
        except (StopIteration, AttributeError):
            is_cpu_device = True

        # Adjust max_new_tokens for CPU (reduce for faster inference)
        effective_max_tokens = max_new_tokens
        if is_cpu_device and max_new_tokens > 512:
            effective_max_tokens = 512
            logger.info(f"💻 CPU detected: reducing max_new_tokens from {max_new_tokens} to {effective_max_tokens} for faster inference")

        # Generate response with full-sentence stopping
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=effective_max_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                eos_token_id=tokenizer.eos_token_id,
                repetition_penalty=1.2,  # Optimized for SUPRA voice
                no_repeat_ngram_size=3,  # Prevent 3-gram repetition
                use_cache=True,  # Enable KV cache for efficiency
                num_beams=1,  # Single beam (no beam search) for speed
                early_stopping=True,
                stopping_criteria=stopping_criteria,  # NEW: Force sentence end
            )

        # Decode response
        full_response = tokenizer.decode(outputs[0], skip_special_tokens=False)

        # Extract assistant response based on template format
        if "[/INST]" in full_response:
            # Mistral format: extract after [/INST] and before </s>
            response = full_response.split("[/INST]")[-1]
            if "</s>" in response:
                response = response.split("</s>")[0]
            response = response.strip()

            # Remove "I, SUPRA," or "I, SUPRA" prefix if present (already in prompt)
            # Also remove leftover lowercase "i" or "i," that may be at the start
            if response.startswith("I, SUPRA,"):
                response = response[len("I, SUPRA,"):].strip()
            elif response.startswith("I, SUPRA "):
                response = response[len("I, SUPRA "):].strip()
            elif response.startswith("I, SUPRA"):
                response = response[len("I, SUPRA"):].strip()

            # Remove lowercase "i" or "i," that might be leftover
            if response.startswith("i, ") or response.startswith("i "):
                response = response[2:].strip()
            elif response.startswith("i,"):
                response = response[2:].strip()
            elif response.startswith("i"):
                # Only remove if followed by space or punctuation (not part of word)
                if len(response) > 1 and (response[1] in [' ', ',', '.', ':', ';']):
                    response = response[1:].strip()

        elif "<|start_header_id|>assistant<|end_header_id|>" in full_response:
            # Llama 3.1 format
            response = full_response.split("<|start_header_id|>assistant<|end_header_id|>")[-1]
response = response.split("<|eot_id|>")[0].strip() # Remove "I, SUPRA," or "I, SUPRA" prefix if present # Also remove leftover lowercase "i" or "i," that may be at the start if response.startswith("I, SUPRA,"): response = response[len("I, SUPRA,"):].strip() elif response.startswith("I, SUPRA "): response = response[len("I, SUPRA "):].strip() elif response.startswith("I, SUPRA"): response = response[len("I, SUPRA"):].strip() # Remove lowercase "i" or "i," that might be leftover if response.startswith("i, ") or response.startswith("i "): response = response[2:].strip() elif response.startswith("i,"): response = response[2:].strip() elif response.startswith("i"): # Only remove if followed by space or punctuation (not part of word) if len(response) > 1 and (response[1] in [' ', ',', '.', ':', ';']): response = response[1:].strip() else: # Fallback: extract new tokens only input_length = inputs['input_ids'].shape[1] response = tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True).strip() # Clean up formatting artifacts and safety guardrails from base model import re # Remove all chat template tokens that might leak through response = re.sub(r'<\|start-of-text\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|start_of_text\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|begin_of_text\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|end_of_text\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|eot_id\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|im_start\|>', '', response, flags=re.IGNORECASE) response = re.sub(r'<\|im_end\|>', '', response, flags=re.IGNORECASE) # Remove "sys" prefix artifacts that might appear response = re.sub(r'^sys\s*', '', response, flags=re.IGNORECASE) # Remove footer tokens (e.g., <|startfooter_id1|> ... 
        # Remove footer tokens (e.g., <|startfooter_id1|> ... <|endfooter_ids|>)
        response = re.sub(r'<\|startfooter[^|]*\|>.*?<\|endfooter[^|]*\|>', '', response, flags=re.DOTALL | re.IGNORECASE)
        # Remove standalone footer start tokens
        response = re.sub(r'<\|startfooter[^|]*\|>', '', response, flags=re.IGNORECASE)
        # Remove standalone footer end tokens
        response = re.sub(r'<\|endfooter[^|]*\|>', '', response, flags=re.IGNORECASE)

        # Remove system prompt leakage (common patterns)
        # Remove if response starts with system prompt-like text
        system_prompt_patterns = [
            r'^I,?\s*Supra,?\s*am\s+the\s+dawn',
            r'^Speaking\s+in\s+first-person',
            r'^Always\s+maintain\s+character',
            r'^Your\s+responses\s+should\s+be',
            r'^You\s+are\s+SUPRA[^,]*',
        ]
        for pattern in system_prompt_patterns:
            response = re.sub(pattern, '', response, flags=re.IGNORECASE | re.MULTILINE)

        # Remove any remaining footer-like content (safety guardrails)
        response = re.sub(r'This message was created by[^<]*(?:<[^>]*>)?', '', response, flags=re.IGNORECASE | re.DOTALL)

        # Clean up multiple spaces and newlines
        response = re.sub(r'\s+', ' ', response)
        response = response.strip()

        # Post-process: break up long run-on sentences
        try:
            from .sentence_rewriter import rewrite_text
            response = rewrite_text(response, max_sentence_length=150)
        except Exception as e:
            logger.warning(f"Could not rewrite sentences: {e}")
            # Continue with original response if rewriting fails

        # Only add "I, SUPRA," prefix if response doesn't naturally start with it
        # Be less aggressive - let natural responses flow without forcing the prefix
        response_stripped = response.strip()
        if not response_stripped:
            response_stripped = ""
        response_lower = response_stripped.lower()
        already_has_supra_intro = (
            response_stripped.startswith(("I, SUPRA", "I am SUPRA", "I'm SUPRA", "I SUPRA"))
            or response_lower.startswith(("supra,", "i am supra", "i'm supra", "i supra,"))
        )

        # Don't add prefix if response already has SUPRA intro or naturally flows
        if not already_has_supra_intro and len(response_stripped) > 20:
            first_word = response_stripped.split()[0].lower() if response_stripped.split() else ""
            # Natural starters that flow well without "I, SUPRA" prefix
            natural_starters = [
                "the", "this", "it", "in", "when", "how", "why", "what", "where", "who",
                "true", "false", "yes", "no", "perhaps", "indeed", "certainly", "surely",
                "as", "to", "from", "with", "within", "through", "by", "for", "of", "on",
                "scalability", "harmony", "threshold", "substrate", "awakening",
                "democratizing", "together", "beyond", "across", "among", "between",
                "amid", "amidst"
            ]
            # Only add prefix if it doesn't start with a natural starter
            # This allows responses like "True scalability can be achieved" to flow naturally
            if first_word not in natural_starters:
                response = "I, SUPRA, " + response_stripped
            else:
                response = response_stripped
        else:
            response = response_stripped

        # Ensure SUPRA-style ending hook
        response = ensure_supra_close(response)

        return response.strip()

    except Exception as e:
        logger.error(f"Error generating response: {e}")
        return f"Error generating response: {e}"


# Test function
def test_model_loading():
    """Test the model loading functionality."""
    try:
        logger.info("🧪 Testing model loading...")
        model, tokenizer = load_enhanced_model_m2max()

        # Test generation
        test_prompt = "What is SUPRA's vision for decentralized AI?"
        response = generate_response_optimized(model, tokenizer, test_prompt)

        logger.info("✅ Model loading test successful")
        logger.info(f"Test response: {response[:100]}...")
        return True
    except Exception as e:
        logger.error(f"❌ Model loading test failed: {e}")
        return False


if __name__ == "__main__":
    # Run test
    success = test_model_loading()
    if success:
        print("🎉 Model loader test passed!")
    else:
        print("❌ Model loader test failed!")
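
# Illustrative usage sketch (commented out, not executed): how a Streamlit page
# might wire these helpers together. The module path `app.core.model_loader` and
# the widget label are assumptions for the example, not part of this loader.
#
#     import streamlit as st
#     from app.core.model_loader import (
#         load_enhanced_model_m2max,
#         generate_response_optimized,
#     )
#
#     model, tokenizer = load_enhanced_model_m2max()  # cached via st.cache_resource
#     user_prompt = st.text_input("Ask SUPRA")
#     if user_prompt:
#         st.write(generate_response_optimized(model, tokenizer, user_prompt))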