ml-intern / eval /evaluate.py
akseljoonas's picture
adding hf datasets i/o
c1fac32
raw
history blame
11.2 kB
"""
Rubric-based evaluation following the "Rubrics as Rewards" paper.
Implements RaR-Explicit: Weighted sum of individual criterion scores (Equation 1)
"""
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional
import litellm
import pandas as pd
from hf_dataset_io import df_to_hub
from pydantic import BaseModel
class CriterionCheck(BaseModel):
    """Verdict for one rubric criterion applied to one response."""

    # Short name of the criterion.
    title: str
    # Full text of the criterion.
    description: str
    # Signed integer weight; it is added to the raw score when `satisfied`.
    weight: int
    # True when the judge decided the response meets the criterion.
    satisfied: bool
    # Optional brief justification for the verdict.
    reasoning: Optional[str] = None
class RubricEvaluation(BaseModel):
    """Aggregated rubric result for a single response."""

    # One verdict per rubric criterion that was checked.
    criterion_checks: List[CriterionCheck]
    # Unnormalized score: sum of the weights of the satisfied criteria.
    raw_score: float
    # Score normalized to [0, 1].
    normalized_score: float
class EvaluatedResponse(BaseModel):
    """One evaluated response together with its rubric scores."""

    # Provenance of the example (empty string when the input row has none).
    discussion_title: str
    discussion_url: str
    # The question that was asked.
    question: str
    # The response that was judged.
    response: str
    # Ground-truth answer kept alongside the result for reference.
    reference_answer: str
    # Per-criterion verdicts plus raw and normalized scores.
    evaluation: RubricEvaluation
# User-message template for judging ONE criterion against ONE response.
# It is filled with str.format(), so the placeholders {question}, {response}
# and {criterion_description} are required, and any literal brace added to
# this template would have to be doubled ("{{" / "}}").
# The judge is asked only for "satisfied" and "reasoning"; the criterion's
# title and weight are not shown to it.
CRITERION_PROMPT = """You are evaluating whether a response satisfies a specific evaluation criterion.
Question: {question}
Response to evaluate: {response}
Evaluation Criterion:
{criterion_description}
Your task: Determine if the response satisfies this criterion.
Output a JSON object with:
- "satisfied": true or false
- "reasoning": Brief explanation (1-2 sentences) of why it does or doesn't satisfy the criterion
Be strict but fair. The criterion must be clearly satisfied for you to answer true."""
class RubricData(BaseModel):
    """A single rubric criterion as stored in the rubric file."""

    # Short name of the criterion.
    title: str
    # Text of the criterion; this is what the judge prompt shows.
    description: str
    # Integer weight. Only weights > 0 count toward the normalization
    # denominator used when scoring.
    weight: int
def load_rubrics_from_file(rubric_file: str) -> Dict[str, List[RubricData]]:
    """
    Load rubrics from a JSONL file and index them by question.

    Every non-blank line must be a JSON object with:
      - "question": the question text (used as the dictionary key)
      - "rubric": a JSON-encoded *string* whose decoded object contains a
        "rubrics" list; each item supplies the fields of ``RubricData``.

    If the same question appears on several lines, the last one wins.

    Args:
        rubric_file: Path to rubric JSONL file

    Returns:
        Dictionary mapping questions to their rubrics

    Raises:
        json.JSONDecodeError: If a line or its "rubric" payload is not valid JSON.
        KeyError: If "question", "rubric" or "rubrics" is missing.
    """
    rubrics_by_question: Dict[str, List[RubricData]] = {}
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(rubric_file, "r", encoding="utf-8") as f:
        for line in f:
            # Blank lines (e.g. padding at the end of the file) are not records.
            if not line.strip():
                continue
            entry = json.loads(line)
            # The rubric is stored double-encoded: a JSON string inside the row.
            rubric_data = json.loads(entry["rubric"])
            rubrics_by_question[entry["question"]] = [
                RubricData(**r) for r in rubric_data["rubrics"]
            ]
    return rubrics_by_question
def check_criterion(
    question: str, response: str, criterion: RubricData, model: str = "gpt-4o-mini"
) -> CriterionCheck:
    """
    Check if response satisfies a single criterion.

    The judge model is prompted with the question, the response and the
    criterion description, and must answer with a JSON object that is parsed
    into a ``CriterionCheck``.

    Args:
        question: The question being answered
        response: The response to evaluate
        criterion: The rubric criterion to check
        model: LLM model for judging

    Returns:
        CriterionCheck carrying the judge's ``satisfied``/``reasoning`` verdict
        and the ``title``/``description``/``weight`` of ``criterion``.
    """
    prompt = CRITERION_PROMPT.format(
        question=question,
        response=response,
        criterion_description=criterion.description,
    )
    llm_response = litellm.completion(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You are an expert evaluator for rubric-based assessment.",
            },
            {"role": "user", "content": prompt},
        ],
        temperature=0.0,
        response_format=CriterionCheck,
    )
    verdict = CriterionCheck.model_validate_json(
        llm_response.choices[0].message.content
    )
    # The response schema forces the judge to emit title/description/weight,
    # but the prompt never tells it the title or the weight, so whatever it
    # returned for those fields is invented. Only the verdict comes from the
    # judge; the metadata is pinned to the rubric so downstream scoring uses
    # the real weight.
    return verdict.model_copy(
        update={
            "title": criterion.title,
            "description": criterion.description,
            "weight": criterion.weight,
        }
    )
