"""Source management for offline-first ingestion.""" from __future__ import annotations import hashlib import json from pathlib import Path from typing import Any from app.dataops.web_agent import fetch_url from app.dataops.parser import extract_components, extract_drug_mentions from app.dataops.normalizer import normalize_component_entities, normalize_drug_entities from app.dataops.provenance import make_provenance class SourceManager: def __init__(self, root: Path) -> None: self.root = root self.raw = root / "data" / "raw" self.cache = root / "data" / "cache" self.cache.mkdir(parents=True, exist_ok=True) def local_sources(self) -> list[Path]: return [p for p in self.raw.rglob("*") if p.is_file()] @staticmethod def checksum_text(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def cache_text(self, namespace: str, key: str, text: str) -> Path: ns_dir = self.cache / namespace ns_dir.mkdir(parents=True, exist_ok=True) checksum = self.checksum_text(text) target = ns_dir / f"{key}_{checksum[:12]}.txt" target.write_text(text, encoding="utf-8") meta = { "key": key, "checksum": checksum, "path": str(target), } (ns_dir / f"{key}.meta.json").write_text(json.dumps(meta, ensure_ascii=True, indent=2), encoding="utf-8") return target def read_cached(self, namespace: str, key: str) -> str | None: meta_path = self.cache / namespace / f"{key}.meta.json" if not meta_path.exists(): return None meta = json.loads(meta_path.read_text(encoding="utf-8")) target = Path(meta["path"]) if target.exists(): return target.read_text(encoding="utf-8") return None def fetch_with_cache( self, url: str, allow_domains: list[str], namespace: str = "web", offline_first: bool = True, ) -> dict[str, Any]: key = url.replace("https://", "").replace("http://", "").replace("/", "_") if offline_first: cached = self.read_cached(namespace=namespace, key=key) if cached is not None: provenance = make_provenance(source=url, source_type="cache", transform="read_cached") return {"text": cached, "provenance": provenance.__dict__, "from_cache": True} text = fetch_url(url, allowed_domains=allow_domains) self.cache_text(namespace=namespace, key=key, text=text) provenance = make_provenance(source=url, source_type="web", transform="fetch_with_cache") return {"text": text, "provenance": provenance.__dict__, "from_cache": False} class DataAcquisitionAgent: def __init__(self, root: Path, allow_domains: list[str]) -> None: self.manager = SourceManager(root=root) self.allow_domains = allow_domains def acquire_local_knowledge(self) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] for source in self.manager.local_sources(): text = source.read_text(encoding="utf-8", errors="ignore") mentions = normalize_drug_entities(extract_drug_mentions(text)) components = normalize_component_entities(extract_components(text)) provenance = make_provenance(source=str(source), source_type="local_file", transform="parse_local").to_dict() records.append( { "source": str(source), "mentions": mentions, "components": components, "provenance": provenance, } ) return records def acquire_web_knowledge(self, url: str, offline_first: bool = True) -> dict[str, Any]: blob = self.manager.fetch_with_cache( url=url, allow_domains=self.allow_domains, namespace="drug_labels", offline_first=offline_first, ) text = blob["text"] mentions = normalize_drug_entities(extract_drug_mentions(text)) components = normalize_component_entities(extract_components(text)) return { "url": url, "mentions": mentions, "components": components, "provenance": blob["provenance"], "from_cache": blob["from_cache"], }