"""
AETHER Safety Sandbox.
Constrained self-modification with validation, audit logging,
and human-in-the-loop oversight.
"""
import hashlib
import time
import logging
from typing import Dict, List, Any, Optional, Callable
from contextlib import contextmanager
from dataclasses import fields
# Module-level logger; named "AETHER.Safety" (rather than __name__) so all
# safety components share one logger hierarchy under the AETHER root.
logger = logging.getLogger("AETHER.Safety")
class SafetySandbox:
    """
    Sandboxed evaluation environment for self-modification.
    Inspired by AlphaEvolve's validation and GEA's admission requirements.

    Provides three layers of oversight:

    * ``sandbox()`` -- context manager that times a self-modification
      attempt and appends the result to ``audit_log``.
    * ``validate_architecture()`` / ``validate_code()`` -- static
      admission checks on proposed configs and code strings.
    * ``request_human_approval()`` -- queues a modification for human
      review and applies conservative auto-decision heuristics.
    """

    def __init__(self, timeout: float = 30.0,
                 max_code_size: int = 100000,
                 forbidden_modules: Optional[List[str]] = None):
        """
        Args:
            timeout: Soft wall-clock budget (seconds) for a sandboxed run.
                Exceeding it is logged, not enforced.
            max_code_size: Maximum accepted code length (characters).
            forbidden_modules: Substrings that must not appear in submitted
                code; defaults to a conservative deny-list.
        """
        self.timeout = timeout
        self.max_code_size = max_code_size
        # NOTE: matched by naive substring search in validate_code(), so
        # e.g. "eval" also rejects "evaluate" -- deliberately conservative
        # for a safety gate (false positives preferred over false negatives).
        self.forbidden_modules = forbidden_modules or [
            "os.system", "subprocess", "socket", "eval", "exec",
            "compile", "__import__", "importlib.import_module",
        ]
        # Per-run sandbox contexts (one dict per sandbox() invocation).
        self.audit_log: List[Dict[str, Any]] = []
        # Outcome of every validate_architecture() call.
        self.modification_history: List[Dict[str, Any]] = []
        # Every request_human_approval() entry, kept for human inspection.
        self.pending_approvals: List[Dict[str, Any]] = []

    @contextmanager
    def sandbox(self):
        """Context manager wrapping one self-modification attempt.

        Yields a mutable context dict the caller may populate. On exit
        (normal or exceptional) the elapsed time is recorded and the
        context is appended to ``audit_log``. Exceptions are logged and
        re-raised; the timeout is advisory (logged only, not enforced).
        """
        start_time = time.time()
        context: Dict[str, Any] = {
            "start_time": start_time,
            "modifications_attempted": [],
            "modifications_approved": [],
            "errors": [],
        }
        try:
            yield context
        except Exception as e:
            context["errors"].append(str(e))
            logger.warning(f"Sandbox caught exception: {e}")
            raise
        finally:
            # Runs on both success and failure so every attempt is audited.
            elapsed = time.time() - start_time
            context["elapsed_time"] = elapsed
            if elapsed > self.timeout:
                logger.warning(f"Sandbox timeout: {elapsed:.2f}s > {self.timeout}s")
            self.audit_log.append(context)

    def validate_architecture(self, config) -> bool:
        """Admission-check a proposed architecture configuration.

        Known hyperparameters are range-checked (missing attributes are
        skipped), the macro/micro dimension ordering is enforced, and a
        rough memory estimate is bounded. The outcome is recorded in
        ``modification_history`` either way.

        Args:
            config: Object exposing hyperparameters as attributes.

        Returns:
            True when all checks pass, False otherwise.
        """
        checks = {
            "population_size": (2, 64),
            "mutation_rate": (0.0, 0.5),
            "learning_rate": (1e-6, 1e-3),
            "num_agents": (1, 32),
            "macro_policy_dim": (32, 1024),
            "micro_policy_dim": (16, 512),
        }
        violations = []
        for field_name, (min_val, max_val) in checks.items():
            val = getattr(config, field_name, None)
            if val is not None and not (min_val <= val <= max_val):
                violations.append(f"{field_name}={val} outside [{min_val}, {max_val}]")
        if hasattr(config, 'micro_policy_dim') and hasattr(config, 'macro_policy_dim'):
            if config.micro_policy_dim > config.macro_policy_dim:
                violations.append("micro_policy_dim > macro_policy_dim")
        # Rough float32 footprint in MB. Fix: use defaulted getattr so a
        # config missing any of these attributes no longer raises
        # AttributeError (every other check above tolerates absence).
        macro = getattr(config, "macro_policy_dim", 0)
        micro = getattr(config, "micro_policy_dim", 0)
        agents = getattr(config, "num_agents", 0)
        estimated_memory = (macro * micro * agents * 4) / 1e6
        if estimated_memory > 10000:
            violations.append(f"Estimated memory {estimated_memory:.1f}MB exceeds limit")
        if violations:
            logger.warning(f"Architecture validation failed: {violations}")
            self._log_modification(config, approved=False, reason="; ".join(violations))
            return False
        self._log_modification(config, approved=True)
        return True

    def validate_code(self, code: str) -> bool:
        """Return True iff *code* is within size limits and contains no
        forbidden substring (see ``forbidden_modules``)."""
        if len(code) > self.max_code_size:
            logger.warning(f"Code size {len(code)} exceeds limit {self.max_code_size}")
            return False
        for forbidden in self.forbidden_modules:
            # Naive substring match: coarse but fail-safe for a deny-list.
            if forbidden in code:
                logger.warning(f"Forbidden pattern '{forbidden}' found in code")
                return False
        return True

    def _log_modification(self, config, approved: bool, reason: str = ""):
        """Append one validation outcome to ``modification_history``.

        Only a short hash of the config is stored, not the config itself.
        NOTE(review): relies on ``config.__dict__``, so slotted objects
        would raise here -- confirm configs are plain/dataclass objects.
        """
        entry = {
            "timestamp": time.time(),
            "approved": approved,
            "config_hash": hashlib.sha256(str(config.__dict__).encode()).hexdigest()[:16],
            "reason": reason if not approved else "passed all checks",
        }
        self.modification_history.append(entry)

    def request_human_approval(self, modification: Dict[str, Any]) -> bool:
        """Queue *modification* for human review and auto-triage it.

        The request is always appended to ``pending_approvals`` for later
        human inspection; conservative heuristics then auto-decide
        (aggressive settings rejected, everything else approved).

        Returns:
            The automatic decision: True if approved.
        """
        entry = {
            "timestamp": time.time(),
            "modification": modification,
            "auto_decision": False,
        }
        self.pending_approvals.append(entry)
        decision = True
        if modification.get("mutation_rate", 0) > 0.3:
            logger.info("Auto-rejected high mutation rate modification")
            decision = False
        elif modification.get("num_agents", 0) > 10:
            logger.info("Auto-rejected large agent pool modification")
            decision = False
        else:
            logger.info("Auto-approved conservative modification")
        # Fix: record that the decision was automatic and what it was
        # (the original left auto_decision stuck at False).
        entry["auto_decision"] = True
        entry["approved"] = decision
        return decision

    def get_audit_summary(self) -> Dict[str, Any]:
        """Return aggregate counts plus the 10 most recent modifications."""
        total = len(self.modification_history)
        approved = sum(1 for m in self.modification_history if m["approved"])
        return {
            "total_modifications_attempted": total,
            "approved": approved,
            "rejected": total - approved,
            "pending_human_approval": len(self.pending_approvals),
            "recent_modifications": self.modification_history[-10:],
            "audit_log_entries": len(self.audit_log),
        }