# code_SAS_VLM2Vec / eval_benchmark_V5.py
# Uploaded by MgGladys via the upload-large-folder tool (commit ac8b25b, verified).
import time
import json
import os
import pickle
import numpy as np
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import HfArgumentParser, AutoConfig
from src.arguments import ModelArguments, DataArguments, TrainingArguments
from src.data.collator.eval_collator import MultimodalEvalDataCollator
from src.data.eval_dataset.base_eval_dataset import AutoEvalPairDataset, generate_cand_dataset
from src.eval_utils.metrics import RankingMetrics
from src.model.model_cut_layer_AOP_add_text_cut import MMEBModel
from src.model.processor import get_backbone_name, load_processor
from src.utils import batch_to_device, print_master
from src.classifier_utils_V5 import EarlyExitClassifier
# ===========================
# Simplified configuration loading (from environment variables)
# ===========================
def get_env_config():
    """Read benchmark configuration from environment variables.

    Returns a dict with:
        ee_enabled (bool): whether early-exit (EE) is active.
        ee_threshold (float): exit threshold (read even when EE is off; ignored then).
        ee_layer (int): transformer layer index at which to stop for EE.
        classifier_path (str): path to the early-exit classifier weights.
        aop_enabled (bool): whether AOP is active (other AOP knobs omitted here).
    """
    def _flag(name):
        # Truthy if the variable is one of the accepted "on" spellings.
        return os.environ.get(name, "0").strip().lower() in {"1", "true", "yes", "on"}

    return {
        "ee_enabled": _flag("EE_ENABLED"),
        "ee_threshold": float(os.environ.get("EE_THRESHOLD", "0.5")),
        "ee_layer": int(os.environ.get("EE_LAYER", "12")),
        "classifier_path": os.environ.get("EE_CLASSIFIER_PATH", ""),
        "aop_enabled": _flag("AOP_ENABLED"),
    }
