Spaces:
Running
Running
| """DDI API ingestion helpers with offline-first caching.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| import requests | |
| DEFAULT_DDI_API_URL = "https://api.fda.gov/drug/label.json" | |
| def fetch_ddi_api_records( | |
| drugs: list[str], | |
| timeout: int = 20, | |
| api_url: str = DEFAULT_DDI_API_URL, | |
| ) -> list[dict[str, Any]]: | |
| records: list[dict[str, Any]] = [] | |
| for drug in drugs: | |
| try: | |
| response = requests.get( | |
| api_url, | |
| params={"search": f"openfda.generic_name:{drug}", "limit": 1}, | |
| timeout=timeout, | |
| ) | |
| response.raise_for_status() | |
| payload = response.json() | |
| records.append( | |
| { | |
| "drug": drug, | |
| "source": api_url, | |
| "status": "ok", | |
| "payload": payload, | |
| } | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| records.append( | |
| { | |
| "drug": drug, | |
| "source": api_url, | |
| "status": "error", | |
| "error": str(exc), | |
| } | |
| ) | |
| return records | |
| def load_cached_ddi(path: Path) -> list[dict[str, Any]]: | |
| if not path.exists(): | |
| return [] | |
| try: | |
| payload = json.loads(path.read_text(encoding="utf-8")) | |
| if isinstance(payload, list): | |
| return payload | |
| return [] | |
| except Exception: | |
| return [] | |
| def cache_ddi_records(path: Path, records: list[dict[str, Any]]) -> Path: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(json.dumps(records, ensure_ascii=True, indent=2), encoding="utf-8") | |
| return path | |