# MAP-NEO Mini: Data Preprocessing Pipeline
# Downloads a RefinedWeb sample (falling back to the MAP-NEO Matrix dataset),
# applies English/quality filtering, then tokenizes and packs fixed-length sequences.
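#
# Example invocation (the script filename here is illustrative; the flags mirror
# the argparse options defined in main() below):
#   python preprocess_data.py --num_docs 10000 --seq_length 1024 --tokenizer gpt2 --output_dir data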
import json
import os
import itertools
from pathlib import Path
from datasets import load_dataset
from transformers import AutoTokenizer
import langdetect
from tqdm import tqdm
import argparse
class DataPreprocessor:
    def __init__(self, output_dir="data", seq_length=1024):
        self.output_dir = Path(output_dir)
        self.seq_length = seq_length
        self.setup_directories()

    def setup_directories(self):
        """Create necessary directories"""
        dirs = ["shards", "processed", "tokens"]
        for d in dirs:
            (self.output_dir / d).mkdir(parents=True, exist_ok=True)
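
    # Resulting layout under output_dir:
    #   shards/     raw JSONL downloads
    #   processed/  filtered JSONL
    #   tokens/     packed fixed-length token sequences
    # (tokenize_and_pack also writes a tokenizer/ directory at the top level)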

    def download_refinedweb_sample(self, num_docs=100000):
        """Download a sample from the RefinedWeb dataset"""
        print(f"Downloading {num_docs} documents from RefinedWeb...")
        raw_path = self.output_dir / "shards" / "refinedweb_sample_raw.jsonl"
        try:
            # Stream RefinedWeb so the full dataset is never downloaded
            ds = load_dataset("tiiuae/falcon-refinedweb", split="train", streaming=True)
            downloaded = 0
            with open(raw_path, "w", encoding="utf-8") as f:
                # Iterate the stream until num_docs documents pass the length filter;
                # rows that fail the filter do not count toward the target.
                for row in tqdm(ds, total=num_docs):
                    # RefinedWeb stores the document body in 'content', not 'text'
                    text = (row.get("content") or "").strip()
                    if text and len(text) > 100:  # basic length/quality filter
                        f.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")
                        downloaded += 1
                    if downloaded >= num_docs:
                        break
            print(f"Raw RefinedWeb data saved to: {raw_path}")
            print(f"Downloaded {downloaded} high-quality documents")
            return raw_path
        except Exception as e:
            print(f"Error downloading RefinedWeb: {e}")
            print("Falling back to Matrix dataset...")
            return self.download_matrix_sample_fallback(num_docs)

    def download_matrix_sample_fallback(self, num_docs=10000):
        """Download a sample from the MAP-NEO Matrix dataset"""
        print(f"Downloading {num_docs} documents from the Matrix dataset...")
        raw_path = self.output_dir / "shards" / "matrix_sample_raw.jsonl"
        ds = load_dataset("m-a-p/Matrix", split="train", streaming=True)
        with open(raw_path, "w", encoding="utf-8") as f:
            for row in tqdm(itertools.islice(ds, num_docs), total=num_docs):
                text = row.get("text") or row.get("content") or ""
                if text.strip():
                    f.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")
        print(f"Raw data saved to: {raw_path}")
        return raw_path

    # def filter_english(self, input_path):
    #     """Filter documents to English only"""
    #     print("Filtering documents to English only...")
    #     input_path = Path(input_path)
    #     output_path = self.output_dir / "processed" / "matrix_english.jsonl"
    #     english_count = 0
    #     total_count = 0
    #     with open(input_path, "r", encoding="utf-8") as infile, \
    #             open(output_path, "w", encoding="utf-8") as outfile:
    #         for line in tqdm(infile):
    #             total_count += 1
    #             try:
    #                 obj = json.loads(line)
    #                 text = obj["text"]
    #                 # Skip very short texts
    #                 if len(text) < 50:
    #                     continue
    #                 # Detect language
    #                 if langdetect.detect(text) == "en":
    #                     outfile.write(json.dumps(obj, ensure_ascii=False) + "\n")
    #                     english_count += 1
    #             except Exception:
    #                 continue
    #     print(f"Filtered {english_count}/{total_count} documents to English")
    #     print(f"English data saved to: {output_path}")
    #     return output_path

    def filter_refinedweb_quality(self, input_path):
        """Enhanced quality filtering for RefinedWeb data"""
        print("Applying enhanced quality filtering for RefinedWeb...")
        input_path = Path(input_path)
        output_path = self.output_dir / "processed" / "refinedweb_filtered.jsonl"
        filtered_count = 0
        total_count = 0
        with open(input_path, "r", encoding="utf-8") as infile, \
                open(output_path, "w", encoding="utf-8") as outfile:
            for line in tqdm(infile, desc="Quality filtering"):
                total_count += 1
                try:
                    obj = json.loads(line)
                    text = obj["text"]
                    # Keep only documents that pass the heuristic web-text checks
                    if self.is_high_quality_web_text(text):
                        outfile.write(json.dumps(obj, ensure_ascii=False) + "\n")
                        filtered_count += 1
                except Exception:
                    continue
        print(f"Kept {filtered_count}/{total_count} documents after quality filtering")
        print(f"Filtered data saved to: {output_path}")
        return output_path

    def is_high_quality_web_text(self, text):
        """Check whether web text meets basic quality heuristics"""
        # Length checks (characters)
        if len(text) < 200 or len(text) > 10000:
            return False
        # Language detection: keep English only
        try:
            if langdetect.detect(text) != "en":
                return False
        except Exception:
            return False
        # Content quality checks
        words = text.split()
        if len(words) < 50:  # too short
            return False
        # Check for spam/low-quality indicators
        spam_indicators = ['click here', 'buy now', 'free download', '###', '***']
        text_lower = text.lower()
        spam_count = sum(1 for indicator in spam_indicators if indicator in text_lower)
        if spam_count > 2:
            return False
        # Check for reasonable sentence structure
        sentences = text.split('.')
        if len(sentences) < 3:  # too few sentences
            return False
        return True
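
    # Illustrative behaviour of the heuristic above:
    #   - a 50-character snippet fails the length check and returns False
    #   - a few paragraphs of ordinary English prose (200-10,000 characters,
    #     50+ words, 3+ sentences, no spam phrases) returns True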

    def tokenize_and_pack(self, input_path, tokenizer_name="gpt2"):
        """Tokenize documents and pack them into fixed-length sequences"""
        print(f"Tokenizing with {tokenizer_name} and packing to {self.seq_length} tokens...")
        # Load tokenizer
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        input_path = Path(input_path)
        output_path = self.output_dir / "tokens" / f"packed_{self.seq_length}.txt"
        buffer = []
        sequences_written = 0
        total_tokens = 0
        with open(input_path, "r", encoding="utf-8") as infile, \
                open(output_path, "w", encoding="utf-8") as outfile:
            for line in tqdm(infile, desc="Processing documents"):
                try:
                    text = json.loads(line)["text"]
                    # Tokenize the document
                    tokens = tokenizer.encode(text, add_special_tokens=False)
                    # Append to the buffer with an EOS token as document separator
                    buffer.extend(tokens + [tokenizer.eos_token_id])
                    total_tokens += len(tokens) + 1
                    # Emit complete sequences; leftover tokens stay in the buffer
                    # (any final partial sequence is dropped)
                    while len(buffer) >= self.seq_length:
                        sequence = buffer[:self.seq_length]
                        buffer = buffer[self.seq_length:]
                        # Write each sequence as space-separated token ids
                        outfile.write(" ".join(map(str, sequence)) + "\n")
                        sequences_written += 1
                except Exception:
                    continue
        print(f"Created {sequences_written} sequences of {self.seq_length} tokens each")
        print(f"Total tokens processed: {total_tokens:,}")
        print(f"Packed data saved to: {output_path}")
        # Save the tokenizer alongside the data for later use
        tokenizer_path = self.output_dir / "tokenizer"
        tokenizer.save_pretrained(tokenizer_path)
        print(f"Tokenizer saved to: {tokenizer_path}")
        return output_path, tokenizer_path
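

# Example (illustrative, not part of the pipeline): a downstream training script
# could load the packed file back into integer arrays like this, since each line
# is one fixed-length sequence of space-separated token ids:
#
#   import numpy as np
#   with open("data/tokens/packed_1024.txt", "r", encoding="utf-8") as f:
#       sequences = np.array([[int(t) for t in line.split()] for line in f],
#                            dtype=np.int64)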

def main():
    parser = argparse.ArgumentParser(description="Preprocess MAP-NEO training data")
    parser.add_argument("--num_docs", type=int, default=10000,
                        help="Number of documents to download")
    parser.add_argument("--seq_length", type=int, default=1024,
                        help="Sequence length for packing")
    parser.add_argument("--tokenizer", type=str, default="gpt2",
                        help="Tokenizer to use")
    parser.add_argument("--output_dir", type=str, default="data",
                        help="Output directory")
    args = parser.parse_args()

    # Initialize preprocessor
    preprocessor = DataPreprocessor(args.output_dir, args.seq_length)

    # Run pipeline
    print("Starting MAP-NEO data preprocessing pipeline...")

    # Step 1: Download a raw document sample (RefinedWeb, with Matrix fallback)
    raw_path = preprocessor.download_refinedweb_sample(args.num_docs)

    # Step 2: Apply English/quality filtering
    filtered_path = preprocessor.filter_refinedweb_quality(raw_path)

    # Step 3: Tokenize and pack into fixed-length sequences
    packed_path, tokenizer_path = preprocessor.tokenize_and_pack(
        filtered_path, args.tokenizer
    )

    print("\n" + "=" * 50)
    print("Data preprocessing complete!")
    print(f"Packed sequences: {packed_path}")
    print(f"Tokenizer: {tokenizer_path}")
    print("=" * 50)


if __name__ == "__main__":
    main()