""" sandbox_hooks.py — PEP 578 Kernel-Level Audit Hooks for unbypassable sandboxing. PROBLEM: AST static analysis is weak because Python is dynamic. An agent can obfuscate code: getattr(os, 'sys' + 'tem')('rm -rf /') AST sees nothing. The command still executes. SOLUTION: Python's sys.addaudithook (PEP 578, Python 3.8+). This intercepts events at the C-level INSIDE the Python interpreter. If the agent code tries to: - Open a file outside sandbox → BLOCKED - Create a network socket → BLOCKED - Execute os.system/subprocess → BLOCKED - Import dangerous modules → BLOCKED The hook fires REGARDLESS of how the code was invoked (eval, exec, importlib, ctypes, whatever). It operates at the interpreter level, not the AST level. Security properties: - Cannot be disabled from user code (once installed, permanent for the process) - Cannot be bypassed by dynamic attribute access - Cannot be bypassed by code obfuscation - Zero external dependencies (stdlib only, no Docker) - Works on all platforms Python 3.8+ Usage: from purpose_agent.sandbox_hooks import install_sandbox, SandboxPolicy policy = SandboxPolicy( allowed_paths=["/app/workspace"], block_network=True, block_subprocess=True, ) install_sandbox(policy) # Now ANY code in this process is sandboxed: exec("import os; os.system('ls')") # → RuntimeError: blocked by audit hook """ from __future__ import annotations import sys import os import logging from dataclasses import dataclass, field from typing import Any logger = logging.getLogger(__name__) # Audit events we intercept (PEP 578) # See: https://docs.python.org/3/library/audit_events.html _DANGEROUS_EVENTS = { # Process execution "os.system", "subprocess.Popen", "os.exec", "os.posix_spawn", "os.spawn", # Network "socket.connect", "socket.bind", "socket.sendto", # File system (checked against allowed_paths) "open", # Code execution "compile", "exec", # Module imports (checked against blocklist) "import", } # Modules that are NEVER allowed in sandboxed code _BLOCKED_MODULES = { "ctypes", # Can call arbitrary C functions "multiprocessing", # Can spawn processes "signal", # Can interfere with process control "resource", # Can modify system limits "pty", # Pseudo-terminal (escape vector) } @dataclass class SandboxPolicy: """ Policy for the PEP 578 audit hook sandbox. Configure what's allowed and what's blocked. Once installed, CANNOT be disabled from user code. """ allowed_paths: list[str] = field(default_factory=lambda: ["/tmp"]) blocked_paths: list[str] = field(default_factory=lambda: ["/etc", "/proc", "/sys", "/dev"]) block_network: bool = True block_subprocess: bool = True block_dangerous_imports: bool = True blocked_modules: set[str] = field(default_factory=lambda: set(_BLOCKED_MODULES)) log_violations: bool = True raise_on_violation: bool = True # If False, just log (for monitoring mode) class SandboxViolation(RuntimeError): """Raised when sandboxed code attempts a forbidden operation.""" pass _installed = False _policy: SandboxPolicy | None = None def install_sandbox(policy: SandboxPolicy | None = None) -> None: """ Install PEP 578 audit hooks. Once installed, CANNOT be removed. This affects ALL code in the current process — not just agent code. Call this BEFORE executing any untrusted code. Thread-safe: the hook is installed once and affects all threads. """ global _installed, _policy if _installed: logger.warning("Sandbox already installed (cannot install twice)") return _policy = policy or SandboxPolicy() # Install the audit hook (C-level interception) sys.addaudithook(_audit_hook) _installed = True logger.info( f"PEP 578 sandbox installed: " f"network={'blocked' if _policy.block_network else 'allowed'}, " f"subprocess={'blocked' if _policy.block_subprocess else 'allowed'}, " f"paths={_policy.allowed_paths}" ) def _audit_hook(event: str, args: tuple) -> None: """ The actual audit hook. Called by the Python interpreter at the C level. This function is invoked for EVERY auditable operation in the process. It cannot be bypassed by user code. """ if _policy is None: return # ── Subprocess/system execution ── if _policy.block_subprocess and event in ("os.system", "subprocess.Popen", "os.exec", "os.posix_spawn", "os.spawn"): _violation(f"Subprocess blocked: {event}({_safe_repr(args)})") # ── Network ── if _policy.block_network and event in ("socket.connect", "socket.bind", "socket.sendto"): _violation(f"Network blocked: {event}({_safe_repr(args)})") # ── File access ── if event == "open" and args: path = str(args[0]) if args else "" if path and not _path_allowed(path): _violation(f"File access blocked: {path}") # ── Dangerous imports ── if _policy.block_dangerous_imports and event == "import": module_name = str(args[0]) if args else "" if module_name in _policy.blocked_modules: _violation(f"Import blocked: {module_name}") def _path_allowed(path: str) -> bool: """Check if a file path is allowed by the sandbox policy.""" if not _policy: return True abs_path = os.path.abspath(path) # Explicit blocks for blocked in _policy.blocked_paths: if abs_path.startswith(os.path.abspath(blocked)): return False # Must be under an allowed path for allowed in _policy.allowed_paths: if abs_path.startswith(os.path.abspath(allowed)): return True # If no allowed paths match but no blocked paths match either, # default to allowed (permissive mode for non-sandboxed paths) return True def _violation(message: str) -> None: """Handle a sandbox violation.""" if _policy and _policy.log_violations: logger.warning(f"SANDBOX VIOLATION: {message}") if _policy and _policy.raise_on_violation: raise SandboxViolation(message) def _safe_repr(args: tuple, max_len: int = 100) -> str: """Safe string representation of audit args.""" try: s = repr(args) return s[:max_len] if len(s) > max_len else s except: return "(...)" def is_sandbox_installed() -> bool: """Check if the sandbox is currently active.""" return _installed def get_policy() -> SandboxPolicy | None: """Get the current sandbox policy (None if not installed).""" return _policy