File size: 12,231 Bytes
9d76fd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
592f7ef
9d76fd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Multi-Agent System — Shared experience replay, agent delegation, specialist agents.

Purpose Agent is the world's first SLM-native multi-agent framework with
SHARED SELF-IMPROVEMENT. Agents learn from each other's experiences.

Architecture:
  - AgentTeam: A group of specialist agents with shared experience replay
  - DelegatingOrchestrator: Routes tasks to the best-suited agent
  - SharedMemory: Cross-agent heuristic sharing with credit assignment
  
Key insight: When Agent A solves a hard problem, Agent B can immediately
benefit from the distilled heuristic — no retraining needed.
"""

from __future__ import annotations

import json
import logging
import time
from typing import Any, Callable

from purpose_agent.types import (
    Action, Heuristic, MemoryTier, State, Trajectory, TrajectoryStep,
)
from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult
from purpose_agent.tools import Tool, ToolRegistry

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Agent Spec — defines a specialist agent
# ---------------------------------------------------------------------------

class AgentSpec:
    """
    Describes one specialist member of a multi-agent team.

    A spec bundles the agent's name, a free-text role description, the tools
    it may use, an optional dedicated model, optional routing keywords and a
    step budget.

    Example:
        researcher = AgentSpec(
            name="researcher",
            role="Find and synthesize information from the web",
            tools=[WebSearchTool(), ReadFileTool()],
            model=create_slm_backend("qwen3-1.7b"),  # Can use SLM!
        )
        coder = AgentSpec(
            name="coder",
            role="Write and debug Python code",
            tools=[PythonExecTool(), ReadFileTool(), WriteFileTool()],
            model=create_slm_backend("phi-4-mini"),
        )
    """

    def __init__(
        self,
        name: str,
        role: str,
        tools: list[Tool] | None = None,
        model: LLMBackend | None = None,
        expertise_keywords: list[str] | None = None,
        max_steps: int = 15,
    ):
        # Identity and description.
        self.name = name
        self.role = role

        # Falsy collections (None or empty) are replaced by a fresh list.
        self.tools = tools if tools else []
        self.expertise_keywords = expertise_keywords if expertise_keywords else []

        # A model of None means "use the team's default model".
        self.model = model
        self.max_steps = max_steps

    def to_prompt(self) -> str:
        """Return a one-line markdown bullet describing this agent for a delegation prompt."""
        if self.tools:
            tool_names = ", ".join(tool.name for tool in self.tools)
        else:
            tool_names = "none"
        return f"- **{self.name}**: {self.role} (tools: {tool_names})"


# ---------------------------------------------------------------------------
# Agent Team — multi-agent with shared memory
# ---------------------------------------------------------------------------

class AgentTeam:
    """
    A team of specialist agents that share experience and learn together.

    This is the core multi-agent primitive. Key features:
    - Shared experience replay: every agent's orchestrator is pointed at the
      same ExperienceReplay buffer
    - Shared optimizer: every agent's orchestrator uses the same
      HeuristicOptimizer instance
    - Automatic delegation: tasks are routed by keyword scoring, then by an
      LLM call, then by falling back to the first configured agent
    - Per-agent models: each AgentSpec may bring its own model; specs without
      one use ``default_model``

    Usage:
        team = AgentTeam(
            agents=[researcher, coder, reviewer],
            default_model=OllamaBackend(model="qwen3:1.7b"),
            environment=my_env,
        )
        result = team.run_task("Build a web scraper for...")
    """

    def __init__(
        self,
        agents: list[AgentSpec],
        default_model: LLMBackend,
        environment: Environment,
        critic_model: LLMBackend | None = None,
        shared_memory_capacity: int = 1000,
        persistence_dir: str | None = None,
    ):
        """
        Build one Orchestrator per agent spec, all wired to shared memory.

        Args:
            agents: Specs of the team members. If two specs share a name the
                later one replaces the earlier one (a warning is logged).
            default_model: Model used by agents without their own model, by
                the shared optimizer, and for LLM-based delegation.
            environment: Environment handed to every orchestrator.
            critic_model: Critic model for every orchestrator; defaults to
                ``default_model``.
            shared_memory_capacity: Capacity of the shared replay buffer (also
                passed to each orchestrator as its buffer size).
            persistence_dir: If given, the shared replay persists to
                ``<dir>/shared_replay.json`` and each orchestrator receives
                ``<dir>/<agent name>``.
        """
        self.agent_specs: dict[str, AgentSpec] = {}
        for spec in agents:
            if spec.name in self.agent_specs:
                logger.warning(
                    "Duplicate agent name '%s'; the later spec replaces the earlier one",
                    spec.name,
                )
            self.agent_specs[spec.name] = spec

        self.default_model = default_model
        self.environment = environment
        self.critic_model = critic_model or default_model

        # Shared experience replay: all agents contribute and benefit.
        replay_path = f"{persistence_dir}/shared_replay.json" if persistence_dir else None
        self.shared_replay = ExperienceReplay(
            capacity=shared_memory_capacity,
            persistence_path=replay_path,
        )

        # Shared optimizer: distills heuristics from all agents' experiences.
        self.shared_optimizer = HeuristicOptimizer(llm=default_model)

        # Per-agent orchestrators.
        self.orchestrators: dict[str, Orchestrator] = {}
        for spec in agents:
            # Every agent can always signal completion; tools add further actions.
            available_actions = {"DONE": "Signal task completion"}
            available_actions.update({tool.name: tool.description for tool in spec.tools})

            orch = Orchestrator(
                llm=spec.model or default_model,
                environment=environment,
                available_actions=available_actions,
                critic_llm=self.critic_model,
                experience_buffer_size=shared_memory_capacity,
                persistence_dir=f"{persistence_dir}/{spec.name}" if persistence_dir else None,
            )
            # Replace the orchestrator's own replay/optimizer with the shared ones.
            orch.experience_replay = self.shared_replay
            orch.optimizer = self.shared_optimizer
            self.orchestrators[spec.name] = orch

        # Delegation history (one dict per completed run_task call).
        self._delegation_log: list[dict] = []

    def run_task(
        self,
        purpose: str,
        initial_state: State | None = None,
        agent_name: str | None = None,
        max_steps: int | None = None,
    ) -> TaskResult:
        """
        Run a task, automatically delegating to the best agent.

        Args:
            purpose: Natural-language goal of the task.
            initial_state: State forwarded to the orchestrator.
            agent_name: If given (non-empty), use this agent directly instead
                of running agent selection. Unknown names fall back to the
                first configured agent with a warning.
            max_steps: Step budget. ``None`` or ``0`` falls back to the
                selected agent's ``max_steps``.

        Returns:
            The TaskResult produced by the selected agent's orchestrator.

        Raises:
            IndexError: If the team has no agents.
        """
        selected = agent_name if agent_name else self._select_agent(purpose)

        spec = self.agent_specs.get(selected)
        if spec is None:
            logger.warning("Agent '%s' not found, using first agent", selected)
            selected = self._first_agent_name()
            spec = self.agent_specs[selected]

        logger.info("🤖 Delegating to agent '%s': %s", selected, spec.role)

        steps = max_steps or spec.max_steps
        orch = self.orchestrators[selected]

        # Sync shared heuristics to this agent before running.
        self._sync_shared_memory(selected)

        # The "[agent]" prefix is what get_learning_report() later uses to
        # attribute trajectories to agents.
        result = orch.run_task(
            purpose=purpose,
            initial_state=initial_state,
            max_steps=steps,
            task_description=f"[{selected}] {purpose}",
        )

        self._delegation_log.append({
            "agent": selected,
            "purpose": purpose,
            "success": result.success,
            "steps": result.total_steps,
            "final_phi": result.final_phi,
            "timestamp": time.time(),
        })

        return result

    def run_pipeline(
        self,
        tasks: list[dict[str, Any]],
        initial_state: State | None = None,
    ) -> list[TaskResult]:
        """
        Run a sequence of tasks, each potentially handled by a different agent.

        The ``final_state`` of each result becomes the ``initial_state`` of
        the next task. Each task dict requires ``"purpose"`` and may carry
        ``"agent"`` and ``"max_steps"``:

        tasks = [
            {"purpose": "Research the topic", "agent": "researcher"},
            {"purpose": "Write the code", "agent": "coder"},
            {"purpose": "Review and fix bugs", "agent": "reviewer"},
        ]

        Returns:
            One TaskResult per task, in order.
        """
        results = []
        current_state = initial_state

        for task in tasks:
            result = self.run_task(
                purpose=task["purpose"],
                initial_state=current_state,
                agent_name=task.get("agent"),
                max_steps=task.get("max_steps"),
            )
            results.append(result)
            current_state = result.final_state

        return results

    def _first_agent_name(self) -> str:
        """Return the first configured agent name (the universal fallback).

        Raises:
            IndexError: If the team has no agents.
        """
        try:
            return next(iter(self.agent_specs))
        except StopIteration:
            raise IndexError("AgentTeam has no agents to delegate to") from None

    def _select_agent(self, purpose: str) -> str:
        """
        Select the best agent for a task.

        Strategy: keyword matching first (fast, no LLM call), then LLM
        delegation, then the first configured agent if the LLM call fails.

        Scoring: +1 for every expertise keyword found in the purpose, +0.5
        for every role word longer than three characters found in the
        purpose (both case-insensitive substring checks). The fast path is
        taken only when the best score is at least 1; ties keep the agent
        that was configured first.
        """
        purpose_lower = purpose.lower()
        best_match = None
        best_score = 0

        for name, spec in self.agent_specs.items():
            score = sum(
                1 for keyword in spec.expertise_keywords
                if keyword.lower() in purpose_lower
            )
            score += 0.5 * sum(
                1 for word in spec.role.lower().split()
                if len(word) > 3 and word in purpose_lower
            )
            if score > best_score:
                best_score = score
                best_match = name

        if best_match and best_score >= 1:
            return best_match

        # Slow path: LLM delegation. Routing is best-effort, so any failure
        # degrades to the first agent, but it is logged rather than hidden.
        try:
            return self._llm_select_agent(purpose)
        except Exception as exc:
            logger.warning(
                "LLM delegation failed (%s); falling back to first agent", exc
            )
            return self._first_agent_name()

    def _llm_select_agent(self, purpose: str) -> str:
        """Ask the default model to name the best agent and resolve its reply."""
        agent_descriptions = "\n".join(
            spec.to_prompt() for spec in self.agent_specs.values()
        )

        messages = [
            ChatMessage(role="system", content="You are a task router. Select the best agent for the task."),
            ChatMessage(role="user", content=(
                f"Task: {purpose}\n\nAvailable agents:\n{agent_descriptions}\n\n"
                f"Respond with ONLY the agent name, nothing else."
            )),
        ]

        response = self.default_model.generate(messages, temperature=0.1, max_tokens=50)
        return self._match_agent_name(response)

    def _match_agent_name(self, response: str) -> str:
        """
        Resolve a free-text model reply to a configured agent name.

        Matching is case-insensitive and ignores ``*`` and ``"`` characters.
        Preference order:
          1. exact name match;
          2. the longest agent name contained in the reply, so that
             "coder_reviewer" is not mistaken for "coder";
          3. the first agent whose name contains the reply;
          4. the first configured agent.
        """
        selected = response.strip().lower().replace("*", "").replace('"', '')
        if not selected:
            return self._first_agent_name()

        lowered = {name: name.lower() for name in self.agent_specs}

        for name, low in lowered.items():
            if low == selected:
                return name

        contained = [name for name, low in lowered.items() if low in selected]
        if contained:
            # max() keeps the first of equally long names (configuration order).
            return max(contained, key=len)

        for name, low in lowered.items():
            if selected in low:
                return name

        return self._first_agent_name()

    def _sync_shared_memory(self, agent_name: str) -> None:
        """Call ``sync_memory()`` on the named agent's orchestrator, if it exists."""
        orch = self.orchestrators.get(agent_name)
        if not orch:
            return
        orch.sync_memory()

    @property
    def stats(self) -> dict[str, Any]:
        """Summary dict: agent names, shared memory sizes, the last ten
        delegation log entries and each orchestrator's own ``stats``."""
        return {
            "agents": list(self.agent_specs.keys()),
            "shared_replay_size": self.shared_replay.size,
            "shared_heuristics": len(self.shared_optimizer.heuristic_library),
            "delegation_log": self._delegation_log[-10:],
            "per_agent_stats": {
                name: orch.stats for name, orch in self.orchestrators.items()
            },
        }

    @staticmethod
    def _agent_from_description(description: str, agent_names: list[str]) -> str | None:
        """
        Recover the agent that produced a trajectory from its description.

        run_task() prefixes descriptions with "[agent] ", so a prefix match is
        authoritative. A "[agent]" tag elsewhere in the text is only used as a
        fallback, because the purpose itself may mention other agents.
        """
        for name in agent_names:
            if description.startswith(f"[{name}]"):
                return name
        for name in agent_names:
            if f"[{name}]" in description:
                return name
        return None

    def get_learning_report(self) -> str:
        """
        Show what the team has learned collectively.

        Returns:
            A multi-line string with the agent names, the shared replay size,
            the number of shared heuristics and, per heuristic, its tier,
            Q-value, pattern/strategy and the contributing agent ("unknown"
            when the source trajectory cannot be attributed).
        """
        lines = ["═══ Team Learning Report ═══\n"]
        lines.append(f"Agents: {', '.join(self.agent_specs.keys())}")
        lines.append(f"Shared experiences: {self.shared_replay.size}")
        lines.append(f"Shared heuristics: {len(self.shared_optimizer.heuristic_library)}")

        heuristics = list(self.shared_optimizer.heuristic_library)

        # Index trajectory id -> contributing agent once, instead of
        # rescanning every record for every (heuristic, agent) pair.
        contributors: dict[Any, str] = {}
        if heuristics:
            agent_names = list(self.orchestrators)
            for record in self.shared_replay.records:
                trajectory = record.trajectory
                if trajectory.id in contributors:
                    continue
                agent = self._agent_from_description(
                    trajectory.task_description or "", agent_names
                )
                if agent is not None:
                    contributors[trajectory.id] = agent

        for h in heuristics:
            agent = contributors.get(h.source_trajectory_id, "unknown")
            lines.append(
                f"\n  [{h.tier.value}] Q={h.q_value:.2f} (from {agent})"
                f"\n    {h.pattern}: {h.strategy}"
            )

        return "\n".join(lines)