Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Rubric-based evaluation following the "Rubrics as Rewards" paper. | |
| Implements RaR-Explicit: Weighted sum of individual criterion scores (Equation 1) | |
| """ | |
| import json | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import Dict, List, Optional | |
| import litellm | |
| import pandas as pd | |
| from hf_dataset_io import df_to_hub | |
| from pydantic import BaseModel | |
class CriterionCheck(BaseModel):
    """Result of checking a single rubric criterion."""

    # Short name of the criterion that was checked.
    title: str
    # Full text of the criterion.
    description: str
    # Signed integer weight; it is added to the raw score when `satisfied`
    # is true (negative weights lower the score).
    weight: int
    # Whether the judge decided the response meets the criterion.
    satisfied: bool
    # Brief explanation from the judge; optional and defaults to None.
    reasoning: Optional[str] = None
class RubricEvaluation(BaseModel):
    """Complete rubric-based evaluation result."""

    # One entry per rubric criterion that was checked.
    criterion_checks: List[CriterionCheck]
    # Unnormalized score: sum of the weights of all satisfied criteria.
    raw_score: float
    # raw_score divided by the sum of positive weights, clipped to [0, 1].
    normalized_score: float
class EvaluatedResponse(BaseModel):
    """Complete evaluated response with rubric scores."""

    # Source discussion metadata; empty string when the input row lacks it.
    discussion_title: str
    discussion_url: str
    # The question that was answered.
    question: str
    # The response that was judged.
    response: str
    # Ground-truth answer carried along for reference.
    reference_answer: str
    # Per-criterion checks plus the aggregated scores.
    evaluation: RubricEvaluation
# Prompt template for judging a single criterion. It is filled with
# str.format() using the placeholders `question`, `response` and
# `criterion_description`.
CRITERION_PROMPT = """You are evaluating whether a response satisfies a specific evaluation criterion.
Question: {question}
Response to evaluate: {response}
Evaluation Criterion:
{criterion_description}
Your task: Determine if the response satisfies this criterion.
Output a JSON object with:
- "satisfied": true or false
- "reasoning": Brief explanation (1-2 sentences) of why it does or doesn't satisfy the criterion
Be strict but fair. The criterion must be clearly satisfied for you to answer true."""
class RubricData(BaseModel):
    """Rubric data loaded from file."""

    # Short name of the criterion.
    title: str
    # Full criterion text; this is what gets inserted into the judge prompt.
    description: str
    # Signed integer weight; only weights > 0 count toward the normalization
    # denominator.
    weight: int
def load_rubrics_from_file(rubric_file: str) -> Dict[str, List[RubricData]]:
    """
    Load rubrics from a JSONL file and index them by question.

    Each non-blank line must be a JSON object with a "question" key and a
    "rubric" key. The "rubric" value is itself a JSON-encoded string whose
    "rubrics" list holds the individual criterion objects. If a question
    occurs more than once, the last occurrence wins.

    Args:
        rubric_file: Path to rubric JSONL file

    Returns:
        Dictionary mapping questions to their rubrics

    Raises:
        json.JSONDecodeError: If a non-blank line or its "rubric" payload is
            not valid JSON.
        KeyError: If "question", "rubric" or "rubrics" is missing.
    """
    rubrics_by_question: Dict[str, List[RubricData]] = {}

    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # locale default encoding.
    with open(rubric_file, "r", encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. stray newlines at the end of the
            # file) instead of crashing in json.loads.
            if not line.strip():
                continue

            entry = json.loads(line)

            # The rubric is stored as a JSON string inside the JSON line.
            rubric_data = json.loads(entry["rubric"])
            rubrics_by_question[entry["question"]] = [
                RubricData(**r) for r in rubric_data["rubrics"]
            ]

    return rubrics_by_question
class _CriterionJudgment(BaseModel):
    """Fields the judge model is asked to produce for a single criterion."""

    satisfied: bool
    reasoning: Optional[str] = None


def check_criterion(
    question: str, response: str, criterion: RubricData, model: str = "gpt-4o-mini"
) -> CriterionCheck:
    """
    Check if response satisfies a single criterion.

    The judge model only decides `satisfied` and `reasoning`. The title,
    description and weight of the returned check are copied from `criterion`,
    so the judge cannot alter the rubric metadata that drives scoring.

    Args:
        question: The question being answered
        response: The response to evaluate
        criterion: The rubric criterion to check
        model: LLM model for judging

    Returns:
        CriterionCheck with satisfaction result

    Raises:
        pydantic.ValidationError: If the judge output does not match the
            expected JSON schema.
    """
    prompt = CRITERION_PROMPT.format(
        question=question,
        response=response,
        criterion_description=criterion.description,
    )

    llm_response = litellm.completion(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You are an expert evaluator for rubric-based assessment.",
            },
            {"role": "user", "content": prompt},
        ],
        temperature=0.0,
        # Request only the fields the prompt asks for. Using CriterionCheck
        # here would force the model to generate title/description/weight.
        response_format=_CriterionJudgment,
    )

    judgment = _CriterionJudgment.model_validate_json(
        llm_response.choices[0].message.content
    )

    return CriterionCheck(
        title=criterion.title,
        description=criterion.description,
        weight=criterion.weight,
        satisfied=judgment.satisfied,
        reasoning=judgment.reasoning,
    )
def evaluate_with_rubrics(
    question: str,
    response: str,
    reference_answer: str,
    rubrics: List[RubricData],
    model: str = "gpt-4o-mini",
) -> RubricEvaluation:
    """
    Evaluate response using RaR-Explicit method (weighted sum).

    Implements Equation 1 from paper:
    r(x, ŷ) = Σ(w_j * c_j(x, ŷ)) / Σ(w_j)

    The denominator only sums positive weights. Criteria with negative
    weights can lower the numerator, and the result is clipped to [0, 1].

    Args:
        question: The question
        response: Response to evaluate
        reference_answer: Reference answer (not directly used, but available)
        rubrics: List of rubric criteria
        model: LLM model for judging

    Returns:
        RubricEvaluation with normalized score
    """
    # Check each criterion independently.
    checks = []
    for rubric in rubrics:
        check = check_criterion(question, response, rubric, model)
        # The rubric is the source of truth for title/description/weight.
        # Stamp it onto the check so the numerator uses the same weights as
        # the denominator, whatever metadata the judge returned.
        checks.append(
            check.model_copy(
                update={
                    "title": rubric.title,
                    "description": rubric.description,
                    "weight": rubric.weight,
                }
            )
        )

    # Calculate weighted score (Equation 1).
    # Only positive weights contribute to the denominator.
    positive_weights = sum(r.weight for r in rubrics if r.weight > 0)
    raw_score = float(sum(check.weight for check in checks if check.satisfied))

    # Normalize; with no positive weights the score is defined as 0.
    normalized_score = raw_score / positive_weights if positive_weights > 0 else 0.0

    # Clip to [0, 1] in case pitfalls make it negative.
    normalized_score = max(0.0, min(1.0, normalized_score))

    return RubricEvaluation(
        raw_score=raw_score,
        normalized_score=normalized_score,
        criterion_checks=checks,
    )
def evaluate_dataset_with_rubrics(
    input_file: str,
    rubric_file: str,
    ground_truth_file: str,
    output_file: str = "rubric_evaluation_results.jsonl",
    model: str = "gpt-4o-mini",
    max_concurrent: int = 10,
    limit: Optional[int] = None,
    push_to_hub: Optional[str] = None,
) -> None:
    """
    Evaluate all responses using rubric-based assessment.

    Responses and ground truths are paired by position. The question text is
    taken from the ground-truth row and used to look up its rubrics; rows
    without rubrics are skipped with a warning. Evaluations run in a thread
    pool, and results are written in task order to `output_file` and
    optionally uploaded to the HuggingFace Hub. A summary is printed at the
    end.

    Args:
        input_file: Path to JSONL with responses to evaluate
        rubric_file: Path to JSONL with rubrics (output from generate_rubrics.py)
        ground_truth_file: Path to JSONL with ground truth answers
        output_file: Path to output JSONL file
        model: LLM model for judging
        max_concurrent: Maximum concurrent evaluations
        limit: Optional limit on number of examples (None evaluates all)
        push_to_hub: Optional HuggingFace dataset spec (e.g., username/dataset@evaluations)

    Raises:
        Exception: Any error raised by a single evaluation propagates when
            its future result is collected.
    """
    # Load data
    print(f"Loading responses from {input_file}...")
    with open(input_file, "r", encoding="utf-8") as f:
        responses = [json.loads(line) for line in f if line.strip()]

    print(f"Loading rubrics from {rubric_file}...")
    rubrics_by_question = load_rubrics_from_file(rubric_file)

    print(f"Loading ground truth from {ground_truth_file}...")
    with open(ground_truth_file, "r", encoding="utf-8") as f:
        ground_truths = [json.loads(line) for line in f if line.strip()]

    # `is not None` so that limit=0 means "evaluate nothing" rather than
    # silently evaluating everything.
    if limit is not None:
        responses = responses[:limit]
        ground_truths = ground_truths[:limit]

    # zip() below stops at the shorter list; make that visible.
    if len(responses) != len(ground_truths):
        print(
            f"Warning: {len(responses)} responses but {len(ground_truths)} "
            "ground truths; unmatched trailing rows are ignored"
        )

    print(f"Loaded {len(responses)} responses to evaluate")
    print(f"Judge model: {model}")

    # Match responses with rubrics and ground truth
    evaluation_tasks = []
    for response_data, gt_data in zip(responses, ground_truths):
        question = gt_data["question"]

        # Find rubrics for this question
        rubrics = rubrics_by_question.get(question)
        if not rubrics:
            print(f"Warning: No rubrics found for question: {question[:50]}...")
            continue

        evaluation_tasks.append(
            {
                "question": question,
                "response": response_data["solution"],
                "reference_answer": gt_data["solution"],
                "rubrics": rubrics,
                "metadata": {
                    "discussion_title": response_data.get("discussion_title", ""),
                    "discussion_url": response_data.get("discussion_url", ""),
                },
            }
        )

    print(
        f"Running {len(evaluation_tasks)} evaluations with {max_concurrent} parallel workers..."
    )

    # Run evaluations in parallel. Results are stored by task index so the
    # output order matches the input order regardless of completion order.
    results = [None] * len(evaluation_tasks)
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        future_to_idx = {
            executor.submit(
                evaluate_with_rubrics,
                question=task["question"],
                response=task["response"],
                reference_answer=task["reference_answer"],
                rubrics=task["rubrics"],
                model=model,
            ): idx
            for idx, task in enumerate(evaluation_tasks)
        }

        completed = 0
        for future in as_completed(future_to_idx):
            results[future_to_idx[future]] = future.result()
            completed += 1
            print(f"Completed: {completed}/{len(evaluation_tasks)}", end="\r")

    print()  # New line after progress

    # Combine results with metadata
    output_data = []
    total_score = 0.0
    for task, evaluation in zip(evaluation_tasks, results):
        output_data.append(
            EvaluatedResponse(
                discussion_title=task["metadata"]["discussion_title"],
                discussion_url=task["metadata"]["discussion_url"],
                question=task["question"],
                response=task["response"],
                reference_answer=task["reference_answer"],
                evaluation=evaluation,
            )
        )
        total_score += evaluation.normalized_score

    # Upload to HuggingFace if specified (before saving JSONL)
    upload_success = False
    if push_to_hub:
        # The DataFrame is only needed for the upload, so build it here.
        results_df = pd.DataFrame([entry.model_dump() for entry in output_data])
        print(f"\nUploading to HuggingFace: {push_to_hub}")
        # NOTE(review): df_to_hub is assumed to return a truthy value on
        # success - confirm against hf_dataset_io.
        upload_success = df_to_hub(
            df=results_df,
            dataset_spec=push_to_hub,
            split="test",
            private=False,
        )
        if not upload_success:
            print("Warning: HuggingFace upload failed, but continuing to save JSONL...")

    # Write results to JSONL file (UTF-8 so non-ASCII text round-trips on any
    # platform).
    print(f"\nWriting results to {output_file}...")
    with open(output_file, "w", encoding="utf-8") as f:
        for entry in output_data:
            f.write(entry.model_dump_json() + "\n")

    # Print summary
    avg_score = total_score / len(output_data) if output_data else 0.0
    print("\n" + "=" * 60)
    print("RUBRIC-BASED EVALUATION SUMMARY")
    print("=" * 60)
    print(f"Total examples: {len(output_data)}")
    print(f"Judge model: {model}")
    print(f"Average normalized score: {avg_score:.3f}")
    print(f"Average percentage: {avg_score * 100:.1f}%")

    # Per-criterion statistics
    total_satisfied = sum(
        sum(1 for check in entry.evaluation.criterion_checks if check.satisfied)
        for entry in output_data
    )
    total_criteria = sum(len(entry.evaluation.criterion_checks) for entry in output_data)
    satisfaction_rate = total_satisfied / total_criteria if total_criteria > 0 else 0.0
    print(f"Criteria satisfaction rate: {satisfaction_rate * 100:.1f}%")

    if push_to_hub and upload_success:
        print(f"Pushed to: {push_to_hub}")
    print("=" * 60)
# Script entry point: evaluate the accepted QA pairs against their rubrics.
if __name__ == "__main__":
    evaluate_dataset_with_rubrics(
        # The same file serves as both the responses and the ground truth.
        input_file="eval/qa_pairs_accepted.jsonl",
        rubric_file="eval/qa_rubrics.jsonl",
        ground_truth_file="eval/qa_pairs_accepted.jsonl",
        output_file="rubric_evaluation.jsonl",
        model="gpt-4o-mini",
        max_concurrent=10,
        limit=30,  # Set to None to evaluate all
        push_to_hub="akseljoonas/hf-agent-benchmark@ground-truth",  # Dataset spec to upload to; set to None to skip the upload
    )