adithya9903's picture
Deploy PolyGuard HF training Space
fd0c71a verified
"""DDI API ingestion helpers with offline-first caching."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import requests
DEFAULT_DDI_API_URL = "https://api.fda.gov/drug/label.json"
def fetch_ddi_api_records(
drugs: list[str],
timeout: int = 20,
api_url: str = DEFAULT_DDI_API_URL,
) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
for drug in drugs:
try:
response = requests.get(
api_url,
params={"search": f"openfda.generic_name:{drug}", "limit": 1},
timeout=timeout,
)
response.raise_for_status()
payload = response.json()
records.append(
{
"drug": drug,
"source": api_url,
"status": "ok",
"payload": payload,
}
)
except Exception as exc: # noqa: BLE001
records.append(
{
"drug": drug,
"source": api_url,
"status": "error",
"error": str(exc),
}
)
return records
def load_cached_ddi(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
try:
payload = json.loads(path.read_text(encoding="utf-8"))
if isinstance(payload, list):
return payload
return []
except Exception:
return []
def cache_ddi_records(path: Path, records: list[dict[str, Any]]) -> Path:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(records, ensure_ascii=True, indent=2), encoding="utf-8")
return path