| |
| import os |
| import math |
| import pandas as pd |
| from tqdm import tqdm |
| import torch |
| from datasets import load_dataset |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer |
| from sentence_transformers import CrossEncoder |
# HF model id: Qwen3 8B reranker exported with a sequence-classification head.
MODEL_NAME = "deeppin/Qwen3-Reranker-8B-SequenceClassification"
# Parquet file expected to hold chosen_prompt / chosen / reject columns (validated below).
DATA_PATH = "data/valid.parquet"
# NOTE(review): BATCH_SIZE is not referenced by the per-sample scoring loop
# below — either remove it or wire it into a batched scorer.
BATCH_SIZE = 8
# Per-prompt token budget; longer inputs are left-truncated (truncation_side="left").
MAX_LENGTH = 8192
# Fall back to CPU when no CUDA device is available.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
|
|
def format_instruction(instruction, query, doc):
    """Assemble the reranker input text: tagged instruction, query and
    candidate document, one per line."""
    sections = (
        f"<Instruct>: {instruction}",
        f"<Query>: {query}",
        f"<Document>: {doc}",
    )
    return "\n".join(sections)
| import re |
| import re |
| _SYS_BLOCK = re.compile( |
| r"<\|im_start\|\>\s*system\b.*?<\|im_end\|\>", re.IGNORECASE | re.DOTALL |
| ) |
| _TURN_BLOCK = re.compile( |
| r"<\|im_start\|\>\s*(user|assistant)\b\s*(.*?)\s*<\|im_end\|\>", |
| re.IGNORECASE | re.DOTALL, |
| ) |
| _ANY_CHATML_TAG = re.compile(r"<\|[^|]+?\|>") |
|
|
| _SYS = re.compile(r"<\|im_start\|\>\s*system\b(.*?)<\|im_end\|\>", re.I|re.S) |
| _TURN = re.compile(r"<\|im_start\|\>\s*(user|assistant)\b(.*?)<\|im_end\|\>", re.I|re.S) |
| _TAG = re.compile(r"<\|[^|]+?\|>") |
|
|
| _START = re.compile(r"<\|im_start\|\>\s*(system|user|assistant)\s*", re.IGNORECASE) |
| _END = re.compile(r"<\|im_end\|\>", re.IGNORECASE) |
| _ANY = re.compile(r"<\|[^|>]+?\|>", re.IGNORECASE) |
| _THINK_BLOCK = re.compile(r"<think>.*?</think>", re.IGNORECASE | re.DOTALL) |
|
|
| def flatten_chatml(text: str, keep_think: bool = False, *, single_line: bool = False, sep: str = " ") -> str: |
| if not isinstance(text, str): |
| return "" |
|
|
| t = text.replace("\r\n", "\n") |
| if not keep_think: |
| t = _THINK_BLOCK.sub("", t) |
|
|
| t = _START.sub("", t) |
| t = _END.sub("\n", t) |
| t = _ANY.sub("", t) |
|
|
| |
| t = re.sub(r"[ \t]*\n[ \t]*", "\n", t) |
| t = re.sub(r"\n{3,}", "\n\n", t) |
| t = t.strip() |
|
|
| if single_line: |
| |
| t = t.replace("\r", "\n") |
| t = re.sub(r"[\n\u2028\u2029]+", sep, t) |
| |
| t = re.sub(r"[ \t\u00A0]{2,}", " ", t) |
| t = re.sub(r"\s{2,}", " ", t) |
| t = t.strip() |
| return t |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
# Load the reranker tokenizer. Left padding + left truncation keep the most
# recent tokens of long prompts, which matters because the document/reply
# sits at the end of the formatted input.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    padding_side="left",
    use_fast=False,
    trust_remote_code=True,
)
tokenizer.truncation_side = "left"

# Guarantee a pad token so padded batches of two prompts can be built:
# reuse EOS when available, otherwise register "<|endoftext|>" as pad.
if tokenizer.pad_token_id is None:
    if tokenizer.eos_token_id is not None:
        tokenizer.pad_token = tokenizer.eos_token
    else:
        tokenizer.add_special_tokens({"pad_token": "<|endoftext|>"})
