""" AETHER: Autonomous Self-Evolving Neuro-Symbolic Architecture =============================================================== Fully automated — zero human-in-the-loop. All oversight is performed by automated regression gating, risk scoring, and stability validation. Architecture: 1. Neuro-Symbolic Fusion Gate — learned attention over symbolic/neural split 2. Four-Agent Orchestration — Researcher, Engineer, Analyzer, Integrator 3. MAP-Elites Quality-Diversity — behavioral archive for evolutionary search 4. CoALA 4-Tier Memory — Working, Episodic, Semantic, Procedural 5. Temporal Memory with Attention — long-horizon context retention 6. Knowledge Graph Engine — RGCN + ComplEx + symbolic inference 7. AutoOversight System — regression suites, risk scoring, auto-rollback 8. Recursive Evolution Loop — generate → evaluate → select → mutate → validate → integrate Run: python aether_autonomous.py Dependencies: torch, numpy, networkx """ import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import networkx as nx import copy, json, hashlib, time, random, logging, warnings from dataclasses import dataclass, field, asdict from typing import Dict, List, Any, Optional, Tuple, Callable from collections import deque from contextlib import contextmanager warnings.filterwarnings("ignore") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", ) logger = logging.getLogger("AETHER") # ============================================================================ # 0. 
@dataclass
class AetherConfig:
    """Hyperparameters plus automated-safety limits for one AETHER run.

    The evolvable subset of fields round-trips through ``to_vector`` /
    ``from_vector`` so the evolution engine can mutate configurations as
    flat numpy vectors; every bound is re-imposed on decode.
    """

    population_size: int = 8
    generations: int = 10
    mutation_rate: float = 0.15
    crossover_rate: float = 0.30
    macro_policy_dim: int = 256
    micro_policy_dim: int = 128
    num_agents: int = 4
    working_memory_capacity: int = 16
    episodic_buffer_size: int = 1000
    kg_embedding_dim: int = 128
    kg_num_relations: int = 20
    learning_rate: float = 2e-5
    batch_size: int = 4
    enable_self_modification: bool = True
    enable_parallel_agents: bool = True
    # Auto-oversight thresholds (fully automated)
    max_mutation_rate: float = 0.50
    max_agents: int = 16
    max_memory_mb: float = 8192.0
    rollback_fitness_drop: float = 0.15
    stability_window: int = 3
    risk_threshold: float = 0.70
    # MAP-Elites
    archive_dims: Tuple[int, int] = (10, 10)

    def to_vector(self) -> np.ndarray:
        """Project the evolvable fields into a flat float32 vector.

        ``learning_rate`` is scaled by 1e5 so its magnitude is comparable
        to the integer-valued fields under Gaussian mutation noise.
        """
        return np.array([
            self.population_size,
            self.mutation_rate,
            self.learning_rate * 1e5,
            self.macro_policy_dim,
            self.micro_policy_dim,
            self.num_agents,
            self.kg_embedding_dim,
        ], dtype=np.float32)

    @classmethod
    def from_vector(cls, vec: np.ndarray) -> "AetherConfig":
        """Inverse of ``to_vector``, clipping every field into its safe range."""
        return cls(
            population_size=int(np.clip(vec[0], 2, 64)),
            mutation_rate=float(np.clip(vec[1], 0.01, 0.5)),
            learning_rate=float(np.clip(vec[2] / 1e5, 1e-6, 1e-3)),
            macro_policy_dim=int(np.clip(vec[3], 64, 512)),
            micro_policy_dim=int(np.clip(vec[4], 32, 256)),
            num_agents=int(np.clip(vec[5], 1, 16)),
            kg_embedding_dim=int(np.clip(vec[6], 32, 512)),
        )


# ============================================================================
# 1. AUTO-OVERSIGHT (replaces human-in-the-loop)
# ============================================================================

class AutoOversight:
    """Fully automated oversight. No human approval.

    Components:
      - Risk scorer: estimates danger of a proposed mutation.
      - Regression suite: quick benchmarks that must not degrade.
      - Stability validator: checks config bounds, memory, consistency.
      - Auto-rollback: reverts to last known good if fitness drops.
    """

    def __init__(self, config: AetherConfig):
        self.config = config
        self.audit_log: List[Dict] = []
        self.modification_history: List[Dict] = []
        self.baseline_fitness: float = 0.0
        # Rolling fitness window, sized to twice the stability window.
        self.fitness_history: deque = deque(maxlen=config.stability_window * 2)
        self.last_good_config: Optional[AetherConfig] = None
        self.last_good_fitness: float = -float("inf")
        self.consecutive_rejections: int = 0

    def risk_score(self, candidate: AetherConfig) -> float:
        """Return 0..1 risk. >threshold = reject (see ``decide``)."""
        risks = []
        # Mutation rate risk, relative to the allowed maximum.
        risks.append(min(1.0, candidate.mutation_rate / self.config.max_mutation_rate))
        # Agent count risk.
        risks.append(min(1.0, candidate.num_agents / self.config.max_agents))
        # Rough parameter-memory estimate in MB (macro*micro*agents float32).
        est_mem = (candidate.macro_policy_dim * candidate.micro_policy_dim *
                   candidate.num_agents * 4) / 1e6
        risks.append(min(1.0, est_mem / self.config.max_memory_mb))
        # Dimension consistency risk: micro policy must not exceed macro.
        risks.append(1.0 if candidate.micro_policy_dim > candidate.macro_policy_dim else 0.0)
        return float(np.mean(risks))

    def validate_stability(self, candidate: AetherConfig) -> Tuple[bool, str]:
        """Check each field against its hard bounds.

        Returns ``(True, "ok")`` or ``(False, "; "-joined violations)``.
        """
        checks = {
            "population_size": (2, 64),
            "mutation_rate": (0.0, self.config.max_mutation_rate),
            "learning_rate": (1e-6, 1e-3),
            "num_agents": (1, self.config.max_agents),
            "macro_policy_dim": (32, 512),
            "micro_policy_dim": (16, 256),
        }
        violations = []
        for field_name, (lo, hi) in checks.items():
            val = getattr(candidate, field_name, None)
            if val is not None and not (lo <= val <= hi):
                violations.append(f"{field_name}={val} not in [{lo},{hi}]")
        if candidate.micro_policy_dim > candidate.macro_policy_dim:
            violations.append("micro > macro")
        if violations:
            return False, "; ".join(violations)
        return True, "ok"

    def regression_suite(self, candidate: AetherConfig, core: "AetherCore") -> Tuple[bool, float]:
        """Quick synthetic benchmarks. Returns (pass, composite_score).

        Higher is better. ``core`` is currently unused but kept for
        interface stability with callers that pass the live system.
        """
        scores = []
        # Benchmark 1: memory throughput
        try:
            wm = WorkingMemory(capacity=candidate.working_memory_capacity)
            for i in range(100):
                wm.store({"idx": i, "data": torch.randn(16)})
            retrieved = wm.retrieve("idx", top_k=5)
            scores.append(len(retrieved) / 5.0)
        except Exception:
            scores.append(0.0)
        # Benchmark 2: knowledge graph query speed
        try:
            kg = KnowledgeGraphEngine(
                embedding_dim=candidate.kg_embedding_dim,
                num_relations=candidate.kg_num_relations,
            )
            for i in range(20):
                kg.add_fact(f"Node{i}", "relates_to", f"Node{i+1}")
            q = kg.query("Node0 relates_to", top_k=3)
            scores.append(min(1.0, len(q["results"]) / 3.0))
        except Exception:
            scores.append(0.0)
        # Benchmark 3: agent orchestration latency
        try:
            orch = AetherAgentOrchestrator(candidate)
            task_embed = torch.randn(1, candidate.macro_policy_dim)
            blueprint = orch.hierarchical.generate_blueprint(task_embed)
            scores.append(min(1.0, len(blueprint) / 3.0))
        except Exception:
            scores.append(0.0)
        composite = float(np.mean(scores))
        # Fail only if the composite falls more than rollback_fitness_drop
        # below the established baseline (first run always passes).
        if self.baseline_fitness > 0 and composite < self.baseline_fitness * (1 - self.config.rollback_fitness_drop):
            return False, composite
        return True, composite

    def should_rollback(self, current_fitness: float) -> bool:
        """Auto-rollback if fitness drops significantly vs. last good."""
        if self.last_good_fitness == -float("inf"):
            # No checkpoint recorded yet — nothing to roll back to.
            return False
        drop = (self.last_good_fitness - current_fitness) / (abs(self.last_good_fitness) + 1e-8)
        return drop > self.config.rollback_fitness_drop

    def decide(self, candidate: AetherConfig, core: "AetherCore") -> Tuple[bool, float, str]:
        """Automated decision gate. Returns (approved, score, reason).

        No human involved: risk scoring, then stability validation, then
        the regression suite, in increasing order of cost.
        """
        risk = self.risk_score(candidate)
        if risk > self.config.risk_threshold:
            self._log(candidate, False, f"risk={risk:.2f} > threshold")
            self.consecutive_rejections += 1
            return False, risk, "auto-rejected: high risk"
        stable, stability_reason = self.validate_stability(candidate)
        if not stable:
            self._log(candidate, False, stability_reason)
            self.consecutive_rejections += 1
            return False, risk, f"auto-rejected: unstable ({stability_reason})"
        reg_pass, reg_score = self.regression_suite(candidate, core)
        if not reg_pass:
            self._log(candidate, False, f"regression fail score={reg_score:.3f}")
            self.consecutive_rejections += 1
            return False, reg_score, "auto-rejected: regression failure"
        self._log(candidate, True, f"risk={risk:.2f} reg={reg_score:.3f}")
        self.consecutive_rejections = 0
        # Baseline only ratchets upward; regressions are caught above.
        self.baseline_fitness = max(self.baseline_fitness, reg_score)
        return True, reg_score, "auto-approved"

    def _log(self, candidate: AetherConfig, approved: bool, reason: str):
        """Append an audit entry keyed by a short hash of the config."""
        entry = {
            "timestamp": time.time(),
            "approved": approved,
            "config_hash": hashlib.sha256(
                json.dumps(asdict(candidate), sort_keys=True).encode()
            ).hexdigest()[:16],
            "reason": reason,
        }
        self.modification_history.append(entry)
        self.audit_log.append(entry)

    def update_good_checkpoint(self, config: AetherConfig, fitness: float):
        """Record the current config/fitness as the rollback target."""
        self.last_good_config = copy.deepcopy(config)
        self.last_good_fitness = fitness

    def summary(self) -> Dict[str, Any]:
        """Aggregate approval/rejection statistics for reporting."""
        total = len(self.modification_history)
        approved = sum(1 for m in self.modification_history if m["approved"])
        return {
            "total_attempted": total,
            "approved": approved,
            "rejected": total - approved,
            "consecutive_rejections": self.consecutive_rejections,
            "baseline_fitness": self.baseline_fitness,
            "last_good_fitness": self.last_good_fitness,
        }
