File size: 6,969 Bytes
e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 e93446d e2cf8f8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | """
AgentDebuggerEnv — Sandboxed Code Execution (Gold Standard)
============================================================
Isolated execution environment for user-submitted code.
Implements multi-layered security:
1. AST-based static analysis (blocks dangerous builtins & dunders)
2. Runtime protection (restricted __import__, nullified dangerous builtins)
3. Subprocess isolation with strict timeouts
4. Resource limits (address-space cap via RLIMIT_AS)
"""
import subprocess
import tempfile
import os
import time
import ast
from typing import Tuple
# Top-level module names that sandboxed code is not allowed to import.
BLOCKED_IMPORTS = [
    # Process, interpreter and network access
    "os",
    "sys",
    "subprocess",
    "socket",
    "importlib",
    "shutil",
    # Filesystem, serialization and local databases
    "pathlib",
    "glob",
    "pickle",
    "shelve",
    "dbm",
    "sqlite3",
    # Network clients and async I/O
    "ftplib",
    "http",
    "urllib",
    "requests",
    "httpx",
    "asyncio",
    # Concurrency ("threading" can be re-allowed per call by execute_code)
    "multiprocessing",
    "threading",
    # Low-level / native interfaces
    "ctypes",
    "cffi",
    "resource",
    "signal",
    "mmap",
    "gc",
]

# Builtin names that sandboxed code may not reference.
DANGEROUS_BUILTINS = [
    "eval",
    "exec",
    "compile",
    "getattr",
    "setattr",
    "delattr",
    "input",
    "breakpoint",
    "help",
    "open",
]

# Wall-clock limit for one sandboxed run.
EXECUTION_TIMEOUT_SECONDS = 10  # Hackathon spec: strictly 10s

