| |
| """Acceptance checks for required files, artifacts, and submission readiness.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| from pathlib import Path |
| import re |
|
|
| import sys |
|
|
# Repository root: this file lives one directory below it.  The root is pushed
# onto sys.path so sibling packages resolve when the script runs standalone.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))


# Source/config files that must exist (presence only) for the gate to pass.
REQUIRED_FILES = [
    "openenv.yaml",
    "__init__.py",
    "client.py",
    "models.py",
    "server/__init__.py",
    "server/app.py",
    "app/env/env_core.py",
    "app/env/fastapi_app.py",
    "app/env/client.py",
    "app/agents/orchestrator.py",
    "app/training/grpo_trl.py",
    "app/hf_space/training_runner.py",
    "scripts/deploy_training_space.py",
    "scripts/pull_training_artifacts.py",
    "scripts/generate_hf_training_report.py",
    "scripts/train_sft_trl.py",
    "scripts/train_grpo_trl.py",
    "scripts/evaluate_policy_ablations.py",
    "scripts/merge_adapters_safe.py",
    "scripts/test_inference_postsave.py",
    "scripts/deploy_space.sh",
    "scripts/bootstrap_openenv.sh",
    "docs/training.md",
    "docs/deployment.md",
    "docs/evaluation.md",
    "docs/submission_checklist.md",
]


# Generated data/report artifacts that must exist AND be non-empty files.
REQUIRED_ARTIFACTS = [
    "data/processed/normalized_drugs.parquet",
    "data/processed/drug_classes.parquet",
    "data/processed/interactions.parquet",
    "data/processed/burden_rules.yaml",
    "data/processed/taper_rules.yaml",
    "data/processed/substitution_rules.yaml",
    "data/processed/retrieval_corpus.jsonl",
    "data/processed/graph_edges.parquet",
    "data/processed/patients_synthetic.parquet",
    "data/processed/provenance_manifest.json",
    "data/processed/feature_dictionary.json",
    "data/scenarios/scenarios_easy.jsonl",
    "data/scenarios/scenarios_medium.jsonl",
    "data/scenarios/scenarios_hard.jsonl",
    "outputs/reports/benchmark_report.json",
    "outputs/reports/baselines.json",
]


# Env vars that may supply submission URLs when the README lacks real links.
REQUIRED_SUBMISSION_ENV_VARS = [
    "POLYGUARD_SUBMISSION_GITHUB_URL",
    "POLYGUARD_SUBMISSION_HF_SPACE_URL",
    "POLYGUARD_SUBMISSION_COLAB_URL",
    "POLYGUARD_SUBMISSION_VIDEO_OR_BLOG_URL",
]


# Section markers the README text must contain verbatim.
REQUIRED_README_MARKERS = [
    "Problem Statement",
    "Environment",
    "Capabilities",
    "Tasks",
    "Reward Model / Evaluation Logic",
    "Post-Training Strategy",
    "GitHub Repo URL",
    "HF Space URL",
    "Colab Notebook URL",
    "YouTube Video URL",
    "Hugging Face Blog URL",
]


# Placeholder fragments that indicate submission links were never filled in.
PLACEHOLDER_PATTERNS = [
    "your-username",
    "your-hf-username",
    "your-colab-id",
    "your-video-id",
    "your-polyguard-post",
    "https://github.com/...",
    "https://huggingface.co/spaces/...",
    "https://colab.research.google.com/...",
]


# URL prefixes that count as "real" (non-placeholder) submission links.
REAL_LINK_MARKERS = {
    "github": "https://github.com/",
    "hf_space": "https://huggingface.co/spaces/",
    "colab": "https://colab.research.google.com/",
    "youtube": "https://www.youtube.com/",
    "hf_blog": "https://huggingface.co/blog/",
}


# Training backends accepted as real (non-fallback) implementations.
ACCEPTED_SFT_BACKENDS = {"trl_unsloth", "trl_transformers"}
ACCEPTED_GRPO_BACKENDS = {"trl_unsloth", "trl_transformers", "trl_grpo", "unsloth_grpo"}
# Result images tracked in the repo; must exist and be non-empty.
REQUIRED_TRACKED_RESULT_ASSETS = [
    "docs/results/avg_reward.png",
    "docs/results/policy_stack_avg_reward.png",
]


# Chart images the HF training sweep is expected to produce; must be non-empty.
REQUIRED_HF_SWEEP_CHARTS = [
    "outputs/plots/sft_vs_grpo_reward.png",
    "outputs/plots/sft_loss_curves.png",
    "outputs/plots/qwen_model_sft_reward.png",
    "outputs/plots/qwen_model_sft_loss.png",
    "outputs/plots/sft_validity_reward.png",
    "outputs/plots/grpo_reward_curves.png",
    "outputs/plots/qwen_model_grpo_reward.png",
    "outputs/plots/reward_component_bars.png",
    "outputs/plots/anti_cheat_failure_rates.png",
    "outputs/plots/train_holdout_gap.png",
    "outputs/plots/inference_validity_reward.png",
    "outputs/plots/inference_latency_validity.png",
]




# Matches an http(s) URL up to whitespace or a closing parenthesis
# (the latter so markdown-style "[text](url)" links terminate correctly).
URL_RE = re.compile(r"https?://[^\s)]+")
|
|
|
|
| def _missing(root: Path, rel_paths: list[str], require_non_empty: bool = False) -> list[str]: |
| missing: list[str] = [] |
| for rel in rel_paths: |
| path = root / rel |
| if not path.exists(): |
| missing.append(rel) |
| continue |
| if require_non_empty and path.is_file() and path.stat().st_size == 0: |
| missing.append(rel) |
| return missing |
|
|
|
|
def _readme_checks(root: Path) -> dict[str, list[str]]:
    """Check README.md for required section markers and a minimum URL count.

    Returns a mapping with two lists: ``missing_markers`` (required markers not
    found verbatim in the text) and ``missing_links`` (a single message when
    fewer than 4 URLs are present).  Both lists are empty when the README passes.
    """
    readme = root / "README.md"
    if not readme.exists():
        # Return a copy of the constant so callers cannot accidentally mutate
        # the shared module-level marker list.
        return {
            "missing_markers": list(REQUIRED_README_MARKERS),
            "missing_links": ["README.md missing"],
        }

    text = readme.read_text(encoding="utf-8")
    missing_markers = [marker for marker in REQUIRED_README_MARKERS if marker not in text]
    missing_links: list[str] = []
    # Four URLs expected: repo, HF Space, Colab, and a video-or-blog link.
    if len(URL_RE.findall(text)) < 4:
        missing_links.append("fewer than 4 URLs found in README")
    return {
        "missing_markers": missing_markers,
        "missing_links": missing_links,
    }
|
|
|
|
| def _read_json(root: Path, rel: str) -> dict: |
| path = root / rel |
| if not path.exists(): |
| return {} |
| try: |
| payload = json.loads(path.read_text(encoding="utf-8")) |
| except json.JSONDecodeError: |
| return {} |
| return payload if isinstance(payload, dict) else {} |
|
|
|
|
| def _readme_text(root: Path) -> str: |
| path = root / "README.md" |
| if not path.exists(): |
| return "" |
| return path.read_text(encoding="utf-8") |
|
|
|
|
def _readme_placeholder_failures(text: str) -> list[str]:
    """Flag the README when any known placeholder link fragment is still present."""
    for fragment in PLACEHOLDER_PATTERNS:
        if fragment in text:
            return ["README placeholder links present"]
    return []
|
|
|
|
def _readme_has_real_submission_links(text: str) -> bool:
    """True when *text* contains real GitHub, HF Space, and Colab link prefixes
    plus at least one "story" link (YouTube video or HF blog post).
    """
    core_present = all(
        REAL_LINK_MARKERS[key] in text for key in ("github", "hf_space", "colab")
    )
    # The original also tested `"youtube" in REAL_LINK_MARKERS`, which is always
    # true for the literal dict defined above; that dead check is dropped.
    story_present = (
        REAL_LINK_MARKERS["youtube"] in text or REAL_LINK_MARKERS["hf_blog"] in text
    )
    return core_present and story_present
|
|
|
|
def _env_link_checks(strict: bool, readme_text: str = "") -> list[str]:
    """Return env-var names whose submission URLs are missing or non-http.

    Only enforced when *strict* is true AND the README does not already carry
    real submission links; in every other case the result is an empty list.
    """
    # Non-strict runs (and READMEs with real links) can never fail this check,
    # so return early instead of looping over the env vars to append nothing.
    if not strict or _readme_has_real_submission_links(readme_text):
        return []
    missing: list[str] = []
    for key in REQUIRED_SUBMISSION_ENV_VARS:
        value = os.getenv(key, "").strip()
        if not value or not value.startswith("http"):
            missing.append(key)
    return missing
|
|
|
|
def _check_sft_report(root: Path) -> list[str]:
    """Validate the SFT run report: status, backend, artifact path, example count."""
    report = _read_json(root, "outputs/reports/sft_trl_run.json")
    failures: list[str] = []
    if str(report.get("status", "")) != "ok":
        failures.append("SFT report status is not ok")
    if str(report.get("backend", "")) not in ACCEPTED_SFT_BACKENDS:
        failures.append("SFT report uses fallback backend")
    if not str(report.get("artifact_path", "") or ""):
        failures.append("SFT artifact path is empty or missing")
    if int(report.get("examples_used", 0) or 0) <= 0:
        failures.append("SFT report has no training examples")
    return failures


def _check_grpo_report(root: Path) -> list[str]:
    """Validate the GRPO run report: status, backend, artifact path."""
    report = _read_json(root, "outputs/reports/grpo_trl_run.json")
    failures: list[str] = []
    if str(report.get("status", "")) != "ok":
        failures.append("GRPO report status is not ok")
    if str(report.get("backend", "")) not in ACCEPTED_GRPO_BACKENDS:
        failures.append("GRPO report uses fallback backend")
    if not str(report.get("artifact_path", "") or ""):
        failures.append("GRPO artifact path is empty or missing")
    return failures


def _check_inference_reports(root: Path) -> list[str]:
    """Validate post-save inference and the improvement report."""
    failures: list[str] = []
    postsave = _read_json(root, "outputs/reports/postsave_inference.json")
    if str(postsave.get("model_source", "")) == "fallback_policy":
        failures.append("post-save inference uses fallback policy")
    if postsave.get("model_load_error"):
        failures.append("post-save inference has model load error")
    improvement = _read_json(root, "outputs/reports/improvement_report.json")
    if improvement.get("improved") is not True:
        failures.append("improvement report is not positive")
    return failures


def _check_sweep_reports(root: Path) -> list[str]:
    """Validate the HF training sweep summary and the anti-hacking report."""
    failures: list[str] = []
    sweep = _read_json(root, "outputs/reports/hf_sweep_summary.json")
    if not sweep:
        failures.append("HF training sweep summary missing")
    elif int(sweep.get("completed_models", 0) or 0) <= 0:
        failures.append("HF training sweep has no completed models")
    else:
        # An "sft-baseline" sweep deliberately skips GRPO, so only the SFT
        # artifact is mandatory per model in that mode.
        sft_only_sweep = str(sweep.get("training_mode") or "full") == "sft-baseline"
        for row in sweep.get("models", []):
            if not isinstance(row, dict) or row.get("status") != "completed":
                continue
            label = str(row.get("label") or row.get("model_id") or "model")
            if row.get("fallback_detected"):
                failures.append(f"HF sweep model {label} used fallback backend")
            if not row.get("reward_range_ok", False):
                failures.append(f"HF sweep model {label} has reward range failures")
            artifact_paths = row.get("artifact_paths", {})
            if not isinstance(artifact_paths, dict):
                artifact_paths = {}
            if not artifact_paths.get("sft"):
                failures.append(f"HF sweep model {label} missing SFT artifact")
            if not sft_only_sweep and not artifact_paths.get("grpo"):
                failures.append(f"HF sweep model {label} missing GRPO artifact")
    # The anti-hacking/overfit gate applies even when the sweep summary itself
    # is missing or incomplete.
    anti_hacking = _read_json(root, "outputs/reports/anti_hacking_overfit_report.json")
    if anti_hacking.get("passed") is not True:
        failures.append("anti-hacking/overfit report is not passing")
    return failures


def _strict_training_checks(root: Path) -> list[str]:
    """Run every strict-mode training/evaluation report check against *root*.

    Failure messages are accumulated in a stable order: SFT, GRPO, inference/
    improvement, then sweep/anti-hacking — matching the report layout.
    """
    failures: list[str] = []
    failures.extend(_check_sft_report(root))
    failures.extend(_check_grpo_report(root))
    failures.extend(_check_inference_reports(root))
    failures.extend(_check_sweep_reports(root))
    return failures
|
|
|
|
def _strict_asset_checks(root: Path) -> list[str]:
    """Strict-mode checks for tracked result images, the HF deployment
    verification report, and the sweep chart images.
    """
    failures: list[str] = []
    if _missing(root, REQUIRED_TRACKED_RESULT_ASSETS, require_non_empty=True):
        failures.append("tracked result assets missing")

    verification = _read_json(root, "docs/results/hf_space_verification.json")
    if verification.get("passed") is not True:
        failures.append("HF deployment verification missing")

    if _missing(root, REQUIRED_HF_SWEEP_CHARTS, require_non_empty=True):
        failures.append("HF sweep charts missing")

    return failures
|
|
|
|
def _strict_submission_checks(root: Path, readme_text: str) -> list[str]:
    """Run all strict submission gates and return de-duplicated failures,
    preserving first-seen order (via dict.fromkeys).
    """
    collected: list[str] = list(_readme_placeholder_failures(readme_text))
    if not _readme_has_real_submission_links(readme_text):
        collected.append("README real submission links missing")
    collected += _strict_training_checks(root)
    collected += _strict_asset_checks(root)
    return list(dict.fromkeys(collected))
|
|
|
|
def run_checks(root: Path = ROOT, strict_submission_links: bool = False) -> dict[str, object]:
    """Run every acceptance check against *root* and return the summary mapping.

    The summary's "status" is "fail" when any check produced findings, and
    "submission_ready" is true only for a clean strict-mode run.
    """
    readme_text = _readme_text(root)
    readme_check = _readme_checks(root)
    missing_files = _missing(root, REQUIRED_FILES)
    missing_artifacts = _missing(root, REQUIRED_ARTIFACTS, require_non_empty=True)
    missing_submission_env = _env_link_checks(
        strict=strict_submission_links, readme_text=readme_text
    )
    strict_failures: list[str] = []
    if strict_submission_links:
        strict_failures = _strict_submission_checks(root, readme_text=readme_text)

    has_failures = any(
        (
            missing_files,
            missing_artifacts,
            readme_check["missing_markers"],
            readme_check["missing_links"],
            missing_submission_env,
            strict_failures,
        )
    )

    return {
        "missing_files": missing_files,
        "missing_artifacts": missing_artifacts,
        "missing_readme_markers": readme_check["missing_markers"],
        "missing_readme_links": readme_check["missing_links"],
        "strict_submission_links": strict_submission_links,
        "missing_submission_env": missing_submission_env,
        "strict_submission_failures": strict_failures,
        "submission_ready": strict_submission_links and not has_failures,
        "status": "fail" if has_failures else "ok",
    }
|
|
|
|
def main() -> None:
    """CLI entry point: run the checks, persist the gate report, and exit
    non-zero (via SystemExit) when any check failed.
    """
    root = Path(__file__).resolve().parents[1]
    flag = os.getenv("POLYGUARD_ENFORCE_SUBMISSION_LINKS", "false").lower()
    strict = flag in {"1", "true", "yes", "on"}

    summary = run_checks(root=root, strict_submission_links=strict)

    report_path = root / "outputs" / "reports" / "acceptance_gate.json"
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(
        json.dumps(summary, ensure_ascii=True, indent=2), encoding="utf-8"
    )

    if summary["status"] == "fail":
        raise SystemExit(f"acceptance_gate_failed: {summary}")
    print("acceptance_gate_ok")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|