#!/usr/bin/env python3
"""Download the latest self-play checkpoint from Hugging Face, generate fresh
questions, compare the finetuned checkpoint against the base model, and export
benchmark-style HTML dashboards."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

from huggingface_hub import hf_hub_download, snapshot_download

from osint_env.config import clone_environment_config, load_shared_config
from osint_env.env.environment import OSINTEnvironment
from osint_env.llm import build_llm_client
from osint_env.training import load_self_play_config
from osint_env.training.self_play import _run_post_training_evaluation
from osint_env.viz import export_dashboard


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description=(
            "Download the latest self-play checkpoint from Hugging Face, "
            "generate fresh questions, compare the finetuned checkpoint "
            "against the base model, and export benchmark-style HTML."
        )
    )
    parser.add_argument("--repo-id", required=True, help="HF repo id, for example Siddeshwar1625/osint-checkpoints.")
    parser.add_argument(
        "--run-prefix",
        required=True,
        help="Run folder inside the HF repo, for example self_play_hf_l40s_full.",
    )
    parser.add_argument("--repo-type", default="model", help="HF repo type. Defaults to model.")
    parser.add_argument("--env-config", default="config/shared_config.json", help="Shared environment config.")
    parser.add_argument(
        "--train-config",
        default="config/self_play_training_hf_l40s_full.json",
        help="Self-play training config used for question generation and compare settings.",
    )
    parser.add_argument(
        "--output-dir",
        default="artifacts/hf_checkpoint_eval",
        help="Directory where evaluation JSON and HTML artifacts will be written.",
    )
    parser.add_argument(
        "--download-dir",
        default="artifacts/hf_downloads",
        help="Directory used for local HF downloads and cache materialization.",
    )
    parser.add_argument(
        "--dashboard-name",
        default="post_training_benchmark_dashboard.html",
        help="Filename for the finetuned benchmark-style HTML dashboard.",
    )
    parser.add_argument(
        "--original-dashboard-name",
        default="post_training_benchmark_dashboard_original.html",
        help="Filename for the base-model benchmark-style HTML dashboard.",
    )
    parser.add_argument(
        "--leaderboard-name",
        default="post_training_compare_leaderboard.json",
        help="Filename for the two-row leaderboard JSON used by the HTML dashboards.",
    )
    parser.add_argument(
        "--base-model",
        default="",
        help="Optional base model override. Defaults to the model recorded in self_play_summary.json.",
    )
    parser.add_argument(
        "--finetuned-model-subpath",
        default="",
        help=(
            "Optional HF path to the finetuned model directory inside the repo. "
            "Defaults to the final answerer model recorded in self_play_summary.json."
        ),
    )
    parser.add_argument(
        "--env-llm-provider",
        default="mock",
        help="Provider used only for environment construction. Defaults to mock.",
    )
    parser.add_argument(
        "--allow-env-llm-seeding",
        action="store_true",
        help=(
            "Keep graph/task LLM seeding enabled while constructing the environment. "
            "By default this script disables it to avoid depending on a local LLM server."
        ),
    )
    parser.add_argument(
        "--questions",
        type=int,
        default=0,
        help="Optional override for post_training_eval_questions.",
    )
    parser.add_argument(
        "--generated-task-max-new-tokens",
        type=int,
        default=0,
        help="Optional override for generated_task_max_new_tokens.",
    )
    parser.add_argument(
        "--answer-max-new-tokens",
        type=int,
        default=0,
        help="Optional override for post_training_eval_answer_max_new_tokens.",
    )
    return parser
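
# A typical invocation, using the example values quoted in the help strings
# above (the script filename and --questions value are illustrative, not part
# of the repo; everything else falls back to the defaults in _build_parser()):
#
#   python eval_hf_checkpoint.py \
#       --repo-id Siddeshwar1625/osint-checkpoints \
#       --run-prefix self_play_hf_l40s_full \
#       --questions 8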

def _strip_artifacts_prefix(path_value: str) -> str:
    """Drop a leading 'artifacts/' component so paths resolve relative to the HF repo root."""
    path = Path(str(path_value).strip())
    parts = path.parts
    if parts and parts[0] == "artifacts":
        return Path(*parts[1:]).as_posix()
    return path.as_posix()


def _resolve_finetuned_model_subpath(summary: dict[str, Any], explicit: str) -> str:
    if explicit.strip():
        return explicit.strip().strip("/")
    final_models = summary.get("final_models", {}) if isinstance(summary, dict) else {}
    candidate = str(final_models.get("answerer") or final_models.get("generator") or "").strip()
    if not candidate:
        raise ValueError("Could not resolve final model path from self_play_summary.json.")
    return _strip_artifacts_prefix(candidate)


def _load_summary(repo_id: str, repo_type: str, run_prefix: str, download_dir: Path) -> tuple[Path, dict[str, Any]]:
    local_path = Path(
        hf_hub_download(
            repo_id=repo_id,
            repo_type=repo_type,
            filename=f"{run_prefix.strip('/')}/self_play_summary.json",
            local_dir=str(download_dir),
        )
    )
    payload = json.loads(local_path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("self_play_summary.json did not contain a JSON object.")
    return local_path, payload


def _download_model_dir(repo_id: str, repo_type: str, model_subpath: str, download_dir: Path) -> Path:
    normalized = model_subpath.strip().strip("/")
    snapshot_download(
        repo_id=repo_id,
        repo_type=repo_type,
        allow_patterns=[f"{normalized}/*"],
        local_dir=str(download_dir),
    )
    local_model_dir = download_dir / normalized
    if not local_model_dir.exists():
        raise FileNotFoundError(f"Downloaded model folder not found: {local_model_dir}")
    return local_model_dir


def _benchmark_like_summary(summary: dict[str, Any]) -> dict[str, float]:
    task_success_rate = float(summary.get("task_success_rate", 0.0))
    avg_graph_f1 = float(summary.get("avg_graph_f1", 0.0))
    avg_reward = float(summary.get("avg_reward", 0.0))
    leaderboard_score = (
        0.28 * task_success_rate
        + 0.20 * avg_graph_f1
        + 0.05 * avg_reward
    )
    # Metrics the compare payload does not track are zero-filled so the dict
    # still matches the benchmark dashboard schema.
    return {
        "task_success_rate": task_success_rate,
        "tool_efficiency": 0.0,
        "avg_graph_f1": avg_graph_f1,
        "avg_steps_to_solution": 0.0,
        "deanonymization_accuracy": 0.0,
        "avg_reward": avg_reward,
        "avg_knowledge_carrier_reward": 0.0,
        "avg_knowledge_indexing_reward": 0.0,
        "avg_connectivity_reward": 0.0,
        "avg_format_reward": 0.0,
        "avg_relation_informativeness_reward": 0.0,
        "avg_entity_informativeness_reward": 0.0,
        "avg_diversity_reward": 0.0,
        "avg_soft_shaping_reward": 0.0,
        "avg_connectivity_gain_reward": 0.0,
        "avg_compactness_reward": 0.0,
        "avg_spawn_count": 0.0,
        "spawn_completion_rate": 0.0,
        "avg_spawn_critical_steps": 0.0,
        "spawn_signal": 0.0,
        "retrieval_signal": 0.0,
        "structural_signal": 0.0,
        "leaderboard_score": leaderboard_score,
    }
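
# For orientation: the leaderboard score above is a fixed linear blend. For
# example, a run with task_success_rate=0.5, avg_graph_f1=0.4, and
# avg_reward=1.0 would score 0.28 * 0.5 + 0.20 * 0.4 + 0.05 * 1.0 = 0.27.
# The weights come straight from _benchmark_like_summary; the input values
# here are made up purely for illustration.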

def _benchmark_like_evaluation(
    payload: dict[str, Any],
    model_label: str,
) -> dict[str, Any]:
    model_evaluations = payload.get("model_evaluations", {}) if isinstance(payload, dict) else {}
    model_payload = model_evaluations.get(model_label, {}) if isinstance(model_evaluations, dict) else {}
    summary = model_payload.get("summary", {}) if isinstance(model_payload, dict) else {}
    episodes = model_payload.get("episodes", []) if isinstance(model_payload, dict) else []
    benchmark_episodes: list[dict[str, Any]] = []
    for episode in (episodes if isinstance(episodes, list) else []):
        if not isinstance(episode, dict):
            continue
        benchmark_episodes.append(
            {
                "task_id": str(episode.get("task_id", "")),
                "task_type": str(episode.get("task_type", "")),
                "question": str(episode.get("question", "")),
                "task_answer": str(episode.get("task_answer", "")),
                "agent_answer": str(episode.get("agent_answer", "")),
                "graph_f1": float(episode.get("graph_f1", 0.0)),
                "reward": float(episode.get("reward", 0.0)),
                "steps": 0,
                "tool_calls": 0,
                "success": int(episode.get("success", 0)),
                "reward_components": {},
                "spawn_count": 0,
                "spawn_critical_steps": 0,
                "pred_edges": list(episode.get("pred_edges", [])),
                "truth_edges": list(episode.get("truth_edges", [])),
            }
        )
    return {
        "summary": _benchmark_like_summary(summary if isinstance(summary, dict) else {}),
        "episodes": benchmark_episodes,
    }


def _leaderboard_records(compare_payload: dict[str, Any]) -> list[dict[str, Any]]:
    records: list[dict[str, Any]] = []
    for idx, model_label in enumerate(("finetuned_answerer", "original_answerer"), start=1):
        evaluation = _benchmark_like_evaluation(compare_payload, model_label)
        records.append(
            {
                "run_id": f"post_train_{idx:02d}",
                "run_name": model_label,
                "episodes": len(evaluation.get("episodes", [])),
                "config": {"source": "post_training_evaluation"},
                "metrics": evaluation.get("summary", {}),
            }
        )
    return records
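
# The resulting leaderboard JSON is a two-element list, one record per answerer
# label; roughly (field values illustrative):
#
#   [
#     {"run_id": "post_train_01", "run_name": "finetuned_answerer",
#      "episodes": 8, "config": {"source": "post_training_evaluation"},
#      "metrics": {"task_success_rate": 0.5, ...}},
#     {"run_id": "post_train_02", "run_name": "original_answerer", ...},
#   ]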

def main() -> None:
    args = _build_parser().parse_args()
    download_dir = Path(args.download_dir)
    output_dir = Path(args.output_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    summary_path, summary = _load_summary(
        repo_id=args.repo_id,
        repo_type=args.repo_type,
        run_prefix=args.run_prefix,
        download_dir=download_dir,
    )
    finetuned_model_subpath = _resolve_finetuned_model_subpath(summary, args.finetuned_model_subpath)
    finetuned_model_dir = _download_model_dir(
        repo_id=args.repo_id,
        repo_type=args.repo_type,
        model_subpath=finetuned_model_subpath,
        download_dir=download_dir,
    )

    train_cfg = load_self_play_config(args.train_config)
    if args.questions > 0:
        train_cfg.post_training_eval_questions = int(args.questions)
    if args.generated_task_max_new_tokens > 0:
        train_cfg.generated_task_max_new_tokens = int(args.generated_task_max_new_tokens)
    if args.answer_max_new_tokens > 0:
        train_cfg.post_training_eval_answer_max_new_tokens = int(args.answer_max_new_tokens)

    shared_cfg = load_shared_config(args.env_config)
    env_cfg = clone_environment_config(shared_cfg.environment)
    env_cfg.llm.provider = str(args.env_llm_provider).strip() or "mock"
    if not args.allow_env_llm_seeding:
        env_cfg.seeding.llm_generate_remaining_graph = False
        env_cfg.seeding.llm_generate_remaining_tasks = False

    base_model = str(args.base_model).strip() or str(
        summary.get("initial_models", {}).get("answerer")
        or summary.get("initial_models", {}).get("generator")
        or train_cfg.shared_model_name_or_path
    )
    pipeline_mode = str(summary.get("pipeline_mode") or train_cfg.pipeline_mode or "swarm_v2")

    compare_payload = _run_post_training_evaluation(
        env_config=env_cfg,
        training_config=train_cfg,
        generator_model=str(finetuned_model_dir),
        answerer_models={
            "finetuned_answerer": str(finetuned_model_dir),
            "original_answerer": base_model,
        },
        output_dir=output_dir,
        pipeline_mode=pipeline_mode,
        effective_dry_run=False,
    )

    # The environment is constructed (with the mock provider by default) only
    # so export_dashboard has graph context to render.
    env = OSINTEnvironment(env_cfg, llm=build_llm_client(env_cfg.llm))
    env.reset()

    leaderboard_records = _leaderboard_records(compare_payload)
    leaderboard_path = output_dir / args.leaderboard_name
    leaderboard_path.write_text(json.dumps(leaderboard_records, indent=2, sort_keys=True), encoding="utf-8")

    finetuned_eval = _benchmark_like_evaluation(compare_payload, "finetuned_answerer")
    original_eval = _benchmark_like_evaluation(compare_payload, "original_answerer")
    finetuned_dashboard_path = output_dir / args.dashboard_name
    original_dashboard_path = output_dir / args.original_dashboard_name
    export_dashboard(
        env=env,
        evaluation=finetuned_eval,
        leaderboard_records=leaderboard_records,
        output_path=str(finetuned_dashboard_path),
    )
    export_dashboard(
        env=env,
        evaluation=original_eval,
        leaderboard_records=leaderboard_records,
        output_path=str(original_dashboard_path),
    )

    context = {
        "repo_id": args.repo_id,
        "repo_type": args.repo_type,
        "run_prefix": args.run_prefix,
        "summary_path": str(summary_path),
        "downloaded_finetuned_model": str(finetuned_model_dir),
        "base_model": base_model,
        "pipeline_mode": pipeline_mode,
        "environment_llm_provider": env_cfg.llm.provider,
        "env_llm_seeding_enabled": bool(args.allow_env_llm_seeding),
        "dashboard_paths": {
            "finetuned": str(finetuned_dashboard_path),
            "original": str(original_dashboard_path),
        },
        "leaderboard_path": str(leaderboard_path),
        "evaluation_path": str(compare_payload.get("path", "")),
    }
    (output_dir / "evaluation_context.json").write_text(json.dumps(context, indent=2, sort_keys=True), encoding="utf-8")
    print(
        json.dumps(
            {
                "evaluation_path": compare_payload.get("path", ""),
                "dashboard_path": str(finetuned_dashboard_path),
                "original_dashboard_path": str(original_dashboard_path),
                "leaderboard_path": str(leaderboard_path),
                "summary": compare_payload.get("summary", {}),
            },
            indent=2,
            sort_keys=True,
        )
    )


if __name__ == "__main__":
    main()