| """ |
| Helion-OSC Evaluation Script |
| Comprehensive evaluation suite for code generation and mathematical reasoning |
| """ |
|
|
import json
import logging
import multiprocessing as mp
import os
import re
import signal
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
|
|
# Module-level logging setup; basicConfig is a no-op if the root logger
# was already configured by an importing application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class EvaluationConfig:
    """Configuration for an evaluation run (model, decoding, and output settings)."""
    model_name: str = "DeepXR/Helion-OSC"  # HF hub id or local path of the model under test
    device: str = "cuda" if torch.cuda.is_available() else "cpu"  # resolved once, at class-definition time
    batch_size: int = 4  # NOTE(review): not read by any evaluator in this file; reserved for batched generation?
    max_length: int = 2048  # total sequence length cap passed to model.generate (prompt + completion)
    temperature: float = 0.7  # sampling temperature for the code benchmarks
    top_p: float = 0.95  # nucleus-sampling threshold for the code benchmarks
    num_samples: int = 1  # completions per problem (evaluators report pass@1)
    timeout: int = 5  # per-execution timeout in seconds for generated code
    output_dir: str = "./evaluation_results"  # where the JSON results file is written
|
|
|
|
class TimeoutException(Exception):
    """Raised by the time_limit() SIGALRM handler when execution exceeds its budget."""
    pass
|
|
|
|
@contextmanager
def time_limit(seconds):
    """Limit the wrapped block to *seconds* of wall-clock time.

    Raises TimeoutException if the block does not finish in time. Built on
    SIGALRM, so it works only on Unix and only in the main thread.

    Args:
        seconds: Whole-second budget for the block (int, as signal.alarm
            requires).

    Raises:
        TimeoutException: If the alarm fires before the block completes.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Code execution timed out")

    # Remember the previous handler so we can restore it; the original
    # version leaked its own handler after the context exited.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel any pending alarm, then restore the prior handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)
|
|
|
|
class CodeExecutor:
    """Run or syntax-check untrusted generated code in a child process.

    Note: the subprocess boundary gives crash/timeout isolation only; it is
    not a security sandbox against hostile code.
    """

    @staticmethod
    def execute_python(code: str, timeout: int = 5) -> Tuple[bool, str]:
        """
        Execute Python code in a subprocess.

        Args:
            code: Python source to execute.
            timeout: Timeout in seconds.

        Returns:
            Tuple of (success, output). On success, stdout; on failure,
            stderr, the string "Execution timed out", or the exception text.
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_file = f.name

        try:
            # sys.executable guarantees the same interpreter as this process;
            # a bare 'python' may be missing or a different version on PATH.
            result = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=timeout
            )
            if result.returncode == 0:
                return True, result.stdout
            return False, result.stderr
        except subprocess.TimeoutExpired:
            return False, "Execution timed out"
        except Exception as e:
            return False, str(e)
        finally:
            # Single cleanup path replaces the three duplicated unlink calls;
            # best-effort, since the run may have failed before/while writing.
            try:
                os.unlink(temp_file)
            except OSError:
                pass

    @staticmethod
    def check_syntax(code: str, language: str = "python") -> Tuple[bool, str]:
        """
        Check code syntax without executing it.

        Args:
            code: Code to check.
            language: Programming language (only "python" is implemented).

        Returns:
            Tuple of (is_valid, error_message).
        """
        if language.lower() == "python":
            try:
                compile(code, '<string>', 'exec')
                return True, ""
            except SyntaxError as e:
                return False, str(e)

        # Non-Python languages are accepted unchecked, as before.
        return True, "Syntax checking not implemented for this language"
