Rohan03 committed on
Commit 525ba6d · verified · 1 Parent(s): 358e009

feat: unified capabilities (Graph + Parallel + Conversation + Agent + KnowledgeStore)

Files changed (1)
  1. purpose_agent/unified.py +814 -0
purpose_agent/unified.py ADDED
@@ -0,0 +1,814 @@
+ """
+ Unified Capabilities — Five competing framework philosophies in one composable layer.
+
+ LangGraph → "I want control" → GraphOrchestrator (conditional edges, cycles, fan-out/fan-in)
+ CrewAI → "I want speed" → ParallelRunner (concurrent tasks, parallel fan-out)
+ AutoGen → "I want agents talking" → Conversation (agent-to-agent message passing, group chat)
+ OpenAI SDK → "I want plug-and-play" → Agent() one-liner factory
+ LlamaIndex → "I want knowledge" → KnowledgeStore (RAG-as-a-tool, chunk + embed + retrieve)
+
+ Design principle: ZERO changes to the existing Orchestrator/Actor/PurposeFunction.
+ Each capability is a composable layer that calls the existing modules.
+ The self-improvement loop (Φ scoring → experience replay → heuristic distillation)
+ runs INSIDE each capability automatically — every graph node, every parallel task,
+ every conversation turn feeds the same learning loop.
+
+ Usage:
+     # Plug-and-play (OpenAI SDK simplicity)
+     agent = Agent("researcher", model="qwen3:1.7b", tools=[SearchTool()])
+     result = agent.run("Find information about X")
+
+     # Control flow (LangGraph power)
+     graph = Graph()
+     graph.add_node("research", research_agent)
+     graph.add_node("write", writer_agent)
+     graph.add_edge("research", "write")
+     graph.add_conditional_edge("write", review_fn, {"pass": END, "fail": "research"})
+     result = graph.run(initial_state)
+
+     # Speed (CrewAI parallelism)
+     results = parallel([task1, task2, task3], agents=[a1, a2, a3])
+
+     # Conversation (AutoGen talking)
+     chat = Conversation([researcher, coder, reviewer])
+     result = chat.run("Build a web scraper", rounds=5)
+
+     # Knowledge (LlamaIndex RAG)
+     kb = KnowledgeStore.from_directory("./docs")
+     agent = Agent("assistant", tools=[kb.as_tool()])
+     result = agent.run("What does the documentation say about X?")
+ """
+
+ from __future__ import annotations
+
+ import asyncio
+ import json
+ import logging
+ import math
+ import os
+ import time
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+ from dataclasses import dataclass, field
+ from pathlib import Path
+ from typing import Any, Callable, Iterator
+
+ from purpose_agent.types import (
+     Action, Heuristic, MemoryTier, PurposeScore, State,
+     Trajectory, TrajectoryStep,
+ )
+ from purpose_agent.llm_backend import LLMBackend, MockLLMBackend, ChatMessage
+ from purpose_agent.actor import Actor
+ from purpose_agent.purpose_function import PurposeFunction
+ from purpose_agent.experience_replay import ExperienceReplay
+ from purpose_agent.optimizer import HeuristicOptimizer
+ from purpose_agent.orchestrator import (
+     Environment, Orchestrator, SimpleEnvironment, TaskResult,
+ )
+ from purpose_agent.tools import Tool, FunctionTool, ToolResult, ToolRegistry
+
+ logger = logging.getLogger(__name__)
+
+ # Sentinels for the graph start and end nodes
+ END = "__END__"
+ START = "__START__"
+
+
+ # ═══════════════════════════════════════════════════════════════════════════
+ # 1. PLUG-AND-PLAY — Agent() one-liner factory (OpenAI Agents SDK simplicity)
+ # ═══════════════════════════════════════════════════════════════════════════
+
+ class Agent:
+     """
+     One-liner agent factory. The simplest way to create and run an agent.
+
+     Inspired by OpenAI Agents SDK: Agent(name, instructions, tools) → run(task).
+     But ours self-improves. Every run feeds the Φ loop.
+
+     Usage:
+         # Minimal (uses mock for testing)
+         agent = Agent("helper")
+         result = agent.run("Do something")
+
+         # With local SLM
+         agent = Agent("coder", model="qwen3:1.7b", tools=[PythonExecTool()])
+         result = agent.run("Write a sorting algorithm")
+
+         # With cloud LLM
+         agent = Agent("analyst", model="gpt-4o", api_key="sk-...")
+         result = agent.run("Analyze this data")
+
+         # Handoff to another agent
+         agent_a = Agent("researcher", model="qwen3:1.7b")
+         agent_b = Agent("writer", model="phi4-mini", handoff_from=agent_a)
+         # agent_b inherits agent_a's experience replay
+     """
+
+     def __init__(
+         self,
+         name: str = "agent",
+         instructions: str = "",
+         model: str | LLMBackend | None = None,
+         tools: list[Tool] | None = None,
+         api_key: str | None = None,
+         max_steps: int = 15,
+         handoff_from: "Agent | None" = None,
+         persistence_dir: str | None = None,
+     ):
+         self.name = name
+         self.instructions = instructions
+         self.max_steps = max_steps
+
+         # Resolve LLM backend
+         if model is None:
+             self.llm = MockLLMBackend()
+         elif isinstance(model, str):
+             self.llm = self._resolve_model(model, api_key)
+         else:
+             self.llm = model
+
+         # Build available actions from tools
+         available_actions = {"DONE": "Signal task completion"}
+         self._tools: dict[str, Tool] = {}
+         for tool in (tools or []):
+             available_actions[tool.name] = tool.description
+             self._tools[tool.name] = tool
+
+         # Build environment that executes tools
+         self._env = _ToolEnvironment(self._tools)
+
+         # Create orchestrator
+         self.orch = Orchestrator(
+             llm=self.llm,
+             environment=self._env,
+             available_actions=available_actions,
+             persistence_dir=persistence_dir or f"./.purpose_agent/{name}",
+         )
+
+         # Handoff: inherit experience from another agent
+         if handoff_from:
+             self.orch.experience_replay = handoff_from.orch.experience_replay
+             self.orch.optimizer = handoff_from.orch.optimizer
+             self.orch.sync_memory()
+
+         # Inject custom instructions into actor's strategic memory
+         if instructions:
+             h = Heuristic(
+                 pattern="Always", strategy=instructions, steps=[],
+                 tier=MemoryTier.STRATEGIC, q_value=1.0,
+             )
+             self.orch.optimizer.heuristic_library.append(h)
+             self.orch.sync_memory()
+
+     def run(self, task: str, state: State | None = None, max_steps: int | None = None) -> TaskResult:
+         """Run a task. Returns TaskResult with trajectory, final state, success."""
+         return self.orch.run_task(
+             purpose=task,
+             initial_state=state or State(data={}),
+             max_steps=max_steps or self.max_steps,
+         )
+
+     def __call__(self, task: str, **kwargs) -> TaskResult:
+         return self.run(task, **kwargs)
+
+     @staticmethod
+     def _resolve_model(model: str, api_key: str | None = None) -> LLMBackend:
+         """Resolve a model string to an LLMBackend."""
+         # Local Ollama models (contain ":" like "qwen3:1.7b")
+         if ":" in model and not model.startswith("http"):
+             from purpose_agent.slm_backends import OllamaBackend
+             return OllamaBackend(model=model)
+
+         # Known SLM registry keys
+         from purpose_agent.slm_backends import SLM_REGISTRY
+         if model in SLM_REGISTRY:
+             from purpose_agent.slm_backends import create_slm_backend
+             return create_slm_backend(model)
+
+         # OpenAI models
+         if model.startswith("gpt-") or model.startswith("o1") or model.startswith("o3"):
+             from purpose_agent.llm_backend import OpenAICompatibleBackend
+             return OpenAICompatibleBackend(model=model, api_key=api_key)
+
+         # HuggingFace models (contain "/")
+         if "/" in model:
+             from purpose_agent.llm_backend import HFInferenceBackend
+             return HFInferenceBackend(model_id=model, api_key=api_key)
+
+         # Fallback: try Ollama
+         from purpose_agent.slm_backends import OllamaBackend
+         return OllamaBackend(model=model)
+
+
+ class _ToolEnvironment(Environment):
+     """Environment that executes tools based on action names."""
+
+     def __init__(self, tools: dict[str, Tool]):
+         self._tools = tools
+
+     def execute(self, action: Action, current_state: State) -> State:
+         tool = self._tools.get(action.name)
+         if not tool:
+             return State(
+                 data={**current_state.data, "_last_result": f"Unknown tool: {action.name}"},
+                 summary=f"Error: Unknown tool '{action.name}'",
+             )
+         result = tool.run(**action.params)
+         new_data = {**current_state.data, "_last_result": result.output, "_last_tool": action.name}
+         if not result.success:
+             new_data["_last_error"] = result.error
+         return State(data=new_data, summary=result.output[:500])
+
+     def reset(self) -> State:
+         return State(data={})
+
+
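+ # Note: _ToolEnvironment records each tool step in the shared state under three keys:
+ # "_last_result" (the tool's output, or an "Unknown tool" message), "_last_tool"
+ # (the name of the tool that ran), and "_last_error" (set only when the tool failed).
+ # Conversation.run() below reads "_last_result" to extract an agent's reply.
+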
+ # ═══════════════════════════════════════════════════════════════════════════
+ # 2. CONTROL — Graph execution engine (LangGraph-style)
+ # ═══════════════════════════════════════════════════════════════════════════
+
+ @dataclass
+ class GraphNode:
+     """A node in the execution graph."""
+     name: str
+     handler: Callable[[State], State | TaskResult] | Agent
+     metadata: dict[str, Any] = field(default_factory=dict)
+
+
+ @dataclass
+ class GraphEdge:
+     """An edge in the execution graph."""
+     source: str
+     target: str
+     condition: Callable[[State], bool] | None = None  # None = unconditional
+
+
+ class Graph:
+     """
+     Graph-based workflow execution — LangGraph's control, with Φ self-improvement.
+
+     Supports: conditional branching, cycles (loops), parallel fan-out/fan-in.
+     Every node that runs an Agent automatically feeds the Φ improvement loop.
+
+     Usage:
+         graph = Graph()
+
+         # Add nodes (agents or functions)
+         graph.add_node("research", Agent("researcher", model="qwen3:1.7b"))
+         graph.add_node("write", Agent("writer", model="phi4-mini"))
+         graph.add_node("review", lambda state: review_fn(state))
+
+         # Linear flow
+         graph.add_edge(START, "research")
+         graph.add_edge("research", "write")
+         graph.add_edge("write", "review")
+
+         # Conditional branching (cycle back to "write" until the review passes)
+         graph.add_conditional_edge("review", lambda s: s.data.get("_route", "pass"),
+             condition_map={"pass": END, "revise": "write"})
+
+         result = graph.run(State(data={"topic": "AI safety"}))
+     """
+
+     def __init__(self):
+         self._nodes: dict[str, GraphNode] = {}
+         self._edges: list[GraphEdge] = []
+         self._conditional_edges: dict[str, dict] = {}  # source → {evaluator, map}
+         self._entry: str | None = None
+
+     def add_node(self, name: str, handler: Callable | Agent) -> "Graph":
+         """Add a node. Handler is either an Agent or a function(State) → State."""
+         self._nodes[name] = GraphNode(name=name, handler=handler)
+         return self
+
+     def add_edge(self, source: str, target: str) -> "Graph":
+         """Add an unconditional edge."""
+         self._edges.append(GraphEdge(source=source, target=target))
+         if source == START:
+             self._entry = target
+         return self
+
+     def add_conditional_edge(
+         self,
+         source: str,
+         evaluator: str | Callable[[State], str],
+         condition_map: dict[str, str] | None = None,
+     ) -> "Graph":
+         """
+         Add a conditional edge. After the source node runs, the evaluator picks the next node.
+
+         evaluator: A function(State) → str that returns a key from condition_map,
+                    OR any string; in that case the route key is read from
+                    state.data["_route"] after the source node runs (see the sketch
+                    below this class).
+         condition_map: {"key": "target_node"} — maps the evaluator output to the next node.
+                        Use END as a target to terminate. Unknown keys fall back to the
+                        "default" entry, and then to END.
+         """
+         self._conditional_edges[source] = {
+             "evaluator": evaluator,
+             "map": condition_map or {},
+         }
+         return self
+
+     def run(
+         self,
+         initial_state: State | None = None,
+         max_iterations: int = 20,
+     ) -> State:
+         """Execute the graph from START to END."""
+         state = initial_state or State(data={})
+
+         if not self._entry:
+             # Auto-detect entry: first node added
+             if self._nodes:
+                 self._entry = list(self._nodes.keys())[0]
+             else:
+                 raise ValueError("Graph has no nodes")
+
+         current = self._entry
+         visited_count: dict[str, int] = {}
+
+         for iteration in range(max_iterations):
+             if current == END:
+                 logger.info(f"Graph: Reached END after {iteration} iterations")
+                 break
+
+             if current not in self._nodes:
+                 raise ValueError(f"Graph: Unknown node '{current}'")
+
+             visited_count[current] = visited_count.get(current, 0) + 1
+             logger.info(f"Graph: Executing node '{current}' (visit #{visited_count[current]})")
+
+             # Execute node
+             node = self._nodes[current]
+             state = self._execute_node(node, state)
+
+             # Determine next node
+             if current in self._conditional_edges:
+                 cond = self._conditional_edges[current]
+                 evaluator = cond["evaluator"]
+                 cond_map = cond["map"]
+
+                 # Get routing decision
+                 if callable(evaluator):
+                     route_key = evaluator(state)
+                 else:
+                     route_key = str(state.data.get("_route", "default"))
+
+                 current = cond_map.get(route_key, cond_map.get("default", END))
+                 logger.info(f"Graph: Conditional route '{route_key}' → '{current}'")
+             else:
+                 # Find unconditional edge
+                 next_node = None
+                 for edge in self._edges:
+                     if edge.source == current:
+                         next_node = edge.target
+                         break
+                 current = next_node or END
+         else:
+             logger.warning(f"Graph: Hit max iterations ({max_iterations})")
+
+         return state
+
+     def _execute_node(self, node: GraphNode, state: State) -> State:
+         """Execute a single node — Agent or function."""
+         handler = node.handler
+
+         if isinstance(handler, Agent):
+             # Run the agent on the current state, extract purpose from state data
+             purpose = state.data.get("_purpose", state.data.get("task", f"Execute {node.name}"))
+             result = handler.run(purpose, state=state)
+             # Merge agent's final state into the graph state
+             merged = {**state.data, **result.final_state.data}
+             merged["_last_node"] = node.name
+             merged["_last_success"] = result.success
+             merged["_last_phi"] = result.final_phi
+             return State(data=merged, summary=result.final_state.summary)
+
+         elif callable(handler):
+             result = handler(state)
+             if isinstance(result, State):
+                 return result
+             elif isinstance(result, TaskResult):
+                 return result.final_state
+             else:
+                 return State(data={**state.data, "_result": str(result)})
+
+         raise ValueError(f"Invalid node handler type: {type(handler)}")
+
+
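+ # Illustrative sketch (comment only, not part of the module): routing with a string
+ # evaluator. When the evaluator passed to add_conditional_edge() is a plain string,
+ # Graph.run() reads the route key from state.data["_route"], so a function node can
+ # set it. The node names and the "draft" key below are made up for the example.
+ #
+ #     def review(state: State) -> State:
+ #         ok = "TODO" not in state.data.get("draft", "")
+ #         return State(data={**state.data, "_route": "pass" if ok else "revise"})
+ #
+ #     graph = Graph()
+ #     graph.add_node("write", Agent("writer"))
+ #     graph.add_node("review", review)
+ #     graph.add_edge(START, "write")
+ #     graph.add_edge("write", "review")
+ #     graph.add_conditional_edge("review", "route-on-state",
+ #         condition_map={"pass": END, "revise": "write"})
+ #     final = graph.run(State(data={"_purpose": "Draft a short summary"}))
+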
+ # ═══════════════════════════════════════════════════════════════════════════
+ # 3. SPEED — Parallel execution (CrewAI-style)
+ # ═══════════════════════════════════════════════════════════════════════════
+
+ def parallel(
+     tasks: list[str] | list[dict[str, Any]],
+     agents: list[Agent] | Agent | None = None,
+     max_workers: int | None = None,
+     initial_states: list[State] | None = None,
+ ) -> list[TaskResult]:
+     """
+     Run multiple tasks in parallel — CrewAI's speed, with Φ self-improvement.
+
+     Every parallel task feeds the same improvement loop, so agents learn
+     even from concurrent executions.
+
+     Usage:
+         # Same agent, multiple tasks
+         agent = Agent("worker", model="qwen3:1.7b")
+         results = parallel(["task 1", "task 2", "task 3"], agent)
+
+         # Different agents for different tasks
+         results = parallel(
+             ["research X", "code Y", "review Z"],
+             agents=[researcher, coder, reviewer],
+         )
+
+         # Dict-based tasks with per-task options
+         results = parallel([
+             {"purpose": "research X", "max_steps": 10},
+             {"purpose": "code Y", "max_steps": 20},
+         ], agent)
+     """
+     # Normalize tasks
+     normalized: list[dict] = []
+     for t in tasks:
+         if isinstance(t, str):
+             normalized.append({"purpose": t})
+         else:
+             normalized.append(t)
+
+     # Normalize agents
+     if agents is None:
+         agent_list = [Agent("worker")] * len(normalized)
+     elif isinstance(agents, Agent):
+         agent_list = [agents] * len(normalized)
+     else:
+         if len(agents) < len(normalized):
+             # Cycle agents
+             agent_list = [agents[i % len(agents)] for i in range(len(normalized))]
+         else:
+             agent_list = agents
+
+     states = initial_states or [None] * len(normalized)
+     workers = max_workers or min(len(normalized), 8)
+
+     logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers")
+
+     def _run_one(idx: int) -> TaskResult:
+         task = normalized[idx]
+         agent = agent_list[idx]
+         state = states[idx]
+         return agent.run(task["purpose"], state=state, max_steps=task.get("max_steps"))
+
+     results: list[TaskResult | None] = [None] * len(normalized)
+
+     with ThreadPoolExecutor(max_workers=workers) as executor:
+         future_to_idx = {
+             executor.submit(_run_one, i): i
+             for i in range(len(normalized))
+         }
+         for future in as_completed(future_to_idx):
+             idx = future_to_idx[future]
+             try:
+                 results[idx] = future.result()
+                 logger.info(f"Parallel: Task {idx} completed — success={results[idx].success}")
+             except Exception as e:
+                 logger.error(f"Parallel: Task {idx} failed — {e}")
+                 results[idx] = TaskResult(
+                     trajectory=Trajectory(
+                         task_description=normalized[idx]["purpose"],
+                         purpose=normalized[idx]["purpose"],
+                     ),
+                     final_state=State(data={"_error": str(e)}),
+                 )
+
+     return results
+
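+ # Illustrative sketch (comment only): fan out dict-based tasks to one agent and keep
+ # the successful results. The task strings and the agent name are examples.
+ #
+ #     worker = Agent("worker", model="qwen3:1.7b")
+ #     outcomes = parallel([
+ #         {"purpose": "summarize report A", "max_steps": 10},
+ #         {"purpose": "summarize report B", "max_steps": 10},
+ #     ], worker)
+ #     succeeded = [r for r in outcomes if r is not None and r.success]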
+
+ # ═══════════════════════════════════════════════════════════════════════════
+ # 4. CONVERSATION — Agent-to-agent messaging (AutoGen-style)
+ # ═══════════════════════════════════════════════════════════════════════════
+
+ @dataclass
+ class Message:
+     """A message in an agent conversation."""
+     sender: str
+     content: str
+     timestamp: float = field(default_factory=time.time)
+     metadata: dict[str, Any] = field(default_factory=dict)
+
+
+ class Conversation:
+     """
+     Multi-agent conversation — AutoGen's talking, with Φ self-improvement.
+
+     Agents take turns speaking. Each agent sees the full conversation history
+     and contributes its perspective. The conversation continues for N rounds
+     or until agents converge on a solution.
+
+     Every agent's turn feeds the Φ loop — agents learn from conversations.
+
+     Usage:
+         researcher = Agent("researcher", model="qwen3:1.7b")
+         coder = Agent("coder", model="phi4-mini")
+         reviewer = Agent("reviewer", model="qwen3:1.7b")
+
+         chat = Conversation([researcher, coder, reviewer])
+         result = chat.run("Build a web scraper for news articles", rounds=5)
+
+         # Access conversation history
+         for msg in chat.history:
+             print(f"{msg.sender}: {msg.content[:100]}")
+     """
+
+     def __init__(
+         self,
+         agents: list[Agent],
+         moderator: Agent | LLMBackend | None = None,
+         speaker_selection: str = "round_robin",  # "round_robin", "auto", "manual"
+     ):
+         self.agents = {a.name: a for a in agents}
+         self.agent_order = [a.name for a in agents]
+         self.moderator = moderator
+         self.speaker_selection = speaker_selection
+         self.history: list[Message] = []
+
+     def run(
+         self,
+         topic: str,
+         rounds: int = 3,
+         initial_context: str = "",
+     ) -> State:
+         """
+         Run a conversation about a topic for N rounds.
+
+         Returns final State with conversation results.
+         """
+         self.history = [Message(sender="system", content=f"Topic: {topic}")]
+         if initial_context:
+             self.history.append(Message(sender="system", content=initial_context))
+
+         logger.info(f"Conversation: Starting '{topic}' with {list(self.agents.keys())}")
+
+         for round_num in range(rounds):
+             logger.info(f"Conversation: Round {round_num + 1}/{rounds}")
+
+             for agent_name in self._get_speaker_order(round_num):
+                 agent = self.agents[agent_name]
+
+                 # Build the conversation state for this agent
+                 conv_text = self._format_history()
+                 state = State(
+                     data={
+                         "conversation": conv_text,
+                         "topic": topic,
+                         "round": round_num + 1,
+                         "role": agent_name,
+                     },
+                     summary=f"Conversation round {round_num + 1}. Topic: {topic}\n\n{conv_text}",
+                 )
+
+                 # Agent responds (this feeds the Φ loop!)
+                 purpose = (
+                     f"You are '{agent_name}' in a team discussion about: {topic}. "
+                     f"Read the conversation so far and contribute your expert perspective. "
+                     f"Be concise and actionable."
+                 )
+                 result = agent.run(purpose, state=state)
+
+                 # Extract the agent's contribution
+                 response = result.final_state.data.get(
+                     "_last_result",
+                     result.final_state.summary or "(no response)",
+                 )
+
+                 self.history.append(Message(
+                     sender=agent_name,
+                     content=response,
+                     metadata={
+                         "round": round_num + 1,
+                         "phi": result.final_phi,
+                         "success": result.success,
+                     },
+                 ))
+
+                 logger.info(f" {agent_name}: {response[:100]}...")
+
+         # Build final state with full conversation
+         return State(
+             data={
+                 "topic": topic,
+                 "rounds": rounds,
+                 "messages": [
+                     {"sender": m.sender, "content": m.content}
+                     for m in self.history
+                 ],
+                 "final_summary": self.history[-1].content if self.history else "",
+             },
+             summary=self._format_history(),
+         )
+
+     def _get_speaker_order(self, round_num: int) -> list[str]:
+         """Determine speaking order for a round."""
+         if self.speaker_selection == "round_robin":
+             return self.agent_order
+         elif self.speaker_selection == "auto":
+             # Reverse every other round for variety
+             order = list(self.agent_order)
+             if round_num % 2 == 1:
+                 order.reverse()
+             return order
+         return self.agent_order
+
+     def _format_history(self) -> str:
+         """Format conversation history as text."""
+         lines = []
+         for msg in self.history:
+             if msg.sender == "system":
+                 lines.append(f"[System] {msg.content}")
+             else:
+                 lines.append(f"[{msg.sender}] {msg.content}")
+         return "\n\n".join(lines)
+
+
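+ # Illustrative sketch (comment only): inspect per-turn Φ scores after a chat. The
+ # agent names and topic are examples; with no model given, agents use the mock backend.
+ #
+ #     chat = Conversation([Agent("researcher"), Agent("coder")])
+ #     chat.run("Design a caching layer", rounds=2)
+ #     for msg in chat.history:
+ #         if msg.sender != "system":
+ #             print(msg.sender, msg.metadata.get("phi"), msg.metadata.get("success"))
+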
+ # ═══════════════════════════════════════════════════════════════════════════
+ # 5. KNOWLEDGE — RAG-as-a-tool (LlamaIndex-style)
+ # ═══════════════════════════════════════════════════════════════════════════
+
+ class KnowledgeStore:
+     """
+     Knowledge-aware agents — LlamaIndex's RAG, as a simple Tool.
+
+     Chunks documents, embeds them, retrieves relevant chunks for queries.
+     Plugs into any Agent as a tool — the agent decides when to retrieve.
+
+     No external dependencies. Uses the same trigram embedding as ExperienceReplay.
+     For production, swap in sentence-transformers via EmbeddingBackend.
+
+     Usage:
+         # From files
+         kb = KnowledgeStore.from_directory("./docs", glob="*.md")
+
+         # From strings
+         kb = KnowledgeStore.from_texts([
+             "Python was created by Guido van Rossum.",
+             "Python 3.12 added PEP 695 type aliases.",
+         ])
+
+         # As a tool for any agent
+         agent = Agent("assistant", tools=[kb.as_tool()])
+         result = agent.run("What PEP was added in Python 3.12?")
+
+         # Direct query
+         results = kb.query("type aliases", top_k=3)
+     """
+
+     def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, top_k: int = 5):
+         self.chunk_size = chunk_size
+         self.chunk_overlap = chunk_overlap
+         self.top_k = top_k
+         self._chunks: list[dict[str, Any]] = []  # {text, embedding, source, index}
+
+     def add_text(self, text: str, source: str = "unknown") -> int:
+         """Add a text document — auto-chunks and embeds."""
+         chunks = self._chunk_text(text)
+         count = 0
+         for chunk in chunks:
+             embedding = self._embed(chunk)
+             self._chunks.append({
+                 "text": chunk,
+                 "embedding": embedding,
+                 "source": source,
+                 "index": len(self._chunks),
+             })
+             count += 1
+         return count
+
+     def add_file(self, path: str) -> int:
+         """Add a file to the knowledge store."""
+         with open(path, "r", errors="ignore") as f:
+             text = f.read()
+         return self.add_text(text, source=os.path.basename(path))
+
+     @classmethod
+     def from_texts(cls, texts: list[str], **kwargs) -> "KnowledgeStore":
+         """Create from a list of text strings."""
+         store = cls(**kwargs)
+         for i, text in enumerate(texts):
+             store.add_text(text, source=f"text_{i}")
+         return store
+
+     @classmethod
+     def from_directory(cls, path: str, glob: str = "*.txt", **kwargs) -> "KnowledgeStore":
+         """Create from all matching files in a directory."""
+         store = cls(**kwargs)
+         p = Path(path)
+         for file in sorted(p.glob(glob)):
+             store.add_file(str(file))
+         logger.info(f"KnowledgeStore: Loaded {len(store._chunks)} chunks from {path}")
+         return store
+
+     def query(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
+         """Retrieve the most relevant chunks for a query."""
+         k = top_k or self.top_k
+         if not self._chunks:
+             return []
+
+         query_emb = self._embed(query)
+         scored = []
+         for chunk in self._chunks:
+             sim = self._cosine_sim(query_emb, chunk["embedding"])
+             scored.append((sim, chunk))
+         scored.sort(key=lambda x: -x[0])
+
+         return [
+             {"text": c["text"], "source": c["source"], "score": round(s, 3)}
+             for s, c in scored[:k]
+         ]
+
+     def as_tool(self, name: str = "knowledge_search", description: str | None = None) -> Tool:
+         """
+         Convert this KnowledgeStore into a Tool that any Agent can use.
+
+         This is the LlamaIndex QueryEngineTool pattern — RAG as a tool.
+         The agent decides WHEN to retrieve (agentic RAG), rather than
+         always retrieving (traditional RAG pipeline).
+         """
+         desc = description or (
+             f"Search the knowledge base ({len(self._chunks)} chunks). "
+             f"Use this to find specific information from documents."
+         )
+         store = self
+
+         class _KnowledgeTool(Tool):
+             name_attr = name
+             description_attr = desc
+             parameters = {
+                 "type": "object",
+                 "properties": {
+                     "query": {
+                         "type": "string",
+                         "description": "Search query — use specific terms, not questions",
+                     }
+                 },
+                 "required": ["query"],
+             }
+
+             def __init__(self_tool):
+                 self_tool.name = name
+                 self_tool.description = desc
+
+             def execute(self_tool, query: str) -> str:
+                 results = store.query(query)
+                 if not results:
+                     return "No relevant documents found."
+                 parts = []
+                 for i, r in enumerate(results, 1):
+                     parts.append(f"[{i}] (score={r['score']}, source={r['source']})\n{r['text']}")
+                 return "\n\n".join(parts)
+
+         return _KnowledgeTool()
+
+     @property
+     def size(self) -> int:
+         return len(self._chunks)
+
+     # --- Internal ---
+
+     def _chunk_text(self, text: str) -> list[str]:
+         """Split text into overlapping chunks."""
+         if len(text) <= self.chunk_size:
+             return [text] if text.strip() else []
+
+         chunks = []
+         start = 0
+         while start < len(text):
+             end = start + self.chunk_size
+             chunk = text[start:end].strip()
+             if chunk:
+                 chunks.append(chunk)
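+             # Advance by chunk_size - chunk_overlap (450 characters with the defaults),
+             # so consecutive chunks share chunk_overlap characters of overlapping context.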
+             start += self.chunk_size - self.chunk_overlap
+         return chunks
+
+     @staticmethod
+     def _embed(text: str) -> list[float]:
+         """Lightweight trigram embedding (same as ExperienceReplay)."""
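+         # Hash each 3-character window into one of 128 buckets, count occurrences,
+         # then L2-normalize; e.g. "python" contributes "pyt", "yth", "tho", "hon".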
+         dim = 128
+         vec = [0.0] * dim
+         text_lower = text.lower()
+         for i in range(len(text_lower) - 2):
+             trigram = text_lower[i:i + 3]
+             h = hash(trigram) % dim
+             vec[h] += 1.0
+         magnitude = math.sqrt(sum(x * x for x in vec))
+         if magnitude > 0:
+             vec = [x / magnitude for x in vec]
+         return vec
+
+     @staticmethod
+     def _cosine_sim(a: list[float], b: list[float]) -> float:
+         if not a or not b or len(a) != len(b):
+             return 0.0
+         dot = sum(x * y for x, y in zip(a, b))
+         mag_a = math.sqrt(sum(x * x for x in a))
+         mag_b = math.sqrt(sum(x * x for x in b))
+         if mag_a == 0 or mag_b == 0:
+             return 0.0
+         return dot / (mag_a * mag_b)
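+
+
+ # Illustrative composition (comment only): the five capabilities are designed to be
+ # mixed. The directory path, node names, and purpose below are examples.
+ #
+ #     kb = KnowledgeStore.from_directory("./docs", glob="*.md")
+ #     researcher = Agent("researcher", model="qwen3:1.7b", tools=[kb.as_tool()])
+ #     writer = Agent("writer", model="phi4-mini")
+ #
+ #     graph = Graph()
+ #     graph.add_node("research", researcher)
+ #     graph.add_node("write", writer)
+ #     graph.add_edge(START, "research")
+ #     graph.add_edge("research", "write")
+ #     graph.add_edge("write", END)
+ #     final = graph.run(State(data={"_purpose": "Summarize the docs"}))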