import time
import json
import os
import pickle

import numpy as np
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import HfArgumentParser, AutoConfig

from src.arguments import ModelArguments, DataArguments, TrainingArguments
from src.data.collator.eval_collator import MultimodalEvalDataCollator
from src.data.eval_dataset.base_eval_dataset import AutoEvalPairDataset, generate_cand_dataset
from src.eval_utils.metrics import RankingMetrics
from src.model.model_cut_layer_AOP_add_text_cut import MMEBModel
from src.model.processor import get_backbone_name, load_processor
from src.utils import batch_to_device, print_master
from src.classifier_utils_V5 import EarlyExitClassifier


# ===========================
# Simplified configuration loading
# ===========================
def get_env_config():
    """Read the benchmark configuration from environment variables.

    Returns:
        dict with keys:
            ee_enabled      -- whether early-exit (EE) inference is enabled
            ee_threshold    -- EE probability threshold (read unconditionally,
                               simply ignored when EE is off)
            ee_layer        -- encoder layer index at which to pause for EE
            classifier_path -- path to the V5 early-exit classifier weights
            aop_enabled     -- whether AOP is enabled (other AOP parameters
                               are omitted here for simplicity)
    """
    truthy = {"1", "true", "yes", "on"}
    ee_enabled = os.environ.get("EE_ENABLED", "0").strip().lower() in truthy
    threshold = float(os.environ.get("EE_THRESHOLD", "0.5"))
    classifier_path = os.environ.get("EE_CLASSIFIER_PATH", "")
    layer = int(os.environ.get("EE_LAYER", "12"))
    # AOP configuration (further AOP parameters can be read here as needed).
    aop_enabled = os.environ.get("AOP_ENABLED", "0").strip().lower() in truthy
    return {
        "ee_enabled": ee_enabled,
        "ee_threshold": threshold,
        "ee_layer": layer,
        "classifier_path": classifier_path,
        "aop_enabled": aop_enabled,
    }


