Spaces:
Running
Running
File size: 5,264 Bytes
671787b 88875f7 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 671787b 7f7bcc6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | """Agents for Task 1: Function + Vulnerability Identification."""
import random as _random
from typing import Any, Dict, List
from server import Task1Environment
from env.schemas import Action, ActionType
from data.data_loader import load_contracts, get_function_by_name
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _parse_fn_list(result_text: str) -> List[str]:
"""Parse 'Functions in X: f1, f2, f3' into [f1, f2, f3]."""
if ": " in result_text:
return [f.strip() for f in result_text.split(": ", 1)[-1].split(", ") if f.strip()]
return []
# ─────────────────────────────────────────────────────────────────────────────
# Task 1 agents
# ─────────────────────────────────────────────────────────────────────────────
def oracle_t1(env: Task1Environment, seed: int, verbose: bool = False) -> Dict[str, Any]:
    """Submits exact ground-truth function + vulnerability type → score = 1.0.

    The target function name is read straight from ``env.state()`` and the
    matching vulnerability issue is looked up in the loaded contracts, so the
    agent submits the ground truth rather than inferring anything.

    Args:
        env: Task 1 environment to run the episode in.
        seed: Episode seed passed to ``env.reset``.
        verbose: When true, print the contract, function and issue found.

    Returns:
        Dict with the seed, contract name, target function, the vulnerability
        issue submitted, and the reward value returned by the SUBMIT step.
    """
    reset_result = env.reset(seed=seed)
    observation = reset_result.observation
    target = env.state().target_function

    # First vulnerable function with a matching name wins; "" if none found.
    # NOTE(review): the search is by function name across *all* contracts, not
    # just the episode's contract — confirm names are unique or that this is
    # intended.
    issue = ""
    for contract in load_contracts():
        candidate = get_function_by_name(contract, target)
        if not candidate or not candidate.get("vulnerable"):
            continue
        issue = candidate["vulnerability_details"]["issue"]
        break

    if verbose:
        print(f" {observation.contract_name}.{target}() [{issue}]")

    # Browse actions before the final submission.
    env.step(Action(action_type=ActionType.LIST_FUNCTIONS))
    env.step(
        Action(
            action_type=ActionType.GET_FUNCTION_CODE,
            params={"function_name": target},
        )
    )

    submission = Action(
        action_type=ActionType.SUBMIT,
        params={"function_name": target, "vulnerability_type": issue},
    )
    outcome = env.step(submission)

    return {
        "seed": seed,
        "contract": observation.contract_name,
        "target_function": target,
        "vulnerability": issue,
        "grader_score": outcome.reward.value,
    }
def partial_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Correct function, 'unknown' vuln type → score = 0.5.

    Args:
        env: Task 1 environment to run the episode in.
        seed: Episode seed passed to ``env.reset``.

    Returns:
        Dict with the seed and the reward value returned by the SUBMIT step.
    """
    env.reset(seed=seed)
    target = env.state().target_function

    # Right function name, deliberately uninformative vulnerability type.
    submission = Action(
        action_type=ActionType.SUBMIT,
        params={"function_name": target, "vulnerability_type": "unknown"},
    )
    outcome = env.step(submission)
    return {"seed": seed, "grader_score": outcome.reward.value}
def random_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Genuine random agent: random browse, then a random function + vuln type.

    Uses a seeded RNG (derived from the episode seed) so results are
    reproducible.
    Expected score: low (~0–5%) since it must randomly hit both the right
    function and the right keyword, and can submit only once per episode.

    Args:
        env: Task 1 environment to run the episode in.
        seed: Episode seed passed to ``env.reset``; also seeds the agent RNGs.

    Returns:
        Dict with the seed, the reward value of the SUBMIT step, and the
        function name and vulnerability type that were submitted.
    """
    # XOR with a constant gives a different RNG stream from the episode seed.
    agent_rng = _random.Random(seed ^ 0x5A1AD)
    env.reset(seed=seed)

    # Step 1: list functions to get real candidates.
    listing = env.step(Action(action_type=ActionType.LIST_FUNCTIONS))
    candidates = _parse_fn_list(listing.observation.last_action_result or "")
    if not candidates:
        candidates = ["deposit", "withdraw", "constructor"]  # fallback

    # Step 2: do 1–2 random browse actions (not repeated).
    browse_options = [
        (ActionType.GET_FILE_METADATA, {}),
        (ActionType.GET_CALL_GRAPH, {}),
        (ActionType.GET_STATE_VARIABLE, {}),
    ]
    # Order comes from a separate RNG seeded with the raw episode seed.
    _random.Random(seed).shuffle(browse_options)
    browse_count = agent_rng.randint(1, 2)
    for action_type, action_params in browse_options[:browse_count]:
        env.step(Action(action_type=action_type, params=action_params))

    # Step 3: submit a random function from the real list, random vuln type.
    submitted_fn = agent_rng.choice(candidates)
    vuln_candidates = [
        "bad logic", "incorrect check", "overflow", "no guard", "wrong order",
        "missing event", "unprotected", "stale data", "unsafe cast",
    ]
    submitted_vuln = agent_rng.choice(vuln_candidates)

    submission = Action(
        action_type=ActionType.SUBMIT,
        params={
            "function_name": submitted_fn,
            "vulnerability_type": submitted_vuln,
        },
    )
    outcome = env.step(submission)
    return {
        "seed": seed,
        "grader_score": outcome.reward.value,
        "submitted_fn": submitted_fn,
        "submitted_vuln": submitted_vuln,
    }
def floor_t1(env: Task1Environment, seed: int) -> Dict[str, Any]:
    """Always submits 'constructor' → guaranteed score = 0.0001

    Args:
        env: Task 1 environment to run the episode in.
        seed: Episode seed passed to ``env.reset``.

    Returns:
        Dict with the seed and the reward value returned by the SUBMIT step.
    """
    env.reset(seed=seed)

    # Fixed submission, independent of the episode's actual target.
    submission = Action(
        action_type=ActionType.SUBMIT,
        params={
            "function_name": "constructor",
            "vulnerability_type": "reentrancy",
        },
    )
    outcome = env.step(submission)
    return {"seed": seed, "grader_score": outcome.reward.value}
|