"""Agents for Task 1: Function + Vulnerability Identification.""" import random as _random from typing import Any, Dict, List from server import Task1Environment from env.schemas import Action, ActionType from data.data_loader import load_contracts, get_function_by_name # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _parse_fn_list(result_text: str) -> List[str]: """Parse 'Functions in X: f1, f2, f3' into [f1, f2, f3].""" if ": " in result_text: return [f.strip() for f in result_text.split(": ", 1)[-1].split(", ") if f.strip()] return [] # ───────────────────────────────────────────────────────────────────────────── # Task 1 agents # ───────────────────────────────────────────────────────────────────────────── def oracle_t1(env: Task1Environment, seed: int, verbose: bool = False) -> Dict[str, Any]: """Submits exact ground-truth function + vulnerability type → score = 1.0.""" r = env.reset(seed=seed) obs = r.observation fn_name = env.state().target_function contracts = load_contracts() vuln_issue = "" for c in contracts: fn = get_function_by_name(c, fn_name) if fn and fn.get("vulnerable"): vuln_issue = fn["vulnerability_details"]["issue"] break if verbose: print(f" {obs.contract_name}.{fn_name}() [{vuln_issue}]") env.step(Action(action_type=ActionType.LIST_FUNCTIONS)) env.step(Action(action_type=ActionType.GET_FUNCTION_CODE, params={"function_name": fn_name})) result = env.step(Action(action_type=ActionType.SUBMIT, params={"function_name": fn_name, "vulnerability_type": vuln_issue})) return {"seed": seed, "contract": obs.contract_name, "target_function": fn_name, "vulnerability": vuln_issue, "grader_score": result.reward.value} def partial_t1(env: Task1Environment, seed: int) -> Dict[str, Any]: """Correct function, 'unknown' vuln type → score = 0.5.""" env.reset(seed=seed) fn_name = env.state().target_function result = env.step(Action(action_type=ActionType.SUBMIT, params={"function_name": fn_name, "vulnerability_type": "unknown"})) return {"seed": seed, "grader_score": result.reward.value} def random_t1(env: Task1Environment, seed: int) -> Dict[str, Any]: """Genuine random agent: random browse then submits a random function + random vuln type. Uses a seeded RNG (offset from episode seed) so results are reproducible. Expected score: low (~0–5%) since must randomly hit both right function and right keyword, plus can submit only once per episode. """ rng = _random.Random(seed ^ 0x5A1AD) # different RNG stream from episode seed env.reset(seed=seed) # Step 1: list functions to get real candidates s = env.step(Action(action_type=ActionType.LIST_FUNCTIONS)) fns = _parse_fn_list(s.observation.last_action_result or "") if not fns: fns = ["deposit", "withdraw", "constructor"] # fallback # Step 2: do 1–2 random browse actions (not repeated) browse_pool = [ (ActionType.GET_FILE_METADATA, {}), (ActionType.GET_CALL_GRAPH, {}), (ActionType.GET_STATE_VARIABLE, {}), ] _random.Random(seed).shuffle(browse_pool) # deterministic order for at, params in browse_pool[:rng.randint(1, 2)]: env.step(Action(action_type=at, params=params)) # Step 3: submit a random function from the real list, random vuln type random_fn = rng.choice(fns) vuln_pool = [ "bad logic", "incorrect check", "overflow", "no guard", "wrong order", "missing event", "unprotected", "stale data", "unsafe cast", ] random_vuln = rng.choice(vuln_pool) result = env.step(Action(action_type=ActionType.SUBMIT, params={"function_name": random_fn, "vulnerability_type": random_vuln})) return {"seed": seed, "grader_score": result.reward.value, "submitted_fn": random_fn, "submitted_vuln": random_vuln} def floor_t1(env: Task1Environment, seed: int) -> Dict[str, Any]: """Always submits 'constructor' → guaranteed score = 0.0001""" env.reset(seed=seed) result = env.step(Action(action_type=ActionType.SUBMIT, params={"function_name": "constructor", "vulnerability_type": "reentrancy"})) return {"seed": seed, "grader_score": result.reward.value }