#!/usr/bin/env python3
"""Build required processed data artifacts for POLYGUARD-OPENENV."""
from __future__ import annotations

import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# Make the repository root importable before pulling in app.* modules.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Third-party and project imports intentionally follow the sys.path setup
# above (the app.* modules require it).  # noqa: E402
import pandas as pd
import yaml

from app.knowledge.ddi_knowledge import is_contraindicated_pair
from app.knowledge.drug_catalog import DRUG_CLASSES
from app.knowledge.substitution_rules import SUBSTITUTIONS
from app.knowledge.taper_rules import requires_taper
def _safe_write_json(path: Path, payload: Any) -> None:
    """Serialize *payload* as indented ASCII JSON at *path*.

    Missing parent directories are created on demand.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=True, indent=2)
    path.write_text(serialized, encoding="utf-8")
def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* to *path* as JSON Lines (one ASCII JSON object per line).

    Missing parent directories are created on demand.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(row, ensure_ascii=True) + "\n" for row in rows
        )
def _load_scenario_rows(scenario_dir: Path) -> list[dict[str, Any]]:
    """Load every ``*.json`` file in *scenario_dir*, ordered by filename.

    Returns an empty list when the directory does not exist.
    """
    if not scenario_dir.exists():
        return []
    return [
        json.loads(scenario_file.read_text(encoding="utf-8"))
        for scenario_file in sorted(scenario_dir.glob("*.json"))
    ]
def main() -> None:
    """Build all processed data artifacts for POLYGUARD-OPENENV.

    Reads the in-repo knowledge modules (drug catalog, DDI rules, taper and
    substitution rules) plus optional on-disk inputs (retrieval index,
    synthetic patients, scenario JSON files), then writes:

    - parquet tables and rule YAMLs under ``data/processed``
    - per-difficulty scenario JSONL files under ``data/scenarios``
    - a provenance manifest, feature dictionary, dataset report, and a
      bootstrap summary under ``data/processed``/``docs``/``data/artifacts``
    """
    root = Path(__file__).resolve().parents[1]
    processed_dir = root / "data" / "processed"
    processed_dir.mkdir(parents=True, exist_ok=True)
    artifacts_dir = root / "data" / "artifacts"
    artifacts_dir.mkdir(parents=True, exist_ok=True)

    # --- Canonical drug vocabulary and class mapping ----------------------
    drug_rows: list[dict[str, Any]] = []
    class_rows: list[dict[str, Any]] = []
    for idx, (drug, class_name) in enumerate(sorted(DRUG_CLASSES.items()), start=1):
        canonical_id = f"drug_{idx:04d}"
        aliases = [drug.replace("_", " "), drug.upper()]
        drug_rows.append(
            {
                "canonical_id": canonical_id,
                "canonical_name": drug,
                "aliases": aliases,
                "class_name": class_name,
                "source": "local_drug_catalog",
            }
        )
        class_rows.append(
            {
                "canonical_id": canonical_id,
                "class_name": class_name,
                "subclass": f"{class_name}_core",
                "source": "local_drug_catalog",
            }
        )

    # --- Contraindicated drug pairs (unordered, drug_a < drug_b) ----------
    interactions: list[dict[str, Any]] = []
    drugs = sorted(DRUG_CLASSES)
    for i, drug_a in enumerate(drugs):
        for drug_b in drugs[i + 1 :]:
            if is_contraindicated_pair(drug_a, drug_b):
                interactions.append(
                    {
                        "drug_a": drug_a,
                        "drug_b": drug_b,
                        "severity": "high",
                        "interaction_type": "contraindicated",
                        "source": "ddi_rules",
                    }
                )

    # --- Rule artifacts (burden / taper / substitution) -------------------
    burden_rules = {
        "version": "1.0",
        "formula": "burden = med_count/12 + high_risk_count*0.04",
        "high_risk_classes": ["sedative", "anticoagulant", "analgesic"],
    }
    taper_rows: list[dict[str, Any]] = []
    for drug in drugs:
        # Call requires_taper() once per drug (was previously invoked twice).
        needs_taper = requires_taper(drug)
        taper_rows.append(
            {
                "drug": drug,
                "requires_taper": needs_taper,
                "default_taper_days": 14 if needs_taper else 0,
            }
        )
    taper_rules = {"rules": taper_rows, "source": "taper_rules"}
    substitution_rules = {"rules": SUBSTITUTIONS, "source": "substitution_rules"}

    # --- Retrieval corpus (optional input; empty corpus if index missing) -
    retrieval_index_file = root / "data" / "retrieval_index" / "index.json"
    retrieval_rows: list[dict[str, Any]] = []
    if retrieval_index_file.exists():
        retrieval_rows = json.loads(retrieval_index_file.read_text(encoding="utf-8"))
    retrieval_corpus = [
        {
            "doc_id": row.get("id"),
            "path": row.get("path"),
            "text": row.get("text"),
            "source": "retrieval_index",
        }
        for row in retrieval_rows
    ]

    # --- Knowledge graph edges (class membership, DDI, substitutions) -----
    graph_edges: list[dict[str, Any]] = []
    for drug, class_name in sorted(DRUG_CLASSES.items()):
        graph_edges.append({"src": drug, "dst": class_name, "edge_type": "in_class", "weight": 1.0})
    for row in interactions:
        # Contraindication is symmetric, so emit both directions.
        graph_edges.append({"src": row["drug_a"], "dst": row["drug_b"], "edge_type": "contraindicated_with", "weight": 1.0})
        graph_edges.append({"src": row["drug_b"], "dst": row["drug_a"], "edge_type": "contraindicated_with", "weight": 1.0})
    for src, replacements in SUBSTITUTIONS.items():
        for dst in replacements:
            graph_edges.append({"src": src, "dst": dst, "edge_type": "substitute_for", "weight": 0.8})

    # --- Optional synthetic patients and scenario inputs ------------------
    synthetic_file = root / "data" / "synthetic" / "synthetic_patients.json"
    synthetic_rows: list[dict[str, Any]] = []
    if synthetic_file.exists():
        synthetic_rows = json.loads(synthetic_file.read_text(encoding="utf-8"))
    easy_rows = _load_scenario_rows(root / "data" / "scenarios" / "easy")
    medium_rows = _load_scenario_rows(root / "data" / "scenarios" / "medium")
    hard_rows = _load_scenario_rows(root / "data" / "scenarios" / "hard")

    # --- Write processed artifacts ----------------------------------------
    pd.DataFrame(drug_rows).to_parquet(processed_dir / "normalized_drugs.parquet", index=False)
    pd.DataFrame(class_rows).to_parquet(processed_dir / "drug_classes.parquet", index=False)
    pd.DataFrame(interactions).to_parquet(processed_dir / "interactions.parquet", index=False)
    pd.DataFrame(graph_edges).to_parquet(processed_dir / "graph_edges.parquet", index=False)
    pd.DataFrame(synthetic_rows).to_parquet(processed_dir / "patients_synthetic.parquet", index=False)
    (processed_dir / "burden_rules.yaml").write_text(yaml.safe_dump(burden_rules, sort_keys=False), encoding="utf-8")
    (processed_dir / "taper_rules.yaml").write_text(yaml.safe_dump(taper_rules, sort_keys=False), encoding="utf-8")
    (processed_dir / "substitution_rules.yaml").write_text(yaml.safe_dump(substitution_rules, sort_keys=False), encoding="utf-8")
    _write_jsonl(processed_dir / "retrieval_corpus.jsonl", retrieval_corpus)
    _write_jsonl(root / "data" / "scenarios" / "scenarios_easy.jsonl", easy_rows)
    _write_jsonl(root / "data" / "scenarios" / "scenarios_medium.jsonl", medium_rows)
    _write_jsonl(root / "data" / "scenarios" / "scenarios_hard.jsonl", hard_rows)

    # --- Feature dictionary and provenance manifest -----------------------
    feature_dictionary = {
        "normalized_drugs": ["canonical_id", "canonical_name", "aliases", "class_name", "source"],
        "drug_classes": ["canonical_id", "class_name", "subclass", "source"],
        "interactions": ["drug_a", "drug_b", "severity", "interaction_type", "source"],
        "graph_edges": ["src", "dst", "edge_type", "weight"],
        "patients_synthetic": [
            "patient_id",
            "age",
            "sex",
            "comorbidities",
            "medications",
            "labs",
            "vitals",
            "specialist_conflicts",
            "prior_ade_history",
            "frailty_score",
            "adherence_estimate",
        ],
    }
    _safe_write_json(processed_dir / "feature_dictionary.json", feature_dictionary)
    provenance_manifest = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "policy": {
            "core_sources_live_required": ["canonical_vocab", "interactions"],
            "secondary_sources_fallback": True,
            "weak_signal_labels_marked": True,
        },
        "inputs": {
            "drug_catalog": "app/knowledge/drug_catalog.py",
            "ddi_rules": "app/knowledge/ddi_knowledge.py",
            "substitutions": "app/knowledge/substitution_rules.py",
            "taper_rules": "app/knowledge/taper_rules.py",
            "retrieval_index": str(retrieval_index_file),
        },
        "counts": {
            "normalized_drugs": len(drug_rows),
            "interactions": len(interactions),
            "retrieval_docs": len(retrieval_corpus),
            "scenario_easy": len(easy_rows),
            "scenario_medium": len(medium_rows),
            "scenario_hard": len(hard_rows),
            "patients_synthetic": len(synthetic_rows),
        },
    }
    _safe_write_json(processed_dir / "provenance_manifest.json", provenance_manifest)

    # --- Human-readable dataset report and run summary --------------------
    dataset_report = f"""# Dataset Report
