"""
AETHER Core: Central orchestrator integrating all subsystems.
Design: Neuro-Symbolic Fluidity + Constrained Self-Modification
"""
import torch
import torch.nn as nn
from typing import Dict, List, Any, Optional, Callable
import logging
from dataclasses import dataclass
import json
import hashlib
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AETHER.Core")
@dataclass
class AetherConfig:
"""Configuration for AETHER system evolution."""
# Evolution
population_size: int = 8
generations: int = 10
mutation_rate: float = 0.15
crossover_rate: float = 0.3
# Safety
sandbox_timeout: float = 30.0
max_architecture_depth: int = 5
require_human_approval: bool = False
# Hierarchical Reasoning
macro_policy_dim: int = 256
micro_policy_dim: int = 128
num_agents: int = 4
# Memory
working_memory_capacity: int = 16
episodic_buffer_size: int = 1000
# Knowledge
kg_embedding_dim: int = 128
kg_num_relations: int = 20
# Training
learning_rate: float = 2e-5
batch_size: int = 4
gradient_accumulation_steps: int = 8
# Meta
enable_self_modification: bool = True
enable_parallel_agents: bool = True
log_level: str = "INFO"
class AetherCore(nn.Module):
"""
Central controller for AETHER.
Manages the recursive evolution loop, agent orchestration,
knowledge integration, and safety constraints.
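
    Example (illustrative sketch; assumes the lazily imported subsystem
    modules such as .memory, .agents, and .knowledge are available):

        core = AetherCore(AetherConfig(population_size=4))
        result = core("Plan a three-step experiment")
        print(result["symbolic_weight"], result["neural_weight"])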
"""
def __init__(self, config: Optional[AetherConfig] = None,
model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
super().__init__()
self.config = config or AetherConfig()
self.model_name = model_name
# Subsystems (initialized lazily)
self._memory = None
self._evolution = None
self._agents = None
self._knowledge = None
self._safety = None
# State tracking
self.generation = 0
self.architecture_history: List[Dict] = []
self.fitness_log: List[float] = []
self.metadata: Dict[str, Any] = {
"birth_timestamp": time.time(),
"model_name": model_name,
"version": "0.1.0",
}
# Neuro-symbolic bridge: learned attention over symbolic rules
self.symbolic_gate = nn.Parameter(torch.randn(1))
self.neural_gate = nn.Parameter(torch.randn(1))
logger.info(f"AETHER Core initialized with model: {model_name}")
@property
def memory(self):
if self._memory is None:
from .memory import CoALAMemory, TemporalMemory
self._memory = {
"working": CoALAMemory(capacity=self.config.working_memory_capacity),
"temporal": TemporalMemory(buffer_size=self.config.episodic_buffer_size),
}
return self._memory
@property
def evolution(self):
if self._evolution is None:
from .evolution import AetherEvolutionEngine
self._evolution = AetherEvolutionEngine(self.config)
return self._evolution
@property
def agents(self):
if self._agents is None:
from .agents import AetherAgentOrchestrator
self._agents = AetherAgentOrchestrator(self.config)
return self._agents
@property
def knowledge(self):
if self._knowledge is None:
from .knowledge import KnowledgeGraphEngine
self._knowledge = KnowledgeGraphEngine(
embedding_dim=self.config.kg_embedding_dim,
num_relations=self.config.kg_num_relations,
)
return self._knowledge
@property
def safety(self):
if self._safety is None:
from .safety import SafetySandbox
self._safety = SafetySandbox(timeout=self.config.sandbox_timeout)
return self._safety
def forward(self, task: str, context: Optional[Dict] = None) -> Dict[str, Any]:
"""
Main forward pass: given a task, orchestrate agents, query knowledge,
and produce output through neuro-symbolic fusion.
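
        Example (illustrative; `task` is free-form text and `context` is an
        optional dict passed through to the agents):

            out = core("Summarize recent episodes", context={"verbose": True})
            # out keys: output, symbolic_weight, neural_weight,
            # kg_context, generation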
"""
context = context or {}
# 1. Retrieve relevant knowledge
kg_context = self.knowledge.query(task, top_k=5)
# 2. Load into working memory
self.memory["working"].store({
"task": task,
"kg_context": kg_context,
"timestamp": time.time(),
})
# 3. Hierarchical agent execution
result = self.agents.execute(task, kg_context, context)
        # 4. Neuro-symbolic fusion gate. The gates are learned scalars; the
        # normalized weights are returned with the result so downstream
        # consumers can blend symbolic and neural outputs.
        symbolic_weight = torch.sigmoid(self.symbolic_gate)
        neural_weight = torch.sigmoid(self.neural_gate)
        # Normalize so the two weights sum to 1 (epsilon guards division).
        total = symbolic_weight + neural_weight + 1e-8
        symbolic_weight = symbolic_weight / total
        neural_weight = neural_weight / total
# 5. Store to episodic memory
self.memory["temporal"].store({
"task": task,
"result": result,
"weights": {
"symbolic": symbolic_weight.item(),
"neural": neural_weight.item(),
}
})
return {
"output": result,
"symbolic_weight": symbolic_weight.item(),
"neural_weight": neural_weight.item(),
"kg_context": kg_context,
"generation": self.generation,
}
    def evolve(self, evaluation_function: Callable[[AetherConfig], float],
               num_generations: Optional[int] = None) -> Dict[str, Any]:
"""
Recursive evolutionary loop: generate candidates, evaluate,
select, mutate, validate, integrate.
Based on AlphaEvolve + GEA + ASI-Evolve methodology.
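
        Example (illustrative; a toy evaluation_function that simply favors
        larger populations, purely to show the expected callable shape):

            def toy_eval(candidate: AetherConfig) -> float:
                return float(candidate.population_size)

            summary = core.evolve(toy_eval, num_generations=2)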
"""
num_generations = num_generations or self.config.generations
logger.info(f"Starting evolution for {num_generations} generations")
best_fitness = -float('inf')
best_config = None
for gen in range(num_generations):
self.generation = gen
# Generate candidate variants
candidates = self.evolution.generate_candidates(
base_config=self.config,
population_size=self.config.population_size,
)
# Evaluate each candidate
fitness_scores = []
for candidate in candidates:
# Safety sandbox evaluation
with self.safety.sandbox():
try:
score = evaluation_function(candidate)
fitness_scores.append(score)
except Exception as e:
logger.warning(f"Candidate failed evaluation: {e}")
fitness_scores.append(-float('inf'))
# Select top performers (Performance-Novelty from GEA)
selected = self.evolution.select(
candidates, fitness_scores,
alpha_exploration=0.3,
)
# Apply constrained mutations
mutated = self.evolution.mutate(
selected,
mutation_rate=self.config.mutation_rate,
max_depth=self.config.max_architecture_depth,
)
            # Validate stability
            validated = []
            for candidate in mutated:
                if self.safety.validate_architecture(candidate):
                    validated.append(candidate)
            # Re-evaluate the validated candidates: selection and mutation
            # break the index alignment with the original fitness_scores list.
            validated_scores = []
            for candidate in validated:
                with self.safety.sandbox():
                    try:
                        validated_scores.append(evaluation_function(candidate))
                    except Exception as e:
                        logger.warning(f"Mutated candidate failed evaluation: {e}")
                        validated_scores.append(-float('inf'))
            # Integrate best
            if validated:
                best_idx = max(range(len(validated)),
                               key=lambda i: validated_scores[i])
                best_candidate = validated[best_idx]
                current_fitness = validated_scores[best_idx]
if current_fitness > best_fitness:
best_fitness = current_fitness
best_config = best_candidate
self.config = best_candidate
# Log architecture change
arch_hash = hashlib.sha256(
json.dumps(best_candidate.__dict__, sort_keys=True).encode()
).hexdigest()[:16]
self.architecture_history.append({
"generation": gen,
"hash": arch_hash,
"fitness": best_fitness,
"config": best_candidate.__dict__,
})
logger.info(f"Gen {gen}: New best fitness={best_fitness:.4f}, hash={arch_hash}")
self.fitness_log.append(best_fitness)
return {
"best_fitness": best_fitness,
"best_config": best_config.__dict__ if best_config else None,
"generations_evolved": num_generations,
"architecture_history": self.architecture_history,
}
def self_reflect(self) -> Dict[str, Any]:
"""
Meta-cognitive reflection on system performance and architecture.
Inspired by GEA experience sharing and Yunjue Agent self-reflection.
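
        Example (illustrative):

            report = core.self_reflect()
            for rec in report["recommendations"]:
                logger.info("Directive: %s", rec)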
"""
reflection = {
"generation": self.generation,
"total_architectures_tested": len(self.architecture_history),
"fitness_trend": self.fitness_log,
"memory_stats": {
"working_items": len(self.memory["working"].buffer),
"episodic_items": len(self.memory["temporal"].buffer),
},
"knowledge_stats": self.knowledge.stats(),
"agent_stats": self.agents.stats(),
"neuro_symbolic_balance": {
"symbolic_gate": torch.sigmoid(self.symbolic_gate).item(),
"neural_gate": torch.sigmoid(self.neural_gate).item(),
},
"recommendations": self._generate_recommendations(),
}
return reflection
def _generate_recommendations(self) -> List[str]:
"""Generate evolution directives based on performance analysis."""
recs = []
if len(self.fitness_log) > 5:
recent = self.fitness_log[-5:]
if max(recent) - min(recent) < 0.01:
recs.append("Fitness plateau detected. Increase mutation rate or population diversity.")
if recent[-1] < recent[0]:
recs.append("Performance declining. Consider rolling back to earlier architecture.")
sym_gate = torch.sigmoid(self.symbolic_gate).item()
if sym_gate < 0.3:
recs.append("Symbolic reasoning underutilized. Boost knowledge graph integration.")
elif sym_gate > 0.7:
recs.append("Symbolic dominance detected. Increase neural flexibility.")
return recs
def export_state(self) -> Dict[str, Any]:
"""Export full system state for checkpointing."""
return {
"config": self.config.__dict__,
"generation": self.generation,
"architecture_history": self.architecture_history,
"fitness_log": self.fitness_log,
"metadata": self.metadata,
"knowledge_state": self.knowledge.export(),
"memory_state": {
"working": self.memory["working"].export(),
"temporal": self.memory["temporal"].export(),
},
"model_state_dict": {k: v.cpu().tolist() for k, v in self.state_dict().items()},
}
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "AetherCore":
"""Restore AETHER from checkpoint."""
config = AetherConfig(**state["config"])
core = cls(config=config, model_name=state["metadata"]["model_name"])
core.generation = state["generation"]
core.architecture_history = state["architecture_history"]
core.fitness_log = state["fitness_log"]
        core.metadata = state["metadata"]
        # Restore the learned gate parameters saved by export_state.
        if "model_state_dict" in state:
            core.load_state_dict(
                {k: torch.tensor(v) for k, v in state["model_state_dict"].items()}
            )
        return core
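

if __name__ == "__main__":
    # Minimal smoke test (illustrative). It exercises only this module, so
    # the lazily imported subsystems (.memory, .evolution, .agents,
    # .knowledge, .safety) are never loaded.
    demo = AetherCore(AetherConfig(population_size=4, generations=2))
    logger.info("AETHER version: %s", demo.metadata["version"])
    logger.info("Symbolic gate (sigmoid): %.3f", torch.sigmoid(demo.symbolic_gate).item())
    logger.info("Neural gate (sigmoid): %.3f", torch.sigmoid(demo.neural_gate).item())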