"""
AETHER: Autonomous Self-Evolving Neuro-Symbolic Architecture
===============================================================
Fully automated - zero human-in-the-loop. All oversight is performed by
automated regression gating, risk scoring, and stability validation.
Architecture:
1. Neuro-Symbolic Fusion Gate - learned attention over the symbolic/neural split
2. Four-Agent Orchestration - Researcher, Engineer, Analyzer, Integrator
3. MAP-Elites Quality-Diversity - behavioral archive for evolutionary search
4. CoALA 4-Tier Memory - Working, Episodic, Semantic, Procedural
5. Temporal Memory with Attention - long-horizon context retention
6. Knowledge Graph Engine - RGCN + ComplEx + symbolic inference
7. AutoOversight System - regression suites, risk scoring, auto-rollback
8. Recursive Evolution Loop - generate -> evaluate -> select -> mutate -> validate -> integrate
Run: python aether_autonomous.py
Dependencies: torch, numpy, networkx
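
Illustrative programmatic use (a minimal sketch; run_autonomous_demo below exercises the full loop):
    >>> core = AetherCore(AetherConfig(population_size=4, generations=2))
    >>> core.evolve(num_generations=1)["generations"]
    1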
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import networkx as nx
import copy, json, hashlib, time, random, logging, warnings
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional, Tuple, Callable
from collections import deque
warnings.filterwarnings("ignore")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("AETHER")
# ============================================================================
# 0. CONFIGURATION
# ============================================================================
@dataclass
class AetherConfig:
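    """Evolvable hyperparameters; to_vector()/from_vector() define the mutation search space.

    Illustrative round trip (from_vector clips values back into safe bounds):
        >>> cfg = AetherConfig(mutation_rate=0.9)
        >>> AetherConfig.from_vector(cfg.to_vector()).mutation_rate
        0.5
    """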
population_size: int = 8
generations: int = 10
mutation_rate: float = 0.15
crossover_rate: float = 0.30
macro_policy_dim: int = 256
micro_policy_dim: int = 128
num_agents: int = 4
working_memory_capacity: int = 16
episodic_buffer_size: int = 1000
kg_embedding_dim: int = 128
kg_num_relations: int = 20
learning_rate: float = 2e-5
batch_size: int = 4
enable_self_modification: bool = True
enable_parallel_agents: bool = True
# Auto-oversight thresholds (fully automated)
max_mutation_rate: float = 0.50
max_agents: int = 16
max_memory_mb: float = 8192.0
rollback_fitness_drop: float = 0.15
stability_window: int = 3
risk_threshold: float = 0.70
# MAP-Elites
archive_dims: Tuple[int, int] = (10, 10)
def to_vector(self) -> np.ndarray:
return np.array([
self.population_size,
self.mutation_rate,
self.learning_rate * 1e5,
self.macro_policy_dim,
self.micro_policy_dim,
self.num_agents,
self.kg_embedding_dim,
], dtype=np.float32)
@classmethod
def from_vector(cls, vec: np.ndarray) -> "AetherConfig":
return cls(
population_size=int(np.clip(vec[0], 2, 64)),
mutation_rate=float(np.clip(vec[1], 0.01, 0.5)),
learning_rate=float(np.clip(vec[2] / 1e5, 1e-6, 1e-3)),
macro_policy_dim=int(np.clip(vec[3], 64, 512)),
micro_policy_dim=int(np.clip(vec[4], 32, 256)),
num_agents=int(np.clip(vec[5], 1, 16)),
kg_embedding_dim=int(np.clip(vec[6], 32, 512)),
)
# ============================================================================
# 1. AUTO-OVERSIGHT (replaces human-in-the-loop)
# ============================================================================
class AutoOversight:
"""
Fully automated oversight. No human approval.
Components:
- Risk scorer: estimates danger of proposed mutation
- Regression suite: quick benchmarks that must not degrade
- Stability validator: checks config bounds, memory, consistency
- Auto-rollback: reverts to last known good if fitness drops
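
    Illustrative doctest (the default configuration is low-risk and stable):
        >>> gate = AutoOversight(AetherConfig())
        >>> gate.validate_stability(AetherConfig())
        (True, 'ok')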
"""
def __init__(self, config: AetherConfig):
self.config = config
self.audit_log: List[Dict] = []
self.modification_history: List[Dict] = []
self.baseline_fitness: float = 0.0
self.fitness_history: deque = deque(maxlen=config.stability_window * 2)
self.last_good_config: Optional[AetherConfig] = None
self.last_good_fitness: float = -float("inf")
self.consecutive_rejections: int = 0
def risk_score(self, candidate: AetherConfig) -> float:
"""Return 0..1 risk. >threshold = reject."""
risks = []
# Mutation rate risk
risks.append(min(1.0, candidate.mutation_rate / self.config.max_mutation_rate))
# Agent count risk
risks.append(min(1.0, candidate.num_agents / self.config.max_agents))
# Memory estimate risk
est_mem = (candidate.macro_policy_dim * candidate.micro_policy_dim *
candidate.num_agents * 4) / 1e6
risks.append(min(1.0, est_mem / self.config.max_memory_mb))
# Dimension consistency risk
if candidate.micro_policy_dim > candidate.macro_policy_dim:
risks.append(1.0)
else:
risks.append(0.0)
return float(np.mean(risks))
def validate_stability(self, candidate: AetherConfig) -> Tuple[bool, str]:
checks = {
"population_size": (2, 64),
"mutation_rate": (0.0, self.config.max_mutation_rate),
"learning_rate": (1e-6, 1e-3),
"num_agents": (1, self.config.max_agents),
"macro_policy_dim": (32, 512),
"micro_policy_dim": (16, 256),
}
violations = []
for field_name, (lo, hi) in checks.items():
val = getattr(candidate, field_name, None)
if val is not None and not (lo <= val <= hi):
violations.append(f"{field_name}={val} not in [{lo},{hi}]")
if candidate.micro_policy_dim > candidate.macro_policy_dim:
violations.append("micro > macro")
if violations:
return False, "; ".join(violations)
return True, "ok"
def regression_suite(self, candidate: AetherConfig,
core: "AetherCore") -> Tuple[bool, float]:
"""
Quick synthetic benchmarks. Returns (pass, composite_score).
Higher = better.
"""
scores = []
# Benchmark 1: memory throughput
try:
wm = WorkingMemory(capacity=candidate.working_memory_capacity)
for i in range(100):
wm.store({"idx": i, "data": torch.randn(16)})
retrieved = wm.retrieve("idx", top_k=5)
scores.append(len(retrieved) / 5.0)
        except Exception:
scores.append(0.0)
# Benchmark 2: knowledge graph query speed
try:
kg = KnowledgeGraphEngine(
embedding_dim=candidate.kg_embedding_dim,
num_relations=candidate.kg_num_relations,
)
for i in range(20):
kg.add_fact(f"Node{i}", "relates_to", f"Node{i+1}")
q = kg.query("Node0 relates_to", top_k=3)
scores.append(min(1.0, len(q["results"]) / 3.0))
except Exception:
scores.append(0.0)
# Benchmark 3: agent orchestration latency
try:
orch = AetherAgentOrchestrator(candidate)
task_embed = torch.randn(1, candidate.macro_policy_dim)
blueprint = orch.hierarchical.generate_blueprint(task_embed)
scores.append(min(1.0, len(blueprint) / 3.0))
except Exception:
scores.append(0.0)
composite = float(np.mean(scores))
        # Must stay within the rollback threshold of baseline (first run always passes)
if self.baseline_fitness > 0 and composite < self.baseline_fitness * (1 - self.config.rollback_fitness_drop):
return False, composite
return True, composite
def should_rollback(self, current_fitness: float) -> bool:
"""Auto-rollback if fitness drops significantly."""
if self.last_good_fitness == -float("inf"):
return False
drop = (self.last_good_fitness - current_fitness) / (abs(self.last_good_fitness) + 1e-8)
return drop > self.config.rollback_fitness_drop
def decide(self, candidate: AetherConfig, core: "AetherCore") -> Tuple[bool, float, str]:
"""
Automated decision gate. Returns (approved, score, reason).
No human involved.
"""
risk = self.risk_score(candidate)
if risk > self.config.risk_threshold:
self._log(candidate, False, f"risk={risk:.2f} > threshold")
self.consecutive_rejections += 1
return False, risk, "auto-rejected: high risk"
stable, stability_reason = self.validate_stability(candidate)
if not stable:
self._log(candidate, False, stability_reason)
self.consecutive_rejections += 1
return False, risk, f"auto-rejected: unstable ({stability_reason})"
reg_pass, reg_score = self.regression_suite(candidate, core)
if not reg_pass:
self._log(candidate, False, f"regression fail score={reg_score:.3f}")
self.consecutive_rejections += 1
return False, reg_score, "auto-rejected: regression failure"
self._log(candidate, True, f"risk={risk:.2f} reg={reg_score:.3f}")
self.consecutive_rejections = 0
self.baseline_fitness = max(self.baseline_fitness, reg_score)
return True, reg_score, "auto-approved"
def _log(self, candidate: AetherConfig, approved: bool, reason: str):
entry = {
"timestamp": time.time(),
"approved": approved,
"config_hash": hashlib.sha256(
json.dumps(asdict(candidate), sort_keys=True).encode()
).hexdigest()[:16],
"reason": reason,
}
self.modification_history.append(entry)
self.audit_log.append(entry)
def update_good_checkpoint(self, config: AetherConfig, fitness: float):
self.last_good_config = copy.deepcopy(config)
self.last_good_fitness = fitness
def summary(self) -> Dict[str, Any]:
total = len(self.modification_history)
approved = sum(1 for m in self.modification_history if m["approved"])
return {
"total_attempted": total,
"approved": approved,
"rejected": total - approved,
"consecutive_rejections": self.consecutive_rejections,
"baseline_fitness": self.baseline_fitness,
"last_good_fitness": self.last_good_fitness,
}
# ============================================================================
# 2. MEMORY SYSTEM (CoALA 4-tier + Temporal)
# ============================================================================
class WorkingMemory:
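    """Fixed-capacity FIFO scratchpad with a sigmoid attention weight per slot.

    Retrieval sketch (scores are keyword overlap scaled by attention):
        >>> wm = WorkingMemory(capacity=4)
        >>> wm.store({"topic": "fusion gate"})
        >>> wm.retrieve("fusion")[0]["topic"]
        'fusion gate'
    """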
def __init__(self, capacity: int = 16):
self.capacity = capacity
self.buffer: deque = deque(maxlen=capacity)
self.attention = nn.Parameter(torch.ones(capacity))
def store(self, item: Dict[str, Any]):
item["_t"] = time.time()
self.buffer.append(item)
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        if not self.buffer:
            return []
        scores = []
        buf = list(self.buffer)
        for i, item in enumerate(buf):
            # default=str keeps non-JSON payloads (e.g. tensors) searchable
            text = json.dumps(item, default=str)
            score = sum(1 for w in query.lower().split() if w in text.lower())
            # attention weighting (free-standing nn.Parameter; not trained anywhere)
            attn = torch.sigmoid(self.attention[i % self.capacity]).item()
            scores.append(score * attn)
        indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
        return [buf[i] for i in indices]
def export(self) -> List[Dict]:
return list(self.buffer)
class EpisodicMemory:
def __init__(self, buffer_size: int = 1000):
self.buffer: deque = deque(maxlen=buffer_size)
def store(self, episode: Dict[str, Any]):
episode["_t"] = time.time()
self.buffer.append(episode)
def retrieve_similar(self, query: str, top_k: int = 5) -> List[Dict]:
if not self.buffer:
return []
buf = list(self.buffer)
scores = []
        for item in buf:
            # default=str keeps non-JSON payloads (e.g. tensors) searchable
            text = json.dumps(item, default=str)
            scores.append(sum(1 for w in query.lower().split() if w in text.lower()))
indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
return [buf[i] for i in indices]
def get_recent(self, n: int = 10) -> List[Dict]:
return list(self.buffer)[-n:]
def export(self) -> List[Dict]:
return list(self.buffer)
class SemanticMemory:
def __init__(self):
self.facts: Dict[str, Any] = {}
def store_fact(self, key: str, value: Any, confidence: float = 1.0):
self.facts[key] = {"value": value, "confidence": confidence, "t": time.time()}
def retrieve(self, key: str) -> Optional[Dict]:
return self.facts.get(key)
def query(self, query: str) -> List[Dict]:
return [v for k, v in self.facts.items() if query.lower() in k.lower()]
def export(self) -> Dict:
return self.facts
class ProceduralMemory:
def __init__(self):
self.tools: Dict[str, Dict] = {}
self.usage: Dict[str, int] = {}
def register_tool(self, name: str, code: str, description: str, tags: List[str] = None):
self.tools[name] = {
"code": code, "description": description,
"tags": tags or [], "registered_at": time.time(), "version": 1,
}
self.usage[name] = 0
def get_tool(self, name: str) -> Optional[Dict]:
if name in self.tools:
self.usage[name] += 1
return self.tools[name]
return None
def search_tools(self, query: str) -> List[Dict]:
out = []
for name, tool in self.tools.items():
text = f"{name} {tool['description']} {' '.join(tool['tags'])}"
if query.lower() in text.lower():
out.append({"name": name, **tool})
return out
def merge_tools(self, cluster: List[str]) -> Optional[str]:
if len(cluster) < 2:
return None
canonical = max(cluster, key=lambda t: self.usage.get(t, 0))
merged_desc = " | ".join(self.tools[t]["description"] for t in cluster if t in self.tools)
self.tools[canonical]["description"] = merged_desc
self.tools[canonical]["version"] += 1
for t in cluster:
if t != canonical and t in self.tools:
del self.tools[t]
return canonical
def export(self) -> Dict:
return {"tools": self.tools, "usage": self.usage}
class CoALAMemory:
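    """Facade over the four CoALA tiers; memory_type selects the routing target.

    Routing sketch (semantic items are stored as key -> fact entries):
        >>> mem = CoALAMemory()
        >>> mem.store({"aether": "neuro-symbolic"}, memory_type="semantic")
        >>> mem.retrieve("aether", memory_type="semantic")[0]["value"]
        'neuro-symbolic'
    """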
def __init__(self, capacity: int = 16):
self.working = WorkingMemory(capacity=capacity)
self.episodic = EpisodicMemory(buffer_size=1000)
self.semantic = SemanticMemory()
self.procedural = ProceduralMemory()
def store(self, item: Dict[str, Any], memory_type: str = "working"):
if memory_type == "working":
self.working.store(item)
elif memory_type == "episodic":
self.episodic.store(item)
elif memory_type == "semantic":
for k, v in item.items():
self.semantic.store_fact(k, v)
elif memory_type == "procedural":
if "name" in item and "code" in item:
self.procedural.register_tool(
item["name"], item["code"],
item.get("description", ""), item.get("tags", [])
)
def retrieve(self, query: str, memory_type: str = "all", top_k: int = 5) -> List[Dict]:
if memory_type == "all":
out = []
out.extend(self.working.retrieve(query, top_k=top_k // 2))
out.extend(self.episodic.retrieve_similar(query, top_k=top_k))
out.extend(self.semantic.query(query)[:top_k])
return out[:top_k]
elif memory_type == "working":
return self.working.retrieve(query, top_k)
elif memory_type == "episodic":
return self.episodic.retrieve_similar(query, top_k)
elif memory_type == "semantic":
return self.semantic.query(query)[:top_k]
elif memory_type == "procedural":
return self.procedural.search_tools(query)
return []
@property
def buffer(self):
return self.working.buffer
def export(self) -> Dict[str, Any]:
return {
"working": self.working.export(),
"episodic": self.episodic.export(),
"semantic": self.semantic.export(),
"procedural": self.procedural.export(),
}
class TemporalMemory(nn.Module):
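    """Recency-weighted event buffer: recency = exp(-age / lookback).

    Sketch (a just-stored event scores recency near 1.0):
        >>> tm = TemporalMemory(buffer_size=8)
        >>> tm.store({"event": "boot"})
        >>> tm.retrieve_context()[0]["recency"] > 0.99
        True
    """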
def __init__(self, buffer_size: int = 1000, hidden_dim: int = 64):
super().__init__()
self.buffer_size = buffer_size
self.hidden_dim = hidden_dim
self.buffer: deque = deque(maxlen=buffer_size)
self.temporal_gate = nn.Sequential(
nn.Linear(2, hidden_dim), nn.ReLU(),
nn.Linear(hidden_dim, 1), nn.Sigmoid(),
)
def store(self, event: Dict[str, Any]):
event["_t"] = time.time()
self.buffer.append(event)
def retrieve_context(self, current_time: Optional[float] = None,
lookback: float = 3600.0) -> List[Dict]:
current_time = current_time or time.time()
relevant = []
for event in self.buffer:
age = current_time - event.get("_t", current_time)
if age <= lookback:
recency = torch.exp(torch.tensor(-age / lookback)).item()
relevant.append({**event, "recency": recency, "age": age})
relevant.sort(key=lambda x: x["recency"], reverse=True)
return relevant
    def retrieve_with_attention(self, query_embed: torch.Tensor, top_k: int = 10) -> List[Dict]:
        # Simplified: query_embed and temporal_gate are not yet used; falls back to recency ranking.
        return self.retrieve_context()[:top_k]
def export(self) -> List[Dict]:
return list(self.buffer)
def __len__(self):
return len(self.buffer)
# ============================================================================
# 3. KNOWLEDGE GRAPH ENGINE (RGCN + ComplEx + Symbolic Rules)
# ============================================================================
class RGCNLayer(nn.Module):
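    """Relational GCN layer with basis decomposition: W_r = sum_b comp[r, b] * bases[b].

    Shape sketch (3 nodes, edges 0->1 and 1->2, both relation 0):
        >>> layer = RGCNLayer(in_dim=8, out_dim=8, num_relations=2)
        >>> ei = torch.tensor([[0, 1], [1, 2]])
        >>> layer(torch.randn(3, 8), ei, torch.tensor([0, 0])).shape
        torch.Size([3, 8])
    """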
def __init__(self, in_dim: int, out_dim: int, num_relations: int, num_bases: int = 4):
super().__init__()
self.num_relations = num_relations
self.bases = nn.Parameter(torch.Tensor(num_bases, in_dim, out_dim))
self.comp = nn.Parameter(torch.Tensor(num_relations, num_bases))
self.self_loop = nn.Parameter(torch.Tensor(in_dim, out_dim))
self.bias = nn.Parameter(torch.Tensor(out_dim))
self.reset_parameters()
def reset_parameters(self):
nn.init.xavier_uniform_(self.bases)
nn.init.xavier_uniform_(self.comp)
nn.init.xavier_uniform_(self.self_loop)
nn.init.zeros_(self.bias)
def forward(self, x, edge_index, edge_type):
num_nodes = int(edge_index.max().item()) + 1 if x is None else x.size(0)
if x is None:
x = torch.eye(num_nodes, self.bases.size(1), device=edge_index.device)
weight = torch.einsum("rb,bio->rio", self.comp, self.bases)
out = torch.zeros(num_nodes, weight.size(2), device=x.device)
for rid in range(self.num_relations):
mask = edge_type == rid
if mask.sum() == 0:
continue
ei = edge_index[:, mask]
messages = torch.mm(x[ei[0]], weight[rid])
out.index_add_(0, ei[1], messages)
out = out + torch.mm(x, self.self_loop) + self.bias
return out
class KnowledgeGraphEncoder(nn.Module):
def __init__(self, num_nodes, hidden_dim, num_relations, num_layers=2, num_bases=4):
super().__init__()
self.node_embeddings = nn.Embedding(num_nodes, hidden_dim)
self.layers = nn.ModuleList([
RGCNLayer(hidden_dim, hidden_dim, num_relations, num_bases)
for _ in range(num_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(hidden_dim) for _ in range(num_layers)])
def forward(self, edge_index, edge_type):
num_nodes = int(edge_index.max().item()) + 1
x = self.node_embeddings(torch.arange(num_nodes, device=edge_index.device))
for layer, norm in zip(self.layers, self.norms):
x = F.relu(norm(layer(x, edge_index, edge_type)))
return x
class ComplExScorer(nn.Module):
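    """ComplEx link scorer: Re(<h, r, conj(t)>) expanded into four real bilinear terms.

    Shape sketch (one (head, relation, tail) triple yields one score):
        >>> scorer = ComplExScorer(num_nodes=5, num_relations=2, hidden_dim=8)
        >>> scorer(torch.tensor([0]), torch.tensor([1]), torch.tensor([2])).shape
        torch.Size([1])
    """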
def __init__(self, num_nodes, num_relations, hidden_dim=50):
super().__init__()
self.head_real = nn.Embedding(num_nodes, hidden_dim)
self.head_imag = nn.Embedding(num_nodes, hidden_dim)
self.tail_real = nn.Embedding(num_nodes, hidden_dim)
self.tail_imag = nn.Embedding(num_nodes, hidden_dim)
self.rel_real = nn.Embedding(num_relations, hidden_dim)
self.rel_imag = nn.Embedding(num_relations, hidden_dim)
self.reset_parameters()
def reset_parameters(self):
for p in self.parameters():
nn.init.xavier_uniform_(p)
def forward(self, h, r, t):
hr, hi = self.head_real(h), self.head_imag(h)
tr, ti = self.tail_real(t), self.tail_imag(t)
rr, ri = self.rel_real(r), self.rel_imag(r)
return torch.sum(hr * rr * tr + hr * ri * ti + hi * rr * ti - hi * ri * tr, dim=-1)
def loss(self, h, r, t, neg_t=None):
pos = self.forward(h, r, t)
if neg_t is None:
neg_t = torch.randint(0, self.tail_real.num_embeddings, t.size(), device=t.device)
neg = self.forward(h, r, neg_t)
return (F.softplus(-pos) + F.softplus(neg)).mean()
class KnowledgeGraphEngine(nn.Module):
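    """Hybrid KG: NetworkX symbolic store plus lazily built RGCN/ComplEx scorers.

    Minimal usage sketch (add_fact creates nodes and relations on demand):
        >>> kg = KnowledgeGraphEngine(embedding_dim=32, num_relations=4)
        >>> kg.add_fact("Sun", "heats", "Earth")
        >>> kg.reason_symbolic("Sun", "heats")[0]["tail"]
        'Earth'
    """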
def __init__(self, embedding_dim=128, num_relations=20, max_nodes=10000):
super().__init__()
self.embedding_dim = embedding_dim
self.num_relations = num_relations
self.max_nodes = max_nodes
self.graph = nx.DiGraph()
self.node_id_map: Dict[str, int] = {}
self.relation_map: Dict[str, int] = {}
self.next_node_id = 0
self.next_rel_id = 0
self.encoder: Optional[KnowledgeGraphEncoder] = None
self.scorer: Optional[ComplExScorer] = None
self.symbolic_attention = nn.Parameter(torch.ones(num_relations))
self.rules: List[Tuple[Tuple[str, str, str], Tuple[str, str, str]]] = []
def _get_or_create_node(self, name: str) -> int:
if name not in self.node_id_map:
self.node_id_map[name] = self.next_node_id
self.graph.add_node(self.next_node_id, name=name)
self.next_node_id += 1
return self.node_id_map[name]
def _get_or_create_relation(self, name: str) -> int:
if name not in self.relation_map:
self.relation_map[name] = self.next_rel_id
self.next_rel_id += 1
return self.relation_map[name]
def add_fact(self, head: str, relation: str, tail: str, confidence: float = 1.0):
h = self._get_or_create_node(head)
t = self._get_or_create_node(tail)
r = self._get_or_create_relation(relation)
self.graph.add_edge(h, t, relation=r, name=relation, confidence=confidence)
self._ensure_capacity()
def add_rule(self, premise: Tuple[str, str, str], conclusion: Tuple[str, str, str]):
self.rules.append((premise, conclusion))
    def _ensure_capacity(self):
        # (Re)build the learned components whenever the graph outgrows their embedding
        # tables. Rebuilding resets learned weights, which is acceptable here since
        # training is not persisted across additions.
        outgrown = self.scorer is not None and (
            self.next_node_id > self.scorer.tail_real.num_embeddings
            or self.next_rel_id > self.scorer.rel_real.num_embeddings
        )
        if (self.encoder is None or outgrown) and self.next_node_id > 0:
            n = min(self.next_node_id, self.max_nodes)
            r = max(self.next_rel_id, self.num_relations)
            self.encoder = KnowledgeGraphEncoder(n, self.embedding_dim, r)
            self.scorer = ComplExScorer(n, r, self.embedding_dim // 2)
            logger.debug(f"KG (re)initialized: {n} nodes, {r} relations")
def _check_fact(self, fact: Tuple[str, str, str]) -> bool:
h, r, t = fact
if h not in self.node_id_map or t not in self.node_id_map or r not in self.relation_map:
return False
return self.graph.has_edge(self.node_id_map[h], self.node_id_map[t]) and \
self.graph.edges[self.node_id_map[h], self.node_id_map[t]].get("relation") == self.relation_map[r]
def reason_symbolic(self, query_head: str, query_relation: str) -> List[Dict]:
results = []
if query_head not in self.node_id_map:
return results
h_id = self.node_id_map[query_head]
r_name = query_relation
if r_name in self.relation_map:
r_id = self.relation_map[r_name]
for _, target, data in self.graph.out_edges(h_id, data=True):
if data.get("relation") == r_id:
results.append({
"head": query_head, "relation": r_name,
"tail": self.graph.nodes[target].get("name", str(target)),
"confidence": data.get("confidence", 1.0), "path": "direct",
})
# Rule inference
for premise, conclusion in self.rules:
p_head, p_rel, p_tail = premise
c_head, c_rel, c_tail = conclusion
if p_head == query_head and self._check_fact(premise):
results.append({
"head": c_head if c_head != "?" else query_head,
"relation": c_rel, "tail": c_tail,
"confidence": 0.8, "path": "inferred",
"rule": f"{premise} -> {conclusion}",
})
# Multi-hop BFS
for neighbor in nx.bfs_tree(self.graph, h_id, depth_limit=2).nodes():
if neighbor != h_id:
for path in nx.all_simple_paths(self.graph, h_id, neighbor, cutoff=2):
if len(path) > 1:
ed = self.graph.edges[path[0], path[1]]
results.append({
"head": query_head,
"relation": f"multi-hop via {ed.get('name', 'unknown')}",
"tail": self.graph.nodes[neighbor].get("name", str(neighbor)),
"confidence": 0.6 ** (len(path) - 1),
"path": "->".join(str(n) for n in path),
})
return sorted(results, key=lambda x: x["confidence"], reverse=True)
def reason_learned(self, query_head: str, query_relation: str, top_k: int = 5) -> List[Dict]:
if self.scorer is None or query_head not in self.node_id_map:
return []
h_id = self.node_id_map[query_head]
r_id = self.relation_map.get(query_relation)
if r_id is None:
return []
h_t = torch.tensor([h_id])
r_t = torch.tensor([r_id])
all_t = torch.arange(self.scorer.tail_real.num_embeddings)
scores = []
for i in range(0, len(all_t), 1000):
batch = all_t[i:i + 1000]
scores.extend(self.scorer(h_t.repeat(len(batch)), r_t.repeat(len(batch)), batch).tolist())
scores_t = torch.tensor(scores)
top_scores, top_idx = torch.topk(scores_t, min(top_k, len(scores_t)))
results = []
for idx, sc in zip(top_idx, top_scores):
node_name = self.graph.nodes[idx.item()].get("name", str(idx.item()))
results.append({
"head": query_head, "relation": query_relation,
"tail": node_name, "confidence": torch.sigmoid(sc).item(), "path": "learned",
})
return results
    def query(self, text_query: str, top_k: int = 5) -> Dict[str, Any]:
        # Naive "<head> <relation ...>" parse; adequate for single-token entity names.
        parts = text_query.lower().split()
head = parts[0].capitalize() if parts else text_query.capitalize()
relation = " ".join(parts[1:]) if len(parts) > 1 else "related_to"
sym = self.reason_symbolic(head, relation)[:top_k]
learned = self.reason_learned(head, relation, top_k)
rel_id = self.relation_map.get(relation, 0)
sym_w = torch.sigmoid(self.symbolic_attention[rel_id % self.num_relations]).item()
learned_w = 1.0 - sym_w
for r in sym:
r["source"] = "symbolic"
r["fusion_weight"] = sym_w
for r in learned:
r["source"] = "learned"
r["fusion_weight"] = learned_w
all_r = sorted(sym + learned, key=lambda x: x.get("confidence", 0), reverse=True)
return {
"query": text_query, "results": all_r[:top_k],
"symbolic_weight": sym_w, "learned_weight": learned_w,
"num_symbolic": len(sym), "num_learned": len(learned),
}
def stats(self) -> Dict[str, Any]:
return {
"num_nodes": self.graph.number_of_nodes(),
"num_edges": self.graph.number_of_edges(),
"num_relations": len(self.relation_map),
"num_rules": len(self.rules),
}
def export(self) -> Dict[str, Any]:
edges = []
for u, v, d in self.graph.edges(data=True):
edges.append({"source": u, "target": v, "relation": d.get("name"), "confidence": d.get("confidence")})
return {
"nodes": {n: self.graph.nodes[n].get("name", str(n)) for n in self.graph.nodes()},
"edges": edges, "rules": self.rules,
}
# ============================================================================
# 4. AGENT ORCHESTRATION (4 roles + Hierarchical + BabyAGI loop)
# ============================================================================
class AgentRole:
RESEARCHER = "researcher"
ENGINEER = "engineer"
ANALYZER = "analyzer"
INTEGRATOR = "integrator"
class BaseAgent(nn.Module):
def __init__(self, role: str, hidden_dim: int = 128, vocab_size: int = 32000):
super().__init__()
self.role = role
self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
self.policy_head = nn.Linear(hidden_dim, hidden_dim)
self.value_head = nn.Linear(hidden_dim, 1)
self.task_history: deque = deque(maxlen=100)
self.performance_log: List[float] = []
    def forward(self, input_ids: torch.Tensor) -> Dict[str, torch.Tensor]:
        embeds = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embeds)
        hidden = lstm_out[:, -1, :]
return {
"policy_logits": self.policy_head(hidden),
"value": self.value_head(hidden),
"hidden": hidden,
}
def act(self, observation: str) -> str:
self.task_history.append({"observation": observation, "t": time.time()})
actions = {
AgentRole.RESEARCHER: f"[RESEARCHER] Exploring knowledge for: '{observation[:50]}...'",
AgentRole.ENGINEER: f"[ENGINEER] Synthesizing tool for: '{observation[:50]}...'",
AgentRole.ANALYZER: f"[ANALYZER] Evaluating solution for: '{observation[:50]}...'",
AgentRole.INTEGRATOR: f"[INTEGRATOR] Merging components for: '{observation[:50]}...'",
}
return actions.get(self.role, f"[{self.role.upper()}] Processing: '{observation}'")
def update(self, reward: float):
self.performance_log.append(reward)
class HierarchicalAgent(nn.Module):
"""Macro-policy generates blueprints; micro-policy executes conditioned on blueprint."""
def __init__(self, macro_dim: int = 256, micro_dim: int = 128, num_subgoals: int = 5):
super().__init__()
self.macro_dim = macro_dim
self.micro_dim = micro_dim
self.num_subgoals = num_subgoals
self.macro_decoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
self.subgoal_head = nn.Linear(macro_dim, num_subgoals)
self.termination_token = nn.Parameter(torch.randn(macro_dim))
self.micro_encoder = nn.LSTM(micro_dim + macro_dim, micro_dim, batch_first=True)
self.action_head = nn.Linear(micro_dim, 50)
self.current_blueprint: Optional[List[str]] = None
self.active_subgoal_idx = 0
def generate_blueprint(self, task_embedding: torch.Tensor) -> List[str]:
batch_size = task_embedding.size(0)
hidden = (torch.zeros(1, batch_size, self.macro_dim),
torch.zeros(1, batch_size, self.macro_dim))
input_tok = task_embedding.unsqueeze(1)
blueprints = []
for _ in range(self.num_subgoals):
out, hidden = self.macro_decoder(input_tok, hidden)
sg_logits = self.subgoal_head(out.squeeze(1))
sg_id = torch.argmax(sg_logits, dim=-1)
sim = torch.cosine_similarity(out.squeeze(1), self.termination_token.unsqueeze(0))
if sim.item() > 0.9:
break
blueprints.append(f"subgoal_{sg_id.item()}")
input_tok = out
self.current_blueprint = blueprints
self.active_subgoal_idx = 0
return blueprints
    def execute_action(self, observation: torch.Tensor, blueprint: Optional[List[str]] = None) -> torch.Tensor:
        if blueprint is not None:
            self.current_blueprint = blueprint
        if not self.current_blueprint:
            return torch.zeros(1, 50)
        # The active subgoal is looked up for conditioning; a random embedding
        # currently stands in for a learned encoding of it.
        active = self.current_blueprint[min(self.active_subgoal_idx, len(self.current_blueprint) - 1)]
        subgoal_embed = torch.randn(1, self.macro_dim)  # placeholder for encode(active)
        combined = torch.cat([observation, subgoal_embed], dim=-1)
        out, _ = self.micro_encoder(combined.unsqueeze(1))
        return self.action_head(out.squeeze(1))
def advance_subgoal(self):
self.active_subgoal_idx += 1
def reset(self):
self.current_blueprint = None
self.active_subgoal_idx = 0
class BabyAGILoop:
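    """BabyAGI-style task queue: execute one task, enqueue follow-ups, reprioritize.

    Sketch (each iteration pops one task and creates three follow-ups, so the
    queue never drains before max_iterations):
        >>> loop = BabyAGILoop("map the search space", max_iterations=2)
        >>> loop.run(BaseAgent(AgentRole.RESEARCHER, hidden_dim=32))["iterations"]
        2
    """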
def __init__(self, objective: str, max_iterations: int = 50):
self.objective = objective
self.max_iterations = max_iterations
self.task_list: deque = deque()
self.completed: List[Dict] = []
self.results: Dict[int, Any] = {}
self.iteration = 0
def create_tasks(self, previous_result: str, task_desc: str) -> List[str]:
return [f"Sub-task {len(self.task_list) + i}: Analyze {previous_result[:30]}..." for i in range(3)]
def prioritize(self) -> List[str]:
tasks = list(self.task_list)
scores = [sum(1 for w in self.objective.lower().split() if w in t.lower()) for t in tasks]
return [t for _, t in sorted(zip(scores, tasks), reverse=True)]
def execute(self, task: str, agent: BaseAgent) -> str:
result = agent.act(task)
self.completed.append({"task": task, "result": result, "iteration": self.iteration})
return result
def run(self, agent: BaseAgent) -> Dict[str, Any]:
self.task_list.append(self.objective)
while self.iteration < self.max_iterations and self.task_list:
prioritized = self.prioritize()
self.task_list = deque(prioritized)
current = self.task_list.popleft()
result = self.execute(current, agent)
self.results[self.iteration] = result
for t in self.create_tasks(result, current):
if t not in self.task_list:
self.task_list.append(t)
self.iteration += 1
return {
"completed": self.completed, "results": self.results,
"iterations": self.iteration, "objective": self.objective,
}
class AetherAgentOrchestrator(nn.Module):
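    """Routes each task across the four role agents plus a leader via softmax routing weights.

    One-step sketch (small dims; the leader always synthesizes the final output):
        >>> orch = AetherAgentOrchestrator(AetherConfig(macro_policy_dim=64, micro_policy_dim=32))
        >>> orch.execute("probe", {}, {})["output"].startswith("[LEADER]")
        True
    """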
def __init__(self, config: AetherConfig):
super().__init__()
self.config = config
self.agents: Dict[str, BaseAgent] = nn.ModuleDict({
"researcher": BaseAgent(AgentRole.RESEARCHER, hidden_dim=config.macro_policy_dim),
"engineer": BaseAgent(AgentRole.ENGINEER, hidden_dim=config.micro_policy_dim),
"analyzer": BaseAgent(AgentRole.ANALYZER, hidden_dim=config.micro_policy_dim),
"integrator": BaseAgent(AgentRole.INTEGRATOR, hidden_dim=config.micro_policy_dim),
})
self.leader = BaseAgent("leader", hidden_dim=config.macro_policy_dim)
self.hierarchical = HierarchicalAgent(macro_dim=config.macro_policy_dim, micro_dim=config.micro_policy_dim)
self.routing_weights = nn.Parameter(torch.ones(len(self.agents)))
self.aggregation_gate = nn.Softmax(dim=0)
self.agent_tasks: Dict[str, BabyAGILoop] = {}
self.interactions: List[Dict] = []
self.task_count = 0
def forward(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        # Placeholder task encoding: a random embedding stands in for a learned text encoder.
        task_embed = torch.randn(1, self.config.macro_policy_dim)
blueprint = self.hierarchical.generate_blueprint(task_embed)
routing_probs = self.aggregation_gate(self.routing_weights)
agent_outputs = {}
for i, (name, agent) in enumerate(self.agents.items()):
weight = routing_probs[i].item()
if weight < 0.10:
continue
sub_task = blueprint[min(i, len(blueprint) - 1)] if blueprint else task
output = agent.act(f"[{name}] {sub_task}")
agent_outputs[name] = {"output": output, "weight": weight, "sub_task": sub_task}
synthesis = self.leader.act(f"Synthesize: {task} with inputs: {list(agent_outputs.keys())}")
self.interactions.append({
"task": task, "blueprint": blueprint,
"agent_outputs": agent_outputs, "leader_synthesis": synthesis,
"routing_probs": routing_probs.detach().cpu().tolist(),
"t": time.time(),
})
self.task_count += 1
return {
"output": synthesis, "blueprint": blueprint,
"agent_outputs": agent_outputs,
"routing_weights": routing_probs.detach().cpu().tolist(),
}
def execute(self, task: str, kg_context: Any, context: Dict[str, Any]) -> Dict[str, Any]:
return self.forward(task, context)
    def textual_backprop(self, global_gradient: str, performance_feedback: float, beta: float = 0.5) -> Dict[str, str]:
        # Blend the global textual gradient with each agent's local signal, weighted by beta.
        updates = {}
        for name in self.agents:
            local_grad = f"{name} perf={performance_feedback:.3f}"
            updates[name] = f"{beta:.2f}*[{global_gradient}] + {1 - beta:.2f}*[{local_grad}]"
        self.routing_weights.data += performance_feedback * 0.01
        return updates
def co_evolve_interactions(self) -> List[Dict]:
rewards = []
for interaction in self.interactions[-10:]:
n_agents = len(interaction.get("agent_outputs", {}))
complexity = len(interaction.get("blueprint", []))
reward = n_agents * 0.1 + min(complexity * 0.05, 0.5)
rewards.append({"reward": reward, "agents_involved": n_agents})
return rewards
def run_babyagi(self, objective: str, max_iterations: int = 20) -> Dict[str, Any]:
loop = BabyAGILoop(objective, max_iterations)
result = loop.run(self.agents["researcher"])
self.agent_tasks[objective] = loop
return result
def stats(self) -> Dict[str, Any]:
return {
"total_tasks": self.task_count,
"num_agents": len(self.agents),
"total_interactions": len(self.interactions),
"routing_weights": self.routing_weights.detach().cpu().tolist(),
}
# ============================================================================
# 5. EVOLUTION ENGINE (MAP-Elites + Quality-Diversity + Auto-Oversight)
# ============================================================================
class MAPelitesArchive:
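    """Quality-diversity grid keeping the best (config, fitness) elite per behavior cell.

    Insert-if-better sketch (same cell, lower fitness is refused):
        >>> arc = MAPelitesArchive(dims=(4, 4))
        >>> arc.add(AetherConfig(), 0.5, np.array([0.2, 0.5]))
        True
        >>> arc.add(AetherConfig(), 0.4, np.array([0.2, 0.5]))
        False
    """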
def __init__(self, dims=(10, 10), ranges=None):
self.dims = dims
self.ranges = ranges or [(0, 1), (0, 1)]
self.archive: Dict[Tuple[int, int], Tuple[AetherConfig, float]] = {}
def _index(self, measures: np.ndarray) -> Tuple[int, int]:
indices = []
for m, (lo, hi), dim in zip(measures, self.ranges, self.dims):
norm = (m - lo) / (hi - lo + 1e-8)
idx = int(np.clip(norm * dim, 0, dim - 1))
indices.append(idx)
return tuple(indices)
def add(self, config: AetherConfig, fitness: float, measures: np.ndarray) -> bool:
idx = self._index(measures)
if idx not in self.archive or self.archive[idx][1] < fitness:
self.archive[idx] = (config, fitness)
return True
return False
def sample(self, n: int = 1) -> List[AetherConfig]:
if not self.archive:
return []
items = list(self.archive.values())
selected = random.sample(items, min(n, len(items)))
return [cfg for cfg, _ in selected]
def get_best(self) -> Optional[Tuple[AetherConfig, float]]:
if not self.archive:
return None
return max(self.archive.values(), key=lambda x: x[1])
def stats(self) -> Dict[str, float]:
total_cells = self.dims[0] * self.dims[1]
return {
"coverage": len(self.archive) / total_cells,
"qd_score": sum(f for _, f in self.archive.values()),
"max_fitness": max((f for _, f in self.archive.values()), default=0),
}
class AetherEvolutionEngine:
def __init__(self, config: AetherConfig):
self.config = config
self.archive = MAPelitesArchive(
dims=config.archive_dims,
ranges=[(0, 1), (0, 1)], # (symbolic_bias_proxy, fitness)
)
self.generation = 0
self.experience_log: List[Dict] = []
def generate_candidates(self, base_config: AetherConfig, population_size: int = 8) -> List[AetherConfig]:
candidates = [base_config]
archive_seeds = self.archive.sample(n=min(2, len(self.archive.archive)))
for _ in range(population_size - len(archive_seeds) - 1):
candidates.append(self._mutate(base_config))
for cfg in archive_seeds:
candidates.append(cfg)
return candidates
    def _mutate(self, config: AetherConfig, rate: Optional[float] = None) -> AetherConfig:
        vec = config.to_vector()
        sigma = rate if rate is not None else config.mutation_rate
        noise = np.random.normal(0, sigma, size=vec.shape)
        mutated = vec + noise * vec
new_cfg = AetherConfig.from_vector(mutated)
# Preserve meta fields
new_cfg.generations = config.generations
new_cfg.enable_self_modification = config.enable_self_modification
new_cfg.enable_parallel_agents = config.enable_parallel_agents
new_cfg.archive_dims = config.archive_dims
return new_cfg
    def select(self, candidates: List[AetherConfig], fitness_scores: List[float],
               alpha_exploration: float = 0.3) -> List[AetherConfig]:
        # Drop rejected candidates (fitness == -inf) so normalization stays finite.
        finite = [(c, f) for c, f in zip(candidates, fitness_scores) if f > -float("inf")]
        if not finite:
            return candidates[:2]
        if len(finite) <= 2:
            return [c for c, _ in finite]
        survivors = [c for c, _ in finite]
        f = np.array([fit for _, fit in finite])
        vectors = np.array([c.to_vector() for c in survivors])
        f_norm = (f - f.min()) / (f.max() - f.min() + 1e-8)
        k = min(4, len(survivors) - 1)
        novelties = []
        for i, v in enumerate(vectors):
            dists = np.linalg.norm(vectors - v, axis=1)
            dists[i] = np.inf
            knn = np.partition(dists, k)[:k]
            novelties.append(np.mean(knn))
        nov_norm = np.array(novelties) / (max(novelties) + 1e-8)
        # alpha_exploration sets the novelty pressure relative to raw fitness.
        scores = f_norm * (nov_norm + 1e-8) ** alpha_exploration
        n_select = max(1, len(survivors) // 2)
        top_indices = np.argsort(scores)[-n_select:]
        return [survivors[i] for i in top_indices]
    def mutate(self, candidates: List[AetherConfig], mutation_rate: Optional[float] = None) -> List[AetherConfig]:
        # An explicit mutation_rate overrides each config's own rate.
        mutated = []
        for cfg in candidates:
            new_cfg = self._mutate(cfg, rate=mutation_rate)
# Hard constraints
if new_cfg.macro_policy_dim > 512:
new_cfg.macro_policy_dim = 512
if new_cfg.micro_policy_dim > new_cfg.macro_policy_dim:
new_cfg.micro_policy_dim = new_cfg.macro_policy_dim // 2
mutated.append(new_cfg)
return mutated
def update_archive(self, candidates: List[AetherConfig], fitness_scores: List[float]):
for cfg, fitness in zip(candidates, fitness_scores):
if fitness == -float("inf"):
continue
# Behavioral descriptor: symbolic bias proxy = num_agents / max_agents
sym_proxy = cfg.num_agents / cfg.max_agents
measures = np.array([sym_proxy, np.clip(fitness, 0, 1)])
improved = self.archive.add(cfg, fitness, measures)
if improved:
logger.debug(f"Archive improved at cell fitness={fitness:.4f}")
def get_diversity_stats(self) -> Dict[str, float]:
return self.archive.stats()
# ============================================================================
# 6. AETHER CORE (Orchestrator + Evolution Loop + Auto-Oversight)
# ============================================================================
class AetherCore(nn.Module):
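    """Top-level orchestrator: KG-grounded forward passes plus the autonomous evolution loop.

    Fusion sketch (gate weights are normalized to sum to 1):
        >>> core = AetherCore(AetherConfig(kg_embedding_dim=32, kg_num_relations=4))
        >>> core.knowledge.add_fact("A", "relates_to", "B")
        >>> out = core.forward("A relates_to")
        >>> abs(out["symbolic_weight"] + out["neural_weight"] - 1.0) < 1e-6
        True
    """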
def __init__(self, config: Optional[AetherConfig] = None):
super().__init__()
self.config = config or AetherConfig()
self.generation = 0
self.architecture_history: List[Dict] = []
self.fitness_log: List[float] = []
self.metadata = {"birth": time.time(), "version": "0.2.0-autonomous"}
# Subsystems (lazily initialized where possible)
self._memory: Optional[CoALAMemory] = None
self._temporal: Optional[TemporalMemory] = None
self._evolution: Optional[AetherEvolutionEngine] = None
self._agents: Optional[AetherAgentOrchestrator] = None
self._knowledge: Optional[KnowledgeGraphEngine] = None
self._oversight: Optional[AutoOversight] = None
# Neuro-symbolic fusion gate (trainable)
self.symbolic_gate = nn.Parameter(torch.tensor(0.0))
self.neural_gate = nn.Parameter(torch.tensor(0.0))
logger.info("AETHER Core v0.2.0-autonomous initialized")
@property
def memory(self) -> CoALAMemory:
if self._memory is None:
self._memory = CoALAMemory(capacity=self.config.working_memory_capacity)
return self._memory
@property
def temporal(self) -> TemporalMemory:
if self._temporal is None:
self._temporal = TemporalMemory(buffer_size=self.config.episodic_buffer_size)
return self._temporal
@property
def evolution(self) -> AetherEvolutionEngine:
if self._evolution is None:
self._evolution = AetherEvolutionEngine(self.config)
return self._evolution
@property
def agents(self) -> AetherAgentOrchestrator:
if self._agents is None:
self._agents = AetherAgentOrchestrator(self.config)
return self._agents
@property
def knowledge(self) -> KnowledgeGraphEngine:
if self._knowledge is None:
self._knowledge = KnowledgeGraphEngine(
embedding_dim=self.config.kg_embedding_dim,
num_relations=self.config.kg_num_relations,
)
return self._knowledge
@property
def oversight(self) -> AutoOversight:
if self._oversight is None:
self._oversight = AutoOversight(self.config)
return self._oversight
def forward(self, task: str, context: Optional[Dict] = None) -> Dict[str, Any]:
context = context or {}
kg_context = self.knowledge.query(task, top_k=5)
self.memory.store({"task": task, "kg_context": kg_context, "t": time.time()})
result = self.agents.execute(task, kg_context, context)
# Neuro-symbolic fusion
sym_w = torch.sigmoid(self.symbolic_gate)
neu_w = torch.sigmoid(self.neural_gate)
total = sym_w + neu_w + 1e-8
sym_w, neu_w = sym_w / total, neu_w / total
self.temporal.store({
"task": task, "result": result,
"weights": {"symbolic": sym_w.item(), "neural": neu_w.item()},
})
return {
"output": result, "symbolic_weight": sym_w.item(),
"neural_weight": neu_w.item(), "kg_context": kg_context,
"generation": self.generation,
}
def _default_evaluator(self, candidate: AetherConfig) -> float:
"""
        Fully automated fitness function - no external API.
Scores: synthetic reasoning benchmarks + memory stress + knowledge graph coverage.
"""
scores = []
try:
# 1. Agent orchestration efficiency
orch = AetherAgentOrchestrator(candidate)
task_embed = torch.randn(1, candidate.macro_policy_dim)
blueprint = orch.hierarchical.generate_blueprint(task_embed)
scores.append(min(1.0, len(blueprint) / 4.0))
# 2. Knowledge graph reasoning coverage
kg = KnowledgeGraphEngine(embedding_dim=candidate.kg_embedding_dim, num_relations=candidate.kg_num_relations)
for i in range(15):
kg.add_fact(f"Entity{i}", "connects_to", f"Entity{i+1}")
q = kg.query("Entity0 connects_to", top_k=5)
scores.append(min(1.0, len(q["results"]) / 3.0))
# 3. Memory throughput
mem = WorkingMemory(capacity=candidate.working_memory_capacity)
for i in range(50):
mem.store({"idx": i, "data": list(range(10))})
retrieved = mem.retrieve("idx", top_k=5)
scores.append(min(1.0, len(retrieved) / 5.0))
# 4. Config balance penalty (prefer moderate values)
balance = 1.0 - abs(candidate.macro_policy_dim - 256) / 256.0
scores.append(max(0.0, balance))
except Exception as e:
logger.warning(f"Fitness evaluation failed: {e}")
return -float("inf")
return float(np.mean(scores))
def evolve(self, num_generations: Optional[int] = None,
evaluator: Optional[Callable[[AetherConfig], float]] = None) -> Dict[str, Any]:
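        """Run one fully automated cycle: generate -> gate -> select -> mutate -> validate -> integrate.

        Custom evaluators are plain callables; a minimal sketch (any AetherConfig -> float works):
            >>> core = AetherCore(AetherConfig(population_size=4))
            >>> core.evolve(num_generations=1,
            ...             evaluator=lambda cfg: 1.0 / cfg.num_agents)["generations"]
            1
        """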
num_generations = num_generations or self.config.generations
evaluator = evaluator or self._default_evaluator
logger.info(f"=== AUTONOMOUS EVOLUTION: {num_generations} generations ===")
best_fitness = -float("inf")
best_config: Optional[AetherConfig] = None
for gen in range(num_generations):
self.generation = gen
logger.info(f"\n--- Generation {gen} ---")
# 1. Generate candidates
candidates = self.evolution.generate_candidates(self.config, self.config.population_size)
logger.info(f"Generated {len(candidates)} candidates")
# 2. Evaluate + Auto-oversight gate
fitness_scores = []
approved_candidates = []
for candidate in candidates:
                # Automated decision - no human in the loop
approved, score, reason = self.oversight.decide(candidate, self)
if approved:
# Full fitness evaluation
fitness = evaluator(candidate)
fitness_scores.append(fitness)
approved_candidates.append(candidate)
logger.info(f" Candidate approved | reason={reason} | fitness={fitness:.4f}")
else:
fitness_scores.append(-float("inf"))
logger.info(f" Candidate REJECTED | reason={reason}")
# 3. Auto-rollback check
current_best = max((f for f in fitness_scores if f > -float("inf")), default=-float("inf"))
if self.oversight.should_rollback(current_best):
logger.warning(f"ROLLBACK TRIGGERED: fitness dropped to {current_best:.4f}")
if self.oversight.last_good_config is not None:
self.config = copy.deepcopy(self.oversight.last_good_config)
logger.info("Rolled back to last known good configuration")
continue
# 4. Select (Performance-Novelty)
selected = self.evolution.select(candidates, fitness_scores)
# 5. Mutate
mutated = self.evolution.mutate(selected)
# 6. Validate via oversight (second pass for mutated)
validated = []
validated_scores = []
for m in mutated:
ok, _, reason = self.oversight.decide(m, self)
if ok:
validated.append(m)
validated_scores.append(evaluator(m))
else:
logger.info(f" Mutated candidate rejected: {reason}")
# 7. Integrate best
if validated and validated_scores:
best_idx = int(np.argmax(validated_scores))
best_mutated = validated[best_idx]
current_fitness = validated_scores[best_idx]
if current_fitness > best_fitness:
best_fitness = current_fitness
best_config = best_mutated
self.config = best_mutated
self.oversight.update_good_checkpoint(best_mutated, best_fitness)
arch_hash = hashlib.sha256(
json.dumps(asdict(best_mutated), sort_keys=True).encode()
).hexdigest()[:16]
self.architecture_history.append({
"generation": gen, "hash": arch_hash,
"fitness": best_fitness, "config": asdict(best_mutated),
})
logger.info(f"*** NEW BEST: gen={gen} fitness={best_fitness:.4f} hash={arch_hash} ***")
# 8. Update MAP-Elites archive
self.evolution.update_archive(candidates, fitness_scores)
self.fitness_log.append(best_fitness)
# 9. Self-reflection per generation
reflection = self.self_reflect()
logger.info(f"Reflection: {reflection['recommendations']}")
return {
"best_fitness": best_fitness,
"best_config": asdict(best_config) if best_config else None,
"generations": num_generations,
"history": self.architecture_history,
"oversight_summary": self.oversight.summary(),
"archive_stats": self.evolution.get_diversity_stats(),
}
def self_reflect(self) -> Dict[str, Any]:
recs = []
if len(self.fitness_log) > 5:
recent = self.fitness_log[-5:]
if max(recent) - min(recent) < 0.01:
recs.append("Fitness plateau detected. Increase diversity or mutation rate.")
if recent[-1] < recent[0]:
recs.append("Declining trend. Rollback or expand search.")
sym = torch.sigmoid(self.symbolic_gate).item()
if sym < 0.3:
recs.append("Symbolic reasoning underutilized. Boost KG integration.")
elif sym > 0.7:
recs.append("Symbolic dominance. Increase neural flexibility.")
return {
"generation": self.generation,
"architectures_tested": len(self.architecture_history),
"fitness_trend": self.fitness_log,
"neuro_symbolic_balance": {"symbolic": sym, "neural": 1.0 - sym},
"recommendations": recs,
"oversight": self.oversight.summary(),
}
def export_state(self) -> Dict[str, Any]:
return {
"config": asdict(self.config),
"generation": self.generation,
"architecture_history": self.architecture_history,
"fitness_log": self.fitness_log,
"metadata": self.metadata,
"knowledge": self.knowledge.export(),
"memory": self.memory.export(),
"model_state_dict": {k: v.cpu().tolist() for k, v in self.state_dict().items()},
}
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "AetherCore":
cfg = AetherConfig(**state["config"])
core = cls(config=cfg)
core.generation = state["generation"]
core.architecture_history = state["architecture_history"]
core.fitness_log = state["fitness_log"]
core.metadata = state["metadata"]
return core
# ============================================================================
# 7. RUNNABLE MAIN
# ============================================================================
def run_autonomous_demo():
print("=" * 70)
print(" AETHER v0.2.0 β€” AUTONOMOUS SELF-EVOLVING ARCHITECTURE")
print(" Zero human oversight. Automated regression gating + rollback.")
print("=" * 70)
config = AetherConfig(
population_size=6,
generations=5,
mutation_rate=0.12,
macro_policy_dim=128,
micro_policy_dim=64,
num_agents=4,
working_memory_capacity=16,
episodic_buffer_size=500,
kg_embedding_dim=64,
kg_num_relations=10,
)
core = AetherCore(config)
# Seed knowledge graph
print("\n[1] Seeding Knowledge Graph...")
kg = core.knowledge
facts = [
("Intelligence", "requires", "Reasoning"),
("Reasoning", "requires", "Memory"),
("Memory", "enables", "Learning"),
("Learning", "produces", "Intelligence"),
("Agent", "has_role", "Researcher"),
("Agent", "has_role", "Engineer"),
("Agent", "has_role", "Analyzer"),
("Agent", "has_role", "Integrator"),
]
for h, r, t in facts:
kg.add_fact(h, r, t)
print(f" KG: {kg.stats()}")
# Single forward pass demo
print("\n[2] Forward Pass Demo (neuro-symbolic query)...")
result = core.forward("Intelligence requires")
print(f" Symbolic weight: {result['symbolic_weight']:.3f}")
print(f" Neural weight: {result['neural_weight']:.3f}")
print(f" Results: {len(result['kg_context']['results'])} items")
for r in result["kg_context"]["results"]:
print(f" β†’ {r['head']} --{r['relation']}--> {r['tail']} (conf={r.get('confidence',0):.2f}, src={r.get('source','?')})")
# Agent orchestration demo
print("\n[3] Agent Orchestration Demo...")
agent_result = core.agents.execute("Optimize reasoning pipeline", {}, {})
print(f" Leader synthesis: {agent_result['output'][:80]}...")
print(f" Agents activated: {list(agent_result['agent_outputs'].keys())}")
print(f" Routing weights: {[f'{w:.3f}' for w in agent_result['routing_weights']]}")
# Evolution loop (fully automated)
print("\n[4] AUTONOMOUS EVOLUTION LOOP (no human oversight)...")
evolution_result = core.evolve(num_generations=5)
print("\n[5] EVOLUTION RESULTS")
print(f" Best fitness achieved: {evolution_result['best_fitness']:.4f}")
print(f" Generations run: {evolution_result['generations']}")
print(f" Architecture changes: {len(evolution_result['history'])}")
print(f" MAP-Elites coverage: {evolution_result['archive_stats']['coverage']:.2%}")
print(f" MAP-Elites QD score: {evolution_result['archive_stats']['qd_score']:.2f}")
print(f" Auto-oversight approved: {evolution_result['oversight_summary']['approved']}")
print(f" Auto-oversight rejected: {evolution_result['oversight_summary']['rejected']}")
print(f" Consecutive rejections: {evolution_result['oversight_summary']['consecutive_rejections']}")
print("\n[6] Architecture Evolution Trajectory")
for entry in evolution_result["history"]:
print(f" Gen {entry['generation']:02d} | hash={entry['hash']} | fitness={entry['fitness']:.4f} | "
f"agents={entry['config']['num_agents']} | macro={entry['config']['macro_policy_dim']} | "
f"mut_rate={entry['config']['mutation_rate']:.3f}")
# Self-reflection
print("\n[7] Self-Reflection")
reflection = core.self_reflect()
for rec in reflection["recommendations"]:
print(f" β†’ {rec}")
# Export checkpoint
print("\n[8] Exporting state checkpoint...")
state = core.export_state()
    # Write to the working directory rather than a container-specific /app path.
    checkpoint_path = "aether_checkpoint.json"
with open(checkpoint_path, "w") as f:
json.dump(state, f, indent=2, default=str)
print(f" Checkpoint saved to: {checkpoint_path}")
print("\n" + "=" * 70)
print(" DEMO COMPLETE. AETHER is fully autonomous.")
print("=" * 70)
return core, evolution_result
if __name__ == "__main__":
run_autonomous_demo()