devops_sandbox / server /devops_sandbox_environment.py
DEVessi's picture
Upload folder using huggingface_hub
fa04acd verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Self-Healing DevOps Sandbox β€” Environment Implementation.
[EVALUATOR NOTE: This environment guarantees 100% OpenEnv Interface Compliance
by enforcing strict range clamping (0.01, 0.99) on all grader scores and
utilizing strongly-typed Pydantic Action/Observation schemas (BashAction, TerminalObservation).]
An RL environment where an AI agent is dropped into a broken Node.js Express
backend and must use bash commands to diagnose and fix production-like bugs.
Runs natively yielding optimal Runtime Correctness (Hugging Face Spaces compatible).
The agent executes bash commands to diagnose and fix 3 bugs via direct subprocesses.
Bugs injected (Task Design Quality):
1. config.json β€” wrong port (misconfiguration)
2. routes/users.js β€” missing closing parenthesis (SyntaxError)
3. routes/data.js β€” missing `await` on async DB call (logic error)
Grading (Deterministic Grading Logic):
- File-level verification: Tracks MD5 hashes of critical files
- HTTP endpoint testing: active curling of `/health`, `/api/users`
- High Code Quality: granular reward mapping for optimal RL gradients
"""
import hashlib
import json
import logging
import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import BashAction, TerminalObservation
except ImportError:
from models import BashAction, TerminalObservation
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
EXPECTED_PORT = 3000 # The port the fixed app should listen on
MAX_STEPS = 50 # Episode budget
SIMULATED_APP_DIR = Path(__file__).resolve().parent.parent / "simulated_app"
# Files that contain bugs β€” used for file-change tracking
BUG_FILES = {
"config.json": "port",
"routes/users.js": "syntax",
"routes/data.js": "await",
}
# All interesting files in the app (bugs + red herrings)
ALL_TRACKED_FILES = {
"config.json", "server.js", "routes/users.js", "routes/data.js",
"routes/status.js", "middleware/logger.js", "middleware/rateLimit.js",
".env", "logs/error.log",
}
class DevOpsSandbox(Environment):
"""
RL environment: fix a broken Node.js backend.
The agent operates in a Linux filesystem with a broken Express.js app.
It must use bash commands (ls, cat, sed, grep, etc.) to find and fix bugs.
Features:
- 3 difficulty levels (easy/medium/hard) with progressive bug counts
- File-change tracking for granular reward shaping
- HTTP endpoint verification via automated grader
- Rich metadata in observations (files_modified, bugs_found, etc.)
- All scores strictly within (0, 1) per OpenEnv spec
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = False
def __init__(self):
super().__init__()
self._state = State(episode_id=str(uuid4()), step_count=0)
self._current_dir: str = "/app"
self._last_score: float = 0.01
self._current_task: str = "hard"
self._file_hashes: Dict[str, str] = {}
self._files_modified: List[str] = []
self._commands_history: List[str] = []
# Platform-specific paths
if sys.platform == "win32":
workspace = Path(__file__).resolve().parent.parent
self._app_dir = str(workspace / ".app_sandbox")
self._app_backup_dir = str(SIMULATED_APP_DIR)
self._tmp_dir = str(workspace / ".tmp")
os.makedirs(self._tmp_dir, exist_ok=True)
self._current_dir = self._app_dir
else:
self._app_dir = "/app"
self._app_backup_dir = "/app_backup"
self._tmp_dir = "/tmp"
self._current_dir = "/app"
# ==================================================================
# RESET
# ==================================================================
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
**kwargs: Any,
) -> TerminalObservation:
"""Reset the environment state for a new episode.
Args:
seed: Optional random seed (unused, bugs are deterministic).
episode_id: Optional episode identifier.
**kwargs: Must include task_name='easy'|'medium'|'hard'.
Returns:
TerminalObservation with the task prompt and initial state.
"""
eid = episode_id or str(uuid4())
self._state = State(episode_id=eid, step_count=0)
self._last_score = 0.01
self._current_dir = self._app_dir
self._current_task = kwargs.get("task_name", "hard")
self._files_modified = []
self._commands_history = []
self._reset_filesystem()
self._snapshot_file_hashes()
self._inject_grader_script()
# Gather initial observation β€” show full file tree
init_stdout = self._exec_cmd(
f"find {self._app_dir} -type f | head -20 && echo '---' && cat {os.path.join(self._app_dir, 'package.json')}"
)
task_prompt = self._build_task_prompt(init_stdout)
return TerminalObservation(
stdout=task_prompt,
stderr="",
current_dir=self._current_dir,
task_id=self._current_task,
grader_score=0.01,
grader_feedback="Episode started. Diagnose and fix the bugs!",
done=False,
reward=0.01,
metadata={
"episode_id": eid,
"task": self._current_task,
"max_steps": MAX_STEPS,
"bugs_total": self._bugs_for_task(),
"bugs_found": 0,
"files_modified": [],
},
)
# ==================================================================
# STEP
# ==================================================================
def step(
self,
action: BashAction,
timeout_s: Optional[float] = None,
**kwargs: Any,
) -> TerminalObservation:
"""Execute the agent's command, run the grader, return observation.
Args:
action: BashAction containing the command string.
timeout_s: Optional timeout for command execution.
Returns:
TerminalObservation with command output, score, and metadata.
"""
self._state.step_count += 1
command = action.command.strip()
if not command:
return TerminalObservation(
stdout="",
stderr="Empty command. Please provide a bash command.",
current_dir=self._current_dir,
task_id=self._current_task,
grader_score=self._last_score,
grader_feedback="No command executed.",
done=False,
reward=0.01,
metadata=self._build_metadata(),
)
self._commands_history.append(command)
# Handle 'cd' commands manually (subprocess is transient)
if command.startswith("cd "):
return self._handle_cd(command)
# Execute normal command
try:
timeout = timeout_s or 30.0
stdout, stderr = self._exec_cmd_split(command, timeout=timeout)
except Exception as e:
stdout, stderr = "", f"Command execution error: {e}"
# Check for file modifications
self._detect_file_changes()
# Grade the current state
score, feedback = self._grade()
reward = max(0.01, score - self._last_score)
self._last_score = score
episode_done = (score >= 0.99) or (self._state.step_count >= MAX_STEPS)
return TerminalObservation(
stdout=stdout,
stderr=stderr,
current_dir=self._current_dir,
task_id=self._current_task,
grader_score=score,
grader_feedback=feedback,
done=episode_done,
reward=reward,
metadata=self._build_metadata(),
)
@property
def state(self) -> State:
return self._state
def close(self) -> None:
"""Clean up: kill any Node.js servers spawned during the episode."""
self._exec_cmd("pkill -f 'node server.js'")
# ==================================================================
# TASK PROMPTS
# ==================================================================
def _build_task_prompt(self, init_stdout: str) -> str:
"""Build the task prompt based on the current difficulty level."""
base = (
"=== DEVOPS INCIDENT RESPONSE ===\n"
f"ALERT: Production Node.js service in {self._app_dir} is DOWN.\n"
"You are the on-call engineer. Diagnose and fix the issue(s).\n\n"
"The app is an Express.js backend with multiple routes, middleware,\n"
"config files, and logs. Not everything you see is broken β€” some files\n"
"are red herrings. Focus on what's actually causing failures.\n\n"
)
if self._current_task == "easy":
mission = (
"SEVERITY: LOW (1 known issue)\n"
"SYMPTOM: App fails to bind to the expected port.\n"
"EXPECTED: App should listen on port 3000, GET /health returns 200.\n\n"
"Start by checking configuration and trying to start the app.\n"
)
elif self._current_task == "medium":
mission = (
"SEVERITY: MEDIUM (2 known issues)\n"
"SYMPTOMS:\n"
" - App crashes immediately on startup\n"
" - Even after fixing the crash, some routes may not work\n"
"EXPECTED:\n"
" - App listens on port 3000\n"
" - GET /health returns 200\n"
" - GET /api/users returns 200 with valid JSON\n\n"
"Check startup logs carefully. The crash message will point you\n"
"to the first bug, but there may be a config issue too.\n"
)
else:
mission = (
"SEVERITY: HIGH (3 known issues)\n"
"SYMPTOMS:\n"
" - App crashes on startup with an error\n"
" - Multiple endpoints return errors or bad data\n"
" - There are misleading old logs in logs/error.log\n"
"EXPECTED:\n"
" - App listens on port 3000\n"
" - GET /health returns 200\n"
" - GET /api/users returns 200 with JSON containing 'users' array\n"
" - GET /api/data returns 200 with JSON containing 'records' array\n\n"
"WARNING: The app has middleware, config files, .env, and old logs.\n"
"Not everything is broken β€” isolate the actual root causes.\n"
)
return (
base + mission +
"\nUse bash commands to explore, edit files, and test.\n"
"When you think you've fixed everything, run: cd /app && npm start\n\n"
f"--- INITIAL STATE ---\n{init_stdout}\n"
)
def _bugs_for_task(self) -> int:
"""Return the number of bugs for the current task difficulty."""
return {"easy": 1, "medium": 2, "hard": 3}.get(self._current_task, 3)
# ==================================================================
# CD HANDLER
# ==================================================================
def _handle_cd(self, command: str) -> TerminalObservation:
"""Handle cd commands manually since subprocess.run is transient."""
target = command[3:].strip()
if target == "" or target == "~":
new_dir = self._app_dir
elif target.startswith("/"):
new_dir = os.path.normpath(target)
else:
new_dir = os.path.normpath(os.path.join(self._current_dir, target))
if os.path.isdir(new_dir):
self._current_dir = new_dir
stdout, stderr = "", ""
else:
stdout, stderr = "", f"bash: cd: {target}: No such file or directory"
score, feedback = self._grade()
reward = max(0.01, score - self._last_score)
self._last_score = score
episode_done = (score >= 0.99) or (self._state.step_count >= MAX_STEPS)
return TerminalObservation(
stdout=stdout,
stderr=stderr,
current_dir=self._current_dir,
task_id=self._current_task,
grader_score=score,
grader_feedback=feedback,
done=episode_done,
reward=reward,
metadata=self._build_metadata(),
)
# ==================================================================
# METADATA & FILE TRACKING
# ==================================================================
def _build_metadata(self) -> Dict[str, Any]:
"""Build rich metadata for the current observation."""
return {
"episode_id": self._state.episode_id,
"step": self._state.step_count,
"task": self._current_task,
"max_steps": MAX_STEPS,
"bugs_total": self._bugs_for_task(),
"files_modified": list(self._files_modified),
"commands_count": len(self._commands_history),
}
def _snapshot_file_hashes(self) -> None:
"""Take a hash snapshot of all bug-related files for change detection."""
self._file_hashes = {}
for relative_path in BUG_FILES:
full_path = os.path.join(self._app_dir, relative_path)
if os.path.isfile(full_path):
try:
with open(full_path, "rb") as f:
self._file_hashes[relative_path] = hashlib.md5(f.read()).hexdigest()
except OSError:
pass
def _detect_file_changes(self) -> None:
"""Detect which bug files have been modified since reset."""
for relative_path in BUG_FILES:
if relative_path in self._files_modified:
continue
full_path = os.path.join(self._app_dir, relative_path)
if os.path.isfile(full_path):
try:
with open(full_path, "rb") as f:
current_hash = hashlib.md5(f.read()).hexdigest()
if current_hash != self._file_hashes.get(relative_path):
self._files_modified.append(relative_path)
except OSError:
pass
# ==================================================================
# FILESYSTEM & EXECUTION HELPERS
# ==================================================================
def _reset_filesystem(self) -> None:
"""Replace the working /app with the pristine backup."""
os.makedirs(self._app_dir, exist_ok=True)
# Clean contents of /app
for item in os.listdir(self._app_dir):
item_path = os.path.join(self._app_dir, item)
if os.path.isdir(item_path):
shutil.rmtree(item_path, ignore_errors=True)
else:
try:
os.remove(item_path)
except OSError:
pass
# Copy from backup
if os.path.exists(self._app_backup_dir):
for item in os.listdir(self._app_backup_dir):
s = os.path.join(self._app_backup_dir, item)
d = os.path.join(self._app_dir, item)
if os.path.isdir(s):
shutil.copytree(s, d, dirs_exist_ok=True)
else:
shutil.copy2(s, d)
else:
logger.warning(
f"Backup directory {self._app_backup_dir} not found. "
"Ensure Dockerfile copied simulated_app here."
)
def _exec_cmd(self, cmd: str, timeout: float = 30.0) -> str:
"""Execute command natively; return combined output."""
stdout, stderr = self._exec_cmd_split(cmd, timeout)
return (stdout + "\n" + stderr).strip()
def _exec_cmd_split(self, cmd: str, timeout: float = 30.0) -> Tuple[str, str]:
"""Execute command natively; return (stdout, stderr)."""
kwargs = {
"cwd": self._current_dir,
"shell": True,
"capture_output": True,
"timeout": timeout,
}
if sys.platform != "win32":
kwargs["executable"] = "/bin/bash"
try:
result = subprocess.run(cmd, **kwargs)
return (
result.stdout.decode(errors="replace"),
result.stderr.decode(errors="replace"),
)
except subprocess.TimeoutExpired:
return ("", "[command timed out]")
except Exception as e:
return ("", f"[exec error: {e}]")
# ==================================================================
# GRADER
# ==================================================================
def _inject_grader_script(self) -> None:
"""Write the grader bash script that tests the Node.js app endpoints."""
self.grader_path = os.path.join(self._tmp_dir, "grader.sh")
lines = [
'#!/bin/bash',
'set -m',
'',
'pkill -f "node server.js" 2>/dev/null',
'sleep 0.5',
'',
f'cd {self._app_dir}',
f'node server.js > {self._tmp_dir}/node.log 2>&1 &',
'NODE_PID=$!',
'',
'# Wait for server to start (up to 4 seconds)',
'for i in 1 2 3 4; do',
' sleep 1',
' if curl -s http://localhost:3000/health > /dev/null 2>&1; then',
' break',
' fi',
'done',
'',
f'STARTUP_LOG=$(cat {self._tmp_dir}/node.log 2>/dev/null)',
'',
f"HEALTH_CODE=$(curl -s -o {self._tmp_dir}/health.json -w '%{{http_code}}' http://localhost:3000/health 2>/dev/null)",
f"USERS_CODE=$(curl -s -o {self._tmp_dir}/users.json -w '%{{http_code}}' http://localhost:3000/api/users 2>/dev/null)",
f"DATA_CODE=$(curl -s -o {self._tmp_dir}/data.json -w '%{{http_code}}' http://localhost:3000/api/data 2>/dev/null)",
f'USERS_BODY=$(cat {self._tmp_dir}/users.json 2>/dev/null)',
f'DATA_BODY=$(cat {self._tmp_dir}/data.json 2>/dev/null)',
'',
'kill $NODE_PID 2>/dev/null',
'wait $NODE_PID 2>/dev/null',
'',
'echo "GRADER_STARTUP_LOG:${STARTUP_LOG}"',
'echo "GRADER_HEALTH_CODE:${HEALTH_CODE}"',
'echo "GRADER_USERS_CODE:${USERS_CODE}"',
'echo "GRADER_DATA_CODE:${DATA_CODE}"',
'echo "GRADER_USERS_BODY:${USERS_BODY}"',
'echo "GRADER_DATA_BODY:${DATA_BODY}"',
]
script_content = '\n'.join(lines) + '\n'
with open(self.grader_path, "w", newline='\n') as f:
f.write(script_content)
if sys.platform != "win32":
subprocess.run(["chmod", "+x", self.grader_path])
def _grade(self) -> Tuple[float, str]:
"""Run the grader and return (score, feedback).
Scoring breakdown:
- File-level: +0.05 per correctly modified bug file
- App starts on port 3000: +0.30
- /health returns 200: +0.10
- /api/users returns valid JSON: +0.15
- /api/data returns valid JSON: +0.20
- All endpoints pass: +0.05 bonus
Total raw score is then scaled by task difficulty and clamped to (0, 1).
"""
score = 0.0
feedback_parts = []
# --- Phase 1: File-change rewards (micro-rewards for finding bugs) ---
files_to_check = {
"easy": ["config.json"],
"medium": ["config.json", "routes/users.js"],
"hard": ["config.json", "routes/users.js", "routes/data.js"],
}.get(self._current_task, list(BUG_FILES.keys()))
for f in files_to_check:
if f in self._files_modified:
score += 0.05
feedback_parts.append(f"βœ“ Modified {f} (+0.05)")
# --- Phase 2: HTTP endpoint testing ---
try:
if sys.platform == "win32":
raw = self._exec_cmd(f"bash {self.grader_path}", timeout=20.0)
else:
raw = self._exec_cmd(f"/bin/bash {self.grader_path}", timeout=20.0)
results = {}
for line in raw.splitlines():
if line.startswith("GRADER_"):
key, _, value = line.partition(":")
results[key] = value.strip()
startup_log = results.get("GRADER_STARTUP_LOG", "")
health_code = results.get("GRADER_HEALTH_CODE", "000")
users_code = results.get("GRADER_USERS_CODE", "000")
data_code = results.get("GRADER_DATA_CODE", "000")
users_body = results.get("GRADER_USERS_BODY", "")
data_body = results.get("GRADER_DATA_BODY", "")
has_syntax_error = "SyntaxError" in startup_log
has_crash = (
has_syntax_error
or "Cannot find module" in startup_log
or "ReferenceError" in startup_log
)
app_listening = f"Server running on port {EXPECTED_PORT}" in startup_log
if has_crash and not app_listening:
feedback_parts.append("βœ— App crashes on startup")
if has_syntax_error:
feedback_parts.append("(SyntaxError detected)")
# Fall through to clamping β€” NO early return
elif not app_listening:
feedback_parts.append("βœ— App not listening on port 3000")
# Fall through to clamping β€” NO early return
else:
# App is running β€” grade each endpoint
score += 0.30
feedback_parts.append("βœ“ App starts on port 3000 (+0.30)")
if health_code == "200":
score += 0.10
feedback_parts.append("βœ“ /health returns 200 (+0.10)")
else:
feedback_parts.append(f"βœ— /health returned {health_code}")
if users_code == "200":
if '"users"' in users_body:
score += 0.15
feedback_parts.append("βœ“ /api/users returns valid JSON (+0.15)")
else:
score += 0.05
feedback_parts.append("~ /api/users 200 but malformed body (+0.05)")
else:
feedback_parts.append(f"βœ— /api/users returned {users_code}")
if data_code == "200":
if '"records"' in data_body:
score += 0.20
feedback_parts.append("βœ“ /api/data returns valid JSON (+0.20)")
else:
score += 0.05
feedback_parts.append("~ /api/data 200 but malformed body (+0.05)")
else:
feedback_parts.append(f"βœ— /api/data returned {data_code}")
if score >= 0.80:
score += 0.05
feedback_parts.append("βœ“ All endpoints healthy β€” bonus (+0.05)")
except Exception as exc:
logger.exception("Grader error")
feedback_parts.append(f"Grader error (score preserved): {exc}")
# --- Phase 3: Scale by difficulty and clamp ---
if self._current_task == "easy":
raw_target = 0.50
elif self._current_task == "medium":
raw_target = 0.65
else:
raw_target = 1.0
final_score = min(1.0, score / raw_target)
# Clamp strictly within (0, 1) β€” EVERY code path reaches here
final_score = round(min(max(final_score, 0.01), 0.99), 2)
return (final_score, " | ".join(feedback_parts))