"""
Multi-Agent System — Shared experience replay, agent delegation, specialist agents.

Purpose Agent is the world's first SLM-native multi-agent framework with
SHARED SELF-IMPROVEMENT. Agents learn from each other's experiences.

Architecture:
  - AgentTeam: A group of specialist agents with shared experience replay
  - DelegatingOrchestrator: Routes tasks to the best-suited agent
  - SharedMemory: Cross-agent heuristic sharing with credit assignment

Key insight: When Agent A solves a hard problem, Agent B can immediately
benefit from the distilled heuristic — no retraining needed.
"""

from __future__ import annotations

import json
import logging
import time
from typing import Any, Callable

from purpose_agent.types import (
    Action,
    Heuristic,
    MemoryTier,
    State,
    Trajectory,
    TrajectoryStep,
)
from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult
from purpose_agent.tools import Tool, ToolRegistry

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Agent Spec — defines a specialist agent
# ---------------------------------------------------------------------------

class AgentSpec:
    """
    Specification for a specialist agent in a multi-agent team.

    Example:
        researcher = AgentSpec(
            name="researcher",
            role="Find and synthesize information from the web",
            tools=[WebSearchTool(), ReadFileTool()],
            model=create_slm_backend("qwen3-1.7b"),  # Can use SLM!
        )
        coder = AgentSpec(
            name="coder",
            role="Write and debug Python code",
            tools=[PythonExecTool(), ReadFileTool(), WriteFileTool()],
            model=create_slm_backend("phi-4-mini"),
        )
    """

    def __init__(
        self,
        name: str,
        role: str,
        tools: list[Tool] | None = None,
        model: LLMBackend | None = None,
        expertise_keywords: list[str] | None = None,
        max_steps: int = 15,
    ):
        """
        Args:
            name: Identifier of the agent; AgentTeam uses it as the lookup key.
            role: Free-text description of what the agent does. It is shown in
                the delegation prompt and also scanned for keyword routing.
            tools: Tools the agent may use; ``None`` means no tools.
            model: Backend for this agent; ``None`` means the team's default.
            expertise_keywords: Substrings that, when found in a task purpose,
                count in favour of routing the task to this agent.
            max_steps: Default step budget for tasks run by this agent.
        """
        self.name = name
        self.role = role
        self.tools = tools or []
        self.model = model  # None = use team's default model
        self.expertise_keywords = expertise_keywords or []
        self.max_steps = max_steps

    def to_prompt(self) -> str:
        """Format agent description for delegation prompt."""
        tools_str = ", ".join(t.name for t in self.tools) if self.tools else "none"
        return f"- **{self.name}**: {self.role} (tools: {tools_str})"


# ---------------------------------------------------------------------------
# Agent Team — multi-agent with shared memory
# ---------------------------------------------------------------------------

