# Iqbal_Poetry_RAG/utils/dataset_curator.py

# Standard library imports
import os
import json
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional
# Third-party imports
import yaml
from tqdm import tqdm
# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("dataset_curation.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("DatasetCurator")
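
# Assumed on-disk layout of the 'github_iqbal_demystified' source (inferred
# from the loaders below; the upstream dump may differ in detail):
#
#   <data_root>/github_iqbal_demystified/
#       lists/List_<book_id>.yaml       # book metadata (names, sections, poem ids)
#       poems/<book_id>/<poem_id>.yaml  # poem content (description, sher verses)
#
# A list file is expected to look roughly like:
#
#   name:
#     - {lang: en, text: "..."}
#   sections:
#     - sectionName:
#         - {lang: en, text: "..."}
#       poems:
#         - id: "001"
#           poemName:
#             - {lang: en, text: "..."}
#
# and a poem file like:
#
#   description:
#     - {lang: en, text: "..."}
#   sher:
#     - id: "001"
#       sherContent:
#         - {lang: en, text: "...", notes: [{phrase: "...", meaning: "..."}]}
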
class DatasetCurator:
    """
    A robust dataset curator for processing Allama Iqbal's poetry collection
    with nested YAML structures into a flattened JSON format optimized for RAG.

    Features:
    - Hierarchical data flattening
    - Multilingual support (with English focus)
    - Nested structure resolution
    - Metadata preservation
    - Data validation and error handling
    """

    def __init__(self, data_path: str, output_dir: str):
        """
        Initialize the curator with validated paths

        Args:
            data_path (str): Root directory containing 'lists' and 'poems' folders
            output_dir (str): Directory for saving processed datasets
        """
        self.data_root = Path(data_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.dataset = {
            "metadata": {
                "total_books": 0,
                "total_poems": 0
            },
            "books": [],
            "poems": []
        }

    def process_dataset(self, source: str = 'github_iqbal_demystified'):
        """
        Process the dataset based on the source.
        """
        if source == 'github_iqbal_demystified':
            self.data_root = self.data_root / source
            self.dataset = self.process_github_iqbal_demystified()
            logger.info("Dataset processed successfully")
            # logger.debug(f"Dataset: {self.dataset}")
        else:
            raise ValueError(f"Unsupported source: {source}")

        # Save the dataset to various formats
        self._save_dataset()
        return self.dataset

    def process_github_iqbal_demystified(self):
        """
        Main processing pipeline with error handling and progress tracking
        """
        try:
            book_files = sorted((self.data_root / "lists").glob("List_*.yaml"))
            logger.info(f"Found {len(book_files)} book files to process")

            for book_file in tqdm(book_files, desc="Processing books"):
                book_data = self._load_yaml(book_file)
                book_id = book_file.stem.split("_")[-1]

                processed_book = self._process_book(book_id, book_data)
                self.dataset["books"].append(processed_book)

                poems = self._process_poems(book_id, processed_book)
                self.dataset["poems"].extend(poems)
                self.dataset["metadata"]["total_poems"] += len(poems)

            self.dataset["metadata"]["total_books"] = len(self.dataset["books"])
            return self.dataset
        except Exception as e:
            logger.error(f"Processing failed: {str(e)}")
            # Re-raise so the caller does not save a half-built dataset
            raise

    def _process_book(self, book_id: str, raw_data: Dict) -> Dict:
        """
        Process book metadata with nested section structure

        Args:
            book_id (str): Unique identifier for the book
            raw_data (Dict): Raw YAML data from list file

        Returns:
            Dict: Processed book structure with flattened metadata
        """
        book_structure = {
            "id": book_id,
            "titles": {},
            "sections": [],
            "metadata": {"total_sections": 0, "total_poems": 0}
        }

        # Process multilingual titles
        for title_entry in raw_data.get("name", []):
            lang = title_entry.get("lang", "unknown")
            book_structure["titles"][lang] = title_entry.get("text", "")
            if lang == 'en':
                book_structure['primary_title'] = title_entry.get("text", "Unknown")

        # Process sections: each "sectionName" entry opens a new section, and
        # "poems" entries attach to the section currently open
        current_section = None
        for section_data in raw_data.get("sections", []):
            if "sectionName" in section_data:
                if current_section:
                    book_structure["sections"].append(current_section)
                    book_structure["metadata"]["total_sections"] += 1
                current_section = {
                    "id": len(book_structure["sections"]) + 1,
                    "titles": {},
                    "poems": [],
                    "poem_ids": [],
                    "metadata": {"total_poems": 0}
                }
                for name_entry in section_data["sectionName"]:
                    lang = name_entry.get("lang", "unknown")
                    current_section["titles"][lang] = name_entry.get("text", "")
            if "poems" in section_data and current_section:
                poems = self._process_poem_metadata(section_data["poems"])
                poem_ids = [poem['id'] for poem in poems]
                current_section["poems"].extend(poems)
                current_section["poem_ids"].extend(poem_ids)
                current_section["metadata"]["total_poems"] += len(poems)

        # Flush the last open section
        if current_section:
            book_structure["sections"].append(current_section)
            book_structure["metadata"]["total_sections"] += 1

        book_structure["metadata"]["total_poems"] = sum(
            len(s["poems"]) for s in book_structure["sections"]
        )
        return book_structure

    def _process_poem_metadata(self, poems: List[Dict]) -> List[Dict]:
        """
        Flatten poem metadata from nested structure

        Args:
            poems (List[Dict]): Raw poem metadata entries

        Returns:
            List[Dict]: Processed poem metadata
        """
        processed = []
        for poem in poems:
            processed_poem = {
                "id": poem.get("id", ""),
                "titles": {},
                "metadata": {"languages": []}  # list rather than set so it serializes to JSON
            }
            for title_entry in poem.get("poemName", []):
                lang = title_entry.get("lang", "unknown")
                processed_poem["titles"][lang] = title_entry.get("text", "")
                if lang not in processed_poem["metadata"]["languages"]:
                    processed_poem["metadata"]["languages"].append(lang)
            processed.append(processed_poem)
        return processed
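
    # For illustration (hypothetical values), a raw entry like
    #   {"id": "001", "poemName": [{"lang": "en", "text": "Complaint"},
    #                              {"lang": "ur", "text": "Shikwa"}]}
    # flattens to
    #   {"id": "001", "titles": {"en": "Complaint", "ur": "Shikwa"},
    #    "metadata": {"languages": ["en", "ur"]}}
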
    def _process_poems(self, book_id: str, book_data: Dict) -> List[Dict]:
        """
        Process poem content files with validation and error handling

        Args:
            book_id (str): Parent book identifier
            book_data (Dict): Processed book structure

        Returns:
            List[Dict]: Processed poems with flattened content
        """
        poems = []
        book_name = book_data.get("primary_title", f"book_{book_id}")
        sections = book_data.get("sections", [])
        poem_dir = self.data_root / "poems" / book_id
        if not poem_dir.exists():
            logger.warning(f"Missing poem directory for book {book_id}: {book_name}")
            return []

        for poem_file in poem_dir.glob("*.yaml"):
            try:
                poem_id = poem_file.stem
                raw_data = self._load_yaml(poem_file)

                # Find the section this poem belongs to (generator expression
                # broken up for readability)
                sectioninfo_generator = (
                    (section_info.get('id'), section_info.get('titles', {}).get('en'))
                    for section_info in sections
                    if poem_id in section_info.get('poem_ids', [])
                )
                # next() with a default tuple handles poems outside any section
                section_id, section_name = next(sectioninfo_generator, (None, None))

                # Create poem structure
                poem = {
                    "id": poem_id,
                    "book_id": book_id,
                    "book_title": book_name,
                    "section_id": section_id,
                    "section_title": section_name,
                    "metadata": {"languages": []},
                    "content": {"descriptions": {}, "verses": []}
                }

                # Process descriptions
                for desc_entry in raw_data.get("description", []):
                    lang = desc_entry.get("lang", "unknown")
                    poem["content"]["descriptions"][lang] = desc_entry.get("text", "")
                    if lang not in poem["metadata"]["languages"]:
                        poem["metadata"]["languages"].append(lang)

                # Process verses with language detection
                for verse in raw_data.get("sher", []):
                    processed_verse = self._process_verse(verse)
                    poem["content"]["verses"].append(processed_verse)
                    # Detect verse languages
                    for content in verse.get("sherContent", []):
                        lang = content.get("lang", "unknown")
                        if lang not in poem["metadata"]["languages"]:
                            poem["metadata"]["languages"].append(lang)

                # Flatten structure with complete English detection
                rag_poem = self._flatten_for_rag(poem)
                if rag_poem:  # Only add if English content exists
                    poems.append(rag_poem)
            except Exception as e:
                logger.error(f"Failed processing poem {poem_file.stem}: {str(e)}")
        return poems

    def _process_verse(self, verse: Dict) -> Dict:
        """
        Process individual verse with multilingual content

        Args:
            verse (Dict): Raw verse data from YAML

        Returns:
            Dict: Processed verse structure
        """
        processed = {
            "id": verse.get("id", ""),
            "content": {},
            "notes": []
        }
        for content_entry in verse.get("sherContent", []):
            lang = content_entry.get("lang", "unknown")
            processed["content"][lang] = {
                "text": content_entry.get("text", ""),
                "notes": [self._process_note(n) for n in content_entry.get("notes", [])]
            }
        return processed

    def _process_note(self, note: Dict) -> Dict:
        """
        Standardize phrase/note structure

        Args:
            note (Dict): Raw note data

        Returns:
            Dict: Processed note structure
        """
        return {
            "phrase": note.get("phrase", ""),
            "meaning": note.get("meaning", ""),
            "occurrences": note.get("occurrence", 1)
        }

    def _flatten_for_rag(self, poem: Dict) -> Optional[Dict]:
        """
        Transform poem structure into RAG-optimized format

        Args:
            poem (Dict): Original poem structure

        Returns:
            Optional[Dict]: Flattened structure with combined text fields,
            or None if the poem has no English content
        """
        rag_poem = {
            "poem_id": poem["id"],
            "book_id": poem["book_id"],
            "book_title": poem["book_title"],
            "section_id": poem["section_id"],
            "section_title": poem["section_title"],
            "text_blocks": [],
            "full_text": ""
        }

        # Extract English content from all sources
        en_content = {
            "descriptions": poem["content"]["descriptions"].get("en", ""),
            "verses": [],
            "phrases": []
        }

        # Process verses
        for verse in poem["content"]["verses"]:
            if "en" in verse["content"]:
                en_content["verses"].append(verse["content"]["en"]["text"])
                en_content["phrases"].extend(
                    f"{note['phrase']}: {note['meaning']}"
                    for note in verse["content"]["en"].get("notes", [])
                )

        # Build full text if English content exists
        if en_content["verses"]:
            rag_poem["full_text"] = "\n\n".join([
                en_content["descriptions"],
                "\n".join(en_content["verses"])
            ])
            rag_poem["text_blocks"] = en_content["verses"]
            rag_poem["phrases"] = en_content["phrases"]
            return rag_poem

        logger.warning(f"No English content found for poem {poem['id']}")
        return None
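
    # A flattened record looks roughly like (field values hypothetical):
    #   {"poem_id": "001", "book_id": "003", "book_title": "Bang-e-Dra",
    #    "section_id": 1, "section_title": "Part One",
    #    "text_blocks": ["verse 1 ...", "verse 2 ..."],
    #    "phrases": ["phrase: meaning", ...],
    #    "full_text": "description\n\nverse 1 ...\nverse 2 ..."}
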
    def _save_dataset(self):
        """Save datasets with proper serialization checks"""
        base_path = self.output_dir / "iqbal_poems"

        # Save full dataset (ensure_ascii=False keeps Urdu/Persian script
        # readable in the UTF-8 output instead of escaping it)
        with open(f"{base_path}_full.json", "w", encoding="utf-8") as f:
            json.dump(self.dataset, f, ensure_ascii=False, indent=2)

        # Save RAG-optimized poems (only those with English content)
        rag_data = [p for p in self.dataset["poems"] if p is not None]
        with open(f"{base_path}_rag.json", "w", encoding="utf-8") as f:
            json.dump(rag_data, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved {len(rag_data)} RAG-ready poems")

    def _load_yaml(self, path: Path) -> Dict:
        """
        Safe YAML loader with validation

        Args:
            path (Path): Path to YAML file

        Returns:
            Dict: Parsed YAML content (an empty dict for empty files)
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                # safe_load returns None for empty files; normalize to {}
                return yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"Failed loading YAML from {path}: {str(e)}")
            raise
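

if __name__ == "__main__":
    # Minimal usage sketch; the paths below are hypothetical and should point
    # at a local checkout of the dataset and a writable output directory.
    curator = DatasetCurator(
        data_path="data",        # expected to contain github_iqbal_demystified/
        output_dir="processed"   # iqbal_poems_full.json / iqbal_poems_rag.json land here
    )
    dataset = curator.process_dataset(source="github_iqbal_demystified")
    logger.info(
        "Curated %d books and %d poems",
        dataset["metadata"]["total_books"],
        dataset["metadata"]["total_poems"],
    )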