|
|
|
|
class HumanEvalEvaluator:
    """Evaluator for the HumanEval code-generation benchmark.

    Generates one completion per problem (pass@1) and runs the official
    test harness for each solution in a subprocess.
    """

    def __init__(self, config: EvaluationConfig):
        """Load tokenizer and model as described by *config*."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            # bf16 on GPU to halve memory; full precision on CPU
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_humaneval(self) -> List[Dict]:
        """Load the HumanEval test split as a list of problem dicts."""
        logger.info("Loading HumanEval dataset...")
        dataset = load_dataset("openai_humaneval", split="test")
        return list(dataset)

    def generate_solution(self, prompt: str) -> str:
        """Generate a code completion for *prompt*, with the prompt stripped."""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # NOTE(review): character-based prompt stripping assumes decode()
        # round-trips the prompt exactly; may mis-slice if the tokenizer
        # normalizes whitespace — confirm for the target tokenizer.
        return generated[len(prompt):].strip()

    def test_solution(self, solution: str, test_code: str) -> bool:
        """Return True iff *solution* followed by *test_code* exits cleanly."""
        full_code = solution + "\n" + test_code
        success, _ = self.executor.execute_python(full_code, self.config.timeout)
        return success

    def evaluate(self) -> Dict[str, float]:
        """Run the full HumanEval pass@1 evaluation and return result counts."""
        logger.info("Starting HumanEval evaluation...")

        problems = self.load_humaneval()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0,
            "syntax_errors": 0,
            "runtime_errors": 0,
            "timeouts": 0
        }

        for problem in tqdm(problems, desc="Evaluating HumanEval"):
            solution = self.generate_solution(problem["prompt"])

            # Cheap static check first: do not execute unparseable output.
            is_valid, _ = self.executor.check_syntax(solution)
            if not is_valid:
                results["syntax_errors"] += 1
                results["failed"] += 1
                continue

            # Call the executor directly so the error text is available:
            # execute_python reports timeouts via its message and never
            # raises TimeoutException, so the old except-clause was dead
            # code and timeouts were miscounted as runtime errors.
            success, output = self.executor.execute_python(
                solution + "\n" + problem["test"], self.config.timeout
            )
            if success:
                results["passed"] += 1
            else:
                results["failed"] += 1
                if "timed out" in output.lower():
                    results["timeouts"] += 1
                else:
                    results["runtime_errors"] += 1

        # Guard against an empty dataset (ZeroDivisionError).
        results["pass@1"] = results["passed"] / results["total"] if results["total"] else 0.0

        logger.info(f"HumanEval Results: {results}")
        return results
|
|
|
|
class MBPPEvaluator:
    """Evaluator for the MBPP (Mostly Basic Python Problems) benchmark.

    Generates one solution per problem; a problem counts as solved only if
    every test case in its test_list passes.
    """

    def __init__(self, config: EvaluationConfig):
        """Load tokenizer and model as described by *config*."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            # bf16 on GPU to halve memory; full precision on CPU
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_mbpp(self) -> List[Dict]:
        """Load the MBPP test split as a list of problem dicts."""
        logger.info("Loading MBPP dataset...")
        dataset = load_dataset("mbpp", split="test")
        return list(dataset)

    def generate_solution(self, prompt: str) -> str:
        """Generate a code solution for *prompt*, with the prompt stripped."""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return generated[len(prompt):].strip()

    def evaluate(self) -> Dict[str, float]:
        """Run the full MBPP pass@1 evaluation and return result counts."""
        logger.info("Starting MBPP evaluation...")

        problems = self.load_mbpp()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0
        }

        for problem in tqdm(problems, desc="Evaluating MBPP"):
            solution = self.generate_solution(problem["text"])

            # all() short-circuits on the first failing test case, matching
            # the original explicit flag-and-break loop.
            all_passed = all(
                self.executor.execute_python(solution + "\n" + test, self.config.timeout)[0]
                for test in problem["test_list"]
            )

            if all_passed:
                results["passed"] += 1
            else:
                results["failed"] += 1

        # Guard against an empty dataset (ZeroDivisionError).
        results["pass@1"] = results["passed"] / results["total"] if results["total"] else 0.0

        logger.info(f"MBPP Results: {results}")
        return results
