File size: 4,699 Bytes
e2cf8f8
 
 
9940e16
 
 
e2cf8f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ee66d2
e2cf8f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
AgentDebuggerEnv — Sandboxed Code Execution
============================================
Isolated execution environment for user-submitted code, providing
security through AST-based import filtering, subprocess isolation,
and runtime constraints.
"""

import subprocess
import tempfile
import os
import time
import ast
from typing import Tuple

# Top-level module names that sandboxed code is not allowed to import.
# Order is preserved as-is: the list is embedded verbatim (via repr) into the
# generated checker script.
BLOCKED_IMPORTS = [
    # Host, process and interpreter access
    "os",
    "sys",
    "subprocess",
    "socket",
    "importlib",
    # Filesystem helpers
    "shutil",
    "pathlib",
    "glob",
    # Persistence / serialization
    "pickle",
    "shelve",
    "dbm",
    "sqlite3",
    # Network clients
    "ftplib",
    "http",
    "urllib",
    "requests",
    "httpx",
    # Concurrency ("threading" can be re-enabled per call by execute_code)
    "asyncio",
    "multiprocessing",
    "threading",
    # Low-level / native interfaces
    "ctypes",
    "cffi",
    "resource",
    "signal",
    "mmap",
    "gc",
]

# Wall-clock limit, in seconds, passed as the subprocess timeout.
EXECUTION_TIMEOUT_SECONDS = 10

# Intended memory ceiling in megabytes.
# NOTE(review): nothing in the visible code reads this value — confirm whether
# enforcement lives elsewhere or is still to be implemented.
MEMORY_LIMIT_MB = 256


def _build_import_checker(blocked: list[str]) -> str:
    """Build the Python prelude that enforces the import blocklist.

    The returned source is meant to be placed at the top of a script, followed
    by a line consisting solely of the user-code marker comment
    (``# --- USER CODE START ---``) and then the untrusted code. When the
    script runs, the prelude:

    1. Statically parses everything after the marker line and, if any
       ``import`` / ``from ... import`` names a blocked top-level module,
       prints ``BLOCKED IMPORT: ...`` and exits with status 1 before any user
       code executes. If the marker line is absent, or the user code has a
       syntax error, this pass is skipped and normal execution reports it.
    2. Replaces ``builtins.__import__`` with a hook that raises ``ImportError``
       for blocked modules requested *by the sandboxed script itself*
       (including dynamic ``__import__("x")`` calls). Imports performed by
       already-trusted library modules are let through; otherwise any stdlib
       module that internally imports ``os`` or ``sys`` (``threading``,
       ``random``, ``typing``, ...) would be unusable.

    This is a best-effort guard against casual misuse, not a security
    boundary: in-process import filtering can be bypassed by determined code.

    Args:
        blocked: Top-level module names to reject. Embedded via ``repr``.

    Returns:
        The prelude source code as a string (starts and ends with a newline).
    """
    # A raw, non-f string keeps the generated code readable (no brace
    # doubling); the blocklist is substituted into the placeholder afterwards.
    template = r'''
def _sandbox_guard():
    # Everything lives in this function's scope so that helper modules and the
    # original import function are not left behind as module globals that the
    # user code below could simply call.
    import ast
    import builtins
    import sys

    blocked = frozenset(@@BLOCKED@@)
    main_module = sys.modules.get("__main__")
    get_frame = getattr(sys, "_getframe", None)
    original_import = builtins.__import__

    def denial(name):
        return f"BLOCKED IMPORT: '{name}' is not allowed in the sandbox."

    # ---- Static pass over the user code ----
    # The marker is matched as a complete line and its literal is split in
    # two, so this search can never match the guard's own source text.
    marker_line = "\n# --- USER CODE " + "START ---\n"
    with open(__file__, encoding="utf-8") as handle:
        source = handle.read()
    start = source.find(marker_line)
    tree = None
    if start != -1:
        try:
            tree = ast.parse(source[start + len(marker_line):])
        except (SyntaxError, ValueError):
            tree = None  # Let the actual execution report the error.
    if tree is not None:
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                requested = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom) and node.module:
                requested = [node.module]
            else:
                continue
            for name in requested:
                if name.split(".")[0] in blocked:
                    print(denial(name))
                    sys.exit(1)

    # ---- Runtime hook for imports the static pass cannot see ----
    def requested_by_sandboxed_code():
        # Frame 0 is this helper, frame 1 the hook, frame 2 the importer.
        if get_frame is None:
            return True
        try:
            scope = get_frame(2).f_globals
        except ValueError:
            return True
        owner_name = scope.get("__name__")
        owner = sys.modules.get(owner_name) if isinstance(owner_name, str) else None
        if owner is None or owner is main_module:
            return True
        # Only a frame running in a registered module's own namespace is
        # treated as library code; exec() with a made-up namespace is not.
        return getattr(owner, "__dict__", None) is not scope

    def restricted_import(name, *args, **kwargs):
        if name.split(".")[0] in blocked and requested_by_sandboxed_code():
            raise ImportError(denial(name))
        return original_import(name, *args, **kwargs)

    builtins.__import__ = restricted_import


_sandbox_guard()
del _sandbox_guard
'''
    return template.replace("@@BLOCKED@@", repr(blocked))


def execute_code(code: str, test_code: str, allow_threading: bool = False) -> Tuple[str, bool, int]:
    """
    Execute code + test_code in a sandboxed subprocess.

    The import-checker prelude, the user-code marker line, ``code`` and
    ``test_code`` are concatenated into one temporary script, which is run by
    a child Python interpreter with a minimal environment and a wall-clock
    timeout of ``EXECUTION_TIMEOUT_SECONDS``.

    Args:
        code: The code under test.
        test_code: Test statements appended after ``code``.
        allow_threading: When True, ``threading`` is removed from the
            blocklist for this run.

    Returns:
        (output: str, timed_out: bool, execution_time_ms: int)

    The output contains both stdout and stderr merged (stdout first), stripped
    of surrounding whitespace. On timeout the output is a ``TIMEOUT: ...``
    message and ``timed_out`` is True. Any failure to set up or launch the
    sandbox is reported as ``("SANDBOX ERROR: ...", False, 0)`` rather than
    raised.

    NOTE(review): ``MEMORY_LIMIT_MB`` is not applied here; only the timeout
    bounds the child process.
    """
    # Local import: the module-level import block does not provide ``sys``.
    import sys

    # Build the blocked imports list, optionally allowing threading
    blocked = [name for name in BLOCKED_IMPORTS if not (allow_threading and name == "threading")]

    # Build the full script: import checker + user code + test code
    import_checker = _build_import_checker(blocked)
    full_script = import_checker + "\n# --- USER CODE START ---\n" + code + "\n" + test_code

    tmp_path = None
    try:
        # Python reads source files as UTF-8, so write the script as UTF-8
        # regardless of the host locale.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', prefix='sandbox_',
            delete=False, dir=tempfile.gettempdir(),
            encoding='utf-8',
        ) as tmp:
            # Record the path before writing so a failed write is still
            # cleaned up by the ``finally`` block below.
            tmp_path = tmp.name
            tmp.write(full_script)

        # Run in subprocess with timeout
        start_time = time.perf_counter()  # monotonic: immune to clock changes
        try:
            result = subprocess.run(
                # Use the interpreter running this process; fall back to PATH
                # lookup only if it is unknown.
                [sys.executable or "python3", tmp_path],
                # Never let sandboxed code read (or block on) the host's stdin.
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                # Decode deterministically and never fail on odd bytes.
                encoding="utf-8",
                errors="replace",
                timeout=EXECUTION_TIMEOUT_SECONDS,
                env={
                    "PATH": os.environ.get("PATH", "/usr/bin:/usr/local/bin"),
                    "HOME": os.environ.get("HOME", "/tmp"),
                    "PYTHONDONTWRITEBYTECODE": "1",
                    "PYTHONIOENCODING": "utf-8",
                },
            )
            elapsed_ms = int((time.perf_counter() - start_time) * 1000)
            output = result.stdout + result.stderr
            return (output.strip(), False, elapsed_ms)

        except subprocess.TimeoutExpired:
            # subprocess.run has already killed and reaped the child here.
            elapsed_ms = int((time.perf_counter() - start_time) * 1000)
            return (
                f"TIMEOUT: Code execution exceeded {EXECUTION_TIMEOUT_SECONDS} second limit and was killed.",
                True,
                elapsed_ms
            )

    except Exception as e:
        # Top-level boundary: callers receive a message instead of an exception.
        return (f"SANDBOX ERROR: {str(e)}", False, 0)

    finally:
        # Always clean up temp files (best effort)
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass