"""LLM-as-judge scoring utilities for paper-QA benchmark submissions."""

from __future__ import annotations
|
|
| import json |
| import os |
| import re |
| import time |
| from pathlib import Path |
from typing import Dict, List, Tuple
|
|
| from openai import OpenAI |
|
|
|
|
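# Judge model names and the OpenAI-compatible endpoint are configured through
# environment variables so they can be swapped without code changes.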
| DEFAULT_GPT_MODEL = os.environ.get("RPC_BENCH_GPT_MODEL", "gpt-5-2025-08-07") |
| DEFAULT_GEMINI_MODEL = os.environ.get("RPC_BENCH_GEMINI_MODEL", "gemini-2.5-pro") |
| OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") |
| OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "") |
|
|
|
|
| def _client() -> OpenAI: |
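    """Create an OpenAI client from the environment-configured API key and base URL."""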
| return OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL or None) |
|
|
|
|
| def _extract_json(text: str) -> str: |
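    """Extract a JSON payload from a model reply, stripping Markdown code fences if present."""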
| text = text.strip() |
| if "```json" in text: |
| match = re.search(r"```json(.*?)```", text, re.DOTALL) |
| if match: |
| return match.group(1).strip() |
| if "```" in text: |
| match = re.search(r"```(.*?)```", text, re.DOTALL) |
| if match: |
| return match.group(1).strip() |
| return text |
|
|
|
|
| def _load_jsonl(path: str | Path) -> List[Dict]: |
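    """Read a JSONL file into a list of dicts, skipping blank lines."""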
| rows: List[Dict] = [] |
| with open(path, "r", encoding="utf-8") as f: |
| for line in f: |
| line = line.strip() |
| if not line: |
| continue |
| rows.append(json.loads(line)) |
| return rows |
|
|
|
|
| def _judge(messages: List[Dict], model: str) -> str: |
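    """Send the judge messages to the chat completions endpoint and return the reply text."""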
| client = _client() |
| response = client.chat.completions.create( |
| model=model, |
| messages=messages, |
| stream=False, |
| ) |
| return response.choices[0].message.content or "" |
|
|
|
|
def _score_prompt(
    title: str,
    abstract: str,
    question: str,
    reference_answer: str,
    predicted_answer: str,
) -> List[Dict]:
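    """Build system/user messages asking the judge to rate a prediction on
    Conciseness, Correctness, and Completeness (1-5 each, JSON only).
    """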
| system_prompt = ( |
| "You are a strict paper-answer judge. Return JSON only. " |
| "Score the prediction on three dimensions: Conciseness, Correctness, Completeness. " |
| "Each dimension must contain a numeric rating in [1, 5] and a short reason." |
| ) |
| user_prompt = ( |
| f"Title: {title}\n" |
| f"Abstract: {abstract}\n" |
| f"Question: {question}\n" |
| f"Reference answer: {reference_answer}\n" |
| f"Predicted answer: {predicted_answer}\n" |
| "Return JSON only with keys Conciseness, Correctness, Completeness." |
| ) |
| return [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] |
|
|
|
|
| def _normalize_rating_block(content: Dict) -> Dict: |
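    """Coerce the judge's JSON into {dimension: {"rating": float, "reason": str}} form."""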
| result = {} |
| for key in ("Conciseness", "Correctness", "Completeness"): |
| value = content.get(key, {}) |
| if isinstance(value, dict): |
| rating = float(value.get("rating", 0.0)) |
| reason = value.get("reason", "") |
| else: |
| rating = float(value) |
| reason = "" |
| result[key] = {"rating": rating, "reason": reason} |
| return result |
|
|
|
|
| def paper_qa_score( |
| file_path: str | Path, |
| eval_path: str | Path, |
| out_path: str | Path, |
| judge_model: str = "gpt", |
| ) -> None: |
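    """Judge each QA pair with an LLM and append one JSON line per item to out_path.

    Gold papers and predictions must align one-to-one and in the same order;
    count or (id, part_idx) mismatches raise ValueError. Claim_Verification
    items are written with an empty score and scored separately by accuracy.
    """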
| gold_items = _load_jsonl(file_path) |
| pred_items = _load_jsonl(eval_path) |
|
|
| paper_dict: Dict[str, Dict[str, str]] = {} |
| qa_items: List[Dict] = [] |
| for paper in gold_items: |
| paper_dict[paper["id"]] = { |
| "title": paper.get("title", ""), |
| "abstract": paper.get("abstract", ""), |
| } |
| for idx, qa in enumerate(paper.get("qa_pairs", []), start=1): |
| qa_items.append( |
| { |
| "id": paper["id"], |
| "part_idx": idx, |
| "question": qa["question"], |
| "answer": qa["answer"], |
| "category": qa["category"], |
| } |
| ) |
|
|
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
| if len(qa_items) != len(pred_items): |
| raise ValueError(f"Prediction count mismatch: expected {len(qa_items)}, got {len(pred_items)}") |
|
|
| model_name = DEFAULT_GPT_MODEL if judge_model == "gpt" else DEFAULT_GEMINI_MODEL |
| for gold, pred in zip(qa_items, pred_items): |
| if gold["id"] != pred["id"] or gold["part_idx"] != pred["part_idx"]: |
| raise ValueError(f"Submission order mismatch at {gold['id']} / {gold['part_idx']}") |
|
|
        if gold["category"] == "Claim_Verification":
            # Claim verification is scored separately by exact-match accuracy,
            # so no LLM judgement is requested here.
            score_block = {}
| else: |
| messages = _score_prompt( |
| paper_dict[gold["id"]]["title"], |
| paper_dict[gold["id"]]["abstract"], |
| gold["question"], |
| gold["answer"], |
| pred["gen_answer"], |
| ) |
| raw = _judge(messages, model_name) |
| score_block = _normalize_rating_block(json.loads(_extract_json(raw))) |
| time.sleep(float(os.environ.get("RPC_BENCH_JUDGE_SLEEP", "0"))) |
|
|
| with open(out_path, "a", encoding="utf-8") as fw: |
| fw.write( |
| json.dumps( |
| { |
| "id": gold["id"], |
| "part_idx": gold["part_idx"], |
| "question": gold["question"], |
| "reference_answer": gold["answer"], |
| "predicted_answer": pred["gen_answer"], |
| "category": gold["category"], |
| "score": score_block, |
| }, |
| ensure_ascii=False, |
| ) |
| + "\n" |
| ) |
|
|
|
|
| def get_llm_score(eval_path: str | Path) -> Tuple[Dict[str, float], Dict[str, Tuple[float, float, float]]]: |
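    """Aggregate judged ratings from a JSONL file.

    Returns overall averages and per-category (Conciseness, Correctness,
    Completeness) averages, excluding Claim_Verification items.
    """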
| category_dict: Dict[str, Dict[str, float]] = {} |
| sum_c1 = sum_c2 = sum_c3 = 0.0 |
| count = 0 |
|
|
| with open(eval_path, "r", encoding="utf-8") as f: |
| for line in f: |
| line = line.strip() |
| if not line: |
| continue |
| item = json.loads(line) |
| category = item["category"] |
| if category == "Claim_Verification": |
| continue |
|
|
| if category not in category_dict: |
| category_dict[category] = {"Conciseness": 0.0, "Correctness": 0.0, "Completeness": 0.0, "count": 0.0} |
|
|
| content = item.get("score", {}) |
| c1 = float(content.get("Conciseness", {}).get("rating", 0.0)) |
| c2 = float(content.get("Correctness", {}).get("rating", 0.0)) |
| c3 = float(content.get("Completeness", {}).get("rating", 0.0)) |
|
|
| category_dict[category]["Conciseness"] += c1 |
| category_dict[category]["Correctness"] += c2 |
| category_dict[category]["Completeness"] += c3 |
| category_dict[category]["count"] += 1 |
|
|
| sum_c1 += c1 |
| sum_c2 += c2 |
| sum_c3 += c3 |
| count += 1 |
|
|
| result: Dict[str, Tuple[float, float, float]] = {} |
| for category, values in category_dict.items(): |
| denom = max(values["count"], 1.0) |
| result[category] = ( |
| values["Conciseness"] / denom, |
| values["Correctness"] / denom, |
| values["Completeness"] / denom, |
| ) |
|
|
| total_scores = { |
| "Conciseness": sum_c1 / max(count, 1), |
| "Correctness": sum_c2 / max(count, 1), |
| "Completeness": sum_c3 / max(count, 1), |
| } |
| return total_scores, result |
|
|
|
|
| def calculate_acc(pred: List[str], gold: List[str]) -> float: |
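    """Exact-match accuracy of pred against gold; returns 0.0 for an empty prediction list."""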
| if not pred: |
| return 0.0 |
| return sum(1 for p, g in zip(pred, gold) if p == g) / len(pred) |
|
|
|
|
| def get_verification_score(gold_path: str | Path, eval_path: str | Path) -> float: |
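    """Compute exact-match accuracy on Claim_Verification items, comparing gold
    answers and predictions in file order.
    """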
| gold_answers: List[str] = [] |
| pred_answers: List[str] = [] |
|
|
| for paper in _load_jsonl(gold_path): |
| for qa in paper.get("qa_pairs", []): |
| if qa.get("category") == "Claim_Verification": |
| gold_answers.append(str(qa.get("answer", "")).strip()) |
|
|
| for item in _load_jsonl(eval_path): |
| if item.get("category") == "Claim_Verification": |
| pred_answers.append(str(item.get("gen_answer", "")).strip()) |
|
|
| if len(gold_answers) != len(pred_answers): |
| raise ValueError( |
| f"Claim verification count mismatch: expected {len(gold_answers)}, got {len(pred_answers)}" |
| ) |
|
|
    # Predictions outside {"True", "False"} count as wrong: substitute the
    # opposite of the gold label before computing accuracy.
    normalized_pred: List[str] = []
    for gold, pred in zip(gold_answers, pred_answers):
        if pred not in {"True", "False"}:
            normalized_pred.append("False" if gold == "True" else "True")
        else:
            normalized_pred.append(pred)


    return calculate_acc(normalized_pred, gold_answers)
|
|
|
|
def evaluate_submission(
    gold_path: str | Path,
    pred_path: str | Path,
    out_dir: str | Path,
    judge_model: str = "gpt",
) -> Dict[str, float]:
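    """Run LLM judging plus claim-verification accuracy and return summary metrics.

    Ratings in [1, 5] are rescaled by a factor of 20; F1-like is the harmonic
    mean of Correctness and Completeness, and Info combines it with Conciseness.
    """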
| out_dir = Path(out_dir) |
| out_dir.mkdir(parents=True, exist_ok=True) |
|
|
| judged_path = out_dir / f"{Path(pred_path).stem}_{judge_model}_judge.jsonl" |
| if judged_path.exists(): |
| judged_path.unlink() |
|
|
| paper_qa_score(gold_path, pred_path, judged_path, judge_model=judge_model) |
| llm_total, _ = get_llm_score(judged_path) |
| claim_acc = get_verification_score(gold_path, pred_path) |
|
|
| f1_like = ( |
| 2 * llm_total["Correctness"] * llm_total["Completeness"] |
| / (llm_total["Correctness"] + llm_total["Completeness"] + 1e-8) |
| ) |
| info = llm_total["Conciseness"] * f1_like * 4 |
| return { |
| "Conciseness": round(llm_total["Conciseness"] * 20, 4), |
| "Correctness": round(llm_total["Correctness"] * 20, 4), |
| "Completeness": round(llm_total["Completeness"] * 20, 4), |
| "F1-like": round(f1_like * 20, 4), |
| "Info": round(info * 20, 4), |
| "Claim Accuracy": round(claim_acc * 100, 4), |
| } |
|
|
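# Minimal usage sketch. The gold/prediction paths below are hypothetical
# placeholders; OPENAI_API_KEY (and optionally OPENAI_BASE_URL) must be set
# for the judge calls to succeed.
if __name__ == "__main__":
    summary = evaluate_submission(
        gold_path="data/gold.jsonl",
        pred_path="data/predictions.jsonl",
        out_dir="eval_out",
        judge_model="gpt",
    )
    print(json.dumps(summary, indent=2, ensure_ascii=False))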