"""
AETHER Agent Orchestration.
Integrates:
- smolagents multi-agent hierarchy (Manager + Workers)
- MLPO: Multi-agent guided Leader Policy Optimization
- BabyAGI task creation/prioritization/execution loop
- Agentic Neural Networks: textual backpropagation
- Yunjue Agent: Manager/Executor/Developer/Integrator/Merger/Aggregator roles
"""
import torch
import torch.nn as nn
from typing import Dict, List, Any, Optional
import logging
import time
from collections import deque
logger = logging.getLogger("AETHER.Agents")
class AgentRole:
"""Role definitions inspired by Yunjue Agent multi-agent system."""
MANAGER = "manager"
EXECUTOR = "executor"
DEVELOPER = "developer"
INTEGRATOR = "integrator"
MERGER = "merger"
AGGREGATOR = "aggregator"
RESEARCHER = "researcher"
class BaseAgent(nn.Module):
    """Base agent with a small policy network; implements an MLPO-style leader policy."""
def __init__(self, role: str, hidden_dim: int = 128,
vocab_size: int = 32000):
super().__init__()
self.role = role
self.hidden_dim = hidden_dim
        # Token embedding + LSTM encoder, kept as separate modules: nn.LSTM
        # returns (output, state) tuples, which nn.Sequential cannot chain.
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
self.policy_head = nn.Linear(hidden_dim, hidden_dim)
self.value_head = nn.Linear(hidden_dim, 1)
self.task_history: deque = deque(maxlen=100)
self.performance_log: List[float] = []
    def forward(self, input_ids: torch.Tensor) -> Dict[str, torch.Tensor]:
        embeds = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embeds)
        hidden = lstm_out[:, -1, :]  # hidden state at the final timestep
return {
"policy_logits": self.policy_head(hidden),
"value": self.value_head(hidden),
"hidden": hidden,
}
    def act(self, observation: str) -> str:
        """Return a role-conditioned textual action for the observation (stub)."""
self.task_history.append({
"observation": observation,
"timestamp": time.time(),
})
role_actions = {
AgentRole.MANAGER: f"[MANAGER] Decomposing task: '{observation[:50]}...'",
AgentRole.EXECUTOR: f"[EXECUTOR] Executing: '{observation[:50]}...'",
AgentRole.DEVELOPER: f"[DEVELOPER] Synthesizing tool for: '{observation[:50]}...'",
AgentRole.INTEGRATOR: f"[INTEGRATOR] Integrating components for: '{observation[:50]}...'",
AgentRole.MERGER: f"[MERGER] Consolidating tools for: '{observation[:50]}...'",
AgentRole.AGGREGATOR: f"[AGGREGATOR] Aggregating results for: '{observation[:50]}...'",
AgentRole.RESEARCHER: f"[RESEARCHER] Exploring knowledge for: '{observation[:50]}...'",
}
return role_actions.get(self.role, f"[{self.role.upper()}] Processing: '{observation}'")
def update(self, reward: float):
self.performance_log.append(reward)
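# Usage sketch (illustrative, not part of the module): exercising a BaseAgent's
# policy network on dummy token ids. Names and shapes below follow the
# constructor defaults above; the observation string is made up.
#
#   agent = BaseAgent(AgentRole.EXECUTOR, hidden_dim=64, vocab_size=1000)
#   dummy_ids = torch.randint(0, 1000, (2, 16))   # (batch, seq_len)
#   out = agent(dummy_ids)
#   out["policy_logits"].shape                    # torch.Size([2, 64])
#   out["value"].shape                            # torch.Size([2, 1])
#   agent.act("summarize the design doc")         # "[EXECUTOR] Executing: ..."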
class HierarchicalAgent(nn.Module):
    """
    HiMAC-style hierarchical agent with a macro-policy and a micro-policy.
    Macro: generates a blueprint (a sequence of sub-goals).
    Micro: executes atomic actions conditioned on the blueprint.
    """
    def __init__(self, macro_dim: int = 256, micro_dim: int = 128,
                 num_subgoals: int = 5, num_actions: int = 50):
        super().__init__()
        self.macro_dim = macro_dim
        self.micro_dim = micro_dim
        self.num_subgoals = num_subgoals
        self.num_actions = num_actions
        self.macro_encoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
        self.macro_decoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
        self.subgoal_head = nn.Linear(macro_dim, num_subgoals)
        self.termination_token = nn.Parameter(torch.randn(macro_dim))
        # Micro-policy consumes the observation concatenated with a sub-goal embedding.
        self.micro_encoder = nn.LSTM(micro_dim + macro_dim, micro_dim, batch_first=True)
        self.action_head = nn.Linear(micro_dim, num_actions)
        self.current_blueprint: Optional[List[str]] = None
        self.active_subgoal_idx = 0
    def generate_blueprint(self, task_embedding: torch.Tensor) -> List[str]:
        """Autoregressively decode up to num_subgoals sub-goal labels (assumes batch size 1)."""
        batch_size = task_embedding.size(0)
        # Zero-initialise the decoder state on the same device/dtype as the input.
        hidden = (task_embedding.new_zeros(1, batch_size, self.macro_dim),
                  task_embedding.new_zeros(1, batch_size, self.macro_dim))
        blueprint = []
        input_token = task_embedding.unsqueeze(1)
        for _ in range(self.num_subgoals):
            out, hidden = self.macro_decoder(input_token, hidden)
            subgoal_logits = self.subgoal_head(out.squeeze(1))
            subgoal_id = torch.argmax(subgoal_logits, dim=-1)
            # Stop early once the decoder state aligns with the learned termination token.
            similarity = torch.cosine_similarity(out.squeeze(1),
                                                 self.termination_token.unsqueeze(0))
            if similarity.item() > 0.9:
                break
            blueprint.append(f"subgoal_{subgoal_id.item()}")
            input_token = out
        self.current_blueprint = blueprint
        self.active_subgoal_idx = 0
        return blueprint
    def execute_action(self, observation: torch.Tensor,
                       blueprint: Optional[List[str]] = None) -> torch.Tensor:
        if blueprint is not None:
            self.current_blueprint = blueprint
        if not self.current_blueprint:
            return observation.new_zeros(1, self.num_actions)
        active_subgoal = self.current_blueprint[
            min(self.active_subgoal_idx, len(self.current_blueprint) - 1)
        ]
        # Placeholder: a learned embedding of `active_subgoal` would go here;
        # a random vector on the observation's device stands in for it.
        subgoal_embed = torch.randn(1, self.macro_dim, device=observation.device)
        combined = torch.cat([observation, subgoal_embed], dim=-1)
        out, _ = self.micro_encoder(combined.unsqueeze(1))
        action_logits = self.action_head(out.squeeze(1))
        return action_logits
def advance_subgoal(self):
self.active_subgoal_idx += 1
def reset(self):
self.current_blueprint = None
self.active_subgoal_idx = 0
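# Usage sketch (illustrative): the macro-policy drafts a blueprint, then the
# micro-policy emits action logits conditioned on the active sub-goal.
# Dimensions follow the constructor defaults.
#
#   agent = HierarchicalAgent(macro_dim=256, micro_dim=128, num_subgoals=5)
#   task = torch.randn(1, 256)                # one task embedding
#   plan = agent.generate_blueprint(task)     # e.g. ['subgoal_3', 'subgoal_1']
#   obs = torch.randn(1, 128)
#   logits = agent.execute_action(obs)        # (1, num_actions)
#   agent.advance_subgoal()                   # move on to the next sub-goal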
class BabyAGILoop:
"""BabyAGI-inspired task-driven autonomous loop."""
def __init__(self, objective: str, max_iterations: int = 50):
self.objective = objective
self.max_iterations = max_iterations
self.task_list: deque = deque()
self.completed_tasks: List[Dict] = []
self.results: Dict[int, Any] = {}
self.iteration = 0
    def create_tasks(self, previous_result: str, task_description: str) -> List[str]:
        # Stub creator: spawn follow-up analysis tasks from the previous result.
        # `task_description` is kept for BabyAGI-style signature parity but is
        # not used by this heuristic.
        new_tasks = [
            f"Sub-task {len(self.task_list) + i}: Analyze {previous_result[:30]}..."
            for i in range(3)
        ]
        return new_tasks
    def prioritize_tasks(self) -> List[str]:
        """Order pending tasks by keyword overlap with the objective, highest first."""
        tasks = list(self.task_list)
        scores = []
        for task in tasks:
            overlap = sum(1 for word in self.objective.lower().split()
                          if word in task.lower())
            scores.append(overlap)
        sorted_tasks = [t for _, t in sorted(zip(scores, tasks), reverse=True)]
        return sorted_tasks
def execute_task(self, task: str, agent: BaseAgent) -> str:
result = agent.act(task)
self.completed_tasks.append({
"task": task,
"result": result,
"iteration": self.iteration,
})
return result
    def run(self, execution_agent: BaseAgent) -> Dict[str, Any]:
        self.task_list.append(self.objective)
        while self.iteration < self.max_iterations and self.task_list:
            # Re-prioritise, execute the top task, then spawn follow-up tasks.
            self.task_list = deque(self.prioritize_tasks())
            current_task = self.task_list.popleft()
            result = self.execute_task(current_task, execution_agent)
            self.results[self.iteration] = result
            for t in self.create_tasks(result, current_task):
                if t not in self.task_list:
                    self.task_list.append(t)
            self.iteration += 1
            logger.info(f"BabyAGI iteration {self.iteration}: "
                        f"tasks_remaining={len(self.task_list)}, "
                        f"completed={len(self.completed_tasks)}")
return {
"completed_tasks": self.completed_tasks,
"results": self.results,
"iterations": self.iteration,
"objective": self.objective,
}
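# Usage sketch (illustrative): the loop seeds its queue with the objective,
# then repeats prioritize -> execute -> create until the iteration budget or
# the task list is exhausted. The worker here is a stub BaseAgent.
#
#   worker = BaseAgent(AgentRole.EXECUTOR)
#   loop = BabyAGILoop("summarize recent agent papers", max_iterations=5)
#   report = loop.run(worker)
#   report["iterations"], len(report["completed_tasks"])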
class AetherAgentOrchestrator(nn.Module):
"""
Multi-agent orchestrator combining:
- smolagents hierarchical delegation
- MLPO: train single leader, peers untrained
- Agentic Neural Networks: textual backpropagation
- CoMAS: co-evolving via interaction rewards
"""
def __init__(self, config):
super().__init__()
self.config = config
self.agents: Dict[str, BaseAgent] = nn.ModuleDict({
"manager": BaseAgent(AgentRole.MANAGER, hidden_dim=config.macro_policy_dim),
"executor": BaseAgent(AgentRole.EXECUTOR, hidden_dim=config.micro_policy_dim),
"developer": BaseAgent(AgentRole.DEVELOPER, hidden_dim=config.micro_policy_dim),
"researcher": BaseAgent(AgentRole.RESEARCHER, hidden_dim=config.micro_policy_dim),
})
self.leader = BaseAgent(AgentRole.MANAGER, hidden_dim=config.macro_policy_dim)
self.hierarchical = HierarchicalAgent(
macro_dim=config.macro_policy_dim,
micro_dim=config.micro_policy_dim,
)
        # Learnable routing weights over agents, normalised by a softmax gate.
        self.routing_weights = nn.Parameter(torch.ones(len(self.agents)))
        self.aggregation_gate = nn.Softmax(dim=0)
self.agent_tasks: Dict[str, BabyAGILoop] = {}
self.task_count = 0
self.agent_interactions: List[Dict] = []
    def forward(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        # Placeholder task encoding: a real pipeline would embed `task` with a
        # tokenizer + encoder; random features stand in here.
        task_embed = torch.randn(1, self.config.macro_policy_dim,
                                 device=self.routing_weights.device)
        blueprint = self.hierarchical.generate_blueprint(task_embed)
routing_probs = self.aggregation_gate(self.routing_weights)
agent_outputs = {}
        for i, (name, agent) in enumerate(self.agents.items()):
            if name == "manager":
                continue
            weight = routing_probs[i].item()
            if weight < 0.15:
                # Skip agents the router has effectively gated off.
                continue
            sub_task = blueprint[min(i, len(blueprint) - 1)] if blueprint else task
output = agent.act(f"[{name}] {sub_task}")
agent_outputs[name] = {
"output": output,
"weight": weight,
"sub_task": sub_task,
}
synthesized = self.leader.act(
f"Synthesize: {task} with inputs: {list(agent_outputs.keys())}"
)
self.agent_interactions.append({
"task": task,
"blueprint": blueprint,
"agent_outputs": agent_outputs,
"leader_synthesis": synthesized,
"routing_probs": routing_probs.detach().cpu().tolist(),
"timestamp": time.time(),
})
self.task_count += 1
return {
"output": synthesized,
"blueprint": blueprint,
"agent_outputs": agent_outputs,
"routing_weights": routing_probs.detach().cpu().tolist(),
}
def execute(self, task: str, kg_context: Any, context: Dict[str, Any]) -> Dict[str, Any]:
return self.forward(task, context)
    def textual_backprop(self, global_gradient: str,
                         performance_feedback: float,
                         beta: float = 0.5) -> Dict[str, str]:
        """Propagate a textual 'gradient' to each agent, blending it with the
        agent's previous gradient using momentum coefficient beta."""
        updates = {}
        for name, agent in self.agents.items():
            local_grad = f"{global_gradient} + Agent {name} performance: {performance_feedback}"
            if hasattr(agent, 'previous_gradient'):
                blended = f"{1 - beta}*({local_grad}) + {beta}*({agent.previous_gradient})"
            else:
                blended = local_grad
            agent.previous_gradient = blended
            updates[name] = blended
        # Nudge routing weights uniformly in proportion to the scalar feedback.
        self.routing_weights.data += performance_feedback * 0.01
        return updates
    def co_evolve_interactions(self) -> List[Dict]:
        """Score the last 10 interactions CoMAS-style: more collaborating
        agents and richer blueprints earn higher (capped) rewards."""
        rewards = []
        for interaction in self.agent_interactions[-10:]:
            num_agents_involved = len(interaction.get("agent_outputs", {}))
            blueprint_complexity = len(interaction.get("blueprint", []))
            reward = num_agents_involved * 0.1 + min(blueprint_complexity * 0.05, 0.5)
rewards.append({
"interaction_id": id(interaction),
"reward": reward,
"agents_involved": num_agents_involved,
})
return rewards
def run_babyagi(self, objective: str, max_iterations: int = 20) -> Dict[str, Any]:
loop = BabyAGILoop(objective, max_iterations)
result = loop.run(self.agents["manager"])
self.agent_tasks[objective] = loop
return result
def stats(self) -> Dict[str, Any]:
return {
"total_tasks": self.task_count,
"num_agents": len(self.agents),
"total_interactions": len(self.agent_interactions),
"routing_weights": self.routing_weights.detach().cpu().tolist(),
"active_tasks": len(self.agent_tasks),
}
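if __name__ == "__main__":
    # Smoke-test sketch, not part of the module API. The real AETHER config
    # class lives elsewhere; this stand-in (an assumption) only carries the
    # two dimensions this module actually reads.
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    config = SimpleNamespace(macro_policy_dim=256, micro_policy_dim=128)
    orchestrator = AetherAgentOrchestrator(config)

    result = orchestrator.execute("Build a retrieval tool", kg_context=None, context={})
    print(result["output"])        # leader synthesis string
    print(result["blueprint"])     # macro-policy sub-goal labels

    # Feed back a scalar score as a textual gradient, then inspect stats.
    orchestrator.textual_backprop("improve tool coverage", performance_feedback=0.8)
    print(orchestrator.stats())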