import os
from typing import Any, Callable, List, Optional

import regex

from ..core.logging import logger
from .benchmark import Benchmark
from ..utils.utils import download_file
from ..core.module_utils import load_json
from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data
|
|
|
|
# Split name -> raw file name. A value of None means no raw file is listed for that split.
GSM8K_FILES_MAP = {
    "train": "train.jsonl",
    "dev": None,
    "test": "test.jsonl",
}
# File names that actually exist, in the same order as the splits above.
VALID_RAW_GSM8K_FILES = [
    raw_name for raw_name in GSM8K_FILES_MAP.values() if raw_name is not None
]
|
|
def download_raw_gsm8k_data(name: str, save_folder: str):
    """Download one raw GSM8K file into ``save_folder``.

    Args:
        name: File name to fetch; must be one of ``VALID_RAW_GSM8K_FILES``.
        save_folder: Directory the file is saved into, under the same ``name``.

    Raises:
        AssertionError: If ``name`` is not a known GSM8K file name.
    """
    # NOTE(review): validation relies on `assert`, which Python skips under -O.
    assert name in VALID_RAW_GSM8K_FILES, f"'{name}' is an invalid GSM8K file name. Available file names: {VALID_RAW_GSM8K_FILES}"

    url = f"https://raw.githubusercontent.com/openai/grade-school-math/master/grade_school_math/data/{name}"

    # The split label is only used for the log message.
    if "train" in name:
        split = "train"
    else:
        split = "test"
    logger.info(f"Downloading GSM8K {split} data from: {url}")

    destination = os.path.join(save_folder, name)
    download_file(url=url, save_file=destination)
|
|
|
|
def load_gsm8k_data(file_path: str) -> List[dict]:
    """Load a raw GSM8K ``.jsonl`` file and prepend an ``id`` to every example.

    The split ("train" or "test") is derived from the file's base name via
    ``GSM8K_FILES_MAP``; ids are ``"<split>-<n>"`` with ``n`` starting at 1.

    Args:
        file_path: Path to a file whose base name is a known GSM8K file name.

    Returns:
        A new list of dicts, each starting with an ``"id"`` key followed by
        the fields of the loaded example.

    Raises:
        AssertionError: If the base name is not a known GSM8K file name.
    """
    file_name = os.path.basename(file_path)
    # Reverse lookup: file name -> split name.
    split_by_file = dict(zip(GSM8K_FILES_MAP.values(), GSM8K_FILES_MAP.keys()))
    assert file_name in split_by_file, f"'{file_name}' is an invalid gsm8k file name. Available file names: {VALID_RAW_GSM8K_FILES}"

    split = split_by_file[file_name]
    records = load_json(path=file_path, type="jsonl")

    def _tagged(position: int, record: dict) -> dict:
        # Insert "id" first so it leads the key order; fields from the record
        # are applied afterwards.
        entry = {"id": f"{split}-{position}"}
        entry.update(record)
        return entry

    return [_tagged(position, record) for position, record in enumerate(records, start=1)]
