| """Source management for offline-first ingestion.""" |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| from app.dataops.web_agent import fetch_url |
| from app.dataops.parser import extract_components, extract_drug_mentions |
| from app.dataops.normalizer import normalize_component_entities, normalize_drug_entities |
| from app.dataops.provenance import make_provenance |
|
|
|
|
| class SourceManager: |
| def __init__(self, root: Path) -> None: |
| self.root = root |
| self.raw = root / "data" / "raw" |
| self.cache = root / "data" / "cache" |
| self.cache.mkdir(parents=True, exist_ok=True) |
|
|
| def local_sources(self) -> list[Path]: |
| return [p for p in self.raw.rglob("*") if p.is_file()] |
|
|
| @staticmethod |
| def checksum_text(text: str) -> str: |
| return hashlib.sha256(text.encode("utf-8")).hexdigest() |
|
|
| def cache_text(self, namespace: str, key: str, text: str) -> Path: |
| ns_dir = self.cache / namespace |
| ns_dir.mkdir(parents=True, exist_ok=True) |
| checksum = self.checksum_text(text) |
| target = ns_dir / f"{key}_{checksum[:12]}.txt" |
| target.write_text(text, encoding="utf-8") |
| meta = { |
| "key": key, |
| "checksum": checksum, |
| "path": str(target), |
| } |
| (ns_dir / f"{key}.meta.json").write_text(json.dumps(meta, ensure_ascii=True, indent=2), encoding="utf-8") |
| return target |
|
|
| def read_cached(self, namespace: str, key: str) -> str | None: |
| meta_path = self.cache / namespace / f"{key}.meta.json" |
| if not meta_path.exists(): |
| return None |
| meta = json.loads(meta_path.read_text(encoding="utf-8")) |
| target = Path(meta["path"]) |
| if target.exists(): |
| return target.read_text(encoding="utf-8") |
| return None |
|
|
| def fetch_with_cache( |
| self, |
| url: str, |
| allow_domains: list[str], |
| namespace: str = "web", |
| offline_first: bool = True, |
| ) -> dict[str, Any]: |
| key = url.replace("https://", "").replace("http://", "").replace("/", "_") |
| if offline_first: |
| cached = self.read_cached(namespace=namespace, key=key) |
| if cached is not None: |
| provenance = make_provenance(source=url, source_type="cache", transform="read_cached") |
| return {"text": cached, "provenance": provenance.__dict__, "from_cache": True} |
| text = fetch_url(url, allowed_domains=allow_domains) |
| self.cache_text(namespace=namespace, key=key, text=text) |
| provenance = make_provenance(source=url, source_type="web", transform="fetch_with_cache") |
| return {"text": text, "provenance": provenance.__dict__, "from_cache": False} |
|
|
|
|
class DataAcquisitionAgent:
    """Extract drug and component knowledge from local files and the web."""

    def __init__(self, root: Path, allow_domains: list[str]) -> None:
        self.manager = SourceManager(root=root)
        self.allow_domains = allow_domains

    def acquire_local_knowledge(self) -> list[dict[str, Any]]:
        """Parse every file under data/raw into one record per source."""
        return [self._record_for(path) for path in self.manager.local_sources()]

    def _record_for(self, path: Path) -> dict[str, Any]:
        """Build the mentions/components/provenance record for a single file."""
        raw_text = path.read_text(encoding="utf-8", errors="ignore")
        drug_mentions = normalize_drug_entities(extract_drug_mentions(raw_text))
        component_hits = normalize_component_entities(extract_components(raw_text))
        return {
            "source": str(path),
            "mentions": drug_mentions,
            "components": component_hits,
            "provenance": make_provenance(
                source=str(path), source_type="local_file", transform="parse_local"
            ).to_dict(),
        }

    def acquire_web_knowledge(self, url: str, offline_first: bool = True) -> dict[str, Any]:
        """Fetch one URL (cache-first by default) and extract entities from it."""
        payload = self.manager.fetch_with_cache(
            url=url,
            allow_domains=self.allow_domains,
            namespace="drug_labels",
            offline_first=offline_first,
        )
        page_text = payload["text"]
        return {
            "url": url,
            "mentions": normalize_drug_entities(extract_drug_mentions(page_text)),
            "components": normalize_component_entities(extract_components(page_text)),
            "provenance": payload["provenance"],
            "from_cache": payload["from_cache"],
        }
|
|