"""DDI API ingestion helpers with offline-first caching.""" from __future__ import annotations import json from pathlib import Path from typing import Any import requests DEFAULT_DDI_API_URL = "https://api.fda.gov/drug/label.json" def fetch_ddi_api_records( drugs: list[str], timeout: int = 20, api_url: str = DEFAULT_DDI_API_URL, ) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] for drug in drugs: try: response = requests.get( api_url, params={"search": f"openfda.generic_name:{drug}", "limit": 1}, timeout=timeout, ) response.raise_for_status() payload = response.json() records.append( { "drug": drug, "source": api_url, "status": "ok", "payload": payload, } ) except Exception as exc: # noqa: BLE001 records.append( { "drug": drug, "source": api_url, "status": "error", "error": str(exc), } ) return records def load_cached_ddi(path: Path) -> list[dict[str, Any]]: if not path.exists(): return [] try: payload = json.loads(path.read_text(encoding="utf-8")) if isinstance(payload, list): return payload return [] except Exception: return [] def cache_ddi_records(path: Path, records: list[dict[str, Any]]) -> Path: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(records, ensure_ascii=True, indent=2), encoding="utf-8") return path