def evaluate_with_rubrics(
    question: str,
    response: str,
    reference_answer: str,
    rubrics: List[RubricData],
    model: str = "gpt-4o-mini",
) -> RubricEvaluation:
    """
    Evaluate response using RaR-Explicit method (weighted sum).

    Implements Equation 1 from paper:
        r(x, ŷ) = Σ(w_j * c_j(x, ŷ)) / Σ(w_j)

    The numerator sums the rubric weight of every satisfied criterion
    (negative weights included); the denominator sums only the positive
    rubric weights. The quotient is clipped to [0, 1].

    Args:
        question: The question
        response: Response to evaluate
        reference_answer: Reference answer (not directly used, but available)
        rubrics: List of rubric criteria
        model: LLM model for judging

    Returns:
        RubricEvaluation with the per-criterion checks, the raw weighted sum
        and the normalized score. The normalized score is 0.0 when there is
        no positive weight (including an empty rubric list).
    """
    # Check each criterion independently, one judge call per criterion.
    checks = [check_criterion(question, response, rubric, model) for rubric in rubrics]

    # Calculate weighted score (Equation 1).
    # Only positive weights contribute to the denominator.
    positive_weights = sum(r.weight for r in rubrics if r.weight > 0)

    # Score with the weight from the rubric, not the weight stored on the
    # check: the check is parsed from judge output, so its weight field is not
    # guaranteed to match the rubric. checks[i] always corresponds to rubrics[i].
    raw_score = float(
        sum(rubric.weight for rubric, check in zip(rubrics, checks) if check.satisfied)
    )

    # Normalize to [0, 1]
    normalized_score = raw_score / positive_weights if positive_weights > 0 else 0.0
    # Clip to [0, 1] in case pitfalls make it negative
    normalized_score = max(0.0, min(1.0, normalized_score))

    return RubricEvaluation(
        raw_score=raw_score,
        normalized_score=normalized_score,
        criterion_checks=checks,
    )
