File size: 6,819 Bytes
da3d4f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
sandbox_hooks.py β€” PEP 578 Kernel-Level Audit Hooks for unbypassable sandboxing.

PROBLEM: AST static analysis is weak because Python is dynamic.
An agent can obfuscate code: getattr(os, 'sys' + 'tem')('rm -rf /')
AST sees nothing. The command still executes.

SOLUTION: Python's sys.addaudithook (PEP 578, Python 3.8+).
This intercepts events at the C-level INSIDE the Python interpreter.
If the agent code tries to:
  - Open a file outside sandbox β†’ BLOCKED
  - Create a network socket β†’ BLOCKED
  - Execute os.system/subprocess β†’ BLOCKED
  - Import dangerous modules β†’ BLOCKED

The hook fires REGARDLESS of how the code was invoked (eval, exec, importlib,
ctypes, whatever). It operates at the interpreter level, not the AST level.

Security properties:
  - Cannot be disabled from user code (once installed, permanent for the process)
  - Cannot be bypassed by dynamic attribute access
  - Cannot be bypassed by code obfuscation
  - Zero external dependencies (stdlib only, no Docker)
  - Works on all platforms Python 3.8+

Usage:
    from purpose_agent.sandbox_hooks import install_sandbox, SandboxPolicy
    
    policy = SandboxPolicy(
        allowed_paths=["/app/workspace"],
        block_network=True,
        block_subprocess=True,
    )
    install_sandbox(policy)
    
    # Now ANY code in this process is sandboxed:
    exec("import os; os.system('ls')")  # β†’ RuntimeError: blocked by audit hook
"""
from __future__ import annotations

import sys
import os
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)

# Audit events we intercept (PEP 578)
# See: https://docs.python.org/3/library/audit_events.html
_DANGEROUS_EVENTS = {
    # Process execution
    "os.system",
    "subprocess.Popen",
    "os.exec",
    "os.posix_spawn",
    "os.spawn",

    # Network
    "socket.connect",
    "socket.bind",
    "socket.sendto",

    # File system (checked against allowed_paths)
    "open",

    # Code execution
    "compile",
    "exec",

    # Module imports (checked against blocklist)
    "import",
}

# Modules that are NEVER allowed in sandboxed code
_BLOCKED_MODULES = {
    "ctypes",       # Can call arbitrary C functions
    "multiprocessing",  # Can spawn processes
    "signal",       # Can interfere with process control
    "resource",     # Can modify system limits
    "pty",          # Pseudo-terminal (escape vector)
}


@dataclass
class SandboxPolicy:
    """
    Policy for the PEP 578 audit hook sandbox.
    
    Configure what's allowed and what's blocked.
    Once installed, CANNOT be disabled from user code.
    """
    allowed_paths: list[str] = field(default_factory=lambda: ["/tmp"])
    blocked_paths: list[str] = field(default_factory=lambda: ["/etc", "/proc", "/sys", "/dev"])
    block_network: bool = True
    block_subprocess: bool = True
    block_dangerous_imports: bool = True
    blocked_modules: set[str] = field(default_factory=lambda: set(_BLOCKED_MODULES))
    log_violations: bool = True
    raise_on_violation: bool = True  # If False, just log (for monitoring mode)


class SandboxViolation(RuntimeError):
    """Raised when sandboxed code attempts a forbidden operation."""
    pass


_installed = False
_policy: SandboxPolicy | None = None


def install_sandbox(policy: SandboxPolicy | None = None) -> None:
    """
    Install PEP 578 audit hooks. Once installed, CANNOT be removed.
    
    This affects ALL code in the current process β€” not just agent code.
    Call this BEFORE executing any untrusted code.
    
    Thread-safe: the hook is installed once and affects all threads.
    """
    global _installed, _policy

    if _installed:
        logger.warning("Sandbox already installed (cannot install twice)")
        return

    _policy = policy or SandboxPolicy()

    # Install the audit hook (C-level interception)
    sys.addaudithook(_audit_hook)
    _installed = True

    logger.info(
        f"PEP 578 sandbox installed: "
        f"network={'blocked' if _policy.block_network else 'allowed'}, "
        f"subprocess={'blocked' if _policy.block_subprocess else 'allowed'}, "
        f"paths={_policy.allowed_paths}"
    )


def _audit_hook(event: str, args: tuple) -> None:
    """
    The actual audit hook. Called by the Python interpreter at the C level.
    
    This function is invoked for EVERY auditable operation in the process.
    It cannot be bypassed by user code.
    """
    if _policy is None:
        return

    # ── Subprocess/system execution ──
    if _policy.block_subprocess and event in ("os.system", "subprocess.Popen", "os.exec",
                                               "os.posix_spawn", "os.spawn"):
        _violation(f"Subprocess blocked: {event}({_safe_repr(args)})")

    # ── Network ──
    if _policy.block_network and event in ("socket.connect", "socket.bind", "socket.sendto"):
        _violation(f"Network blocked: {event}({_safe_repr(args)})")

    # ── File access ──
    if event == "open" and args:
        path = str(args[0]) if args else ""
        if path and not _path_allowed(path):
            _violation(f"File access blocked: {path}")

    # ── Dangerous imports ──
    if _policy.block_dangerous_imports and event == "import":
        module_name = str(args[0]) if args else ""
        if module_name in _policy.blocked_modules:
            _violation(f"Import blocked: {module_name}")


def _path_allowed(path: str) -> bool:
    """Check if a file path is allowed by the sandbox policy."""
    if not _policy:
        return True

    abs_path = os.path.abspath(path)

    # Explicit blocks
    for blocked in _policy.blocked_paths:
        if abs_path.startswith(os.path.abspath(blocked)):
            return False

    # Must be under an allowed path
    for allowed in _policy.allowed_paths:
        if abs_path.startswith(os.path.abspath(allowed)):
            return True

    # If no allowed paths match but no blocked paths match either,
    # default to allowed (permissive mode for non-sandboxed paths)
    return True


def _violation(message: str) -> None:
    """Handle a sandbox violation."""
    if _policy and _policy.log_violations:
        logger.warning(f"SANDBOX VIOLATION: {message}")

    if _policy and _policy.raise_on_violation:
        raise SandboxViolation(message)


def _safe_repr(args: tuple, max_len: int = 100) -> str:
    """Safe string representation of audit args."""
    try:
        s = repr(args)
        return s[:max_len] if len(s) > max_len else s
    except:
        return "(...)"


def is_sandbox_installed() -> bool:
    """Check if the sandbox is currently active."""
    return _installed


def get_policy() -> SandboxPolicy | None:
    """Get the current sandbox policy (None if not installed)."""
    return _policy