class WorkingMemory:
    """Small attention-weighted FIFO buffer (CoALA working-memory tier).

    NOTE(review): ``store`` mutates the caller's dict by adding a ``_t``
    timestamp — callers should not reuse the object. Preserved as-is.
    """

    def __init__(self, capacity: int = 16):
        self.capacity = capacity
        self.buffer: deque = deque(maxlen=capacity)
        # Learned per-slot attention weights (a raw tensor parameter;
        # this class is intentionally not an nn.Module).
        self.attention = nn.Parameter(torch.ones(capacity))

    def store(self, item: Dict[str, Any]):
        item["_t"] = time.time()
        self.buffer.append(item)

    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """Keyword-overlap retrieval, weighted by the slot attention."""
        if not self.buffer:
            return []
        scores = []
        buf = list(self.buffer)
        for i, item in enumerate(buf):
            text = json.dumps(item)
            score = sum(1 for w in query.lower().split() if w in text.lower())
            # Attention weighting (learned); index wraps to slot count.
            attn = torch.sigmoid(self.attention[i % self.capacity]).item()
            scores.append(score * attn)
        indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
        return [buf[i] for i in indices]

    def export(self) -> List[Dict]:
        return list(self.buffer)


class EpisodicMemory:
    """Bounded episode log with keyword-overlap similarity retrieval."""

    def __init__(self, buffer_size: int = 1000):
        self.buffer: deque = deque(maxlen=buffer_size)

    def store(self, episode: Dict[str, Any]):
        episode["_t"] = time.time()
        self.buffer.append(episode)

    def retrieve_similar(self, query: str, top_k: int = 5) -> List[Dict]:
        if not self.buffer:
            return []
        buf = list(self.buffer)
        scores = []
        for item in buf:
            text = json.dumps(item)
            scores.append(sum(1 for w in query.lower().split() if w in text.lower()))
        indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
        return [buf[i] for i in indices]

    def get_recent(self, n: int = 10) -> List[Dict]:
        return list(self.buffer)[-n:]

    def export(self) -> List[Dict]:
        return list(self.buffer)


class SemanticMemory:
    """Key -> {value, confidence, timestamp} fact store."""

    def __init__(self):
        self.facts: Dict[str, Any] = {}

    def store_fact(self, key: str, value: Any, confidence: float = 1.0):
        self.facts[key] = {"value": value, "confidence": confidence, "t": time.time()}

    def retrieve(self, key: str) -> Optional[Dict]:
        return self.facts.get(key)

    def query(self, query: str) -> List[Dict]:
        # Case-insensitive substring match against fact keys.
        return [v for k, v in self.facts.items() if query.lower() in k.lower()]

    def export(self) -> Dict:
        return self.facts


class ProceduralMemory:
    """Named tool registry with usage counting and tool merging."""

    def __init__(self):
        self.tools: Dict[str, Dict] = {}
        self.usage: Dict[str, int] = {}

    def register_tool(self, name: str, code: str, description: str, tags: List[str] = None):
        self.tools[name] = {
            "code": code,
            "description": description,
            "tags": tags or [],
            "registered_at": time.time(),
            "version": 1,
        }
        self.usage[name] = 0

    def get_tool(self, name: str) -> Optional[Dict]:
        if name in self.tools:
            self.usage[name] += 1
            return self.tools[name]
        return None

    def search_tools(self, query: str) -> List[Dict]:
        out = []
        for name, tool in self.tools.items():
            text = f"{name} {tool['description']} {' '.join(tool['tags'])}"
            if query.lower() in text.lower():
                out.append({"name": name, **tool})
        return out

    def merge_tools(self, cluster: List[str]) -> Optional[str]:
        """Merge a cluster of tools into its most-used member.

        FIX: restrict to registered names up-front. Previously ``max``
        ran over the raw cluster, so an unknown name could be selected
        as canonical and raise KeyError on the description update.
        """
        known = [t for t in cluster if t in self.tools]
        if len(known) < 2:
            return None
        canonical = max(known, key=lambda t: self.usage.get(t, 0))
        merged_desc = " | ".join(self.tools[t]["description"] for t in known)
        self.tools[canonical]["description"] = merged_desc
        self.tools[canonical]["version"] += 1
        for t in known:
            if t != canonical:
                del self.tools[t]
        return canonical

    def export(self) -> Dict:
        return {"tools": self.tools, "usage": self.usage}