# ===========================
# Core inference (contains both the Baseline and the Ours path)
# ===========================
def run_benchmark(
    model, classifier, processor, model_args, data_args, training_args,
    qry_dataset, cand_mid_dict, cand_last_dict, cfg, dataset_name, out_dir
):
    """Benchmark query encoding + retrieval for one dataset.

    Runs either the pure baseline (full forward pass, path A) or the
    early-exit pipeline (mid-layer stop -> classifier -> optional resume,
    path B), measures wall-clock latency/throughput, and returns the
    per-query top-50 retrieval predictions.

    Args:
        model: MMEB-style model exposing `.encoder(...)` and `._pooling(...)`.
        classifier: early-exit classifier (only used when cfg["ee_enabled"]),
            may be None in baseline mode.
        processor / model_args / data_args / training_args: HF-style objects
            used to build the eval collator and loader.
        qry_dataset: query-side dataset.
        cand_mid_dict / cand_last_dict: candidate-id -> embedding mappings
            for the mid layer and the last layer respectively
            (cand_mid_dict may be None when EE is disabled).
        cfg: dict from get_env_config().
        dataset_name: name used in progress bars / logs.
        out_dir: output directory (currently unused here; kept for callers).

    Returns:
        list of {"prediction": [cand_id, ...], "label": ...} dicts.

    Raises:
        RuntimeError: if the loader yields no samples (avoids an opaque
            ZeroDivisionError when computing latency).
    """
    device = training_args.device
    local_rank = dist.get_rank() if dist.is_initialized() else 0
    is_main = (local_rank == 0)

    # 1. Data loader for the query side.
    collator = MultimodalEvalDataCollator(processor, model_args, data_args, "qry")
    loader = DataLoader(
        qry_dataset,
        batch_size=training_args.per_device_eval_batch_size,
        collate_fn=collator,
        num_workers=training_args.dataloader_num_workers,
    )

    # 2. Candidate pool as FP32 numpy for fast dot-product retrieval.
    #    Baseline searches last-layer embeddings; Ours also needs mid-layer ones.
    cand_ids = list(cand_last_dict.keys())
    cand_last_np = np.stack([cand_last_dict[c] for c in cand_ids]).astype(np.float32)
    if cfg["ee_enabled"]:
        cand_mid_np = np.stack([cand_mid_dict[c] for c in cand_ids]).astype(np.float32)
        # BF16 tensor copy used as classifier input.
        cand_mid_t = torch.from_numpy(cand_mid_np).to(device=device, dtype=torch.bfloat16)

    model.eval()
    if classifier:
        classifier.eval()
        classifier.to(device)

    # 3. Warmup — critical: keeps first-run CUDA init/compile cost out of the stats.
    print_master(f"🔥 Warming up GPU...")
    # BUGFIX: the loader yields (inputs, infos) tuples (see the main loop below);
    # the old code passed the whole tuple to batch_to_device()/encoder(**...).
    # Also fetch the batch once instead of rebuilding an iterator every step.
    warmup_inputs, _ = next(iter(loader))
    warmup_inputs = batch_to_device(warmup_inputs, device)
    for _ in range(5):
        with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
            _ = model.encoder(**warmup_inputs, stop_at_layer=None)
    torch.cuda.synchronize()

    # =========================================================
    # ⏱️ Timed benchmark loop starts here
    # =========================================================
    total_samples = 0
    start_time = time.perf_counter()
    pred_dicts = []

    for inputs, infos in tqdm(loader, desc=f"Benchmarking {dataset_name}", disable=not is_main):
        inputs = batch_to_device(inputs, device)
        B = inputs["input_ids"].shape[0]
        total_samples += B

        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        # Path A: clean baseline (full forward)
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        if not cfg["ee_enabled"]:
            with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
                # Run straight through — no hooks, no mid-layer extraction cost.
                out = model.encoder(
                    **inputs,
                    return_dict=True,
                    output_hidden_states=False,  # hidden states not needed
                    stop_at_layer=None,
                    compute_lm_head=False,
                )
                # Pooling (fall back to the last hidden state if the field is unset).
                hs = out.last_hidden_state
                if hs is None:
                    hs = out.hidden_states[-1]
                am = getattr(out, "attention_mask", None)
                if am is None:
                    am = inputs.get("attention_mask", None)
                reps = model._pooling(hs, am).float().cpu().numpy()  # FP32 on CPU

            # Dot-product retrieval; top-50 is enough for the metrics.
            scores = np.dot(reps, cand_last_np.T)
            topk_inds = np.argsort(-scores, axis=1)[:, :50]
            for i in range(B):
                cids = [cand_ids[k] for k in topk_inds[i]]
                _record(pred_dicts, infos[i], cids)

        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        # Path B: our method (Mid -> Classifier -> Tail)
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        else:
            with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
                # 1. Run to the mid layer only.
                out_mid = model.encoder(
                    **inputs,
                    return_dict=True,
                    output_hidden_states=False,
                    stop_at_layer=cfg["ee_layer"],
                    compute_lm_head=False,
                )
                hs_mid = getattr(out_mid, "last_hidden_state", None)
                if hs_mid is None:
                    hs_mid = out_mid.hidden_states[-1]
                am_mid = getattr(out_mid, "attention_mask", None)
                if am_mid is None:
                    am_mid = inputs.get("attention_mask", None)
                reps_mid = model._pooling(hs_mid, am_mid)  # BF16 tensor

                # 2. Classifier logic. For benchmark honesty the full 27-dim
                # feature computation must be included here; the mock below is
                # a placeholder (fill in the real logic from the V5 eval script).
                cos_mid = reps_mid @ cand_mid_t.T
                scalar_inputs = _mock_feature_extraction(cos_mid, device)
                # V5 classifier runs in FP32.
                logits = classifier(
                    scalar_inputs.float(),
                    torch.zeros(B, dtype=torch.long, device=device),
                    qry_emb=reps_mid.float(),
                )
                probs = torch.sigmoid(logits).squeeze(1)

                # 3. Dynamic thresholding (batch-level ratio): force a 50%
                # in-batch compression rate for a fair, stable comparison.
                # (An absolute threshold could be used instead in the paper.)
                # BUGFIX: clamp k to >= 1 — with B == 1 the old int(B * 0.5)
                # was 0, giving an empty topk and an IndexError on top_vals[-1].
                k = max(1, int(B * 0.5))
                top_vals, _ = torch.topk(probs, k=k)
                dyn_thresh = top_vals[-1]
                exit_mask = (probs < dyn_thresh).cpu().numpy()  # True = early exit

                # 4. Branching: early-exit queries search mid-layer candidates;
                # the rest resume the forward pass and search last-layer ones.
                exit_indices = np.where(exit_mask)[0]
                cont_indices = np.where(~exit_mask)[0]

                # Early-exit retrieval against the mid-layer candidate pool.
                if len(exit_indices) > 0:
                    reps_exit_np = reps_mid[exit_indices].float().cpu().numpy()
                    scores = np.dot(reps_exit_np, cand_mid_np.T)
                    inds = np.argsort(-scores, axis=1)[:, :50]
                    for i, idx in enumerate(exit_indices):
                        _record(pred_dicts, infos[idx], [cand_ids[x] for x in inds[i]])

                # Continue retrieval: resume the forward pass from the saved
                # intermediate state, sliced down to the continuing queries.
                if len(cont_indices) > 0:
                    interm = out_mid.intermediate_state
                    subset = {
                        k: v[cont_indices] if v is not None and isinstance(v, torch.Tensor) else v
                        for k, v in interm.items()
                        if k in ["hidden_states", "attention_mask", "position_ids"]
                    }
                    subset["next_layer_idx"] = int(interm["next_layer_idx"])
                    out_last = model.encoder(
                        return_dict=True,
                        output_hidden_states=False,
                        stop_at_layer=None,
                        resume_state=subset,
                        compute_lm_head=False,
                    )
                    # Pooling & search against the last-layer candidate pool.
                    hs = out_last.last_hidden_state
                    am = subset["attention_mask"]
                    reps_cont = model._pooling(hs, am).float().cpu().numpy()
                    scores = np.dot(reps_cont, cand_last_np.T)
                    inds = np.argsort(-scores, axis=1)[:, :50]
                    for i, idx in enumerate(cont_indices):
                        _record(pred_dicts, infos[idx], [cand_ids[x] for x in inds[i]])

    # ⏱️ End of timing.
    torch.cuda.synchronize()
    end_time = time.perf_counter()
    total_time = end_time - start_time

    # Guard against an empty loader to give a clear error instead of a
    # ZeroDivisionError in the latency computation.
    if total_samples == 0:
        raise RuntimeError("run_benchmark: query loader produced no samples")

    latency_ms = (total_time / total_samples) * 1000
    throughput = total_samples / total_time

    # Machine-parseable marker consumed by the driving shell script.
    print_master(f"\n[BENCHMARK_RESULT] Mode={'Ours' if cfg['ee_enabled'] else 'Baseline'} | Samples={total_samples} | TotalTime={total_time:.4f}s | Latency={latency_ms:.4f}ms | Throughput={throughput:.2f}qps")

    return pred_dicts