# ===========================
# Core inference routine (covers both the Baseline and the Ours path)
# ===========================
def run_benchmark(
    model, classifier, processor, model_args, data_args, training_args,
    qry_dataset, cand_mid_dict, cand_last_dict, cfg, dataset_name, out_dir
):
    """Time query-side encoding + retrieval for one dataset.

    Two mutually exclusive paths, selected by ``cfg["ee_enabled"]``:
      * Baseline: full forward through every layer; retrieve against the
        last-layer candidate embeddings.
      * Ours: run to a mid layer, score each query with the early-exit
        classifier, retrieve "exit" queries from mid-layer candidate
        embeddings, and resume the forward pass to the last layer for the rest.

    Args:
        model: MMEBModel whose ``.encoder`` supports ``stop_at_layer`` /
            ``resume_state`` / ``compute_lm_head`` (see model_cut_layer_AOP_add_text_cut).
        classifier: EarlyExitClassifier (Ours path) or None (Baseline path).
        processor, model_args, data_args, training_args: HF-style config objects.
        qry_dataset: query-side eval dataset.
        cand_mid_dict, cand_last_dict: {cand_id: embedding} at mid / last layer.
        cfg: dict produced by ``get_env_config()``.
        dataset_name: label used in the progress bar and the result line.
        out_dir: unused in this function — presumably kept for call-site
            compatibility with sibling eval scripts; verify before removing.

    Returns:
        list of {"prediction": [cand_id, ...], "label": ...} dicts (via ``_record``).
    """
    device = training_args.device
    local_rank = dist.get_rank() if dist.is_initialized() else 0
    is_main = (local_rank == 0)
    # 1. Build the query-side data loader.
    collator = MultimodalEvalDataCollator(processor, model_args, data_args, "qry")
    loader = DataLoader(
        qry_dataset, batch_size=training_args.per_device_eval_batch_size,
        collate_fn=collator, num_workers=training_args.dataloader_num_workers
    )
    # 2. Candidate set: convert all embeddings to FP32 numpy once for fast
    #    dot-product retrieval.
    cand_ids = list(cand_last_dict.keys())
    # Baseline retrieves against last-layer embeddings; Ours also needs mid-layer ones.
    cand_last_np = np.stack([cand_last_dict[c] for c in cand_ids]).astype(np.float32)
    if cfg["ee_enabled"]:
        cand_mid_np = np.stack([cand_mid_dict[c] for c in cand_ids]).astype(np.float32)
        # On-device BF16 copy, used as classifier input (cosine features).
        cand_mid_t = torch.from_numpy(cand_mid_np).to(device=device, dtype=torch.bfloat16)
    model.eval()
    if classifier: classifier.eval(); classifier.to(device)
    # 3. GPU warmup — critical so first-run allocations/compilation don't skew timing.
    #    NOTE(review): ``next(iter(loader))`` builds a fresh iterator each pass, so
    #    the SAME first batch is encoded 5 times — fine for warmup purposes.
    #    NOTE(review): the timed loop below unpacks ``inputs, infos`` pairs from this
    #    loader, yet here the whole yielded item is splatted via ``**dummy_inputs`` —
    #    confirm batch_to_device / the collator make this a mapping, or unpack first.
    print_master(f"🔥 Warming up GPU...")
    for _ in range(5):
        dummy_inputs = next(iter(loader))
        dummy_inputs = batch_to_device(dummy_inputs, device)
        with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
            _ = model.encoder(**dummy_inputs, stop_at_layer=None)
    torch.cuda.synchronize()
    # =========================================================
    # ⏱️ Timed region starts here (benchmark loop)
    # =========================================================
    total_samples = 0
    start_time = time.perf_counter()
    pred_dicts = []
    for inputs, infos in tqdm(loader, desc=f"Benchmarking {dataset_name}", disable=not is_main):
        inputs = batch_to_device(inputs, device)
        B = inputs["input_ids"].shape[0]
        total_samples += B
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        # Path A: clean baseline (full forward)
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        if not cfg["ee_enabled"]:
            with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
                # Run straight to the last layer — no hooks or mid-layer
                # extraction overhead.
                out = model.encoder(
                    **inputs,
                    return_dict=True,
                    output_hidden_states=False,  # hidden states not needed
                    stop_at_layer=None,
                    compute_lm_head=False
                )
                # Pooling
                hs = out.last_hidden_state
                if hs is None: hs = out.hidden_states[-1]
                am = getattr(out, "attention_mask", None)
                if am is None: am = inputs.get("attention_mask", None)
                reps = model._pooling(hs, am).float().cpu().numpy()  # to FP32 on CPU
                # Retrieval (dot product)
                scores = np.dot(reps, cand_last_np.T)
                topk_inds = np.argsort(-scores, axis=1)[:, :50]  # top-50 is enough
                # Record results
                for i in range(B):
                    cids = [cand_ids[k] for k in topk_inds[i]]
                    _record(pred_dicts, infos[i], cids)
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        # Path B: our method (Mid -> Classifier -> Tail)
        # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        else:
            with torch.no_grad(), torch.autocast("cuda", dtype=torch.bfloat16):
                # 1. Run up to the mid layer only.
                out_mid = model.encoder(
                    **inputs, return_dict=True, output_hidden_states=False,
                    stop_at_layer=cfg["ee_layer"], compute_lm_head=False
                )
                hs_mid = getattr(out_mid, "last_hidden_state", None)
                if hs_mid is None: hs_mid = out_mid.hidden_states[-1]
                am_mid = getattr(out_mid, "attention_mask", None)
                if am_mid is None: am_mid = inputs.get("attention_mask", None)
                reps_mid = model._pooling(hs_mid, am_mid)  # BF16 tensor
                # 2. Classifier logic — feature computation.
                cos_mid = reps_mid @ cand_mid_t.T
                # ... (the verbose 27-dim feature computation is elided; keep the
                #      logic from the earlier version of this code) ...
                # For benchmark fidelity this step must include the full cost of
                # the real feature extraction.
                # [assumes scalar_inputs has been fully computed]
                # Mocked feature cost — substitute the real feature logic in practice.
                scalar_inputs = _mock_feature_extraction(cos_mid, device)
                # V5 classifier runs in FP32.
                logits = classifier(scalar_inputs.float(), torch.zeros(B, dtype=torch.long, device=device), qry_emb=reps_mid.float())
                probs = torch.sigmoid(logits).squeeze(1)
                # 3. Dynamic thresholding (batch-level ratio).
                # Force a 50% per-batch exit rate so the comparison is fair.
                # (A real run could use an absolute threshold; ratio keeps timing stable.)
                # NOTE(review): k = int(B * 0.5) is 0 when B == 1, which would make
                # top_vals[-1] index an empty tensor — confirm batches are >= 2.
                k = int(B * 0.5)
                top_vals, _ = torch.topk(probs, k=k)
                dyn_thresh = top_vals[-1]
                exit_mask = (probs < dyn_thresh).cpu().numpy()  # True = early exit
                # 4. Branching
                exit_indices = np.where(exit_mask)[0]
                cont_indices = np.where(~exit_mask)[0]
                # Early-exit queries: retrieve against mid-layer candidates.
                if len(exit_indices) > 0:
                    reps_exit_np = reps_mid[exit_indices].float().cpu().numpy()
                    scores = np.dot(reps_exit_np, cand_mid_np.T)
                    inds = np.argsort(-scores, axis=1)[:, :50]
                    for i, idx in enumerate(exit_indices):
                        _record(pred_dicts, infos[idx], [cand_ids[x] for x in inds[i]])
                # Continuing queries: resume the forward pass to the last layer.
                if len(cont_indices) > 0:
                    # Resume forward from the cached intermediate state.
                    interm = out_mid.intermediate_state
                    # Slice the cached tensors down to the continuing rows only.
                    # NOTE(review): the comprehension variable ``k`` shadows the
                    # threshold count ``k`` above — harmless here, but fragile.
                    subset = {k: v[cont_indices] if v is not None and isinstance(v, torch.Tensor) else v
                              for k, v in interm.items() if k in ["hidden_states", "attention_mask", "position_ids"]}
                    subset["next_layer_idx"] = int(interm["next_layer_idx"])
                    out_last = model.encoder(
                        return_dict=True, output_hidden_states=False, stop_at_layer=None,
                        resume_state=subset, compute_lm_head=False
                    )
                    # Pooling & retrieval against last-layer candidates.
                    hs = out_last.last_hidden_state
                    am = subset["attention_mask"]
                    reps_cont = model._pooling(hs, am).float().cpu().numpy()
                    scores = np.dot(reps_cont, cand_last_np.T)
                    inds = np.argsort(-scores, axis=1)[:, :50]
                    for i, idx in enumerate(cont_indices):
                        _record(pred_dicts, infos[idx], [cand_ids[x] for x in inds[i]])
    # ⏱️ Timed region ends.
    torch.cuda.synchronize()
    end_time = time.perf_counter()
    total_time = end_time - start_time
    latency_ms = (total_time / total_samples) * 1000
    throughput = total_samples / total_time
    # Tagged line parsed by the driving shell script — keep the format stable.
    print_master(f"\n[BENCHMARK_RESULT] Mode={'Ours' if cfg['ee_enabled'] else 'Baseline'} | Samples={total_samples} | TotalTime={total_time:.4f}s | Latency={latency_ms:.4f}ms | Throughput={throughput:.2f}qps")
    return pred_dicts
