"""Scoring utilities for paper-QA benchmark submissions.

An OpenAI-compatible LLM judge rates each predicted answer on Conciseness,
Correctness, and Completeness; Claim_Verification items are instead scored
by exact-match accuracy against the gold labels.
"""

from __future__ import annotations

import json
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Tuple

from openai import OpenAI

DEFAULT_GPT_MODEL = os.environ.get("RPC_BENCH_GPT_MODEL", "gpt-5-2025-08-07")
DEFAULT_GEMINI_MODEL = os.environ.get("RPC_BENCH_GEMINI_MODEL", "gemini-2.5-pro")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "")


def _client() -> OpenAI:
    return OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL or None)


def _extract_json(text: str) -> str:
    """Strip a Markdown code fence from the judge's reply, if present."""
    text = text.strip()
    if "```json" in text:
        match = re.search(r"```json(.*?)```", text, re.DOTALL)
        if match:
            return match.group(1).strip()
    if "```" in text:
        match = re.search(r"```(.*?)```", text, re.DOTALL)
        if match:
            return match.group(1).strip()
    return text


def _load_jsonl(path: str | Path) -> List[Dict]:
    rows: List[Dict] = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
    return rows


def _judge(messages: List[Dict], model: str) -> str:
    client = _client()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=False,
    )
    return response.choices[0].message.content or ""


def _score_prompt(
    title: str,
    abstract: str,
    question: str,
    reference_answer: str,
    predicted_answer: str,
) -> List[Dict]:
    system_prompt = (
        "You are a strict paper-answer judge. Return JSON only. "
        "Score the prediction on three dimensions: Conciseness, Correctness, Completeness. "
        "Each dimension must contain a numeric rating in [1, 5] and a short reason."
    )
    user_prompt = (
        f"Title: {title}\n"
        f"Abstract: {abstract}\n"
        f"Question: {question}\n"
        f"Reference answer: {reference_answer}\n"
        f"Predicted answer: {predicted_answer}\n"
        "Return JSON only with keys Conciseness, Correctness, Completeness."
    )
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]


def _normalize_rating_block(content: Dict) -> Dict:
    """Coerce the judge's JSON into {dimension: {"rating": float, "reason": str}}."""
    result = {}
    for key in ("Conciseness", "Correctness", "Completeness"):
        value = content.get(key, {})
        if isinstance(value, dict):
            rating = float(value.get("rating", 0.0))
            reason = value.get("reason", "")
        else:
            # The judge may return a bare number instead of an object.
            rating = float(value)
            reason = ""
        result[key] = {"rating": rating, "reason": reason}
    return result


def paper_qa_score(
    file_path: str | Path,
    eval_path: str | Path,
    out_path: str | Path,
    judge_model: str = "gpt",
) -> None:
    """Judge every predicted answer and append one scored record per QA pair to out_path."""
    gold_items = _load_jsonl(file_path)
    pred_items = _load_jsonl(eval_path)

    paper_dict: Dict[str, Dict[str, str]] = {}
    qa_items: List[Dict] = []
    for paper in gold_items:
        paper_dict[paper["id"]] = {
            "title": paper.get("title", ""),
            "abstract": paper.get("abstract", ""),
        }
        for idx, qa in enumerate(paper.get("qa_pairs", []), start=1):
            qa_items.append(
                {
                    "id": paper["id"],
                    "part_idx": idx,
                    "question": qa["question"],
                    "answer": qa["answer"],
                    "category": qa["category"],
                }
            )

    os.makedirs(Path(out_path).parent, exist_ok=True)
    if len(qa_items) != len(pred_items):
        raise ValueError(
            f"Prediction count mismatch: expected {len(qa_items)}, got {len(pred_items)}"
        )

    model_name = DEFAULT_GPT_MODEL if judge_model == "gpt" else DEFAULT_GEMINI_MODEL
    for gold, pred in zip(qa_items, pred_items):
        if gold["id"] != pred["id"] or gold["part_idx"] != pred["part_idx"]:
            raise ValueError(
                f"Submission order mismatch at {gold['id']} / {gold['part_idx']}"
            )
        if gold["category"] == "Claim_Verification":
            # Claim verification is scored by exact match, not by the LLM judge.
            score_block = []
        else:
            messages = _score_prompt(
                paper_dict[gold["id"]]["title"],
                paper_dict[gold["id"]]["abstract"],
                gold["question"],
                gold["answer"],
                pred["gen_answer"],
            )
            raw = _judge(messages, model_name)
            score_block = _normalize_rating_block(json.loads(_extract_json(raw)))
            # Optional throttle between judge calls, for rate-limited endpoints.
            time.sleep(float(os.environ.get("RPC_BENCH_JUDGE_SLEEP", "0")))

        with open(out_path, "a", encoding="utf-8") as fw:
            fw.write(
                json.dumps(
                    {
                        "id": gold["id"],
                        "part_idx": gold["part_idx"],
                        "question": gold["question"],
                        "reference_answer": gold["answer"],
                        "predicted_answer": pred["gen_answer"],
                        "category": gold["category"],
                        "score": score_block,
                    },
                    ensure_ascii=False,
                )
                + "\n"
            )