def _record(pred_dicts, info, cids):
    """Append one prediction record; label falls back from label_name to label."""
    pred_dicts.append({
        "prediction": cids,
        "label": info.get("label_name") or info.get("label"),
    })


def _mock_feature_extraction(cos_mid, device):
    # Placeholder only — copy the real feature-computation logic from
    # eval_test_time_with_classifier_V5.py for a meaningful benchmark.
    # Produces a fake 27-dim feature so the pipeline can run end to end.
    return torch.randn(cos_mid.size(0), 27, device=device)


# ... (main mirrors the usual flow and mainly calls run_benchmark) ...
def main():
    """Entry point: wire up config, classifier, dataset, and run the benchmark."""
    # ... (Standard HF Parser & Model Loading) ...

    # Environment-driven configuration.
    cfg = get_env_config()

    # The early-exit classifier is only loaded when EE_ENABLED=1;
    # baseline mode keeps it as None.
    classifier = None
    if cfg["ee_enabled"]:
        print_master(f"🚀 Mode: Accelerated (EE + AOP)")
        # ... (Load V5 Classifier Logic) ...
        # classifier = ...
    else:
        print_master(f"🐢 Mode: Baseline (Full Forward)")

    # ... (Load Dataset) ...

    # Run
    # NOTE(review): `model` is undefined in this skeleton — the model-loading
    # section above must be filled in before this call can execute.
    run_benchmark(model, classifier, ...)


if __name__ == '__main__':
    main()