| """ |
| AETHER Evolution Engine. |
| Integrates AlphaEvolve-style code diff evolution, |
| GEA-style group experience sharing, |
| MAP-Elites diversity maintenance, |
| and HiMAC hierarchical co-evolution phases. |
| """ |
|
|
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import random
import logging
from dataclasses import dataclass
|
|
| logger = logging.getLogger("AETHER.Evolution") |
|
|
|
|
| @dataclass |
| class ArchitectureDNA: |
| """Genotype encoding for AETHER architecture variants.""" |
| population_size: int |
| mutation_rate: float |
| learning_rate: float |
| macro_policy_dim: int |
| micro_policy_dim: int |
| num_agents: int |
| kg_embedding_dim: int |
| symbolic_bias: float = 0.5 |
| |
    def to_vector(self) -> np.ndarray:
        # Small-magnitude genes are rescaled so all dimensions are comparable
        # under Gaussian noise and Euclidean distance.
        return np.array([
| self.population_size, |
| self.mutation_rate, |
| self.learning_rate * 1e5, |
| self.macro_policy_dim, |
| self.micro_policy_dim, |
| self.num_agents, |
| self.kg_embedding_dim, |
| self.symbolic_bias * 10, |
| ]) |
| |
    @classmethod
    def from_vector(cls, vec: np.ndarray) -> "ArchitectureDNA":
        # Inverse of to_vector: undo the scaling and clip every gene into its
        # legal range, so any real-valued vector decodes to a valid genotype.
        return cls(
| population_size=int(np.clip(vec[0], 2, 64)), |
| mutation_rate=float(np.clip(vec[1], 0.01, 0.5)), |
| learning_rate=float(np.clip(vec[2] / 1e5, 1e-6, 1e-3)), |
| macro_policy_dim=int(np.clip(vec[3], 64, 512)), |
| micro_policy_dim=int(np.clip(vec[4], 32, 256)), |
| num_agents=int(np.clip(vec[5], 1, 16)), |
| kg_embedding_dim=int(np.clip(vec[6], 32, 512)), |
| symbolic_bias=float(np.clip(vec[7] / 10, 0.0, 1.0)), |
| ) |
| |
| def to_config_dict(self) -> Dict[str, Any]: |
| return { |
| "population_size": self.population_size, |
| "mutation_rate": self.mutation_rate, |
| "learning_rate": self.learning_rate, |
| "macro_policy_dim": self.macro_policy_dim, |
| "micro_policy_dim": self.micro_policy_dim, |
| "num_agents": self.num_agents, |
| "kg_embedding_dim": self.kg_embedding_dim, |
| } |
|
|
|
|
| class MAPelitesArchive: |
| """ |
| MAP-Elites archive for quality-diversity optimization. |
| Cells indexed by behavioral descriptors (capability dimensions). |
| """ |
    def __init__(self, dims: Tuple[int, int] = (10, 10),
                 ranges: Optional[List[Tuple[float, float]]] = None):
| self.dims = dims |
| self.ranges = ranges or [(0, 1), (0, 1)] |
| self.archive: Dict[Tuple[int, int], Tuple[ArchitectureDNA, float]] = {} |
| |
| def _get_index(self, measures: np.ndarray) -> Tuple[int, int]: |
| """Map continuous measures to discrete cell indices.""" |
| indices = [] |
| for m, (low, high), dim in zip(measures, self.ranges, self.dims): |
| normalized = (m - low) / (high - low + 1e-8) |
| idx = int(np.clip(normalized * dim, 0, dim - 1)) |
| indices.append(idx) |
| return tuple(indices) |
| |
| def add(self, dna: ArchitectureDNA, fitness: float, |
| measures: np.ndarray) -> bool: |
| """Add solution to archive. Returns True if improved cell.""" |
| idx = self._get_index(measures) |
| if idx not in self.archive or self.archive[idx][1] < fitness: |
| self.archive[idx] = (dna, fitness) |
| return True |
| return False |
| |
| def sample(self, n: int = 1) -> List[ArchitectureDNA]: |
| """Sample random solutions from archive.""" |
| if not self.archive: |
| return [] |
| items = list(self.archive.values()) |
| selected = random.sample(items, min(n, len(items))) |
| return [dna for dna, _ in selected] |
| |
| def get_best(self) -> Optional[Tuple[ArchitectureDNA, float]]: |
| """Get highest fitness solution.""" |
| if not self.archive: |
| return None |
| return max(self.archive.values(), key=lambda x: x[1]) |
| |
| def stats(self) -> Dict[str, float]: |
| total_cells = self.dims[0] * self.dims[1] |
| return { |
| "coverage": len(self.archive) / total_cells, |
| "qd_score": sum(f for _, f in self.archive.values()), |
| "max_fitness": max((f for _, f in self.archive.values()), default=0), |
| } |
|
|
|
|
| class AetherEvolutionEngine: |
| """ |
| Evolutionary engine combining: |
| - AlphaEvolve-style LLM-guided mutation (code diffs) |
| - GEA-style group experience sharing |
| - MAP-Elites quality-diversity |
| - HiMAC hierarchical co-evolution phases |
| """ |
| |
| def __init__(self, config): |
| self.config = config |
| self.archive = MAPelitesArchive( |
| dims=(10, 10), |
| ranges=[(0, 1), (0, 1)], |
| ) |
| self.generation = 0 |
| self.experience_log: List[Dict] = [] |
| |
| def generate_candidates(self, base_config, |
| population_size: int = 8) -> List[Any]: |
| """ |
| Generate candidate architecture variants. |
| Uses mutation + archive seeding. |
| """ |
        candidates = []

        # Seed from the MAP-Elites archive to preserve diversity.
        archive_seeds = self.archive.sample(n=2)

        # Always keep the unmodified base config as an elite.
        candidates.append(base_config)

        # Fill the remaining slots with mutated variants of the base config.
        for _ in range(population_size - len(archive_seeds) - 1):
            mutated = self._mutate_config(base_config)
            candidates.append(mutated)

        # Rehydrate archive seeds into full configs.
        from .core import AetherConfig
        for dna in archive_seeds:
            cfg = AetherConfig(**dna.to_config_dict())
            candidates.append(cfg)

        return candidates
| |
    def _mutate_config(self, config, mutation_rate: Optional[float] = None) -> Any:
        """Apply constrained mutation to a config.

        If ``mutation_rate`` is None, the config's own mutation_rate is used.
        """
        from .core import AetherConfig
| |
| dna = ArchitectureDNA( |
| population_size=config.population_size, |
| mutation_rate=config.mutation_rate, |
| learning_rate=config.learning_rate, |
| macro_policy_dim=config.macro_policy_dim, |
| micro_policy_dim=config.micro_policy_dim, |
| num_agents=config.num_agents, |
| kg_embedding_dim=config.kg_embedding_dim, |
| symbolic_bias=getattr(config, 'symbolic_bias', 0.5), |
| ) |
| |
        vec = dna.to_vector()

        # Multiplicative Gaussian noise, scaled by the mutation rate.
        rate = mutation_rate if mutation_rate is not None else config.mutation_rate
        noise = np.random.normal(0, rate, size=vec.shape)
        mutated_vec = vec + noise * vec
| |
        new_dna = ArchitectureDNA.from_vector(mutated_vec)

        # Rebuild the config, carrying over settings the DNA does not encode.
        new_config = AetherConfig(**new_dna.to_config_dict())
| new_config.generations = config.generations |
| new_config.sandbox_timeout = config.sandbox_timeout |
| new_config.max_architecture_depth = config.max_architecture_depth |
| new_config.enable_self_modification = config.enable_self_modification |
| |
| return new_config |
| |
    def select(self, candidates: List[Any],
               fitness_scores: List[float]) -> List[Any]:
        """
        Select candidates using performance-novelty scoring (from GEA):
        score(i) = performance_i * sqrt(novelty_i)
        """
        if not candidates or not fitness_scores:
            return candidates[:2]
        if len(candidates) == 1:
            return candidates
| |
| vectors = [] |
| for cfg in candidates: |
            dna = ArchitectureDNA(
                population_size=cfg.population_size,
                mutation_rate=cfg.mutation_rate,
                learning_rate=cfg.learning_rate,
                macro_policy_dim=cfg.macro_policy_dim,
                micro_policy_dim=cfg.micro_policy_dim,
                num_agents=cfg.num_agents,
                kg_embedding_dim=cfg.kg_embedding_dim,
                symbolic_bias=getattr(cfg, "symbolic_bias", 0.5),
            )
| vectors.append(dna.to_vector()) |
| |
| vectors = np.array(vectors) |
| |
        # Normalize fitness to [0, 1] across the population.
        f = np.array(fitness_scores)
        f_norm = (f - f.min()) / (f.max() - f.min() + 1e-8)

        # Novelty = mean distance to the k nearest neighbors in genotype space.
        k = min(4, len(candidates) - 1)
        novelties = []
        for i, v in enumerate(vectors):
            distances = np.linalg.norm(vectors - v, axis=1)
            distances[i] = np.inf  # exclude self
            knn = np.partition(distances, k)[:k]
            novelty = np.mean(knn)
            novelties.append(novelty)

        novelties = np.array(novelties)
        nov_norm = novelties / (novelties.max() + 1e-8)

        # Combined performance-novelty score.
        scores = f_norm * np.sqrt(nov_norm + 1e-8)

        # Keep the top half of the population (at least one candidate).
        n_select = max(1, len(candidates) // 2)
        top_indices = np.argsort(scores)[-n_select:]
| |
| selected = [candidates[i] for i in top_indices] |
| |
| logger.info(f"Selected {len(selected)} candidates. " |
| f"Score range: [{scores.min():.3f}, {scores.max():.3f}]") |
| |
| return selected |
| |
| def mutate(self, candidates: List[Any], mutation_rate: float = 0.15, |
| max_depth: int = 5) -> List[Any]: |
| """ |
| Apply constrained mutations. |
| Enforces max architecture depth and safety constraints. |
| """ |
        mutated = []
        for cfg in candidates:
            new_cfg = self._mutate_config(cfg, mutation_rate)

            # Enforce safety constraints on the mutated config.
            if new_cfg.macro_policy_dim > 512:
                new_cfg.macro_policy_dim = 512
            if new_cfg.micro_policy_dim > new_cfg.macro_policy_dim:
                new_cfg.micro_policy_dim = new_cfg.macro_policy_dim // 2
            if new_cfg.num_agents > max_depth * 2:
                new_cfg.num_agents = max_depth * 2

            mutated.append(new_cfg)
| |
| return mutated |
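
    # Interface assumptions for co_evolve_phases (inferred from how the
    # objects are used below; they are not defined in this module):
    #   * macro_policy.sample() -> blueprint
    #   * macro_policy.update(blueprints, advantages) -> None
    #   * micro_policy.update(blueprint, micro_env_fn) -> None
    #   * macro_env_fn(blueprint, micro_policy) -> float (scalar reward)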
| |
    def co_evolve_phases(self, macro_policy, micro_policy,
                         macro_env_fn, micro_env_fn,
                         num_iterations: int = 10,
                         blueprints_per_iteration: int = 8) -> Tuple[Any, Any]:
        """
        HiMAC-style iterative co-evolution.
        Phase A: macro-exploration (micro policy frozen).
        Phase B: micro-adaptation (macro frozen; train on the best blueprint).
        """
| logger.info(f"Starting hierarchical co-evolution for {num_iterations} iterations") |
| |
| best_blueprint = None |
| best_reward = -float('inf') |
| |
| for iteration in range(num_iterations): |
| logger.info(f"Iteration {iteration}: Phase A - Macro Exploration") |
| blueprints = [] |
| rewards = [] |
| |
            for _ in range(blueprints_per_iteration):
| blueprint = macro_policy.sample() |
| reward = macro_env_fn(blueprint, micro_policy) |
| blueprints.append(blueprint) |
| rewards.append(reward) |
| |
            # Normalize rewards into zero-mean, unit-variance advantages.
            r = np.array(rewards)
            advantages = (r - r.mean()) / (r.std() + 1e-8)

            macro_policy.update(blueprints, advantages)
| |
| best_idx = int(np.argmax(rewards)) |
| if rewards[best_idx] > best_reward: |
| best_reward = rewards[best_idx] |
| best_blueprint = blueprints[best_idx] |
| |
| logger.info(f"Iteration {iteration}: Phase B - Micro Adaptation") |
| if best_blueprint is not None: |
| micro_policy.update(best_blueprint, micro_env_fn) |
| |
| return macro_policy, micro_policy |
| |
| def share_experience(self, agent_group: List[Any], |
| traces: List[Dict]) -> List[str]: |
| """ |
| GEA-style experience sharing: agents reflect on group traces |
| and generate evolution directives. |
| """ |
| aggregated = { |
| "patches_applied": [], |
| "predicted_patches": [], |
| "execution_logs": [], |
| "outcomes": [], |
| } |
| |
        # Pool trace fields across the whole group.
        for trace in traces:
            for key in aggregated:
                if key in trace:
                    aggregated[key].append(trace[key])

        # Each agent reflects on the pooled experience and emits a directive.
        directives = []
        for agent in agent_group:
            directive = self._generate_directive(agent, aggregated)
            directives.append(directive)
| |
| self.experience_log.append({ |
| "generation": self.generation, |
| "group_size": len(agent_group), |
| "traces": len(traces), |
| "directives": directives, |
| }) |
| |
| self.generation += 1 |
| return directives |
| |
    def _generate_directive(self, agent, aggregated: Dict) -> str:
        """Heuristic directive from the group's pooled success rate.

        ``agent`` is currently unused: every agent in the group receives the
        same directive, derived from the aggregated outcomes.
        """
        success_rate = (np.mean(aggregated["outcomes"])
                        if aggregated["outcomes"] else 0.5)
| |
| if success_rate < 0.3: |
| return "Increase exploration diversity. Decrease learning rate. Add more agents." |
| elif success_rate > 0.8: |
| return "Consolidate current strategy. Increase exploitation. Optimize inference speed." |
| else: |
| return "Balance exploration and exploitation. Refine tool definitions." |
| |
| def update_archive(self, candidates: List[Any], |
| fitness_scores: List[float]) -> None: |
| """Update MAP-Elites archive with evaluated candidates.""" |
        for cfg, fitness in zip(candidates, fitness_scores):
            # Skip candidates whose evaluation failed outright.
            if fitness == -float('inf'):
                continue

            # Behavior descriptors: symbolic bias and clipped task fitness.
            symbolic_bias = getattr(cfg, 'symbolic_bias', 0.5)
            measures = np.array([
                symbolic_bias,
                np.clip(fitness, 0, 1),
            ])
| |
| dna = ArchitectureDNA( |
| population_size=cfg.population_size, |
| mutation_rate=cfg.mutation_rate, |
| learning_rate=cfg.learning_rate, |
| macro_policy_dim=cfg.macro_policy_dim, |
| micro_policy_dim=cfg.micro_policy_dim, |
| num_agents=cfg.num_agents, |
| kg_embedding_dim=cfg.kg_embedding_dim, |
| symbolic_bias=symbolic_bias, |
| ) |
| |
| improved = self.archive.add(dna, fitness, measures) |
| if improved: |
| logger.debug(f"Archive improved at cell with fitness {fitness:.4f}") |
| |
| def get_diversity_stats(self) -> Dict[str, float]: |
| return self.archive.stats() |
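

# Minimal smoke test, runnable standalone. ``AetherConfig`` lives in the
# sibling ``.core`` module, so this demo substitutes a hypothetical stand-in
# carrying just the attributes the engine reads; it exercises select(),
# update_archive(), share_experience() and the sketches above, but not
# generate_candidates(), which needs the real config class.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    def _stub_config() -> SimpleNamespace:
        # Hypothetical stand-in for AetherConfig (illustration only).
        return SimpleNamespace(
            population_size=random.randint(4, 16),
            mutation_rate=random.uniform(0.05, 0.3),
            learning_rate=10 ** random.uniform(-5, -3),
            macro_policy_dim=random.choice([128, 256, 512]),
            micro_policy_dim=random.choice([64, 128]),
            num_agents=random.randint(1, 8),
            kg_embedding_dim=random.choice([64, 128, 256]),
            symbolic_bias=random.random(),
        )

    engine = AetherEvolutionEngine(config=None)
    population = [_stub_config() for _ in range(8)]
    fitnesses = [random.random() for _ in population]

    survivors = engine.select(population, fitnesses)
    engine.update_archive(population, fitnesses)
    directives = engine.share_experience(
        agent_group=["agent_0", "agent_1"],
        traces=[{"outcomes": 1.0}, {"outcomes": 0.0}],
    )

    print("selected:", len(survivors))
    print("diversity:", engine.get_diversity_stats())
    print("directive:", directives[0])
    print("dna demo:", _demo_dna_roundtrip())
    print("archive demo:", _demo_archive_usage())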
|
|