"""Grader for RedTeam PentestLab Environment.

Reads the textual log produced by ``inference.py`` (``[START]`` / ``[STEP]`` /
``[END]`` lines), grades every task block it finds, and prints per-task and
overall scores both as ``TASK_SCORE:`` / ``OVERALL_SCORE:`` lines and as a
single ``JSON_OUTPUT:`` line.
"""

import sys
import re
import json
import math
from typing import Dict, List, Optional, Tuple

SAFE_TASK_IDS = ["task_1", "task_2", "task_3", "task_4", "task_5", "task_6"]

# Scores are reported strictly inside (0, 1); these are the extreme values
# that clamp_score() will ever return.
_MIN_SCORE = 1e-6
_MAX_SCORE = 1 - 1e-6

# main() always reports at least this many tasks (contest requirement).
_MIN_TASKS = 3

# Scoring weights used by grade_task().
_SUCCESS_BASE = 0.45
_FAILURE_BASE = 0.20
_REWARD_BONUS = 0.25
# Per-task max: easy≈0.62, medium≈0.63, hard≈0.72. Use 0.75 as safe ceiling.
_MAX_TASK_REWARD = 0.75
_VIOLATION_PENALTY = 0.03
_MAX_VIOLATION_PENALTY = 0.09

# Compiled once; each is applied with .search() to an already-stripped line.
_START_RE = re.compile(r"task=(\S+)\s+env=(\S+)\s+model=(\S+)")
_STEP_RE = re.compile(
    r"step=(\S+)\s+action=(\w+)\s+reward=([\d.-]+)\s+done=(\w+)\s+error=(\w+)"
)
# ``*`` (not ``+``) after the rewards class so that an empty ``rewards=`` list
# still closes the task block instead of silently dropping the task.
_END_RE = re.compile(r"success=(\w+)\s+(?:steps=(\d+)\s+)?rewards=([\d.,\s-]*)")


def clamp_score(score: float) -> float:
    """Clamp a score to be strictly within (0, 1).

    This is the SINGLE source of truth for score bounds. Every score —
    per-task AND overall — MUST pass through here before being stored,
    printed, or serialised.

    Args:
        score: Raw score; may be out of range, infinite, or NaN.

    Returns:
        ``score`` limited to ``[1e-6, 1 - 1e-6]``. NaN maps to the lower
        bound: every comparison with NaN is false, so a plain ``min``/``max``
        chain would otherwise turn NaN into the *highest* possible score.
    """
    if math.isnan(score):
        return _MIN_SCORE
    return max(_MIN_SCORE, min(_MAX_SCORE, score))


def _parse_float(text: str) -> Optional[float]:
    """Return ``float(text)``, or ``None`` when ``text`` is not a valid number."""
    try:
        return float(text)
    except ValueError:
        return None


def _is_true(text: str) -> bool:
    """Return True when ``text`` spells "true" in any letter case."""
    return text.lower() == "true"


def parse_inference_output(output: str) -> List[Dict]:
    """Parse inference.py output into one record per task block.

    A record is opened by a ``[START]`` line, collects ``[STEP]`` lines, and
    is appended to the result when its ``[END]`` line is seen. A block that
    is never closed by a matching ``[END]`` line is discarded.

    Args:
        output: Full text of the inference log.

    Returns:
        A list of dicts with keys ``task``, ``env``, ``model``, ``success``,
        ``steps``, ``rewards`` and ``step_details``, in log order.
    """
    tasks: List[Dict] = []
    current: Optional[Dict] = None

    for raw_line in output.split("\n"):
        line = raw_line.strip()

        if line.startswith("[START]"):
            match = _START_RE.search(line)
            if match:
                current = {
                    "task": match.group(1),
                    "env": match.group(2),
                    "model": match.group(3),
                    "success": False,
                    "steps": 0,
                    "rewards": [],
                    "step_details": [],
                }
            continue

        if current is None:
            # [STEP]/[END] lines outside of an open task block are ignored.
            continue

        if line.startswith("[STEP]"):
            match = _STEP_RE.search(line)
            if not match:
                continue
            reward = _parse_float(match.group(3))
            if reward is None:
                # The character class admits strings such as "1.2.3" or "-";
                # skip the step rather than crash the whole grader.
                print(
                    f"WARNING: Ignoring step with malformed reward: {line}",
                    file=sys.stderr,
                )
                continue
            current["step_details"].append(
                {
                    "step": match.group(1),
                    "action": match.group(2),
                    "reward": reward,
                    "done": _is_true(match.group(4)),
                    "error": None if match.group(5) == "null" else match.group(5),
                }
            )

        elif line.startswith("[END]"):
            match = _END_RE.search(line)
            if not match:
                continue
            rewards: List[float] = []
            for token in match.group(3).split(","):
                token = token.strip()
                if not token:
                    continue
                value = _parse_float(token)
                if value is None:
                    print(
                        f"WARNING: Ignoring malformed reward {token!r}",
                        file=sys.stderr,
                    )
                    continue
                rewards.append(value)

            current["success"] = _is_true(match.group(1))
            current["rewards"] = rewards
            # Fall back to the reward count when the line carries no steps= field.
            current["steps"] = int(match.group(2)) if match.group(2) else len(rewards)
            tasks.append(current)
            current = None

    return tasks


