"""Create a curated improvement-evidence bundle without retraining.

This script organizes already-generated PolyGuard/Qwen evidence into a clean
docs/results subfolder. It does not call any training script or mutate model
weights.
"""

from __future__ import annotations

import argparse
from collections import defaultdict
import json
from pathlib import Path
import shutil
import time
from typing import Any
import zipfile

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SOURCE_DOCS_DIR = ROOT / "docs" / "results" / "submission_evidence_qwen_0_5b_1_5b"
DEFAULT_DOCS_DIR = ROOT / "docs" / "results" / "model_improvement_evidence_qwen_0_5b_1_5b"
DEFAULT_REPORT_DIR = ROOT / "outputs" / "reports" / "model_improvement_evidence" / "qwen_0_5b_1_5b"
DEFAULT_BUNDLE_ZIP = ROOT / "submission_bundle" / "qwen_0_5b_1_5b_model_improvement_evidence.zip"
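

# Chart catalog: each entry lists candidate files (relative to the source docs
# dir) in priority order; organize_charts copies the first one that exists into
# charts/<category>/ inside the curated bundle.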
CHART_CATALOG: list[dict[str, Any]] = [
    {
        "id": "qwen_0_5b_sft_training_loss",
        "title": "Qwen 0.5B + Bandits SFT Training Loss",
        "category": "training_loss",
        "sources": ["charts/generated/qwen_0_5b_sft_training_loss.png"],
    },
    {
        "id": "qwen_1_5b_sft_training_loss",
        "title": "Qwen 1.5B + Bandits SFT Training Loss",
        "category": "training_loss",
        "sources": ["charts/generated/qwen_1_5b_sft_training_loss.png"],
    },
    {
        "id": "qwen_0_5b_vs_1_5b_sft_loss_comparison",
        "title": "Qwen 0.5B + Bandits vs 1.5B + Bandits SFT Loss",
        "category": "training_loss",
        "sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_loss_comparison.png"],
    },
    {
        "id": "qwen_0_5b_vs_1_5b_token_accuracy",
        "title": "Qwen 0.5B + Bandits vs 1.5B + Bandits Token Accuracy",
        "category": "training_accuracy",
        "sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_token_accuracy_comparison.png"],
    },
    {
        "id": "qwen_sft_runtime",
        "title": "Qwen + Bandits SFT Runtime",
        "category": "training_runtime",
        "sources": ["charts/generated/qwen_0_5b_1_5b_sft_runtime.png"],
    },
    {
        "id": "sft_vs_grpo_reward",
        "title": "SFT Baseline vs GRPO + Bandits Reward",
        "category": "sft_vs_grpo",
        "sources": ["charts/local_available_combined/sft_vs_grpo_reward.png"],
    },
    {
        "id": "grpo_reward_curves",
        "title": "GRPO + Bandits Reward Curves",
        "category": "grpo_training",
        "sources": ["charts/local_available_combined/grpo_reward_curves.png"],
    },
    {
        "id": "qwen_model_sft_loss",
        "title": "Qwen + Bandits Model SFT Loss Comparison",
        "category": "model_comparison",
        "sources": ["charts/local_available_combined/qwen_model_sft_loss.png"],
    },
    {
        "id": "qwen_model_sft_reward",
        "title": "Qwen + Bandits Model SFT Reward Comparison",
        "category": "model_comparison",
        "sources": ["charts/local_available_combined/qwen_model_sft_reward.png"],
    },
    {
        "id": "qwen_model_grpo_reward",
        "title": "Qwen + Bandits Model GRPO Reward Comparison",
        "category": "model_comparison",
        "sources": ["charts/local_available_combined/qwen_model_grpo_reward.png"],
    },
    {
        "id": "policy_ablation_avg_reward",
        "title": "Without Bandits vs With Bandits Reward",
        "category": "policy_ablation",
        "sources": ["charts/generated/policy_ablation_avg_reward.png"],
    },
    {
        "id": "policy_ablation_legality",
        "title": "Policy Ablation Legality",
        "category": "policy_ablation",
        "sources": ["charts/generated/policy_ablation_legality.png"],
    },
    {
        "id": "policy_stack_avg_reward",
        "title": "Without Bandits vs With Bandits Policy Stack Reward",
        "category": "policy_ablation",
        "sources": ["charts/local_available_combined/policy_stack_avg_reward.png"],
    },
    {
        "id": "basic_llm_vs_full_pipeline_reward",
        "title": "Basic LLM vs Full PolyGuard + Bandits Reward",
        "category": "product_over_basic_llm",
        "sources": ["charts/generated/basic_llm_vs_full_pipeline_reward.png"],
    },
    {
        "id": "basic_llm_vs_full_pipeline_legality",
        "title": "Basic LLM vs Full PolyGuard + Bandits Legality",
        "category": "product_over_basic_llm",
        "sources": ["charts/generated/basic_llm_vs_full_pipeline_legality.png"],
    },
    {
        "id": "basic_llm_vs_full_pipeline_delta",
        "title": "PolyGuard + Bandits Minus Basic Reward By Seed",
        "category": "product_over_basic_llm",
        "sources": ["charts/generated/basic_llm_vs_full_pipeline_reward_delta_by_seed.png"],
    },
    {
        "id": "reward_component_bars",
        "title": "Reward Function Component Bars",
        "category": "reward_function",
        "sources": [
            "charts/generated/reward_component_bars.png",
            "charts/local_available_combined/reward_component_bars.png",
        ],
    },
    {
        "id": "primary_reward_channel_bars",
        "title": "Primary Reward Channels",
        "category": "reward_function",
        "sources": ["charts/generated/primary_reward_channel_bars.png"],
    },
    {
        "id": "train_holdout_gap",
        "title": "Train vs Holdout Reward Gap",
        "category": "overfit_checks",
        "sources": ["charts/local_available_combined/train_holdout_gap.png"],
    },
    {
        "id": "anti_cheat_failure_rates",
        "title": "Anti-Cheat Failure Rates",
        "category": "safeguards",
        "sources": ["charts/local_available_combined/anti_cheat_failure_rates.png"],
    },
    {
        "id": "inference_latency_validity",
        "title": "Inference Latency and Validity",
        "category": "inference",
        "sources": ["charts/local_available_combined/inference_latency_validity.png"],
    },
]
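

# Report files copied verbatim when present; missing files are skipped rather
# than treated as errors (see copy_reports).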
REPORT_FILES = [
    "reports/manifest.json",
    "reports/submission_summary.json",
    "reports/basic_llm_vs_polyguard_report.json",
    "reports/basic_llm_failure_cases.md",
    "reports/policy_ablation_report.json",
    "reports/remote_stage_records.json",
    "reports/hf_status_snapshot.json",
    "reports/artifact_repo_listing.json",
    "reports/action_traces.jsonl",
]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Build a curated PolyGuard model-improvement evidence bundle.")
    parser.add_argument("--source-docs-dir", default=str(DEFAULT_SOURCE_DOCS_DIR))
    parser.add_argument("--docs-dir", default=str(DEFAULT_DOCS_DIR))
    parser.add_argument("--report-dir", default=str(DEFAULT_REPORT_DIR))
    parser.add_argument("--bundle-zip", default=str(DEFAULT_BUNDLE_ZIP))
    # BooleanOptionalAction exposes both --replace and --no-replace; passing
    # --no-replace keeps existing output directories instead of rebuilding them.
    parser.add_argument("--replace", action=argparse.BooleanOptionalAction, default=True)
    return parser.parse_args()


def load_json(path: Path, default: Any = None) -> Any:
    """Parse JSON from path, returning default if the file is missing or invalid."""
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return default


def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Read a JSONL file, skipping blank, malformed, and non-object lines."""
    if not path.exists():
        return []
    rows: list[dict[str, Any]] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(row, dict):
            rows.append(row)
    return rows


def write_json(path: Path, payload: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")


def write_text(path: Path, value: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(value, encoding="utf-8")


def ensure_clean_dir(path: Path, *, replace: bool) -> None:
    """Create path, first deleting any existing directory when replace is set."""
    if replace and path.exists():
        shutil.rmtree(path)
    path.mkdir(parents=True, exist_ok=True)


def copy_file(source: Path, target: Path) -> bool:
    if not source.exists() or not source.is_file():
        return False
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, target)
    return True


def copy_tree_selected(source: Path, target: Path, suffixes: set[str]) -> list[str]:
    """Recursively copy files whose suffix is in suffixes, returning copied paths."""
    copied: list[str] = []
    if not source.exists():
        return copied
    for path in source.rglob("*"):
        if path.is_file() and path.suffix.lower() in suffixes and path.name != ".DS_Store":
            destination = target / path.relative_to(source)
            copy_file(path, destination)
            copied.append(str(destination))
    return copied
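

# clamp_reward keeps scores strictly inside (0, 1), e.g. clamp_reward(1.7)
# -> 0.999, clamp_reward(-3) -> 0.001, and clamp_reward(None) -> 0.5 via the
# non-numeric fallback.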
def clamp_reward(value: Any) -> float:
    """Coerce value to float (0.5 on failure) and clamp into [0.001, 0.999]."""
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        numeric = 0.5
    return round(min(0.999, max(0.001, numeric)), 3)
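

# organize_charts copies the first existing source for each catalog entry into
# charts/<category>/ and records a flat index entry shaped like (values
# illustrative): {"id": ..., "title": ..., "category": ..., "path": ..., "source": ...}.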
def organize_charts(source_docs_dir: Path, docs_dir: Path) -> list[dict[str, str]]:
    """Copy catalogued charts into category folders and build the chart index."""
    chart_index: list[dict[str, str]] = []
    used_paths: set[str] = set()
    for spec in CHART_CATALOG:
        selected_source = None
        for rel_source in spec["sources"]:
            candidate = source_docs_dir / rel_source
            if candidate.exists():
                selected_source = candidate
                break
        if selected_source is None:
            continue
        destination = docs_dir / "charts" / str(spec["category"]) / selected_source.name
        destination_key = str(destination.relative_to(docs_dir))
        if destination_key in used_paths:
            continue
        copy_file(selected_source, destination)
        used_paths.add(destination_key)
        chart_index.append(
            {
                "id": str(spec["id"]),
                "title": str(spec["title"]),
                "category": str(spec["category"]),
                "path": destination_key,
                "source": str(selected_source.relative_to(source_docs_dir)),
            }
        )
    return chart_index


def copy_reports(source_docs_dir: Path, docs_dir: Path, report_dir: Path) -> list[str]:
    """Copy known report files plus any run/trace trees into the bundle."""
    copied: list[str] = []
    for rel in REPORT_FILES:
        source = source_docs_dir / rel
        if copy_file(source, docs_dir / rel):
            copy_file(source, report_dir / Path(rel).name)
            copied.append(rel)
    runs_source = source_docs_dir / "reports" / "runs"
    if runs_source.exists():
        copied.extend(
            copy_tree_selected(
                runs_source,
                docs_dir / "reports" / "runs",
                {".json", ".jsonl", ".md", ".txt"},
            )
        )
    traces_source = source_docs_dir / "traces"
    if traces_source.exists():
        copied.extend(copy_tree_selected(traces_source, docs_dir / "traces", {".jsonl", ".json", ".md", ".txt"}))
    return copied
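

# summarize_ablation expects a payload like the following (numbers illustrative;
# hyphen/plus key spellings are also accepted):
#   {"ablations": {"llm_only": {"avg_reward": 0.41},
#                  "bandit_only": {"avg_reward": 0.47},
#                  "llm_bandit": {"avg_reward": 0.58}}}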
def summarize_ablation(policy_ablation: dict[str, Any]) -> dict[str, Any]:
    """Reduce the policy-ablation report to the three headline reward numbers."""
    ablations = policy_ablation.get("ablations") if isinstance(policy_ablation, dict) else {}
    if not isinstance(ablations, dict):
        return {"status": "missing"}
    llm = ablations.get("llm_only") or ablations.get("llm-only") or {}
    bandit = ablations.get("bandit_only") or ablations.get("bandit-only") or {}
    llm_bandit = ablations.get("llm_bandit") or ablations.get("llm+bandit") or {}
    llm_reward = clamp_reward(llm.get("avg_reward")) if isinstance(llm, dict) else None
    bandit_reward = clamp_reward(bandit.get("avg_reward")) if isinstance(bandit, dict) else None
    llm_bandit_reward = clamp_reward(llm_bandit.get("avg_reward")) if isinstance(llm_bandit, dict) else None
    delta = None
    if llm_reward is not None and llm_bandit_reward is not None:
        delta = round(llm_bandit_reward - llm_reward, 3)
    return {
        "status": "ok",
        "llm_only_avg_reward": llm_reward,
        "bandit_only_avg_reward": bandit_reward,
        "llm_bandit_avg_reward": llm_bandit_reward,
        "llm_bandit_minus_llm_only": delta,
    }


def build_model_improvement_report(
    *,
    source_manifest: dict[str, Any],
    basic_report: dict[str, Any],
    policy_ablation: dict[str, Any],
    chart_index: list[dict[str, str]],
) -> dict[str, Any]:
    """Assemble the top-level improvement report from manifest, reports, and charts."""
    model_rows: list[dict[str, Any]] = []
    models = source_manifest.get("models", []) if isinstance(source_manifest, dict) else []
    for model in models:
        if not isinstance(model, dict):
            continue
        metrics = model.get("metrics", {}) if isinstance(model.get("metrics"), dict) else {}
        first_loss = metrics.get("sft_first_loss")
        last_loss = metrics.get("sft_last_loss")
        loss_delta = None
        loss_reduction_pct = None
        if first_loss is not None and last_loss is not None:
            try:
                first = float(first_loss)
                last = float(last_loss)
            except (TypeError, ValueError):
                first = None
            if first is not None:
                loss_delta = round(first - last, 4)
                loss_reduction_pct = round((first - last) / first * 100.0, 2) if first else None
        model_rows.append(
            {
                "label": model.get("label"),
                "model_id": model.get("model_id"),
                "statuses": model.get("statuses", {}),
                "sft_first_loss": first_loss,
                "sft_last_loss": last_loss,
                "sft_loss_delta": loss_delta,
                "sft_loss_reduction_pct": loss_reduction_pct,
                "sft_verifier_reward": metrics.get("sft_avg_env_reward"),
                "sft_latency_seconds": metrics.get("sft_avg_latency_seconds"),
            }
        )

    if isinstance(basic_report, dict):
        summaries = basic_report.get("summaries", {})
        reward_delta = basic_report.get("pipeline_minus_basic_reward_delta")
        judge = basic_report.get("judge", "PolyGuard verifier/reward system")
    else:
        summaries = {}
        reward_delta = None
        judge = "PolyGuard verifier/reward system"
    return {
        "status": "ok",
        "generated_at_unix": time.time(),
        "training_commands_run": False,
        "scope": "Qwen 0.5B + Bandits and Qwen 1.5B + Bandits evidence only; Qwen 3B can be added after GRPO artifacts land.",
        "judge": judge,
        "models": model_rows,
        "product_over_basic_llm": {
            "pipeline_minus_basic_reward_delta": reward_delta,
            "policy_summaries": summaries,
        },
        "policy_ablation": summarize_ablation(policy_ablation),
        "pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [],
        "chart_categories": sorted({item["category"] for item in chart_index}),
        "safeguards": [
            "All actions are scored through the PolyGuard verifier instead of trusting raw LLM text.",
            "Reward values are clamped to [0.001, 0.999] and rounded to three decimals.",
            "Legality, anti-cheat, candidate alignment, process fidelity, and reward-channel breakdowns are logged.",
            "GRPO artifacts that completed remotely but were never uploaded are marked pending; no curves are fabricated.",
        ],
    }


def action_label(row: dict[str, Any]) -> str:
    candidate = row.get("candidate_id") or "unknown"
    action = row.get("action_type") or "unknown_action"
    return f"{action} via candidate `{candidate}`"
def format_channels(row: dict[str, Any]) -> str:
    """Render the primary reward channels as sorted `key=value` pairs."""
    primary = row.get("primary_reward_channels")
    if not isinstance(primary, dict) or not primary:
        return "No channel payload available."
    parts = [f"{key}={clamp_reward(value):.3f}" for key, value in sorted(primary.items())]
    return ", ".join(parts)
def baseline_failure_mode(basic: dict[str, Any], pipeline: dict[str, Any]) -> str:
    """Describe why the prompt-only baseline underperformed on this seed."""
    basic_reward = clamp_reward(basic.get("reward"))
    pipeline_reward = clamp_reward(pipeline.get("reward"))
    basic_action = str(basic.get("action_type") or "").upper()
    if basic.get("failure_reasons"):
        return "Verifier exposed explicit failure reasons: " + ", ".join(str(item) for item in basic.get("failure_reasons", []))
    if basic.get("anti_cheat_reasons"):
        return "Anti-cheat checks flagged: " + ", ".join(str(item) for item in basic.get("anti_cheat_reasons", []))
    if pipeline_reward > basic_reward:
        if basic_action in {"KEEP_REGIMEN", "NO_OP", "NONE"}:
            return "Prompt-only policy settled for a legal but lower-value no-op while the pipeline found a higher-reward intervention."
        return "Prompt-only policy chose a lower-reward action under the same verifier."
    return "No hard failure on this seed; kept as a matched verifier trace."


def build_case_markdown(basic_report: dict[str, Any], traces: list[dict[str, Any]]) -> str:
    """Write the top reward-delta seeds up as matched baseline-vs-pipeline cases."""
    by_seed: dict[int, dict[str, dict[str, Any]]] = defaultdict(dict)
    for row in traces:
        try:
            seed = int(row.get("seed"))
        except (TypeError, ValueError):
            continue
        policy = str(row.get("policy") or "")
        if policy:
            by_seed[seed][policy] = row

    deltas = basic_report.get("deltas", []) if isinstance(basic_report, dict) else []
    lines = [
        "# Baseline vs Trained/Pipeline Cases",
        "",
        "Each case uses the same seeded episode and is judged by the PolyGuard verifier/reward system.",
        "",
    ]
    for item in sorted(deltas, key=lambda row: float(row.get("reward_delta") or 0.0), reverse=True)[:8]:
        try:
            seed = int(item.get("seed"))
        except (TypeError, ValueError):
            continue
        rows = by_seed.get(seed, {})
        basic = rows.get("basic_llm", {})
        sft = rows.get("sft_policy", {})
        pipeline = rows.get("full_polyguard_pipeline", {})
        lines.extend(
            [
                f"## Seed {seed}",
                "",
                f"- Baseline model attempt: {action_label(basic)}; reward `{clamp_reward(basic.get('reward')):.3f}`; legal `{bool(basic.get('legal'))}`.",
                f"- Baseline failure mode: {baseline_failure_mode(basic, pipeline)}",
                f"- Reward/verifier output: {format_channels(basic)}",
                f"- Trained SFT-style attempt: {action_label(sft)}; reward `{clamp_reward(sft.get('reward')):.3f}`; legal `{bool(sft.get('legal'))}`.",
                f"- Full PolyGuard + Bandits pipeline attempt: {action_label(pipeline)}; reward `{clamp_reward(pipeline.get('reward')):.3f}`; legal `{bool(pipeline.get('legal'))}`.",
                f"- Measurable improvement: pipeline minus baseline reward `{float(item.get('reward_delta') or 0.0):.3f}`.",
                "- Safeguard: the final action is filtered through legality checks, anti-cheat checks, candidate ranking, and reward-channel decomposition before being accepted.",
                "",
            ]
        )
    return "\n".join(lines).rstrip() + "\n"
def build_evidence_matrix(chart_index: list[dict[str, str]], report_files: list[str], source_manifest: dict[str, Any]) -> dict[str, Any]:
    """Check which submission requirements the collected charts and reports satisfy."""
    categories = {item["category"] for item in chart_index}
    return {
        "status": "ok",
        "requirements": {
            "loss_curves": "training_loss" in categories,
            "training_curves": bool({"training_loss", "training_accuracy", "training_runtime"} & categories),
            "sft_vs_grpo_comparison": "sft_vs_grpo" in categories,
            "qwen_model_comparison": "model_comparison" in categories,
            "without_bandit_vs_with_bandit": "policy_ablation" in categories,
            "reward_function_charts": "reward_function" in categories,
            "action_traces": any("action_traces" in item for item in report_files),
            "basic_llm_vs_full_pipeline": "product_over_basic_llm" in categories,
            "anti_hacking_overfit": bool({"safeguards", "overfit_checks"} & categories),
            "manifests": any(item.endswith("manifest.json") for item in report_files),
        },
        "pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [],
    }


def build_readme(
    *,
    report: dict[str, Any],
    chart_index: list[dict[str, str]],
    matrix: dict[str, Any],
) -> str:
    """Render the bundle README with status table, evidence matrix, and chart list."""
    chart_lines = [f"- [{item['title']}]({item['path']}) - `{item['category']}`" for item in chart_index]
    model_lines = []
    for model in report.get("models", []):
        loss_delta = model.get("sft_loss_delta")
        reward = model.get("sft_verifier_reward")
        model_lines.append(
            "| {label} | {sft} | {grpo} | {loss_delta} | {reward} |".format(
                label=model.get("label", "model"),
                sft=model.get("statuses", {}).get("sft_training", "unknown"),
                grpo=model.get("statuses", {}).get("grpo_training", "unknown"),
                loss_delta=loss_delta if loss_delta is not None else "pending",
                reward=reward if reward is not None else "pending",
            )
        )
    matrix_lines = [f"- `{key}`: `{value}`" for key, value in matrix.get("requirements", {}).items()]
    return "\n".join(
        [
            "# PolyGuard Model Improvement Evidence: Qwen 0.5B + Bandits and 1.5B + Bandits",
            "",
            "This folder is a curated, no-retraining submission bundle. It organizes existing HF/local evidence and deterministic verifier rollouts into one place.",
            "",
            "## Refresh Commands",
            "",
            "These commands refresh evidence only; they do not retrain model weights.",
            "",
            "```bash",
            "uv run python scripts/generate_submission_evidence.py \\",
            "    --models qwen-qwen2-5-0-5b-instruct,qwen-qwen2-5-1-5b-instruct \\",
            "    --docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b",
            "",
            "uv run python scripts/build_improvement_evidence_bundle.py \\",
            "    --source-docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b \\",
            "    --docs-dir docs/results/model_improvement_evidence_qwen_0_5b_1_5b",
            "```",
            "",
            "## Model Status",
            "",
            "| Model | SFT | GRPO | SFT loss delta | SFT verifier reward |",
            "| --- | --- | --- | ---: | ---: |",
            *model_lines,
            "",
            "## Product-over-LLM Result",
            "",
            f"- Judge: `{report.get('judge')}`.",
            f"- Pipeline minus basic LLM reward delta: `{report.get('product_over_basic_llm', {}).get('pipeline_minus_basic_reward_delta')}`.",
            "- Detailed examples are in [baseline_vs_trained_cases.md](reports/baseline_vs_trained_cases.md).",
            "",
            "## Evidence Matrix",
            "",
            *matrix_lines,
            "",
            "## Charts",
            "",
            *chart_lines,
            "",
            "## Honesty Note",
            "",
            "This bundle does not retrain models. If a remote GRPO stage was observed but its files were not uploaded, the status remains `remote_completed_pending_artifact_upload` or `pending_artifact_upload`.",
            "",
        ]
    )
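

# Arcnames are rooted at docs_dir.parent, so unzipping recreates the bundle
# under its folder name instead of spilling files into the current directory.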
def zip_bundle(docs_dir: Path, bundle_zip: Path) -> None:
    bundle_zip.parent.mkdir(parents=True, exist_ok=True)
    if bundle_zip.exists():
        bundle_zip.unlink()
    with zipfile.ZipFile(bundle_zip, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for path in docs_dir.rglob("*"):
            if path.is_file() and path.name != ".DS_Store":
                archive.write(path, arcname=str(path.relative_to(docs_dir.parent)))
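

# build_improvement_bundle writes each derived report twice (docs_dir/reports/
# and the flat report_dir) and then zips docs_dir into bundle_zip.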
def build_improvement_bundle(
    *,
    source_docs_dir: Path,
    docs_dir: Path,
    report_dir: Path,
    bundle_zip: Path,
    replace: bool = True,
) -> dict[str, Any]:
    """Organize charts and reports, derive summaries, and zip the final bundle."""
    ensure_clean_dir(docs_dir, replace=replace)
    ensure_clean_dir(report_dir, replace=replace)

    chart_index = organize_charts(source_docs_dir, docs_dir)
    report_files = copy_reports(source_docs_dir, docs_dir, report_dir)

    source_manifest = load_json(source_docs_dir / "manifest.json", {})
    if not isinstance(source_manifest, dict):
        source_manifest = {}
    basic_report = load_json(source_docs_dir / "reports" / "basic_llm_vs_polyguard_report.json", {})
    if not isinstance(basic_report, dict):
        basic_report = {}
    policy_ablation = load_json(source_docs_dir / "reports" / "policy_ablation_report.json", {})
    if not isinstance(policy_ablation, dict):
        policy_ablation = {}
    traces = load_jsonl(source_docs_dir / "reports" / "action_traces.jsonl")
    if not traces:
        traces = load_jsonl(source_docs_dir / "traces" / "action_traces.jsonl")

    improvement_report = build_model_improvement_report(
        source_manifest=source_manifest,
        basic_report=basic_report,
        policy_ablation=policy_ablation,
        chart_index=chart_index,
    )
    cases_markdown = build_case_markdown(basic_report, traces)
    evidence_matrix = build_evidence_matrix(chart_index, report_files, source_manifest)

    write_json(docs_dir / "reports" / "model_improvement_report.json", improvement_report)
    write_json(report_dir / "model_improvement_report.json", improvement_report)
    write_text(docs_dir / "reports" / "baseline_vs_trained_cases.md", cases_markdown)
    write_text(report_dir / "baseline_vs_trained_cases.md", cases_markdown)
    write_json(docs_dir / "reports" / "evidence_matrix.json", evidence_matrix)
    write_json(report_dir / "evidence_matrix.json", evidence_matrix)
    write_json(docs_dir / "chart_index.json", chart_index)
    write_json(report_dir / "chart_index.json", chart_index)

    readme = build_readme(report=improvement_report, chart_index=chart_index, matrix=evidence_matrix)
    write_text(docs_dir / "README.md", readme)
    write_text(report_dir / "README.md", readme)

    manifest = {
        "status": "ok",
        "generated_at_unix": time.time(),
        "source_docs_dir": str(source_docs_dir),
        "docs_dir": str(docs_dir),
        "report_dir": str(report_dir),
        "bundle_zip": str(bundle_zip),
        "training_commands_run": False,
        "chart_count": len(chart_index),
        "chart_index": chart_index,
        "copied_report_files": report_files,
        "pending_artifacts": source_manifest.get("pending_artifacts", []),
    }
    write_json(docs_dir / "manifest.json", manifest)
    write_json(report_dir / "manifest.json", manifest)
    zip_bundle(docs_dir, bundle_zip)
    return manifest


def main() -> None:
    args = parse_args()
    manifest = build_improvement_bundle(
        source_docs_dir=Path(args.source_docs_dir),
        docs_dir=Path(args.docs_dir),
        report_dir=Path(args.report_dir),
        bundle_zip=Path(args.bundle_zip),
        replace=args.replace,
    )
    print(
        json.dumps(
            {
                "status": manifest["status"],
                "docs_dir": manifest["docs_dir"],
                "bundle_zip": manifest["bundle_zip"],
            },
            indent=2,
        )
    )


if __name__ == "__main__":
    main()