File size: 4,414 Bytes
877add7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Source management for offline-first ingestion."""

from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import Any

from app.dataops.web_agent import fetch_url
from app.dataops.parser import extract_components, extract_drug_mentions
from app.dataops.normalizer import normalize_component_entities, normalize_drug_entities
from app.dataops.provenance import make_provenance


class SourceManager:
    def __init__(self, root: Path) -> None:
        self.root = root
        self.raw = root / "data" / "raw"
        self.cache = root / "data" / "cache"
        self.cache.mkdir(parents=True, exist_ok=True)

    def local_sources(self) -> list[Path]:
        return [p for p in self.raw.rglob("*") if p.is_file()]

    @staticmethod
    def checksum_text(text: str) -> str:
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def cache_text(self, namespace: str, key: str, text: str) -> Path:
        ns_dir = self.cache / namespace
        ns_dir.mkdir(parents=True, exist_ok=True)
        checksum = self.checksum_text(text)
        target = ns_dir / f"{key}_{checksum[:12]}.txt"
        target.write_text(text, encoding="utf-8")
        meta = {
            "key": key,
            "checksum": checksum,
            "path": str(target),
        }
        (ns_dir / f"{key}.meta.json").write_text(json.dumps(meta, ensure_ascii=True, indent=2), encoding="utf-8")
        return target

    def read_cached(self, namespace: str, key: str) -> str | None:
        meta_path = self.cache / namespace / f"{key}.meta.json"
        if not meta_path.exists():
            return None
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        target = Path(meta["path"])
        if target.exists():
            return target.read_text(encoding="utf-8")
        return None

    def fetch_with_cache(
        self,
        url: str,
        allow_domains: list[str],
        namespace: str = "web",
        offline_first: bool = True,
    ) -> dict[str, Any]:
        key = url.replace("https://", "").replace("http://", "").replace("/", "_")
        if offline_first:
            cached = self.read_cached(namespace=namespace, key=key)
            if cached is not None:
                provenance = make_provenance(source=url, source_type="cache", transform="read_cached")
                return {"text": cached, "provenance": provenance.__dict__, "from_cache": True}
        text = fetch_url(url, allowed_domains=allow_domains)
        self.cache_text(namespace=namespace, key=key, text=text)
        provenance = make_provenance(source=url, source_type="web", transform="fetch_with_cache")
        return {"text": text, "provenance": provenance.__dict__, "from_cache": False}


class DataAcquisitionAgent:
    def __init__(self, root: Path, allow_domains: list[str]) -> None:
        self.manager = SourceManager(root=root)
        self.allow_domains = allow_domains

    def acquire_local_knowledge(self) -> list[dict[str, Any]]:
        records: list[dict[str, Any]] = []
        for source in self.manager.local_sources():
            text = source.read_text(encoding="utf-8", errors="ignore")
            mentions = normalize_drug_entities(extract_drug_mentions(text))
            components = normalize_component_entities(extract_components(text))
            provenance = make_provenance(source=str(source), source_type="local_file", transform="parse_local").to_dict()
            records.append(
                {
                    "source": str(source),
                    "mentions": mentions,
                    "components": components,
                    "provenance": provenance,
                }
            )
        return records

    def acquire_web_knowledge(self, url: str, offline_first: bool = True) -> dict[str, Any]:
        blob = self.manager.fetch_with_cache(
            url=url,
            allow_domains=self.allow_domains,
            namespace="drug_labels",
            offline_first=offline_first,
        )
        text = blob["text"]
        mentions = normalize_drug_entities(extract_drug_mentions(text))
        components = normalize_component_entities(extract_components(text))
        return {
            "url": url,
            "mentions": mentions,
            "components": components,
            "provenance": blob["provenance"],
            "from_cache": blob["from_cache"],
        }