"""
AETHER Safety Sandbox.
Constrained self-modification with validation, audit logging,
and human-in-the-loop oversight.
"""

import hashlib
import time
import logging
from typing import Any, Dict, List, Optional
from contextlib import contextmanager

logger = logging.getLogger("AETHER.Safety")


class SafetySandbox:
    """
    Sandboxed evaluation environment for self-modification.
    Inspired by AlphaEvolve's validation and GEA's admission requirements.
    """
    
    def __init__(self, timeout: float = 30.0,
                 max_code_size: int = 100000,
                 forbidden_modules: Optional[List[str]] = None):
        self.timeout = timeout
        self.max_code_size = max_code_size
        self.forbidden_modules = forbidden_modules or [
            "os.system", "subprocess", "socket", "eval", "exec",
            "compile", "__import__", "importlib.import_module",
        ]
        
        self.audit_log: List[Dict[str, Any]] = []
        self.modification_history: List[Dict[str, Any]] = []
        self.pending_approvals: List[Dict[str, Any]] = []
    
    @contextmanager
    def sandbox(self):
        """Time a modification attempt and record it in the audit log on exit."""
        start_time = time.time()
        context = {
            "start_time": start_time,
            "modifications_attempted": [],
            "modifications_approved": [],
            "errors": [],
        }
        
        try:
            yield context
        except Exception as e:
            context["errors"].append(str(e))
            logger.warning(f"Sandbox caught exception: {e}")
            raise
        finally:
            elapsed = time.time() - start_time
            context["elapsed_time"] = elapsed
            
            if elapsed > self.timeout:
                logger.warning(f"Sandbox timeout: {elapsed:.2f}s > {self.timeout}s")
            
            self.audit_log.append(context)
    
    def validate_architecture(self, config) -> bool:
        """Check that config fields fall within safe bounds before a modification is applied."""
        checks = {
            "population_size": (2, 64),
            "mutation_rate": (0.0, 0.5),
            "learning_rate": (1e-6, 1e-3),
            "num_agents": (1, 32),
            "macro_policy_dim": (32, 1024),
            "micro_policy_dim": (16, 512),
        }
        
        violations = []
        for field_name, (min_val, max_val) in checks.items():
            val = getattr(config, field_name, None)
            if val is not None and not (min_val <= val <= max_val):
                violations.append(f"{field_name}={val} outside [{min_val}, {max_val}]")
        
        if hasattr(config, 'micro_policy_dim') and hasattr(config, 'macro_policy_dim'):
            if config.micro_policy_dim > config.macro_policy_dim:
                violations.append("micro_policy_dim > macro_policy_dim")
        
        # Rough footprint: macro_dim * micro_dim * num_agents parameters at 4 bytes (float32).
        # Use getattr with a default, consistent with the bounds checks above, so a config
        # missing one of these fields does not raise AttributeError.
        estimated_memory = (getattr(config, "macro_policy_dim", 0) *
                            getattr(config, "micro_policy_dim", 0) *
                            getattr(config, "num_agents", 0) * 4) / 1e6
        if estimated_memory > 10000:
            violations.append(f"Estimated memory {estimated_memory:.1f}MB exceeds limit")
        
        if violations:
            logger.warning(f"Architecture validation failed: {violations}")
            self._log_modification(config, approved=False, reason="; ".join(violations))
            return False
        
        self._log_modification(config, approved=True)
        return True
    
    def validate_code(self, code: str) -> bool:
        """Reject code that is oversized or contains a forbidden pattern (naive substring scan)."""
        if len(code) > self.max_code_size:
            logger.warning(f"Code size {len(code)} exceeds limit {self.max_code_size}")
            return False
        
        for forbidden in self.forbidden_modules:
            if forbidden in code:
                logger.warning(f"Forbidden pattern '{forbidden}' found in code")
                return False
        
        return True
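    
    def validate_code_ast(self, code: str) -> bool:
        """Hedged sketch, not part of the original API: an AST-based variant of validate_code.
        
        The substring scan above can be fooled by aliasing or can trip on harmless
        string literals; walking the parsed AST catches the structural cases. The
        banned-name sets below are illustrative assumptions, not a vetted policy.
        """
        import ast
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            logger.warning(f"Code failed to parse: {e}")
            return False
        
        banned_calls = {"eval", "exec", "compile", "__import__"}
        banned_imports = {"subprocess", "socket", "importlib"}
        for node in ast.walk(tree):
            # Direct calls to dangerous builtins, e.g. eval("...").
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id in banned_calls:
                    logger.warning(f"Forbidden call '{node.func.id}' in code AST")
                    return False
            # import X / from X import Y for screened modules.
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                names = [alias.name for alias in node.names]
                if getattr(node, "module", None):
                    names.append(node.module)
                if any(name.split(".")[0] in banned_imports for name in names):
                    logger.warning(f"Forbidden import in code AST: {names}")
                    return False
        return True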
    
    def _log_modification(self, config, approved: bool, reason: str = ""):
        """Append a hashed, timestamped record of the decision to the modification history."""
        entry = {
            "timestamp": time.time(),
            "approved": approved,
            "config_hash": hashlib.sha256(str(config.__dict__).encode()).hexdigest()[:16],
            "reason": reason if not approved else "passed all checks",
        }
        self.modification_history.append(entry)
    
    def request_human_approval(self, modification: Dict[str, Any]) -> bool:
        """Queue a modification for human review and return a conservative auto-decision."""
        entry = {
            "timestamp": time.time(),
            "modification": modification,
            "auto_decision": True,
        }
        self.pending_approvals.append(entry)
        
        if modification.get("mutation_rate", 0) > 0.3:
            logger.info("Auto-rejected high mutation rate modification")
            entry["approved"] = False
            return False
        
        if modification.get("num_agents", 0) > 10:
            logger.info("Auto-rejected large agent pool modification")
            entry["approved"] = False
            return False
        
        logger.info("Auto-approved conservative modification")
        entry["approved"] = True
        return True
    
    def get_audit_summary(self) -> Dict[str, Any]:
        """Summarize modification decisions and audit-log activity."""
        total = len(self.modification_history)
        approved = sum(1 for m in self.modification_history if m["approved"])
        
        return {
            "total_modifications_attempted": total,
            "approved": approved,
            "rejected": total - approved,
            "pending_human_approval": len(self.pending_approvals),
            "recent_modifications": self.modification_history[-10:],
            "audit_log_entries": len(self.audit_log),
        }
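

# Minimal usage sketch (an addition, not part of the original module): drives the
# sandbox with a hypothetical config built from SimpleNamespace; the attribute
# names mirror the bounds checked in validate_architecture above.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    box = SafetySandbox(timeout=5.0)

    config = SimpleNamespace(
        population_size=16, mutation_rate=0.1, learning_rate=1e-4,
        num_agents=8, macro_policy_dim=256, micro_policy_dim=64,
    )

    with box.sandbox() as ctx:
        ctx["modifications_attempted"].append("resize_population")
        if box.validate_architecture(config):
            ctx["modifications_approved"].append("resize_population")

    box.validate_code("import subprocess")               # rejected: forbidden pattern
    box.request_human_approval({"mutation_rate": 0.4})   # auto-rejected: rate > 0.3

    print(box.get_audit_summary())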