Rohan03 commited on
Commit
9d76fd3
·
verified ·
1 Parent(s): f9d84be

v0.2.0: Add purpose_agent/multi_agent.py

Browse files
Files changed (1) hide show
  1. purpose_agent/multi_agent.py +338 -0
purpose_agent/multi_agent.py ADDED
@@ -0,0 +1,338 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Multi-Agent System — Shared experience replay, agent delegation, specialist agents.
3
+
4
+ Purpose Agent is the world's first SLM-native multi-agent framework with
5
+ SHARED SELF-IMPROVEMENT. Agents learn from each other's experiences.
6
+
7
+ Architecture:
8
+ - AgentTeam: A group of specialist agents with shared experience replay
9
+ - DelegatingOrchestrator: Routes tasks to the best-suited agent
10
+ - SharedMemory: Cross-agent heuristic sharing with credit assignment
11
+
12
+ Key insight: When Agent A solves a hard problem, Agent B can immediately
13
+ benefit from the distilled heuristic — no retraining needed.
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import json
19
+ import logging
20
+ import time
21
+ from typing import Any, Callable
22
+
23
+ from purpose_agent.types import (
24
+ Action, Heuristic, MemoryTier, State, Trajectory, TrajectoryStep,
25
+ )
26
+ from purpose_agent.llm_backend import LLMBackend, ChatMessage
27
+ from purpose_agent.actor import Actor
28
+ from purpose_agent.purpose_function import PurposeFunction
29
+ from purpose_agent.experience_replay import ExperienceReplay
30
+ from purpose_agent.optimizer import HeuristicOptimizer
31
+ from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult
32
+ from purpose_agent.tools import Tool, ToolRegistry
33
+
34
+ logger = logging.getLogger(__name__)
35
+
36
+
37
+ # ---------------------------------------------------------------------------
38
+ # Agent Spec — defines a specialist agent
39
+ # ---------------------------------------------------------------------------
40
+
41
+ class AgentSpec:
42
+ """
43
+ Specification for a specialist agent in a multi-agent team.
44
+
45
+ Example:
46
+ researcher = AgentSpec(
47
+ name="researcher",
48
+ role="Find and synthesize information from the web",
49
+ tools=[WebSearchTool(), ReadFileTool()],
50
+ model=create_slm_backend("qwen3-1.7b"), # Can use SLM!
51
+ )
52
+ coder = AgentSpec(
53
+ name="coder",
54
+ role="Write and debug Python code",
55
+ tools=[PythonExecTool(), ReadFileTool(), WriteFileTool()],
56
+ model=create_slm_backend("phi-4-mini"),
57
+ )
58
+ """
59
+
60
+ def __init__(
61
+ self,
62
+ name: str,
63
+ role: str,
64
+ tools: list[Tool] | None = None,
65
+ model: LLMBackend | None = None,
66
+ expertise_keywords: list[str] | None = None,
67
+ max_steps: int = 15,
68
+ ):
69
+ self.name = name
70
+ self.role = role
71
+ self.tools = tools or []
72
+ self.model = model # None = use team's default model
73
+ self.expertise_keywords = expertise_keywords or []
74
+ self.max_steps = max_steps
75
+
76
+ def to_prompt(self) -> str:
77
+ """Format agent description for delegation prompt."""
78
+ tools_str = ", ".join(t.name for t in self.tools) if self.tools else "none"
79
+ return f"- **{self.name}**: {self.role} (tools: {tools_str})"
80
+
81
+
82
+ # ---------------------------------------------------------------------------
83
+ # Agent Team — multi-agent with shared memory
84
+ # ---------------------------------------------------------------------------
85
+
86
+ class AgentTeam:
87
+ """
88
+ A team of specialist agents that share experience and learn together.
89
+
90
+ This is the core multi-agent primitive. Key features:
91
+ - Shared experience replay: all agents' trajectories go to one buffer
92
+ - Cross-agent heuristic transfer: when one agent learns, all benefit
93
+ - Automatic delegation: tasks routed to best-suited agent
94
+ - Cost-aware: can mix SLMs (cheap specialists) with LLMs (expensive generalists)
95
+
96
+ Usage:
97
+ team = AgentTeam(
98
+ agents=[researcher, coder, reviewer],
99
+ default_model=OllamaBackend(model="qwen3:1.7b"),
100
+ environment=my_env,
101
+ )
102
+ result = team.run_task("Build a web scraper for...")
103
+
104
+ SLM-native design:
105
+ Each agent can use a DIFFERENT model — assign expensive LLMs only
106
+ to agents that need them, use SLMs everywhere else.
107
+ """
108
+
109
+ def __init__(
110
+ self,
111
+ agents: list[AgentSpec],
112
+ default_model: LLMBackend,
113
+ environment: Environment,
114
+ critic_model: LLMBackend | None = None,
115
+ shared_memory_capacity: int = 1000,
116
+ persistence_dir: str | None = None,
117
+ ):
118
+ self.agent_specs = {a.name: a for a in agents}
119
+ self.default_model = default_model
120
+ self.environment = environment
121
+ self.critic_model = critic_model or default_model
122
+
123
+ # Shared experience replay — all agents contribute and benefit
124
+ replay_path = f"{persistence_dir}/shared_replay.json" if persistence_dir else None
125
+ self.shared_replay = ExperienceReplay(
126
+ capacity=shared_memory_capacity,
127
+ persistence_path=replay_path,
128
+ )
129
+
130
+ # Shared optimizer — distills heuristics from all agents' experiences
131
+ self.shared_optimizer = HeuristicOptimizer(llm=default_model)
132
+
133
+ # Per-agent orchestrators
134
+ self.orchestrators: dict[str, Orchestrator] = {}
135
+ for spec in agents:
136
+ model = spec.model or default_model
137
+ available_actions = {"DONE": "Signal task completion"}
138
+ for tool in spec.tools:
139
+ available_actions[tool.name] = tool.description
140
+
141
+ orch = Orchestrator(
142
+ llm=model,
143
+ environment=environment,
144
+ available_actions=available_actions,
145
+ critic_llm=self.critic_model,
146
+ experience_buffer_size=shared_memory_capacity,
147
+ persistence_dir=f"{persistence_dir}/{spec.name}" if persistence_dir else None,
148
+ )
149
+ # Share the experience replay
150
+ orch.experience_replay = self.shared_replay
151
+ orch.optimizer = self.shared_optimizer
152
+ self.orchestrators[spec.name] = orch
153
+
154
+ # Delegation history
155
+ self._delegation_log: list[dict] = []
156
+
157
+ def run_task(
158
+ self,
159
+ purpose: str,
160
+ initial_state: State | None = None,
161
+ agent_name: str | None = None,
162
+ max_steps: int | None = None,
163
+ ) -> TaskResult:
164
+ """
165
+ Run a task, automatically delegating to the best agent.
166
+
167
+ If agent_name is specified, uses that agent directly.
168
+ Otherwise, uses the delegation LLM to choose.
169
+ """
170
+ # Select agent
171
+ if agent_name:
172
+ selected = agent_name
173
+ else:
174
+ selected = self._select_agent(purpose)
175
+
176
+ spec = self.agent_specs.get(selected)
177
+ if not spec:
178
+ logger.warning(f"Agent '{selected}' not found, using first agent")
179
+ selected = list(self.agent_specs.keys())[0]
180
+ spec = self.agent_specs[selected]
181
+
182
+ logger.info(f"🤖 Delegating to agent '{selected}': {spec.role}")
183
+
184
+ steps = max_steps or spec.max_steps
185
+ orch = self.orchestrators[selected]
186
+
187
+ # Sync shared heuristics to this agent before running
188
+ self._sync_shared_memory(selected)
189
+
190
+ result = orch.run_task(
191
+ purpose=purpose,
192
+ initial_state=initial_state,
193
+ max_steps=steps,
194
+ task_description=f"[{selected}] {purpose}",
195
+ )
196
+
197
+ self._delegation_log.append({
198
+ "agent": selected,
199
+ "purpose": purpose,
200
+ "success": result.success,
201
+ "steps": result.total_steps,
202
+ "final_phi": result.final_phi,
203
+ "timestamp": time.time(),
204
+ })
205
+
206
+ return result
207
+
208
+ def run_pipeline(
209
+ self,
210
+ tasks: list[dict[str, Any]],
211
+ initial_state: State | None = None,
212
+ ) -> list[TaskResult]:
213
+ """
214
+ Run a sequence of tasks, each potentially handled by a different agent.
215
+ State flows from one task to the next.
216
+
217
+ tasks = [
218
+ {"purpose": "Research the topic", "agent": "researcher"},
219
+ {"purpose": "Write the code", "agent": "coder"},
220
+ {"purpose": "Review and fix bugs", "agent": "reviewer"},
221
+ ]
222
+ """
223
+ results = []
224
+ current_state = initial_state
225
+
226
+ for task in tasks:
227
+ result = self.run_task(
228
+ purpose=task["purpose"],
229
+ initial_state=current_state,
230
+ agent_name=task.get("agent"),
231
+ max_steps=task.get("max_steps"),
232
+ )
233
+ results.append(result)
234
+ current_state = result.final_state
235
+
236
+ return results
237
+
238
+ def _select_agent(self, purpose: str) -> str:
239
+ """
240
+ Select the best agent for a task.
241
+
242
+ Strategy: keyword matching first (fast, no LLM call), then LLM delegation.
243
+ """
244
+ # Fast path: keyword matching
245
+ purpose_lower = purpose.lower()
246
+ best_match = None
247
+ best_score = 0
248
+
249
+ for name, spec in self.agent_specs.items():
250
+ score = 0
251
+ for keyword in spec.expertise_keywords:
252
+ if keyword.lower() in purpose_lower:
253
+ score += 1
254
+ # Also check role match
255
+ for word in spec.role.lower().split():
256
+ if len(word) > 3 and word in purpose_lower:
257
+ score += 0.5
258
+ if score > best_score:
259
+ best_score = score
260
+ best_match = name
261
+
262
+ if best_match and best_score >= 1:
263
+ return best_match
264
+
265
+ # Slow path: LLM delegation
266
+ try:
267
+ return self._llm_select_agent(purpose)
268
+ except Exception:
269
+ # Fallback: round-robin or first agent
270
+ return list(self.agent_specs.keys())[0]
271
+
272
+ def _llm_select_agent(self, purpose: str) -> str:
273
+ """Use LLM to select the best agent."""
274
+ agent_descriptions = "\n".join(
275
+ spec.to_prompt() for spec in self.agent_specs.values()
276
+ )
277
+
278
+ messages = [
279
+ ChatMessage(role="system", content="You are a task router. Select the best agent for the task."),
280
+ ChatMessage(role="user", content=(
281
+ f"Task: {purpose}\n\nAvailable agents:\n{agent_descriptions}\n\n"
282
+ f"Respond with ONLY the agent name, nothing else."
283
+ )),
284
+ ]
285
+
286
+ response = self.default_model.generate(messages, temperature=0.1, max_tokens=50)
287
+ selected = response.strip().lower().replace("*", "").replace('"', '')
288
+
289
+ # Fuzzy match
290
+ for name in self.agent_specs:
291
+ if name.lower() in selected or selected in name.lower():
292
+ return name
293
+
294
+ return list(self.agent_specs.keys())[0]
295
+
296
+ def _sync_shared_memory(self, agent_name: str) -> None:
297
+ """Push shared heuristics to a specific agent."""
298
+ orch = self.orchestrators.get(agent_name)
299
+ if not orch:
300
+ return
301
+ orch._sync_memory()
302
+
303
+ @property
304
+ def stats(self) -> dict[str, Any]:
305
+ return {
306
+ "agents": list(self.agent_specs.keys()),
307
+ "shared_replay_size": self.shared_replay.size,
308
+ "shared_heuristics": len(self.shared_optimizer.heuristic_library),
309
+ "delegation_log": self._delegation_log[-10:],
310
+ "per_agent_stats": {
311
+ name: orch.stats for name, orch in self.orchestrators.items()
312
+ },
313
+ }
314
+
315
+ def get_learning_report(self) -> str:
316
+ """Show what the team has learned collectively."""
317
+ lines = ["═══ Team Learning Report ═══\n"]
318
+ lines.append(f"Agents: {', '.join(self.agent_specs.keys())}")
319
+ lines.append(f"Shared experiences: {self.shared_replay.size}")
320
+ lines.append(f"Shared heuristics: {len(self.shared_optimizer.heuristic_library)}")
321
+
322
+ # Show which agent contributed which heuristics
323
+ for h in self.shared_optimizer.heuristic_library:
324
+ source_traj = h.source_trajectory_id
325
+ agent = "unknown"
326
+ for name, orch in self.orchestrators.items():
327
+ for record in self.shared_replay.records:
328
+ if record.trajectory.id == source_traj:
329
+ if f"[{name}]" in record.trajectory.task_description:
330
+ agent = name
331
+ break
332
+
333
+ lines.append(
334
+ f"\n [{h.tier.value}] Q={h.q_value:.2f} (from {agent})"
335
+ f"\n {h.pattern}: {h.strategy}"
336
+ )
337
+
338
+ return "\n".join(lines)