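"""Evaluation utilities for RPC-Bench paper-QA submissions.

Scores free-form answers with an LLM judge (Conciseness / Correctness /
Completeness) and computes exact-match accuracy for Claim_Verification items.
"""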
from __future__ import annotations
import json
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Tuple

from openai import OpenAI

# Judge models and OpenAI-compatible credentials are configured via the environment.
DEFAULT_GPT_MODEL = os.environ.get("RPC_BENCH_GPT_MODEL", "gpt-5-2025-08-07")
DEFAULT_GEMINI_MODEL = os.environ.get("RPC_BENCH_GEMINI_MODEL", "gemini-2.5-pro")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "")

def _client() -> OpenAI:
    return OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL or None)


def _extract_json(text: str) -> str:
    """Strip Markdown code fences so the judge output can be parsed as JSON."""
    text = text.strip()
    if "```json" in text:
        match = re.search(r"```json(.*?)```", text, re.DOTALL)
        if match:
            return match.group(1).strip()
    if "```" in text:
        match = re.search(r"```(.*?)```", text, re.DOTALL)
        if match:
            return match.group(1).strip()
    return text


def _load_jsonl(path: str | Path) -> List[Dict]:
    rows: List[Dict] = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
    return rows


def _judge(messages: List[Dict], model: str) -> str:
    client = _client()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=False,
    )
    return response.choices[0].message.content or ""


def _score_prompt(title: str, abstract: str, question: str, reference_answer: str, predicted_answer: str) -> List[Dict]:
    system_prompt = (
        "You are a strict paper-answer judge. Return JSON only. "
        "Score the prediction on three dimensions: Conciseness, Correctness, Completeness. "
        "Each dimension must contain a numeric rating in [1, 5] and a short reason."
    )
    user_prompt = (
        f"Title: {title}\n"
        f"Abstract: {abstract}\n"
        f"Question: {question}\n"
        f"Reference answer: {reference_answer}\n"
        f"Predicted answer: {predicted_answer}\n"
        "Return JSON only with keys Conciseness, Correctness, Completeness."
    )
    return [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]


def _normalize_rating_block(content: Dict) -> Dict:
    """Coerce judge output into {dimension: {"rating": float, "reason": str}}.

    The judge may return either a nested object or a bare number per dimension.
    """
    result = {}
    for key in ("Conciseness", "Correctness", "Completeness"):
        value = content.get(key, {})
        if isinstance(value, dict):
            rating = float(value.get("rating", 0.0))
            reason = value.get("reason", "")
        else:
            rating = float(value)
            reason = ""
        result[key] = {"rating": rating, "reason": reason}
    return result


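# Expected JSONL record shapes (inferred from the field accesses below, not an
# official schema): gold papers carry "id", "title", "abstract" and a
# "qa_pairs" list of {"question", "answer", "category"}; predictions carry
# "id", "part_idx", "gen_answer" and "category", one line per QA pair, in the
# same order as the gold file.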
def paper_qa_score(
    file_path: str | Path,
    eval_path: str | Path,
    out_path: str | Path,
    judge_model: str = "gpt",
) -> None:
    """Judge every non-verification QA pair and append the results to out_path as JSONL."""
    gold_items = _load_jsonl(file_path)
    pred_items = _load_jsonl(eval_path)
    paper_dict: Dict[str, Dict[str, str]] = {}
    qa_items: List[Dict] = []
    # Flatten gold papers into one row per QA pair, keeping paper metadata for the prompt.
    for paper in gold_items:
        paper_dict[paper["id"]] = {
            "title": paper.get("title", ""),
            "abstract": paper.get("abstract", ""),
        }
        for idx, qa in enumerate(paper.get("qa_pairs", []), start=1):
            qa_items.append(
                {
                    "id": paper["id"],
                    "part_idx": idx,
                    "question": qa["question"],
                    "answer": qa["answer"],
                    "category": qa["category"],
                }
            )
    os.makedirs(Path(out_path).parent, exist_ok=True)
    if len(qa_items) != len(pred_items):
        raise ValueError(f"Prediction count mismatch: expected {len(qa_items)}, got {len(pred_items)}")
    model_name = DEFAULT_GPT_MODEL if judge_model == "gpt" else DEFAULT_GEMINI_MODEL
    for gold, pred in zip(qa_items, pred_items):
        if gold["id"] != pred["id"] or gold["part_idx"] != pred["part_idx"]:
            raise ValueError(f"Submission order mismatch at {gold['id']} / {gold['part_idx']}")
        if gold["category"] == "Claim_Verification":
            # Claim verification is scored separately by exact match; no judge call needed.
            score_block = []
        else:
            messages = _score_prompt(
                paper_dict[gold["id"]]["title"],
                paper_dict[gold["id"]]["abstract"],
                gold["question"],
                gold["answer"],
                pred["gen_answer"],
            )
            raw = _judge(messages, model_name)
            score_block = _normalize_rating_block(json.loads(_extract_json(raw)))
            time.sleep(float(os.environ.get("RPC_BENCH_JUDGE_SLEEP", "0")))
        with open(out_path, "a", encoding="utf-8") as fw:
            fw.write(
                json.dumps(
                    {
                        "id": gold["id"],
                        "part_idx": gold["part_idx"],
                        "question": gold["question"],
                        "reference_answer": gold["answer"],
                        "predicted_answer": pred["gen_answer"],
                        "category": gold["category"],
                        "score": score_block,
                    },
                    ensure_ascii=False,
                )
                + "\n"
            )


def get_llm_score(eval_path: str | Path) -> Tuple[Dict[str, float], Dict[str, Tuple[float, float, float]]]:
    """Average judge ratings overall and per category, skipping Claim_Verification rows."""
    category_dict: Dict[str, Dict[str, float]] = {}
    sum_c1 = sum_c2 = sum_c3 = 0.0
    count = 0
    with open(eval_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            item = json.loads(line)
            category = item["category"]
            if category == "Claim_Verification":
                continue
            if category not in category_dict:
                category_dict[category] = {"Conciseness": 0.0, "Correctness": 0.0, "Completeness": 0.0, "count": 0.0}
            content = item.get("score", {})
            c1 = float(content.get("Conciseness", {}).get("rating", 0.0))
            c2 = float(content.get("Correctness", {}).get("rating", 0.0))
            c3 = float(content.get("Completeness", {}).get("rating", 0.0))
            category_dict[category]["Conciseness"] += c1
            category_dict[category]["Correctness"] += c2
            category_dict[category]["Completeness"] += c3
            category_dict[category]["count"] += 1
            sum_c1 += c1
            sum_c2 += c2
            sum_c3 += c3
            count += 1
    result: Dict[str, Tuple[float, float, float]] = {}
    for category, values in category_dict.items():
        denom = max(values["count"], 1.0)
        result[category] = (
            values["Conciseness"] / denom,
            values["Correctness"] / denom,
            values["Completeness"] / denom,
        )
    total_scores = {
        "Conciseness": sum_c1 / max(count, 1),
        "Correctness": sum_c2 / max(count, 1),
        "Completeness": sum_c3 / max(count, 1),
    }
    return total_scores, result


def calculate_acc(pred: List[str], gold: List[str]) -> float:
    if not pred:
        return 0.0
    return sum(1 for p, g in zip(pred, gold) if p == g) / len(pred)


def get_verification_score(gold_path: str | Path, eval_path: str | Path) -> float:
    """Exact-match accuracy on Claim_Verification items ("True"/"False" answers)."""
    gold_answers: List[str] = []
    pred_answers: List[str] = []
    for paper in _load_jsonl(gold_path):
        for qa in paper.get("qa_pairs", []):
            if qa.get("category") == "Claim_Verification":
                gold_answers.append(str(qa.get("answer", "")).strip())
    for item in _load_jsonl(eval_path):
        if item.get("category") == "Claim_Verification":
            pred_answers.append(str(item.get("gen_answer", "")).strip())
    if len(gold_answers) != len(pred_answers):
        raise ValueError(
            f"Claim verification count mismatch: expected {len(gold_answers)}, got {len(pred_answers)}"
        )
    normalized_pred: List[str] = []
    for gold, pred in zip(gold_answers, pred_answers):
        if pred not in {"True", "False"}:
            # Anything other than a valid label is counted as wrong.
            normalized_pred.append("False" if gold == "True" else "True")
        else:
            normalized_pred.append(pred)
    return calculate_acc(normalized_pred, gold_answers[: len(normalized_pred)])


def evaluate_submission(gold_path: str | Path, pred_path: str | Path, out_dir: str | Path, judge_model: str = "gpt") -> Dict[str, float]:
    """Run the full pipeline: LLM judging, score aggregation, and claim-verification accuracy."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    judged_path = out_dir / f"{Path(pred_path).stem}_{judge_model}_judge.jsonl"
    # Remove any stale judge file, since paper_qa_score appends to it.
    if judged_path.exists():
        judged_path.unlink()
    paper_qa_score(gold_path, pred_path, judged_path, judge_model=judge_model)
    llm_total, _ = get_llm_score(judged_path)
    claim_acc = get_verification_score(gold_path, pred_path)
    # Harmonic mean of Correctness and Completeness; the epsilon avoids division by zero.
    f1_like = (
        2 * llm_total["Correctness"] * llm_total["Completeness"]
        / (llm_total["Correctness"] + llm_total["Completeness"] + 1e-8)
    )
    info = llm_total["Conciseness"] * f1_like * 4
    return {
        "Conciseness": round(llm_total["Conciseness"] * 20, 4),
        "Correctness": round(llm_total["Correctness"] * 20, 4),
        "Completeness": round(llm_total["Completeness"] * 20, 4),
        "F1-like": round(f1_like * 20, 4),
        "Info": round(info * 20, 4),
        "Claim Accuracy": round(claim_acc * 100, 4),
    }
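

# Example invocation (a minimal sketch; the paths below are hypothetical
# placeholders, and OPENAI_API_KEY must be set for the judge calls to succeed):
if __name__ == "__main__":
    scores = evaluate_submission(
        gold_path="data/gold.jsonl",          # hypothetical gold file
        pred_path="data/predictions.jsonl",   # hypothetical submission file
        out_dir="results",
        judge_model="gpt",
    )
    print(json.dumps(scores, indent=2, ensure_ascii=False))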