| """ |
| sandbox_hooks.py β PEP 578 Kernel-Level Audit Hooks for unbypassable sandboxing. |
| |
| PROBLEM: AST static analysis is weak because Python is dynamic. |
| An agent can obfuscate code: getattr(os, 'sys' + 'tem')('rm -rf /') |
| AST sees nothing. The command still executes. |
| |
| SOLUTION: Python's sys.addaudithook (PEP 578, Python 3.8+). |
| This intercepts events at the C-level INSIDE the Python interpreter. |
| If the agent code tries to: |
| - Open a file outside sandbox β BLOCKED |
| - Create a network socket β BLOCKED |
| - Execute os.system/subprocess β BLOCKED |
| - Import dangerous modules β BLOCKED |
| |
| The hook fires REGARDLESS of how the code was invoked (eval, exec, importlib, |
| ctypes, whatever). It operates at the interpreter level, not the AST level. |
| |
| Security properties: |
| - Cannot be disabled from user code (once installed, permanent for the process) |
| - Cannot be bypassed by dynamic attribute access |
| - Cannot be bypassed by code obfuscation |
| - Zero external dependencies (stdlib only, no Docker) |
| - Works on all platforms Python 3.8+ |
| |
| Usage: |
| from purpose_agent.sandbox_hooks import install_sandbox, SandboxPolicy |
| |
| policy = SandboxPolicy( |
| allowed_paths=["/app/workspace"], |
| block_network=True, |
| block_subprocess=True, |
| ) |
| install_sandbox(policy) |
| |
| # Now ANY code in this process is sandboxed: |
| exec("import os; os.system('ls')") # β RuntimeError: blocked by audit hook |
| """ |
| from __future__ import annotations |
|
|
| import sys |
| import os |
| import logging |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| _DANGEROUS_EVENTS = { |
| |
| "os.system", |
| "subprocess.Popen", |
| "os.exec", |
| "os.posix_spawn", |
| "os.spawn", |
|
|
| |
| "socket.connect", |
| "socket.bind", |
| "socket.sendto", |
|
|
| |
| "open", |
|
|
| |
| "compile", |
| "exec", |
|
|
| |
| "import", |
| } |
|
|
| |
| _BLOCKED_MODULES = { |
| "ctypes", |
| "multiprocessing", |
| "signal", |
| "resource", |
| "pty", |
| } |
|
|
|
|
| @dataclass |
| class SandboxPolicy: |
| """ |
| Policy for the PEP 578 audit hook sandbox. |
| |
| Configure what's allowed and what's blocked. |
| Once installed, CANNOT be disabled from user code. |
| """ |
| allowed_paths: list[str] = field(default_factory=lambda: ["/tmp"]) |
| blocked_paths: list[str] = field(default_factory=lambda: ["/etc", "/proc", "/sys", "/dev"]) |
| block_network: bool = True |
| block_subprocess: bool = True |
| block_dangerous_imports: bool = True |
| blocked_modules: set[str] = field(default_factory=lambda: set(_BLOCKED_MODULES)) |
| log_violations: bool = True |
| raise_on_violation: bool = True |
|
|
|
|
| class SandboxViolation(RuntimeError): |
| """Raised when sandboxed code attempts a forbidden operation.""" |
| pass |
|
|
|
|
| _installed = False |
| _policy: SandboxPolicy | None = None |
|
|
|
|
| def install_sandbox(policy: SandboxPolicy | None = None) -> None: |
| """ |
| Install PEP 578 audit hooks. Once installed, CANNOT be removed. |
| |
| This affects ALL code in the current process β not just agent code. |
| Call this BEFORE executing any untrusted code. |
| |
| Thread-safe: the hook is installed once and affects all threads. |
| """ |
| global _installed, _policy |
|
|
| if _installed: |
| logger.warning("Sandbox already installed (cannot install twice)") |
| return |
|
|
| _policy = policy or SandboxPolicy() |
|
|
| |
| sys.addaudithook(_audit_hook) |
| _installed = True |
|
|
| logger.info( |
| f"PEP 578 sandbox installed: " |
| f"network={'blocked' if _policy.block_network else 'allowed'}, " |
| f"subprocess={'blocked' if _policy.block_subprocess else 'allowed'}, " |
| f"paths={_policy.allowed_paths}" |
| ) |
|
|
|
|
| def _audit_hook(event: str, args: tuple) -> None: |
| """ |
| The actual audit hook. Called by the Python interpreter at the C level. |
| |
| This function is invoked for EVERY auditable operation in the process. |
| It cannot be bypassed by user code. |
| """ |
| if _policy is None: |
| return |
|
|
| |
| if _policy.block_subprocess and event in ("os.system", "subprocess.Popen", "os.exec", |
| "os.posix_spawn", "os.spawn"): |
| _violation(f"Subprocess blocked: {event}({_safe_repr(args)})") |
|
|
| |
| if _policy.block_network and event in ("socket.connect", "socket.bind", "socket.sendto"): |
| _violation(f"Network blocked: {event}({_safe_repr(args)})") |
|
|
| |
| if event == "open" and args: |
| path = str(args[0]) if args else "" |
| if path and not _path_allowed(path): |
| _violation(f"File access blocked: {path}") |
|
|
| |
| if _policy.block_dangerous_imports and event == "import": |
| module_name = str(args[0]) if args else "" |
| if module_name in _policy.blocked_modules: |
| _violation(f"Import blocked: {module_name}") |
|
|
|
|
| def _path_allowed(path: str) -> bool: |
| """Check if a file path is allowed by the sandbox policy.""" |
| if not _policy: |
| return True |
|
|
| abs_path = os.path.abspath(path) |
|
|
| |
| for blocked in _policy.blocked_paths: |
| if abs_path.startswith(os.path.abspath(blocked)): |
| return False |
|
|
| |
| for allowed in _policy.allowed_paths: |
| if abs_path.startswith(os.path.abspath(allowed)): |
| return True |
|
|
| |
| |
| return True |
|
|
|
|
| def _violation(message: str) -> None: |
| """Handle a sandbox violation.""" |
| if _policy and _policy.log_violations: |
| logger.warning(f"SANDBOX VIOLATION: {message}") |
|
|
| if _policy and _policy.raise_on_violation: |
| raise SandboxViolation(message) |
|
|
|
|
| def _safe_repr(args: tuple, max_len: int = 100) -> str: |
| """Safe string representation of audit args.""" |
| try: |
| s = repr(args) |
| return s[:max_len] if len(s) > max_len else s |
| except: |
| return "(...)" |
|
|
|
|
| def is_sandbox_installed() -> bool: |
| """Check if the sandbox is currently active.""" |
| return _installed |
|
|
|
|
| def get_policy() -> SandboxPolicy | None: |
| """Get the current sandbox policy (None if not installed).""" |
| return _policy |
|
|