"""
AgentDebuggerEnv — Sandboxed Code Execution
============================================
Isolated execution environment for user-submitted code, providing
security through AST-based import filtering, subprocess isolation,
and runtime constraints.
"""
import subprocess
import tempfile
import os
import time
import ast
from typing import Tuple
# Top-level module names that sandboxed code is not allowed to import.
# The checker compares only the first dotted component of an import, so an
# entry such as "http" also covers "http.client".
BLOCKED_IMPORTS = [
    # Interpreter, process and filesystem access
    "os",
    "sys",
    "subprocess",
    "socket",
    "importlib",
    "shutil",
    "pathlib",
    "glob",
    # Serialization and on-disk storage
    "pickle",
    "shelve",
    "dbm",
    "sqlite3",
    # Networking
    "ftplib",
    "http",
    "urllib",
    "requests",
    "httpx",
    # Concurrency ("threading" can be re-enabled per call by execute_code)
    "asyncio",
    "multiprocessing",
    "threading",
    # Low-level / runtime internals
    "ctypes",
    "cffi",
    "resource",
    "signal",
    "mmap",
    "gc",
]

# Wall-clock budget, in seconds, handed to the subprocess as its timeout.
EXECUTION_TIMEOUT_SECONDS = 10

# Memory budget in megabytes.
# NOTE(review): nothing in the visible part of this module reads this value;
# confirm where (or whether) it is enforced.
MEMORY_LIMIT_MB = 256
def _build_import_checker(blocked: list[str]) -> str:
    """Build the Python prelude that enforces the import block-list.

    The returned source text is meant to be placed at the top of a script,
    followed by a line containing ``# --- USER CODE START ---`` and then the
    untrusted code. When the script runs, the prelude:

    1. Re-reads the script file, takes everything after the separator line,
       parses it with ``ast`` and exits with status 1 (after printing a
       ``BLOCKED IMPORT`` message) if any ``import`` / ``from ... import``
       names a module whose top-level package is in ``blocked``. Source that
       does not parse is left for normal execution to report.
    2. Replaces ``builtins.__import__`` with a wrapper that raises
       ``ImportError`` for the same modules at run time.

    Args:
        blocked: Top-level module names to refuse. Its ``repr`` is embedded
            verbatim in the generated source.

    Returns:
        The prelude as a string of Python source code.
    """
    # NOTE(review): the run-time hook replaces builtins.__import__ for the
    # whole process, so it appears to also reject blocked modules imported
    # internally by otherwise-allowed libraries -- confirm that is intended.
    # NOTE(review): when the prelude shares a file with user code, helper
    # names such as _original_import, _sys and _builtins stay reachable from
    # that code -- confirm whether they should be hidden.
    blocked_repr = repr(blocked)
    return f'''
import ast as _ast
import sys as _sys

_BLOCKED = {blocked_repr}

# Source files are UTF-8 by default, so read the script back the same way.
with open(__file__, encoding="utf-8") as _fh:
    _source_to_check = _fh.read()

# Only the text after the separator line is user-controlled. The separator is
# assembled from two pieces so that this prelude never contains it verbatim:
# otherwise find() would match the assignment below instead of the real
# separator, the slice would begin inside a string literal, and the resulting
# SyntaxError would silently skip the whole static check.
_marker = "# --- USER CODE " + "START ---"
_marker_pos = _source_to_check.find(_marker)
if _marker_pos != -1:
    _source_to_check = _source_to_check[_marker_pos + len(_marker):]
else:
    # No separator means no delimited user section; scan nothing and rely on
    # the run-time import hook installed below.
    _source_to_check = ""

try:
    _tree = _ast.parse(_source_to_check)
except SyntaxError:
    pass  # Let the actual execution catch syntax errors
else:
    for _node in _ast.walk(_tree):
        if isinstance(_node, _ast.Import):
            for _alias in _node.names:
                _top = _alias.name.split(".")[0]
                if _top in _BLOCKED:
                    print(f"BLOCKED IMPORT: '{{_alias.name}}' is not allowed in the sandbox.")
                    _sys.exit(1)
        elif isinstance(_node, _ast.ImportFrom):
            if _node.module:
                _top = _node.module.split(".")[0]
                if _top in _BLOCKED:
                    print(f"BLOCKED IMPORT: '{{_node.module}}' is not allowed in the sandbox.")
                    _sys.exit(1)

# Also block dangerous imports at run time
import builtins as _builtins
_original_import = _builtins.__import__

def _restricted_import(name, *args, **kwargs):
    _top = name.split(".")[0]
    if _top in _BLOCKED:
        raise ImportError(f"BLOCKED IMPORT: '{{name}}' is not allowed in the sandbox.")
    return _original_import(name, *args, **kwargs)

_builtins.__import__ = _restricted_import
'''
def execute_code(code: str, test_code: str, allow_threading: bool = False) -> Tuple[str, bool, int]:
    """
    Run ``code`` followed by ``test_code`` in a separate ``python3`` process.

    The script written to disk is: the import-checker prelude, a
    ``# --- USER CODE START ---`` separator line, ``code``, then ``test_code``.
    The child gets a minimal environment (PATH, HOME and a few PYTHON*
    settings) and is killed after ``EXECUTION_TIMEOUT_SECONDS``.

    Args:
        code: Source under test.
        test_code: Source appended after ``code`` (typically the tests).
        allow_threading: When True, "threading" is removed from the block-list
            for this run; every other blocked module stays blocked.

    Returns:
        (output: str, timed_out: bool, execution_time_ms: int)
        ``output`` is the child's stdout followed by its stderr, with
        surrounding whitespace stripped. On timeout it is a ``TIMEOUT: ...``
        message and ``timed_out`` is True. If the sandbox itself fails (for
        example the interpreter cannot be started), ``output`` is
        ``SANDBOX ERROR: ...`` and the elapsed time is reported as 0.

    NOTE(review): MEMORY_LIMIT_MB is not applied anywhere in this function.
    """
    # "threading" is the only entry that can be unblocked per call.
    blocked = [name for name in BLOCKED_IMPORTS if not (allow_threading and name == "threading")]

    # Prelude + separator + user code + test code.
    full_script = (
        _build_import_checker(blocked)
        + "\n# --- USER CODE START ---\n"
        + code
        + "\n"
        + test_code
    )

    tmp_path = None
    try:
        # Python reads source files as UTF-8 by default, so write the script
        # as UTF-8 rather than in the parent's locale encoding.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", prefix="sandbox_",
            delete=False, dir=tempfile.gettempdir(),
            encoding="utf-8",
        ) as tmp:
            # Record the path before writing so the file is still removed in
            # the ``finally`` below if the write itself fails.
            tmp_path = tmp.name
            tmp.write(full_script)

        # A monotonic clock keeps the measurement immune to wall-clock jumps.
        started = time.monotonic()
        try:
            result = subprocess.run(
                ["python3", tmp_path],
                capture_output=True,
                text=True,
                # Decode deterministically and never fail on odd bytes; the
                # child is told to emit UTF-8 via PYTHONIOENCODING below.
                encoding="utf-8",
                errors="replace",
                timeout=EXECUTION_TIMEOUT_SECONDS,
                env={
                    "PATH": os.environ.get("PATH", "/usr/bin:/usr/local/bin"),
                    "HOME": os.environ.get("HOME", "/tmp"),
                    "PYTHONDONTWRITEBYTECODE": "1",
                    "PYTHONIOENCODING": "utf-8",
                },
            )
        except subprocess.TimeoutExpired:
            elapsed_ms = int((time.monotonic() - started) * 1000)
            return (
                f"TIMEOUT: Code execution exceeded {EXECUTION_TIMEOUT_SECONDS} second limit and was killed.",
                True,
                elapsed_ms,
            )

        elapsed_ms = int((time.monotonic() - started) * 1000)
        output = result.stdout + result.stderr
        return (output.strip(), False, elapsed_ms)
    except Exception as e:
        # Boundary handler: callers always get a result tuple, never a raise.
        return (f"SANDBOX ERROR: {str(e)}", False, 0)
    finally:
        # Best-effort removal of the temporary script.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|