#!/usr/bin/env python3 """Evaluate MiniCPM-o 4.5 on the in-domain DPO sync test set. Reuses the CleverHans-Evaluation dpo_sync eval_dpo_sync.py for data loading, GT parsing, regex prediction extractor, optional GPT judge, and metrics. Only the inference path is replaced with MiniCPM-o. """ from __future__ import annotations import _common import argparse import gc import io import contextlib import json from pathlib import Path import torch from tqdm import tqdm ch = _common.ch("dpo_sync") EVAL_PROMPT = ch.EVAL_PROMPT load_test_data = ch.load_test_data set_data_root = ch.set_data_root extract_prediction = ch.extract_prediction gpt_extract_prediction = ch.gpt_extract_prediction _get_openai_client = ch._get_openai_client compute_metrics = ch.compute_metrics print_summary = ch.print_summary from minicpmo_inference import load_model, run_inference def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Evaluate MiniCPM-o on DPO sync test set.") p.add_argument("--model-id", type=str, default="openbmb/MiniCPM-o-4_5") p.add_argument("--data-root", type=Path, default=Path("/opt/dlami/nvme/video_source")) p.add_argument("--test-jsonl", type=Path, default=None, help="Default: /kto_training_data_v2_test.jsonl") p.add_argument("--output-dir", type=Path, default=Path("/home/ubuntu/eval_results/sync_minicpmo")) p.add_argument("--max-samples", type=int, default=-1) p.add_argument("--max-new-tokens", type=int, default=256) p.add_argument("--temperature", type=float, default=0.0) p.add_argument("--label", type=str, default="minicpmo_sync") p.add_argument("--max-frames", type=int, default=32, help="Sync clips are short (<30s); 32 frames is plenty.") p.add_argument("--fps", type=float, default=2.0) p.add_argument("--attn", type=str, default="flash_attention_2", choices=["sdpa", "flash_attention_2", "eager"]) # vLLM flags: accepted for CLI parity with Qwen3-Omni. MiniCPM-o 4.5 # multimodal vLLM support is not yet available upstream, so these are # currently a no-op (we always run transformers). Kept so the same # run_*.sh scripts work across the two models. p.add_argument("--vllm", action="store_true", default=False, help="(no-op for MiniCPM-o 4.5; auto-falls back to transformers).") p.add_argument("--tp", type=int, default=None) p.add_argument("--gpu-memory-utilization", type=float, default=0.90) p.add_argument("--max-model-len", type=int, default=65536) p.add_argument("--gpt-judge", action="store_true", default=False) p.add_argument("--openai-api-key", type=str, default=None) p.add_argument("--gpt-model", type=str, default="gpt-5.4") # Data-parallel sharding p.add_argument("--shard", type=int, default=0) p.add_argument("--num-shards", type=int, default=1) return p.parse_args() def main() -> None: args = parse_args() set_data_root(args.data_root) test_jsonl = args.test_jsonl or (args.data_root / "kto_training_data_v2_test.jsonl") if args.vllm: print("[warn] --vllm requested but MiniCPM-o 4.5 multimodal vLLM is not " "supported upstream yet; falling back to transformers.") if args.gpt_judge: if _get_openai_client(args.openai_api_key) is None: print("[ERROR] --gpt-judge requires OPENAI_API_KEY or --openai-api-key.") raise SystemExit(1) out_dir = args.output_dir / args.label out_dir.mkdir(parents=True, exist_ok=True) shard_suffix = (f".shard{args.shard}of{args.num_shards}" if args.num_shards > 1 else "") results_jsonl = out_dir / f"eval_results{shard_suffix}.jsonl" metrics_json = out_dir / "metrics.json" summary_txt = out_dir / "summary.txt" test_data = load_test_data(test_jsonl, args.max_samples) if args.num_shards > 1: test_data = [x for i, x in enumerate(test_data) if i % args.num_shards == args.shard] print(f"[shard] shard {args.shard}/{args.num_shards}: {len(test_data)} samples") else: print(f"[data] {len(test_data)} test samples") processed: set = set() if results_jsonl.exists(): with open(results_jsonl) as f: for line in f: obj = json.loads(line) processed.add(obj["video"]) print(f"[resume] {len(processed)} already processed") def _do_extract(raw_output: str): if args.gpt_judge and raw_output: gpt_pred = gpt_extract_prediction( raw_output, api_key=args.openai_api_key, model=args.gpt_model, ) if gpt_pred is not None: return gpt_pred return extract_prediction(raw_output) model, tokenizer = load_model(args.model_id, attn_implementation=args.attn, init_audio=True) for item in tqdm(test_data, desc="Sync", unit="sample"): if item["video"] in processed: continue try: raw_output = run_inference( model, tokenizer, video_path=item["video_path"], audio_path=item["audio_path"], prompt=EVAL_PROMPT, max_new_tokens=args.max_new_tokens, temperature=args.temperature, max_frames=args.max_frames, fps=args.fps, ) except Exception as exc: import traceback print(f" [error] {item['video']}: {exc}") traceback.print_exc() raw_output = "" pred = _do_extract(raw_output) result = { "video": item["video"], "video_path": item["video_path"], "gt_synced": item["gt_synced"], "gt_direction": item["gt_direction"], "gt_offset_sec": item["gt_offset_sec"], "gt_t_v": item["gt_t_v"], "gt_t_a": item["gt_t_a"], "pred_synced": pred["pred_synced"], "pred_direction": pred["pred_direction"], "pred_offset_sec": pred["pred_offset_sec"], "pred_t_v": pred.get("pred_t_v"), "pred_t_a": pred.get("pred_t_a"), "pred_explanation": pred.get("pred_explanation", ""), "parse_method": pred["parse_method"], "raw_output": raw_output, } with open(results_jsonl, "a", encoding="utf-8") as f: f.write(json.dumps(result, ensure_ascii=False) + "\n") processed.add(item["video"]) gc.collect() torch.cuda.empty_cache() if args.num_shards > 1: print(f"\n[shard {args.shard}/{args.num_shards}] Done. Results: {results_jsonl}") print(f"[shard] Run merge_shards.py --bench dpo_sync --label-dir {out_dir}") return all_results = [] if results_jsonl.exists(): with open(results_jsonl) as f: for line in f: all_results.append(json.loads(line)) metrics = compute_metrics(all_results) metrics["eval_config"] = { "model_id": args.model_id, "data_root": str(args.data_root), "test_jsonl": str(test_jsonl), "total_test_samples": len(test_data), "max_new_tokens": args.max_new_tokens, "temperature": args.temperature, "max_frames": args.max_frames, "fps": args.fps, "attn": args.attn, "gpt_judge": args.gpt_judge, "gpt_model": args.gpt_model if args.gpt_judge else None, } with open(metrics_json, "w", encoding="utf-8") as f: json.dump(metrics, f, indent=2, ensure_ascii=False) print_summary(metrics, args.label) with open(summary_txt, "w", encoding="utf-8") as f: buf = io.StringIO() with contextlib.redirect_stdout(buf): print_summary(metrics, args.label) f.write(buf.getvalue()) print(f"\n[output] Results: {results_jsonl}") print(f"[output] Metrics: {metrics_json}") print(f"[output] Summary: {summary_txt}") if __name__ == "__main__": main()