"""
RAG-Helper: minimal, reproducible toy script for AI-SEO retrieval demos.

- Fetches a URL
- Extracts text
- Chunks ~300 "tokens" (word approximation)
- Creates embeddings (sentence-transformers)
- (Optional) Upserts into Qdrant
- Generates a short "copy-cite" answer block with footnotes
"""
|
|
|
|
|
|
import argparse, re, uuid, json, os
|
|
|
from typing import List, Dict
|
|
|
import requests
|
|
|
from bs4 import BeautifulSoup
|
|
|
from tqdm import tqdm
|
|
|
import numpy as np
|
|
|
|
|
|
# sentence-transformers is a hard requirement of this script: without it no
# embeddings can be produced, so exit with an actionable message.
try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    # Only a missing package maps to "install requirements". Any other error
    # raised while importing (e.g. a broken backend installation) is left to
    # propagate so the real cause stays visible.
    raise SystemExit("Please install requirements: pip install -r requirements.txt")
|
|
|
|
|
|
|
|
|
def fetch_url(url: str) -> str:
    """Download *url* and return the response body decoded as text.

    Args:
        url: Address to fetch with an HTTP GET (30 second timeout).

    Returns:
        The decoded response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    # When a text/* response declares no charset in its Content-Type header,
    # requests assumes ISO-8859-1, which garbles UTF-8 pages. In that case use
    # the encoding detected from the body instead.
    content_type = response.headers.get("Content-Type", "")
    if "charset" not in content_type.lower():
        response.encoding = response.apparent_encoding or response.encoding

    return response.text
|
|
|
|
|
|
|
|
|
def html_to_text(html: str) -> str:
    """Return the text content of *html* with whitespace collapsed.

    ``<script>``, ``<style>`` and ``<noscript>`` elements are removed before
    the text is extracted. Every run of whitespace in the result is replaced
    by a single space, and leading/trailing whitespace is stripped.
    """
    document = BeautifulSoup(html, "html.parser")

    # Drop elements whose contents should not end up in the extracted text.
    for element in document.find_all(["script", "style", "noscript"]):
        element.decompose()

    raw_text = document.get_text(separator=" ")
    collapsed = re.sub(r"\s+", " ", raw_text)
    return collapsed.strip()
|
|
|
|
|
|
|
|
|
def chunk_text(text: str, target_tokens: int = 300) -> List[str]:
    """Split *text* into consecutive chunks of at most *target_tokens* words.

    Words are whitespace-separated tokens; this is only an approximation of
    model tokens. Chunks do not overlap and the last one may be shorter.

    Args:
        text: Input text. Empty or whitespace-only text yields ``[]``.
        target_tokens: Maximum number of words per chunk; must be >= 1.

    Returns:
        The chunks in document order, each with words joined by one space.

    Raises:
        ValueError: If *target_tokens* is smaller than 1.
    """
    # Validate up front: a step of 0 would make range() fail with an unrelated
    # message, and a negative step would silently produce no chunks at all.
    if target_tokens < 1:
        raise ValueError(
            f"target_tokens must be a positive integer, got {target_tokens!r}"
        )

    words = text.split()
    return [
        " ".join(words[start:start + target_tokens])
        for start in range(0, len(words), target_tokens)
    ]
|
|
|
|
|
|
|
|
|
def embed_chunks(chunks: List[str], model_name: str = "sentence-transformers/all-MiniLM-L6-v2") -> np.ndarray:
    """Encode *chunks* with a SentenceTransformer model.

    A new model instance is created from *model_name* on every call. The
    chunks are encoded in batches of 32 with a progress bar, requesting NumPy
    output and normalized embeddings.

    Returns:
        Whatever ``SentenceTransformer.encode`` returns for these options
        (presumably one embedding row per chunk -- confirm against the
        sentence-transformers documentation).
    """
    encoder = SentenceTransformer(model_name)
    encode_options = {
        "batch_size": 32,
        "show_progress_bar": True,
        "convert_to_numpy": True,
        "normalize_embeddings": True,
    }
    embeddings = encoder.encode(chunks, **encode_options)
    return embeddings
|
|
|
|
|
|
|
|
|
def build_payload(chunks: List[str], embs: np.ndarray, source_url: str, entity: str = "", sector: str = "") -> List[Dict]:
    """Pair each chunk with its embedding and attach metadata.

    Args:
        chunks: Text chunks in document order.
        embs: Embeddings, one per chunk, in the same order as *chunks*. Each
            item must provide ``tolist()``.
        source_url: Stored as ``metadata["source"]`` on every record.
        entity: Stored as ``metadata["entity"]``.
        sector: Stored as ``metadata["sector"]``.

    Returns:
        One dict per chunk with the keys ``id`` (a random UUID4 string),
        ``text``, ``vector`` (a plain list) and ``metadata`` (``source``,
        ``entity``, ``sector`` and the chunk's ``position``).

    Raises:
        ValueError: If the number of chunks and embeddings differ.
    """
    # zip() would silently drop the surplus items of the longer input, so a
    # mismatch is rejected explicitly instead of losing chunks unnoticed.
    if len(chunks) != len(embs):
        raise ValueError(
            f"chunks and embs must have the same length, "
            f"got {len(chunks)} and {len(embs)}"
        )

    return [
        {
            "id": str(uuid.uuid4()),
            "text": chunk,
            "vector": embedding.tolist(),
            "metadata": {
                "source": source_url,
                "entity": entity,
                "sector": sector,
                "position": position,
            },
        }
        for position, (chunk, embedding) in enumerate(zip(chunks, embs))
    ]
|
|
|
|
|
|
|
|
|
def optional_qdrant_upsert(vectors: List[Dict], collection: str, qdrant_url: str = None, api_key: str = None):
    """Upsert *vectors* into a Qdrant collection when qdrant-client is available.

    Args:
        vectors: Records with the keys ``id``, ``vector``, ``text`` and
            ``metadata``. The text is stored in the point payload next to
            the metadata.
        collection: Target collection name. If looking it up fails, the
            collection is created with cosine distance and the dimension of
            the first vector.
        qdrant_url: Qdrant endpoint; ``http://localhost:6333`` when falsy.
        api_key: Optional API key passed to the client.

    Returns:
        None. Nothing is sent when *vectors* is empty or qdrant-client is not
        installed; a message is printed in both cases.
    """
    # Nothing to send, and the vector dimension could not be derived anyway.
    if not vectors:
        print("No vectors to upsert; skipping vector DB upsert.")
        return

    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct, Distance, VectorParams
    except ImportError:
        print("qdrant-client not installed; skipping vector DB upsert.")
        return

    client = QdrantClient(url=qdrant_url or "http://localhost:6333", api_key=api_key)
    dim = len(vectors[0]["vector"])

    try:
        client.get_collection(collection)
    except Exception:
        # The lookup can fail for reasons other than "collection missing"
        # (network, auth). create_collection is used rather than
        # recreate_collection so such a failure can never drop an existing
        # collection together with its data.
        client.create_collection(
            collection_name=collection,
            vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
        )

    points = [
        PointStruct(
            id=record["id"],
            vector=record["vector"],
            payload={**record["metadata"], "text": record["text"]},
        )
        for record in vectors
    ]
    client.upsert(collection_name=collection, points=points)
    print(f"Upserted {len(points)} vectors into Qdrant collection '{collection}'.")
|
|
|
|
|
|
|
|
|
def make_copy_cite(vectors: List[Dict], k: int = 3) -> str:
    """Build a Markdown "copy-cite" draft from the first *k* records.

    Each selected record becomes a bullet holding at most the first 280
    characters of its text (followed by ``...`` when truncated) and a numeric
    marker. After a blank line, one footnote per marker lists the record's
    ``metadata["source"]``.
    """
    selected = vectors[:k]

    bullet_lines = []
    for number, entry in enumerate(selected, start=1):
        body = entry["text"]
        excerpt = body[:280]
        if len(body) > 280:
            excerpt += "..."
        bullet_lines.append(f"- {excerpt} [{number}]")

    footnote_lines = []
    for number, entry in enumerate(selected, start=1):
        footnote_lines.append(f"[{number}] {entry['metadata']['source']}")

    header = "**Answer (draft):**\n"
    return header + "\n".join(bullet_lines) + "\n\n" + "\n".join(footnote_lines)
|
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point: ingest one URL and write the results.

    Steps: fetch the page, extract its text, chunk it, embed the chunks,
    build the records, optionally upsert them into Qdrant (only when
    ``--qdrant-url`` is given), write the records as JSONL to ``--out`` and
    write a copy-cite Markdown block next to it.

    Raises:
        SystemExit: If no text could be extracted from the page.
    """
    parser = argparse.ArgumentParser(description="NebulaTech RAG-Helper (toy)")
    cli_options = (
        ("--url", {"required": True, "help": "Public URL to ingest"}),
        ("--entity", {"default": "", "help": "Primary entity (brand/product/topic)"}),
        ("--sector", {"default": "", "help": "Sector tag (e.g., architecture, pharma)"}),
        ("--qdrant-url", {"default": None, "help": "Qdrant endpoint (optional)"}),
        ("--qdrant-key", {"default": None, "help": "Qdrant API key (optional)"}),
        ("--collection", {"default": "nebula_rag_helper", "help": "Qdrant collection name"}),
        ("--out", {"default": "output.jsonl", "help": "Local JSONL output"}),
    )
    for flag, options in cli_options:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    print(f"[1/5] Fetching: {args.url}")
    page_html = fetch_url(args.url)
    page_text = html_to_text(page_html)

    print("[2/5] Chunking ~300 tokens...")
    chunks = chunk_text(page_text)
    if not chunks:
        raise SystemExit("No text extracted; aborting.")

    print(f"[3/5] Embedding {len(chunks)} chunks...")
    embeddings = embed_chunks(chunks)

    print("[4/5] Building vectors + metadata...")
    vectors = build_payload(
        chunks,
        embeddings,
        source_url=args.url,
        entity=args.entity,
        sector=args.sector,
    )

    # The vector DB step runs only when an endpoint was passed explicitly.
    if args.qdrant_url:
        optional_qdrant_upsert(
            vectors,
            collection=args.collection,
            qdrant_url=args.qdrant_url,
            api_key=args.qdrant_key,
        )

    # One JSON document per line.
    with open(args.out, "w", encoding="utf-8") as jsonl_file:
        jsonl_file.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in vectors
        )
    print(f"Wrote {len(vectors)} vectors to {args.out}")

    # The Markdown block is written next to the JSONL file, reusing its stem.
    markdown = make_copy_cite(vectors, k=3)
    out_stem, _out_ext = os.path.splitext(args.out)
    cc_path = out_stem + "_copycite.md"
    with open(cc_path, "w", encoding="utf-8") as markdown_file:
        markdown_file.write(markdown)
    print(f"Generated copy-cite block at {cc_path}")
|
|
|
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|
|
|
|