class CoALAMemory:
    """Facade over the four CoALA tiers: working/episodic/semantic/procedural."""

    def __init__(self, capacity: int = 16):
        self.working = WorkingMemory(capacity=capacity)
        self.episodic = EpisodicMemory(buffer_size=1000)
        self.semantic = SemanticMemory()
        self.procedural = ProceduralMemory()

    def store(self, item: Dict[str, Any], memory_type: str = "working"):
        """Route ``item`` to one tier; unknown types are silently ignored."""
        if memory_type == "working":
            self.working.store(item)
        elif memory_type == "episodic":
            self.episodic.store(item)
        elif memory_type == "semantic":
            # Each key/value pair becomes an individual fact.
            for k, v in item.items():
                self.semantic.store_fact(k, v)
        elif memory_type == "procedural":
            if "name" in item and "code" in item:
                self.procedural.register_tool(
                    item["name"], item["code"],
                    item.get("description", ""),
                    item.get("tags", [])
                )

    def retrieve(self, query: str, memory_type: str = "all", top_k: int = 5) -> List[Dict]:
        if memory_type == "all":
            out = []
            # FIX: ensure working memory always contributes at least one
            # slot — with top_k=1, the old ``top_k // 2`` request was 0.
            out.extend(self.working.retrieve(query, top_k=max(1, top_k // 2)))
            out.extend(self.episodic.retrieve_similar(query, top_k=top_k))
            out.extend(self.semantic.query(query)[:top_k])
            return out[:top_k]
        elif memory_type == "working":
            return self.working.retrieve(query, top_k)
        elif memory_type == "episodic":
            return self.episodic.retrieve_similar(query, top_k)
        elif memory_type == "semantic":
            return self.semantic.query(query)[:top_k]
        elif memory_type == "procedural":
            return self.procedural.search_tools(query)
        return []

    @property
    def buffer(self):
        # Backward-compat alias: some callers treat this object like a
        # bare WorkingMemory.
        return self.working.buffer

    def export(self) -> Dict[str, Any]:
        return {
            "working": self.working.export(),
            "episodic": self.episodic.export(),
            "semantic": self.semantic.export(),
            "procedural": self.procedural.export(),
        }


class TemporalMemory(nn.Module):
    """Recency-weighted event buffer for long-horizon context.

    ``temporal_gate`` is defined but not yet used by retrieval — kept so
    checkpoints and future attention-based retrieval remain compatible.
    """

    def __init__(self, buffer_size: int = 1000, hidden_dim: int = 64):
        super().__init__()
        self.buffer_size = buffer_size
        self.hidden_dim = hidden_dim
        self.buffer: deque = deque(maxlen=buffer_size)
        self.temporal_gate = nn.Sequential(
            nn.Linear(2, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1), nn.Sigmoid(),
        )

    def store(self, event: Dict[str, Any]):
        event["_t"] = time.time()
        self.buffer.append(event)

    def retrieve_context(self, current_time: Optional[float] = None,
                         lookback: float = 3600.0) -> List[Dict]:
        """Return events within ``lookback`` seconds, newest first,
        annotated with an exponential ``recency`` score in (0, 1]."""
        current_time = current_time or time.time()
        relevant = []
        for event in self.buffer:
            age = current_time - event.get("_t", current_time)
            if age <= lookback:
                recency = torch.exp(torch.tensor(-age / lookback)).item()
                relevant.append({**event, "recency": recency, "age": age})
        relevant.sort(key=lambda x: x["recency"], reverse=True)
        return relevant

    def retrieve_with_attention(self, query_embed: torch.Tensor, top_k: int = 10) -> List[Dict]:
        # Simplified: query embedding is ignored; recency-weighted only.
        return self.retrieve_context()[:top_k]

    def export(self) -> List[Dict]:
        return list(self.buffer)

    def __len__(self):
        return len(self.buffer)
# ============================================================================
# 3. KNOWLEDGE GRAPH ENGINE (RGCN + ComplEx + Symbolic Rules)
# ============================================================================

logger = logging.getLogger("AETHER")


class RGCNLayer(nn.Module):
    """Relational GCN layer with basis decomposition (Schlichtkrull et al.).

    Relation weights are mixed from ``num_bases`` shared bases, plus a
    self-loop transform and bias.
    """

    def __init__(self, in_dim: int, out_dim: int, num_relations: int, num_bases: int = 4):
        super().__init__()
        self.num_relations = num_relations
        self.bases = nn.Parameter(torch.Tensor(num_bases, in_dim, out_dim))
        self.comp = nn.Parameter(torch.Tensor(num_relations, num_bases))
        self.self_loop = nn.Parameter(torch.Tensor(in_dim, out_dim))
        self.bias = nn.Parameter(torch.Tensor(out_dim))
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.bases)
        nn.init.xavier_uniform_(self.comp)
        nn.init.xavier_uniform_(self.self_loop)
        nn.init.zeros_(self.bias)

    def forward(self, x, edge_index, edge_type):
        """Message passing over ``edge_index`` ([2, E] source/target rows).

        If ``x`` is None, a (possibly rectangular) identity is used as
        one-hot node features.
        """
        num_nodes = int(edge_index.max().item()) + 1 if x is None else x.size(0)
        if x is None:
            x = torch.eye(num_nodes, self.bases.size(1), device=edge_index.device)
        # Compose per-relation weights from the shared bases.
        weight = torch.einsum("rb,bio->rio", self.comp, self.bases)
        out = torch.zeros(num_nodes, weight.size(2), device=x.device)
        for rid in range(self.num_relations):
            mask = edge_type == rid
            if mask.sum() == 0:
                continue
            ei = edge_index[:, mask]
            messages = torch.mm(x[ei[0]], weight[rid])
            out.index_add_(0, ei[1], messages)
        out = out + torch.mm(x, self.self_loop) + self.bias
        return out


class KnowledgeGraphEncoder(nn.Module):
    """Stack of RGCN layers over a learned node-embedding table."""

    def __init__(self, num_nodes, hidden_dim, num_relations, num_layers=2, num_bases=4):
        super().__init__()
        self.node_embeddings = nn.Embedding(num_nodes, hidden_dim)
        self.layers = nn.ModuleList([
            RGCNLayer(hidden_dim, hidden_dim, num_relations, num_bases)
            for _ in range(num_layers)
        ])
        self.norms = nn.ModuleList([nn.LayerNorm(hidden_dim) for _ in range(num_layers)])

    def forward(self, edge_index, edge_type):
        # NOTE(review): assumes every edge endpoint id < num_nodes used at
        # construction; ids beyond the embedding table would index-error.
        num_nodes = int(edge_index.max().item()) + 1
        x = self.node_embeddings(torch.arange(num_nodes, device=edge_index.device))
        for layer, norm in zip(self.layers, self.norms):
            x = F.relu(norm(layer(x, edge_index, edge_type)))
        return x


class ComplExScorer(nn.Module):
    """ComplEx (Trouillon et al.) triple scorer with separate head/tail tables."""

    def __init__(self, num_nodes, num_relations, hidden_dim=50):
        super().__init__()
        self.head_real = nn.Embedding(num_nodes, hidden_dim)
        self.head_imag = nn.Embedding(num_nodes, hidden_dim)
        self.tail_real = nn.Embedding(num_nodes, hidden_dim)
        self.tail_imag = nn.Embedding(num_nodes, hidden_dim)
        self.rel_real = nn.Embedding(num_relations, hidden_dim)
        self.rel_imag = nn.Embedding(num_relations, hidden_dim)
        self.reset_parameters()

    def reset_parameters(self):
        for p in self.parameters():
            nn.init.xavier_uniform_(p)

    def forward(self, h, r, t):
        """Score triples: Re(<h, r, conj(t)>) per the ComplEx bilinear form."""
        hr, hi = self.head_real(h), self.head_imag(h)
        tr, ti = self.tail_real(t), self.tail_imag(t)
        rr, ri = self.rel_real(r), self.rel_imag(r)
        return torch.sum(hr * rr * tr + hr * ri * ti + hi * rr * ti - hi * ri * tr, dim=-1)

    def loss(self, h, r, t, neg_t=None):
        """Softplus ranking loss with uniform negative tail sampling."""
        pos = self.forward(h, r, t)
        if neg_t is None:
            neg_t = torch.randint(0, self.tail_real.num_embeddings, t.size(), device=t.device)
        neg = self.forward(h, r, neg_t)
        return (F.softplus(-pos) + F.softplus(neg)).mean()


class KnowledgeGraphEngine(nn.Module):
    """Hybrid KG: networkx symbolic store + RGCN/ComplEx neural components.

    NOTE: the underlying ``nx.DiGraph`` keeps at most one edge per node
    pair, so a second relation between the same head/tail overwrites the
    first. Preserved — changing to MultiDiGraph would alter semantics.
    """

    def __init__(self, embedding_dim=128, num_relations=20, max_nodes=10000):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_relations = num_relations
        self.max_nodes = max_nodes
        self.graph = nx.DiGraph()
        self.node_id_map: Dict[str, int] = {}
        self.relation_map: Dict[str, int] = {}
        self.next_node_id = 0
        self.next_rel_id = 0
        self.encoder: Optional[KnowledgeGraphEncoder] = None
        self.scorer: Optional[ComplExScorer] = None
        # Learned per-relation gate between symbolic and learned results.
        self.symbolic_attention = nn.Parameter(torch.ones(num_relations))
        self.rules: List[Tuple[Tuple[str, str, str], Tuple[str, str, str]]] = []

    def _get_or_create_node(self, name: str) -> int:
        if name not in self.node_id_map:
            self.node_id_map[name] = self.next_node_id
            self.graph.add_node(self.next_node_id, name=name)
            self.next_node_id += 1
        return self.node_id_map[name]

    def _get_or_create_relation(self, name: str) -> int:
        if name not in self.relation_map:
            self.relation_map[name] = self.next_rel_id
            self.next_rel_id += 1
        return self.relation_map[name]

    def add_fact(self, head: str, relation: str, tail: str, confidence: float = 1.0):
        h = self._get_or_create_node(head)
        t = self._get_or_create_node(tail)
        r = self._get_or_create_relation(relation)
        self.graph.add_edge(h, t, relation=r, name=relation, confidence=confidence)
        self._ensure_capacity()

    def add_rule(self, premise: Tuple[str, str, str], conclusion: Tuple[str, str, str]):
        self.rules.append((premise, conclusion))

    def _ensure_capacity(self):
        """(Re)build encoder/scorer when the graph outgrows their tables.

        FIX: previously the components were sized once, at the node count
        of the *first* fact (typically 2), so ``reason_learned`` could
        never score nodes added later. We now rebuild with geometric
        headroom whenever capacity is exceeded.

        NOTE: rebuilding re-initializes weights — any trained state in
        the encoder/scorer is lost on growth. Nothing in this module
        trains them incrementally, so this is safe here.
        """
        if self.next_node_id == 0:
            return
        n = min(self.next_node_id, self.max_nodes)
        r = max(self.next_rel_id, self.num_relations)
        if self.encoder is None:
            needs_build = True
        else:
            needs_build = (self.scorer.tail_real.num_embeddings < n
                           or self.scorer.rel_real.num_embeddings < r)
        if needs_build:
            cap_n = min(max(n, 2) * 2, self.max_nodes)  # headroom, capped
            self.encoder = KnowledgeGraphEncoder(cap_n, self.embedding_dim, r)
            self.scorer = ComplExScorer(cap_n, r, self.embedding_dim // 2)
            logger.info(f"KG initialized: {cap_n} nodes, {r} relations")

    def _check_fact(self, fact: Tuple[str, str, str]) -> bool:
        """True iff the exact (head, relation, tail) edge is stored."""
        h, r, t = fact
        if h not in self.node_id_map or t not in self.node_id_map or r not in self.relation_map:
            return False
        return self.graph.has_edge(self.node_id_map[h], self.node_id_map[t]) and \
            self.graph.edges[self.node_id_map[h], self.node_id_map[t]].get("relation") == self.relation_map[r]

    def reason_symbolic(self, query_head: str, query_relation: str) -> List[Dict]:
        """Direct-edge lookup, rule inference, and 2-hop BFS expansion.

        Confidence decays per hop (0.6^hops) and is fixed at 0.8 for
        rule-inferred facts. Results are sorted by confidence.
        """
        results = []
        if query_head not in self.node_id_map:
            return results
        h_id = self.node_id_map[query_head]
        r_name = query_relation
        if r_name in self.relation_map:
            r_id = self.relation_map[r_name]
            for _, target, data in self.graph.out_edges(h_id, data=True):
                if data.get("relation") == r_id:
                    results.append({
                        "head": query_head,
                        "relation": r_name,
                        "tail": self.graph.nodes[target].get("name", str(target)),
                        "confidence": data.get("confidence", 1.0),
                        "path": "direct",
                    })
        # Rule inference: fire rules whose premise head matches and holds.
        for premise, conclusion in self.rules:
            p_head, p_rel, p_tail = premise
            c_head, c_rel, c_tail = conclusion
            if p_head == query_head and self._check_fact(premise):
                results.append({
                    "head": c_head if c_head != "?" else query_head,
                    "relation": c_rel,
                    "tail": c_tail,
                    "confidence": 0.8,
                    "path": "inferred",
                    "rule": f"{premise} -> {conclusion}",
                })
        # Multi-hop BFS (depth 2), enumerating simple paths to each node.
        for neighbor in nx.bfs_tree(self.graph, h_id, depth_limit=2).nodes():
            if neighbor != h_id:
                for path in nx.all_simple_paths(self.graph, h_id, neighbor, cutoff=2):
                    if len(path) > 1:
                        ed = self.graph.edges[path[0], path[1]]
                        results.append({
                            "head": query_head,
                            "relation": f"multi-hop via {ed.get('name', 'unknown')}",
                            "tail": self.graph.nodes[neighbor].get("name", str(neighbor)),
                            "confidence": 0.6 ** (len(path) - 1),
                            "path": "->".join(str(n) for n in path),
                        })
        return sorted(results, key=lambda x: x["confidence"], reverse=True)

    def reason_learned(self, query_head: str, query_relation: str, top_k: int = 5) -> List[Dict]:
        """Rank tails for (head, relation) with the ComplEx scorer.

        FIX: scores only ids that exist in the graph (the scorer table
        may contain spare headroom rows), and scores all candidates in
        one vectorized call instead of Python-level 1000-item chunks.
        """
        if self.scorer is None or query_head not in self.node_id_map:
            return []
        h_id = self.node_id_map[query_head]
        r_id = self.relation_map.get(query_relation)
        if r_id is None:
            return []
        num_real = min(self.next_node_id, self.scorer.tail_real.num_embeddings)
        candidates = torch.arange(num_real)
        with torch.no_grad():
            scores_t = self.scorer(
                torch.full((num_real,), h_id, dtype=torch.long),
                torch.full((num_real,), r_id, dtype=torch.long),
                candidates,
            )
        top_scores, top_idx = torch.topk(scores_t, min(top_k, num_real))
        results = []
        for idx, sc in zip(top_idx, top_scores):
            node_name = self.graph.nodes[idx.item()].get("name", str(idx.item()))
            results.append({
                "head": query_head,
                "relation": query_relation,
                "tail": node_name,
                "confidence": torch.sigmoid(sc).item(),
                "path": "learned",
            })
        return results

    def query(self, text_query: str, top_k: int = 5) -> Dict[str, Any]:
        """Parse "<head> <relation...>" and fuse symbolic/learned results.

        The learned symbolic_attention gate sets the per-relation fusion
        weights; results are merged and re-ranked by confidence.
        """
        parts = text_query.lower().split()
        head = parts[0].capitalize() if parts else text_query.capitalize()
        relation = " ".join(parts[1:]) if len(parts) > 1 else "related_to"
        sym = self.reason_symbolic(head, relation)[:top_k]
        learned = self.reason_learned(head, relation, top_k)
        rel_id = self.relation_map.get(relation, 0)
        sym_w = torch.sigmoid(self.symbolic_attention[rel_id % self.num_relations]).item()
        learned_w = 1.0 - sym_w
        for r in sym:
            r["source"] = "symbolic"
            r["fusion_weight"] = sym_w
        for r in learned:
            r["source"] = "learned"
            r["fusion_weight"] = learned_w
        all_r = sorted(sym + learned, key=lambda x: x.get("confidence", 0), reverse=True)
        return {
            "query": text_query,
            "results": all_r[:top_k],
            "symbolic_weight": sym_w,
            "learned_weight": learned_w,
            "num_symbolic": len(sym),
            "num_learned": len(learned),
        }

    def stats(self) -> Dict[str, Any]:
        return {
            "num_nodes": self.graph.number_of_nodes(),
            "num_edges": self.graph.number_of_edges(),
            "num_relations": len(self.relation_map),
            "num_rules": len(self.rules),
        }

    def export(self) -> Dict[str, Any]:
        edges = []
        for u, v, d in self.graph.edges(data=True):
            edges.append({"source": u, "target": v, "relation": d.get("name"),
                          "confidence": d.get("confidence")})
        return {
            "nodes": {n: self.graph.nodes[n].get("name", str(n)) for n in self.graph.nodes()},
            "edges": edges,
            "rules": self.rules,
        }
class AgentRole:
    """String constants naming the four specialist agent roles."""
    RESEARCHER = "researcher"
    ENGINEER = "engineer"
    ANALYZER = "analyzer"
    INTEGRATOR = "integrator"


class BaseAgent(nn.Module):
    """Role-conditioned agent with a tiny embed+LSTM policy/value network.

    ``act`` is currently template-based; the network (``forward``) exists
    so policies can be trained/evolved later.
    """

    def __init__(self, role: str, hidden_dim: int = 128, vocab_size: int = 32000):
        super().__init__()
        self.role = role
        self.hidden_dim = hidden_dim
        # Stored as Sequential only as a parameter container; forward()
        # indexes the parts because LSTM returns a tuple.
        self.encoder = nn.Sequential(
            nn.Embedding(vocab_size, hidden_dim),
            nn.LSTM(hidden_dim, hidden_dim, batch_first=True),
        )
        self.policy_head = nn.Linear(hidden_dim, hidden_dim)
        self.value_head = nn.Linear(hidden_dim, 1)
        self.task_history: deque = deque(maxlen=100)
        self.performance_log: List[float] = []

    def forward(self, input_ids: torch.Tensor) -> Dict[str, torch.Tensor]:
        """Encode token ids [B, T] -> policy logits, value, last hidden."""
        embeds = self.encoder[0](input_ids)
        lstm_out, _ = self.encoder[1](embeds)
        hidden = lstm_out[:, -1, :]  # last time step
        return {
            "policy_logits": self.policy_head(hidden),
            "value": self.value_head(hidden),
            "hidden": hidden,
        }

    def act(self, observation: str) -> str:
        """Return a role-specific action string and log the observation."""
        self.task_history.append({"observation": observation, "t": time.time()})
        actions = {
            AgentRole.RESEARCHER: f"[RESEARCHER] Exploring knowledge for: '{observation[:50]}...'",
            AgentRole.ENGINEER: f"[ENGINEER] Synthesizing tool for: '{observation[:50]}...'",
            AgentRole.ANALYZER: f"[ANALYZER] Evaluating solution for: '{observation[:50]}...'",
            AgentRole.INTEGRATOR: f"[INTEGRATOR] Merging components for: '{observation[:50]}...'",
        }
        return actions.get(self.role, f"[{self.role.upper()}] Processing: '{observation}'")

    def update(self, reward: float):
        self.performance_log.append(reward)


class HierarchicalAgent(nn.Module):
    """Macro-policy generates blueprints; micro-policy executes conditioned on blueprint."""

    def __init__(self, macro_dim: int = 256, micro_dim: int = 128, num_subgoals: int = 5):
        super().__init__()
        self.macro_dim = macro_dim
        self.micro_dim = micro_dim
        self.num_subgoals = num_subgoals
        self.macro_decoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
        self.subgoal_head = nn.Linear(macro_dim, num_subgoals)
        self.termination_token = nn.Parameter(torch.randn(macro_dim))
        self.micro_encoder = nn.LSTM(micro_dim + macro_dim, micro_dim, batch_first=True)
        self.action_head = nn.Linear(micro_dim, 50)
        self.current_blueprint: Optional[List[str]] = None
        self.active_subgoal_idx = 0

    def generate_blueprint(self, task_embedding: torch.Tensor) -> List[str]:
        """Autoregressively decode up to ``num_subgoals`` subgoal labels.

        Decoding stops early when the decoder state is close (cosine
        similarity > 0.9) to the learned termination token. Assumes
        batch size 1 for the termination check (``sim.item()``).
        """
        batch_size = task_embedding.size(0)
        hidden = (torch.zeros(1, batch_size, self.macro_dim),
                  torch.zeros(1, batch_size, self.macro_dim))
        input_tok = task_embedding.unsqueeze(1)
        blueprints = []
        for _ in range(self.num_subgoals):
            out, hidden = self.macro_decoder(input_tok, hidden)
            sg_logits = self.subgoal_head(out.squeeze(1))
            sg_id = torch.argmax(sg_logits, dim=-1)
            sim = torch.cosine_similarity(out.squeeze(1), self.termination_token.unsqueeze(0))
            if sim.item() > 0.9:
                break
            blueprints.append(f"subgoal_{sg_id.item()}")
            input_tok = out
        self.current_blueprint = blueprints
        self.active_subgoal_idx = 0
        return blueprints

    def execute_action(self, observation: torch.Tensor,
                       blueprint: Optional[List[str]] = None) -> torch.Tensor:
        """Produce action logits [1, 50] conditioned on the blueprint.

        The subgoal embedding is a random placeholder for now (no learned
        subgoal table yet).
        """
        if blueprint is not None:
            self.current_blueprint = blueprint
        if not self.current_blueprint:
            return torch.zeros(1, 50)
        # (Removed a dead local that selected the active subgoal label but
        # never used it — the conditioning below is placeholder-random.)
        subgoal_embed = torch.randn(1, self.macro_dim)
        combined = torch.cat([observation, subgoal_embed], dim=-1)
        out, _ = self.micro_encoder(combined.unsqueeze(1))
        return self.action_head(out.squeeze(1))

    def advance_subgoal(self):
        self.active_subgoal_idx += 1

    def reset(self):
        self.current_blueprint = None
        self.active_subgoal_idx = 0


class BabyAGILoop:
    """Objective-driven task loop: execute, spawn sub-tasks, re-prioritize."""

    def __init__(self, objective: str, max_iterations: int = 50):
        self.objective = objective
        self.max_iterations = max_iterations
        self.task_list: deque = deque()
        self.completed: List[Dict] = []
        self.results: Dict[int, Any] = {}
        self.iteration = 0

    def create_tasks(self, previous_result: str, task_desc: str) -> List[str]:
        # Template-based task spawning; ``task_desc`` is reserved for a
        # smarter generator.
        return [f"Sub-task {len(self.task_list) + i}: Analyze {previous_result[:30]}..."
                for i in range(3)]

    def prioritize(self) -> List[str]:
        """Order tasks by keyword overlap with the objective (desc)."""
        tasks = list(self.task_list)
        scores = [sum(1 for w in self.objective.lower().split() if w in t.lower())
                  for t in tasks]
        return [t for _, t in sorted(zip(scores, tasks), reverse=True)]

    def execute(self, task: str, agent: BaseAgent) -> str:
        result = agent.act(task)
        self.completed.append({"task": task, "result": result, "iteration": self.iteration})
        return result

    def run(self, agent: BaseAgent) -> Dict[str, Any]:
        """Run until ``max_iterations`` or the task list empties."""
        self.task_list.append(self.objective)
        while self.iteration < self.max_iterations and self.task_list:
            prioritized = self.prioritize()
            self.task_list = deque(prioritized)
            current = self.task_list.popleft()
            prev = self.completed[-1]["result"] if self.completed else ""
            result = self.execute(current, agent)
            self.results[self.iteration] = result
            for t in self.create_tasks(result, current):
                if t not in self.task_list:  # de-duplicate pending tasks
                    self.task_list.append(t)
            self.iteration += 1
        return {
            "completed": self.completed,
            "results": self.results,
            "iterations": self.iteration,
            "objective": self.objective,
        }


class AetherAgentOrchestrator(nn.Module):
    """Routes tasks across the four specialists plus a leader synthesizer.

    A softmax over learned routing weights gates which agents run; the
    hierarchical agent supplies a subgoal blueprint per task.
    """

    def __init__(self, config: "AetherConfig"):
        super().__init__()
        self.config = config
        # FIX: annotation was Dict[str, BaseAgent] on an nn.ModuleDict.
        self.agents: nn.ModuleDict = nn.ModuleDict({
            "researcher": BaseAgent(AgentRole.RESEARCHER, hidden_dim=config.macro_policy_dim),
            "engineer": BaseAgent(AgentRole.ENGINEER, hidden_dim=config.micro_policy_dim),
            "analyzer": BaseAgent(AgentRole.ANALYZER, hidden_dim=config.micro_policy_dim),
            "integrator": BaseAgent(AgentRole.INTEGRATOR, hidden_dim=config.micro_policy_dim),
        })
        self.leader = BaseAgent("leader", hidden_dim=config.macro_policy_dim)
        self.hierarchical = HierarchicalAgent(macro_dim=config.macro_policy_dim,
                                              micro_dim=config.micro_policy_dim)
        self.routing_weights = nn.Parameter(torch.ones(len(self.agents)))
        self.aggregation_gate = nn.Softmax(dim=0)
        self.agent_tasks: Dict[str, BabyAGILoop] = {}
        self.interactions: List[Dict] = []
        self.task_count = 0

    def forward(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Blueprint the task, fan out to gated agents, synthesize via leader.

        The task embedding is a random placeholder (no text encoder yet).
        Agents whose routing probability falls below 0.10 are skipped.
        """
        task_embed = torch.randn(1, self.config.macro_policy_dim)
        blueprint = self.hierarchical.generate_blueprint(task_embed)
        routing_probs = self.aggregation_gate(self.routing_weights)
        agent_outputs = {}
        for i, (name, agent) in enumerate(self.agents.items()):
            weight = routing_probs[i].item()
            if weight < 0.10:
                continue
            sub_task = blueprint[min(i, len(blueprint) - 1)] if blueprint else task
            output = agent.act(f"[{name}] {sub_task}")
            agent_outputs[name] = {"output": output, "weight": weight, "sub_task": sub_task}
        synthesis = self.leader.act(f"Synthesize: {task} with inputs: {list(agent_outputs.keys())}")
        self.interactions.append({
            "task": task,
            "blueprint": blueprint,
            "agent_outputs": agent_outputs,
            "leader_synthesis": synthesis,
            "routing_probs": routing_probs.detach().cpu().tolist(),
            "t": time.time(),
        })
        self.task_count += 1
        return {
            "output": synthesis,
            "blueprint": blueprint,
            "agent_outputs": agent_outputs,
            "routing_weights": routing_probs.detach().cpu().tolist(),
        }

    def execute(self, task: str, kg_context: Any, context: Dict[str, Any]) -> Dict[str, Any]:
        # ``kg_context`` is accepted for interface compatibility but not
        # yet consumed by the forward pass.
        return self.forward(task, context)

    def textual_backprop(self, global_gradient: str, performance_feedback: float,
                         beta: float = 0.5) -> Dict[str, str]:
        """Distribute a textual 'gradient' to each agent and nudge routing.

        ``beta`` (blend factor) is reserved — the local gradient is
        currently passed through unblended.
        """
        updates = {}
        for name, agent in self.agents.items():
            updates[name] = f"{global_gradient} + {name} perf={performance_feedback:.3f}"
        self.routing_weights.data += performance_feedback * 0.01
        return updates

    def co_evolve_interactions(self) -> List[Dict]:
        """Score the last 10 interactions by participation and complexity."""
        rewards = []
        for interaction in self.interactions[-10:]:
            n_agents = len(interaction.get("agent_outputs", {}))
            complexity = len(interaction.get("blueprint", []))
            reward = n_agents * 0.1 + min(complexity * 0.05, 0.5)
            rewards.append({"reward": reward, "agents_involved": n_agents})
        return rewards

    def run_babyagi(self, objective: str, max_iterations: int = 20) -> Dict[str, Any]:
        """Run a BabyAGI loop on the researcher agent and keep the loop."""
        loop = BabyAGILoop(objective, max_iterations)
        result = loop.run(self.agents["researcher"])
        self.agent_tasks[objective] = loop
        return result

    def stats(self) -> Dict[str, Any]:
        return {
            "total_tasks": self.task_count,
            "num_agents": len(self.agents),
            "total_interactions": len(self.interactions),
            "routing_weights": self.routing_weights.detach().cpu().tolist(),
        }
Dict[str, Any]: loop = BabyAGILoop(objective, max_iterations) result = loop.run(self.agents["researcher"]) self.agent_tasks[objective] = loop return result def stats(self) -> Dict[str, Any]: return { "total_tasks": self.task_count, "num_agents": len(self.agents), "total_interactions": len(self.interactions), "routing_weights": self.routing_weights.detach().cpu().tolist(), } # ============================================================================ # 5. EVOLUTION ENGINE (MAP-Elites + Quality-Diversity + Auto-Oversight) # ============================================================================ class MAPelitesArchive: def __init__(self, dims=(10, 10), ranges=None): self.dims = dims self.ranges = ranges or [(0, 1), (0, 1)] self.archive: Dict[Tuple[int, int], Tuple[AetherConfig, float]] = {} def _index(self, measures: np.ndarray) -> Tuple[int, int]: indices = [] for m, (lo, hi), dim in zip(measures, self.ranges, self.dims): norm = (m - lo) / (hi - lo + 1e-8) idx = int(np.clip(norm * dim, 0, dim - 1)) indices.append(idx) return tuple(indices) def add(self, config: AetherConfig, fitness: float, measures: np.ndarray) -> bool: idx = self._index(measures) if idx not in self.archive or self.archive[idx][1] < fitness: self.archive[idx] = (config, fitness) return True return False def sample(self, n: int = 1) -> List[AetherConfig]: if not self.archive: return [] items = list(self.archive.values()) selected = random.sample(items, min(n, len(items))) return [cfg for cfg, _ in selected] def get_best(self) -> Optional[Tuple[AetherConfig, float]]: if not self.archive: return None return max(self.archive.values(), key=lambda x: x[1]) def stats(self) -> Dict[str, float]: total_cells = self.dims[0] * self.dims[1] return { "coverage": len(self.archive) / total_cells, "qd_score": sum(f for _, f in self.archive.values()), "max_fitness": max((f for _, f in self.archive.values()), default=0), } class AetherEvolutionEngine: def __init__(self, config: AetherConfig): self.config = 
config self.archive = MAPelitesArchive( dims=config.archive_dims, ranges=[(0, 1), (0, 1)], # (symbolic_bias_proxy, fitness) ) self.generation = 0 self.experience_log: List[Dict] = [] def generate_candidates(self, base_config: AetherConfig, population_size: int = 8) -> List[AetherConfig]: candidates = [base_config] archive_seeds = self.archive.sample(n=min(2, len(self.archive.archive))) for _ in range(population_size - len(archive_seeds) - 1): candidates.append(self._mutate(base_config)) for cfg in archive_seeds: candidates.append(cfg) return candidates def _mutate(self, config: AetherConfig) -> AetherConfig: vec = config.to_vector() noise = np.random.normal(0, config.mutation_rate, size=vec.shape) mutated = vec + noise * vec new_cfg = AetherConfig.from_vector(mutated) # Preserve meta fields new_cfg.generations = config.generations new_cfg.enable_self_modification = config.enable_self_modification new_cfg.enable_parallel_agents = config.enable_parallel_agents new_cfg.archive_dims = config.archive_dims return new_cfg def select(self, candidates: List[AetherConfig], fitness_scores: List[float], alpha_exploration: float = 0.3) -> List[AetherConfig]: if not candidates or not fitness_scores: return candidates[:2] if len(candidates) >= 2 else candidates vectors = np.array([c.to_vector() for c in candidates]) f = np.array(fitness_scores) f_norm = (f - f.min()) / (f.max() - f.min() + 1e-8) k = min(4, len(candidates) - 1) novelties = [] for i, v in enumerate(vectors): dists = np.linalg.norm(vectors - v, axis=1) dists[i] = np.inf knn = np.partition(dists, k)[:k] novelties.append(np.mean(knn)) nov_norm = np.array(novelties) / (max(novelties) + 1e-8) scores = f_norm * np.sqrt(nov_norm + 1e-8) n_select = max(1, len(candidates) // 2) top_indices = np.argsort(scores)[-n_select:] return [candidates[i] for i in top_indices] def mutate(self, candidates: List[AetherConfig], mutation_rate: float = 0.15) -> List[AetherConfig]: mutated = [] for cfg in candidates: new_cfg = 
self._mutate(cfg) # Hard constraints if new_cfg.macro_policy_dim > 512: new_cfg.macro_policy_dim = 512 if new_cfg.micro_policy_dim > new_cfg.macro_policy_dim: new_cfg.micro_policy_dim = new_cfg.macro_policy_dim // 2 mutated.append(new_cfg) return mutated def update_archive(self, candidates: List[AetherConfig], fitness_scores: List[float]): for cfg, fitness in zip(candidates, fitness_scores): if fitness == -float("inf"): continue # Behavioral descriptor: symbolic bias proxy = num_agents / max_agents sym_proxy = cfg.num_agents / cfg.max_agents measures = np.array([sym_proxy, np.clip(fitness, 0, 1)]) improved = self.archive.add(cfg, fitness, measures) if improved: logger.debug(f"Archive improved at cell fitness={fitness:.4f}") def get_diversity_stats(self) -> Dict[str, float]: return self.archive.stats() # ============================================================================ # 6. AETHER CORE (Orchestrator + Evolution Loop + Auto-Oversight) # ============================================================================ class AetherCore(nn.Module): def __init__(self, config: Optional[AetherConfig] = None): super().__init__() self.config = config or AetherConfig() self.generation = 0 self.architecture_history: List[Dict] = [] self.fitness_log: List[float] = [] self.metadata = {"birth": time.time(), "version": "0.2.0-autonomous"} # Subsystems (lazily initialized where possible) self._memory: Optional[CoALAMemory] = None self._temporal: Optional[TemporalMemory] = None self._evolution: Optional[AetherEvolutionEngine] = None self._agents: Optional[AetherAgentOrchestrator] = None self._knowledge: Optional[KnowledgeGraphEngine] = None self._oversight: Optional[AutoOversight] = None # Neuro-symbolic fusion gate (trainable) self.symbolic_gate = nn.Parameter(torch.tensor(0.0)) self.neural_gate = nn.Parameter(torch.tensor(0.0)) logger.info("AETHER Core v0.2.0-autonomous initialized") @property def memory(self) -> CoALAMemory: if self._memory is None: self._memory = 
CoALAMemory(capacity=self.config.working_memory_capacity) return self._memory @property def temporal(self) -> TemporalMemory: if self._temporal is None: self._temporal = TemporalMemory(buffer_size=self.config.episodic_buffer_size) return self._temporal @property def evolution(self) -> AetherEvolutionEngine: if self._evolution is None: self._evolution = AetherEvolutionEngine(self.config) return self._evolution @property def agents(self) -> AetherAgentOrchestrator: if self._agents is None: self._agents = AetherAgentOrchestrator(self.config) return self._agents @property def knowledge(self) -> KnowledgeGraphEngine: if self._knowledge is None: self._knowledge = KnowledgeGraphEngine( embedding_dim=self.config.kg_embedding_dim, num_relations=self.config.kg_num_relations, ) return self._knowledge @property def oversight(self) -> AutoOversight: if self._oversight is None: self._oversight = AutoOversight(self.config) return self._oversight def forward(self, task: str, context: Optional[Dict] = None) -> Dict[str, Any]: context = context or {} kg_context = self.knowledge.query(task, top_k=5) self.memory.store({"task": task, "kg_context": kg_context, "t": time.time()}) result = self.agents.execute(task, kg_context, context) # Neuro-symbolic fusion sym_w = torch.sigmoid(self.symbolic_gate) neu_w = torch.sigmoid(self.neural_gate) total = sym_w + neu_w + 1e-8 sym_w, neu_w = sym_w / total, neu_w / total self.temporal.store({ "task": task, "result": result, "weights": {"symbolic": sym_w.item(), "neural": neu_w.item()}, }) return { "output": result, "symbolic_weight": sym_w.item(), "neural_weight": neu_w.item(), "kg_context": kg_context, "generation": self.generation, } def _default_evaluator(self, candidate: AetherConfig) -> float: """ Fully automated fitness function — no external API. Scores: synthetic reasoning benchmarks + memory stress + knowledge graph coverage. """ scores = [] try: # 1. 
Agent orchestration efficiency orch = AetherAgentOrchestrator(candidate) task_embed = torch.randn(1, candidate.macro_policy_dim) blueprint = orch.hierarchical.generate_blueprint(task_embed) scores.append(min(1.0, len(blueprint) / 4.0)) # 2. Knowledge graph reasoning coverage kg = KnowledgeGraphEngine(embedding_dim=candidate.kg_embedding_dim, num_relations=candidate.kg_num_relations) for i in range(15): kg.add_fact(f"Entity{i}", "connects_to", f"Entity{i+1}") q = kg.query("Entity0 connects_to", top_k=5) scores.append(min(1.0, len(q["results"]) / 3.0)) # 3. Memory throughput mem = WorkingMemory(capacity=candidate.working_memory_capacity) for i in range(50): mem.store({"idx": i, "data": list(range(10))}) retrieved = mem.retrieve("idx", top_k=5) scores.append(min(1.0, len(retrieved) / 5.0)) # 4. Config balance penalty (prefer moderate values) balance = 1.0 - abs(candidate.macro_policy_dim - 256) / 256.0 scores.append(max(0.0, balance)) except Exception as e: logger.warning(f"Fitness evaluation failed: {e}") return -float("inf") return float(np.mean(scores)) def evolve(self, num_generations: Optional[int] = None, evaluator: Optional[Callable[[AetherConfig], float]] = None) -> Dict[str, Any]: num_generations = num_generations or self.config.generations evaluator = evaluator or self._default_evaluator logger.info(f"=== AUTONOMOUS EVOLUTION: {num_generations} generations ===") best_fitness = -float("inf") best_config: Optional[AetherConfig] = None for gen in range(num_generations): self.generation = gen logger.info(f"\n--- Generation {gen} ---") # 1. Generate candidates candidates = self.evolution.generate_candidates(self.config, self.config.population_size) logger.info(f"Generated {len(candidates)} candidates") # 2. 
Evaluate + Auto-oversight gate fitness_scores = [] approved_candidates = [] for candidate in candidates: # Automated decision — no human approved, score, reason = self.oversight.decide(candidate, self) if approved: # Full fitness evaluation fitness = evaluator(candidate) fitness_scores.append(fitness) approved_candidates.append(candidate) logger.info(f" Candidate approved | reason={reason} | fitness={fitness:.4f}") else: fitness_scores.append(-float("inf")) logger.info(f" Candidate REJECTED | reason={reason}") # 3. Auto-rollback check current_best = max((f for f in fitness_scores if f > -float("inf")), default=-float("inf")) if self.oversight.should_rollback(current_best): logger.warning(f"ROLLBACK TRIGGERED: fitness dropped to {current_best:.4f}") if self.oversight.last_good_config is not None: self.config = copy.deepcopy(self.oversight.last_good_config) logger.info("Rolled back to last known good configuration") continue # 4. Select (Performance-Novelty) selected = self.evolution.select(candidates, fitness_scores) # 5. Mutate mutated = self.evolution.mutate(selected) # 6. Validate via oversight (second pass for mutated) validated = [] validated_scores = [] for m in mutated: ok, _, reason = self.oversight.decide(m, self) if ok: validated.append(m) validated_scores.append(evaluator(m)) else: logger.info(f" Mutated candidate rejected: {reason}") # 7. 
Integrate best if validated and validated_scores: best_idx = int(np.argmax(validated_scores)) best_mutated = validated[best_idx] current_fitness = validated_scores[best_idx] if current_fitness > best_fitness: best_fitness = current_fitness best_config = best_mutated self.config = best_mutated self.oversight.update_good_checkpoint(best_mutated, best_fitness) arch_hash = hashlib.sha256( json.dumps(asdict(best_mutated), sort_keys=True).encode() ).hexdigest()[:16] self.architecture_history.append({ "generation": gen, "hash": arch_hash, "fitness": best_fitness, "config": asdict(best_mutated), }) logger.info(f"*** NEW BEST: gen={gen} fitness={best_fitness:.4f} hash={arch_hash} ***") # 8. Update MAP-Elites archive self.evolution.update_archive(candidates, fitness_scores) self.fitness_log.append(best_fitness) # 9. Self-reflection per generation reflection = self.self_reflect() logger.info(f"Reflection: {reflection['recommendations']}") return { "best_fitness": best_fitness, "best_config": asdict(best_config) if best_config else None, "generations": num_generations, "history": self.architecture_history, "oversight_summary": self.oversight.summary(), "archive_stats": self.evolution.get_diversity_stats(), } def self_reflect(self) -> Dict[str, Any]: recs = [] if len(self.fitness_log) > 5: recent = self.fitness_log[-5:] if max(recent) - min(recent) < 0.01: recs.append("Fitness plateau detected. Increase diversity or mutation rate.") if recent[-1] < recent[0]: recs.append("Declining trend. Rollback or expand search.") sym = torch.sigmoid(self.symbolic_gate).item() if sym < 0.3: recs.append("Symbolic reasoning underutilized. Boost KG integration.") elif sym > 0.7: recs.append("Symbolic dominance. 
Increase neural flexibility.") return { "generation": self.generation, "architectures_tested": len(self.architecture_history), "fitness_trend": self.fitness_log, "neuro_symbolic_balance": {"symbolic": sym, "neural": 1.0 - sym}, "recommendations": recs, "oversight": self.oversight.summary(), } def export_state(self) -> Dict[str, Any]: return { "config": asdict(self.config), "generation": self.generation, "architecture_history": self.architecture_history, "fitness_log": self.fitness_log, "metadata": self.metadata, "knowledge": self.knowledge.export(), "memory": self.memory.export(), "model_state_dict": {k: v.cpu().tolist() for k, v in self.state_dict().items()}, } @classmethod def from_state(cls, state: Dict[str, Any]) -> "AetherCore": cfg = AetherConfig(**state["config"]) core = cls(config=cfg) core.generation = state["generation"] core.architecture_history = state["architecture_history"] core.fitness_log = state["fitness_log"] core.metadata = state["metadata"] return core # ============================================================================ # 7. RUNNABLE MAIN # ============================================================================ def run_autonomous_demo(): print("=" * 70) print(" AETHER v0.2.0 — AUTONOMOUS SELF-EVOLVING ARCHITECTURE") print(" Zero human oversight. 
Automated regression gating + rollback.") print("=" * 70) config = AetherConfig( population_size=6, generations=5, mutation_rate=0.12, macro_policy_dim=128, micro_policy_dim=64, num_agents=4, working_memory_capacity=16, episodic_buffer_size=500, kg_embedding_dim=64, kg_num_relations=10, ) core = AetherCore(config) # Seed knowledge graph print("\n[1] Seeding Knowledge Graph...") kg = core.knowledge facts = [ ("Intelligence", "requires", "Reasoning"), ("Reasoning", "requires", "Memory"), ("Memory", "enables", "Learning"), ("Learning", "produces", "Intelligence"), ("Agent", "has_role", "Researcher"), ("Agent", "has_role", "Engineer"), ("Agent", "has_role", "Analyzer"), ("Agent", "has_role", "Integrator"), ] for h, r, t in facts: kg.add_fact(h, r, t) print(f" KG: {kg.stats()}") # Single forward pass demo print("\n[2] Forward Pass Demo (neuro-symbolic query)...") result = core.forward("Intelligence requires") print(f" Symbolic weight: {result['symbolic_weight']:.3f}") print(f" Neural weight: {result['neural_weight']:.3f}") print(f" Results: {len(result['kg_context']['results'])} items") for r in result["kg_context"]["results"]: print(f" → {r['head']} --{r['relation']}--> {r['tail']} (conf={r.get('confidence',0):.2f}, src={r.get('source','?')})") # Agent orchestration demo print("\n[3] Agent Orchestration Demo...") agent_result = core.agents.execute("Optimize reasoning pipeline", {}, {}) print(f" Leader synthesis: {agent_result['output'][:80]}...") print(f" Agents activated: {list(agent_result['agent_outputs'].keys())}") print(f" Routing weights: {[f'{w:.3f}' for w in agent_result['routing_weights']]}") # Evolution loop (fully automated) print("\n[4] AUTONOMOUS EVOLUTION LOOP (no human oversight)...") evolution_result = core.evolve(num_generations=5) print("\n[5] EVOLUTION RESULTS") print(f" Best fitness achieved: {evolution_result['best_fitness']:.4f}") print(f" Generations run: {evolution_result['generations']}") print(f" Architecture changes: 
{len(evolution_result['history'])}") print(f" MAP-Elites coverage: {evolution_result['archive_stats']['coverage']:.2%}") print(f" MAP-Elites QD score: {evolution_result['archive_stats']['qd_score']:.2f}") print(f" Auto-oversight approved: {evolution_result['oversight_summary']['approved']}") print(f" Auto-oversight rejected: {evolution_result['oversight_summary']['rejected']}") print(f" Consecutive rejections: {evolution_result['oversight_summary']['consecutive_rejections']}") print("\n[6] Architecture Evolution Trajectory") for entry in evolution_result["history"]: print(f" Gen {entry['generation']:02d} | hash={entry['hash']} | fitness={entry['fitness']:.4f} | " f"agents={entry['config']['num_agents']} | macro={entry['config']['macro_policy_dim']} | " f"mut_rate={entry['config']['mutation_rate']:.3f}") # Self-reflection print("\n[7] Self-Reflection") reflection = core.self_reflect() for rec in reflection["recommendations"]: print(f" → {rec}") # Export checkpoint print("\n[8] Exporting state checkpoint...") state = core.export_state() checkpoint_path = "/app/aether_checkpoint.json" with open(checkpoint_path, "w") as f: json.dump(state, f, indent=2, default=str) print(f" Checkpoint saved to: {checkpoint_path}") print("\n" + "=" * 70) print(" DEMO COMPLETE. AETHER is fully autonomous.") print("=" * 70) return core, evolution_result if __name__ == "__main__": run_autonomous_demo()