"""
eval.py
-------
Evaluation harness for all three tasks.

Runs four agent tiers per task:

    oracle  – always submits the ground-truth answer (upper bound)
    partial – right category, wrong detail (partial credit)
    random  – genuine random exploration + random submit (random baseline)
    floor   – always submits a guaranteed-wrong answer (lower bound)

Usage:
    python eval.py                  # all tasks, 8 episodes each
    python eval.py --task 1|2|3     # single task
    python eval.py --episodes 16 --verbose
    python eval.py --out results.json
"""

import argparse
import json
import random as _random
from typing import Any, Dict, List

from server import Task1Environment, Task2Environment, Task3Environment
from agents.task1 import oracle_t1, partial_t1, random_t1, floor_t1
from agents.task2 import oracle_t2, partial_t2, random_t2, floor_t2
from agents.task3 import oracle_t3, subfunction_t3, random_t3, floor_t3
from data.data_loader import (
    load_contracts,
    get_all_vulnerable_entries,
    get_all_property_entries,
    get_all_task3_entries,
)

# ─────────────────────────────────────────────────────────────────────────────
# Evaluation runners
# ─────────────────────────────────────────────────────────────────────────────


def _avg(episodes: List[Dict[str, Any]], key: str = "grader_score") -> float:
    """Mean of ``episodes[i][key]``; 0.0 for an empty episode list."""
    return sum(e[key] for e in episodes) / len(episodes) if episodes else 0.0


def run_task1_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
    """Run Task 1 (targeted vulnerability detection) for all four agent tiers.

    Args:
        n: Episodes to run per tier.
        seed_offset: Base RNG seed; episode ``i`` uses ``seed_offset + i``.
        verbose: Forwarded to the oracle agent for per-episode detail.

    Returns:
        Report dict with per-tier average scores, raw episodes, and the
        vulnerability-type coverage observed by the oracle tier.
    """
    print("\n" + "=" * 64)
    print("TASK 1 — Targeted Vulnerability Detection")
    print("=" * 64)
    contracts = load_contracts()
    print(f" Dataset: {len(contracts)} contracts, "
          f"{len(get_all_vulnerable_entries(contracts))} vulnerable functions\n")
    env = Task1Environment()

    # Oracle: upper bound — correct function plus correct vulnerability type.
    print("▶ Oracle (correct function + correct vuln = ~1.0):")
    oracle_eps = []
    for i in range(n):
        ep = oracle_t1(env, seed_offset + i, verbose)
        oracle_eps.append(ep)
        print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}"
              f" score={ep['grader_score']:.1f}")
    oracle_avg = _avg(oracle_eps)
    print(f"\n Oracle avg: {oracle_avg:.3f}")

    # Partial: right function, but vulnerability type left as 'unknown'.
    print("\n▶ Partial (correct function, 'unknown' vuln = ~0.5):")
    partial_eps = [partial_t1(env, seed_offset + i) for i in range(n)]
    partial_avg = _avg(partial_eps)
    print(f" Partial avg: {partial_avg:.3f}")

    # Random: genuinely random function + vulnerability-type submission.
    print("\n▶ Random (random fn from list + random vuln type):")
    random_eps = [random_t1(env, seed_offset + i) for i in range(n)]
    random_avg = _avg(random_eps)
    submitted = [(e.get("submitted_fn", "?"), e.get("submitted_vuln", "?"))
                 for e in random_eps]
    print(f" Random avg: {random_avg:.3f} submissions: {submitted}")

    # Floor: guaranteed-wrong answer, establishes the lower bound.
    print("\n▶ Floor (always 'constructor' → 0.0):")
    floor_eps = [floor_t1(env, seed_offset + i) for i in range(n)]
    floor_avg = _avg(floor_eps)
    print(f" Floor avg: {floor_avg:.3f}")

    # Vulnerability type coverage: count how often each ground-truth
    # vulnerability type appeared across the oracle episodes.
    vuln_seen: Dict[str, int] = {}
    for ep in oracle_eps:
        v = ep.get("vulnerability", "unknown")
        vuln_seen[v] = vuln_seen.get(v, 0) + 1
    print("\n▶ Vulnerability coverage:")
    for v in sorted(vuln_seen):
        print(f" {vuln_seen[v]:2d}× {v}")

    # Sanity checks, currently disabled (kept as documentation of the
    # expected score ordering between tiers).
    # assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be > 0.75"
    # assert 0.1 < partial_avg <= 0.75, f"Partial avg {partial_avg:.3f} should be in range (0.1, 0.75)"
    # assert floor_avg <= 0.1, f"Floor avg {floor_avg:.3f} should be <= 0.1"
    # assert oracle_avg >= random_avg >= floor_avg, \
    #     f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}"

    print(f"\n ✅ Task 1: oracle({oracle_avg}) ≥ partial({partial_avg}) ≥ random({random_avg:.3f}) ≥ floor({floor_avg})")
    return {
        "task_id": "task1_vuln_detection",
        "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
        "partial": {"avg_score": partial_avg, "episodes": partial_eps},
        "random": {"avg_score": random_avg, "episodes": random_eps},
        "floor": {"avg_score": floor_avg, "episodes": floor_eps},
        "vuln_coverage": vuln_seen,
    }


