#!/usr/bin/env python3 """Create a curated improvement-evidence bundle without retraining. This script organizes already generated PolyGuard/Qwen evidence into a clean docs/results subfolder. It does not call any training script or mutate model weights. """ from __future__ import annotations import argparse from collections import defaultdict import json from pathlib import Path import shutil import time from typing import Any import zipfile ROOT = Path(__file__).resolve().parents[1] DEFAULT_SOURCE_DOCS_DIR = ROOT / "docs" / "results" / "submission_evidence_qwen_0_5b_1_5b" DEFAULT_DOCS_DIR = ROOT / "docs" / "results" / "model_improvement_evidence_qwen_0_5b_1_5b" DEFAULT_REPORT_DIR = ROOT / "outputs" / "reports" / "model_improvement_evidence" / "qwen_0_5b_1_5b" DEFAULT_BUNDLE_ZIP = ROOT / "submission_bundle" / "qwen_0_5b_1_5b_model_improvement_evidence.zip" CHART_CATALOG: list[dict[str, Any]] = [ { "id": "qwen_0_5b_sft_training_loss", "title": "Qwen 0.5B + Bandits SFT Training Loss", "category": "training_loss", "sources": ["charts/generated/qwen_0_5b_sft_training_loss.png"], }, { "id": "qwen_1_5b_sft_training_loss", "title": "Qwen 1.5B + Bandits SFT Training Loss", "category": "training_loss", "sources": ["charts/generated/qwen_1_5b_sft_training_loss.png"], }, { "id": "qwen_0_5b_vs_1_5b_sft_loss_comparison", "title": "Qwen 0.5B + Bandits vs 1.5B + Bandits SFT Loss", "category": "training_loss", "sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_loss_comparison.png"], }, { "id": "qwen_0_5b_vs_1_5b_token_accuracy", "title": "Qwen 0.5B + Bandits vs 1.5B + Bandits Token Accuracy", "category": "training_accuracy", "sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_token_accuracy_comparison.png"], }, { "id": "qwen_sft_runtime", "title": "Qwen + Bandits SFT Runtime", "category": "training_runtime", "sources": ["charts/generated/qwen_0_5b_1_5b_sft_runtime.png"], }, { "id": "sft_vs_grpo_reward", "title": "SFT Baseline vs GRPO + Bandits Reward", "category": "sft_vs_grpo", "sources": ["charts/local_available_combined/sft_vs_grpo_reward.png"], }, { "id": "grpo_reward_curves", "title": "GRPO + Bandits Reward Curves", "category": "grpo_training", "sources": ["charts/local_available_combined/grpo_reward_curves.png"], }, { "id": "qwen_model_sft_loss", "title": "Qwen + Bandits Model SFT Loss Comparison", "category": "model_comparison", "sources": ["charts/local_available_combined/qwen_model_sft_loss.png"], }, { "id": "qwen_model_sft_reward", "title": "Qwen + Bandits Model SFT Reward Comparison", "category": "model_comparison", "sources": ["charts/local_available_combined/qwen_model_sft_reward.png"], }, { "id": "qwen_model_grpo_reward", "title": "Qwen + Bandits Model GRPO Reward Comparison", "category": "model_comparison", "sources": ["charts/local_available_combined/qwen_model_grpo_reward.png"], }, { "id": "policy_ablation_avg_reward", "title": "Without Bandits vs With Bandits Reward", "category": "policy_ablation", "sources": ["charts/generated/policy_ablation_avg_reward.png"], }, { "id": "policy_ablation_legality", "title": "Policy Ablation Legality", "category": "policy_ablation", "sources": ["charts/generated/policy_ablation_legality.png"], }, { "id": "policy_stack_avg_reward", "title": "Without Bandits vs With Bandits Policy Stack Reward", "category": "policy_ablation", "sources": ["charts/local_available_combined/policy_stack_avg_reward.png"], }, { "id": "basic_llm_vs_full_pipeline_reward", "title": "Basic LLM vs Full PolyGuard + Bandits Reward", "category": "product_over_basic_llm", "sources": ["charts/generated/basic_llm_vs_full_pipeline_reward.png"], }, { "id": "basic_llm_vs_full_pipeline_legality", "title": "Basic LLM vs Full PolyGuard + Bandits Legality", "category": "product_over_basic_llm", "sources": ["charts/generated/basic_llm_vs_full_pipeline_legality.png"], }, { "id": "basic_llm_vs_full_pipeline_delta", "title": "PolyGuard + Bandits Minus Basic Reward By Seed", "category": "product_over_basic_llm", "sources": ["charts/generated/basic_llm_vs_full_pipeline_reward_delta_by_seed.png"], }, { "id": "reward_component_bars", "title": "Reward Function Component Bars", "category": "reward_function", "sources": ["charts/generated/reward_component_bars.png", "charts/local_available_combined/reward_component_bars.png"], }, { "id": "primary_reward_channel_bars", "title": "Primary Reward Channels", "category": "reward_function", "sources": ["charts/generated/primary_reward_channel_bars.png"], }, { "id": "train_holdout_gap", "title": "Train vs Holdout Reward Gap", "category": "overfit_checks", "sources": ["charts/local_available_combined/train_holdout_gap.png"], }, { "id": "anti_cheat_failure_rates", "title": "Anti-Cheat Failure Rates", "category": "safeguards", "sources": ["charts/local_available_combined/anti_cheat_failure_rates.png"], }, { "id": "inference_latency_validity", "title": "Inference Latency and Validity", "category": "inference", "sources": ["charts/local_available_combined/inference_latency_validity.png"], }, ] REPORT_FILES = [ "reports/manifest.json", "reports/submission_summary.json", "reports/basic_llm_vs_polyguard_report.json", "reports/basic_llm_failure_cases.md", "reports/policy_ablation_report.json", "reports/remote_stage_records.json", "reports/hf_status_snapshot.json", "reports/artifact_repo_listing.json", "reports/action_traces.jsonl", ] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Build a curated PolyGuard model-improvement evidence bundle.") parser.add_argument("--source-docs-dir", default=str(DEFAULT_SOURCE_DOCS_DIR)) parser.add_argument("--docs-dir", default=str(DEFAULT_DOCS_DIR)) parser.add_argument("--report-dir", default=str(DEFAULT_REPORT_DIR)) parser.add_argument("--bundle-zip", default=str(DEFAULT_BUNDLE_ZIP)) parser.add_argument("--replace", action="store_true", default=True) return parser.parse_args() def load_json(path: Path, default: Any = None) -> Any: if not path.exists(): return default try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return default def load_jsonl(path: Path) -> list[dict[str, Any]]: if not path.exists(): return [] rows: list[dict[str, Any]] = [] for line in path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: row = json.loads(line) except json.JSONDecodeError: continue if isinstance(row, dict): rows.append(row) return rows def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=True, indent=2) + "\n", encoding="utf-8") def write_text(path: Path, value: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(value, encoding="utf-8") def ensure_clean_dir(path: Path, *, replace: bool) -> None: if replace and path.exists(): shutil.rmtree(path) path.mkdir(parents=True, exist_ok=True) def copy_file(source: Path, target: Path) -> bool: if not source.exists() or not source.is_file(): return False target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source, target) return True def copy_tree_selected(source: Path, target: Path, suffixes: set[str]) -> list[str]: copied: list[str] = [] if not source.exists(): return copied for path in source.rglob("*"): if path.is_file() and path.suffix.lower() in suffixes and path.name != ".DS_Store": destination = target / path.relative_to(source) copy_file(path, destination) copied.append(str(destination)) return copied def clamp_reward(value: Any) -> float: try: numeric = float(value) except (TypeError, ValueError): numeric = 0.5 return round(min(0.999, max(0.001, numeric)), 3) def organize_charts(source_docs_dir: Path, docs_dir: Path) -> list[dict[str, str]]: chart_index: list[dict[str, str]] = [] used_paths: set[str] = set() for spec in CHART_CATALOG: selected_source = None for rel_source in spec["sources"]: candidate = source_docs_dir / rel_source if candidate.exists(): selected_source = candidate break if selected_source is None: continue destination = docs_dir / "charts" / str(spec["category"]) / selected_source.name destination_key = str(destination.relative_to(docs_dir)) if destination_key in used_paths: continue copy_file(selected_source, destination) used_paths.add(destination_key) chart_index.append( { "id": str(spec["id"]), "title": str(spec["title"]), "category": str(spec["category"]), "path": destination_key, "source": str(selected_source.relative_to(source_docs_dir)), } ) return chart_index def copy_reports(source_docs_dir: Path, docs_dir: Path, report_dir: Path) -> list[str]: copied: list[str] = [] for rel in REPORT_FILES: source = source_docs_dir / rel if copy_file(source, docs_dir / rel): copy_file(source, report_dir / Path(rel).name) copied.append(rel) runs_source = source_docs_dir / "reports" / "runs" if runs_source.exists(): copied.extend( copy_tree_selected( runs_source, docs_dir / "reports" / "runs", {".json", ".jsonl", ".md", ".txt"}, ) ) traces_source = source_docs_dir / "traces" if traces_source.exists(): copied.extend(copy_tree_selected(traces_source, docs_dir / "traces", {".jsonl", ".json", ".md", ".txt"})) return copied def summarize_ablation(policy_ablation: dict[str, Any]) -> dict[str, Any]: ablations = policy_ablation.get("ablations") if isinstance(policy_ablation, dict) else {} if not isinstance(ablations, dict): return {"status": "missing"} llm = ablations.get("llm_only") or ablations.get("llm-only") or {} bandit = ablations.get("bandit_only") or ablations.get("bandit-only") or {} llm_bandit = ablations.get("llm_bandit") or ablations.get("llm+bandit") or {} return { "status": "ok", "llm_only_avg_reward": clamp_reward(llm.get("avg_reward")) if isinstance(llm, dict) else None, "bandit_only_avg_reward": clamp_reward(bandit.get("avg_reward")) if isinstance(bandit, dict) else None, "llm_bandit_avg_reward": clamp_reward(llm_bandit.get("avg_reward")) if isinstance(llm_bandit, dict) else None, "llm_bandit_minus_llm_only": round( clamp_reward(llm_bandit.get("avg_reward")) - clamp_reward(llm.get("avg_reward")), 3, ) if isinstance(llm, dict) and isinstance(llm_bandit, dict) else None, } def build_model_improvement_report( *, source_manifest: dict[str, Any], basic_report: dict[str, Any], policy_ablation: dict[str, Any], chart_index: list[dict[str, str]], ) -> dict[str, Any]: model_rows: list[dict[str, Any]] = [] for model in source_manifest.get("models", []) if isinstance(source_manifest, dict) else []: if not isinstance(model, dict): continue metrics = model.get("metrics", {}) if isinstance(model.get("metrics"), dict) else {} first_loss = metrics.get("sft_first_loss") last_loss = metrics.get("sft_last_loss") loss_delta = None loss_reduction_pct = None if first_loss is not None and last_loss is not None: first = float(first_loss) last = float(last_loss) loss_delta = round(first - last, 4) loss_reduction_pct = round((first - last) / first * 100.0, 2) if first else None model_rows.append( { "label": model.get("label"), "model_id": model.get("model_id"), "statuses": model.get("statuses", {}), "sft_first_loss": first_loss, "sft_last_loss": last_loss, "sft_loss_delta": loss_delta, "sft_loss_reduction_pct": loss_reduction_pct, "sft_verifier_reward": metrics.get("sft_avg_env_reward"), "sft_latency_seconds": metrics.get("sft_avg_latency_seconds"), } ) summaries = basic_report.get("summaries", {}) if isinstance(basic_report, dict) else {} return { "status": "ok", "generated_at_unix": time.time(), "training_commands_run": False, "scope": "Qwen 0.5B + Bandits and Qwen 1.5B + Bandits evidence only; Qwen 3B can be added after GRPO artifacts land.", "judge": basic_report.get("judge", "PolyGuard verifier/reward system") if isinstance(basic_report, dict) else "PolyGuard verifier/reward system", "models": model_rows, "product_over_basic_llm": { "pipeline_minus_basic_reward_delta": basic_report.get("pipeline_minus_basic_reward_delta") if isinstance(basic_report, dict) else None, "policy_summaries": summaries, }, "policy_ablation": summarize_ablation(policy_ablation), "pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [], "chart_categories": sorted({item["category"] for item in chart_index}), "safeguards": [ "All actions are scored through the PolyGuard verifier instead of trusting raw LLM text.", "Reward values are clamped and rounded to three decimals in [0.001, 0.999].", "Legality, anti-cheat, candidate alignment, process fidelity, and reward-channel breakdowns are logged.", "Remote-completed but not uploaded GRPO artifacts are marked pending instead of fabricating curves.", ], } def action_label(row: dict[str, Any]) -> str: candidate = row.get("candidate_id") or "unknown" action = row.get("action_type") or "unknown_action" return f"{action} via candidate `{candidate}`" def format_channels(row: dict[str, Any]) -> str: primary = row.get("primary_reward_channels") if not isinstance(primary, dict) or not primary: return "No channel payload available." parts = [f"{key}={clamp_reward(value):.3f}" for key, value in sorted(primary.items())] return ", ".join(parts) def baseline_failure_mode(basic: dict[str, Any], pipeline: dict[str, Any]) -> str: basic_reward = clamp_reward(basic.get("reward")) pipeline_reward = clamp_reward(pipeline.get("reward")) basic_action = str(basic.get("action_type") or "").upper() if basic.get("failure_reasons"): return "Verifier exposed explicit failure reasons: " + ", ".join(str(item) for item in basic.get("failure_reasons", [])) if basic.get("anti_cheat_reasons"): return "Anti-cheat checks flagged: " + ", ".join(str(item) for item in basic.get("anti_cheat_reasons", [])) if pipeline_reward > basic_reward: if basic_action in {"KEEP_REGIMEN", "NO_OP", "NONE"}: return "Prompt-only policy settled for a legal but lower-value no-op while the pipeline found a higher-reward intervention." return "Prompt-only policy chose a lower-reward action under the same verifier." return "No hard failure on this seed; kept as a matched verifier trace." def build_case_markdown(basic_report: dict[str, Any], traces: list[dict[str, Any]]) -> str: by_seed: dict[int, dict[str, dict[str, Any]]] = defaultdict(dict) for row in traces: try: seed = int(row.get("seed")) except (TypeError, ValueError): continue policy = str(row.get("policy") or "") if policy: by_seed[seed][policy] = row deltas = basic_report.get("deltas", []) if isinstance(basic_report, dict) else [] lines = [ "# Baseline vs Trained/Pipeline Cases", "", "Each case uses the same seeded episode and is judged by the PolyGuard verifier/reward system.", "", ] for item in sorted(deltas, key=lambda row: float(row.get("reward_delta") or 0.0), reverse=True)[:8]: seed = int(item.get("seed")) rows = by_seed.get(seed, {}) basic = rows.get("basic_llm", {}) sft = rows.get("sft_policy", {}) pipeline = rows.get("full_polyguard_pipeline", {}) lines.extend( [ f"## Seed {seed}", "", f"- Baseline model attempt: {action_label(basic)}; reward `{clamp_reward(basic.get('reward')):.3f}`; legal `{bool(basic.get('legal'))}`.", f"- Baseline failure mode: {baseline_failure_mode(basic, pipeline)}", f"- Reward/verifier output: {format_channels(basic)}", f"- Trained SFT-style attempt: {action_label(sft)}; reward `{clamp_reward(sft.get('reward')):.3f}`; legal `{bool(sft.get('legal'))}`.", f"- Full PolyGuard + Bandits pipeline attempt: {action_label(pipeline)}; reward `{clamp_reward(pipeline.get('reward')):.3f}`; legal `{bool(pipeline.get('legal'))}`.", f"- Measurable improvement: pipeline minus baseline reward `{float(item.get('reward_delta') or 0.0):.3f}`.", "- Safeguard: the final action is filtered through legality checks, anti-cheat checks, candidate ranking, and reward-channel decomposition before being accepted.", "", ] ) return "\n".join(lines).rstrip() + "\n" def build_evidence_matrix(chart_index: list[dict[str, str]], report_files: list[str], source_manifest: dict[str, Any]) -> dict[str, Any]: categories = {item["category"] for item in chart_index} return { "status": "ok", "requirements": { "loss_curves": "training_loss" in categories, "training_curves": bool({"training_loss", "training_accuracy", "training_runtime"} & categories), "sft_vs_grpo_comparison": "sft_vs_grpo" in categories, "qwen_model_comparison": "model_comparison" in categories, "without_bandit_vs_with_bandit": "policy_ablation" in categories, "reward_function_charts": "reward_function" in categories, "action_traces": any("action_traces" in item for item in report_files), "basic_llm_vs_full_pipeline": "product_over_basic_llm" in categories, "anti_hacking_overfit": bool({"safeguards", "overfit_checks"} & categories), "manifests": any(item.endswith("manifest.json") for item in report_files), }, "pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [], } def build_readme( *, report: dict[str, Any], chart_index: list[dict[str, str]], matrix: dict[str, Any], ) -> str: chart_lines = [f"- [{item['title']}]({item['path']}) - `{item['category']}`" for item in chart_index] model_lines = [] for model in report.get("models", []): model_lines.append( "| {label} | {sft} | {grpo} | {loss_delta} | {reward} |".format( label=model.get("label", "model"), sft=model.get("statuses", {}).get("sft_training", "unknown"), grpo=model.get("statuses", {}).get("grpo_training", "unknown"), loss_delta=model.get("sft_loss_delta", "pending"), reward=model.get("sft_verifier_reward", "pending"), ) ) matrix_lines = [f"- `{key}`: `{value}`" for key, value in matrix.get("requirements", {}).items()] return "\n".join( [ "# PolyGuard Model Improvement Evidence: Qwen 0.5B + Bandits and 1.5B + Bandits", "", "This folder is a curated, no-retraining submission bundle. It organizes existing HF/local evidence and deterministic verifier rollouts into one place.", "", "## Refresh Commands", "", "These commands refresh evidence only; they do not retrain model weights.", "", "```bash", "uv run python scripts/generate_submission_evidence.py \\", " --models qwen-qwen2-5-0-5b-instruct,qwen-qwen2-5-1-5b-instruct \\", " --docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b", "", "uv run python scripts/build_improvement_evidence_bundle.py \\", " --source-docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b \\", " --docs-dir docs/results/model_improvement_evidence_qwen_0_5b_1_5b", "```", "", "## Model Status", "", "| Model | SFT | GRPO | SFT loss delta | SFT verifier reward |", "| --- | --- | --- | ---: | ---: |", *model_lines, "", "## Product-over-LLM Result", "", f"- Judge: `{report.get('judge')}`.", f"- Pipeline minus basic LLM reward delta: `{report.get('product_over_basic_llm', {}).get('pipeline_minus_basic_reward_delta')}`.", "- Detailed examples are in [baseline_vs_trained_cases.md](reports/baseline_vs_trained_cases.md).", "", "## Evidence Matrix", "", *matrix_lines, "", "## Charts", "", *chart_lines, "", "## Honesty Note", "", "This bundle does not retrain models. If a remote GRPO stage was observed but its files were not uploaded, the status remains `remote_completed_pending_artifact_upload` or `pending_artifact_upload`.", "", ] ) def zip_bundle(docs_dir: Path, bundle_zip: Path) -> None: bundle_zip.parent.mkdir(parents=True, exist_ok=True) if bundle_zip.exists(): bundle_zip.unlink() with zipfile.ZipFile(bundle_zip, "w", compression=zipfile.ZIP_DEFLATED) as archive: for path in docs_dir.rglob("*"): if path.is_file() and path.name != ".DS_Store": archive.write(path, arcname=str(path.relative_to(docs_dir.parent))) def build_improvement_bundle( *, source_docs_dir: Path, docs_dir: Path, report_dir: Path, bundle_zip: Path, replace: bool = True, ) -> dict[str, Any]: ensure_clean_dir(docs_dir, replace=replace) ensure_clean_dir(report_dir, replace=replace) chart_index = organize_charts(source_docs_dir, docs_dir) report_files = copy_reports(source_docs_dir, docs_dir, report_dir) source_manifest = load_json(source_docs_dir / "manifest.json", {}) if not isinstance(source_manifest, dict): source_manifest = {} basic_report = load_json(source_docs_dir / "reports" / "basic_llm_vs_polyguard_report.json", {}) if not isinstance(basic_report, dict): basic_report = {} policy_ablation = load_json(source_docs_dir / "reports" / "policy_ablation_report.json", {}) if not isinstance(policy_ablation, dict): policy_ablation = {} traces = load_jsonl(source_docs_dir / "reports" / "action_traces.jsonl") if not traces: traces = load_jsonl(source_docs_dir / "traces" / "action_traces.jsonl") improvement_report = build_model_improvement_report( source_manifest=source_manifest, basic_report=basic_report, policy_ablation=policy_ablation, chart_index=chart_index, ) cases_markdown = build_case_markdown(basic_report, traces) evidence_matrix = build_evidence_matrix(chart_index, report_files, source_manifest) write_json(docs_dir / "reports" / "model_improvement_report.json", improvement_report) write_json(report_dir / "model_improvement_report.json", improvement_report) write_text(docs_dir / "reports" / "baseline_vs_trained_cases.md", cases_markdown) write_text(report_dir / "baseline_vs_trained_cases.md", cases_markdown) write_json(docs_dir / "reports" / "evidence_matrix.json", evidence_matrix) write_json(report_dir / "evidence_matrix.json", evidence_matrix) write_json(docs_dir / "chart_index.json", chart_index) write_json(report_dir / "chart_index.json", chart_index) readme = build_readme(report=improvement_report, chart_index=chart_index, matrix=evidence_matrix) write_text(docs_dir / "README.md", readme) write_text(report_dir / "README.md", readme) manifest = { "status": "ok", "generated_at_unix": time.time(), "source_docs_dir": str(source_docs_dir), "docs_dir": str(docs_dir), "report_dir": str(report_dir), "bundle_zip": str(bundle_zip), "training_commands_run": False, "chart_count": len(chart_index), "chart_index": chart_index, "copied_report_files": report_files, "pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [], } write_json(docs_dir / "manifest.json", manifest) write_json(report_dir / "manifest.json", manifest) zip_bundle(docs_dir, bundle_zip) return manifest def main() -> None: args = parse_args() manifest = build_improvement_bundle( source_docs_dir=Path(args.source_docs_dir), docs_dir=Path(args.docs_dir), report_dir=Path(args.report_dir), bundle_zip=Path(args.bundle_zip), replace=args.replace, ) print(json.dumps({"status": manifest["status"], "docs_dir": manifest["docs_dir"], "bundle_zip": manifest["bundle_zip"]}, indent=2)) if __name__ == "__main__": main()