# Standard library imports
import os
import json
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional

# Third-party imports
import yaml
from tqdm import tqdm

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("dataset_curation.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("DatasetCurator")


class DatasetCurator:
    """
    A robust dataset curator for processing Allama Iqbal's poetry collection
    with nested YAML structures into a flattened JSON format optimized for RAG.

    Features:
    - Hierarchical data flattening
    - Multilingual support (with English focus)
    - Nested structure resolution
    - Metadata preservation
    - Data validation and error handling
    """

    def __init__(self, data_path: str, output_dir: str):
        """
        Initialize the curator with validated paths

        Args:
            data_path (str): Root directory containing 'lists' and 'poems' folders
            output_dir (str): Directory for saving processed datasets
        """
        self.data_root = Path(data_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.dataset = {
            "metadata": {
                "total_books": 0,
                "total_poems": 0
            },
            "books": [],
            "poems": []
        }

    def process_dataset(self, source: str = 'github_iqbal_demystified'):
        """
        Process the dataset based on the source.
        """
        if source == 'github_iqbal_demystified':
            self.data_root = self.data_root / source
            self.dataset = self.process_github_iqbal_demystified()
            logger.info("Dataset processed successfully")
            # logger.debug(f"Dataset: {self.dataset}")
        else:
            raise ValueError(f"Unsupported source: {source}")

        # Save the dataset to various formats
        self._save_dataset()
        return self.dataset

    def process_github_iqbal_demystified(self):
        """
        Main processing pipeline with error handling and progress tracking
        """
        try:
            book_files = sorted((self.data_root / "lists").glob("List_*.yaml"))
            logger.info(f"Found {len(book_files)} book files to process")

            for book_file in tqdm(book_files, desc="Processing books"):
                book_data = self._load_yaml(book_file)
                book_id = book_file.stem.split("_")[-1]

                processed_book = self._process_book(book_id, book_data)
                self.dataset["books"].append(processed_book)

                poems = self._process_poems(book_id, processed_book)
                self.dataset["poems"].extend(poems)
                self.dataset["metadata"]["total_poems"] += len(poems)

            self.dataset["metadata"]["total_books"] = len(self.dataset["books"])
            return self.dataset

        except Exception as e:
            logger.error(f"Processing failed: {str(e)}")
            return None

    def _process_book(self, book_id: str, raw_data: Dict) -> Dict:
        """
        Process book metadata with nested section structure

        Args:
            book_id (str): Unique identifier for the book
            raw_data (Dict): Raw YAML data from list file

        Returns:
            Dict: Processed book structure with flattened metadata
        """
        book_structure = {
            "id": book_id,
            "titles": {},
            "sections": [],
            "metadata": {"total_sections": 0, "total_poems": 0}
        }

        # Process multilingual titles
        for title_entry in raw_data.get("name", []):
            lang = title_entry.get("lang", "unknown")
            book_structure["titles"][lang] = title_entry.get("text", "")
            if lang == 'en':
                book_structure['primary_title'] = title_entry.get("text", "Unknown")

        # Process sections
        current_section = None
        for section_data in raw_data.get("sections", []):
            if "sectionName" in section_data:
                if current_section:
                    book_structure["sections"].append(current_section)
                    book_structure["metadata"]["total_sections"] += 1

                current_section = {
                    "id": len(book_structure["sections"]) + 1,
                    "titles": {},
                    "poems": [],
                    "poem_ids": [],
                    "metadata": {"total_poems": 0}
                }

                for name_entry in section_data["sectionName"]:
                    lang = name_entry.get("lang", "unknown")
                    current_section["titles"][lang] = name_entry.get("text", "")

            if "poems" in section_data and current_section:
                poems = self._process_poem_metadata(section_data["poems"])
                poem_ids = [poem['id'] for poem in poems]
                current_section["poems"].extend(poems)
                current_section["poem_ids"].extend(poem_ids)
                current_section["metadata"]["total_poems"] += len(poems)

        if current_section:
            book_structure["sections"].append(current_section)
            book_structure["metadata"]["total_sections"] += 1

        book_structure["metadata"]["total_poems"] = sum(
            len(s["poems"]) for s in book_structure["sections"]
        )
        return book_structure

    def _process_poem_metadata(self, poems: List[Dict]) -> List[Dict]:
        """
        Flatten poem metadata from nested structure

        Args:
            poems (List[Dict]): Raw poem metadata entries

        Returns:
            List[Dict]: Processed poem metadata
        """
        processed = []
        for poem in poems:
            processed_poem = {
                "id": poem.get("id", ""),
                "titles": {},
                "metadata": {"languages": []}  # Changed from set to list
            }

            for title_entry in poem.get("poemName", []):
                lang = title_entry.get("lang", "unknown")
                processed_poem["titles"][lang] = title_entry.get("text", "")
                if lang not in processed_poem["metadata"]["languages"]:
                    processed_poem["metadata"]["languages"].append(lang)

            processed.append(processed_poem)
        return processed

    def _process_poems(self, book_id: str, book_data: Dict) -> List[Dict]:
        """
        Process poem content files with validation and error handling

        Args:
            book_id (str): Parent book identifier
            book_data (Dict): Processed book structure

        Returns:
            List[Dict]: Processed poems with flattened content
        """
        poems = []
        book_name = book_data.get("primary_title", f"book_{book_id}")
        sections = book_data.get("sections", [])
        poem_dir = self.data_root / "poems" / book_id

        if not poem_dir.exists():
            logger.warning(f"Missing poem directory for book: {book_id}:{book_name}")
            return []

        for poem_file in poem_dir.glob("*.yaml"):
            try:
                poem_id = poem_file.stem
                raw_data = self._load_yaml(poem_file)

                # Create the generator expression, broken for readability
                sectioninfo_generator = (
                    (section_info.get('id'), section_info.get('titles', {}).get('en'))
                    for section_info in sections
                    if poem_id in section_info.get('poem_ids', [])
                )

                # Use next() with the generator and a default tuple
                section_id, section_name = next(sectioninfo_generator, (None, None))

                # Create poem structure
                poem = {
                    "id": poem_id,
                    "book_id": book_id,
                    "book_title": book_name,
                    "section_id": section_id,
                    "section_title": section_name,
                    "metadata": {"languages": []},
                    "content": {"descriptions": {}, "verses": []}
                }

                # Process descriptions
                for desc_entry in raw_data.get("description", []):
                    lang = desc_entry.get("lang", "unknown")
                    poem["content"]["descriptions"][lang] = desc_entry.get("text", "")
                    if lang not in poem["metadata"]["languages"]:
                        poem["metadata"]["languages"].append(lang)

                # Process verses with language detection
                for verse in raw_data.get("sher", []):
                    processed_verse = self._process_verse(verse)
                    poem["content"]["verses"].append(processed_verse)

                    # Detect verse languages
                    for content in verse.get("sherContent", []):
                        lang = content.get("lang", "unknown")
                        if lang not in poem["metadata"]["languages"]:
                            poem["metadata"]["languages"].append(lang)

                # Flatten structure with complete English detection
                rag_poem = self._flatten_for_rag(poem)
                if rag_poem:  # Only add if English content exists
                    poems.append(rag_poem)

            except Exception as e:
                logger.error(f"Failed processing poem {poem_id}: {str(e)}")

        return poems

    def _process_verse(self, verse: Dict) -> Dict:
        """
        Process individual verse with multilingual content

        Args:
            verse (Dict): Raw verse data from YAML

        Returns:
            Dict: Processed verse structure
        """
        processed = {
            "id": verse.get("id", ""),
            "content": {},
            "notes": []
        }

        for content_entry in verse.get("sherContent", []):
            lang = content_entry.get("lang", "unknown")
            processed["content"][lang] = {
                "text": content_entry.get("text", ""),
                "notes": [self._process_note(n) for n in content_entry.get("notes", [])]
            }
        return processed

    def _process_note(self, note: Dict) -> Dict:
        """
        Standardize phrase/note structure

        Args:
            note (Dict): Raw note data

        Returns:
            Dict: Processed note structure
        """
        return {
            "phrase": note.get("phrase", ""),
            "meaning": note.get("meaning", ""),
            "occurrences": note.get("occurrence", 1)
        }

    def _flatten_for_rag(self, poem: Dict) -> Optional[Dict]:
        """
        Transform poem structure into RAG-optimized format

        Args:
            poem (Dict): Original poem structure

        Returns:
            Optional[Dict]: Flattened structure with combined text fields,
            or None if the poem has no English content
        """
        rag_poem = {
            "poem_id": poem["id"],
            "book_id": poem["book_id"],
            "book_title": poem["book_title"],
            "section_id": poem["section_id"],
            "section_title": poem["section_title"],
            "text_blocks": [],
            "full_text": ""
        }

        # Extract English content from all sources
        en_content = {
            "descriptions": poem["content"]["descriptions"].get("en", ""),
            "verses": [],
            "phrases": []
        }

        # Process verses
        for verse in poem["content"]["verses"]:
            if "en" in verse["content"]:
                en_content["verses"].append(verse["content"]["en"]["text"])
                en_content["phrases"].extend(
                    f"{note['phrase']}: {note['meaning']}"
                    for note in verse["content"]["en"].get("notes", [])
                )

        # Build full text if English content exists
        if en_content["verses"]:
            rag_poem["full_text"] = "\n\n".join([
                en_content["descriptions"],
                "\n".join(en_content["verses"])
            ])
            rag_poem["text_blocks"] = en_content["verses"]
            rag_poem["phrases"] = en_content["phrases"]
            return rag_poem

        logger.warning(f"No English content found for poem {poem['id']}")
        return None

    def _save_dataset(self):
        """Save datasets with proper serialization checks"""
        base_path = self.output_dir / "iqbal_poems"

        # Save full dataset
        with open(f"{base_path}_full.json", "w", encoding="utf-8") as f:
            json.dump(self.dataset, f, ensure_ascii=True, indent=2)

        # Save RAG-optimized poems (only those with English content)
        rag_data = [p for p in self.dataset["poems"] if p is not None]
        with open(f"{base_path}_rag.json", "w", encoding="utf-8") as f:
            json.dump(rag_data, f, ensure_ascii=True, indent=2)

        logger.info(f"Saved {len(rag_data)} RAG-ready poems")

    def _load_yaml(self, path: Path) -> Dict:
        """
        Safe YAML loader with validation

        Args:
            path (Path): Path to YAML file

        Returns:
            Dict: Parsed YAML content
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed loading YAML from {path}: {str(e)}")
            raise