| import os |
| import re |
| import gzip |
| import shutil |
| import pandas as pd |
| import contextlib |
| import signal |
| import scicode |
| import signal |
| from contextlib import contextmanager |
| import numpy as np |
| from typing import Union, Any, Callable, List, Dict, Tuple |
| from .benchmark import CodingBenchmark |
| from ..core.logging import logger |
| from ..utils.utils import download_file |
| from ..core.module_utils import load_json |
| from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data |
|
|
|
|
class TimeoutException(Exception):
    """Signals that a guarded block of code ran past its time budget."""
|
|
@contextmanager
def time_limit(seconds):
    """
    Context manager that aborts the wrapped block after `seconds` seconds.

    A SIGALRM handler is installed for the duration of the block; when the alarm
    fires the handler raises `TimeoutException` inside the block. On exit the
    pending alarm is cancelled and the SIGALRM handler that was active before
    entering is put back, so the context manager does not leak a global handler.

    Args:
        seconds: Whole number of seconds passed to `signal.alarm`.

    Raises:
        TimeoutException: Raised inside the block when the alarm fires.

    Note:
        Relies on `signal.SIGALRM` / `signal.alarm`, which Python only provides on
        Unix, and `signal.signal` may only be called from the main thread.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    try:
        # Arm the alarm inside the try so the handler is restored even if
        # signal.alarm() rejects its argument.
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)
        # signal.signal() returns None when the previous handler was not installed
        # from Python; that value cannot be passed back in, so leave it alone.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
|
|
# Default download location of the gzipped SciCode JSONL file.
SCICODE_DEFAULT_URL = "https://raw.githubusercontent.com/scicode-bench/scicode/main/data/scicode.jsonl.gz"


def download_raw_scicode_data(save_folder: str, url: str = SCICODE_DEFAULT_URL) -> str:
    """
    Fetch the gzipped SciCode JSONL file and decompress it inside `save_folder`.

    The folder is created when missing. The archive is written to
    `scicode.jsonl.gz`, decompressed to `scicode.jsonl`, and the archive is
    removed afterwards.

    Args:
        save_folder: Directory that receives the downloaded data.
        url: Location of the gzipped JSONL file.

    Returns:
        str: Path to the unzipped jsonl file.
    """
    os.makedirs(save_folder, exist_ok=True)
    archive_path = os.path.join(save_folder, "scicode.jsonl.gz")
    extracted_path = os.path.join(save_folder, "scicode.jsonl")

    logger.info(f"Downloading SciCode data from {url} ...")
    download_file(url=url, save_file=archive_path)

    logger.info("Unzipping SciCode data ...")
    with gzip.open(archive_path, "rb") as compressed:
        with open(extracted_path, "wb") as plain:
            shutil.copyfileobj(compressed, plain)

    # The archive is no longer needed once its content has been extracted.
    if os.path.exists(archive_path):
        os.remove(archive_path)

    return extracted_path
|
|
|
|
| |
| |
| |
|
|
| def _extract_entry_point_from_header(header: str) -> str: |
| """ |
| Given a SciCode 'function_header' string like: |
| "def get_alpha(recvec, alpha_scaling=5):\n '''...'''" |
| return "get_alpha". |
| """ |
| m = re.search(r"def\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", header) |
| if not m: |
| raise ValueError("Could not parse entry point from function_header") |
| return m.group(1) |
|
|
|
|
| def _coerce_scicode_row_to_examples(row: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """ |
| SciCode rows may contain a single task or multiple step tasks. |
| We normalize them to a list of examples with a unified structure: |
| { |
| "task_id": "SciCode/<name>#<sub_id>", |
| "prompt": <function_header + optional docstring block>, |
| "entry_point": <func_name>, |
| "canonical_solution": <ground_truth_code>, |
| "tests": List[str], # list of python test snippets |
| "imports": str # optional import prelude (e.g., 'import numpy as np') |
| } |
| """ |
| examples: List[Dict[str, Any]] = [] |
|
|
| name = str(row[0]) if 0 in row or isinstance(row, list) else str(row.get("name", "unknown")) |
| |
| if isinstance(row, list): |
| |
| |
| |
| description = None |
| doc_or_header = None |
| imports_block = None |
| steps_or_code = None |
| tests = None |
|
|
| |
| for item in row: |
| |
| if isinstance(item, str) and item.strip().startswith('"""'): |
| |
| doc_or_header = item |
| elif isinstance(item, str) and (item.startswith("import ") or "from " in item): |
| imports_block = item |
| elif isinstance(item, list): |
| |
| if item and isinstance(item[0], dict) and "function_header" in item[0]: |
| steps_or_code = item |
| elif item and isinstance(item[0], str) and item[0].strip().startswith(("ref", "assert", "from ")): |
| tests = item |
| elif isinstance(item, dict): |
| |
| steps_or_code = [item] |
|
|
| |
| if isinstance(steps_or_code, list) and steps_or_code and isinstance(steps_or_code[0], dict): |
| for idx, step in enumerate(steps_or_code): |
| header = step.get("function_header") or step.get("header") or "" |
| code = step.get("ground_truth_code") or step.get("solution") or "" |
| step_tests = step.get("test_cases") or [] |
| entry_point = _extract_entry_point_from_header(header) |
| prompt = header |
| examples.append( |
| { |
| "task_id": f"SciCode/{name}#step{idx+1}", |
| "prompt": prompt, |
| "entry_point": entry_point, |
| "canonical_solution": code, |
| "tests": step_tests, |
| "imports": imports_block or "", |
| } |
| ) |
| else: |
| |
| |
| |
| |
| code_blob = None |
| for item in row: |
| if isinstance(item, str) and "def " in item and "return" in item: |
| code_blob = item |
| break |
| |
| if code_blob: |
| |
| headers = list(re.finditer(r"(?ms)^(def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(.*?\):\s*\n)", code_blob)) |
| if headers: |
| last_header = headers[-1].group(1) |
| entry_point = _extract_entry_point_from_header(last_header) |
| else: |
| entry_point = "solution" |
|
|
| |
| prompt = doc_or_header or f"def {entry_point}(*args, **kwargs):\n '''Fill in the function body.'''\n ..." |
| examples.append( |
| { |
| "task_id": f"SciCode/{name}", |
| "prompt": prompt, |
| "entry_point": entry_point, |
| "canonical_solution": code_blob, |
| "tests": tests or [], |
| "imports": imports_block or "", |
| } |
| ) |
|
|
| else: |
| |
| |
| steps = row.get("steps", []) |
| imports_block = row.get("required_dependencies", "") |
| task_name = row.get("step_number", "unknown") |
| |
|
|
| if steps: |
| for idx, step in enumerate(steps): |
| header = step.get("function_header", "") |
| code = step.get("ground_truth_code", "") |
| step_tests = step.get("test_cases", []) |
| entry_point = _extract_entry_point_from_header(header) |
| examples.append( |
| { |
| "task_id": f"SciCode/{task_name}#step{idx+1}", |
| "prompt": header, |
| "entry_point": entry_point, |
| "canonical_solution": code, |
| "tests": step_tests, |
| "imports": imports_block or "", |
| } |
| ) |
| else: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| header = row.get("function_header", "") |
| prompt_update = row.get("step_description_prompt", "") |
| code = row.get("ground_truth_code", "") |
| tests = row.get("test_cases", []) |
| returnline = row.get("return_line", "") |
| entry_point = _extract_entry_point_from_header(header) if header else "solution" |
| bkgd = row.get("step_background","") |
| prompt = header or f"def {entry_point}(*args, **kwargs):\n " |
| examples.append( |
| { |
| "task_id": f"SciCode/{task_name}", |
| "prompt": bkgd+prompt_update+prompt+'''Fill in the function body.\n''', |
| "entry_point": entry_point, |
| "canonical_solution": code, |
| "tests": tests, |
| "imports": imports_block or "", |
| } |
| ) |
|
|
| return examples |
|
|
|
|
def load_scicode_data(jsonl_path: str) -> List[Dict[str, Any]]:
    """
    Read a SciCode JSONL file and flatten every row into normalized examples.

    Rows that cannot be normalized are skipped with a warning instead of
    aborting the whole load.
    """
    examples: List[Dict[str, Any]] = []
    records = load_json(jsonl_path, type="jsonl")
    for record in records:
        try:
            examples += _coerce_scicode_row_to_examples(record)
        except Exception as e:
            logger.warning(f"[SciCode] Skipping a malformed row due to: {e}")
    return examples
