# FetchMerck-AI-Demo / scripts / ingest_medline.py
# jeremygracey-ai's picture
# Add MedlinePlus ingest script (local-only, produces data/corpus.jsonl + embeddings.npy)
# a38c344 verified
"""Ingest MedlinePlus Health Topics into a local RAG corpus.
Run locally (NOT in the Space). Outputs:

    data/corpus.jsonl    one chunk per line: {id, topic, section, url, text}
    data/embeddings.npy  float32 matrix, L2-normalized, shape (N, 384) with the
                         default model (the width follows EMBED_MODEL)

Upload both files to the Space under data/ via the Files tab.

Source: https://medlineplus.gov/xml.html (Health Topics XML, public domain).
Attribution requested: credit MedlinePlus / U.S. National Library of Medicine.
"""
from __future__ import annotations

import html
import io
import json
import os
import re
import sys
import zipfile
from pathlib import Path
from urllib.parse import urljoin
from urllib.request import urlopen, Request

import numpy as np
from lxml import etree
from sentence_transformers import SentenceTransformer
# MedlinePlus publishes a weekly XML dump covering every Health Topic.
# Landing page: https://medlineplus.gov/xml.html
# The English Health Topics file is named like mplus_topics_YYYY-MM-DD.xml
# and is served from https://medlineplus.gov/xml/ (a directory listing).
#
# Export MEDLINE_XML_URL to pin one specific snapshot for deterministic runs.
DEFAULT_INDEX_URL = "https://medlineplus.gov/xml.html"
XML_URL = os.getenv("MEDLINE_XML_URL")

# Embedding model and chunking knobs; each one can be overridden via the
# environment variable of the same name.
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
CHUNK_TOKENS = int(os.getenv("CHUNK_TOKENS", "300"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))

# Output files, relative to the current working directory.
OUT_DIR = Path("data")
CORPUS_PATH = OUT_DIR.joinpath("corpus.jsonl")
EMBED_PATH = OUT_DIR.joinpath("embeddings.npy")
def discover_xml_url() -> str:
    """Return the URL of the MedlinePlus Health Topics XML to ingest.

    When the ``XML_URL`` override (read from ``MEDLINE_XML_URL``) is set, it is
    returned unchanged. Otherwise the page at ``DEFAULT_INDEX_URL`` is fetched
    and scanned for links named ``mplus_topics_YYYY-MM-DD.xml`` (optionally
    followed by ``.zip``); the link carrying the newest date is selected.

    Returns:
        An absolute URL for the chosen snapshot.

    Raises:
        RuntimeError: If the index page contains no matching link.
    """
    if XML_URL:
        return XML_URL

    print(f"[discover] fetching {DEFAULT_INDEX_URL}")
    req = Request(DEFAULT_INDEX_URL, headers={"User-Agent": "FetchMerck-Demo/0.1"})
    # Use the response as a context manager so the connection is always closed.
    with urlopen(req, timeout=30) as resp:
        page = resp.read().decode("utf-8", errors="replace")

    # Each match is (href, date). Capturing the date lets us order by it
    # directly instead of by the whole href, whose prefix may vary per link.
    matches = re.findall(
        r'href="([^"]*mplus_topics_(\d{4}-\d{2}-\d{2})\.xml(?:\.zip)?)"', page
    )
    if not matches:
        raise RuntimeError(
            "Could not auto-discover the MedlinePlus Health Topics XML. "
            "Set MEDLINE_XML_URL to a direct URL from https://medlineplus.gov/xml.html"
        )

    # YYYY-MM-DD dates sort correctly as strings; ties fall back to the href,
    # so ".xml.zip" still wins over ".xml" for the same date.
    href, _date = max(matches, key=lambda m: (m[1], m[0]))

    # urljoin resolves absolute, root-relative, page-relative and
    # protocol-relative hrefs against the page they were found on.
    chosen = urljoin(DEFAULT_INDEX_URL, href)
    print(f"[discover] using {chosen}")
    return chosen
def fetch_xml_bytes(url: str) -> bytes:
    """Download *url* and return the Health Topics XML as raw bytes.

    If the payload is a zip archive, the first member whose name ends in
    ``.xml`` (case-insensitive) is extracted and returned. An archive is
    recognised either by a ``.zip`` URL suffix or by its content, so URLs with
    query strings or redirects are handled too.

    Args:
        url: Location of the XML file or of a zip archive containing it.

    Returns:
        The (uncompressed) XML document bytes.

    Raises:
        RuntimeError: If a zip archive contains no ``.xml`` member.
    """
    print(f"[fetch] downloading {url}")
    req = Request(url, headers={"User-Agent": "FetchMerck-Demo/0.1"})
    # Use the response as a context manager so the connection is always closed.
    with urlopen(req, timeout=120) as resp:
        raw = resp.read()

    # Keep honouring the URL suffix, but also sniff the payload itself.
    looks_zipped = url.endswith(".zip") or zipfile.is_zipfile(io.BytesIO(raw))
    if not looks_zipped:
        return raw

    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        xml_names = [n for n in zf.namelist() if n.lower().endswith(".xml")]
        if not xml_names:
            raise RuntimeError("No .xml inside zip")
        return zf.read(xml_names[0])