def run_task2_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
    """Run Task 2 (property discovery) for all four agent tiers.

    Args:
        n: Episodes to run per tier.
        seed_offset: Base RNG seed; episode ``i`` uses ``seed_offset + i``.
        verbose: Forwarded to the oracle agent for per-episode detail.

    Returns:
        Report dict with per-tier average scores and raw episodes.
    """
    print("\n" + "=" * 64)
    print("TASK 2 — Property Discovery")
    print("=" * 64)
    contracts = load_contracts()
    print(f" Dataset: {len(get_all_property_entries(contracts))} property entries\n")
    env = Task2Environment()

    # Oracle: submits the ground-truth natural-language property.
    print("▶ Oracle (submits ground-truth natural language):")
    oracle_eps = []
    for i in range(n):
        ep = oracle_t2(env, seed_offset + i, verbose)
        oracle_eps.append(ep)
        # 0.65 is the display threshold for flagging a weak oracle episode.
        icon = "✅" if ep["grader_score"] >= 0.65 else "⚠️ "
        print(f" {icon} seed={ep['seed']:3d} {ep['contract']:12s}.{ep['function']:18s}"
              f" score={ep['grader_score']:.3f}")
    oracle_avg = _avg(oracle_eps)
    print(f"\n Oracle avg: {oracle_avg:.3f}")

    # Partial: submits the function's NatSpec comment verbatim.
    print("\n▶ Partial (submits NatSpec comment):")
    partial_eps = [partial_t2(env, seed_offset + i) for i in range(n)]
    partial_avg = _avg(partial_eps)
    print(f" Partial avg: {partial_avg:.3f}")

    # Random: random browsing followed by a generic property template.
    print("\n▶ Random (random browse + generic property template):")
    random_eps = [random_t2(env, seed_offset + i) for i in range(n)]
    random_avg = _avg(random_eps)
    print(f" Random avg: {random_avg:.3f}")

    # Floor: submits the empty string.
    print("\n▶ Floor (submits empty string → 0.0):")
    floor_eps = [floor_t2(env, seed_offset + i) for i in range(n)]
    floor_avg = _avg(floor_eps)
    print(f" Floor avg: {floor_avg:.3f}")

    # Sanity checks, currently disabled (kept as documentation of the
    # expected score ordering between tiers).
    # assert oracle_avg > 0.60, f"Oracle avg {oracle_avg:.3f} should be > 0.60"
    # assert oracle_avg > partial_avg >= floor_avg, \
    #     "Score ordering violated: oracle > partial >= floor"
    # assert floor_avg < 0.1, f"Floor avg {floor_avg:.3f} should be 0.0"
    # print(f"\n ✅ Task 2: oracle({oracle_avg:.3f}) > partial({partial_avg:.3f})"
    #       f" ≥ random({random_avg:.3f}) ≥ floor(0.0)")

    return {
        "task_id": "task2_property_discovery",
        "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
        "partial": {"avg_score": partial_avg, "episodes": partial_eps},
        "random": {"avg_score": random_avg, "episodes": random_eps},
        "floor": {"avg_score": floor_avg, "episodes": floor_eps},
    }


def run_task3_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]:
    """Run Task 3 (rule checker) for all four agent tiers.

    Args:
        n: Episodes to run per tier.
        seed_offset: Base RNG seed; episode ``i`` uses ``seed_offset + i``.
        verbose: Forwarded to the oracle and subfunction agents.

    Returns:
        Report dict with per-tier average scores and raw episodes.
    """
    print("\n" + "=" * 64)
    print("TASK 3 — Rule Checker")
    print("=" * 64)
    contracts = load_contracts()
    print(f" Dataset: {len(get_all_task3_entries(contracts))} rule-check episodes\n")
    env = Task3Environment()

    # Oracle: submits the exact target function.
    # NOTE(fix): a block of Task-2 commented-out asserts had been pasted into
    # the middle of this print call's implicit string concatenation; removed.
    print("▶ Oracle (exact target function → ~1.0):")
    oracle_eps = []
    for i in range(n):
        ep = oracle_t3(env, seed_offset + i, verbose)
        oracle_eps.append(ep)
        print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}"
              f" score={ep['grader_score']:.1f}")
    oracle_avg = _avg(oracle_eps)
    print(f"\n Oracle avg: {oracle_avg:.3f}")

    # Subfunction: partial-credit callee when one exists, else 'constructor'.
    print("\n▶ Subfunction (partial-credit callee if exists, else constructor):")
    sub_eps = [subfunction_t3(env, seed_offset + i, verbose) for i in range(n)]
    sub_avg = _avg(sub_eps)
    submitted_sub = list({e.get("submitted", "?") for e in sub_eps})
    print(f" Subfunction avg: {sub_avg:.3f} submitted: {submitted_sub}")

    # Random: uniformly random function from the listed functions.
    print("\n▶ Random (lists functions, submits uniformly random one):")
    random_eps = [random_t3(env, seed_offset + i) for i in range(n)]
    random_avg = _avg(random_eps)
    submitted_rand = [e.get("submitted", "?") for e in random_eps]
    print(f" Random avg: {random_avg:.3f} submitted: {submitted_rand}")

    # Floor: always 'constructor'.
    print("\n▶ Floor (always 'constructor' → ~0.0):")
    floor_eps = [floor_t3(env, seed_offset + i) for i in range(n)]
    floor_avg = _avg(floor_eps)
    print(f" Floor avg: {floor_avg:.3f}")

    # Sanity checks, currently disabled (kept as documentation of the
    # expected score ordering between tiers).
    # assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be >0.75"
    # assert floor_avg == 0.001, f"Floor avg {floor_avg:.3f} should be 0.001"
    # assert oracle_avg >= random_avg >= floor_avg, \
    #     f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}"
    # print(f"\n ✅ Task 3: oracle({oracle_avg}) ≥ subfunction({sub_avg:.3f})"
    #       f" ≥ random({random_avg:.3f}) ≥ floor({floor_avg})")

    return {
        "task_id": "task3_rule_checker",
        "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps},
        "subfunction": {"avg_score": sub_avg, "episodes": sub_eps},
        "random": {"avg_score": random_avg, "episodes": random_eps},
        "floor": {"avg_score": floor_avg, "episodes": floor_eps},
    }


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────


def main():
    """Parse CLI arguments, run the selected task evaluations, print a
    summary table, and write the full report to a JSON file."""
    parser = argparse.ArgumentParser(
        description="Evaluate Task 1, 2, and/or 3 — oracle / partial / random / floor"
    )
    parser.add_argument("--episodes", type=int, default=8,
                        help="Episodes per agent tier (default: 8)")
    parser.add_argument("--seed", type=int, default=42,
                        help="Starting RNG seed (default: 42)")
    parser.add_argument("--task", choices=["1", "2", "3", "all"], default="all",
                        help="Which task(s) from [1, 2, 3] to evaluate (default: all)")
    parser.add_argument("--verbose", action="store_true",
                        help="Print per-episode target details for oracle agents")
    parser.add_argument("--out", default="eval_results.json",
                        help="Output JSON file (default: eval_results.json)")
    args = parser.parse_args()

    report: Dict[str, Any] = {"num_episodes": args.episodes, "seed_offset": args.seed}
    if args.task in ("1", "all"):
        report["task1"] = run_task1_eval(args.episodes, args.seed, args.verbose)
    if args.task in ("2", "all"):
        report["task2"] = run_task2_eval(args.episodes, args.seed, args.verbose)
    if args.task in ("3", "all"):
        report["task3"] = run_task3_eval(args.episodes, args.seed, args.verbose)

    # ── Summary ──────────────────────────────────────────────────────────────
    print("\n" + "=" * 64)
    print("EVALUATION COMPLETE")
    print("=" * 64)
    rows = [
        ("Task 1", "task1", ["oracle", "partial", "random", "floor"]),
        ("Task 2", "task2", ["oracle", "partial", "random", "floor"]),
        ("Task 3", "task3", ["oracle", "subfunction", "random", "floor"]),
    ]
    for label, key, tiers in rows:
        if key in report:
            scores = " ".join(
                f"{t}={report[key][t]['avg_score']:.3f}" for t in tiers
            )
            print(f" {label} {scores}")

    with open(args.out, "w") as f:
        json.dump(report, f, indent=2)
    print(f"\n Results written to {args.out}")


if __name__ == "__main__":
    main()