Spaces:
Sleeping
Sleeping
Add MedlinePlus ingest script (local-only, produces data/corpus.jsonl + embeddings.npy)
# a38c344 verified
"""Ingest MedlinePlus Health Topics into a local RAG corpus.

Run locally (NOT in the Space). Outputs:

    data/corpus.jsonl    one chunk per line: {id, topic, section, url, text}
    data/embeddings.npy  float32 matrix, L2-normalized, shape (N, 384)

Upload both files to the Space under data/ via the Files tab.

Source: https://medlineplus.gov/xml.html (Health Topics XML, public domain).
Attribution requested: credit MedlinePlus / U.S. National Library of Medicine.
"""
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import zipfile | |
| from pathlib import Path | |
| from urllib.request import urlopen, Request | |
| import numpy as np | |
| from lxml import etree | |
| from sentence_transformers import SentenceTransformer | |
# MedlinePlus republishes the full set of Health Topics as an XML dump each week.
#   Landing page: https://medlineplus.gov/xml.html
#   File naming:  mplus_topics_YYYY-MM-DD.xml (English Health Topics),
#                 served from https://medlineplus.gov/xml/ (a directory listing).
#
# Export MEDLINE_XML_URL to pin one specific snapshot and make runs deterministic.
DEFAULT_INDEX_URL = "https://medlineplus.gov/xml.html"
XML_URL = os.getenv("MEDLINE_XML_URL")

# Embedding model and chunking knobs; each one can be overridden via the environment.
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
CHUNK_TOKENS = int(os.getenv("CHUNK_TOKENS", "300"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))