|
|
|
|
| |
| |
| |
|
|
class SciCode(CodingBenchmark):
    """
    Benchmark class for evaluating code generation on SciCode.

    Each normalized example provides:
      - prompt (function header, optionally preceded by background/description)
      - canonical_solution (reference implementation)
      - tests (python test source that refers to a variable named `target`)

    Expected `target` values are not recomputed during evaluation. They are read
    from the pickled ground-truth tables loaded in `_load_data`, rendered as
    Python literals, and substituted for the `target` identifier in the test
    source before the candidate is executed.
    """

    # Directory holding the pre-extracted SciCode files; used whenever a file is
    # not found under `self.path`.
    _FALLBACK_DATA_DIR = "/home/tl688/pitl688/selfevolve/SciCode/eval/data"

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        """
        Args:
            path: Data directory; defaults to "~/.evoagentx/data/scicode".
            mode: Which split(s) to load: "dev", "test" or "all".
            timeout: Forwarded to the base benchmark.
            k: Value(s) of k for pass@k.
        """
        path = os.path.expanduser(path or "~/.evoagentx/data/scicode")
        self.k = k
        self.name = "scicode"
        super().__init__(name=type(self).__name__, path=path, mode=mode, timeout=timeout, **kwargs)

    def _data_file(self, filename: str) -> str:
        """Return `filename` under `self.path` when it exists there, else under the fallback directory."""
        base = getattr(self, "path", None)
        if base:
            candidate = os.path.join(base, filename)
            if os.path.isfile(candidate):
                return candidate
        return os.path.join(self._FALLBACK_DATA_DIR, filename)

    def _load_data(self):
        """
        Load the sub-problem examples and the ground-truth tables for `self.mode`.

        `self._data_ground` ends up holding the dev table, the test table, or both
        concatenated. A table already present on the instance is kept and
        extended when only the test split is requested.
        """
        # NOTE: pd.read_pickle unpickles arbitrary objects; only point this at
        # trusted files.
        if self.mode in ("dev", "all"):
            self._dev_data = load_scicode_data(self._data_file("subproblems_dev.jsonl"))
            self._data_ground = pd.read_pickle(self._data_file("problems_dev.pkl"))
        if self.mode in ("test", "all"):
            self._test_data = load_scicode_data(self._data_file("subproblems_test.jsonl"))
            self._test_data_ground = pd.read_pickle(self._data_file("problems_test.pkl"))
            # In "test" mode there may be no table yet; check explicitly rather
            # than relying on a bare except around the concatenation.
            existing = getattr(self, "_data_ground", None)
            if existing is None:
                self._data_ground = self._test_data_ground
            else:
                self._data_ground = pd.concat((existing, self._test_data_ground))

    def _get_label(self, example: Any):
        """
        For SciCode we treat the label as the full test suite plus metadata.
        """
        return {
            "task_id": example["task_id"],
            "entry_point": example["entry_point"],
            "tests": example.get("tests", []),
            "canonical_solution": example.get("canonical_solution", ""),
            "imports": example.get("imports", ""),
        }

    def _get_id(self, example: Any):
        """Return the example's task id."""
        return example["task_id"]

    @staticmethod
    def _build_reference_namespace(imports: str, canonical_solution: str) -> Dict[str, Any]:
        """
        Build an execution namespace that defines the reference function.

        The namespace starts with `np` and `scicode` bound, then the import
        prelude and the reference code are executed into it.
        """
        ns: Dict[str, Any] = {"np": np, "scicode": scicode}
        # NOTE: exec() runs dataset-provided code without sandboxing; only use
        # with trusted benchmark data.
        if imports:
            exec(imports, ns, ns)
        if canonical_solution:
            exec(canonical_solution, ns, ns)
        return ns

    @staticmethod
    def _extract_candidate_exprs_from_test(test_src: str) -> List[str]:
        """
        Heuristically extract the expressions that a test compares against `target`.

        Two patterns are recognized: `np.allclose(<expr>, target)` and
        `assert <expr> == target`. The matched `<expr>` strings are returned with
        all `np.allclose` matches first, then the equality matches.
        """
        exprs: List[str] = [
            m.group("expr")
            for m in re.finditer(r"np\.allclose\s*\(\s*(?P<expr>.+?)\s*,\s*target\s*\)", test_src, flags=re.DOTALL)
        ]
        exprs.extend(
            m.group("expr")
            for m in re.finditer(r"assert\s+(?P<expr>.+?)\s*==\s*target", test_src)
        )
        return exprs

    @staticmethod
    def _compute_target_list(exprs: List[str], ref_ns: Dict[str, Any]) -> Any:
        """
        Evaluate expression strings in the reference namespace.

        Returns True when `exprs` is empty, the single value when there is one
        expression, and a tuple of values (in order) otherwise.
        """
        if not exprs:
            return True
        # NOTE: eval() of dataset-provided expressions; trusted input only.
        values = [eval(ex, ref_ns, {}) for ex in exprs]
        if len(values) == 1:
            return values[0]
        return tuple(values)

    def _make_harness(self, task_id: str, entry_point: str, imports: str, canonical_solution: str, tests: List[str], candidate_src: str) -> str:
        """
        Return the source to execute for a candidate.

        Currently a passthrough: `candidate_src` is returned unchanged and the
        remaining arguments are ignored.
        """
        return candidate_src

    def handle_special_cases(self, task_id: str, solution: str, test: str) -> Tuple[str, str]:
        """
        Strip a Markdown ```python fence from `solution`, then defer to the base handler.

        Everything between the first "```python" and the last "```" is kept. When
        there is no closing fence, everything after the opening fence is kept.
        """
        start = "```python"
        end = "```"
        begin = solution.find(start)
        if begin != -1:
            body_start = begin + len(start)
            close = solution.rfind(end)
            # Without a closing fence, rfind() lands on the opening fence itself;
            # slicing up to it would throw the whole solution away.
            solution = solution[body_start:close] if close >= body_start else solution[body_start:]
        logger.debug(f"[SciCode] extracted solution for {task_id}:\n{solution}")
        return super().handle_special_cases(task_id=task_id, solution=solution, test=test)

    @staticmethod
    def _to_builtin(value: Any) -> Any:
        """Recursively turn values exposing `.tolist()` (e.g. NumPy arrays/scalars) into plain Python objects."""
        if isinstance(value, dict):
            return {key: SciCode._to_builtin(item) for key, item in value.items()}
        if isinstance(value, tuple):
            return tuple(SciCode._to_builtin(item) for item in value)
        if isinstance(value, list):
            return [SciCode._to_builtin(item) for item in value]
        if hasattr(value, "tolist"):
            return value.tolist()
        return value

    @classmethod
    def _target_to_literal(cls, target: Any) -> str:
        """Render a ground-truth target as Python source text."""
        if isinstance(target, (np.ndarray, tuple, list, dict)):
            return str(cls._to_builtin(target))
        # Scalars and anything else keep their plain str() form.
        return str(target)

    def _lookup_target(self, task_id: str, test_src: Any) -> Any:
        """Fetch the stored target whose `test_cases` entry equals `test_src`."""
        matches = self._data_ground[self._data_ground["test_cases"] == test_src]["target"].values
        if len(matches) == 0:
            raise IndexError(f"No ground-truth target found for task {task_id!r}")
        return matches[0]

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate the solution code.

        Args:
            prediction (str | List[str]): The solution code(s).
            label (dict | List[dict]): The unit test code(s).

        Returns:
            dict: The evaluation metrics (pass@k).
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)

        results = []
        for solution in prediction:
            solution_states = []
            task_id, message = None, ""
            for label_data in label:
                task_id = label_data["task_id"]
                prompt = self.get_example_by_id(task_id)["prompt"]
                raw_test = label_data["tests"]
                literal = self._target_to_literal(self._lookup_target(task_id, raw_test))
                # Replace only the standalone identifier `target`; a plain
                # str.replace would also rewrite names that merely contain it.
                # A callable replacement keeps backslashes in the literal intact.
                unit_test = re.sub(
                    r"\btarget\b",
                    lambda _match: literal,
                    label_data["imports"] + "\n" + raw_test,
                )
                state, message = self.check_solution_scicode(
                    task_id=task_id,
                    solution=prompt + solution,
                    test=unit_test,
                    entry_point=label_data["entry_point"],
                )
                if state != self.SUCCESS:
                    break
                solution_states.append(state)
            # Remember the first line of the last message (the failure, if any).
            if task_id is not None:
                self.error_list[task_id] = message.split('\n')[0]
            results.append(len(solution_states) == len(label) and all(state == self.SUCCESS for state in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)

        return pass_at_k
|
|
|
|
class AFlowSciCode(SciCode):
    """
    AFlow-specific implementation of SciCode benchmark.

    Both splits and their ground-truth tables are loaded eagerly in `__init__`.
    `evaluate` is inherited from `SciCode`; the override that used to live here
    was a line-for-line copy of the parent implementation.
    """

    # Entry points for which `extract_test_cases_with_entry_point` returns an
    # empty test string without consulting `self._test_cases`.
    # NOTE(review): these names look carried over from another benchmark — confirm.
    _ENTRY_POINTS_WITHOUT_TESTS = (
        "find_zero",
        "decode_cyclic",
        "decode_shift",
        "by_length",
        "add",
        "triangle_area",
        "correct_bracketing",
        "solve",
        "sum_squares",
        "starts_one_ends",
    )

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        """
        Load dev and test data plus their ground-truth tables, then run the
        regular `SciCode` initialization.
        """
        data_dir = "/home/tl688/pitl688/selfevolve/SciCode/eval/data"
        # NOTE: pd.read_pickle unpickles arbitrary objects; trusted files only.
        self._dev_data = load_scicode_data(os.path.join(data_dir, "subproblems_dev.jsonl"))
        dev_ground = pd.read_pickle(os.path.join(data_dir, "problems_dev.pkl"))
        self._test_data = load_scicode_data(os.path.join(data_dir, "subproblems_test.jsonl"))
        self._test_data_ground = pd.read_pickle(os.path.join(data_dir, "problems_test.pkl"))
        # Both tables are known to exist here, so a failing concat is a real
        # error and is allowed to surface.
        self._data_ground = pd.concat((dev_ground, self._test_data_ground))
        self.k = k
        super().__init__(path=path, mode=mode, timeout=timeout, k=k, **kwargs)

    def extract_test_cases_with_entry_point(self, entry_point: str):
        """
        Extract test cases with the given entry point.

        Returns "" for the hard-coded entry points, the "test" field of the first
        matching case in `self._test_cases`, or None when nothing matches.
        """
        if entry_point in self._ENTRY_POINTS_WITHOUT_TESTS:
            return ""

        # `_test_cases` is not set anywhere in this class; treat a missing
        # attribute as "no cases".
        for case in getattr(self, "_test_cases", None) or []:
            if case["entry_point"] == entry_point:
                return case["test"]

        return None

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        """
        Run `graph` on one example and return its pass@1 score.

        `graph` is awaited with the example's prompt and entry point; its output
        is scored against the example's label by the parent `async_evaluate`.
        """
        prompt, entry_point = example["prompt"], example["entry_point"]
        solution = await graph(prompt, entry_point)
        label = self._get_label(example)
        metrics = await super().async_evaluate(prediction=solution, label=label)
        # NOTE(review): assumes the metrics include "pass@1", i.e. k is or contains 1.
        return metrics["pass@1"]