| |
|
|
class GSM8K(Benchmark):
    """Benchmark class for evaluating math reasoning on GSM8K dataset.

    GSM8K (Grade School Math 8K) is a dataset of math word problems that
    test a model's ability to solve grade school level math problems requiring
    multi-step reasoning. This class handles loading the dataset, evaluating
    solutions, and computing metrics based on answer accuracy.

    Each GSM8K example has the following structure:
    {
        "id": "test-1",
        "question": "the question",
        "answer": "the answer"
    }

    The benchmark evaluates answers by extracting the final numerical value
    and comparing it to the ground truth answer.
    """

    # Optionally signed number with optional thousands separators and decimals.
    # Compiled once at class creation instead of on every extraction call.
    _NUMBER_PATTERN = regex.compile(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+")

    def __init__(self, path: Optional[str] = None, mode: str = "all", **kwargs):
        """Initialise the benchmark.

        Args:
            path: Data directory; defaults to ``~/.evoagentx/data/gsm8k``.
                A leading ``~`` is expanded.
            mode: Which splits ``_load_data`` loads: "train", "dev", "test"
                or "all".
            **kwargs: Forwarded unchanged to ``Benchmark.__init__``.
        """
        path = os.path.expanduser(path or "~/.evoagentx/data/gsm8k")
        super().__init__(name=type(self).__name__, path=path, mode=mode, **kwargs)

    def _load_data_from_file(self, file_name: Optional[str]) -> Optional[List[dict]]:
        """Load one split file from ``self.path``, downloading it if missing.

        Args:
            file_name: Raw GSM8K file name, or None when the split has no file.

        Returns:
            The loaded examples, or None if ``file_name`` is None.
        """
        if file_name is None:
            return None
        file_path = os.path.join(self.path, file_name)
        if not os.path.exists(file_path):
            download_raw_gsm8k_data(name=file_name, save_folder=self.path)

        logger.info(f"loading GSM8K data from {file_path} ...")
        return load_gsm8k_data(file_path=file_path)

    def _load_data(self):
        """Populate the split attributes selected by ``self.mode``.

        GSM8K lists no dev file, so ``_dev_data`` is set to None whenever the
        dev split is requested.
        """
        if self.mode == "train" or self.mode == "all":
            self._train_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["train"])
        if self.mode == "dev" or self.mode == "all":
            self._dev_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["dev"])
        if self.mode == "test" or self.mode == "all":
            self._test_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["test"])

    def _get_label(self, example: Any) -> Any:
        """Return the ground-truth ``"answer"`` field of an example."""
        return example["answer"]

    def _get_id(self, example: Any) -> Any:
        """Return the ``"id"`` field of an example."""
        return example["id"]

    def extract_last_number(self, text: str) -> Optional[float]:
        """Extract the last number from a text.

        ``text`` is converted with ``str()`` first. Thousands separators
        (``1,234``) are removed before conversion, and a ``+``/``-`` directly
        in front of the digits is treated as the number's sign.

        Returns:
            The last number found as a float, or None if the text contains
            no number (or the match cannot be converted).
        """
        matches = self._NUMBER_PATTERN.findall(str(text))
        if not matches:
            return None
        last_number = matches[-1].replace(",", "").strip()
        try:
            return float(last_number)
        except ValueError:
            return None

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """Score a prediction against a label by comparing their last numbers.

        Args:
            prediction: Model output; the last number in it is the answer.
            label: Ground-truth answer text; the last number in it is the target.

        Returns:
            ``{"solve_rate": 1.0}`` if both numbers differ by less than 1e-6,
            otherwise ``{"solve_rate": 0.0}``. A prediction or label without
            any number is scored 0.0.
        """
        ground_truth_answer = self.extract_last_number(label)
        if ground_truth_answer is None:
            # Previously this fell through to `float - None` and raised
            # TypeError; an unscorable label is now counted as unsolved.
            logger.warning(f"GSM8K label contains no number, scoring as unsolved: {label!r}")
            return {"solve_rate": 0.0}
        predicted_answer = self.extract_last_number(prediction)
        if predicted_answer is None:
            return {"solve_rate": 0.0}
        solve_rate = 1.0 if abs(predicted_answer - ground_truth_answer) < 1e-6 else 0.0
        return {"solve_rate": solve_rate}
|
|
|
|
class AFlowGSM8K(GSM8K):
    """GSM8K benchmark variant backed by AFlow-formatted data files.

    Split file names come from ``AFLOW_DATASET_FILES_MAP["gsm8k"]``; files
    missing from ``path`` are fetched with ``download_aflow_benchmark_data``.
    ``async_evaluate`` runs an awaitable graph on an example's question and
    returns the resulting solve rate.

    Attributes:
        path: Directory containing the AFlow-formatted GSM8K files.
        mode: Data loading mode ("train", "dev", "test", or "all").
        _train_data: Training split loaded from the AFlow files.
        _dev_data: Development split loaded from the AFlow files.
        _test_data: Test split loaded from the AFlow files.
    """

    def __init__(self, path: str = None, mode: str = "all", **kwargs):
        """Resolve the data directory (default ``~/.evoagentx/data/aflow/gsm8k``) and defer to ``GSM8K``."""
        default_dir = "~/.evoagentx/data/aflow/gsm8k"
        resolved_dir = os.path.expanduser(path if path else default_dir)
        super().__init__(path=resolved_dir, mode=mode, **kwargs)

    def _load_data_from_file(self, file_name: str):
        """Load a jsonl split file from ``self.path``; return None when ``file_name`` is None."""
        if file_name is not None:
            target = os.path.join(self.path, file_name)
            if not os.path.exists(target):
                # The download call takes the dataset name, not a single file name.
                download_aflow_benchmark_data(dataset="gsm8k", save_folder=self.path)
            return load_json(path=target, type="jsonl")
        return None

    def _load_data(self):
        """Load the splits selected by ``self.mode``, in train, dev, test order."""
        if self.mode in ("train", "all"):
            train_file = AFLOW_DATASET_FILES_MAP["gsm8k"]["train"]
            logger.info(f"Loading train data from {train_file}")
            self._train_data = self._load_data_from_file(file_name=train_file)
        if self.mode in ("dev", "all"):
            dev_file = AFLOW_DATASET_FILES_MAP["gsm8k"]["dev"]
            logger.info(f"Loading dev data from {dev_file}")
            self._dev_data = self._load_data_from_file(file_name=dev_file)
        if self.mode in ("test", "all"):
            test_file = AFLOW_DATASET_FILES_MAP["gsm8k"]["test"]
            logger.info(f"Loading test data from {test_file}")
            self._test_data = self._load_data_from_file(file_name=test_file)

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        """Run ``graph`` on the example's question and return its solve rate.

        Args:
            graph: Awaitable callable invoked with the question text.
            example: Example providing ``"question"`` and the label fields.

        Returns:
            The ``"solve_rate"`` entry of the metrics returned by the parent
            class's ``async_evaluate``.
        """
        question = example["question"]
        expected = self._get_label(example)
        answer = await graph(question)
        scores = await super().async_evaluate(prediction=answer, label=expected)
        return scores["solve_rate"]