File size: 12,008 Bytes
dc71cad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b8d880
 
dc71cad
 
 
 
 
 
 
 
6b8d880
 
 
 
 
 
 
 
 
 
dc71cad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
"""
sandbox/executor.py
───────────────────
Secure Docker-based code execution sandbox.

Security model (document for interviews):
  1. --network=none           — no outbound internet access
  2. --memory / --cpus        — cgroup resource limits
  3. --read-only + tmpfs      — filesystem isolation; only /workspace (and the /tmp tmpfs) is writable
  4. Command whitelist        — only commands listed in ALLOWED_COMMANDS (git, pytest, python, pip
                                and a few basic file utilities) are allowed
  5. 60s timeout              — runaway processes are killed via SIGKILL
  6. Non-root user (uid=1000) — no privilege escalation inside container

Workflow per issue:
  1. clone_repo()   — git clone the repo at base_commit into a temp volume
  2. apply_patch()  — write unified diff to /workspace, run git apply
  3. run_tests()    — pytest on FAIL_TO_PASS + PASS_TO_PASS test IDs
  4. cleanup()      — remove the Docker volume/container
"""
from __future__ import annotations

import logging
import os
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal

logger = logging.getLogger(__name__)

# ── Allowed commands (whitelist) ──────────────────────────────────────────────
# Executable base names accepted by _validate_command() before a command is
# handed to the Docker sandbox.
ALLOWED_COMMANDS = frozenset(
    (
        # version control, interpreters, packaging and test tooling
        "git", "pytest", "python", "python3", "pip", "pip3",
        # inspection and text utilities
        "cat", "ls", "echo", "find", "grep", "head", "tail",
        # filesystem manipulation
        "mkdir", "cp", "mv", "touch", "chmod",
    )
)


@dataclass
class ExecResult:
    """Outcome of a single command executed by the sandbox.

    Attributes:
        command: The command line that was run, joined into one string.
        returncode: Exit status reported for the command.
        stdout: Captured standard output.
        stderr: Captured standard error.
        elapsed_seconds: How long the command ran, in seconds.
        timed_out: True when the command was aborted for exceeding its timeout.
    """

    command: str
    returncode: int
    stdout: str
    stderr: str
    elapsed_seconds: float
    timed_out: bool = False

    @property
    def success(self) -> bool:
        """Whether the command exited with status 0 and did not time out."""
        if self.timed_out:
            return False
        return self.returncode == 0


@dataclass
class TestResult:
    """Parsed outcome of a pytest run executed inside the sandbox.

    Attributes:
        passed: Node IDs of tests reported as PASSED.
        failed: Node IDs of tests reported as FAILED.
        errors: Node IDs of tests reported as ERROR.
        raw_output: Unparsed text produced by the run.
        elapsed_seconds: Duration of the run, in seconds.
        timed_out: True when the run was aborted for exceeding its timeout.
    """

    passed: list[str] = field(default_factory=list)
    failed: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    raw_output: str = ""
    elapsed_seconds: float = 0.0
    timed_out: bool = False

    @property
    def all_passed(self) -> bool:
        """True when nothing failed, nothing errored, and the run did not time out."""
        if self.timed_out:
            return False
        return not self.failed and not self.errors

    def check_tests(
        self,
        fail_to_pass: list[str],
        pass_to_pass: list[str],
    ) -> tuple[bool, dict[str, bool], dict[str, bool]]:
        """
        Decide whether this run resolves the SWE-bench instance.

        A test counts as passing only if its ID appears in ``self.passed``.

        Args:
            fail_to_pass: test IDs that are expected to pass after the patch
            pass_to_pass: test IDs that must keep passing

        Returns:
            resolved: True when every listed test ID is in ``self.passed``
            ftp_results: {test_id: passed}
            ptp_results: {test_id: still_passing}
        """
        green = frozenset(self.passed)

        ftp_results: dict[str, bool] = {}
        for test_id in fail_to_pass:
            ftp_results[test_id] = test_id in green

        ptp_results: dict[str, bool] = {}
        for test_id in pass_to_pass:
            ptp_results[test_id] = test_id in green

        resolved = all(ftp_results.values()) and all(ptp_results.values())
        return resolved, ftp_results, ptp_results


