"""Source management for offline-first ingestion."""
from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import Any

from app.dataops.normalizer import normalize_component_entities, normalize_drug_entities
from app.dataops.parser import extract_components, extract_drug_mentions
from app.dataops.provenance import make_provenance
from app.dataops.web_agent import fetch_url


class SourceManager:
    """Locates local raw sources and maintains a content-addressed text cache."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.raw = root / "data" / "raw"
        self.cache = root / "data" / "cache"
        self.cache.mkdir(parents=True, exist_ok=True)

    def local_sources(self) -> list[Path]:
        """Return every file under data/raw, recursively."""
        return [p for p in self.raw.rglob("*") if p.is_file()]

    @staticmethod
    def checksum_text(text: str) -> str:
        """SHA-256 hex digest of the UTF-8 encoded text."""
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def cache_text(self, namespace: str, key: str, text: str) -> Path:
        """Write text under data/cache/<namespace> and record a sidecar meta file.

        The payload filename embeds a checksum prefix, so distinct contents
        for the same key never overwrite each other; the meta file always
        points at the most recent write.
        """
        ns_dir = self.cache / namespace
        ns_dir.mkdir(parents=True, exist_ok=True)
        checksum = self.checksum_text(text)
        target = ns_dir / f"{key}_{checksum[:12]}.txt"
        target.write_text(text, encoding="utf-8")
        meta = {
            "key": key,
            "checksum": checksum,
            "path": str(target),
        }
        (ns_dir / f"{key}.meta.json").write_text(
            json.dumps(meta, ensure_ascii=True, indent=2), encoding="utf-8"
        )
        return target
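
    # Illustrative cache layout after cache_text("web", "example_key", "hello"),
    # where "example_key" is a hypothetical key (sha256("hello") starts with
    # 2cf24dba5fb0):
    #
    #   data/cache/web/example_key_2cf24dba5fb0.txt   <- payload
    #   data/cache/web/example_key.meta.json          <- {"key", "checksum", "path"}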

    def read_cached(self, namespace: str, key: str) -> str | None:
        """Return cached text for key, or None if the meta or payload is missing."""
        meta_path = self.cache / namespace / f"{key}.meta.json"
        if not meta_path.exists():
            return None
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        target = Path(meta["path"])
        if target.exists():
            return target.read_text(encoding="utf-8")
        return None

    def fetch_with_cache(
        self,
        url: str,
        allow_domains: list[str],
        namespace: str = "web",
        offline_first: bool = True,
    ) -> dict[str, Any]:
        """Serve url from the cache when possible; otherwise fetch and cache it."""
        # Derive a filesystem-safe cache key from the URL.
        key = url.replace("https://", "").replace("http://", "").replace("/", "_")
        if offline_first:
            cached = self.read_cached(namespace=namespace, key=key)
            if cached is not None:
                provenance = make_provenance(source=url, source_type="cache", transform="read_cached")
                return {"text": cached, "provenance": provenance.to_dict(), "from_cache": True}
        text = fetch_url(url, allowed_domains=allow_domains)
        self.cache_text(namespace=namespace, key=key, text=text)
        provenance = make_provenance(source=url, source_type="web", transform="fetch_with_cache")
        return {"text": text, "provenance": provenance.to_dict(), "from_cache": False}


class DataAcquisitionAgent:
    """Extracts and normalizes drug and component entities from local and web sources."""

    def __init__(self, root: Path, allow_domains: list[str]) -> None:
        self.manager = SourceManager(root=root)
        self.allow_domains = allow_domains

    def acquire_local_knowledge(self) -> list[dict[str, Any]]:
        """Parse every file under data/raw into an entity record with provenance."""
        records: list[dict[str, Any]] = []
        for source in self.manager.local_sources():
            text = source.read_text(encoding="utf-8", errors="ignore")
            mentions = normalize_drug_entities(extract_drug_mentions(text))
            components = normalize_component_entities(extract_components(text))
            provenance = make_provenance(
                source=str(source), source_type="local_file", transform="parse_local"
            ).to_dict()
            records.append(
                {
                    "source": str(source),
                    "mentions": mentions,
                    "components": components,
                    "provenance": provenance,
                }
            )
        return records
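
    # Shape of each record (values illustrative):
    #   {"source": "data/raw/label_001.txt", "mentions": [...],
    #    "components": [...], "provenance": {...}}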

    def acquire_web_knowledge(self, url: str, offline_first: bool = True) -> dict[str, Any]:
        """Fetch (or re-read from cache) a drug-label page and extract entities."""
        blob = self.manager.fetch_with_cache(
            url=url,
            allow_domains=self.allow_domains,
            namespace="drug_labels",
            offline_first=offline_first,
        )
        text = blob["text"]
        mentions = normalize_drug_entities(extract_drug_mentions(text))
        components = normalize_component_entities(extract_components(text))
        return {
            "url": url,
            "mentions": mentions,
            "components": components,
            "provenance": blob["provenance"],
            "from_cache": blob["from_cache"],
        }
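

if __name__ == "__main__":
    # Offline smoke test: a minimal sketch, assuming the package is run from
    # the project root and data/raw holds plain-text sources. The allow-list
    # entry "example.com" is a placeholder; the local path needs no network.
    agent = DataAcquisitionAgent(root=Path("."), allow_domains=["example.com"])
    for record in agent.acquire_local_knowledge():
        print(record["source"], len(record["mentions"]), len(record["components"]))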