# Address-space cap applied inside the sandboxed process, in megabytes.
MEMORY_LIMIT_MB = 256
def _build_security_prelude(blocked_imports: list[str]) -> str:
    """Build the hardening script that is prepended to a sandboxed program.

    The returned source is meant to sit at the top of a script file, followed
    by a line holding the user-code marker comment and then the untrusted
    code. When that script runs, the prelude:

    1. Caps the address space at ``MEMORY_LIMIT_MB`` via ``RLIMIT_AS``
       (best effort; failures to apply the limit are ignored).
    2. Re-reads its own file, parses everything after the first marker line,
       and exits with status 1 if that code imports a blocked module, names
       an entry of ``DANGEROUS_BUILTINS``, or accesses an attribute whose name
       starts with an underscore. If the analysis itself cannot be carried
       out, the script also exits instead of running unchecked code.
    3. Replaces ``builtins.__import__`` with a wrapper that raises
       ``ImportError`` for blocked modules, and sets every dangerous builtin
       except ``getattr``/``setattr``/``delattr`` to ``None``.
    4. Removes its helper names from the module namespace.

    Note:
        Step 3 modifies the shared ``builtins`` module, so the nullified names
        are also ``None`` for any library code that looks them up at call
        time inside the sandboxed process.

    Args:
        blocked_imports: Top-level module names the sandboxed code may not
            import.

    Returns:
        The prelude as Python source text.
    """
    blocked_repr = repr(blocked_imports)
    builtins_repr = repr(DANGEROUS_BUILTINS)
    return f'''
import ast as _ast
import sys as _sys
import builtins as _builtins

# -- 1. Resource Limits ------------------------------------------------------
try:
    import resource as _resource
    # Cap the address space (RLIMIT_AS) at {MEMORY_LIMIT_MB} MB.
    _mem_limit = {MEMORY_LIMIT_MB} * 1024 * 1024
    _resource.setrlimit(_resource.RLIMIT_AS, (_mem_limit, _mem_limit))
except Exception:
    # Best effort: keep going when the limit cannot be applied.
    pass

# -- 2. AST Static Analysis --------------------------------------------------
_BLOCKED_IMPORTS = {blocked_repr}
_DANGEROUS_BUILTINS = {builtins_repr}
try:
    # open() is still intact here; it is only nullified in section 3.
    # Python decodes source files as UTF-8 by default, so read it the same way.
    with _builtins.open(__file__, encoding="utf-8") as _fh:
        _source_to_check = _fh.read()
    # Only the code after the first marker line is analysed. The marker is
    # anchored to a line start and assembled from two pieces so that the
    # search can never match this assignment instead of the real marker line.
    _marker = "\\n# --- USER CODE " + "START ---"
    _marker_pos = _source_to_check.find(_marker)
    if _marker_pos == -1:
        print("SANDBOX INTERNALS ERROR: user code marker not found.")
        _sys.exit(1)
    _source_to_check = _source_to_check[_marker_pos + len(_marker):]
    _tree = _ast.parse(_source_to_check)
    for _node in _ast.walk(_tree):
        # Block dangerous imports (matched on the top-level package name)
        if isinstance(_node, _ast.Import):
            _names = [_alias.name.split('.')[0] for _alias in _node.names]
        elif isinstance(_node, _ast.ImportFrom) and _node.module:
            _names = [_node.module.split('.')[0]]
        else:
            _names = []
        for _name in _names:
            if _name in _BLOCKED_IMPORTS:
                print(f"BLOCKED IMPORT: '{{_name}}' is not allowed in the sandbox.")
                _sys.exit(1)
        # Block dangerous builtins (static names)
        if isinstance(_node, _ast.Name) and _node.id in _DANGEROUS_BUILTINS:
            print(f"SECURITY ERROR: Use of '{{_node.id}}' is prohibited.")
            _sys.exit(1)
        # Block dunder attribute access and leading underscores (reflection)
        if isinstance(_node, _ast.Attribute) and _node.attr.startswith('_'):
            print(f"SECURITY ERROR: Access to internal attribute '{{_node.attr}}' is prohibited.")
            _sys.exit(1)
except SyntaxError:
    # When run as a script, the interpreter has already compiled this whole
    # file, so a parse failure here means the user section is only valid as a
    # continuation of the prelude. Refuse to run it unchecked.
    print("SECURITY ERROR: user code could not be parsed for static analysis.")
    _sys.exit(1)
except Exception as _exc:
    # Fail closed: never run user code that could not be analysed.
    print(f"SANDBOX INTERNALS ERROR: {{_exc}}")
    _sys.exit(1)

# -- 3. Runtime Protection ---------------------------------------------------
# Wrap __import__ to catch dynamic imports at runtime. The original importer
# and the block list live in closure cells (not keyword defaults or globals)
# so a caller cannot override or mutate them.
def _make_restricted_import(_orig_import, _blocked):
    def _restricted_import(name, *args, **kwargs):
        if name.split(".")[0] in _blocked:
            raise ImportError(f"BLOCKED IMPORT: '{{name}}' is not allowed in the sandbox.")
        return _orig_import(name, *args, **kwargs)
    return _restricted_import

_builtins.__import__ = _make_restricted_import(
    _builtins.__import__, frozenset(_BLOCKED_IMPORTS)
)

# Nullify dangerous builtins. getattr/setattr/delattr stay callable and are
# only rejected by the static check above.
for _b in _DANGEROUS_BUILTINS:
    if _b not in ('setattr', 'getattr', 'delattr'):
        _builtins.__dict__[_b] = None

# Remove the prelude's helper names from the module namespace.
for _v in (
    "_ast", "_sys", "_builtins", "_resource", "_mem_limit",
    "_BLOCKED_IMPORTS", "_DANGEROUS_BUILTINS", "_fh", "_source_to_check",
    "_marker", "_marker_pos", "_tree", "_node", "_names", "_name", "_b",
    "_make_restricted_import",
):
    globals().pop(_v, None)
del _v
'''
def execute_code(code: str, test_code: str, allow_threading: bool = False) -> Tuple[str, bool, int]:
    """Execute ``code`` followed by ``test_code`` in a sandboxed subprocess.

    The security prelude, a user-code marker line, ``code`` and ``test_code``
    are written to a temporary ``.py`` file, which is run with ``python3``
    under a minimal environment and a wall-clock timeout of
    ``EXECUTION_TIMEOUT_SECONDS``. The temporary file is always removed.

    Args:
        code: Source code under test.
        test_code: Source appended after ``code`` in the same script.
        allow_threading: When true, ``threading`` is removed from the list of
            blocked imports for this run.

    Returns:
        A tuple ``(output, timed_out, execution_time_ms)``:

        * normal completion: stripped stdout + stderr, ``False``, elapsed ms;
        * timeout: a ``"TIMEOUT: ..."`` message, ``True``, elapsed ms;
        * any other failure while preparing or launching the run:
          ``"SANDBOX ERROR: <message>"``, ``False``, ``0``.
    """
    # Build the blocked imports list, optionally allowing threading
    blocked = [
        name for name in BLOCKED_IMPORTS
        if not (allow_threading and name == "threading")
    ]

    # Build the full script: security prelude + user code + test code
    prelude = _build_security_prelude(blocked)
    full_script = prelude + "\n# --- USER CODE START ---\n" + code + "\n" + test_code

    tmp_path = None
    try:
        # Python reads source files as UTF-8 by default, so write the script
        # in UTF-8 regardless of the parent's locale.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', prefix='sandbox_',
            delete=False, dir=tempfile.gettempdir(),
            encoding='utf-8',
        ) as tmp:
            # Record the path before writing so the cleanup below still runs
            # if the write itself fails.
            tmp_path = tmp.name
            tmp.write(full_script)

        # perf_counter is monotonic, so clock adjustments cannot skew the
        # measured duration.
        start_time = time.perf_counter()
        try:
            result = subprocess.run(
                ["python3", tmp_path],
                capture_output=True,
                text=True,
                # Decode deterministically and never fail on odd bytes in the
                # child's output.
                encoding="utf-8",
                errors="replace",
                timeout=EXECUTION_TIMEOUT_SECONDS,
                env={
                    "PATH": os.environ.get("PATH", "/usr/bin:/usr/local/bin"),
                    "HOME": os.environ.get("HOME", "/tmp"),
                    "PYTHONDONTWRITEBYTECODE": "1",
                    # Make the child emit UTF-8 to match the decoding above.
                    "PYTHONIOENCODING": "utf-8",
                },
            )
        except subprocess.TimeoutExpired:
            elapsed_ms = int((time.perf_counter() - start_time) * 1000)
            return (
                f"TIMEOUT: Code execution exceeded {EXECUTION_TIMEOUT_SECONDS} second limit.",
                True,
                elapsed_ms,
            )

        elapsed_ms = int((time.perf_counter() - start_time) * 1000)
        output = result.stdout + result.stderr
        return (output.strip(), False, elapsed_ms)
    except Exception as e:
        # Top-level boundary: report sandbox failures to the caller as output
        # rather than raising.
        return (f"SANDBOX ERROR: {str(e)}", False, 0)
    finally:
        # Always clean up the temp file
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|