## Summary
- Normalized drugs: {len(drug_rows)}
- Drug classes: {len(class_rows)}
- Interactions: {len(interactions)}
- Graph edges: {len(graph_edges)}
- Synthetic patients: {len(synthetic_rows)}
- Scenarios (easy/medium/hard): {len(easy_rows)}/{len(medium_rows)}/{len(hard_rows)}
- Retrieval corpus documents: {len(retrieval_corpus)}
## Source Policy
- Core vocabulary/interactions are treated as core sources.
- Secondary sources are allowed fallback with explicit provenance.
- Weak/noisy safety signals are labeled as such in provenance metadata.
## Artifacts
Artifacts are stored under `data/processed`, `data/scenarios`, and `data/artifacts`.
"""
    # Ensure docs/ exists before writing (was previously assumed to exist).
    (root / "docs").mkdir(parents=True, exist_ok=True)
    (root / "docs" / "dataset_report.md").write_text(dataset_report, encoding="utf-8")
    summary = {
        "status": "ok",
        "processed_dir": str(processed_dir),
        "docs_report": str(root / "docs" / "dataset_report.md"),
    }
    _safe_write_json(artifacts_dir / "bootstrap_data_summary.json", summary)
    print("bootstrap_data_done")
# Script entry point: build all processed data artifacts.
if __name__ == "__main__":
    main()