TAG_RE = re.compile(r"<[^>]+>")
WS_RE = re.compile(r"\s+")
def clean_html(text: str) -> str:
if text is None:
return ""
text = TAG_RE.sub(" ", text)
text = WS_RE.sub(" ", text)
return text.strip()
def parse_topics(xml_bytes: bytes):
    """Yield one record per ``<health-topic>`` that has a non-empty summary.

    Args:
        xml_bytes: The raw Health Topics XML document.

    Yields:
        Dicts with the keys ``topic_id``, ``title``, ``url`` and ``summary``.
        ``topic_id`` falls back to the title when the element has no ``id``
        attribute. Topics without a ``<full-summary>`` child, or whose summary
        is empty after cleaning, are skipped.

    Raises:
        RuntimeError: If the document yields no root element at all.
    """
    parser = etree.XMLParser(huge_tree=True, recover=True)
    root = etree.fromstring(xml_bytes, parser=parser)
    if root is None:
        # In recover mode lxml can hand back None instead of raising when
        # nothing could be salvaged; fail with a clear message rather than an
        # AttributeError on the next line.
        raise RuntimeError("MedlinePlus XML could not be parsed (empty or not XML)")

    # The MedlinePlus schema uses <health-topic> elements.
    topics = root.findall(".//health-topic")
    print(f"[parse] found {len(topics)} health-topic elements")
    for t in topics:
        title = (t.get("title") or "").strip()
        url = (t.get("url") or "").strip()
        topic_id = (t.get("id") or title).strip()

        full_summary = t.find("full-summary")
        if full_summary is None:
            continue

        # full-summary carries escaped HTML as text/CDATA: serialise the
        # element's text content, then strip the markup from it.
        raw = etree.tostring(full_summary, method="text", encoding="unicode")
        summary = clean_html(raw)
        if not summary:
            continue

        yield {
            "topic_id": topic_id,
            "title": title,
            "url": url,
            "summary": summary,
        }
def chunk_text(text: str, max_tokens: int, overlap: int):
    """Split *text* into overlapping chunks of whitespace-delimited words.

    This is a naive word-based chunker (words stand in for tokens), which is
    good enough for MiniLM-sized contexts.

    Args:
        text: The text to split; empty or whitespace-only text yields nothing.
        max_tokens: Maximum number of words per chunk; must be at least 1.
        overlap: Number of words shared by consecutive chunks. Negative values
            are treated as 0 so no words are skipped, and values of
            ``max_tokens`` or more still advance one word at a time.

    Yields:
        Chunks as single-space-joined strings. The final chunk always ends at
        the last word, and no chunk is emitted that is fully contained in the
        previous one.

    Raises:
        ValueError: If ``max_tokens`` is less than 1 (raised on first
            iteration, since this is a generator).
    """
    if max_tokens < 1:
        raise ValueError(f"max_tokens must be >= 1, got {max_tokens}")

    words = text.split()
    # Clamp both ends: the window must always advance (step >= 1) and must
    # never jump past unseen words (step <= max_tokens).
    step = max(1, max_tokens - max(0, overlap))
    for start in range(0, len(words), step):
        yield " ".join(words[start : start + max_tokens])
        # Stop once this window reached the end, so no redundant tail chunk
        # is produced from the overlap region.
        if start + max_tokens >= len(words):
            break
def main() -> int:
    """Run the ingest pipeline: download, parse, chunk, embed and save.

    Writes ``CORPUS_PATH`` (one JSON object per line) and ``EMBED_PATH``
    (float32 embedding matrix, one row per chunk, in the same order).

    Returns:
        0 on success, 2 when no chunks could be produced.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    source_url = discover_xml_url()
    xml_bytes = fetch_xml_bytes(source_url)

    # One record per chunk; the id combines the topic id with the chunk index.
    chunks = [
        {
            "id": f'{topic["topic_id"]}::{idx}',
            "topic": topic["title"],
            "section": topic["title"],
            "url": topic["url"],
            "text": piece,
        }
        for topic in parse_topics(xml_bytes)
        for idx, piece in enumerate(
            chunk_text(topic["summary"], CHUNK_TOKENS, CHUNK_OVERLAP)
        )
    ]
    if not chunks:
        print("[error] no chunks produced; aborting", file=sys.stderr)
        return 2
    print(f"[chunk] {len(chunks)} chunks across topics")

    print(f"[embed] loading {EMBED_MODEL}")
    model = SentenceTransformer(EMBED_MODEL)
    encoded = model.encode(
        [record["text"] for record in chunks],
        batch_size=64,
        show_progress_bar=True,
        normalize_embeddings=True,
        convert_to_numpy=True,
    )
    vectors = encoded.astype("float32")

    with CORPUS_PATH.open("w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in chunks
        )
    np.save(EMBED_PATH, vectors)

    print(f"[done] wrote {CORPUS_PATH} ({len(chunks)} chunks)")
    print(f"[done] wrote {EMBED_PATH} (shape={vectors.shape}, dtype={vectors.dtype})")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())