def _record(pred_dicts, info, cids):
pred_dicts.append({
"prediction": cids,
"label": info.get("label_name") or info.get("label"),
})
def _mock_feature_extraction(cos_mid, device):
# 这里只是占位,实际运行请把 eval_test_time_with_classifier_V5.py 中的特征计算逻辑拷过来
# 构造一个假的 27 维特征,避免代码跑不动
return torch.randn(cos_mid.size(0), 27, device=device)
# ... (main mirrors the other eval scripts; its job is just to call run_benchmark) ...
def main():
    """Skeleton entry point: load config/model/classifier/dataset, then benchmark.

    NOTE(review): this is an intentionally abbreviated skeleton — argument
    parsing, model loading, classifier loading, and dataset loading are all
    elided, so ``model`` below is undefined and the final call passes a
    literal ``...``. It will NOT run as-is; fill in the elided sections from
    the sibling eval scripts before use.
    """
    # ... (Standard HF Parser & Model Loading) ...
    # Read the EE/AOP configuration from environment variables.
    cfg = get_env_config()
    # Load the classifier only when EE_ENABLED=1.
    classifier = None
    if cfg["ee_enabled"]:
        print_master(f"🚀 Mode: Accelerated (EE + AOP)")
        # ... (Load V5 Classifier Logic) ...
        # classifier = ...
    else:
        print_master(f"🐢 Mode: Baseline (Full Forward)")
    # ... (Load Dataset) ...
    # Run
    run_benchmark(model, classifier, ...)
if __name__ == '__main__':
    main()