"""
eval.py
-------
Evaluation harness for all three tasks.
Runs four agent tiers per task:
oracle – always submits the ground-truth answer (upper bound)
partial – right category, wrong detail (partial credit; Task 3 uses a "subfunction" tier that submits a callee of the target)
random – random exploration plus a random submission (baseline)
floor – always submits a guaranteed-wrong answer (lower bound)
Usage:
python eval.py # all tasks, 8 episodes each
python eval.py --task 1|2|3 # single task
python eval.py --episodes 16 --verbose
python eval.py --out results.json
"""
import argparse
import json
import random as _random
from typing import Any, Dict, List
from server import Task1Environment, Task2Environment, Task3Environment
from agents.task1 import oracle_t1, partial_t1, random_t1, floor_t1
from agents.task2 import oracle_t2, partial_t2, random_t2, floor_t2
from agents.task3 import oracle_t3, subfunction_t3, random_t3, floor_t3
from data.data_loader import (
load_contracts,
get_all_vulnerable_entries,
get_all_property_entries,
get_all_task3_entries,
)
# ─────────────────────────────────────────────────────────────────────────────
# Evaluation runners
# ─────────────────────────────────────────────────────────────────────────────
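# Small helper: mean of a numeric key over episode dicts, 0.0 for an empty list.
# e.g. _avg([{"grader_score": 1.0}, {"grader_score": 0.5}]) == 0.75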
def _avg(episodes: List[Dict[str, Any]], key: str = "grader_score") -> float:
return sum(e[key] for e in episodes) / len(episodes) if episodes else 0.0
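# Task 1 – Targeted Vulnerability Detection: oracle submits the correct
# function + vulnerability type, partial the correct function with "unknown",
# random picks both at random, floor always submits "constructor". Returns
# per-tier avg_score/episodes plus vulnerability-type coverage.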
def run_task1_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
print("\n" + "=" * 64)
print("TASK 1 β€” Targeted Vulnerability Detection")
print("=" * 64)
contracts = load_contracts()
print(f" Dataset: {len(contracts)} contracts, "
f"{len(get_all_vulnerable_entries(contracts))} vulnerable functions\n")
env = Task1Environment()
# Oracle
print("β–Ά Oracle (correct function + correct vuln = ~1.0):")
oracle_eps = []
for i in range(n):
ep = oracle_t1(env, seed_offset + i, verbose)
oracle_eps.append(ep)
print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}"
f" score={ep['grader_score']:.1f}")
oracle_avg = _avg(oracle_eps)
print(f"\n Oracle avg: {oracle_avg:.3f}")
# Partial
print("\nβ–Ά Partial (correct function, 'unknown' vuln = ~0.5):")
partial_eps = [partial_t1(env, seed_offset + i) for i in range(n)]
partial_avg = _avg(partial_eps)
print(f" Partial avg: {partial_avg:.3f}")
# Random
print("\nβ–Ά Random (random fn from list + random vuln type):")
random_eps = [random_t1(env, seed_offset + i) for i in range(n)]
random_avg = _avg(random_eps)
submitted = [(e.get("submitted_fn", "?"), e.get("submitted_vuln", "?")) for e in random_eps]
print(f" Random avg: {random_avg:.3f} submissions: {submitted}")
# Floor
print("\nβ–Ά Floor (always 'constructor' β†’ 0.0):")
floor_eps = [floor_t1(env, seed_offset + i) for i in range(n)]
floor_avg = _avg(floor_eps)
print(f" Floor avg: {floor_avg:.3f}")
# Vulnerability type coverage
vuln_seen: Dict[str, int] = {}
for ep in oracle_eps:
v = ep.get("vulnerability", "unknown")
vuln_seen[v] = vuln_seen.get(v, 0) + 1
print("\nβ–Ά Vulnerability coverage:")
for v in sorted(vuln_seen):
print(f" {vuln_seen[v]:2d}Γ— {v}")
# assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be > 0.75"
# assert 0.1 < partial_avg <= 0.75, f"Partial avg {partial_avg:.3f} should be in range (0.1, 0.75)"
# assert floor_avg <= 0.1, f"Floor avg {floor_avg:.3f} should be <= 0.1"
# assert oracle_avg >= random_avg >= floor_avg, \
# f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}"
print(f"\n βœ… Task 1: oracle({oracle_avg}) β‰₯ partial({partial_avg}) β‰₯ random({random_avg:.3f}) β‰₯ floor({floor_avg})")
return {
"task_id": "task1_vuln_detection",
"oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
"partial": {"avg_score": partial_avg, "episodes": partial_eps},
"random": {"avg_score": random_avg, "episodes": random_eps},
"floor": {"avg_score": floor_avg, "episodes": floor_eps},
"vuln_coverage": vuln_seen,
}
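# Task 2 – Property Discovery: oracle submits the ground-truth natural-language
# property, partial the NatSpec comment, random a generic template, floor an
# empty string. Returns per-tier avg_score/episodes.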
def run_task2_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
print("\n" + "=" * 64)
print("TASK 2 β€” Property Discovery")
print("=" * 64)
contracts = load_contracts()
print(f" Dataset: {len(get_all_property_entries(contracts))} property entries\n")
env = Task2Environment()
# Oracle
print("β–Ά Oracle (submits ground-truth natural language):")
oracle_eps = []
for i in range(n):
ep = oracle_t2(env, seed_offset + i, verbose)
oracle_eps.append(ep)
icon = "βœ…" if ep["grader_score"] >= 0.65 else "⚠️ "
print(f" {icon} seed={ep['seed']:3d} {ep['contract']:12s}.{ep['function']:18s}"
f" score={ep['grader_score']:.3f}")
oracle_avg = _avg(oracle_eps)
print(f"\n Oracle avg: {oracle_avg:.3f}")
# Partial
print("\nβ–Ά Partial (submits NatSpec comment):")
partial_eps = [partial_t2(env, seed_offset + i) for i in range(n)]
partial_avg = _avg(partial_eps)
print(f" Partial avg: {partial_avg:.3f}")
# Random
print("\nβ–Ά Random (random browse + generic property template):")
random_eps = [random_t2(env, seed_offset + i) for i in range(n)]
random_avg = _avg(random_eps)
print(f" Random avg: {random_avg:.3f}")
# Floor
print("\nβ–Ά Floor (submits empty string β†’ 0.0):")
floor_eps = [floor_t2(env, seed_offset + i) for i in range(n)]
floor_avg = _avg(floor_eps)
print(f" Floor avg: {floor_avg:.3f}")
# assert oracle_avg > 0.60, f"Oracle avg {oracle_avg:.3f} should be > 0.60"
# assert oracle_avg > partial_avg >= floor_avg, \
# "Score ordering violated: oracle > partial >= floor"
# assert floor_avg < 0.1, f"Floor avg {floor_avg:.3f} should be ~0.0"
# print(f"\n βœ… Task 2: oracle({oracle_avg:.3f}) > partial({partial_avg:.3f})"
# f" β‰₯ random({random_avg:.3f}) β‰₯ floor(0.0)")
return {
"task_id": "task2_property_discovery",
"oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
"partial": {"avg_score": partial_avg, "episodes": partial_eps},
"random": {"avg_score": random_avg, "episodes": random_eps},
"floor": {"avg_score": floor_avg, "episodes": floor_eps},
}
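# Task 3 – Rule Checker: oracle submits the exact target function, subfunction
# a partial-credit callee (constructor if none exists), random a uniformly
# random function from the list, floor always "constructor". Returns per-tier
# avg_score/episodes.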
def run_task3_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
print("\n" + "=" * 64)
print("TASK 3 β€” Rule Checker")
print("=" * 64)
contracts = load_contracts()
print(f" Dataset: {len(get_all_task3_entries(contracts))} rule-check episodes\n")
env = Task3Environment()
# Oracle
print("β–Ά Oracle (exact target function β†’ ~1.0):")
oracle_eps = []
for i in range(n):
ep = oracle_t3(env, seed_offset + i, verbose)
oracle_eps.append(ep)
print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}"
f" score={ep['grader_score']:.1f}")
oracle_avg = _avg(oracle_eps)
print(f"\n Oracle avg: {oracle_avg:.3f}")
# Subfunction (partial credit)
print("\nβ–Ά Subfunction (partial-credit callee if exists, else constructor):")
sub_eps = [subfunction_t3(env, seed_offset + i, verbose) for i in range(n)]
sub_avg = _avg(sub_eps)
submitted_sub = list({e.get("submitted", "?") for e in sub_eps})
print(f" Subfunction avg: {sub_avg:.3f} submitted: {submitted_sub}")
# Random
print("\nβ–Ά Random (lists functions, submits uniformly random one):")
random_eps = [random_t3(env, seed_offset + i) for i in range(n)]
random_avg = _avg(random_eps)
submitted_rand = [e.get("submitted", "?") for e in random_eps]
print(f" Random avg: {random_avg:.3f} submitted: {submitted_rand}")
# Floor
print("\nβ–Ά Floor (always 'constructor' β†’ ~0.0):")
floor_eps = [floor_t3(env, seed_offset + i) for i in range(n)]
floor_avg = _avg(floor_eps)
print(f" Floor avg: {floor_avg:.3f}")
# assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be >0.75"
# assert floor_avg < 0.01, f"Floor avg {floor_avg:.3f} should be ~0.001"
# assert oracle_avg >= random_avg >= floor_avg, \
# f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}"
# print(f"\n βœ… Task 3: oracle({oracle_avg}) β‰₯ subfunction({sub_avg:.3f})"
# f" β‰₯ random({random_avg:.3f}) β‰₯ floor({floor_avg})")
return {
"task_id": "task3_rule_checker",
"oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
"subfunction": {"avg_score": sub_avg, "episodes": sub_eps},
"random": {"avg_score": random_avg, "episodes": random_eps},
"floor": {"avg_score": floor_avg, "episodes": floor_eps},
}
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
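# CLI entry point: parse arguments, run the selected task evaluations, print a
# summary table of per-tier averages, and write the full report to --out.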
def main():
parser = argparse.ArgumentParser(
description="Evaluate Task 1, 2, and/or 3 β€” oracle / partial / random / floor"
)
parser.add_argument("--episodes", type=int, default=8,
help="Episodes per agent tier (default: 8)")
parser.add_argument("--seed", type=int, default=42,
help="Starting RNG seed (default: 42)")
parser.add_argument("--task", choices=["1", "2", "3", "all"], default="all",
help="Which task(s) from [1, 2, 3] to evaluate (default: all)")
parser.add_argument("--verbose", action="store_true",
help="Print per-episode target details for oracle agents")
parser.add_argument("--out", default="eval_results.json",
help="Output JSON file (default: eval_results.json)")
args = parser.parse_args()
report: Dict[str, Any] = {"num_episodes": args.episodes, "seed_offset": args.seed}
if args.task in ("1", "all"):
report["task1"] = run_task1_eval(args.episodes, args.seed, args.verbose)
if args.task in ("2", "all"):
report["task2"] = run_task2_eval(args.episodes, args.seed, args.verbose)
if args.task in ("3", "all"):
report["task3"] = run_task3_eval(args.episodes, args.seed, args.verbose)
# ── Summary ──────────────────────────────────────────────────────────────
print("\n" + "=" * 64)
print("EVALUATION COMPLETE")
print("=" * 64)
rows = [
("Task 1", "task1", ["oracle", "partial", "random", "floor"]),
("Task 2", "task2", ["oracle", "partial", "random", "floor"]),
("Task 3", "task3", ["oracle", "subfunction", "random", "floor"]),
]
for label, key, tiers in rows:
if key in report:
scores = " ".join(
f"{t}={report[key][t]['avg_score']:.3f}" for t in tiers
)
print(f" {label} {scores}")
with open(args.out, "w") as f:
json.dump(report, f, indent=2)
print(f"\n Results written to {args.out}")
if __name__ == "__main__":
main()