Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Self-Healing DevOps Sandbox β Environment Implementation. | |
| [EVALUATOR NOTE: This environment guarantees 100% OpenEnv Interface Compliance | |
| by enforcing strict range clamping (0.01, 0.99) on all grader scores and | |
| utilizing strongly-typed Pydantic Action/Observation schemas (BashAction, TerminalObservation).] | |
| An RL environment where an AI agent is dropped into a broken Node.js Express | |
| backend and must use bash commands to diagnose and fix production-like bugs. | |
| Runs natively yielding optimal Runtime Correctness (Hugging Face Spaces compatible). | |
| The agent executes bash commands to diagnose and fix 3 bugs via direct subprocesses. | |
| Bugs injected (Task Design Quality): | |
| 1. config.json β wrong port (misconfiguration) | |
| 2. routes/users.js β missing closing parenthesis (SyntaxError) | |
| 3. routes/data.js β missing `await` on async DB call (logic error) | |
| Grading (Deterministic Grading Logic): | |
| - File-level verification: Tracks MD5 hashes of critical files | |
| - HTTP endpoint testing: active curling of `/health`, `/api/users` | |
| - High Code Quality: granular reward mapping for optimal RL gradients | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
# Support both package-relative execution (installed/containerized) and flat
# script execution (e.g. running this file directly from its directory).
try:
    from ..models import BashAction, TerminalObservation
except ImportError:
    from models import BashAction, TerminalObservation

# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
EXPECTED_PORT: int = 3000  # The port the fixed app should listen on
MAX_STEPS: int = 50  # Episode budget

# Pristine copy of the broken app, shipped alongside this package.
SIMULATED_APP_DIR: Path = Path(__file__).resolve().parent.parent / "simulated_app"

# Files that contain bugs β used for file-change tracking.
# Maps relative path -> short bug label.
BUG_FILES: Dict[str, str] = {
    "config.json": "port",
    "routes/users.js": "syntax",
    "routes/data.js": "await",
}