|
|
|
|
class GSM8KEvaluator:
    """Evaluator for the GSM8K grade-school math reasoning benchmark."""

    def __init__(self, config: EvaluationConfig):
        """Load tokenizer and model as described by *config*."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            # bf16 on GPU to halve memory; full precision on CPU
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()

    def load_gsm8k(self) -> List[Dict]:
        """Load the GSM8K 'main' test split as a list of problem dicts."""
        logger.info("Loading GSM8K dataset...")
        dataset = load_dataset("gsm8k", "main", split="test")
        return list(dataset)

    def extract_answer(self, text: str) -> Optional[float]:
        """Extract the final numeric answer from *text*, or None if absent.

        Understands the GSM8K '#### <answer>' convention plus common
        natural-language phrasings. Thousands separators ('1,000') are
        accepted and stripped before parsing — GSM8K gold answers often
        contain them, and the previous digit-only pattern silently
        truncated such numbers (e.g. '1,000' parsed as 1.0).
        """
        number = r'(-?\d[\d,]*\.?\d*)'
        patterns = [
            r'####\s*' + number,
            r'answer is\s*' + number,
            r'equals?\s*' + number,
            r'=\s*' + number,
            r'\$?\s*' + number + r'\s*$'
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    return float(match.group(1).replace(',', ''))
                except ValueError:
                    # Matched span not parseable as a float; try next pattern.
                    continue

        return None

    def generate_solution(self, problem: str) -> str:
        """Generate a step-by-step solution text for a math problem."""
        prompt = f"Problem: {problem}\n\nLet's solve this step by step:\n"
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            # Greedy decoding for deterministic answers. The temperature and
            # top_p values previously passed alongside do_sample=False were
            # ignored by generate(), so dropping them changes nothing.
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def evaluate(self) -> Dict[str, float]:
        """Run the full GSM8K evaluation and return result counts."""
        logger.info("Starting GSM8K evaluation...")

        problems = self.load_gsm8k()
        results = {
            "total": len(problems),
            "correct": 0,
            "incorrect": 0,
            "no_answer": 0,
            "skipped": 0
        }

        for problem in tqdm(problems, desc="Evaluating GSM8K"):
            correct_answer = self.extract_answer(problem["answer"])
            if correct_answer is None:
                # Unparseable gold answer: record it instead of silently
                # inflating the accuracy denominator.
                results["skipped"] += 1
                continue

            solution = self.generate_solution(problem["question"])
            predicted_answer = self.extract_answer(solution)

            if predicted_answer is None:
                results["no_answer"] += 1
                results["incorrect"] += 1
            elif abs(predicted_answer - correct_answer) < 1e-5:
                results["correct"] += 1
            else:
                results["incorrect"] += 1

        # Accuracy over problems actually evaluated; also guards an empty set.
        evaluated = results["correct"] + results["incorrect"]
        results["accuracy"] = results["correct"] / evaluated if evaluated else 0.0

        logger.info(f"GSM8K Results: {results}")
        return results
|
|
|
|
class ComprehensiveEvaluator:
    """Run the full evaluation suite across all benchmarks and persist results."""

    def __init__(self, config: EvaluationConfig):
        """Store *config* and ensure the output directory exists."""
        self.config = config
        os.makedirs(config.output_dir, exist_ok=True)

    def run_all_evaluations(self) -> Dict[str, Any]:
        """Run every benchmark, then save and summarize the combined results.

        A failure in one benchmark is recorded under an "error" key for
        that benchmark and does not abort the remaining ones.
        """
        logger.info("Starting comprehensive evaluation...")

        # (result key, display label, evaluator class) — a data-driven loop
        # replaces three copies of the same try/except boilerplate.
        benchmarks = [
            ("humaneval", "HumanEval", HumanEvalEvaluator),
            ("mbpp", "MBPP", MBPPEvaluator),
            ("gsm8k", "GSM8K", GSM8KEvaluator),
        ]

        all_results = {}
        for key, label, evaluator_cls in benchmarks:
            try:
                logger.info("\n" + "=" * 80)
                logger.info(f"Running {label} Evaluation")
                logger.info("=" * 80)
                all_results[key] = evaluator_cls(self.config).evaluate()
            except Exception as e:
                logger.error(f"{label} evaluation failed: {e}")
                all_results[key] = {"error": str(e)}

        self.save_results(all_results)
        self.print_summary(all_results)
        return all_results

    def save_results(self, results: Dict[str, Any]):
        """Write *results* as pretty-printed JSON into the output directory."""
        output_file = os.path.join(self.config.output_dir, "evaluation_results.json")
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {output_file}")

    def print_summary(self, results: Dict[str, Any]):
        """Log a one-line headline metric per benchmark that produced one."""
        logger.info("\n" + "=" * 80)
        logger.info("EVALUATION SUMMARY")
        logger.info("=" * 80)

        # A benchmark entry may instead hold {"error": ...}; only report
        # entries carrying a real metric.
        if "humaneval" in results and "pass@1" in results["humaneval"]:
            logger.info(f"HumanEval Pass@1: {results['humaneval']['pass@1']:.3f}")

        if "mbpp" in results and "pass@1" in results["mbpp"]:
            logger.info(f"MBPP Pass@1: {results['mbpp']['pass@1']:.3f}")

        if "gsm8k" in results and "accuracy" in results["gsm8k"]:
            logger.info(f"GSM8K Accuracy: {results['gsm8k']['accuracy']:.3f}")

        logger.info("=" * 80)
|
|
|
|
def main():
    """CLI entry point: parse arguments, build the config, run the benchmark."""
    import argparse

    parser = argparse.ArgumentParser(description="Evaluate Helion-OSC model")
    parser.add_argument("--model_name", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--batch_size", type=int, default=4)
    parser.add_argument("--max_length", type=int, default=2048)
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--top_p", type=float, default=0.95)
    # Previously missing: num_samples exists on EvaluationConfig but was
    # not settable from the command line.
    parser.add_argument("--num_samples", type=int, default=1)
    parser.add_argument("--timeout", type=int, default=5)
    parser.add_argument("--output_dir", type=str, default="./evaluation_results")
    parser.add_argument("--benchmark", type=str, choices=["all", "humaneval", "mbpp", "gsm8k"], default="all")

    args = parser.parse_args()

    config = EvaluationConfig(
        model_name=args.model_name,
        device=args.device,
        batch_size=args.batch_size,
        max_length=args.max_length,
        temperature=args.temperature,
        top_p=args.top_p,
        num_samples=args.num_samples,
        timeout=args.timeout,
        output_dir=args.output_dir
    )

    if args.benchmark == "all":
        ComprehensiveEvaluator(config).run_all_evaluations()
    else:
        # Dispatch table replaces the elif chain for single benchmarks.
        evaluators = {
            "humaneval": HumanEvalEvaluator,
            "mbpp": MBPPEvaluator,
            "gsm8k": GSM8KEvaluator,
        }
        evaluators[args.benchmark](config).evaluate()
|
|
|
|
# Script entry point: run the evaluation CLI when executed directly.
if __name__ == "__main__":
    main()