class SandboxExecutor:
    """
    Manages Docker-based sandbox for safe code execution.

    Usage:
        executor = SandboxExecutor(settings)
        with executor.workspace(instance) as ws:
            ws.apply_patch(patch_text)
            result = ws.run_tests(fail_to_pass, pass_to_pass)
    """

    def __init__(
        self,
        image: str = "code-agent-sandbox:latest",
        timeout: int = 60,
        memory_limit: str = "2g",
        cpu_limit: float = 2.0,
        network: str = "none",
        use_docker: bool = True,
    ):
        self.image = image
        self.timeout = timeout
        self.memory_limit = memory_limit
        self.cpu_limit = cpu_limit
        self.network = network
        self.use_docker = use_docker

        if use_docker:
            self._verify_docker()

    def _verify_docker(self) -> None:
        """Probe the Docker daemon and fall back to local execution if it is unusable.

        Runs ``docker info`` with a 10-second limit. When the CLI is missing,
        cannot be executed, does not answer within the limit, or exits with a
        non-zero status, a warning is logged and ``self.use_docker`` is set to
        False so that later runs take the local execution path.

        Only the daemon is probed; whether ``self.image`` exists is not
        checked here.
        """
        try:
            probe = subprocess.run(
                ["docker", "info"],
                capture_output=True, text=True, errors="replace", timeout=10
            )
        except FileNotFoundError:
            logger.warning("Docker not found — sandbox will use local execution")
            self.use_docker = False
        except (OSError, subprocess.TimeoutExpired) as exc:
            # A hung daemon (timeout) or a binary that cannot be executed used
            # to escape this method and crash the constructor; treat both the
            # same way as any other unusable Docker installation.
            logger.warning(
                "Docker check failed (%s) — sandbox will use local execution", exc
            )
            self.use_docker = False
        else:
            if probe.returncode != 0:
                logger.warning("Docker is not running — sandbox will use local execution")
                self.use_docker = False

    def clone_repo(
        self,
        repo: str,
        base_commit: str,
        workspace_dir: Path,
    ) -> ExecResult:
        """
        Clone the target repo and check out base_commit in workspace_dir.

        The clone is shallow (``--depth=1``), so it only contains the tip of
        the default branch. When a specific commit is requested it is fetched
        explicitly before the checkout; if the remote refuses that fetch, the
        history is deepened with ``git fetch --unshallow`` and the commit is
        checked out from the full history instead.

        Args:
            repo: 'owner/repo' format
            base_commit: git SHA to check out; an empty/blank value or "HEAD"
                (any letter case) keeps the tip produced by the clone
            workspace_dir: local directory to clone into (created if missing)

        Returns:
            The ExecResult of the last git command that ran: the clone when it
            failed or no commit was requested, the ``--unshallow`` fetch when
            that failed, otherwise the checkout.
        """
        github_url = f"https://github.com/{repo}.git"
        workspace_dir.mkdir(parents=True, exist_ok=True)

        # Normalise once so the log label and the checkout decision agree.
        commit = (base_commit or "").strip()
        wants_commit = bool(commit) and commit.upper() != "HEAD"
        network_timeout = 120  # network operations get a longer timeout

        logger.info("Cloning %s @ %s", repo, commit[:8] if wants_commit else "HEAD")
        clone_result = self._run_local(
            ["git", "clone", "--depth=1", github_url, str(workspace_dir)],
            timeout=network_timeout,
        )
        if not clone_result.success:
            logger.error("Clone failed: %s", clone_result.stderr[:500])
            return clone_result

        # Nothing more to do when the caller wants the tip the clone produced.
        if not wants_commit:
            return clone_result

        # A depth-1 clone does not contain older commits, so checking one out
        # directly fails. Fetch exactly that commit first.
        fetch_result = self._run_local(
            ["git", "fetch", "--depth=1", "origin", commit],
            cwd=workspace_dir,
            timeout=network_timeout,
        )
        checkout_target = "FETCH_HEAD"
        if not fetch_result.success:
            # E.g. an abbreviated SHA or a server that rejects fetch-by-SHA:
            # fall back to downloading the full history of the cloned branch.
            logger.debug(
                "Targeted fetch of %s failed, deepening history: %s",
                commit[:8],
                fetch_result.stderr[:300],
            )
            deepen_result = self._run_local(
                ["git", "fetch", "--unshallow", "origin"],
                cwd=workspace_dir,
                timeout=network_timeout,
            )
            if not deepen_result.success:
                logger.error("Fetch failed: %s", deepen_result.stderr[:500])
                return deepen_result
            checkout_target = commit

        return self._run_local(
            ["git", "checkout", checkout_target],
            cwd=workspace_dir,
        )

    def apply_patch(
        self,
        patch_text: str,
        workspace_dir: Path,
    ) -> ExecResult:
        """
        Write patch_text to ``_agent_patch.diff`` in the workspace and run
        ``git apply --whitespace=fix`` on it from inside the workspace.

        The patch is written as UTF-8 bytes with its line endings untouched
        and is given a final newline if it lacks one. The patch file is left
        in the workspace after the call.

        Args:
            patch_text: unified diff to apply
            workspace_dir: root of the git checkout to patch

        Returns:
            ExecResult with success=True if the patch applied cleanly. A blank
            patch is not applied and yields returncode 1 with stderr
            "Empty patch".
        """
        if not patch_text.strip():
            logger.warning("Empty patch text — nothing to apply")
            return ExecResult("git apply", 1, "", "Empty patch", 0.0)

        # git reports "corrupt patch" when the last line is not terminated.
        if not patch_text.endswith("\n"):
            patch_text += "\n"

        patch_file = workspace_dir / "_agent_patch.diff"
        # Bytes, not write_text(): avoids the locale-dependent default encoding
        # and platform newline translation, both of which can corrupt a diff.
        patch_file.write_bytes(patch_text.encode("utf-8"))

        # git runs with cwd=workspace_dir, so a relative patch path would be
        # resolved a second time against the workspace; pass it absolute.
        result = self._run_local(
            ["git", "apply", "--whitespace=fix", str(patch_file.resolve())],
            cwd=workspace_dir,
        )
        if not result.success:
            # No retry is attempted; record why git rejected the patch.
            logger.debug("git apply failed, stderr: %s", result.stderr[:300])
        return result

    def run_tests(
        self,
        workspace_dir: Path,
        test_ids: list[str],
        extra_args: list[str] | None = None,
    ) -> TestResult:
        """
        Invoke pytest for the given node IDs and parse what it reports.

        The run goes through the Docker sandbox when ``self.use_docker`` is
        set and through a local subprocess otherwise.

        Args:
            workspace_dir: repo root
            test_ids: list of pytest node IDs to run
            extra_args: additional pytest flags, placed before the node IDs

        Returns:
            TestResult with passed/failed/errors lists; an empty TestResult
            when no test IDs were given.
        """
        if not test_ids:
            logger.warning("No test IDs provided β€” skipping test run")
            return TestResult()

        base_invocation = ["python", "-m", "pytest", "-v", "--tb=short", "--no-header", "-rN"]
        pytest_args = [*base_invocation, *(extra_args or []), *test_ids]

        run_output = (
            self._run_in_docker(pytest_args, workspace_dir)
            if self.use_docker
            else self._run_local(pytest_args, cwd=workspace_dir)
        )
        return self._parse_pytest_output(run_output)

    def _run_in_docker(self, cmd: list[str], workspace_dir: Path) -> ExecResult:
        """Run a whitelisted command inside a throw-away sandbox container.

        The container is started with the configured network, memory and CPU
        settings, a read-only root filesystem, a 256 MB tmpfs at /tmp, the
        workspace bind-mounted read-write at /workspace (also the working
        directory), and user 1000:1000. It is removed when it exits (``--rm``).

        Args:
            cmd: command and arguments to run in the container
            workspace_dir: host directory to mount at /workspace

        Returns:
            ExecResult of the ``docker run`` invocation.

        Raises:
            ValueError: if ``cmd`` is rejected by ``_validate_command``.
        """
        _validate_command(cmd)

        # Docker only treats an absolute host path as a bind mount; a relative
        # one would not mount the workspace directory.
        host_dir = Path(workspace_dir).resolve()
        # A unique name lets us address the container if the client is killed.
        container_name = f"code-agent-sandbox-{os.getpid()}-{time.monotonic_ns()}"

        docker_cmd = [
            "docker", "run",
            "--rm",
            f"--name={container_name}",
            f"--network={self.network}",
            f"--memory={self.memory_limit}",
            f"--cpus={self.cpu_limit}",
            "--read-only",
            "--tmpfs=/tmp:size=256m",
            f"--volume={host_dir}:/workspace:rw",
            "--workdir=/workspace",
            "--user=1000:1000",
            self.image,
        ] + cmd

        result = self._run_local(docker_cmd, timeout=self.timeout)
        if result.timed_out:
            # The timeout only kills the docker CLI client; the container
            # itself keeps running unless it is killed explicitly. Best effort:
            # the outcome of the kill does not change the reported result.
            self._run_local(["docker", "kill", container_name], timeout=10)
        return result

    def _run_local(
        self,
        cmd: list[str],
        cwd: Path | None = None,
        timeout: int | None = None,
    ) -> ExecResult:
        """Execute a subprocess with a timeout and capture its output.

        This method does not raise for process failures; every outcome is
        reported through the returned ExecResult.

        Args:
            cmd: command and arguments, run without a shell
            cwd: working directory for the command, or None for the current one
            timeout: seconds to wait; defaults to ``self.timeout`` when None

        Returns:
            ExecResult carrying the process's own return code and output on
            completion; returncode -1 with ``timed_out=True`` when the timeout
            expired; returncode -2 with the exception text in ``stderr`` when
            the process could not be run.
        """
        if timeout is None:
            timeout = self.timeout

        # Built once, up front: joining inside the handlers would raise again
        # (and escape) if cmd contained a non-str item such as a Path.
        command = " ".join(map(str, cmd))

        start = time.monotonic()
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                # Undecodable bytes in tool output must not turn a finished
                # run into a failure that discards everything it printed.
                errors="replace",
                timeout=timeout,
                cwd=str(cwd) if cwd else None,
            )
        except subprocess.TimeoutExpired:
            elapsed = time.monotonic() - start
            logger.warning("Command timed out after %ds: %s", timeout, cmd[:3])
            return ExecResult(
                command=command,
                returncode=-1,
                stdout="",
                stderr=f"TIMEOUT after {timeout}s",
                elapsed_seconds=elapsed,
                timed_out=True,
            )
        except Exception as e:
            # Deliberately broad: callers rely on getting an ExecResult back
            # (e.g. missing executable, bad working directory).
            elapsed = time.monotonic() - start
            logger.error("Command failed: %s | error: %s", cmd[:3], e)
            return ExecResult(
                command=command,
                returncode=-2,
                stdout="",
                stderr=str(e),
                elapsed_seconds=elapsed,
            )

        elapsed = time.monotonic() - start
        return ExecResult(
            command=command,
            returncode=proc.returncode,
            stdout=proc.stdout,
            stderr=proc.stderr,
            elapsed_seconds=elapsed,
        )

    @staticmethod
    def _parse_pytest_output(result: ExecResult) -> TestResult:
        """
        Parse pytest -v output to extract passed/failed/errored test IDs.

        Pytest -v output format per test:
          tests/path/to/test.py::test_name PASSED
          tests/path/to/test.py::test_name FAILED
          tests/path/to/test.py::test_name ERROR

        Only ``result.stdout`` is scanned. A line is recognised when it starts
        with a node ID (anything containing ``::``) followed by whitespace and
        one of the three status words. The node ID may contain any characters,
        so parametrized IDs such as ``test_x[1.5-a/b]`` are captured whole.

        Args:
            result: outcome of the pytest invocation

        Returns:
            TestResult whose lists hold the node IDs in the order they appear,
            with raw_output set to stdout followed by stderr.
        """
        test_result = TestResult(
            raw_output=result.stdout + result.stderr,
            elapsed_seconds=result.elapsed_seconds,
            timed_out=result.timed_out,
        )

        # One pass over the output; the lazy node-ID group stops at the first
        # whitespace run that is followed by a status word.
        status_line = re.compile(
            r"^(.+?::.+?)\s+(PASSED|FAILED|ERROR)\b", re.MULTILINE
        )
        buckets = {
            "PASSED": test_result.passed,
            "FAILED": test_result.failed,
            "ERROR": test_result.errors,
        }
        for node_id, status in status_line.findall(result.stdout):
            buckets[status].append(node_id)

        logger.debug(
            "Pytest results — passed: %d, failed: %d, errors: %d",
            len(test_result.passed),
            len(test_result.failed),
            len(test_result.errors),
        )
        return test_result


# ── Security helper ───────────────────────────────────────────────────────────

def _validate_command(cmd: list[str]) -> None:
    """
    Raise ValueError if the command's base name is not in the whitelist.
    This is a defence-in-depth measure β€” Docker isolation is the primary control.
    """
    if not cmd:
        raise ValueError("Empty command")
    base = Path(cmd[0]).name
    if base not in ALLOWED_COMMANDS:
        raise ValueError(
            f"Command '{base}' is not in the allowed command whitelist: {ALLOWED_COMMANDS}"
        )