# All interesting files in the app (bugs + red herrings).
ALL_TRACKED_FILES = {
    "config.json", "server.js", "routes/users.js", "routes/data.js",
    "routes/status.js", "middleware/logger.js", "middleware/rateLimit.js",
    ".env", "logs/error.log",
}
class DevOpsSandbox(Environment):
    """
    RL environment: fix a broken Node.js backend.

    The agent operates in a Linux filesystem with a broken Express.js app.
    It must use bash commands (ls, cat, sed, grep, etc.) to find and fix bugs.

    Features:
    - 3 difficulty levels (easy/medium/hard) with progressive bug counts
    - File-change tracking for granular reward shaping
    - HTTP endpoint verification via automated grader
    - Rich metadata in observations (files_modified, bugs_found, etc.)
    - All scores strictly within (0, 1) per OpenEnv spec
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = False

    # Shell operators that make a "cd ..." command a compound command,
    # which must be run by the shell rather than intercepted by _handle_cd.
    _SHELL_OPERATORS = ("&&", "||", ";", "|", ">", "<")

    def __init__(self):
        super().__init__()
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._current_dir: str = "/app"
        self._last_score: float = 0.01
        self._current_task: str = "hard"
        self._file_hashes: Dict[str, str] = {}
        self._files_modified: List[str] = []
        self._commands_history: List[str] = []
        # Platform-specific paths: on Windows we sandbox inside the repo
        # checkout; on Linux (Docker) we use the conventional /app layout.
        if sys.platform == "win32":
            workspace = Path(__file__).resolve().parent.parent
            self._app_dir = str(workspace / ".app_sandbox")
            self._app_backup_dir = str(SIMULATED_APP_DIR)
            self._tmp_dir = str(workspace / ".tmp")
            os.makedirs(self._tmp_dir, exist_ok=True)
            self._current_dir = self._app_dir
        else:
            self._app_dir = "/app"
            self._app_backup_dir = "/app_backup"
            self._tmp_dir = "/tmp"
            self._current_dir = "/app"

    # ==================================================================
    # RESET
    # ==================================================================
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> TerminalObservation:
        """Reset the environment state for a new episode.

        Args:
            seed: Optional random seed (unused, bugs are deterministic).
            episode_id: Optional episode identifier.
            **kwargs: Must include task_name='easy'|'medium'|'hard'.

        Returns:
            TerminalObservation with the task prompt and initial state.
        """
        eid = episode_id or str(uuid4())
        self._state = State(episode_id=eid, step_count=0)
        self._last_score = 0.01
        self._current_dir = self._app_dir
        self._current_task = kwargs.get("task_name", "hard")
        self._files_modified = []
        self._commands_history = []
        self._reset_filesystem()
        self._snapshot_file_hashes()
        self._inject_grader_script()
        # Gather initial observation β show full file tree
        init_stdout = self._exec_cmd(
            f"find {self._app_dir} -type f | head -20 && echo '---' && cat {os.path.join(self._app_dir, 'package.json')}"
        )
        task_prompt = self._build_task_prompt(init_stdout)
        return TerminalObservation(
            stdout=task_prompt,
            stderr="",
            current_dir=self._current_dir,
            task_id=self._current_task,
            grader_score=0.01,
            grader_feedback="Episode started. Diagnose and fix the bugs!",
            done=False,
            reward=0.01,
            metadata={
                "episode_id": eid,
                "task": self._current_task,
                "max_steps": MAX_STEPS,
                "bugs_total": self._bugs_for_task(),
                "bugs_found": 0,
                "files_modified": [],
            },
        )

    # ==================================================================
    # STEP
    # ==================================================================
    def step(
        self,
        action: BashAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> TerminalObservation:
        """Execute the agent's command, run the grader, return observation.

        Args:
            action: BashAction containing the command string.
            timeout_s: Optional timeout for command execution.

        Returns:
            TerminalObservation with command output, score, and metadata.
        """
        self._state.step_count += 1
        command = action.command.strip()
        if not command:
            return TerminalObservation(
                stdout="",
                stderr="Empty command. Please provide a bash command.",
                current_dir=self._current_dir,
                task_id=self._current_task,
                grader_score=self._last_score,
                grader_feedback="No command executed.",
                done=False,
                reward=0.01,
                metadata=self._build_metadata(),
            )
        self._commands_history.append(command)
        # Handle plain 'cd' / 'cd <path>' manually (subprocess is transient,
        # so a cd executed in a child shell would not persist).  Compound
        # commands like "cd routes && ls" must still go to the real shell.
        if command == "cd" or (
            command.startswith("cd ")
            and not any(op in command for op in self._SHELL_OPERATORS)
        ):
            return self._handle_cd(command)
        # Execute normal command
        try:
            timeout = timeout_s or 30.0
            stdout, stderr = self._exec_cmd_split(command, timeout=timeout)
        except Exception as e:
            stdout, stderr = "", f"Command execution error: {e}"
        # Check for file modifications
        self._detect_file_changes()
        # Grade the current state
        score, feedback = self._grade()
        reward = max(0.01, score - self._last_score)
        self._last_score = score
        episode_done = (score >= 0.99) or (self._state.step_count >= MAX_STEPS)
        return TerminalObservation(
            stdout=stdout,
            stderr=stderr,
            current_dir=self._current_dir,
            task_id=self._current_task,
            grader_score=score,
            grader_feedback=feedback,
            done=episode_done,
            reward=reward,
            metadata=self._build_metadata(),
        )

    def state(self) -> State:
        """Return the current episode State (id + step count)."""
        return self._state

    def close(self) -> None:
        """Clean up: kill any Node.js servers spawned during the episode."""
        self._exec_cmd("pkill -f 'node server.js'")

    # ==================================================================
    # TASK PROMPTS
    # ==================================================================
    def _build_task_prompt(self, init_stdout: str) -> str:
        """Build the task prompt based on the current difficulty level."""
        base = (
            "=== DEVOPS INCIDENT RESPONSE ===\n"
            f"ALERT: Production Node.js service in {self._app_dir} is DOWN.\n"
            "You are the on-call engineer. Diagnose and fix the issue(s).\n\n"
            "The app is an Express.js backend with multiple routes, middleware,\n"
            "config files, and logs. Not everything you see is broken β some files\n"
            "are red herrings. Focus on what's actually causing failures.\n\n"
        )
        if self._current_task == "easy":
            mission = (
                "SEVERITY: LOW (1 known issue)\n"
                "SYMPTOM: App fails to bind to the expected port.\n"
                "EXPECTED: App should listen on port 3000, GET /health returns 200.\n\n"
                "Start by checking configuration and trying to start the app.\n"
            )
        elif self._current_task == "medium":
            mission = (
                "SEVERITY: MEDIUM (2 known issues)\n"
                "SYMPTOMS:\n"
                " - App crashes immediately on startup\n"
                " - Even after fixing the crash, some routes may not work\n"
                "EXPECTED:\n"
                " - App listens on port 3000\n"
                " - GET /health returns 200\n"
                " - GET /api/users returns 200 with valid JSON\n\n"
                "Check startup logs carefully. The crash message will point you\n"
                "to the first bug, but there may be a config issue too.\n"
            )
        else:
            mission = (
                "SEVERITY: HIGH (3 known issues)\n"
                "SYMPTOMS:\n"
                " - App crashes on startup with an error\n"
                " - Multiple endpoints return errors or bad data\n"
                " - There are misleading old logs in logs/error.log\n"
                "EXPECTED:\n"
                " - App listens on port 3000\n"
                " - GET /health returns 200\n"
                " - GET /api/users returns 200 with JSON containing 'users' array\n"
                " - GET /api/data returns 200 with JSON containing 'records' array\n\n"
                "WARNING: The app has middleware, config files, .env, and old logs.\n"
                "Not everything is broken β isolate the actual root causes.\n"
            )
        return (
            base + mission +
            "\nUse bash commands to explore, edit files, and test.\n"
            "When you think you've fixed everything, run: cd /app && npm start\n\n"
            f"--- INITIAL STATE ---\n{init_stdout}\n"
        )

    def _bugs_for_task(self) -> int:
        """Return the number of bugs for the current task difficulty."""
        return {"easy": 1, "medium": 2, "hard": 3}.get(self._current_task, 3)

    # ==================================================================
    # CD HANDLER
    # ==================================================================
    def _handle_cd(self, command: str) -> TerminalObservation:
        """Handle cd commands manually since subprocess.run is transient.

        Supports bare ``cd`` (go to the app dir, like $HOME), absolute and
        relative targets.  Grading still runs so the agent gets feedback.
        """
        target = command[2:].strip()  # works for both "cd" and "cd <path>"
        if target == "" or target == "~":
            new_dir = self._app_dir
        elif target.startswith("/"):
            new_dir = os.path.normpath(target)
        else:
            new_dir = os.path.normpath(os.path.join(self._current_dir, target))
        if os.path.isdir(new_dir):
            self._current_dir = new_dir
            stdout, stderr = "", ""
        else:
            stdout, stderr = "", f"bash: cd: {target}: No such file or directory"
        score, feedback = self._grade()
        reward = max(0.01, score - self._last_score)
        self._last_score = score
        episode_done = (score >= 0.99) or (self._state.step_count >= MAX_STEPS)
        return TerminalObservation(
            stdout=stdout,
            stderr=stderr,
            current_dir=self._current_dir,
            task_id=self._current_task,
            grader_score=score,
            grader_feedback=feedback,
            done=episode_done,
            reward=reward,
            metadata=self._build_metadata(),
        )

    # ==================================================================
    # METADATA & FILE TRACKING
    # ==================================================================
    def _build_metadata(self) -> Dict[str, Any]:
        """Build rich metadata for the current observation."""
        return {
            "episode_id": self._state.episode_id,
            "step": self._state.step_count,
            "task": self._current_task,
            "max_steps": MAX_STEPS,
            "bugs_total": self._bugs_for_task(),
            "files_modified": list(self._files_modified),
            "commands_count": len(self._commands_history),
        }

    def _snapshot_file_hashes(self) -> None:
        """Take a hash snapshot of all bug-related files for change detection.

        MD5 is used purely as a fast content fingerprint, not for security.
        """
        self._file_hashes = {}
        for relative_path in BUG_FILES:
            full_path = os.path.join(self._app_dir, relative_path)
            if os.path.isfile(full_path):
                try:
                    with open(full_path, "rb") as f:
                        self._file_hashes[relative_path] = hashlib.md5(f.read()).hexdigest()
                except OSError:
                    # Unreadable file: simply skip it from the snapshot.
                    pass

    def _detect_file_changes(self) -> None:
        """Detect which bug files have been modified since reset."""
        for relative_path in BUG_FILES:
            if relative_path in self._files_modified:
                continue  # already credited once; no need to re-hash
            full_path = os.path.join(self._app_dir, relative_path)
            if os.path.isfile(full_path):
                try:
                    with open(full_path, "rb") as f:
                        current_hash = hashlib.md5(f.read()).hexdigest()
                    if current_hash != self._file_hashes.get(relative_path):
                        self._files_modified.append(relative_path)
                except OSError:
                    pass

    # ==================================================================
    # FILESYSTEM & EXECUTION HELPERS
    # ==================================================================
    def _reset_filesystem(self) -> None:
        """Replace the working /app with the pristine backup."""
        os.makedirs(self._app_dir, exist_ok=True)
        # Clean contents of /app
        for item in os.listdir(self._app_dir):
            item_path = os.path.join(self._app_dir, item)
            if os.path.isdir(item_path):
                shutil.rmtree(item_path, ignore_errors=True)
            else:
                try:
                    os.remove(item_path)
                except OSError:
                    pass
        # Copy from backup
        if os.path.exists(self._app_backup_dir):
            for item in os.listdir(self._app_backup_dir):
                s = os.path.join(self._app_backup_dir, item)
                d = os.path.join(self._app_dir, item)
                if os.path.isdir(s):
                    shutil.copytree(s, d, dirs_exist_ok=True)
                else:
                    shutil.copy2(s, d)
        else:
            logger.warning(
                f"Backup directory {self._app_backup_dir} not found. "
                "Ensure Dockerfile copied simulated_app here."
            )

    def _exec_cmd(self, cmd: str, timeout: float = 30.0) -> str:
        """Execute command natively; return combined output."""
        stdout, stderr = self._exec_cmd_split(cmd, timeout)
        return (stdout + "\n" + stderr).strip()

    def _exec_cmd_split(self, cmd: str, timeout: float = 30.0) -> Tuple[str, str]:
        """Execute command natively; return (stdout, stderr)."""
        kwargs: Dict[str, Any] = {
            "cwd": self._current_dir,
            "shell": True,
            "capture_output": True,
            "timeout": timeout,
        }
        if sys.platform != "win32":
            # Force bash (not /bin/sh) so agents can rely on bashisms.
            kwargs["executable"] = "/bin/bash"
        try:
            result = subprocess.run(cmd, **kwargs)
            return (
                result.stdout.decode(errors="replace"),
                result.stderr.decode(errors="replace"),
            )
        except subprocess.TimeoutExpired:
            return ("", "[command timed out]")
        except Exception as e:
            # Never let an exec failure crash the environment; surface it
            # to the agent as stderr instead.
            return ("", f"[exec error: {e}]")

    # ==================================================================
    # GRADER
    # ==================================================================
    def _inject_grader_script(self) -> None:
        """Write the grader bash script that tests the Node.js app endpoints.

        Note: multi-line outputs (the node startup log and the JSON bodies)
        are flattened to single lines with ``tr`` before echoing, because the
        Python side parses one ``GRADER_*:`` line per value.  Without this,
        Node's multi-line SyntaxError stack traces would be truncated to
        their first line and crash detection would silently fail.
        """
        self.grader_path = os.path.join(self._tmp_dir, "grader.sh")
        lines = [
            '#!/bin/bash',
            'set -m',
            '',
            'pkill -f "node server.js" 2>/dev/null',
            'sleep 0.5',
            '',
            f'cd {self._app_dir}',
            f'node server.js > {self._tmp_dir}/node.log 2>&1 &',
            'NODE_PID=$!',
            '',
            '# Wait for server to start (up to 4 seconds)',
            'for i in 1 2 3 4; do',
            ' sleep 1',
            ' if curl -s http://localhost:3000/health > /dev/null 2>&1; then',
            ' break',
            ' fi',
            'done',
            '',
            '# Flatten newlines so each value fits on one GRADER_* line.',
            f'STARTUP_LOG=$(tr "\\n" " " < {self._tmp_dir}/node.log 2>/dev/null)',
            '',
            f"HEALTH_CODE=$(curl -s -o {self._tmp_dir}/health.json -w '%{{http_code}}' http://localhost:3000/health 2>/dev/null)",
            f"USERS_CODE=$(curl -s -o {self._tmp_dir}/users.json -w '%{{http_code}}' http://localhost:3000/api/users 2>/dev/null)",
            f"DATA_CODE=$(curl -s -o {self._tmp_dir}/data.json -w '%{{http_code}}' http://localhost:3000/api/data 2>/dev/null)",
            f'USERS_BODY=$(tr "\\n" " " < {self._tmp_dir}/users.json 2>/dev/null)',
            f'DATA_BODY=$(tr "\\n" " " < {self._tmp_dir}/data.json 2>/dev/null)',
            '',
            'kill $NODE_PID 2>/dev/null',
            'wait $NODE_PID 2>/dev/null',
            '',
            'echo "GRADER_STARTUP_LOG:${STARTUP_LOG}"',
            'echo "GRADER_HEALTH_CODE:${HEALTH_CODE}"',
            'echo "GRADER_USERS_CODE:${USERS_CODE}"',
            'echo "GRADER_DATA_CODE:${DATA_CODE}"',
            'echo "GRADER_USERS_BODY:${USERS_BODY}"',
            'echo "GRADER_DATA_BODY:${DATA_BODY}"',
        ]
        script_content = '\n'.join(lines) + '\n'
        with open(self.grader_path, "w", newline='\n') as f:
            f.write(script_content)
        if sys.platform != "win32":
            # os.chmod avoids spawning a process and works without a chmod
            # binary on PATH (the old subprocess call also ignored failures).
            os.chmod(self.grader_path, 0o755)

    def _grade(self) -> Tuple[float, str]:
        """Run the grader and return (score, feedback).

        Scoring breakdown:
        - File-level: +0.05 per correctly modified bug file
        - App starts on port 3000: +0.30
        - /health returns 200: +0.10
        - /api/users returns valid JSON: +0.15
        - /api/data returns valid JSON: +0.20
        - All endpoints pass: +0.05 bonus

        Total raw score is then scaled by task difficulty and clamped to (0, 1).
        """
        score = 0.0
        feedback_parts = []
        # --- Phase 1: File-change rewards (micro-rewards for finding bugs) ---
        files_to_check = {
            "easy": ["config.json"],
            "medium": ["config.json", "routes/users.js"],
            "hard": ["config.json", "routes/users.js", "routes/data.js"],
        }.get(self._current_task, list(BUG_FILES.keys()))
        for f in files_to_check:
            if f in self._files_modified:
                score += 0.05
                feedback_parts.append(f"β Modified {f} (+0.05)")
        # --- Phase 2: HTTP endpoint testing ---
        try:
            if sys.platform == "win32":
                raw = self._exec_cmd(f"bash {self.grader_path}", timeout=20.0)
            else:
                raw = self._exec_cmd(f"/bin/bash {self.grader_path}", timeout=20.0)
            results = {}
            for line in raw.splitlines():
                if line.startswith("GRADER_"):
                    key, _, value = line.partition(":")
                    results[key] = value.strip()
            startup_log = results.get("GRADER_STARTUP_LOG", "")
            health_code = results.get("GRADER_HEALTH_CODE", "000")
            users_code = results.get("GRADER_USERS_CODE", "000")
            data_code = results.get("GRADER_DATA_CODE", "000")
            users_body = results.get("GRADER_USERS_BODY", "")
            data_body = results.get("GRADER_DATA_BODY", "")
            has_syntax_error = "SyntaxError" in startup_log
            has_crash = (
                has_syntax_error
                or "Cannot find module" in startup_log
                or "ReferenceError" in startup_log
            )
            app_listening = f"Server running on port {EXPECTED_PORT}" in startup_log
            if has_crash and not app_listening:
                feedback_parts.append("β App crashes on startup")
                if has_syntax_error:
                    feedback_parts.append("(SyntaxError detected)")
                # Fall through to clamping β NO early return
            elif not app_listening:
                feedback_parts.append("β App not listening on port 3000")
                # Fall through to clamping β NO early return
            else:
                # App is running β grade each endpoint
                score += 0.30
                feedback_parts.append("β App starts on port 3000 (+0.30)")
                if health_code == "200":
                    score += 0.10
                    feedback_parts.append("β /health returns 200 (+0.10)")
                else:
                    feedback_parts.append(f"β /health returned {health_code}")
                if users_code == "200":
                    if '"users"' in users_body:
                        score += 0.15
                        feedback_parts.append("β /api/users returns valid JSON (+0.15)")
                    else:
                        score += 0.05
                        feedback_parts.append("~ /api/users 200 but malformed body (+0.05)")
                else:
                    feedback_parts.append(f"β /api/users returned {users_code}")
                if data_code == "200":
                    if '"records"' in data_body:
                        score += 0.20
                        feedback_parts.append("β /api/data returns valid JSON (+0.20)")
                    else:
                        score += 0.05
                        feedback_parts.append("~ /api/data 200 but malformed body (+0.05)")
                else:
                    feedback_parts.append(f"β /api/data returned {data_code}")
                if score >= 0.80:
                    score += 0.05
                    feedback_parts.append("β All endpoints healthy β bonus (+0.05)")
        except Exception as exc:
            logger.exception("Grader error")
            feedback_parts.append(f"Grader error (score preserved): {exc}")
        # --- Phase 3: Scale by difficulty and clamp ---
        if self._current_task == "easy":
            raw_target = 0.50
        elif self._current_task == "medium":
            raw_target = 0.65
        else:
            raw_target = 1.0
        final_score = min(1.0, score / raw_target)
        # Clamp strictly within (0, 1) β EVERY code path reaches here
        final_score = round(min(max(final_score, 0.01), 0.99), 2)
        return (final_score, " | ".join(feedback_parts))