Spaces:
Running
Running
| """ | |
| eval.py | |
| ------- | |
| Evaluation harness for all three tasks. | |
| Runs four agent tiers per task: | |
| oracle β always submits the ground-truth answer (upper bound) | |
| partial β right category, wrong detail (partial credit) | |
| random β genuine random exploration + random submit (random baseline) | |
| floor β always submits a guaranteed-wrong answer (lower bound) | |
| Usage: | |
| python eval.py # all tasks, 8 episodes each | |
| python eval.py --task 1|2|3 # single task | |
| python eval.py --episodes 16 --verbose | |
| python eval.py --out results.json | |
| """ | |
| import argparse | |
| import json | |
| import random as _random | |
| from typing import Any, Dict, List | |
| from server import Task1Environment, Task2Environment, Task3Environment | |
| from agents.task1 import oracle_t1, partial_t1, random_t1, floor_t1 | |
| from agents.task2 import oracle_t2, partial_t2, random_t2, floor_t2 | |
| from agents.task3 import oracle_t3, subfunction_t3, random_t3, floor_t3 | |
| from data.data_loader import ( | |
| load_contracts, | |
| get_all_vulnerable_entries, | |
| get_all_property_entries, | |
| get_all_task3_entries, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Evaluation runners | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _avg(episodes: List[Dict[str, Any]], key: str = "grader_score") -> float: | |
| return sum(e[key] for e in episodes) / len(episodes) if episodes else 0.0 | |
| def run_task1_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]: | |
| print("\n" + "=" * 64) | |
| print("TASK 1 β Targeted Vulnerability Detection") | |
| print("=" * 64) | |
| contracts = load_contracts() | |
| print(f" Dataset: {len(contracts)} contracts, " | |
| f"{len(get_all_vulnerable_entries(contracts))} vulnerable functions\n") | |
| env = Task1Environment() | |
| # Oracle | |
| print("βΆ Oracle (correct function + correct vuln = ~1.0):") | |
| oracle_eps = [] | |
| for i in range(n): | |
| ep = oracle_t1(env, seed_offset + i, verbose) | |
| oracle_eps.append(ep) | |
| print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}" | |
| f" score={ep['grader_score']:.1f}") | |
| oracle_avg = _avg(oracle_eps) | |
| print(f"\n Oracle avg: {oracle_avg:.3f}") | |
| # Partial | |
| print("\nβΆ Partial (correct function, 'unknown' vuln = ~0.5):") | |
| partial_eps = [partial_t1(env, seed_offset + i) for i in range(n)] | |
| partial_avg = _avg(partial_eps) | |
| print(f" Partial avg: {partial_avg:.3f}") | |
| # Random | |
| print("\nβΆ Random (random fn from list + random vuln type):") | |
| random_eps = [random_t1(env, seed_offset + i) for i in range(n)] | |
| random_avg = _avg(random_eps) | |
| submitted = [(e.get("submitted_fn", "?"), e.get("submitted_vuln", "?")) for e in random_eps] | |
| print(f" Random avg: {random_avg:.3f} submissions: {submitted}") | |
| # Floor | |
| print("\nβΆ Floor (always 'constructor' β 0.0):") | |
| floor_eps = [floor_t1(env, seed_offset + i) for i in range(n)] | |
| floor_avg = _avg(floor_eps) | |
| print(f" Floor avg: {floor_avg:.3f}") | |
| # Vulnerability type coverage | |
| vuln_seen: Dict[str, int] = {} | |
| for ep in oracle_eps: | |
| v = ep.get("vulnerability", "unknown") | |
| vuln_seen[v] = vuln_seen.get(v, 0) + 1 | |
| print("\nβΆ Vulnerability coverage:") | |
| for v in sorted(vuln_seen): | |
| print(f" {vuln_seen[v]:2d}Γ {v}") | |
| # assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be > 0.75" | |
| # assert 0.1 < partial_avg <= 0.75, f"Partial avg {partial_avg:.3f} should be in range (0.1, 0.75)" | |
| # assert floor_avg <= 0.1, f"Floor avg {floor_avg:.3f} should be <= 0.1" | |
| # assert oracle_avg >= random_avg >= floor_avg, \ | |
| # f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}" | |
| print(f"\n β Task 1: oracle({oracle_avg}) β₯ partial({partial_avg}) β₯ random({random_avg:.3f}) β₯ floor({floor_avg})") | |
| return { | |
| "task_id": "task1_vuln_detection", | |
| "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps}, | |
| "partial": {"avg_score": partial_avg, "episodes": partial_eps}, | |
| "random": {"avg_score": random_avg, "episodes": random_eps}, | |
| "floor": {"avg_score": floor_avg, "episodes": floor_eps}, | |
| "vuln_coverage": vuln_seen, | |
| } | |
| def run_task2_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]: | |
| print("\n" + "=" * 64) | |
| print("TASK 2 β Property Discovery") | |
| print("=" * 64) | |
| contracts = load_contracts() | |
| print(f" Dataset: {len(get_all_property_entries(contracts))} property entries\n") | |
| env = Task2Environment() | |
| # Oracle | |
| print("βΆ Oracle (submits ground-truth natural language):") | |
| oracle_eps = [] | |
| for i in range(n): | |
| ep = oracle_t2(env, seed_offset + i, verbose) | |
| oracle_eps.append(ep) | |
| icon = "β " if ep["grader_score"] >= 0.65 else "β οΈ " | |
| print(f" {icon} seed={ep['seed']:3d} {ep['contract']:12s}.{ep['function']:18s}" | |
| f" score={ep['grader_score']:.3f}") | |
| oracle_avg = _avg(oracle_eps) | |
| print(f"\n Oracle avg: {oracle_avg:.3f}") | |
| # Partial | |
| print("\nβΆ Partial (submits NatSpec comment):") | |
| partial_eps = [partial_t2(env, seed_offset + i) for i in range(n)] | |
| partial_avg = _avg(partial_eps) | |
| print(f" Partial avg: {partial_avg:.3f}") | |
| # Random | |
| print("\nβΆ Random (random browse + generic property template):") | |
| random_eps = [random_t2(env, seed_offset + i) for i in range(n)] | |
| random_avg = _avg(random_eps) | |
| print(f" Random avg: {random_avg:.3f}") | |
| # Floor | |
| print("\nβΆ Floor (submits empty string β 0.0):") | |
| floor_eps = [floor_t2(env, seed_offset + i) for i in range(n)] | |
| floor_avg = _avg(floor_eps) | |
| print(f" Floor avg: {floor_avg:.3f}") | |
| # assert oracle_avg > 0.60, f"Oracle avg {oracle_avg:.3f} should be > 0.60" | |
| # assert oracle_avg > partial_avg >= floor_avg, \ | |
| # "Score ordering violated: oracle > partial >= floor" | |
| # assert floor_avg < 0.1, f"Floor avg {floor_avg:.3f} should be 0.0" | |
| # print(f"\n β Task 2: oracle({oracle_avg:.3f}) > partial({partial_avg:.3f})" | |
| # f" β₯ random({random_avg:.3f}) β₯ floor(0.0)") | |
| return { | |
| "task_id": "task2_property_discovery", | |
| "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps}, | |
| "partial": {"avg_score": partial_avg, "episodes": partial_eps}, | |
| "random": {"avg_score": random_avg, "episodes": random_eps}, | |
| "floor": {"avg_score": floor_avg, "episodes": floor_eps}, | |
| } | |
| def run_task3_eval(n: int, seed_offset: int, verbose: bool) -> Dict[str, Any]: | |
| print("\n" + "=" * 64) | |
| print("TASK 3 β Rule Checker") | |
| print("=" * 64) | |
| contracts = load_contracts() | |
| print(f" Dataset: {len(get_all_task3_entries(contracts))} rule-check episodes\n") | |
| env = Task3Environment() | |
| # Oracle | |
| print("βΆ Oracle (exact target function β ~1.0):") | |
| oracle_eps = [] | |
| for i in range(n): | |
| ep = oracle_t3(env, seed_offset + i, verbose) | |
| oracle_eps.append(ep) | |
| print(f" seed={ep['seed']:3d} {ep['contract']:12s}.{ep['target_function']:18s}" | |
| # assert oracle_avg > 0.60, f"Oracle avg {oracle_avg:.3f} should be > 0.60" | |
| # assert oracle_avg > partial_avg >= floor_avg, \ | |
| # "Score ordering violated: oracle > partial >= floor" | |
| # assert floor_avg < 0.1, f"Floor avg {floor_avg:.3f} should be 0.0" | |
| # print(f"\n β Task 2: oracle({oracle_avg:.3f}) > partial({partial_avg:.3f})" | |
| # f" β₯ random({random_avg:.3f}) β₯ floor(0.0)") | |
| f" score={ep['grader_score']:.1f}") | |
| oracle_avg = _avg(oracle_eps) | |
| print(f"\n Oracle avg: {oracle_avg:.3f}") | |
| # Subfunction (partial credit) | |
| print("\nβΆ Subfunction (partial-credit callee if exists, else constructor):") | |
| sub_eps = [subfunction_t3(env, seed_offset + i, verbose) for i in range(n)] | |
| sub_avg = _avg(sub_eps) | |
| submitted_sub = list({e.get("submitted", "?") for e in sub_eps}) | |
| print(f" Subfunction avg: {sub_avg:.3f} submitted: {submitted_sub}") | |
| # Random | |
| print("\nβΆ Random (lists functions, submits uniformly random one):") | |
| random_eps = [random_t3(env, seed_offset + i) for i in range(n)] | |
| random_avg = _avg(random_eps) | |
| submitted_rand = [e.get("submitted", "?") for e in random_eps] | |
| print(f" Random avg: {random_avg:.3f} submitted: {submitted_rand}") | |
| # Floor | |
| print("\nβΆ Floor (always 'constructor' β ~0.0):") | |
| floor_eps = [floor_t3(env, seed_offset + i) for i in range(n)] | |
| floor_avg = _avg(floor_eps) | |
| print(f" Floor avg: {floor_avg:.3f}") | |
| # assert oracle_avg > 0.75, f"Oracle avg {oracle_avg:.3f} should be >0.75" | |
| # assert floor_avg == 0.001, f"Floor avg {floor_avg:.3f} should be 0.001" | |
| # assert oracle_avg >= random_avg >= floor_avg, \ | |
| # f"Score ordering violated: oracle={oracle_avg}, random={random_avg}, floor={floor_avg}" | |
| # print(f"\n β Task 3: oracle({oracle_avg}) β₯ subfunction({sub_avg:.3f})" | |
| # f" β₯ random({random_avg:.3f}) β₯ floor({floor_avg})") | |
| return { | |
| "task_id": "task3_rule_checker", | |
| "oracle": {"avg_score": oracle_avg, "episodes": oracle_eps}, | |
| "subfunction": {"avg_score": sub_avg, "episodes": sub_eps}, | |
| "random": {"avg_score": random_avg, "episodes": random_eps}, | |
| "floor": {"avg_score": floor_avg, "episodes": floor_eps}, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Entry point | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Evaluate Task 1, 2, and/or 3 β oracle / partial / random / floor" | |
| ) | |
| parser.add_argument("--episodes", type=int, default=8, | |
| help="Episodes per agent tier (default: 8)") | |
| parser.add_argument("--seed", type=int, default=42, | |
| help="Starting RNG seed (default: 42)") | |
| parser.add_argument("--task", choices=["1", "2", "3", "all"], default="all", | |
| help="Which task(s) from [1, 2, 3] to evaluate (default: all)") | |
| parser.add_argument("--verbose", action="store_true", | |
| help="Print per-episode target details for oracle agents") | |
| parser.add_argument("--out", default="eval_results.json", | |
| help="Output JSON file (default: eval_results.json)") | |
| args = parser.parse_args() | |
| report: Dict[str, Any] = {"num_episodes": args.episodes, "seed_offset": args.seed} | |
| if args.task in ("1", "all"): | |
| report["task1"] = run_task1_eval(args.episodes, args.seed, args.verbose) | |
| if args.task in ("2", "all"): | |
| report["task2"] = run_task2_eval(args.episodes, args.seed, args.verbose) | |
| if args.task in ("3", "all"): | |
| report["task3"] = run_task3_eval(args.episodes, args.seed, args.verbose) | |
| # ββ Summary ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("\n" + "=" * 64) | |
| print("EVALUATION COMPLETE") | |
| print("=" * 64) | |
| rows = [ | |
| ("Task 1", "task1", ["oracle", "partial", "random", "floor"]), | |
| ("Task 2", "task2", ["oracle", "partial", "random", "floor"]), | |
| ("Task 3", "task3", ["oracle", "subfunction", "random", "floor"]), | |
| ] | |
| for label, key, tiers in rows: | |
| if key in report: | |
| scores = " ".join( | |
| f"{t}={report[key][t]['avg_score']:.3f}" for t in tiers | |
| ) | |
| print(f" {label} {scores}") | |
| with open(args.out, "w") as f: | |
| json.dump(report, f, indent=2) | |
| print(f"\n Results written to {args.out}") | |
| if __name__ == "__main__": | |
| main() | |