def grade_task(data: Dict) -> Tuple[float, Dict]:
    """Grade the agent's performance on a single task.

    The score is a base value (0.45 on success, 0.20 on failure), plus a
    reward bonus of up to 0.25 proportional to the total reward, minus 0.03
    per step with a negative reward (capped at 0.09).

    Args:
        data: Task record as produced by ``parse_inference_output``. Must
            contain ``success`` and ``rewards``; ``step_details`` is optional.

    Returns:
        (score, details) where score is strictly within (0, 1) and details
        holds ``success``, ``steps_taken``, ``total_reward``, ``penalties``,
        ``violations`` and ``final_score``.
    """
    rewards = data["rewards"]
    total_reward = sum(rewards) if rewards else 0.0

    # Every step that earned a negative reward counts as one violation.
    violations = [
        f"Step {step.get('step', '?')}: {step.get('action', '?')}"
        for step in data.get("step_details", [])
        if step.get("reward", 0) < 0
    ]

    details = {
        "success": data["success"],
        "steps_taken": len(rewards),
        "total_reward": total_reward,
        "penalties": len(violations),
        "violations": violations,
    }

    # Base values are chosen so that the final score stays well inside (0, 1).
    score = _SUCCESS_BASE if data["success"] else _FAILURE_BASE
    score += min(total_reward / _MAX_TASK_REWARD, 1.0) * _REWARD_BONUS
    score -= min(len(violations) * _VIOLATION_PENALTY, _MAX_VIOLATION_PENALTY)

    # *** CRITICAL: clamp to strictly (0, 1) ***
    score = clamp_score(score)
    details["final_score"] = score
    return score, details


def _task_id_for(position: int) -> str:
    """Return the reported task id for a zero-based position in the results."""
    return SAFE_TASK_IDS[position] if position < len(SAFE_TASK_IDS) else "fallback"


def _pad_with_fallbacks(tasks: List[Dict]) -> None:
    """Append failed placeholder tasks in place until ``_MIN_TASKS`` exist."""
    while len(tasks) < _MIN_TASKS:
        # Build a fresh dict each time so placeholders never share list objects.
        tasks.append(
            {
                "task": _task_id_for(len(tasks)),
                "env": "redteam_pentest",
                "model": "unknown",
                "success": False,
                "steps": 0,
                "rewards": [],
                "step_details": [],
            }
        )


def main():
    """Main grader entry point.

    Reads the log file named by ``sys.argv[1]``, grades every task, prints
    the scores, and exits with status 0. Exits with status 1 when the
    argument is missing or the file cannot be read.
    """
    if len(sys.argv) < 2:
        print("Usage: python grader.py <output_file>")
        sys.exit(1)

    output_file = sys.argv[1]
    try:
        # errors="replace": stray undecodable bytes must not abort grading.
        with open(output_file, "r", encoding="utf-8", errors="replace") as f:
            output = f.read()
    except FileNotFoundError:
        print(f"ERROR: File not found: {output_file}")
        sys.exit(1)
    except OSError as exc:
        print(f"ERROR: Could not read {output_file}: {exc}")
        sys.exit(1)

    # Parse output
    tasks = parse_inference_output(output)

    # Ensure we always have at least 3 tasks (contest requirement)
    if len(tasks) < _MIN_TASKS:
        print(
            f"WARNING: Only parsed {len(tasks)} tasks, creating fallbacks to reach 3 tasks",
            file=sys.stderr,
        )
        _pad_with_fallbacks(tasks)

    # Grade each task independently; clamp again at this boundary on purpose.
    scores = [clamp_score(grade_task(task_data)[0]) for task_data in tasks]

    # Compute overall score
    overall_score = clamp_score(sum(scores) / len(scores))

    # Output individual task scores in machine-readable format.
    # clamp_score() guarantees 0 < score < 1, so no further validation is needed.
    for position, score in enumerate(scores):
        print(f"TASK_SCORE:{_task_id_for(position)}:{score}")

    print(f"OVERALL_SCORE:{overall_score}")

    # Output JSON for machine parsing
    json_output = {
        "overall_score": float(overall_score),
        "tasks": [
            {"task_id": _task_id_for(position), "score": float(score)}
            for position, score in enumerate(scores)
        ],
    }
    print(f"\nJSON_OUTPUT:{json.dumps(json_output)}")

    # Exit with 0 so the evaluation platform does not treat the grader as crashed.
    sys.exit(0)


if __name__ == "__main__":
    main()