def get_llm_score(
    eval_path: str | Path,
) -> Tuple[Dict[str, float], Dict[str, Tuple[float, float, float]]]:
    """Average the judge ratings, overall and per category (Claim_Verification excluded)."""
    category_dict: Dict[str, Dict[str, float]] = {}
    sum_c1 = sum_c2 = sum_c3 = 0.0
    count = 0
    with open(eval_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            item = json.loads(line)
            category = item["category"]
            if category == "Claim_Verification":
                continue
            if category not in category_dict:
                category_dict[category] = {
                    "Conciseness": 0.0,
                    "Correctness": 0.0,
                    "Completeness": 0.0,
                    "count": 0.0,
                }
            content = item.get("score", {})
            c1 = float(content.get("Conciseness", {}).get("rating", 0.0))
            c2 = float(content.get("Correctness", {}).get("rating", 0.0))
            c3 = float(content.get("Completeness", {}).get("rating", 0.0))
            category_dict[category]["Conciseness"] += c1
            category_dict[category]["Correctness"] += c2
            category_dict[category]["Completeness"] += c3
            category_dict[category]["count"] += 1
            sum_c1 += c1
            sum_c2 += c2
            sum_c3 += c3
            count += 1

    result: Dict[str, Tuple[float, float, float]] = {}
    for category, values in category_dict.items():
        denom = max(values["count"], 1.0)
        result[category] = (
            values["Conciseness"] / denom,
            values["Correctness"] / denom,
            values["Completeness"] / denom,
        )
    total_scores = {
        "Conciseness": sum_c1 / max(count, 1),
        "Correctness": sum_c2 / max(count, 1),
        "Completeness": sum_c3 / max(count, 1),
    }
    return total_scores, result


def calculate_acc(pred: List[str], gold: List[str]) -> float:
    """Fraction of positions where pred matches gold; assumes equal-length inputs."""
    if not pred:
        return 0.0
    return sum(1 for p, g in zip(pred, gold) if p == g) / len(pred)


def get_verification_score(gold_path: str | Path, eval_path: str | Path) -> float:
    """Exact-match accuracy on Claim_Verification items; non-boolean outputs count as wrong."""
    gold_answers: List[str] = []
    pred_answers: List[str] = []
    for paper in _load_jsonl(gold_path):
        for qa in paper.get("qa_pairs", []):
            if qa.get("category") == "Claim_Verification":
                gold_answers.append(str(qa.get("answer", "")).strip())
    for item in _load_jsonl(eval_path):
        if item.get("category") == "Claim_Verification":
            pred_answers.append(str(item.get("gen_answer", "")).strip())

    if len(gold_answers) != len(pred_answers):
        raise ValueError(
            f"Claim verification count mismatch: expected {len(gold_answers)}, got {len(pred_answers)}"
        )

    normalized_pred: List[str] = []
    for gold, pred in zip(gold_answers, pred_answers):
        if pred not in {"True", "False"}:
            # Any output other than a literal True/False is flipped to disagree
            # with gold, i.e. counted as incorrect.
            normalized_pred.append("False" if gold == "True" else "True")
        else:
            normalized_pred.append(pred)
    return calculate_acc(normalized_pred, gold_answers)


def evaluate_submission(
    gold_path: str | Path,
    pred_path: str | Path,
    out_dir: str | Path,
    judge_model: str = "gpt",
) -> Dict[str, float]:
    """Run the full pipeline: judge answers, aggregate ratings, and score claim accuracy."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    judged_path = out_dir / f"{Path(pred_path).stem}_{judge_model}_judge.jsonl"
    if judged_path.exists():
        judged_path.unlink()  # paper_qa_score appends, so start from a clean file

    paper_qa_score(gold_path, pred_path, judged_path, judge_model=judge_model)
    llm_total, _ = get_llm_score(judged_path)
    claim_acc = get_verification_score(gold_path, pred_path)

    # Harmonic mean of Correctness and Completeness (epsilon guards against 0/0).
    f1_like = (
        2
        * llm_total["Correctness"]
        * llm_total["Completeness"]
        / (llm_total["Correctness"] + llm_total["Completeness"] + 1e-8)
    )
    info = llm_total["Conciseness"] * f1_like * 4
    return {
        "Conciseness": round(llm_total["Conciseness"] * 20, 4),
        "Correctness": round(llm_total["Correctness"] * 20, 4),
        "Completeness": round(llm_total["Completeness"] * 20, 4),
        "F1-like": round(f1_like * 20, 4),
        "Info": round(info * 20, 4),
        "Claim Accuracy": round(claim_acc * 100, 4),
    }