|
|
| |
| |
| |
| |
| |
|
|
| |
# Load the reranker as a sequence classifier and move it to DEVICE.
# Fix: the model was hard-coded to .to("cuda"), which crashes on CPU-only
# hosts even though DEVICE (and the input tensors below) fall back to "cpu".
# NOTE(review): float16 + flash_attention_2 themselves assume a CUDA GPU —
# confirm the deployment target before relying on the CPU fallback.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    attn_implementation="flash_attention_2",
    trust_remote_code=True,
).to(DEVICE).eval()
# Keep the model config in sync with the tokenizer's pad token so padded
# positions are handled correctly by the classification head.
model.config.pad_token_id = tokenizer.pad_token_id
# Instruction prepended to every (query, candidate reply) pair scored below.
TASK = "Given a roleplay prompt and recent context, score candidate replies higher when they stay in character, continue the scene coherently, and feel vivid and engaging."
|
|
| |
# Read the validation pairs and fail fast if any required column is absent.
df = pd.read_parquet(DATA_PATH)
need_cols = ["chosen_prompt", "chosen", "reject"]
available = set(df.columns)
for col in need_cols:
    if col not in available:
        raise ValueError(f"缺少必要列:{col}")
|
|
def norm_text(x):
    """Coerce a cell value to a stripped string; None and NaN become ""."""
    if x is None:
        return ""
    if isinstance(x, float) and math.isnan(x):
        return ""
    return str(x).strip()
|
|
# Keep only the needed columns and flatten every cell to a single line.
# NOTE(review): sep="" glues adjacent lines together with no separator —
# confirm this is intentional (sep=" " would keep a space between turns).
df = df[need_cols].copy()

def _clean_cell(value):
    # Normalize to str first, then strip ChatML markup down to one line.
    return flatten_chatml(norm_text(value), single_line=True, sep="")

for col in need_cols:
    df[col] = df[col].map(_clean_cell)

# Drop rows where any field emptied out after cleaning.
non_empty = df[need_cols].apply(lambda column: column.str.len() > 0).all(axis=1)
df = df[non_empty].reset_index(drop=True)
total = len(df)
if total == 0:
    raise ValueError("过滤后无有效样本。请检查数据内容。")
print(f"[Info] 有效样本数: {total}")
|
|
| |
# ---- Pairwise evaluation: the chosen reply should outscore the reject ----
correct = 0  # pairs where the chosen reply won
seen = 0     # pairs scored so far

for idx, row in tqdm(df.iterrows(), total=len(df), desc="Scoring (per-sample)"):
    # One formatted prompt per candidate reply, sharing the same context.
    prompts = [
        format_instruction(TASK, row["chosen_prompt"], row["chosen"]),
        format_instruction(TASK, row["chosen_prompt"], row["reject"]),
    ]

    # Tokenize both candidates as one padded batch; left truncation keeps
    # the most recent context when a prompt exceeds MAX_LENGTH.
    batch = tokenizer(prompts, padding=True, truncation=True, max_length=MAX_LENGTH, return_tensors="pt")
    batch = {name: tensor.to(DEVICE) for name, tensor in batch.items()}

    with torch.no_grad():
        # NOTE(review): squeeze(-1) assumes a single-logit head
        # (num_labels == 1) — confirm against the model config.
        scores = model(**batch).logits.squeeze(-1)

    score_chosen = float(scores[0])
    score_reject = float(scores[1])
    is_correct = score_chosen > score_reject

    correct += int(is_correct)
    seen += 1
    pair_scores = [score_chosen, score_reject]
    print(f"[{idx}] logits={pair_scores} | first>second={is_correct} | running_acc={correct/seen:.2%} ({correct}/{seen})")

print(f"\n[Result] Total={seen} | Correct={correct} | Accuracy={correct/seen:.2%}")