class AgentTeam:
    """
    A team of specialist agents that share experience and learn together.

    This is the core multi-agent primitive. Key features:
      - Shared experience replay: all agents' trajectories go to one buffer
      - Cross-agent heuristic transfer: when one agent learns, all benefit
      - Automatic delegation: tasks routed to best-suited agent
      - Cost-aware: can mix SLMs (cheap specialists) with LLMs (expensive generalists)

    Usage:
        team = AgentTeam(
            agents=[researcher, coder, reviewer],
            default_model=OllamaBackend(model="qwen3:1.7b"),
            environment=my_env,
        )
        result = team.run_task("Build a web scraper for...")

    SLM-native design: Each agent can use a DIFFERENT model — assign expensive
    LLMs only to agents that need them, use SLMs everywhere else.
    """

    def __init__(
        self,
        agents: list[AgentSpec],
        default_model: LLMBackend,
        environment: Environment,
        critic_model: LLMBackend | None = None,
        shared_memory_capacity: int = 1000,
        persistence_dir: str | None = None,
    ):
        """
        Args:
            agents: Specs of the team members. Specs are keyed by ``name``, so
                a later spec with a duplicate name replaces an earlier one.
            default_model: Backend used for agents without their own model,
                for the shared optimizer, and for LLM-based delegation.
            environment: Environment handed to every per-agent orchestrator.
            critic_model: Backend passed to orchestrators as ``critic_llm``;
                defaults to ``default_model``.
            shared_memory_capacity: Capacity of the shared replay buffer.
            persistence_dir: When given, the shared replay is persisted to
                ``<persistence_dir>/shared_replay.json`` and each orchestrator
                receives ``<persistence_dir>/<agent name>``.
        """
        self.agent_specs = {a.name: a for a in agents}
        self.default_model = default_model
        self.environment = environment
        self.critic_model = critic_model or default_model

        # Shared experience replay — all agents contribute and benefit
        replay_path = f"{persistence_dir}/shared_replay.json" if persistence_dir else None
        self.shared_replay = ExperienceReplay(
            capacity=shared_memory_capacity,
            persistence_path=replay_path,
        )

        # Shared optimizer — distills heuristics from all agents' experiences
        self.shared_optimizer = HeuristicOptimizer(llm=default_model)

        # Per-agent orchestrators
        self.orchestrators: dict[str, Orchestrator] = {}
        for spec in agents:
            self.orchestrators[spec.name] = self._build_orchestrator(
                spec, shared_memory_capacity, persistence_dir
            )

        # Delegation history
        self._delegation_log: list[dict] = []

    def _build_orchestrator(
        self,
        spec: AgentSpec,
        shared_memory_capacity: int,
        persistence_dir: str | None,
    ) -> Orchestrator:
        """Create the orchestrator for one agent and wire it to the shared memory."""
        available_actions = {"DONE": "Signal task completion"}
        for tool in spec.tools:
            available_actions[tool.name] = tool.description

        orch = Orchestrator(
            llm=spec.model or self.default_model,
            environment=self.environment,
            available_actions=available_actions,
            critic_llm=self.critic_model,
            experience_buffer_size=shared_memory_capacity,
            persistence_dir=f"{persistence_dir}/{spec.name}" if persistence_dir else None,
        )
        # Share the experience replay: replace the orchestrator's own buffer
        # and optimizer with the team-wide instances.
        orch.experience_replay = self.shared_replay
        orch.optimizer = self.shared_optimizer
        return orch

    def _first_agent_name(self) -> str:
        """
        Return the name of the first registered agent (the universal fallback).

        Raises:
            IndexError: If the team has no agents. The exception type matches
                what the previous inline ``list(...)[0]`` lookups raised.
        """
        if not self.agent_specs:
            raise IndexError("AgentTeam has no agents configured")
        return next(iter(self.agent_specs))

    def run_task(
        self,
        purpose: str,
        initial_state: State | None = None,
        agent_name: str | None = None,
        max_steps: int | None = None,
    ) -> TaskResult:
        """
        Run a task, automatically delegating to the best agent.

        If agent_name is specified, uses that agent directly.
        Otherwise, uses the delegation LLM to choose.

        Args:
            purpose: Natural-language description of the task.
            initial_state: State forwarded to the orchestrator's ``run_task``.
            agent_name: Name of the agent to use. An unknown name logs a
                warning and falls back to the first agent.
            max_steps: Step budget override. A falsy value (``None`` or ``0``)
                falls back to the selected agent's ``max_steps``.

        Returns:
            The TaskResult produced by the selected agent's orchestrator.

        Raises:
            IndexError: If a fallback is needed and the team has no agents.
        """
        # Select agent
        selected = agent_name if agent_name else self._select_agent(purpose)

        spec = self.agent_specs.get(selected)
        if not spec:
            logger.warning(f"Agent '{selected}' not found, using first agent")
            selected = self._first_agent_name()
            spec = self.agent_specs[selected]

        logger.info(f"🤖 Delegating to agent '{selected}': {spec.role}")

        steps = max_steps or spec.max_steps
        orch = self.orchestrators[selected]

        # Sync shared heuristics to this agent before running
        self._sync_shared_memory(selected)

        # The "[agent]" prefix is what get_learning_report() later uses to
        # attribute heuristics to the agent that produced the trajectory.
        result = orch.run_task(
            purpose=purpose,
            initial_state=initial_state,
            max_steps=steps,
            task_description=f"[{selected}] {purpose}",
        )

        self._delegation_log.append({
            "agent": selected,
            "purpose": purpose,
            "success": result.success,
            "steps": result.total_steps,
            "final_phi": result.final_phi,
            "timestamp": time.time(),
        })

        return result

    def run_pipeline(
        self,
        tasks: list[dict[str, Any]],
        initial_state: State | None = None,
    ) -> list[TaskResult]:
        """
        Run a sequence of tasks, each potentially handled by a different agent.
        State flows from one task to the next.

        tasks = [
            {"purpose": "Research the topic", "agent": "researcher"},
            {"purpose": "Write the code", "agent": "coder"},
            {"purpose": "Review and fix bugs", "agent": "reviewer"},
        ]

        Each task dict must contain "purpose"; "agent" and "max_steps" are
        optional. Returns one TaskResult per task, in order.
        """
        results = []
        current_state = initial_state

        for task in tasks:
            result = self.run_task(
                purpose=task["purpose"],
                initial_state=current_state,
                agent_name=task.get("agent"),
                max_steps=task.get("max_steps"),
            )
            results.append(result)
            # The next task starts from wherever this one ended.
            current_state = result.final_state

        return results

    def _select_agent(self, purpose: str) -> str:
        """
        Select the best agent for a task.

        Strategy: keyword matching first (fast, no LLM call),
        then LLM delegation.

        Scoring: +1 for every expertise keyword found in the purpose and +0.5
        for every word of the role longer than three characters found in the
        purpose (both case-insensitive substring tests). The keyword path is
        taken only when the best score reaches 1; ties keep the earlier agent.
        """
        # Fast path: keyword matching
        purpose_lower = purpose.lower()
        best_match = None
        best_score = 0

        for name, spec in self.agent_specs.items():
            score = 0
            for keyword in spec.expertise_keywords:
                if keyword.lower() in purpose_lower:
                    score += 1
            # Also check role match
            for word in spec.role.lower().split():
                if len(word) > 3 and word in purpose_lower:
                    score += 0.5
            if score > best_score:
                best_score = score
                best_match = name

        if best_match and best_score >= 1:
            return best_match

        # Slow path: LLM delegation
        try:
            return self._llm_select_agent(purpose)
        except Exception:
            # Delegation is best-effort: keep the fallback, but leave a trace
            # instead of swallowing the failure silently.
            logger.warning(
                "LLM delegation failed; falling back to first agent",
                exc_info=True,
            )
            return self._first_agent_name()

    def _llm_select_agent(self, purpose: str) -> str:
        """
        Use LLM to select the best agent.

        The reply is lower-cased and stripped of ``*`` and ``"`` characters,
        then resolved to an agent name in this order:
          1. exact (case-insensitive) name match;
          2. the longest agent name contained in the reply, so that "coder"
             does not shadow "coder_reviewer";
          3. the first agent whose name contains the (partial) reply;
          4. the first agent of the team.
        """
        agent_descriptions = "\n".join(
            spec.to_prompt() for spec in self.agent_specs.values()
        )

        messages = [
            ChatMessage(role="system", content="You are a task router. Select the best agent for the task."),
            ChatMessage(role="user", content=(
                f"Task: {purpose}\n\nAvailable agents:\n{agent_descriptions}\n\n"
                f"Respond with ONLY the agent name, nothing else."
            )),
        ]

        response = self.default_model.generate(messages, temperature=0.1, max_tokens=50)
        # Re-strip after removing markdown/quote characters: "** coder **"
        # would otherwise keep its inner padding and miss the exact match.
        selected = response.strip().lower().replace("*", "").replace('"', "").strip()

        # An empty reply carries no information; go straight to the fallback.
        if selected:
            for name in self.agent_specs:
                if name.lower() == selected:
                    return name

            contained = [name for name in self.agent_specs if name.lower() in selected]
            if contained:
                return max(contained, key=len)

            for name in self.agent_specs:
                if selected in name.lower():
                    return name

        return self._first_agent_name()

    def _sync_shared_memory(self, agent_name: str) -> None:
        """Push shared heuristics to a specific agent."""
        orch = self.orchestrators.get(agent_name)
        if not orch:
            return
        orch.sync_memory()

    @property
    def stats(self) -> dict[str, Any]:
        """Snapshot of team state; the delegation log is capped to the last 10 entries."""
        return {
            "agents": list(self.agent_specs.keys()),
            "shared_replay_size": self.shared_replay.size,
            "shared_heuristics": len(self.shared_optimizer.heuristic_library),
            "delegation_log": self._delegation_log[-10:],
            "per_agent_stats": {
                name: orch.stats for name, orch in self.orchestrators.items()
            },
        }

    def _agent_for_trajectory(self, trajectory_id: Any) -> str:
        """
        Return the agent that produced the trajectory, or "unknown".

        Looks up the first replay record with the given trajectory id and reads
        the agent from its task description.
        """
        record = next(
            (r for r in self.shared_replay.records if r.trajectory.id == trajectory_id),
            None,
        )
        if record is None:
            return "unknown"

        description = record.trajectory.task_description or ""

        # run_task() writes descriptions as "[<agent>] <purpose>", so a leading
        # tag identifies the owner even when the purpose text itself happens to
        # contain another agent's name in brackets.
        for name in self.orchestrators:
            if description.startswith(f"[{name}]"):
                return name

        # Descriptions that do not follow that layout: accept the tag anywhere.
        for name in self.orchestrators:
            if f"[{name}]" in description:
                return name

        return "unknown"

    def get_learning_report(self) -> str:
        """Show what the team has learned collectively."""
        lines = ["═══ Team Learning Report ═══\n"]
        lines.append(f"Agents: {', '.join(self.agent_specs.keys())}")
        lines.append(f"Shared experiences: {self.shared_replay.size}")
        lines.append(f"Shared heuristics: {len(self.shared_optimizer.heuristic_library)}")

        # Show which agent contributed which heuristics
        for h in self.shared_optimizer.heuristic_library:
            agent = self._agent_for_trajectory(h.source_trajectory_id)
            lines.append(
                f"\n [{h.tier.value}] Q={h.q_value:.2f} (from {agent})"
                f"\n {h.pattern}: {h.strategy}"
            )

        return "\n".join(lines)