File size: 1,771 Bytes
"""DDI API ingestion helpers with offline-first caching."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import requests
# Endpoint queried by fetch_ddi_api_records when the caller does not pass
# api_url: the drug label JSON resource on api.fda.gov.
DEFAULT_DDI_API_URL = "https://api.fda.gov/drug/label.json"
def _generic_name_query(drug: str) -> str:
    """Build the ``search`` expression matching *drug* on ``openfda.generic_name``."""
    term = drug.strip()
    if any(ch.isspace() for ch in term):
        # The API treats whitespace as a separator between search terms, so an
        # unquoted multi-word name is not matched as one phrase against the
        # field. Double quotes request a phrase match; embedded quotes are
        # dropped so the phrase stays well-formed.
        term = '"{}"'.format(term.replace('"', ""))
    return f"openfda.generic_name:{term}"


def fetch_ddi_api_records(
    drugs: list[str],
    timeout: int = 20,
    api_url: str = DEFAULT_DDI_API_URL,
) -> list[dict[str, Any]]:
    """Fetch one label record per drug, never raising on per-drug failures.

    Each name in *drugs* is looked up with a separate GET request against
    *api_url*, searching the ``openfda.generic_name`` field and limiting the
    response to a single result.

    Args:
        drugs: Generic drug names to look up, one request per entry.
        timeout: Timeout in seconds passed to ``requests.get``.
        api_url: Endpoint to query.

    Returns:
        One dict per input drug, in input order. Every dict has ``"drug"``,
        ``"source"`` (the queried URL) and ``"status"``. On success
        ``"status"`` is ``"ok"`` and ``"payload"`` holds the decoded JSON
        body; on any failure ``"status"`` is ``"error"`` and ``"error"``
        holds ``str(exc)``.
    """
    records: list[dict[str, Any]] = []
    for drug in drugs:
        try:
            response = requests.get(
                api_url,
                params={"search": _generic_name_query(drug), "limit": 1},
                timeout=timeout,
            )
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:  # noqa: BLE001
            # Deliberately broad: a failure for one drug (network, HTTP
            # status, bad JSON, bad input) is recorded and must not abort
            # the remaining lookups.
            records.append(
                {
                    "drug": drug,
                    "source": api_url,
                    "status": "error",
                    "error": str(exc),
                }
            )
        else:
            records.append(
                {
                    "drug": drug,
                    "source": api_url,
                    "status": "ok",
                    "payload": payload,
                }
            )
    return records
def load_cached_ddi(path: Path) -> list[dict[str, Any]]:
    """Return the record list stored at *path*, or ``[]`` when unusable.

    An empty list is returned when the path does not exist, when reading or
    JSON-decoding it fails for any reason, or when the decoded document is
    not a list. Elements of the list are returned as-is, without validation.
    """
    cached: list[dict[str, Any]] = []
    if path.exists():
        try:
            raw_text = path.read_text(encoding="utf-8")
            decoded = json.loads(raw_text)
        except Exception:
            # Best-effort read: an unreadable or corrupt cache counts as empty.
            decoded = None
        if isinstance(decoded, list):
            cached = decoded
    return cached
def cache_ddi_records(path: Path, records: list[dict[str, Any]]) -> Path:
    """Serialize *records* as indented ASCII JSON to *path* and return *path*.

    Missing parent directories are created. The JSON is first written to a
    sibling ``<name>.tmp`` file and then moved over *path*, so a failed or
    interrupted write never leaves a truncated cache in place of the previous
    one. Not designed for several writers targeting the same path at once.

    Args:
        path: Destination file for the cache.
        records: JSON-serializable records to store.

    Returns:
        The same *path* that was passed in.

    Raises:
        TypeError: If *records* contains values ``json.dumps`` cannot encode
            (raised before anything is written).
        OSError: If the directory or file cannot be written.
    """
    # Serialize first so an encoding failure cannot touch the filesystem.
    serialized = json.dumps(records, ensure_ascii=True, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_name(path.name + ".tmp")
    try:
        staging.write_text(serialized, encoding="utf-8")
        # Same-directory rename: replaces the destination in one step.
        staging.replace(path)
    except OSError:
        # Remove the partial staging file; the existing cache is untouched.
        try:
            staging.unlink()
        except OSError:
            pass
        raise
    return path
|