# Output locations, relative to the current working directory.
OUT_DIR = Path("data")
CORPUS_PATH = OUT_DIR.joinpath("corpus.jsonl")
EMBED_PATH = OUT_DIR.joinpath("embeddings.npy")
def discover_xml_url() -> str:
    """Return the URL of the Health Topics XML snapshot to ingest.

    When the module-level ``XML_URL`` (read from ``MEDLINE_XML_URL``) is set,
    it is returned unchanged. Otherwise the page at ``DEFAULT_INDEX_URL`` is
    fetched and scanned for links named ``mplus_topics_YYYY-MM-DD.xml``
    (optionally with a ``.zip`` suffix), and the link carrying the most
    recent date is chosen.

    Returns:
        An absolute URL pointing at the ``.xml`` or ``.xml.zip`` file.

    Raises:
        RuntimeError: if the index page contains no matching link.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urljoin

    if XML_URL:
        return XML_URL

    print(f"[discover] fetching {DEFAULT_INDEX_URL}")
    req = Request(DEFAULT_INDEX_URL, headers={"User-Agent": "FetchMerck-Demo/0.1"})
    # Close the HTTP response deterministically instead of leaking the socket.
    with urlopen(req, timeout=30) as resp:
        html = resp.read().decode("utf-8", errors="replace")

    # Each match is (href, date); the date group lets us rank by snapshot date.
    matches = re.findall(
        r'href="([^"]*mplus_topics_(\d{4}-\d{2}-\d{2})\.xml(?:\.zip)?)"', html
    )
    if not matches:
        raise RuntimeError(
            "Could not auto-discover the MedlinePlus Health Topics XML. "
            "Set MEDLINE_XML_URL to a direct URL from https://medlineplus.gov/xml.html"
        )

    # Rank by the embedded date (ISO dates sort chronologically as strings)
    # rather than by the whole href, so a different path prefix can never
    # outrank a newer snapshot. The href itself only breaks ties.
    href, _date = max(matches, key=lambda m: (m[1], m[0]))

    # urljoin resolves absolute, root-relative, page-relative ("../x") and
    # protocol-relative ("//host/x") hrefs against the index page.
    chosen = urljoin(DEFAULT_INDEX_URL, href)
    print(f"[discover] using {chosen}")
    return chosen
def fetch_xml_bytes(url: str) -> bytes:
    """Download *url* and return the bytes of the XML document it holds.

    Zip archives are unpacked transparently: the first member whose name
    ends in ``.xml`` (case-insensitive) is returned. A payload is treated
    as a zip when the URL ends in ``.zip`` or when the downloaded bytes
    start with the zip local-file-header signature.

    Args:
        url: Location of a plain ``.xml`` file or a zip containing one.

    Returns:
        The raw, undecoded XML bytes.

    Raises:
        RuntimeError: if a zip archive contains no ``.xml`` member.
        zipfile.BadZipFile: if the URL ends in ``.zip`` but the payload is
            not a valid archive.
    """
    print(f"[fetch] downloading {url}")
    req = Request(url, headers={"User-Agent": "FetchMerck-Demo/0.1"})
    # Close the response deterministically instead of leaking the connection.
    with urlopen(req, timeout=120) as resp:
        raw = resp.read()

    # Check the payload as well as the name: a zip served from a URL with a
    # query string, or without the suffix, must still be unpacked.
    looks_zipped = url.endswith(".zip") or raw.startswith(b"PK\x03\x04")
    if not looks_zipped:
        return raw

    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        xml_names = [n for n in zf.namelist() if n.lower().endswith(".xml")]
        if not xml_names:
            raise RuntimeError("No .xml inside zip")
        return zf.read(xml_names[0])
# Any "<...>" run is treated as markup; whitespace runs are collapsed afterwards.
TAG_RE = re.compile(r"<[^>]+>")
WS_RE = re.compile(r"\s+")


def clean_html(text: str) -> str:
    """Convert an HTML fragment into plain, single-spaced text.

    Tags are replaced by a space, HTML character references (``&amp;``,
    ``&nbsp;``, ``&#39;`` ...) are decoded, and every run of whitespace is
    collapsed to one space.

    Args:
        text: The HTML fragment; ``None`` is accepted and treated as empty.

    Returns:
        The cleaned text with no leading or trailing whitespace.
    """
    # Function-scope import keeps this block self-contained.
    from html import unescape

    if text is None:
        return ""
    # Strip tags BEFORE decoding so that an escaped "&lt;b&gt;" stays visible
    # as literal text instead of being mistaken for markup.
    without_tags = TAG_RE.sub(" ", text)
    decoded = unescape(without_tags)
    # Collapse last: decoding can introduce whitespace such as U+00A0 (&nbsp;).
    return WS_RE.sub(" ", decoded).strip()
def parse_topics(xml_bytes: bytes):
    """Yield one dict per health topic that has a non-empty summary.

    Each yielded dict has the keys ``topic_id``, ``title``, ``url`` and
    ``summary``. ``topic_id`` falls back to the title when the element has
    no ``id`` attribute. Topics without a ``<full-summary>`` child, or whose
    summary is empty after cleaning, are skipped.

    Args:
        xml_bytes: The Health Topics XML document as raw bytes.

    Raises:
        RuntimeError: if the document is so damaged that no root element
            could be recovered.
    """
    # recover=True tolerates minor markup damage. huge_tree=True lifts the
    # parser's size limits for the large dump, so entity expansion is switched
    # off: the payload is downloaded content and must not be able to blow up
    # memory through entity declarations.
    parser = etree.XMLParser(huge_tree=True, recover=True, resolve_entities=False)
    root = etree.fromstring(xml_bytes, parser=parser)
    if root is None:
        # A recovering parser returns None instead of raising on unusable input.
        raise RuntimeError("MedlinePlus XML could not be parsed (no root element)")

    # The MedlinePlus schema uses <health-topic> elements.
    topics = root.findall(".//health-topic")
    print(f"[parse] found {len(topics)} health-topic elements")

    for node in topics:
        summary_el = node.find("full-summary")
        if summary_el is None:
            continue

        # <full-summary> carries escaped HTML as text/CDATA: take the element's
        # text content, then strip the inline markup.
        raw = etree.tostring(summary_el, method="text", encoding="unicode")
        summary = clean_html(raw)
        if not summary:
            continue

        title = (node.get("title") or "").strip()
        yield {
            "topic_id": (node.get("id") or title).strip(),
            "title": title,
            "url": (node.get("url") or "").strip(),
            "summary": summary,
        }
def chunk_text(text: str, max_tokens: int, overlap: int):
    """Split *text* into overlapping windows of whitespace-separated words.

    Naive whitespace-token chunker: "tokens" here are words produced by
    ``str.split()``, not model tokens.

    Args:
        text: The text to split. Empty or whitespace-only text yields nothing.
        max_tokens: Maximum number of words per chunk; must be at least 1.
        overlap: Number of words shared between consecutive chunks. Values
            outside ``0 .. max_tokens - 1`` are clamped into that range, so
            the window always advances and never skips words.

    Yields:
        Each chunk as a single-space-joined string. The final chunk may be
        shorter than ``max_tokens``.

    Raises:
        ValueError: if ``max_tokens`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if max_tokens < 1:
        raise ValueError(f"max_tokens must be >= 1, got {max_tokens}")

    words = text.split()
    # A negative overlap would make the step larger than the window and drop
    # words between chunks; an overlap >= max_tokens would stall the window.
    overlap = min(max(overlap, 0), max_tokens - 1)
    step = max_tokens - overlap

    for start in range(0, len(words), step):
        yield " ".join(words[start : start + max_tokens])
        # Stop once this window reaches the end, so the tail is not emitted
        # again as a shorter, fully-overlapping chunk.
        if start + max_tokens >= len(words):
            break
def main() -> int:
    """Build the corpus: download, parse, chunk, embed and write both outputs.

    Writes ``CORPUS_PATH`` (one JSON object per line with the keys ``id``,
    ``topic``, ``section``, ``url`` and ``text``) and ``EMBED_PATH`` (a
    float32 array with one normalized embedding row per corpus line, in the
    same order).

    Returns:
        ``0`` on success, ``2`` if no chunks were produced.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    url = discover_xml_url()
    xml_bytes = fetch_xml_bytes(url)

    chunks = []
    for topic in parse_topics(xml_bytes):
        pieces = chunk_text(topic["summary"], CHUNK_TOKENS, CHUNK_OVERLAP)
        for i, piece in enumerate(pieces):
            chunks.append({
                "id": f'{topic["topic_id"]}::{i}',
                "topic": topic["title"],
                # No finer-grained section is extracted, so the title is reused.
                "section": topic["title"],
                "url": topic["url"],
                "text": piece,
            })

    if not chunks:
        print("[error] no chunks produced; aborting", file=sys.stderr)
        return 2
    print(f"[chunk] {len(chunks)} chunks across topics")

    print(f"[embed] loading {EMBED_MODEL}")
    model = SentenceTransformer(EMBED_MODEL)

    # Every whitespace word costs at least one model token, so a full-length
    # chunk longer than the model window loses its tail during encoding.
    # Surface that instead of silently embedding only part of the text.
    window = getattr(model, "max_seq_length", None)
    if isinstance(window, int) and CHUNK_TOKENS > window:
        print(
            f"[warn] CHUNK_TOKENS={CHUNK_TOKENS} exceeds the model window of "
            f"{window} tokens; the tail of long chunks will not be embedded",
            file=sys.stderr,
        )

    texts = [c["text"] for c in chunks]
    embs = model.encode(
        texts,
        batch_size=64,
        show_progress_bar=True,
        normalize_embeddings=True,
        convert_to_numpy=True,
    ).astype("float32")

    # Both files are written only after embedding has succeeded, so a failed
    # run never leaves a corpus without its matching matrix.
    with CORPUS_PATH.open("w", encoding="utf-8") as f:
        for c in chunks:
            f.write(json.dumps(c, ensure_ascii=False) + "\n")
    np.save(EMBED_PATH, embs)

    print(f"[done] wrote {CORPUS_PATH} ({len(chunks)} chunks)")
    print(f"[done] wrote {EMBED_PATH} (shape={embs.shape}, dtype={embs.dtype})")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())