Spaces:
Running
Running
| """Source management for offline-first ingestion.""" | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| from app.dataops.web_agent import fetch_url | |
| from app.dataops.parser import extract_components, extract_drug_mentions | |
| from app.dataops.normalizer import normalize_component_entities, normalize_drug_entities | |
| from app.dataops.provenance import make_provenance | |
class SourceManager:
    """Manage on-disk raw sources and a content-addressed text cache.

    Layout under *root*:
      data/raw/   -- source files dropped in by other tooling (read-only here)
      data/cache/ -- cached fetched text plus per-key metadata JSON
    """

    def __init__(self, root: Path) -> None:
        self.root = root
        self.raw = root / "data" / "raw"
        self.cache = root / "data" / "cache"
        # Only the cache directory is created; raw/ may legitimately be absent.
        self.cache.mkdir(parents=True, exist_ok=True)

    def local_sources(self) -> list[Path]:
        """Return every regular file under data/raw, searched recursively."""
        return [p for p in self.raw.rglob("*") if p.is_file()]

    @staticmethod
    def checksum_text(text: str) -> str:
        """Return the SHA-256 hex digest of *text* encoded as UTF-8.

        BUG FIX: this was originally declared ``def checksum_text(text)``
        with neither ``self`` nor ``@staticmethod``, so the call site
        ``self.checksum_text(text)`` bound the instance to ``text`` and
        raised at runtime. ``@staticmethod`` fixes that while keeping both
        ``self.checksum_text(...)`` and ``SourceManager.checksum_text(...)``
        call styles working.
        """
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def cache_text(self, namespace: str, key: str, text: str) -> Path:
        """Write *text* to the cache under *namespace* and record metadata.

        The payload filename embeds a 12-hex-digit checksum prefix so that
        different contents for the same key do not overwrite each other;
        ``<key>.meta.json`` always points at the most recently written payload.

        Returns the path of the written payload file.
        """
        ns_dir = self.cache / namespace
        ns_dir.mkdir(parents=True, exist_ok=True)
        checksum = self.checksum_text(text)
        target = ns_dir / f"{key}_{checksum[:12]}.txt"
        target.write_text(text, encoding="utf-8")
        meta = {
            "key": key,
            "checksum": checksum,
            "path": str(target),
        }
        (ns_dir / f"{key}.meta.json").write_text(
            json.dumps(meta, ensure_ascii=True, indent=2), encoding="utf-8"
        )
        return target

    def read_cached(self, namespace: str, key: str) -> str | None:
        """Return cached text for (*namespace*, *key*), or ``None`` on any miss.

        A miss is either a missing metadata file or a metadata file whose
        recorded payload path no longer exists on disk.
        """
        meta_path = self.cache / namespace / f"{key}.meta.json"
        if not meta_path.exists():
            return None
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        # NOTE(review): the stored path is absolute, so a relocated cache
        # directory invalidates its own entries — confirm this is intended.
        target = Path(meta["path"])
        if target.exists():
            return target.read_text(encoding="utf-8")
        return None

    def fetch_with_cache(
        self,
        url: str,
        allow_domains: list[str],
        namespace: str = "web",
        offline_first: bool = True,
    ) -> dict[str, Any]:
        """Fetch *url*, preferring the local cache when *offline_first* is set.

        Returns a dict with keys ``text``, ``provenance`` (a plain dict) and
        ``from_cache``. Fresh fetches are written back to the cache before
        returning.
        """
        # Derive a filesystem-safe cache key from the URL.
        key = url.replace("https://", "").replace("http://", "").replace("/", "_")
        if offline_first:
            cached = self.read_cached(namespace=namespace, key=key)
            if cached is not None:
                provenance = make_provenance(source=url, source_type="cache", transform="read_cached")
                return {"text": cached, "provenance": provenance.__dict__, "from_cache": True}
        text = fetch_url(url, allowed_domains=allow_domains)
        self.cache_text(namespace=namespace, key=key, text=text)
        provenance = make_provenance(source=url, source_type="web", transform="fetch_with_cache")
        return {"text": text, "provenance": provenance.__dict__, "from_cache": False}
class DataAcquisitionAgent:
    """Acquire drug/component knowledge from local files and allow-listed URLs."""

    def __init__(self, root: Path, allow_domains: list[str]) -> None:
        self.manager = SourceManager(root=root)
        self.allow_domains = allow_domains

    @staticmethod
    def _extract(text: str):
        """Return (normalized drug mentions, normalized components) for *text*."""
        drugs = normalize_drug_entities(extract_drug_mentions(text))
        parts = normalize_component_entities(extract_components(text))
        return drugs, parts

    def acquire_local_knowledge(self) -> list[dict[str, Any]]:
        """Parse every file under data/raw into a mentions/components record."""
        results: list[dict[str, Any]] = []
        for path in self.manager.local_sources():
            raw_text = path.read_text(encoding="utf-8", errors="ignore")
            drugs, parts = self._extract(raw_text)
            results.append(
                {
                    "source": str(path),
                    "mentions": drugs,
                    "components": parts,
                    "provenance": make_provenance(
                        source=str(path), source_type="local_file", transform="parse_local"
                    ).to_dict(),
                }
            )
        return results

    def acquire_web_knowledge(self, url: str, offline_first: bool = True) -> dict[str, Any]:
        """Fetch *url* (cache-first by default) and parse its text into a record."""
        payload = self.manager.fetch_with_cache(
            url=url,
            allow_domains=self.allow_domains,
            namespace="drug_labels",
            offline_first=offline_first,
        )
        drugs, parts = self._extract(payload["text"])
        return {
            "url": url,
            "mentions": drugs,
            "components": parts,
            "provenance": payload["provenance"],
            "from_cache": payload["from_cache"],
        }