def _read_jsonl(path: str) -> List[dict]:
    """Decode every non-blank line of a UTF-8 JSONL file into a list."""
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def evaluate_dataset_with_rubrics(
    input_file: str,
    rubric_file: str,
    ground_truth_file: str,
    output_file: str = "rubric_evaluation_results.jsonl",
    model: str = "gpt-4o-mini",
    max_concurrent: int = 10,
    limit: Optional[int] = None,
    push_to_hub: Optional[str] = None,
) -> None:
    """
    Evaluate all responses using rubric-based assessment.

    Responses and ground truths are paired by position (row i with row i).
    The question text is taken from the ground-truth row and used to look up
    the rubrics; rows whose question has no rubrics are skipped with a
    warning. Results are written to ``output_file`` in task order and a
    summary is printed.

    Args:
        input_file: Path to JSONL with responses to evaluate
        rubric_file: Path to JSONL with rubrics (output from generate_rubrics.py)
        ground_truth_file: Path to JSONL with ground truth answers
        output_file: Path to output JSONL file
        model: LLM model for judging
        max_concurrent: Maximum concurrent evaluations
        limit: Optional limit on number of examples; ``None`` evaluates all
        push_to_hub: Optional HuggingFace dataset spec (e.g., username/dataset@evaluations)

    Raises:
        Exception: Any error raised inside a worker evaluation is re-raised
            here when its result is collected, aborting the run.
    """
    # Load data
    print(f"Loading responses from {input_file}...")
    responses = _read_jsonl(input_file)
    print(f"Loading rubrics from {rubric_file}...")
    rubrics_by_question = load_rubrics_from_file(rubric_file)
    print(f"Loading ground truth from {ground_truth_file}...")
    ground_truths = _read_jsonl(ground_truth_file)

    # zip() below stops at the shorter list, so make a mismatch visible.
    if len(responses) != len(ground_truths):
        print(
            f"Warning: {len(responses)} responses but {len(ground_truths)} ground truths; "
            "extra rows will be ignored"
        )

    # `is not None` so that limit=0 means "zero examples", not "no limit".
    if limit is not None:
        responses = responses[:limit]
        ground_truths = ground_truths[:limit]
    print(f"Loaded {len(responses)} responses to evaluate")
    print(f"Judge model: {model}")

    # Match responses with rubrics and ground truth
    evaluation_tasks = []
    for response_data, gt_data in zip(responses, ground_truths):
        question = gt_data["question"]
        # Find rubrics for this question
        rubrics = rubrics_by_question.get(question)
        if not rubrics:
            print(f"Warning: No rubrics found for question: {question[:50]}...")
            continue
        evaluation_tasks.append(
            {
                "question": question,
                "response": response_data["solution"],
                "reference_answer": gt_data["solution"],
                "rubrics": rubrics,
                "metadata": {
                    "discussion_title": response_data.get("discussion_title", ""),
                    "discussion_url": response_data.get("discussion_url", ""),
                },
            }
        )

    print(
        f"Running {len(evaluation_tasks)} evaluations with {max_concurrent} parallel workers..."
    )

    # Run evaluations in parallel. Futures finish in arbitrary order, so each
    # result is stored at the index of the task that produced it.
    results = [None] * len(evaluation_tasks)
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        future_to_idx = {
            executor.submit(
                evaluate_with_rubrics,
                question=task["question"],
                response=task["response"],
                reference_answer=task["reference_answer"],
                rubrics=task["rubrics"],
                model=model,
            ): idx
            for idx, task in enumerate(evaluation_tasks)
        }
        for completed, future in enumerate(as_completed(future_to_idx), start=1):
            results[future_to_idx[future]] = future.result()
            print(f"Completed: {completed}/{len(evaluation_tasks)}", end="\r")
    print()  # New line after progress

    # Combine results with metadata
    output_data = [
        EvaluatedResponse(
            discussion_title=task["metadata"]["discussion_title"],
            discussion_url=task["metadata"]["discussion_url"],
            question=task["question"],
            response=task["response"],
            reference_answer=task["reference_answer"],
            evaluation=evaluation,
        )
        for task, evaluation in zip(evaluation_tasks, results)
    ]
    total_score = sum(entry.evaluation.normalized_score for entry in output_data)

    # Convert to DataFrame for HuggingFace upload
    results_df = pd.DataFrame([entry.model_dump() for entry in output_data])

    # Upload to HuggingFace if specified (before saving JSONL).
    # Initialized up front so the summary can read it on every path.
    upload_success = False
    if push_to_hub:
        print(f"\nUploading to HuggingFace: {push_to_hub}")
        upload_success = df_to_hub(
            df=results_df,
            dataset_spec=push_to_hub,
            split="test",
            private=False,
        )
        if not upload_success:
            print("Warning: HuggingFace upload failed, but continuing to save JSONL...")

    # Write results to JSONL file
    print(f"\nWriting results to {output_file}...")
    with open(output_file, "w", encoding="utf-8") as f:
        for entry in output_data:
            f.write(entry.model_dump_json() + "\n")

    # Print summary
    avg_score = total_score / len(output_data) if output_data else 0.0
    print("\n" + "=" * 60)
    print("RUBRIC-BASED EVALUATION SUMMARY")
    print("=" * 60)
    print(f"Total examples: {len(output_data)}")
    print(f"Judge model: {model}")
    print(f"Average normalized score: {avg_score:.3f}")
    print(f"Average percentage: {avg_score * 100:.1f}%")

    # Per-criterion statistics, pooled over all evaluated examples
    all_checks = [
        check for entry in output_data for check in entry.evaluation.criterion_checks
    ]
    total_criteria = len(all_checks)
    total_satisfied = sum(1 for check in all_checks if check.satisfied)
    satisfaction_rate = total_satisfied / total_criteria if total_criteria > 0 else 0.0
    print(f"Criteria satisfaction rate: {satisfaction_rate * 100:.1f}%")

    if push_to_hub and upload_success:
        print(f"Pushed to: {push_to_hub}")
    print("=" * 60)
if __name__ == "__main__":
    # Default run: score the accepted QA pairs against their own solutions
    # (the same file serves as both input and ground truth).
    run_config = {
        "input_file": "eval/qa_pairs_accepted.jsonl",
        "rubric_file": "eval/qa_rubrics.jsonl",
        "ground_truth_file": "eval/qa_pairs_accepted.jsonl",
        "output_file": "rubric_evaluation.jsonl",
        "model": "gpt-4o-mini",
        "max_concurrent": 10,
        # Set to None to evaluate all
        "limit": 30,
        # Set to "username/dataset@evaluations" to upload
        "push_to_hub": "akseljoonas/hf-agent-benchmark@ground-truth",
    }
    evaluate_dataset_with_rubrics(**run_config)