Spaces:
Running
Running
| """Agents for Task 1: Function + Vulnerability Identification.""" | |
| import random as _random | |
| from typing import Any, Dict, List | |
| from server import Task1Environment | |
| from env.schemas import Action, ActionType | |
| from data.data_loader import load_contracts, get_function_by_name | |
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _parse_fn_list(result_text: str) -> List[str]:
    """Parse 'Functions in X: f1, f2, f3' into ['f1', 'f2', 'f3'].

    Everything after the first ": " is treated as a comma-separated list of
    names. Each entry is stripped of surrounding whitespace and empty entries
    are discarded, so "f1,f2", "f1 ,  f2" and a trailing comma all parse
    cleanly.

    Args:
        result_text: Text of a LIST_FUNCTIONS action result.

    Returns:
        The function names in their original order, or an empty list when the
        text is empty or contains no ": " separator.
    """
    if not result_text:
        return []
    # partition() splits on the first ": " only, so any later ": " stays in
    # the listing part.
    _, separator, listing = result_text.partition(": ")
    if not separator:
        return []
    # Split on the bare comma and strip afterwards: splitting on ", " would
    # glue names together whenever the spacing differs.
    stripped = (entry.strip() for entry in listing.split(","))
    return [name for name in stripped if name]
# ─────────────────────────────────────────────────────────────────────────────
# Task 1 agents
# ─────────────────────────────────────────────────────────────────────────────
def oracle_t1(env: Task1Environment, seed: int, verbose: bool = False) -> Dict[str, Any]:
    """Submit the ground-truth function and vulnerability type for one episode.

    The target function name is read from the environment state and its issue
    label is looked up in the loaded contract data. The agent then lists the
    functions, fetches the target's code and submits the answer. The original
    author's note gives the expected score as 1.0.

    Args:
        env: Task 1 environment to play the episode in.
        seed: Episode seed passed to ``env.reset``.
        verbose: When true, print the contract, function and issue label.

    Returns:
        Dict with the seed, contract name, target function, the issue label
        that was submitted, and the grader score of the submission.
    """
    reset_result = env.reset(seed=seed)
    observation = reset_result.observation
    target = env.state().target_function

    # First vulnerable function with a matching name wins, searching every
    # loaded contract in order; "" is submitted when nothing matches.
    # NOTE(review): the search is not restricted to the episode's contract -
    # confirm function names are unique across contracts.
    candidates = (get_function_by_name(contract, target) for contract in load_contracts())
    issue = next(
        (
            candidate["vulnerability_details"]["issue"]
            for candidate in candidates
            if candidate and candidate.get("vulnerable")
        ),
        "",
    )

    if verbose:
        print(f" {observation.contract_name}.{target}() [{issue}]")

    # Browse before submitting: list the functions, then read the target.
    env.step(Action(action_type=ActionType.LIST_FUNCTIONS))
    env.step(
        Action(
            action_type=ActionType.GET_FUNCTION_CODE,
            params={"function_name": target},
        )
    )
    submission = Action(
        action_type=ActionType.SUBMIT,
        params={"function_name": target, "vulnerability_type": issue},
    )
    outcome = env.step(submission)

    return {
        "seed": seed,
        "contract": observation.contract_name,
        "target_function": target,
        "vulnerability": issue,
        "grader_score": outcome.reward.value,
    }
def partial_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Submit the correct function with the vulnerability type 'unknown'.

    The original author's note gives the expected score as 0.5.

    Args:
        env: Task 1 environment to play the episode in.
        seed: Episode seed passed to ``env.reset``.

    Returns:
        Dict with the seed and the grader score of the submission.
    """
    env.reset(seed=seed)
    # The function name comes straight from the environment state; only the
    # vulnerability label is deliberately left as "unknown".
    answer = {
        "function_name": env.state().target_function,
        "vulnerability_type": "unknown",
    }
    outcome = env.step(Action(action_type=ActionType.SUBMIT, params=answer))
    return {"seed": seed, "grader_score": outcome.reward.value}
def random_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Random baseline: browse a little, then submit a random guess.

    The agent lists the contract's functions, performs one or two browse
    actions, and submits a randomly chosen function name together with a
    randomly chosen vulnerability label. All randomness comes from RNGs
    seeded from ``seed``, so a given seed always produces the same choices.
    The original author's note estimates the score as low (~0-5%), since the
    single submission must hit both the right function and the right keyword.

    Args:
        env: Task 1 environment to play the episode in.
        seed: Episode seed; also the basis for the agent's own RNG streams.

    Returns:
        Dict with the seed, the grader score, and the submitted function name
        and vulnerability label.
    """
    # XOR with a constant so the agent's stream differs from the episode seed.
    rng = _random.Random(seed ^ 0x5A1AD)
    env.reset(seed=seed)

    # Step 1: ask the environment for the real candidate names.
    listing = env.step(Action(action_type=ActionType.LIST_FUNCTIONS))
    candidates = _parse_fn_list(listing.observation.last_action_result or "")
    candidates = candidates or ["deposit", "withdraw", "constructor"]  # fallback

    # Step 2: one or two distinct browse actions. The order is shuffled by a
    # separate RNG seeded with the plain episode seed.
    browse_actions = [
        (ActionType.GET_FILE_METADATA, {}),
        (ActionType.GET_CALL_GRAPH, {}),
        (ActionType.GET_STATE_VARIABLE, {}),
    ]
    _random.Random(seed).shuffle(browse_actions)
    browse_count = rng.randint(1, 2)
    for action_type, action_params in browse_actions[:browse_count]:
        env.step(Action(action_type=action_type, params=action_params))

    # Step 3: a random function from the list plus a random label.
    guessed_fn = rng.choice(candidates)
    guessed_vuln = rng.choice(
        [
            "bad logic", "incorrect check", "overflow", "no guard", "wrong order",
            "missing event", "unprotected", "stale data", "unsafe cast",
        ]
    )
    outcome = env.step(
        Action(
            action_type=ActionType.SUBMIT,
            params={"function_name": guessed_fn, "vulnerability_type": guessed_vuln},
        )
    )
    return {
        "seed": seed,
        "grader_score": outcome.reward.value,
        "submitted_fn": guessed_fn,
        "submitted_vuln": guessed_vuln,
    }
def floor_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Floor baseline: always submit 'constructor' with 'reentrancy'.

    The original author's note gives the guaranteed score as 0.0001.

    Args:
        env: Task 1 environment to play the episode in.
        seed: Episode seed passed to ``env.reset``.

    Returns:
        Dict with the seed and the grader score of the submission.
    """
    env.reset(seed=seed)
    # Fixed answer, submitted without any browsing.
    fixed_answer = {
        "function_name": "constructor",
        "vulnerability_type": "reentrancy",
    }
    outcome = env.step(Action(action_type=ActionType.SUBMIT, params=fixed_answer))
    return {"seed": seed, "grader_score": outcome.reward.value}