polyguard-openenv-workbench / polyguard-rl /scripts /build_improvement_evidence_bundle.py
TheJackBright's picture
Deploy GitHub root master to Space
c296d62
#!/usr/bin/env python3
"""Create a curated improvement-evidence bundle without retraining.
This script organizes already generated PolyGuard/Qwen evidence into a clean
docs/results subfolder. It does not call any training script or mutate model
weights.
"""
from __future__ import annotations
import argparse
from collections import defaultdict
import json
from pathlib import Path
import shutil
import time
from typing import Any
import zipfile
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SOURCE_DOCS_DIR = ROOT / "docs" / "results" / "submission_evidence_qwen_0_5b_1_5b"
DEFAULT_DOCS_DIR = ROOT / "docs" / "results" / "model_improvement_evidence_qwen_0_5b_1_5b"
DEFAULT_REPORT_DIR = ROOT / "outputs" / "reports" / "model_improvement_evidence" / "qwen_0_5b_1_5b"
DEFAULT_BUNDLE_ZIP = ROOT / "submission_bundle" / "qwen_0_5b_1_5b_model_improvement_evidence.zip"
CHART_CATALOG: list[dict[str, Any]] = [
{
"id": "qwen_0_5b_sft_training_loss",
"title": "Qwen 0.5B + Bandits SFT Training Loss",
"category": "training_loss",
"sources": ["charts/generated/qwen_0_5b_sft_training_loss.png"],
},
{
"id": "qwen_1_5b_sft_training_loss",
"title": "Qwen 1.5B + Bandits SFT Training Loss",
"category": "training_loss",
"sources": ["charts/generated/qwen_1_5b_sft_training_loss.png"],
},
{
"id": "qwen_0_5b_vs_1_5b_sft_loss_comparison",
"title": "Qwen 0.5B + Bandits vs 1.5B + Bandits SFT Loss",
"category": "training_loss",
"sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_loss_comparison.png"],
},
{
"id": "qwen_0_5b_vs_1_5b_token_accuracy",
"title": "Qwen 0.5B + Bandits vs 1.5B + Bandits Token Accuracy",
"category": "training_accuracy",
"sources": ["charts/generated/qwen_0_5b_vs_1_5b_sft_token_accuracy_comparison.png"],
},
{
"id": "qwen_sft_runtime",
"title": "Qwen + Bandits SFT Runtime",
"category": "training_runtime",
"sources": ["charts/generated/qwen_0_5b_1_5b_sft_runtime.png"],
},
{
"id": "sft_vs_grpo_reward",
"title": "SFT Baseline vs GRPO + Bandits Reward",
"category": "sft_vs_grpo",
"sources": ["charts/local_available_combined/sft_vs_grpo_reward.png"],
},
{
"id": "grpo_reward_curves",
"title": "GRPO + Bandits Reward Curves",
"category": "grpo_training",
"sources": ["charts/local_available_combined/grpo_reward_curves.png"],
},
{
"id": "qwen_model_sft_loss",
"title": "Qwen + Bandits Model SFT Loss Comparison",
"category": "model_comparison",
"sources": ["charts/local_available_combined/qwen_model_sft_loss.png"],
},
{
"id": "qwen_model_sft_reward",
"title": "Qwen + Bandits Model SFT Reward Comparison",
"category": "model_comparison",
"sources": ["charts/local_available_combined/qwen_model_sft_reward.png"],
},
{
"id": "qwen_model_grpo_reward",
"title": "Qwen + Bandits Model GRPO Reward Comparison",
"category": "model_comparison",
"sources": ["charts/local_available_combined/qwen_model_grpo_reward.png"],
},
{
"id": "policy_ablation_avg_reward",
"title": "Without Bandits vs With Bandits Reward",
"category": "policy_ablation",
"sources": ["charts/generated/policy_ablation_avg_reward.png"],
},
{
"id": "policy_ablation_legality",
"title": "Policy Ablation Legality",
"category": "policy_ablation",
"sources": ["charts/generated/policy_ablation_legality.png"],
},
{
"id": "policy_stack_avg_reward",
"title": "Without Bandits vs With Bandits Policy Stack Reward",
"category": "policy_ablation",
"sources": ["charts/local_available_combined/policy_stack_avg_reward.png"],
},
{
"id": "basic_llm_vs_full_pipeline_reward",
"title": "Basic LLM vs Full PolyGuard + Bandits Reward",
"category": "product_over_basic_llm",
"sources": ["charts/generated/basic_llm_vs_full_pipeline_reward.png"],
},
{
"id": "basic_llm_vs_full_pipeline_legality",
"title": "Basic LLM vs Full PolyGuard + Bandits Legality",
"category": "product_over_basic_llm",
"sources": ["charts/generated/basic_llm_vs_full_pipeline_legality.png"],
},
{
"id": "basic_llm_vs_full_pipeline_delta",
"title": "PolyGuard + Bandits Minus Basic Reward By Seed",
"category": "product_over_basic_llm",
"sources": ["charts/generated/basic_llm_vs_full_pipeline_reward_delta_by_seed.png"],
},
{
"id": "reward_component_bars",
"title": "Reward Function Component Bars",
"category": "reward_function",
"sources": ["charts/generated/reward_component_bars.png", "charts/local_available_combined/reward_component_bars.png"],
},
{
"id": "primary_reward_channel_bars",
"title": "Primary Reward Channels",
"category": "reward_function",
"sources": ["charts/generated/primary_reward_channel_bars.png"],
},
{
"id": "train_holdout_gap",
"title": "Train vs Holdout Reward Gap",
"category": "overfit_checks",
"sources": ["charts/local_available_combined/train_holdout_gap.png"],
},
{
"id": "anti_cheat_failure_rates",
"title": "Anti-Cheat Failure Rates",
"category": "safeguards",
"sources": ["charts/local_available_combined/anti_cheat_failure_rates.png"],
},
{
"id": "inference_latency_validity",
"title": "Inference Latency and Validity",
"category": "inference",
"sources": ["charts/local_available_combined/inference_latency_validity.png"],
},
]
REPORT_FILES = [
"reports/manifest.json",
"reports/submission_summary.json",
"reports/basic_llm_vs_polyguard_report.json",
"reports/basic_llm_failure_cases.md",
"reports/policy_ablation_report.json",
"reports/remote_stage_records.json",
"reports/hf_status_snapshot.json",
"reports/artifact_repo_listing.json",
"reports/action_traces.jsonl",
]
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Build a curated PolyGuard model-improvement evidence bundle.")
parser.add_argument("--source-docs-dir", default=str(DEFAULT_SOURCE_DOCS_DIR))
parser.add_argument("--docs-dir", default=str(DEFAULT_DOCS_DIR))
parser.add_argument("--report-dir", default=str(DEFAULT_REPORT_DIR))
parser.add_argument("--bundle-zip", default=str(DEFAULT_BUNDLE_ZIP))
parser.add_argument("--replace", action="store_true", default=True)
return parser.parse_args()
def load_json(path: Path, default: Any = None) -> Any:
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return default
def load_jsonl(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
rows: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(row, dict):
rows.append(row)
return rows
def write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
def write_text(path: Path, value: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(value, encoding="utf-8")
def ensure_clean_dir(path: Path, *, replace: bool) -> None:
if replace and path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)
def copy_file(source: Path, target: Path) -> bool:
if not source.exists() or not source.is_file():
return False
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, target)
return True
def copy_tree_selected(source: Path, target: Path, suffixes: set[str]) -> list[str]:
copied: list[str] = []
if not source.exists():
return copied
for path in source.rglob("*"):
if path.is_file() and path.suffix.lower() in suffixes and path.name != ".DS_Store":
destination = target / path.relative_to(source)
copy_file(path, destination)
copied.append(str(destination))
return copied
def clamp_reward(value: Any) -> float:
try:
numeric = float(value)
except (TypeError, ValueError):
numeric = 0.5
return round(min(0.999, max(0.001, numeric)), 3)
def organize_charts(source_docs_dir: Path, docs_dir: Path) -> list[dict[str, str]]:
chart_index: list[dict[str, str]] = []
used_paths: set[str] = set()
for spec in CHART_CATALOG:
selected_source = None
for rel_source in spec["sources"]:
candidate = source_docs_dir / rel_source
if candidate.exists():
selected_source = candidate
break
if selected_source is None:
continue
destination = docs_dir / "charts" / str(spec["category"]) / selected_source.name
destination_key = str(destination.relative_to(docs_dir))
if destination_key in used_paths:
continue
copy_file(selected_source, destination)
used_paths.add(destination_key)
chart_index.append(
{
"id": str(spec["id"]),
"title": str(spec["title"]),
"category": str(spec["category"]),
"path": destination_key,
"source": str(selected_source.relative_to(source_docs_dir)),
}
)
return chart_index
def copy_reports(source_docs_dir: Path, docs_dir: Path, report_dir: Path) -> list[str]:
copied: list[str] = []
for rel in REPORT_FILES:
source = source_docs_dir / rel
if copy_file(source, docs_dir / rel):
copy_file(source, report_dir / Path(rel).name)
copied.append(rel)
runs_source = source_docs_dir / "reports" / "runs"
if runs_source.exists():
copied.extend(
copy_tree_selected(
runs_source,
docs_dir / "reports" / "runs",
{".json", ".jsonl", ".md", ".txt"},
)
)
traces_source = source_docs_dir / "traces"
if traces_source.exists():
copied.extend(copy_tree_selected(traces_source, docs_dir / "traces", {".jsonl", ".json", ".md", ".txt"}))
return copied
def summarize_ablation(policy_ablation: dict[str, Any]) -> dict[str, Any]:
ablations = policy_ablation.get("ablations") if isinstance(policy_ablation, dict) else {}
if not isinstance(ablations, dict):
return {"status": "missing"}
llm = ablations.get("llm_only") or ablations.get("llm-only") or {}
bandit = ablations.get("bandit_only") or ablations.get("bandit-only") or {}
llm_bandit = ablations.get("llm_bandit") or ablations.get("llm+bandit") or {}
return {
"status": "ok",
"llm_only_avg_reward": clamp_reward(llm.get("avg_reward")) if isinstance(llm, dict) else None,
"bandit_only_avg_reward": clamp_reward(bandit.get("avg_reward")) if isinstance(bandit, dict) else None,
"llm_bandit_avg_reward": clamp_reward(llm_bandit.get("avg_reward")) if isinstance(llm_bandit, dict) else None,
"llm_bandit_minus_llm_only": round(
clamp_reward(llm_bandit.get("avg_reward")) - clamp_reward(llm.get("avg_reward")),
3,
)
if isinstance(llm, dict) and isinstance(llm_bandit, dict)
else None,
}
def build_model_improvement_report(
*,
source_manifest: dict[str, Any],
basic_report: dict[str, Any],
policy_ablation: dict[str, Any],
chart_index: list[dict[str, str]],
) -> dict[str, Any]:
model_rows: list[dict[str, Any]] = []
for model in source_manifest.get("models", []) if isinstance(source_manifest, dict) else []:
if not isinstance(model, dict):
continue
metrics = model.get("metrics", {}) if isinstance(model.get("metrics"), dict) else {}
first_loss = metrics.get("sft_first_loss")
last_loss = metrics.get("sft_last_loss")
loss_delta = None
loss_reduction_pct = None
if first_loss is not None and last_loss is not None:
first = float(first_loss)
last = float(last_loss)
loss_delta = round(first - last, 4)
loss_reduction_pct = round((first - last) / first * 100.0, 2) if first else None
model_rows.append(
{
"label": model.get("label"),
"model_id": model.get("model_id"),
"statuses": model.get("statuses", {}),
"sft_first_loss": first_loss,
"sft_last_loss": last_loss,
"sft_loss_delta": loss_delta,
"sft_loss_reduction_pct": loss_reduction_pct,
"sft_verifier_reward": metrics.get("sft_avg_env_reward"),
"sft_latency_seconds": metrics.get("sft_avg_latency_seconds"),
}
)
summaries = basic_report.get("summaries", {}) if isinstance(basic_report, dict) else {}
return {
"status": "ok",
"generated_at_unix": time.time(),
"training_commands_run": False,
"scope": "Qwen 0.5B + Bandits and Qwen 1.5B + Bandits evidence only; Qwen 3B can be added after GRPO artifacts land.",
"judge": basic_report.get("judge", "PolyGuard verifier/reward system") if isinstance(basic_report, dict) else "PolyGuard verifier/reward system",
"models": model_rows,
"product_over_basic_llm": {
"pipeline_minus_basic_reward_delta": basic_report.get("pipeline_minus_basic_reward_delta")
if isinstance(basic_report, dict)
else None,
"policy_summaries": summaries,
},
"policy_ablation": summarize_ablation(policy_ablation),
"pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [],
"chart_categories": sorted({item["category"] for item in chart_index}),
"safeguards": [
"All actions are scored through the PolyGuard verifier instead of trusting raw LLM text.",
"Reward values are clamped and rounded to three decimals in [0.001, 0.999].",
"Legality, anti-cheat, candidate alignment, process fidelity, and reward-channel breakdowns are logged.",
"Remote-completed but not uploaded GRPO artifacts are marked pending instead of fabricating curves.",
],
}
def action_label(row: dict[str, Any]) -> str:
candidate = row.get("candidate_id") or "unknown"
action = row.get("action_type") or "unknown_action"
return f"{action} via candidate `{candidate}`"
def format_channels(row: dict[str, Any]) -> str:
primary = row.get("primary_reward_channels")
if not isinstance(primary, dict) or not primary:
return "No channel payload available."
parts = [f"{key}={clamp_reward(value):.3f}" for key, value in sorted(primary.items())]
return ", ".join(parts)
def baseline_failure_mode(basic: dict[str, Any], pipeline: dict[str, Any]) -> str:
basic_reward = clamp_reward(basic.get("reward"))
pipeline_reward = clamp_reward(pipeline.get("reward"))
basic_action = str(basic.get("action_type") or "").upper()
if basic.get("failure_reasons"):
return "Verifier exposed explicit failure reasons: " + ", ".join(str(item) for item in basic.get("failure_reasons", []))
if basic.get("anti_cheat_reasons"):
return "Anti-cheat checks flagged: " + ", ".join(str(item) for item in basic.get("anti_cheat_reasons", []))
if pipeline_reward > basic_reward:
if basic_action in {"KEEP_REGIMEN", "NO_OP", "NONE"}:
return "Prompt-only policy settled for a legal but lower-value no-op while the pipeline found a higher-reward intervention."
return "Prompt-only policy chose a lower-reward action under the same verifier."
return "No hard failure on this seed; kept as a matched verifier trace."
def build_case_markdown(basic_report: dict[str, Any], traces: list[dict[str, Any]]) -> str:
by_seed: dict[int, dict[str, dict[str, Any]]] = defaultdict(dict)
for row in traces:
try:
seed = int(row.get("seed"))
except (TypeError, ValueError):
continue
policy = str(row.get("policy") or "")
if policy:
by_seed[seed][policy] = row
deltas = basic_report.get("deltas", []) if isinstance(basic_report, dict) else []
lines = [
"# Baseline vs Trained/Pipeline Cases",
"",
"Each case uses the same seeded episode and is judged by the PolyGuard verifier/reward system.",
"",
]
for item in sorted(deltas, key=lambda row: float(row.get("reward_delta") or 0.0), reverse=True)[:8]:
seed = int(item.get("seed"))
rows = by_seed.get(seed, {})
basic = rows.get("basic_llm", {})
sft = rows.get("sft_policy", {})
pipeline = rows.get("full_polyguard_pipeline", {})
lines.extend(
[
f"## Seed {seed}",
"",
f"- Baseline model attempt: {action_label(basic)}; reward `{clamp_reward(basic.get('reward')):.3f}`; legal `{bool(basic.get('legal'))}`.",
f"- Baseline failure mode: {baseline_failure_mode(basic, pipeline)}",
f"- Reward/verifier output: {format_channels(basic)}",
f"- Trained SFT-style attempt: {action_label(sft)}; reward `{clamp_reward(sft.get('reward')):.3f}`; legal `{bool(sft.get('legal'))}`.",
f"- Full PolyGuard + Bandits pipeline attempt: {action_label(pipeline)}; reward `{clamp_reward(pipeline.get('reward')):.3f}`; legal `{bool(pipeline.get('legal'))}`.",
f"- Measurable improvement: pipeline minus baseline reward `{float(item.get('reward_delta') or 0.0):.3f}`.",
"- Safeguard: the final action is filtered through legality checks, anti-cheat checks, candidate ranking, and reward-channel decomposition before being accepted.",
"",
]
)
return "\n".join(lines).rstrip() + "\n"
def build_evidence_matrix(chart_index: list[dict[str, str]], report_files: list[str], source_manifest: dict[str, Any]) -> dict[str, Any]:
categories = {item["category"] for item in chart_index}
return {
"status": "ok",
"requirements": {
"loss_curves": "training_loss" in categories,
"training_curves": bool({"training_loss", "training_accuracy", "training_runtime"} & categories),
"sft_vs_grpo_comparison": "sft_vs_grpo" in categories,
"qwen_model_comparison": "model_comparison" in categories,
"without_bandit_vs_with_bandit": "policy_ablation" in categories,
"reward_function_charts": "reward_function" in categories,
"action_traces": any("action_traces" in item for item in report_files),
"basic_llm_vs_full_pipeline": "product_over_basic_llm" in categories,
"anti_hacking_overfit": bool({"safeguards", "overfit_checks"} & categories),
"manifests": any(item.endswith("manifest.json") for item in report_files),
},
"pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [],
}
def build_readme(
*,
report: dict[str, Any],
chart_index: list[dict[str, str]],
matrix: dict[str, Any],
) -> str:
chart_lines = [f"- [{item['title']}]({item['path']}) - `{item['category']}`" for item in chart_index]
model_lines = []
for model in report.get("models", []):
model_lines.append(
"| {label} | {sft} | {grpo} | {loss_delta} | {reward} |".format(
label=model.get("label", "model"),
sft=model.get("statuses", {}).get("sft_training", "unknown"),
grpo=model.get("statuses", {}).get("grpo_training", "unknown"),
loss_delta=model.get("sft_loss_delta", "pending"),
reward=model.get("sft_verifier_reward", "pending"),
)
)
matrix_lines = [f"- `{key}`: `{value}`" for key, value in matrix.get("requirements", {}).items()]
return "\n".join(
[
"# PolyGuard Model Improvement Evidence: Qwen 0.5B + Bandits and 1.5B + Bandits",
"",
"This folder is a curated, no-retraining submission bundle. It organizes existing HF/local evidence and deterministic verifier rollouts into one place.",
"",
"## Refresh Commands",
"",
"These commands refresh evidence only; they do not retrain model weights.",
"",
"```bash",
"uv run python scripts/generate_submission_evidence.py \\",
" --models qwen-qwen2-5-0-5b-instruct,qwen-qwen2-5-1-5b-instruct \\",
" --docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b",
"",
"uv run python scripts/build_improvement_evidence_bundle.py \\",
" --source-docs-dir docs/results/submission_evidence_qwen_0_5b_1_5b \\",
" --docs-dir docs/results/model_improvement_evidence_qwen_0_5b_1_5b",
"```",
"",
"## Model Status",
"",
"| Model | SFT | GRPO | SFT loss delta | SFT verifier reward |",
"| --- | --- | --- | ---: | ---: |",
*model_lines,
"",
"## Product-over-LLM Result",
"",
f"- Judge: `{report.get('judge')}`.",
f"- Pipeline minus basic LLM reward delta: `{report.get('product_over_basic_llm', {}).get('pipeline_minus_basic_reward_delta')}`.",
"- Detailed examples are in [baseline_vs_trained_cases.md](reports/baseline_vs_trained_cases.md).",
"",
"## Evidence Matrix",
"",
*matrix_lines,
"",
"## Charts",
"",
*chart_lines,
"",
"## Honesty Note",
"",
"This bundle does not retrain models. If a remote GRPO stage was observed but its files were not uploaded, the status remains `remote_completed_pending_artifact_upload` or `pending_artifact_upload`.",
"",
]
)
def zip_bundle(docs_dir: Path, bundle_zip: Path) -> None:
bundle_zip.parent.mkdir(parents=True, exist_ok=True)
if bundle_zip.exists():
bundle_zip.unlink()
with zipfile.ZipFile(bundle_zip, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for path in docs_dir.rglob("*"):
if path.is_file() and path.name != ".DS_Store":
archive.write(path, arcname=str(path.relative_to(docs_dir.parent)))
def build_improvement_bundle(
*,
source_docs_dir: Path,
docs_dir: Path,
report_dir: Path,
bundle_zip: Path,
replace: bool = True,
) -> dict[str, Any]:
ensure_clean_dir(docs_dir, replace=replace)
ensure_clean_dir(report_dir, replace=replace)
chart_index = organize_charts(source_docs_dir, docs_dir)
report_files = copy_reports(source_docs_dir, docs_dir, report_dir)
source_manifest = load_json(source_docs_dir / "manifest.json", {})
if not isinstance(source_manifest, dict):
source_manifest = {}
basic_report = load_json(source_docs_dir / "reports" / "basic_llm_vs_polyguard_report.json", {})
if not isinstance(basic_report, dict):
basic_report = {}
policy_ablation = load_json(source_docs_dir / "reports" / "policy_ablation_report.json", {})
if not isinstance(policy_ablation, dict):
policy_ablation = {}
traces = load_jsonl(source_docs_dir / "reports" / "action_traces.jsonl")
if not traces:
traces = load_jsonl(source_docs_dir / "traces" / "action_traces.jsonl")
improvement_report = build_model_improvement_report(
source_manifest=source_manifest,
basic_report=basic_report,
policy_ablation=policy_ablation,
chart_index=chart_index,
)
cases_markdown = build_case_markdown(basic_report, traces)
evidence_matrix = build_evidence_matrix(chart_index, report_files, source_manifest)
write_json(docs_dir / "reports" / "model_improvement_report.json", improvement_report)
write_json(report_dir / "model_improvement_report.json", improvement_report)
write_text(docs_dir / "reports" / "baseline_vs_trained_cases.md", cases_markdown)
write_text(report_dir / "baseline_vs_trained_cases.md", cases_markdown)
write_json(docs_dir / "reports" / "evidence_matrix.json", evidence_matrix)
write_json(report_dir / "evidence_matrix.json", evidence_matrix)
write_json(docs_dir / "chart_index.json", chart_index)
write_json(report_dir / "chart_index.json", chart_index)
readme = build_readme(report=improvement_report, chart_index=chart_index, matrix=evidence_matrix)
write_text(docs_dir / "README.md", readme)
write_text(report_dir / "README.md", readme)
manifest = {
"status": "ok",
"generated_at_unix": time.time(),
"source_docs_dir": str(source_docs_dir),
"docs_dir": str(docs_dir),
"report_dir": str(report_dir),
"bundle_zip": str(bundle_zip),
"training_commands_run": False,
"chart_count": len(chart_index),
"chart_index": chart_index,
"copied_report_files": report_files,
"pending_artifacts": source_manifest.get("pending_artifacts", []) if isinstance(source_manifest, dict) else [],
}
write_json(docs_dir / "manifest.json", manifest)
write_json(report_dir / "manifest.json", manifest)
zip_bundle(docs_dir, bundle_zip)
return manifest
def main() -> None:
args = parse_args()
manifest = build_improvement_bundle(
source_docs_dir=Path(args.source_docs_dir),
docs_dir=Path(args.docs_dir),
report_dir=Path(args.report_dir),
bundle_zip=Path(args.bundle_zip),
replace=args.replace,
)
print(json.dumps({"status": manifest["status"], "docs_dir": manifest["docs_dir"], "bundle_zip": manifest["bundle_zip"]}, indent=2))
if __name__ == "__main__":
main()