| """DDI API ingestion helpers with offline-first caching.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import requests |
|
|
|
|
# Endpoint queried when the caller does not pass ``api_url`` explicitly.
DEFAULT_DDI_API_URL = "https://api.fda.gov/drug/label.json"


def fetch_ddi_api_records(
    drugs: list[str],
    timeout: int = 20,
    api_url: str = DEFAULT_DDI_API_URL,
) -> list[dict[str, Any]]:
    """Query ``api_url`` once per drug name and collect one record per drug.

    Each request searches ``openfda.generic_name:<drug>`` with ``limit=1``.
    A failure for one drug never aborts the batch: any exception raised while
    requesting, status-checking or decoding the response is captured in that
    drug's record instead of propagating.

    Args:
        drugs: Drug names to look up, processed in the given order.
        timeout: Value forwarded as ``timeout`` to ``requests.get``.
        api_url: Endpoint to query; also stored as each record's ``source``.

    Returns:
        One dict per input drug, in input order. Every record has ``drug``,
        ``source`` and ``status``; ``status == "ok"`` records also carry the
        decoded JSON under ``payload``, while ``status == "error"`` records
        carry the stringified exception under ``error``.
    """

    def lookup(name: str) -> dict[str, Any]:
        # Fields shared by the success and failure shapes come first so the
        # key order matches in both cases.
        entry: dict[str, Any] = {"drug": name, "source": api_url}
        try:
            reply = requests.get(
                api_url,
                params={"search": f"openfda.generic_name:{name}", "limit": 1},
                timeout=timeout,
            )
            reply.raise_for_status()
            body = reply.json()
        except Exception as failure:
            # Deliberately broad: the batch is best-effort per drug.
            entry["status"] = "error"
            entry["error"] = str(failure)
        else:
            entry["status"] = "ok"
            entry["payload"] = body
        return entry

    return [lookup(name) for name in drugs]
|
|
|
|
def load_cached_ddi(path: Path) -> list[dict[str, Any]]:
    """Read previously cached records from ``path``, tolerating any failure.

    Args:
        path: Location of the JSON cache file.

    Returns:
        The decoded JSON value when the file exists, can be read as UTF-8,
        parses as JSON and its top-level value is a list. In every other
        case (missing file, unreadable file, invalid JSON, non-list
        top-level value) an empty list is returned. List elements are
        returned as-is without further validation.
    """
    if path.exists():
        try:
            raw_text = path.read_text(encoding="utf-8")
            decoded = json.loads(raw_text)
        except Exception:
            # Best-effort cache: an unreadable or corrupt file counts as empty.
            decoded = None
        if isinstance(decoded, list):
            return decoded
    return []
|
|
|
|
def cache_ddi_records(path: Path, records: list[dict[str, Any]]) -> Path:
    """Persist ``records`` to ``path`` as indented, ASCII-escaped JSON.

    The payload is written to a sibling staging file and then moved over
    ``path`` in a single rename, so an interrupted or failed write does not
    leave a truncated cache file in place of the previous one.

    Args:
        path: Destination cache file; missing parent directories are created.
        records: JSON-serializable records to store.

    Returns:
        ``path``, unchanged, for caller convenience.

    Raises:
        TypeError: If ``records`` contains a value ``json.dumps`` cannot
            serialize (any existing cache file is left untouched).
        ValueError: If ``json.dumps`` rejects ``records``, e.g. a circular
            reference (any existing cache file is left untouched).
        OSError: If the directory or file cannot be created or replaced.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialize before touching any file so a bad record fails early.
    serialized = json.dumps(records, ensure_ascii=True, indent=2)

    # Staging file lives next to the target so the rename stays on one filesystem.
    staging = path.with_name(f"{path.name}.tmp")
    try:
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(path)
    finally:
        # After a successful replace the staging file is gone; this only
        # removes leftovers from a failed write or rename.
        if staging.exists():
            staging.unlink()
    return path
|
|