#!/usr/bin/env python3
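"""Evaluate the latest self-play checkpoint from Hugging Face against its base model.

Downloads the run summary and finetuned weights, regenerates evaluation questions,
runs the post-training comparison, and exports benchmark-style HTML dashboards plus
leaderboard and context JSON artifacts.
"""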
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

from huggingface_hub import hf_hub_download, snapshot_download

from osint_env.config import clone_environment_config, load_shared_config
from osint_env.env.environment import OSINTEnvironment
from osint_env.llm import build_llm_client
from osint_env.training import load_self_play_config
from osint_env.training.self_play import _run_post_training_evaluation
from osint_env.viz import export_dashboard


def _build_parser() -> argparse.ArgumentParser:
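    """Build the CLI argument parser for the checkpoint evaluation script."""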
    parser = argparse.ArgumentParser(
        description=(
            "Download the latest self-play checkpoint from Hugging Face, "
            "generate fresh questions, compare the finetuned checkpoint "
            "against the base model, and export benchmark-style HTML."
        )
    )
    parser.add_argument("--repo-id", required=True, help="HF repo id, for example Siddeshwar1625/osint-checkpoints.")
    parser.add_argument(
        "--run-prefix",
        required=True,
        help="Run folder inside the HF repo, for example self_play_hf_l40s_full.",
    )
    parser.add_argument("--repo-type", default="model", help="HF repo type. Defaults to model.")
    parser.add_argument("--env-config", default="config/shared_config.json", help="Shared environment config.")
    parser.add_argument(
        "--train-config",
        default="config/self_play_training_hf_l40s_full.json",
        help="Self-play training config used for question generation and compare settings.",
    )
    parser.add_argument(
        "--output-dir",
        default="artifacts/hf_checkpoint_eval",
        help="Directory where evaluation JSON and HTML artifacts will be written.",
    )
    parser.add_argument(
        "--download-dir",
        default="artifacts/hf_downloads",
        help="Directory used for local HF downloads and cache materialization.",
    )
    parser.add_argument(
        "--dashboard-name",
        default="post_training_benchmark_dashboard.html",
        help="Filename for the finetuned benchmark-style HTML dashboard.",
    )
    parser.add_argument(
        "--original-dashboard-name",
        default="post_training_benchmark_dashboard_original.html",
        help="Filename for the base-model benchmark-style HTML dashboard.",
    )
    parser.add_argument(
        "--leaderboard-name",
        default="post_training_compare_leaderboard.json",
        help="Filename for the two-row leaderboard JSON used by the HTML dashboards.",
    )
    parser.add_argument(
        "--base-model",
        default="",
        help="Optional base model override. Defaults to the model recorded in self_play_summary.json.",
    )
    parser.add_argument(
        "--finetuned-model-subpath",
        default="",
        help=(
            "Optional HF path to the finetuned model directory inside the repo. "
            "Defaults to the final answerer model recorded in self_play_summary.json."
        ),
    )
    parser.add_argument(
        "--env-llm-provider",
        default="mock",
        help="Provider used only for environment construction. Defaults to mock.",
    )
    parser.add_argument(
        "--allow-env-llm-seeding",
        action="store_true",
        help=(
            "Keep graph/task LLM seeding enabled while constructing the environment. "
            "By default this script disables it to avoid depending on a local LLM server."
        ),
    )
    parser.add_argument(
        "--questions",
        type=int,
        default=0,
        help="Optional override for post_training_eval_questions.",
    )
    parser.add_argument(
        "--generated-task-max-new-tokens",
        type=int,
        default=0,
        help="Optional override for generated_task_max_new_tokens.",
    )
    parser.add_argument(
        "--answer-max-new-tokens",
        type=int,
        default=0,
        help="Optional override for post_training_eval_answer_max_new_tokens.",
    )
    return parser


def _strip_artifacts_prefix(path_value: str) -> str:
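    """Strip a leading ``artifacts/`` component and return the remainder as a POSIX-style path."""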
    path = Path(str(path_value).strip())
    parts = path.parts
    if parts and parts[0] == "artifacts":
        return Path(*parts[1:]).as_posix()
    return path.as_posix()


def _resolve_finetuned_model_subpath(summary: dict[str, Any], explicit: str) -> str:
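    """Return the explicit subpath if given, otherwise the final answerer (or generator) model recorded in self_play_summary.json."""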
    if explicit.strip():
        return explicit.strip().strip("/")
    final_models = summary.get("final_models", {}) if isinstance(summary, dict) else {}
    candidate = str(final_models.get("answerer") or final_models.get("generator") or "").strip()
    if not candidate:
        raise ValueError("Could not resolve final model path from self_play_summary.json.")
    return _strip_artifacts_prefix(candidate)


def _load_summary(repo_id: str, repo_type: str, run_prefix: str, download_dir: Path) -> tuple[Path, dict[str, Any]]:
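    """Download ``self_play_summary.json`` for the run and return its local path and parsed payload."""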
    local_path = Path(
        hf_hub_download(
            repo_id=repo_id,
            repo_type=repo_type,
            filename=f"{run_prefix.strip('/')}/self_play_summary.json",
            local_dir=str(download_dir),
        )
    )
    payload = json.loads(local_path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("self_play_summary.json did not contain a JSON object.")
    return local_path, payload


def _download_model_dir(repo_id: str, repo_type: str, model_subpath: str, download_dir: Path) -> Path:
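    """Materialize the finetuned model folder from the HF repo into the local download directory."""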
    normalized = model_subpath.strip().strip("/")
    snapshot_download(
        repo_id=repo_id,
        repo_type=repo_type,
        allow_patterns=[f"{normalized}/*"],
        local_dir=str(download_dir),
    )
    local_model_dir = download_dir / normalized
    if not local_model_dir.exists():
        raise FileNotFoundError(f"Downloaded model folder not found: {local_model_dir}")
    return local_model_dir


def _benchmark_like_summary(summary: dict[str, Any]) -> dict[str, float]:
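    """Map post-training compare metrics onto the benchmark dashboard's summary schema.

    Metrics the compare run does not produce are filled with zeros; the leaderboard
    score is a weighted combination of task success rate, graph F1, and average reward.
    """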
    task_success_rate = float(summary.get("task_success_rate", 0.0))
    avg_graph_f1 = float(summary.get("avg_graph_f1", 0.0))
    avg_reward = float(summary.get("avg_reward", 0.0))
    leaderboard_score = (
        0.28 * task_success_rate
        + 0.20 * avg_graph_f1
        + 0.05 * avg_reward
    )
    return {
        "task_success_rate": task_success_rate,
        "tool_efficiency": 0.0,
        "avg_graph_f1": avg_graph_f1,
        "avg_steps_to_solution": 0.0,
        "deanonymization_accuracy": 0.0,
        "avg_reward": avg_reward,
        "avg_knowledge_carrier_reward": 0.0,
        "avg_knowledge_indexing_reward": 0.0,
        "avg_connectivity_reward": 0.0,
        "avg_format_reward": 0.0,
        "avg_relation_informativeness_reward": 0.0,
        "avg_entity_informativeness_reward": 0.0,
        "avg_diversity_reward": 0.0,
        "avg_soft_shaping_reward": 0.0,
        "avg_connectivity_gain_reward": 0.0,
        "avg_compactness_reward": 0.0,
        "avg_spawn_count": 0.0,
        "spawn_completion_rate": 0.0,
        "avg_spawn_critical_steps": 0.0,
        "spawn_signal": 0.0,
        "retrieval_signal": 0.0,
        "structural_signal": 0.0,
        "leaderboard_score": leaderboard_score,
    }


def _benchmark_like_evaluation(
    payload: dict[str, Any],
    model_label: str,
) -> dict[str, Any]:
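    """Convert one model's compare payload into the benchmark dashboard's evaluation shape."""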
    model_evaluations = payload.get("model_evaluations", {}) if isinstance(payload, dict) else {}
    model_payload = model_evaluations.get(model_label, {}) if isinstance(model_evaluations, dict) else {}
    summary = model_payload.get("summary", {}) if isinstance(model_payload, dict) else {}
    episodes = model_payload.get("episodes", []) if isinstance(model_payload, dict) else []
    benchmark_episodes: list[dict[str, Any]] = []
    for episode in episodes if isinstance(episodes, list) else []:
        if not isinstance(episode, dict):
            continue
        benchmark_episodes.append(
            {
                "task_id": str(episode.get("task_id", "")),
                "task_type": str(episode.get("task_type", "")),
                "question": str(episode.get("question", "")),
                "task_answer": str(episode.get("task_answer", "")),
                "agent_answer": str(episode.get("agent_answer", "")),
                "graph_f1": float(episode.get("graph_f1", 0.0)),
                "reward": float(episode.get("reward", 0.0)),
                "steps": 0,
                "tool_calls": 0,
                "success": int(episode.get("success", 0)),
                "reward_components": {},
                "spawn_count": 0,
                "spawn_critical_steps": 0,
                "pred_edges": list(episode.get("pred_edges", [])),
                "truth_edges": list(episode.get("truth_edges", [])),
            }
        )
    return {
        "summary": _benchmark_like_summary(summary if isinstance(summary, dict) else {}),
        "episodes": benchmark_episodes,
    }


def _leaderboard_records(compare_payload: dict[str, Any]) -> list[dict[str, Any]]:
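    """Build the two-row leaderboard (finetuned vs. original answerer) consumed by the HTML dashboards."""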
    records: list[dict[str, Any]] = []
    for idx, model_label in enumerate(("finetuned_answerer", "original_answerer"), start=1):
        evaluation = _benchmark_like_evaluation(compare_payload, model_label)
        records.append(
            {
                "run_id": f"post_train_{idx:02d}",
                "run_name": model_label,
                "episodes": len(evaluation.get("episodes", [])),
                "config": {"source": "post_training_evaluation"},
                "metrics": evaluation.get("summary", {}),
            }
        )
    return records


def main() -> None:
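    """Download the checkpoint, run the post-training comparison, and export all artifacts."""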
    args = _build_parser().parse_args()
    download_dir = Path(args.download_dir)
    output_dir = Path(args.output_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)
    summary_path, summary = _load_summary(
        repo_id=args.repo_id,
        repo_type=args.repo_type,
        run_prefix=args.run_prefix,
        download_dir=download_dir,
    )
    finetuned_model_subpath = _resolve_finetuned_model_subpath(summary, args.finetuned_model_subpath)
    finetuned_model_dir = _download_model_dir(
        repo_id=args.repo_id,
        repo_type=args.repo_type,
        model_subpath=finetuned_model_subpath,
        download_dir=download_dir,
    )
    train_cfg = load_self_play_config(args.train_config)
    if args.questions > 0:
        train_cfg.post_training_eval_questions = int(args.questions)
    if args.generated_task_max_new_tokens > 0:
        train_cfg.generated_task_max_new_tokens = int(args.generated_task_max_new_tokens)
    if args.answer_max_new_tokens > 0:
        train_cfg.post_training_eval_answer_max_new_tokens = int(args.answer_max_new_tokens)
    shared_cfg = load_shared_config(args.env_config)
    env_cfg = clone_environment_config(shared_cfg.environment)
    env_cfg.llm.provider = str(args.env_llm_provider).strip() or "mock"
    if not args.allow_env_llm_seeding:
        env_cfg.seeding.llm_generate_remaining_graph = False
        env_cfg.seeding.llm_generate_remaining_tasks = False
    base_model = str(args.base_model).strip() or str(
        summary.get("initial_models", {}).get("answerer")
        or summary.get("initial_models", {}).get("generator")
        or train_cfg.shared_model_name_or_path
    )
    pipeline_mode = str(summary.get("pipeline_mode") or train_cfg.pipeline_mode or "swarm_v2")
    compare_payload = _run_post_training_evaluation(
        env_config=env_cfg,
        training_config=train_cfg,
        generator_model=str(finetuned_model_dir),
        answerer_models={
            "finetuned_answerer": str(finetuned_model_dir),
            "original_answerer": base_model,
        },
        output_dir=output_dir,
        pipeline_mode=pipeline_mode,
        effective_dry_run=False,
    )
    env = OSINTEnvironment(env_cfg, llm=build_llm_client(env_cfg.llm))
    env.reset()
    leaderboard_records = _leaderboard_records(compare_payload)
    leaderboard_path = output_dir / args.leaderboard_name
    leaderboard_path.write_text(json.dumps(leaderboard_records, indent=2, sort_keys=True), encoding="utf-8")
    finetuned_eval = _benchmark_like_evaluation(compare_payload, "finetuned_answerer")
    original_eval = _benchmark_like_evaluation(compare_payload, "original_answerer")
    finetuned_dashboard_path = output_dir / args.dashboard_name
    original_dashboard_path = output_dir / args.original_dashboard_name
    export_dashboard(
        env=env,
        evaluation=finetuned_eval,
        leaderboard_records=leaderboard_records,
        output_path=str(finetuned_dashboard_path),
    )
    export_dashboard(
        env=env,
        evaluation=original_eval,
        leaderboard_records=leaderboard_records,
        output_path=str(original_dashboard_path),
    )
    context = {
        "repo_id": args.repo_id,
        "repo_type": args.repo_type,
        "run_prefix": args.run_prefix,
        "summary_path": str(summary_path),
        "downloaded_finetuned_model": str(finetuned_model_dir),
        "base_model": base_model,
        "pipeline_mode": pipeline_mode,
        "environment_llm_provider": env_cfg.llm.provider,
        "env_llm_seeding_enabled": bool(args.allow_env_llm_seeding),
        "dashboard_paths": {
            "finetuned": str(finetuned_dashboard_path),
            "original": str(original_dashboard_path),
        },
        "leaderboard_path": str(leaderboard_path),
        "evaluation_path": str(compare_payload.get("path", "")),
    }
    (output_dir / "evaluation_context.json").write_text(
        json.dumps(context, indent=2, sort_keys=True), encoding="utf-8"
    )
    print(
        json.dumps(
            {
                "evaluation_path": compare_payload.get("path", ""),
                "dashboard_path": str(finetuned_dashboard_path),
                "original_dashboard_path": str(original_dashboard_path),
                "leaderboard_path": str(leaderboard_path),
                "summary": compare_payload.get("summary", {}),
            },
            indent=2,
            sort_keys=True,
        )
    )


if __name__ == "__main__":
    main()