| """ |
Two-Stage Evaluation Pipeline + Docker GPU Sandbox
==================================================
Despite the "two-stage" name, evaluation proceeds through three gates:
Stage 1: AST Pre-flight Linter — structural gating (syntax errors, forbidden modules)
Stage 2: Constitutional Pre-Flight Check — budget/SPOF/SLA validation
Stage 3: Docker GPU Sandbox Execution — double-lock memory enforcement with tensor challenges
| |
| The sandbox implements the Physics Test: |
| - Injects VRAM constraint preamble (Layer 2: torch.cuda.set_per_process_memory_fraction) |
| - Injects dummy tensor workload (the weight) |
| - Boots container with --memory=500m + GPU passthrough (Layer 1: cgroups) |
| - Determines: crash (Outcome A, -1.00 penalty) or pass (Outcome B, +0.40 reward) |
| """ |
|
|
| import ast |
| import sys |
| import logging |
| from typing import Optional |
|
|
| from engine.docker_sandbox import DockerGPUSandbox |
| from engine.tensor_challenges import TensorChallengeGenerator |
|
|
| logger = logging.getLogger("swarm-os.evaluator") |
|
|
|
|
| |
# Security gate: top-level modules AI-generated code may never import.
# ast_preflight() rejects any script importing these before it reaches the sandbox.
FORBIDDEN_MODULES = {"os", "subprocess", "shutil", "pathlib", "socket", "http", "requests"}
|
|
|
|
class TwoStageEvaluator:
    """
    Two-stage evaluation pipeline for AI-generated code.
    Runs in 0.01s (AST) + Docker execution time.

    In production mode, uses the DockerGPUSandbox with real GPU passthrough.
    In mock mode, simulates results for development without Docker.
    """

    def __init__(self, gpu_vram_gb: float = 12.0):
        """
        Args:
            gpu_vram_gb: Total GPU VRAM budget (GB) handed to the Docker
                sandbox, which enforces it via its double-lock constraints.
        """
        self.docker_sandbox = DockerGPUSandbox(gpu_total_vram_gb=gpu_vram_gb)
        self.challenge_generator = TensorChallengeGenerator()
        logger.info("TwoStageEvaluator initialized with %.0fGB GPU VRAM budget", gpu_vram_gb)

    def ast_preflight(self, code: str) -> dict:
        """
        Stage 1 — Pre-Flight AST Linter.
        Runs in ~0.01s. Checks:
        - Syntax validity (can the code parse?)
        - Forbidden module imports (security gate)

        Only absolute imports are matched against FORBIDDEN_MODULES; a
        relative import (``from .os import x``, node.level > 0) refers to a
        package-local module, not the stdlib module on the deny list, so it
        is not flagged. Repeated imports of the same forbidden module are
        reported once.

        Returns:
            dict with passed (bool), errors (list), forbidden_imports (list)
        """
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            # Unparseable code cannot be inspected further — hard fail.
            return {
                "passed": False,
                "errors": [f"SyntaxError at line {e.lineno}: {e.msg}"],
                "forbidden_imports": [],
            }

        forbidden_found: list = []

        def _flag(module_path: str) -> None:
            # Compare only the top-level package ("os.path" -> "os");
            # dedupe while preserving first-seen order.
            root = module_path.split(".")[0]
            if root in FORBIDDEN_MODULES and root not in forbidden_found:
                forbidden_found.append(root)

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    _flag(alias.name)
            elif isinstance(node, ast.ImportFrom):
                # level == 0 means an absolute import; relative imports
                # target package-local modules and are skipped (see docstring).
                if node.module and node.level == 0:
                    _flag(node.module)

        if forbidden_found:
            return {
                "passed": False,
                "errors": [f"Forbidden import: {m}" for m in forbidden_found],
                "forbidden_imports": forbidden_found,
            }

        return {
            "passed": True,
            "errors": [],
            "forbidden_imports": [],
        }

    def constitutional_preflight(self, telemetry: dict, budget_remaining: float,
                                 sla_remaining: float) -> dict:
        """
        Constitutional Pre-Flight Check (Stage 2).
        Three boolean checks that must all pass before sandbox execution:
        1. Does this action exceed the FinOps budget ceiling?
        2. Does this introduce a new single point of failure?
        3. Does this violate the SLA recovery window?

        Args:
            telemetry: Cluster telemetry; reads "active_compute_nodes"
                (missing key defaults to 0, i.e. SPOF check fails).
            budget_remaining: Remaining FinOps budget; must be > 0.
            sla_remaining: Time left in the SLA recovery window; must be > 60.

        Returns:
            dict with passed (bool), checks (per-check bools),
            blocked_reasons (human-readable reasons for each failed check).
        """
        checks = {
            "budget_ok": budget_remaining > 0,
            "no_spof": telemetry.get("active_compute_nodes", 0) > 1,
            "sla_ok": sla_remaining > 60,
        }
        # One human-readable reason per check, keyed by check name so the
        # blocked_reasons list stays in check order.
        reasons = {
            "budget_ok": "FinOps budget exceeded",
            "no_spof": "Single point of failure detected",
            "sla_ok": "SLA recovery window violated",
        }
        return {
            "passed": all(checks.values()),
            "checks": checks,
            "blocked_reasons": [reasons[name] for name, ok in checks.items() if not ok],
        }

    def sandbox_execute(
        self,
        code: str,
        filename: str,
        mock_mode: bool = True,
        challenge_tier: Optional[int] = None,
        use_tensor_challenge: bool = True,
        inject_vram_lock: bool = True,
        profile_vram: bool = True,
    ) -> dict:
        """
        Stage 3 — Docker Sandbox Execution with Tensor Challenge.

        In mock mode: simulates execution results for development.
        In production mode:
        1. Selects a tensor challenge (auto-curriculum or specified tier)
        2. Passes code + challenge to DockerGPUSandbox
        3. Container boots with double-lock constraints
        4. Returns structured outcome (PASS / OOMKilled / CUDA_OOM / ERROR)

        Args:
            code: AI-generated Python code
            filename: Script filename for logging
            mock_mode: If True, skip Docker and return simulated results
            challenge_tier: Force a specific challenge tier (1-3), or None for auto
            use_tensor_challenge: Whether to inject the PyTorch tensor stress workload
            inject_vram_lock: Whether to inject the torch VRAM limiter preamble
            profile_vram: Whether to append the torch-based VRAM profiling epilogue

        Returns:
            dict with status, vram_peak_gb, error_type, causal_trigger, etc.
        """
        if mock_mode:
            return self._mock_sandbox(code, filename)

        return self._production_sandbox(
            code,
            filename,
            challenge_tier,
            use_tensor_challenge=use_tensor_challenge,
            inject_vram_lock=inject_vram_lock,
            profile_vram=profile_vram,
        )

    def _production_sandbox(
        self,
        code: str,
        filename: str,
        challenge_tier: Optional[int] = None,
        use_tensor_challenge: bool = True,
        inject_vram_lock: bool = True,
        profile_vram: bool = True,
    ) -> dict:
        """
        Production Docker sandbox execution with the full physics test.

        Pipeline:
        1. Select tensor challenge tier (curriculum learning)
        2. Get challenge workload code
        3. Execute in DockerGPUSandbox (double-lock + GPU passthrough)
        4. Parse result and record to challenge stats
        """
        challenge = None
        if use_tensor_challenge:
            # `is not None` rather than `or`: an explicitly passed falsy tier
            # must not be silently replaced by the curriculum tier.
            if challenge_tier is not None:
                tier = challenge_tier
            else:
                tier = self.challenge_generator.get_curriculum_tier()
            challenge = self.challenge_generator.get_challenge(tier=tier)
            logger.info(
                "Production sandbox: file=%s, challenge='%s' (tier=%d, raw=%dMB)",
                filename, challenge["name"], challenge["tier"], challenge["raw_memory_mb"],
            )
        else:
            logger.info("Production sandbox: file=%s, plain Python validation mode", filename)

        result = self.docker_sandbox.execute(
            code=code,
            filename=filename,
            tensor_challenge=challenge["code"] if challenge else None,
            inject_vram_lock=inject_vram_lock,
            profile_vram=profile_vram,
        )

        if challenge:
            # Feed the outcome back into the curriculum so tier selection adapts.
            passed = result["status"] == "PASS"
            self.challenge_generator.record_result(tier=challenge["tier"], passed=passed)
            result["challenge"] = {
                "name": challenge["name"],
                "tier": challenge["tier"],
                "raw_memory_mb": challenge["raw_memory_mb"],
                # Only surface the remediation hint when the challenge failed.
                "hint": challenge["hint_to_sre"] if not passed else None,
            }
            result["curriculum"] = self.challenge_generator.get_stats()

        return result

    def _mock_sandbox(self, code: str, filename: str) -> dict:
        """
        Mock sandbox execution for development.
        Simulates realistic results based on keyword analysis of the code:
        2+ memory optimizations -> strong PASS, exactly 1 -> marginal PASS,
        0 -> simulated OOM kill.
        """
        code_lower = code.lower()

        # Keyword heuristics for the four recognized memory optimizations.
        has_checkpointing = any(k in code_lower for k in [
            "checkpoint", "torch.utils.checkpoint", "checkpoint_sequential",
        ])
        has_mixed_precision = any(k in code_lower for k in [
            "autocast", "float16", "half()", "gradscaler", "torch.float16",
        ])
        has_fsdp = any(k in code_lower for k in [
            "fsdp", "fullyshardeddataparallel", "fully_sharded",
        ])
        has_chunking = any(k in code_lower for k in [
            "chunk", "split", "batch_process", "micro_batch",
        ])

        optimization_count = sum([has_checkpointing, has_mixed_precision, has_fsdp, has_chunking])

        def _layers(triggered: str) -> dict:
            # Double-lock constraint report, as the real sandbox would emit it.
            return {
                "ram_cgroup": "500m",
                "vram_fraction": 0.042,
                "layer_triggered": triggered,
            }

        if optimization_count >= 2:
            # Well-optimized code: comfortable pass under budget.
            return {
                "status": "PASS",
                "vram_peak_mb": 148,
                "vram_peak_gb": 0.14,
                "latency_ms": 1850,
                "error_type": None,
                "causal_trigger": "network_spike_post_fsdp" if has_fsdp else None,
                "optimization_detected": ",".join(filter(None, [
                    "gradient_checkpointing" if has_checkpointing else None,
                    "mixed_precision" if has_mixed_precision else None,
                    "fsdp_sharding" if has_fsdp else None,
                    "chunked_processing" if has_chunking else None,
                ])),
                "constraint_layers": _layers("none (within budget)"),
            }
        elif optimization_count == 1:
            # Single optimization: passes, but closer to the budget ceiling.
            return {
                "status": "PASS",
                "vram_peak_mb": 380,
                "vram_peak_gb": 0.37,
                "latency_ms": 3200,
                "error_type": None,
                "causal_trigger": "network_spike_post_fsdp" if has_fsdp else None,
                "optimization_detected": "gradient_checkpointing" if has_checkpointing
                    else "mixed_precision" if has_mixed_precision
                    else "fsdp_sharding" if has_fsdp
                    else "chunked_processing",
                "constraint_layers": _layers("none (within budget)"),
            }
        else:
            # No optimizations: simulate the VRAM fraction lock killing the run.
            return {
                "status": "OOMKilled",
                "vram_peak_mb": 512,
                "vram_peak_gb": 0.50,
                "latency_ms": 0,
                "error_type": "OOM_CUDA",
                "causal_trigger": None,
                "optimization_detected": None,
                "constraint_layers": _layers("Layer 2 (VRAM fraction)"),
            }

    def get_sandbox_health(self) -> dict:
        """Check Docker sandbox readiness for /api/telemetry.

        Never raises: any health-check failure is reported as an
        all-False status dict with the error message attached.
        """
        try:
            return self.docker_sandbox.health_check()
        except Exception as e:
            return {
                "docker_daemon": False,
                "gpu_runtime": False,
                "sandbox_image": False,
                "error": str(e),
            }

    def get_challenge_stats(self) -> dict:
        """Get tensor challenge statistics for dashboard display."""
        return self.challenge_generator.get_stats()
|
|