#!/usr/bin/env python3
"""Evaluate MiniCPM-o 4.5 on the in-domain DPO sync test set.

Reuses the CleverHans-Evaluation dpo_sync module (eval_dpo_sync.py) for data
loading, GT parsing, the regex prediction extractor, the optional GPT judge,
and metrics. Only the inference path is replaced with MiniCPM-o.
"""
from __future__ import annotations
import _common
import argparse
import gc
import io
import contextlib
import json
from pathlib import Path
import torch
from tqdm import tqdm
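
# Shared helpers re-exported from the CleverHans-Evaluation dpo_sync benchmark:
# prompt, data loading, GT parsing, prediction extractors, and metrics.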
ch = _common.ch("dpo_sync")
EVAL_PROMPT = ch.EVAL_PROMPT
load_test_data = ch.load_test_data
set_data_root = ch.set_data_root
extract_prediction = ch.extract_prediction
gpt_extract_prediction = ch.gpt_extract_prediction
_get_openai_client = ch._get_openai_client
compute_metrics = ch.compute_metrics
print_summary = ch.print_summary
from minicpmo_inference import load_model, run_inference


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Evaluate MiniCPM-o on DPO sync test set.")
    p.add_argument("--model-id", type=str, default="openbmb/MiniCPM-o-4_5")
    p.add_argument("--data-root", type=Path,
                   default=Path("/opt/dlami/nvme/video_source"))
    p.add_argument("--test-jsonl", type=Path, default=None,
                   help="Default: <data-root>/kto_training_data_v2_test.jsonl")
    p.add_argument("--output-dir", type=Path,
                   default=Path("/home/ubuntu/eval_results/sync_minicpmo"))
    p.add_argument("--max-samples", type=int, default=-1)
    p.add_argument("--max-new-tokens", type=int, default=256)
    p.add_argument("--temperature", type=float, default=0.0)
    p.add_argument("--label", type=str, default="minicpmo_sync")
    p.add_argument("--max-frames", type=int, default=32,
                   help="Sync clips are short (<30s); 32 frames is plenty.")
    p.add_argument("--fps", type=float, default=2.0)
    p.add_argument("--attn", type=str, default="flash_attention_2",
                   choices=["sdpa", "flash_attention_2", "eager"])
    # vLLM flags: accepted for CLI parity with Qwen3-Omni. MiniCPM-o 4.5
    # multimodal vLLM support is not yet available upstream, so these are
    # currently a no-op (we always run transformers). Kept so the same
    # run_*.sh scripts work across the two models.
    p.add_argument("--vllm", action="store_true", default=False,
                   help="(no-op for MiniCPM-o 4.5; auto-falls back to transformers).")
    p.add_argument("--tp", type=int, default=None)
    p.add_argument("--gpu-memory-utilization", type=float, default=0.90)
    p.add_argument("--max-model-len", type=int, default=65536)
    p.add_argument("--gpt-judge", action="store_true", default=False)
    p.add_argument("--openai-api-key", type=str, default=None)
    p.add_argument("--gpt-model", type=str, default="gpt-5.4")
    # Data-parallel sharding
    p.add_argument("--shard", type=int, default=0)
    p.add_argument("--num-shards", type=int, default=1)
    return p.parse_args()


def main() -> None:
    args = parse_args()
    set_data_root(args.data_root)
    test_jsonl = args.test_jsonl or (args.data_root / "kto_training_data_v2_test.jsonl")

    if args.vllm:
        print("[warn] --vllm requested but MiniCPM-o 4.5 multimodal vLLM is not "
              "supported upstream yet; falling back to transformers.")
    if args.gpt_judge:
        if _get_openai_client(args.openai_api_key) is None:
            print("[ERROR] --gpt-judge requires OPENAI_API_KEY or --openai-api-key.")
            raise SystemExit(1)

    out_dir = args.output_dir / args.label
    out_dir.mkdir(parents=True, exist_ok=True)
    shard_suffix = (f".shard{args.shard}of{args.num_shards}"
                    if args.num_shards > 1 else "")
    results_jsonl = out_dir / f"eval_results{shard_suffix}.jsonl"
    metrics_json = out_dir / "metrics.json"
    summary_txt = out_dir / "summary.txt"

    test_data = load_test_data(test_jsonl, args.max_samples)
    if args.num_shards > 1:
        test_data = [x for i, x in enumerate(test_data) if i % args.num_shards == args.shard]
        print(f"[shard] shard {args.shard}/{args.num_shards}: {len(test_data)} samples")
    else:
        print(f"[data] {len(test_data)} test samples")

    processed: set = set()
    if results_jsonl.exists():
        with open(results_jsonl) as f:
            for line in f:
                obj = json.loads(line)
                processed.add(obj["video"])
        print(f"[resume] {len(processed)} already processed")
    def _do_extract(raw_output: str):
        if args.gpt_judge and raw_output:
            gpt_pred = gpt_extract_prediction(
                raw_output, api_key=args.openai_api_key, model=args.gpt_model,
            )
            if gpt_pred is not None:
                return gpt_pred
        return extract_prediction(raw_output)
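
    # init_audio=True asks load_model to initialise the audio components,
    # which the audio-video sync task relies on.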
    model, tokenizer = load_model(args.model_id, attn_implementation=args.attn,
                                  init_audio=True)
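
    # Results are appended to the JSONL one sample at a time so interrupted
    # runs can resume via the `processed` set built above.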
    for item in tqdm(test_data, desc="Sync", unit="sample"):
        if item["video"] in processed:
            continue
        try:
            raw_output = run_inference(
                model, tokenizer,
                video_path=item["video_path"],
                audio_path=item["audio_path"],
                prompt=EVAL_PROMPT,
                max_new_tokens=args.max_new_tokens,
                temperature=args.temperature,
                max_frames=args.max_frames,
                fps=args.fps,
            )
        except Exception as exc:
            import traceback
            print(f" [error] {item['video']}: {exc}")
            traceback.print_exc()
            raw_output = ""

        pred = _do_extract(raw_output)
        result = {
            "video": item["video"],
            "video_path": item["video_path"],
            "gt_synced": item["gt_synced"],
            "gt_direction": item["gt_direction"],
            "gt_offset_sec": item["gt_offset_sec"],
            "gt_t_v": item["gt_t_v"],
            "gt_t_a": item["gt_t_a"],
            "pred_synced": pred["pred_synced"],
            "pred_direction": pred["pred_direction"],
            "pred_offset_sec": pred["pred_offset_sec"],
            "pred_t_v": pred.get("pred_t_v"),
            "pred_t_a": pred.get("pred_t_a"),
            "pred_explanation": pred.get("pred_explanation", ""),
            "parse_method": pred["parse_method"],
            "raw_output": raw_output,
        }
        with open(results_jsonl, "a", encoding="utf-8") as f:
            f.write(json.dumps(result, ensure_ascii=False) + "\n")
        processed.add(item["video"])
        gc.collect()
        torch.cuda.empty_cache()
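
    # In sharded mode each shard only writes its own JSONL; metrics are
    # computed later, after merge_shards.py combines the shards.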
    if args.num_shards > 1:
        print(f"\n[shard {args.shard}/{args.num_shards}] Done. Results: {results_jsonl}")
        print(f"[shard] Run merge_shards.py --bench dpo_sync --label-dir {out_dir}")
        return
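
    # Single-shard path: re-read the full results file (including samples from
    # earlier resumed runs) and compute metrics over everything.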
    all_results = []
    if results_jsonl.exists():
        with open(results_jsonl) as f:
            for line in f:
                all_results.append(json.loads(line))

    metrics = compute_metrics(all_results)
    metrics["eval_config"] = {
        "model_id": args.model_id,
        "data_root": str(args.data_root),
        "test_jsonl": str(test_jsonl),
        "total_test_samples": len(test_data),
        "max_new_tokens": args.max_new_tokens,
        "temperature": args.temperature,
        "max_frames": args.max_frames,
        "fps": args.fps,
        "attn": args.attn,
        "gpt_judge": args.gpt_judge,
        "gpt_model": args.gpt_model if args.gpt_judge else None,
    }
    with open(metrics_json, "w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2, ensure_ascii=False)

    print_summary(metrics, args.label)
    with open(summary_txt, "w", encoding="utf-8") as f:
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            print_summary(metrics, args.label)
        f.write(buf.getvalue())

    print(f"\n[output] Results: {results_jsonl}")
    print(f"[output] Metrics: {metrics_json}")
    print(f"[output] Summary: {summary_txt}")


if __name__ == "__main__":
    main()