| """ |
| Multi-Agent System β Shared experience replay, agent delegation, specialist agents. |
| |
| Purpose Agent is the world's first SLM-native multi-agent framework with |
| SHARED SELF-IMPROVEMENT. Agents learn from each other's experiences. |
| |
| Architecture: |
| - AgentTeam: A group of specialist agents with shared experience replay |
| - DelegatingOrchestrator: Routes tasks to the best-suited agent |
| - SharedMemory: Cross-agent heuristic sharing with credit assignment |
| |
| Key insight: When Agent A solves a hard problem, Agent B can immediately |
| benefit from the distilled heuristic β no retraining needed. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| from typing import Any, Callable |
|
|
| from purpose_agent.types import ( |
| Action, Heuristic, MemoryTier, State, Trajectory, TrajectoryStep, |
| ) |
| from purpose_agent.llm_backend import LLMBackend, ChatMessage |
| from purpose_agent.actor import Actor |
| from purpose_agent.purpose_function import PurposeFunction |
| from purpose_agent.experience_replay import ExperienceReplay |
| from purpose_agent.optimizer import HeuristicOptimizer |
| from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult |
| from purpose_agent.tools import Tool, ToolRegistry |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
class AgentSpec:
    """
    Specification for a specialist agent in a multi-agent team.

    Example:
        researcher = AgentSpec(
            name="researcher",
            role="Find and synthesize information from the web",
            tools=[WebSearchTool(), ReadFileTool()],
            model=create_slm_backend("qwen3-1.7b"),  # Can use SLM!
        )
        coder = AgentSpec(
            name="coder",
            role="Write and debug Python code",
            tools=[PythonExecTool(), ReadFileTool(), WriteFileTool()],
            model=create_slm_backend("phi-4-mini"),
        )
    """

    def __init__(
        self,
        name: str,
        role: str,
        tools: list[Tool] | None = None,
        model: LLMBackend | None = None,
        expertise_keywords: list[str] | None = None,
        max_steps: int = 15,
    ):
        self.name = name
        self.role = role
        # Normalize optional collections to empty lists so callers can
        # iterate without None checks.
        self.tools = tools if tools else []
        # None means "use the team's default model" (resolved by AgentTeam).
        self.model = model
        self.expertise_keywords = expertise_keywords if expertise_keywords else []
        self.max_steps = max_steps

    def to_prompt(self) -> str:
        """Return a one-line markdown description used in delegation prompts."""
        if self.tools:
            tool_names = ", ".join(tool.name for tool in self.tools)
        else:
            tool_names = "none"
        return f"- **{self.name}**: {self.role} (tools: {tool_names})"
|
|
|
|
| |
| |
| |
|
|
class AgentTeam:
    """
    A team of specialist agents that share experience and learn together.

    This is the core multi-agent primitive. Key features:
    - Shared experience replay: all agents' trajectories go to one buffer
    - Cross-agent heuristic transfer: when one agent learns, all benefit
    - Automatic delegation: tasks routed to best-suited agent
    - Cost-aware: can mix SLMs (cheap specialists) with LLMs (expensive generalists)

    Usage:
        team = AgentTeam(
            agents=[researcher, coder, reviewer],
            default_model=OllamaBackend(model="qwen3:1.7b"),
            environment=my_env,
        )
        result = team.run_task("Build a web scraper for...")

    SLM-native design:
        Each agent can use a DIFFERENT model — assign expensive LLMs only
        to agents that need them, use SLMs everywhere else.
    """

    def __init__(
        self,
        agents: list[AgentSpec],
        default_model: LLMBackend,
        environment: Environment,
        critic_model: LLMBackend | None = None,
        shared_memory_capacity: int = 1000,
        persistence_dir: str | None = None,
    ):
        """
        Args:
            agents: Specialist agent specs; must be non-empty.
            default_model: Backend used for delegation and any agent without
                its own model.
            environment: Shared environment all orchestrators act in.
            critic_model: Optional separate critic; defaults to default_model.
            shared_memory_capacity: Size of the shared replay buffer.
            persistence_dir: If set, replay and per-agent state persist here.

        Raises:
            ValueError: If ``agents`` is empty.
        """
        if not agents:
            # Fail fast: an empty team would otherwise crash later inside
            # run_task / _select_agent with an opaque IndexError.
            raise ValueError("AgentTeam requires at least one agent")

        self.agent_specs = {a.name: a for a in agents}
        self.default_model = default_model
        self.environment = environment
        self.critic_model = critic_model or default_model

        # One replay buffer shared by every agent: each trajectory becomes
        # training signal for the whole team.
        replay_path = f"{persistence_dir}/shared_replay.json" if persistence_dir else None
        self.shared_replay = ExperienceReplay(
            capacity=shared_memory_capacity,
            persistence_path=replay_path,
        )

        # One optimizer so distilled heuristics live in a single shared library.
        self.shared_optimizer = HeuristicOptimizer(llm=default_model)

        # Build one orchestrator per specialist, then swap in the shared
        # replay/optimizer so learning is pooled across agents.
        self.orchestrators: dict[str, Orchestrator] = {}
        for spec in agents:
            model = spec.model or default_model
            available_actions = {"DONE": "Signal task completion"}
            for tool in spec.tools:
                available_actions[tool.name] = tool.description

            orch = Orchestrator(
                llm=model,
                environment=environment,
                available_actions=available_actions,
                critic_llm=self.critic_model,
                experience_buffer_size=shared_memory_capacity,
                persistence_dir=f"{persistence_dir}/{spec.name}" if persistence_dir else None,
            )
            # Replace the orchestrator's private buffers with the shared ones.
            orch.experience_replay = self.shared_replay
            orch.optimizer = self.shared_optimizer
            self.orchestrators[spec.name] = orch

        # Audit trail of which agent handled which purpose (see run_task).
        self._delegation_log: list[dict] = []

    def run_task(
        self,
        purpose: str,
        initial_state: State | None = None,
        agent_name: str | None = None,
        max_steps: int | None = None,
    ) -> TaskResult:
        """
        Run a task, automatically delegating to the best agent.

        If agent_name is specified, uses that agent directly.
        Otherwise, uses keyword matching / the delegation LLM to choose.
        """
        selected = agent_name if agent_name else self._select_agent(purpose)

        spec = self.agent_specs.get(selected)
        if not spec:
            # Lazy %-args: formatting is skipped when the level is disabled.
            logger.warning("Agent '%s' not found, using first agent", selected)
            selected = next(iter(self.agent_specs))
            spec = self.agent_specs[selected]

        logger.info("Delegating to agent '%s': %s", selected, spec.role)

        steps = max_steps or spec.max_steps
        orch = self.orchestrators[selected]

        # Make sure this agent sees heuristics learned by its teammates.
        self._sync_shared_memory(selected)

        result = orch.run_task(
            purpose=purpose,
            initial_state=initial_state,
            max_steps=steps,
            # Tag the trajectory with the agent name so learning can later
            # be attributed back to its source (see get_learning_report).
            task_description=f"[{selected}] {purpose}",
        )

        self._delegation_log.append({
            "agent": selected,
            "purpose": purpose,
            "success": result.success,
            "steps": result.total_steps,
            "final_phi": result.final_phi,
            "timestamp": time.time(),
        })

        return result

    def run_pipeline(
        self,
        tasks: list[dict[str, Any]],
        initial_state: State | None = None,
    ) -> list[TaskResult]:
        """
        Run a sequence of tasks, each potentially handled by a different agent.
        State flows from one task to the next.

        tasks = [
            {"purpose": "Research the topic", "agent": "researcher"},
            {"purpose": "Write the code", "agent": "coder"},
            {"purpose": "Review and fix bugs", "agent": "reviewer"},
        ]
        """
        results = []
        current_state = initial_state

        for task in tasks:
            result = self.run_task(
                purpose=task["purpose"],
                initial_state=current_state,
                agent_name=task.get("agent"),
                max_steps=task.get("max_steps"),
            )
            results.append(result)
            # Chain: each task starts from the previous task's final state.
            current_state = result.final_state

        return results

    def _select_agent(self, purpose: str) -> str:
        """
        Select the best agent for a task.

        Strategy: keyword matching first (fast, no LLM call), then LLM
        delegation, then first agent as last resort.
        """
        purpose_lower = purpose.lower()
        best_match = None
        best_score = 0

        for name, spec in self.agent_specs.items():
            score = 0
            # Explicit expertise keywords count full weight ...
            for keyword in spec.expertise_keywords:
                if keyword.lower() in purpose_lower:
                    score += 1
            # ... words from the role description count half weight
            # (len > 3 skips glue words like "and"/"the").
            for word in spec.role.lower().split():
                if len(word) > 3 and word in purpose_lower:
                    score += 0.5
            if score > best_score:
                best_score = score
                best_match = name

        if best_match and best_score >= 1:
            return best_match

        # No confident keyword match — ask the delegation LLM; on any
        # failure fall back to the first registered agent.
        try:
            return self._llm_select_agent(purpose)
        except Exception:
            return next(iter(self.agent_specs))

    def _llm_select_agent(self, purpose: str) -> str:
        """Use the default LLM to select the best agent by name."""
        agent_descriptions = "\n".join(
            spec.to_prompt() for spec in self.agent_specs.values()
        )

        messages = [
            ChatMessage(role="system", content="You are a task router. Select the best agent for the task."),
            ChatMessage(role="user", content=(
                f"Task: {purpose}\n\nAvailable agents:\n{agent_descriptions}\n\n"
                f"Respond with ONLY the agent name, nothing else."
            )),
        ]

        response = self.default_model.generate(messages, temperature=0.1, max_tokens=50)
        # Strip markdown emphasis / quotes small models sometimes emit.
        selected = response.strip().lower().replace("*", "").replace('"', '')

        # Fuzzy match both directions: model may echo extra words or a prefix.
        for name in self.agent_specs:
            if name.lower() in selected or selected in name.lower():
                return name

        return next(iter(self.agent_specs))

    def _sync_shared_memory(self, agent_name: str) -> None:
        """Push shared heuristics to a specific agent."""
        orch = self.orchestrators.get(agent_name)
        if not orch:
            return
        orch.sync_memory()

    def _trajectory_agent(self, trajectory_id: str) -> str:
        """
        Return the name of the agent that produced the given trajectory.

        run_task tags every trajectory's task_description with "[agent_name]",
        so we look the record up once and match the tag, instead of
        re-scanning the whole replay buffer per agent.
        """
        for record in self.shared_replay.records:
            if record.trajectory.id == trajectory_id:
                desc = record.trajectory.task_description
                for name in self.agent_specs:
                    if f"[{name}]" in desc:
                        return name
        return "unknown"

    @property
    def stats(self) -> dict[str, Any]:
        """Team-level snapshot: agents, shared memory sizes, recent delegations."""
        return {
            "agents": list(self.agent_specs.keys()),
            "shared_replay_size": self.shared_replay.size,
            "shared_heuristics": len(self.shared_optimizer.heuristic_library),
            "delegation_log": self._delegation_log[-10:],
            "per_agent_stats": {
                name: orch.stats for name, orch in self.orchestrators.items()
            },
        }

    def get_learning_report(self) -> str:
        """Show what the team has learned collectively."""
        lines = ["=== Team Learning Report ===\n"]
        lines.append(f"Agents: {', '.join(self.agent_specs.keys())}")
        lines.append(f"Shared experiences: {self.shared_replay.size}")
        lines.append(f"Shared heuristics: {len(self.shared_optimizer.heuristic_library)}")

        for h in self.shared_optimizer.heuristic_library:
            # Credit assignment: which agent's trajectory produced this heuristic.
            agent = self._trajectory_agent(h.source_trajectory_id)
            lines.append(
                f"\n  [{h.tier.value}] Q={h.q_value:.2f} (from {agent})"
                f"\n    {h.pattern}: {h.strategy}"
            )

        return "\n".join(lines)
|
|