# purpose-agent / purpose_agent/multi_agent.py
# Rohan03's picture
# refactor: modularity fixes + plugin registry + compiled research
# 592f7ef verified
# raw
# history blame
# 12.2 kB
"""
Multi-Agent System — Shared experience replay, agent delegation, specialist agents.
Purpose Agent is the world's first SLM-native multi-agent framework with
SHARED SELF-IMPROVEMENT. Agents learn from each other's experiences.
Architecture:
- AgentTeam: A group of specialist agents with shared experience replay
- DelegatingOrchestrator: Routes tasks to the best-suited agent
- SharedMemory: Cross-agent heuristic sharing with credit assignment
Key insight: When Agent A solves a hard problem, Agent B can immediately
benefit from the distilled heuristic — no retraining needed.
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any, Callable
from purpose_agent.types import (
Action, Heuristic, MemoryTier, State, Trajectory, TrajectoryStep,
)
from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult
from purpose_agent.tools import Tool, ToolRegistry
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Agent Spec — defines a specialist agent
# ---------------------------------------------------------------------------
class AgentSpec:
    """
    Describe one specialist member of a multi-agent team.

    An ``AgentSpec`` is a plain record holding the agent's name, a
    natural-language role (used when routing tasks), the tools it may call,
    an optional dedicated model, optional routing keywords, and a default
    per-task step budget.

    Example:
        researcher = AgentSpec(
            name="researcher",
            role="Find and synthesize information from the web",
            tools=[WebSearchTool(), ReadFileTool()],
            model=create_slm_backend("qwen3-1.7b"),  # Can use SLM!
        )
        coder = AgentSpec(
            name="coder",
            role="Write and debug Python code",
            tools=[PythonExecTool(), ReadFileTool(), WriteFileTool()],
            model=create_slm_backend("phi-4-mini"),
        )
    """

    def __init__(
        self,
        name: str,
        role: str,
        tools: list[Tool] | None = None,
        model: LLMBackend | None = None,
        expertise_keywords: list[str] | None = None,
        max_steps: int = 15,
    ):
        """
        Args:
            name: Identifier for the agent (AgentTeam keys its agents by it).
            role: Short description of what the agent is good at.
            tools: Tools available to the agent; None or empty means no tools.
            model: Dedicated backend; None means "use the team's default model".
            expertise_keywords: Words that, when found in a task, favor this agent.
            max_steps: Step budget used when a task does not specify one.
        """
        self.name = name
        self.role = role
        # A falsy argument (None or an empty list) is normalized to a fresh [].
        self.tools = tools if tools else []
        self.model = model  # None -> fall back to the team's default model
        self.expertise_keywords = expertise_keywords if expertise_keywords else []
        self.max_steps = max_steps

    def to_prompt(self) -> str:
        """Render this agent as one Markdown bullet line for a routing prompt."""
        if self.tools:
            tool_list = ", ".join([tool.name for tool in self.tools])
        else:
            tool_list = "none"
        return f"- **{self.name}**: {self.role} (tools: {tool_list})"
# ---------------------------------------------------------------------------
# Agent Team — multi-agent with shared memory
# ---------------------------------------------------------------------------
class AgentTeam:
    """
    A team of specialist agents that share experience and learn together.

    This is the core multi-agent primitive. Key features:
    - Shared experience replay: all agents' trajectories go to one buffer
    - Cross-agent heuristic transfer: when one agent learns, all benefit
    - Automatic delegation: tasks routed to best-suited agent
    - Cost-aware: can mix SLMs (cheap specialists) with LLMs (expensive generalists)

    Usage:
        team = AgentTeam(
            agents=[researcher, coder, reviewer],
            default_model=OllamaBackend(model="qwen3:1.7b"),
            environment=my_env,
        )
        result = team.run_task("Build a web scraper for...")

    SLM-native design:
        Each agent can use a DIFFERENT model — assign expensive LLMs only
        to agents that need them, use SLMs everywhere else.
    """

    def __init__(
        self,
        agents: list[AgentSpec],
        default_model: LLMBackend,
        environment: Environment,
        critic_model: LLMBackend | None = None,
        shared_memory_capacity: int = 1000,
        persistence_dir: str | None = None,
    ):
        """
        Build one Orchestrator per agent, all wired to a single shared
        experience replay and a single shared heuristic optimizer.

        Args:
            agents: Specialist specs, keyed by ``AgentSpec.name`` (a later spec
                with a duplicate name replaces an earlier one).
            default_model: Backend for agents without their own ``model``; also
                used for LLM-based routing and by the shared optimizer.
            environment: Environment handed to every agent's orchestrator.
            critic_model: Passed to each orchestrator as ``critic_llm``;
                defaults to ``default_model``.
            shared_memory_capacity: Capacity of the shared experience replay
                (also passed as each orchestrator's ``experience_buffer_size``).
            persistence_dir: If given, the shared replay persists to
                ``<dir>/shared_replay.json`` and each orchestrator receives
                ``<dir>/<agent name>`` as its persistence directory.
        """
        self.agent_specs = {a.name: a for a in agents}
        self.default_model = default_model
        self.environment = environment
        self.critic_model = critic_model or default_model

        # Shared experience replay: all agents contribute and benefit.
        replay_path = f"{persistence_dir}/shared_replay.json" if persistence_dir else None
        self.shared_replay = ExperienceReplay(
            capacity=shared_memory_capacity,
            persistence_path=replay_path,
        )
        # Shared optimizer: distills heuristics from all agents' experiences.
        self.shared_optimizer = HeuristicOptimizer(llm=default_model)

        # Per-agent orchestrators.
        self.orchestrators: dict[str, Orchestrator] = {}
        for spec in agents:
            model = spec.model or default_model
            available_actions = {"DONE": "Signal task completion"}
            for tool in spec.tools:
                available_actions[tool.name] = tool.description
            orch = Orchestrator(
                llm=model,
                environment=environment,
                available_actions=available_actions,
                critic_llm=self.critic_model,
                experience_buffer_size=shared_memory_capacity,
                persistence_dir=f"{persistence_dir}/{spec.name}" if persistence_dir else None,
            )
            # Swap in the shared replay/optimizer so every agent reads and
            # writes the same memory. NOTE(review): whatever per-agent replay
            # and optimizer the Orchestrator built for itself are presumably
            # discarded here — confirm that nothing else holds onto them.
            orch.experience_replay = self.shared_replay
            orch.optimizer = self.shared_optimizer
            self.orchestrators[spec.name] = orch

        # Delegation history (one dict per run_task call).
        self._delegation_log: list[dict] = []

    def _first_agent_name(self) -> str:
        """Return the first registered agent's name, the routing fallback.

        Raises:
            IndexError: if the team has no agents.
        """
        return list(self.agent_specs)[0]

    def run_task(
        self,
        purpose: str,
        initial_state: State | None = None,
        agent_name: str | None = None,
        max_steps: int | None = None,
    ) -> TaskResult:
        """
        Run a task, automatically delegating to the best agent.

        If ``agent_name`` is given, that agent is used directly (falling back to
        the first agent, with a warning, if the name is unknown). Otherwise the
        agent is chosen by ``_select_agent``.

        Args:
            purpose: Natural-language task description.
            initial_state: State handed to the orchestrator.
            agent_name: Optional explicit agent to use.
            max_steps: Step budget; a falsy value (None or 0) falls back to the
                selected agent's ``max_steps``.

        Returns:
            The selected orchestrator's TaskResult. Every call also appends an
            entry to the delegation log.
        """
        selected = agent_name if agent_name else self._select_agent(purpose)

        spec = self.agent_specs.get(selected)
        if not spec:
            logger.warning("Agent '%s' not found, using first agent", selected)
            selected = self._first_agent_name()
            spec = self.agent_specs[selected]

        # NOTE(review): the leading characters of this message look
        # mis-encoded (probably an emoji) — confirm the intended text.
        logger.info(f"πŸ€– Delegating to agent '{selected}': {spec.role}")

        steps = max_steps or spec.max_steps
        orch = self.orchestrators[selected]

        # Sync shared heuristics to this agent before running.
        self._sync_shared_memory(selected)

        result = orch.run_task(
            purpose=purpose,
            initial_state=initial_state,
            max_steps=steps,
            # The "[agent]" prefix is how get_learning_report attributes
            # trajectories (and the heuristics distilled from them) to agents.
            task_description=f"[{selected}] {purpose}",
        )

        self._delegation_log.append({
            "agent": selected,
            "purpose": purpose,
            "success": result.success,
            "steps": result.total_steps,
            "final_phi": result.final_phi,
            "timestamp": time.time(),
        })
        return result

    def run_pipeline(
        self,
        tasks: list[dict[str, Any]],
        initial_state: State | None = None,
    ) -> list[TaskResult]:
        """
        Run a sequence of tasks, each potentially handled by a different agent.

        The ``final_state`` of each task is passed as the ``initial_state`` of
        the next. Each task dict needs ``"purpose"`` and may carry ``"agent"``
        and ``"max_steps"``:

            tasks = [
                {"purpose": "Research the topic", "agent": "researcher"},
                {"purpose": "Write the code", "agent": "coder"},
                {"purpose": "Review and fix bugs", "agent": "reviewer"},
            ]

        Returns:
            One TaskResult per task, in order.
        """
        results = []
        current_state = initial_state
        for task in tasks:
            result = self.run_task(
                purpose=task["purpose"],
                initial_state=current_state,
                agent_name=task.get("agent"),
                max_steps=task.get("max_steps"),
            )
            results.append(result)
            current_state = result.final_state
        return results

    def _select_agent(self, purpose: str) -> str:
        """
        Select the best agent for a task.

        Strategy: keyword matching first (fast, no LLM call). Each expertise
        keyword found in the purpose scores 1, and each role word longer than
        3 characters found in it scores 0.5. If the best score is at least 1,
        that agent wins; otherwise the LLM router is asked. If the LLM call
        fails, the failure is logged and the first agent is used.
        """
        # Fast path: keyword matching.
        purpose_lower = purpose.lower()
        best_match = None
        best_score = 0
        for name, spec in self.agent_specs.items():
            score = 0
            for keyword in spec.expertise_keywords:
                if keyword.lower() in purpose_lower:
                    score += 1
            # Also check role match (substring test, so "code" matches "codebase").
            for word in spec.role.lower().split():
                if len(word) > 3 and word in purpose_lower:
                    score += 0.5
            if score > best_score:
                best_score = score
                best_match = name

        if best_match and best_score >= 1:
            return best_match

        # Slow path: LLM delegation. Routing is best-effort, so any backend
        # failure degrades to the first agent — but it is logged, not hidden.
        try:
            return self._llm_select_agent(purpose)
        except Exception:
            fallback = self._first_agent_name()
            logger.warning(
                "LLM agent selection failed; falling back to '%s'",
                fallback,
                exc_info=True,
            )
            return fallback

    def _llm_select_agent(self, purpose: str) -> str:
        """Ask ``default_model`` to pick an agent and map its reply to a name."""
        agent_descriptions = "\n".join(
            spec.to_prompt() for spec in self.agent_specs.values()
        )
        messages = [
            ChatMessage(role="system", content="You are a task router. Select the best agent for the task."),
            ChatMessage(role="user", content=(
                f"Task: {purpose}\n\nAvailable agents:\n{agent_descriptions}\n\n"
                f"Respond with ONLY the agent name, nothing else."
            )),
        ]
        response = self.default_model.generate(messages, temperature=0.1, max_tokens=50)
        return self._match_agent_name(response)

    def _match_agent_name(self, reply: str) -> str:
        """
        Map a free-form router reply onto a registered agent name.

        The reply is normalized (stripped, lower-cased, ``*`` and ``"``
        removed). Names contained in the reply win, and the longest such name
        is chosen so that "coder" is not captured by an agent named "code".
        Next, the first name that contains the reply is used. An empty or
        unmatched reply falls back to the first agent.
        """
        selected = reply.strip().lower().replace("*", "").replace('"', "").strip()
        if not selected:
            return self._first_agent_name()

        names_in_reply = [name for name in self.agent_specs if name.lower() in selected]
        if names_in_reply:
            # max() keeps the first of equally long names, i.e. registration order.
            return max(names_in_reply, key=len)

        for name in self.agent_specs:
            if selected in name.lower():
                return name
        return self._first_agent_name()

    def _sync_shared_memory(self, agent_name: str) -> None:
        """Call ``sync_memory()`` on the named agent's orchestrator; no-op if unknown."""
        orch = self.orchestrators.get(agent_name)
        if not orch:
            return
        orch.sync_memory()

    @property
    def stats(self) -> dict[str, Any]:
        """Team summary: agent names, shared memory sizes, the last 10
        delegations, and each orchestrator's own ``stats``."""
        return {
            "agents": list(self.agent_specs.keys()),
            "shared_replay_size": self.shared_replay.size,
            "shared_heuristics": len(self.shared_optimizer.heuristic_library),
            "delegation_log": self._delegation_log[-10:],
            "per_agent_stats": {
                name: orch.stats for name, orch in self.orchestrators.items()
            },
        }

    def _trajectory_owners(self) -> dict[Any, str]:
        """
        Map trajectory ids in the shared replay to the agent that produced them.

        Ownership is read from the "[agent]" tag that run_task puts into
        ``task_description``. For each id, the first record carrying a known
        tag decides, and the first matching agent (in registration order) wins.
        """
        owners: dict[Any, str] = {}
        for record in self.shared_replay.records:
            trajectory = record.trajectory
            if trajectory.id in owners:
                continue
            description = trajectory.task_description or ""
            for name in self.orchestrators:
                if f"[{name}]" in description:
                    owners[trajectory.id] = name
                    break
        return owners

    def get_learning_report(self) -> str:
        """Show what the team has learned collectively, one entry per shared
        heuristic, including which agent's trajectory it came from."""
        lines = ["═══ Team Learning Report ═══\n"]
        lines.append(f"Agents: {', '.join(self.agent_specs.keys())}")
        lines.append(f"Shared experiences: {self.shared_replay.size}")
        lines.append(f"Shared heuristics: {len(self.shared_optimizer.heuristic_library)}")

        # Build the trajectory -> agent index once instead of rescanning the
        # whole replay buffer for every heuristic and every agent.
        owners = self._trajectory_owners()
        for h in self.shared_optimizer.heuristic_library:
            agent = owners.get(h.source_trajectory_id, "unknown")
            lines.append(
                f"\n [{h.tier.value}] Q={h.q_value:.2f} (from {agent})"
                f"\n {h.pattern}: {h.strategy}"
            )

        return "\n".join(lines)