"""
Unified Capabilities: five competing framework philosophies in one composable layer.

    LangGraph  → "I want control"        → Graph (conditional edges, cycles)
    CrewAI     → "I want speed"          → parallel() (concurrent tasks, parallel fan-out)
    AutoGen    → "I want agents talking" → Conversation (agent-to-agent message passing, group chat)
    OpenAI SDK → "I want plug-and-play"  → Agent() one-liner factory
    LlamaIndex → "I want knowledge"      → KnowledgeStore (RAG-as-a-tool, chunk + embed + retrieve)

Design principle: ZERO changes to the existing Orchestrator/Actor/PurposeFunction.
Each capability is a composable layer that calls the existing modules.
The self-improvement loop (Φ scoring → experience replay → heuristic distillation)
runs INSIDE each capability automatically: every graph node, every parallel task,
every conversation turn feeds the same learning loop.
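
As a rough illustration of the layering idea above, the toy sketch below wraps
one unchanged core function in three thin layers that all report to a single
shared recorder. Every name in it (SharedLoop, core_step, make_layer) is
hypothetical and only stands in for the real modules; the score is a
placeholder, not a real purpose score.

```python
# Toy sketch: every name here is hypothetical and stands in for the real modules.
class SharedLoop:
    # Stand-in for the shared learning loop: it only collects scored steps.
    def __init__(self):
        self.records = []

    def record(self, source, task, score):
        self.records.append((source, task, score))


def core_step(task):
    # Stand-in for the unchanged core (Orchestrator/Actor/PurposeFunction).
    # The returned number is a placeholder score: the word count of the task.
    return float(len(task.split()))


def make_layer(source, loop):
    # A capability is a thin wrapper: call the core, then feed the shared loop.
    def run(task):
        score = core_step(task)
        loop.record(source, task, score)
        return score
    return run


loop = SharedLoop()
graph_node = make_layer("graph", loop)
parallel_task = make_layer("parallel", loop)
chat_turn = make_layer("conversation", loop)

graph_node("research topic")
parallel_task("summarize three papers")
chat_turn("review")

# All three layers wrote into the same recorder, in call order.
sources = [source for source, _, _ in loop.records]
```

The point of the sketch is only the shape: the core function is never edited,
and each layer adds behavior by wrapping it.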

Usage:
    # Plug-and-play (OpenAI SDK simplicity)
    agent = Agent("researcher", model="qwen3:1.7b", tools=[SearchTool()])
    result = agent.run("Find information about X")

    # Control flow (LangGraph power)
    graph = Graph()
    graph.add_node("research", research_agent)
    graph.add_node("write", writer_agent)
    graph.add_edge("research", "write")
    graph.add_conditional_edge("write", review_fn, {"pass": END, "fail": "research"})
    result = graph.run(initial_state)

    # Speed (CrewAI parallelism)
    results = parallel([task1, task2, task3], agents=[a1, a2, a3])

    # Conversation (AutoGen talking)
    chat = Conversation([researcher, coder, reviewer])
    result = chat.run("Build a web scraper", rounds=5)

    # Knowledge (LlamaIndex RAG)
    kb = KnowledgeStore.from_directory("./docs")
    agent = Agent("assistant", tools=[kb.as_tool()])
    result = agent.run("What does the documentation say about X?")
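
The control-flow example above depends on conditional routing with cycles.
A minimal standalone sketch of such a routing loop follows, assuming plain
dicts for state and plain functions for nodes. run_graph and its parameters
are illustrative names, not part of this module's API.

```python
# Standalone sketch; run_graph, nodes, edges and routes are illustrative names.
END = "__END__"


def run_graph(nodes, edges, routes, entry, state, max_iterations=20):
    # nodes:  name -> function(state) returning the new state (a plain dict here)
    # edges:  name -> next node name (unconditional)
    # routes: name -> (evaluator, mapping); evaluator(state) returns a routing key
    current = entry
    for _ in range(max_iterations):
        if current == END:
            break
        state = nodes[current](state)
        if current in routes:
            evaluator, mapping = routes[current]
            key = evaluator(state)
            # Unknown keys fall back to the "default" entry, then to END.
            current = mapping.get(key, mapping.get("default", END))
        else:
            current = edges.get(current, END)
    return state


def research(state):
    return {**state, "notes": state.get("notes", 0) + 1}


def write(state):
    return {**state, "drafts": state.get("drafts", 0) + 1}


def review(state):
    # Pass once two rounds of notes exist; otherwise loop back to research.
    return "pass" if state["notes"] >= 2 else "fail"


final = run_graph(
    nodes={"research": research, "write": write},
    edges={"research": "write"},
    routes={"write": (review, {"pass": END, "fail": "research"})},
    entry="research",
    state={},
)
```

Here review sends the flow back to research once and then routes to END, so
final holds two notes and two drafts. The max_iterations cap is what keeps a
cycle that never passes from running forever.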
"""

from __future__ import annotations

import asyncio
import json
import logging
import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Iterator

from purpose_agent.types import (
    Action, Heuristic, MemoryTier, PurposeScore, State,
    Trajectory, TrajectoryStep,
)
from purpose_agent.llm_backend import LLMBackend, MockLLMBackend, ChatMessage
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.orchestrator import (
    Environment, Orchestrator, SimpleEnvironment, TaskResult,
)
from purpose_agent.tools import Tool, FunctionTool, ToolResult, ToolRegistry

logger = logging.getLogger(__name__)

# Sentinel for graph end node
END = "__END__"
START = "__START__"


import threading

# Global lock for shared replay/optimizer in parallel execution
_parallel_lock = threading.Lock()

# ═══════════════════════════════════════════════════════════════════════════
# 1. PLUG-AND-PLAY β€” Agent() one-liner factory (OpenAI Agents SDK simplicity)
# ═══════════════════════════════════════════════════════════════════════════

class Agent:
    """
    One-liner agent factory. The simplest way to create and run an agent.

    Inspired by OpenAI Agents SDK: Agent(name, instructions, tools) → run(task).
    But ours self-improves. Every run feeds the Φ loop.

    Usage:
        # Minimal (uses mock for testing)
        agent = Agent("helper")
        result = agent.run("Do something")

        # With local SLM
        agent = Agent("coder", model="qwen3:1.7b", tools=[PythonExecTool()])
        result = agent.run("Write a sorting algorithm")

        # With cloud LLM
        agent = Agent("analyst", model="gpt-4o", api_key="sk-...")
        result = agent.run("Analyze this data")

        # Handoff to another agent
        agent_a = Agent("researcher", model="qwen3:1.7b")
        agent_b = Agent("writer", model="phi4-mini", handoff_from=agent_a)
        # agent_b inherits agent_a's experience replay
    """

    def __init__(
        self,
        name: str = "agent",
        instructions: str = "",
        model: str | LLMBackend | None = None,
        tools: list[Tool] | None = None,
        api_key: str | None = None,
        max_steps: int = 15,
        handoff_from: "Agent | None" = None,
        persistence_dir: str | None = None,
    ):
        self.name = name
        self.instructions = instructions
        self.max_steps = max_steps

        # Resolve LLM backend
        if model is None:
            self.llm = MockLLMBackend()
        elif isinstance(model, str):
            self.llm = self._resolve_model(model, api_key)
        else:
            self.llm = model

        # Build available actions from tools
        available_actions = {"DONE": "Signal task completion"}
        self._tools = {}
        for tool in (tools or []):
            available_actions[tool.name] = tool.description
            self._tools[tool.name] = tool

        # Build environment that executes tools
        self._env = _ToolEnvironment(self._tools)

        # Create orchestrator
        self.orch = Orchestrator(
            llm=self.llm,
            environment=self._env,
            available_actions=available_actions,
            persistence_dir=persistence_dir or f"./.purpose_agent/{name}",
        )

        # Handoff: inherit experience from another agent
        if handoff_from:
            self.orch.experience_replay = handoff_from.orch.experience_replay
            self.orch.optimizer = handoff_from.orch.optimizer
            self.orch.sync_memory()

        # Inject custom instructions into actor's strategic memory
        if instructions:
            h = Heuristic(
                pattern="Always", strategy=instructions, steps=[],
                tier=MemoryTier.STRATEGIC, q_value=1.0,
            )
            self.orch.optimizer.heuristic_library.append(h)
            self.orch.sync_memory()

    def run(self, task: str, state: State | None = None) -> TaskResult:
        """Run a task. Returns TaskResult with trajectory, final state, success."""
        return self.orch.run_task(
            purpose=task,
            initial_state=state or State(data={}),
            max_steps=self.max_steps,
        )

    def __call__(self, task: str, **kwargs) -> TaskResult:
        return self.run(task, **kwargs)

    @staticmethod
    def _resolve_model(model: str, api_key: str | None = None) -> LLMBackend:
        """Resolve a model string to an LLMBackend."""
        from purpose_agent.slm_backends import SLM_REGISTRY
        
        # Known SLM registry keys
        if model in SLM_REGISTRY:
            from purpose_agent.slm_backends import create_slm_backend
            return create_slm_backend(model)

        # Delegate to the centralized resolver for all other models (e.g. groq:, openai:)
        from purpose_agent.llm_backend import resolve_backend
        return resolve_backend(model, api_key)


class _ToolEnvironment(Environment):
    """Environment that executes tools based on action names."""

    def __init__(self, tools: dict[str, Tool]):
        self._tools = tools

    def execute(self, action: Action, current_state: State) -> State:
        tool = self._tools.get(action.name)
        if not tool:
            return State(
                data={**current_state.data, "_last_result": f"Unknown tool: {action.name}"},
                summary=f"Error: Unknown tool '{action.name}'",
            )
        result = tool.run(**action.params)
        new_data = {**current_state.data, "_last_result": result.output, "_last_tool": action.name}
        if not result.success:
            new_data["_last_error"] = result.error
        return State(data=new_data, summary=result.output[:500])

    def reset(self) -> State:
        return State(data={})


# ═══════════════════════════════════════════════════════════════════════════
# 2. CONTROL β€” Graph execution engine (LangGraph-style)
# ═══════════════════════════════════════════════════════════════════════════

@dataclass
class GraphNode:
    """A node in the execution graph."""
    name: str
    handler: Callable[[State], State | TaskResult] | Agent
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class GraphEdge:
    """An edge in the execution graph."""
    source: str
    target: str
    condition: Callable[[State], bool] | None = None  # None = unconditional


class Graph:
    """
    Graph-based workflow execution: LangGraph's control, with Φ self-improvement.

    Supports conditional branching and cycles (loops). A node follows only its
    first unconditional edge, so fan-out has to happen inside a node.
    Every node that runs an Agent automatically feeds the Φ improvement loop.

    Usage:
        graph = Graph()

        # Add nodes (agents or functions)
        graph.add_node("research", Agent("researcher", model="qwen3:1.7b"))
        graph.add_node("write", Agent("writer", model="phi4-mini"))
        graph.add_node("review", lambda state: review_fn(state))

        # Linear flow
        graph.add_edge(START, "research")
        graph.add_edge("research", "write")

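        graph.add_edge("write", "review")

        # Conditional branching (cycle back on failure).
        # route_fn(state) is a user-supplied function returning a key of condition_map.
        graph.add_conditional_edge("review", route_fn,
            condition_map={"pass": END, "revise": "write"})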

        result = graph.run(State(data={"topic": "AI safety"}))
    """

    def __init__(self):
        self._nodes: dict[str, GraphNode] = {}
        self._edges: list[GraphEdge] = []
        self._conditional_edges: dict[str, dict] = {}  # source → {"evaluator", "map"}
        self._entry: str | None = None

    def add_node(self, name: str, handler: Callable | Agent) -> "Graph":
        """Add a node. Handler is either an Agent or a function(State) → State."""
        self._nodes[name] = GraphNode(name=name, handler=handler)
        return self

    def add_edge(self, source: str, target: str) -> "Graph":
        """Add an unconditional edge."""
        self._edges.append(GraphEdge(source=source, target=target))
        if source == START:
            self._entry = target
        return self

    def add_conditional_edge(
        self,
        source: str,
        evaluator: str | Callable[[State], str],
        condition_map: dict[str, str] | None = None,
    ) -> "Graph":
        """
        Add a conditional edge. After the source node runs, evaluator picks the next node.

        evaluator: A function(State) → str that returns a key of condition_map.
                   If a non-callable (e.g. a string) is passed, the key is read
                   from state.data["_route"] instead, defaulting to "default".
        condition_map: {"key": "target_node"} maps the routing key to the next node.
                       Use END as target to terminate. Unknown keys fall back to
                       the "default" entry, then to END.
        """
        self._conditional_edges[source] = {
            "evaluator": evaluator,
            "map": condition_map or {},
        }
        return self

    def run(
        self,
        initial_state: State | None = None,
        max_iterations: int = 20,
    ) -> State:
        """Execute the graph from START to END."""
        state = initial_state or State(data={})

        if not self._entry:
            # Auto-detect entry: first node added
            if self._nodes:
                self._entry = list(self._nodes.keys())[0]
            else:
                raise ValueError("Graph has no nodes")

        current = self._entry
        visited_count: dict[str, int] = {}

        for iteration in range(max_iterations):
            if current == END:
                logger.info(f"Graph: Reached END after {iteration} iterations")
                break

            if current not in self._nodes:
                raise ValueError(f"Graph: Unknown node '{current}'")

            visited_count[current] = visited_count.get(current, 0) + 1
            logger.info(f"Graph: Executing node '{current}' (visit #{visited_count[current]})")

            # Execute node
            node = self._nodes[current]
            state = self._execute_node(node, state)

            # Determine next node
            if current in self._conditional_edges:
                cond = self._conditional_edges[current]
                evaluator = cond["evaluator"]
                cond_map = cond["map"]

                # Get routing decision
                if callable(evaluator):
                    route_key = evaluator(state)
                else:
                    route_key = str(state.data.get("_route", "default"))

                current = cond_map.get(route_key, cond_map.get("default", END))
                logger.info(f"Graph: Conditional route '{route_key}' → '{current}'")
            else:
                # Find unconditional edge
                next_node = None
                for edge in self._edges:
                    if edge.source == current:
                        next_node = edge.target
                        break
                current = next_node or END
        else:
            logger.warning(f"Graph: Hit max iterations ({max_iterations})")

        return state

    def _execute_node(self, node: GraphNode, state: State) -> State:
        """Execute a single node (Agent or function)."""
        handler = node.handler

        if isinstance(handler, Agent):
            # Run the agent on the current state, extract purpose from state data
            purpose = state.data.get("_purpose", state.data.get("task", f"Execute {node.name}"))
            result = handler.run(purpose, state=state)
            # Merge agent's final state into the graph state
            merged = {**state.data, **result.final_state.data}
            merged["_last_node"] = node.name
            merged["_last_success"] = result.success
            merged["_last_phi"] = result.final_phi
            return State(data=merged, summary=result.final_state.summary)

        elif callable(handler):
            result = handler(state)
            if isinstance(result, State):
                return result
            elif isinstance(result, TaskResult):
                return result.final_state
            else:
                return State(data={**state.data, "_result": str(result)})

        raise ValueError(f"Invalid node handler type: {type(handler)}")


# ═══════════════════════════════════════════════════════════════════════════
# 3. SPEED β€” Parallel execution (CrewAI-style)
# ═══════════════════════════════════════════════════════════════════════════

def parallel(
    tasks: list[str] | list[dict[str, Any]],
    agents: list[Agent] | Agent | None = None,
    max_workers: int | None = None,
    initial_states: list[State] | None = None,
) -> list[TaskResult]:
    """
    Run multiple tasks in parallel: CrewAI's speed, with Φ self-improvement.

    Every parallel task feeds the same improvement loop, so agents learn
    even from concurrent executions.

    Usage:
        # Same agent, multiple tasks
        agent = Agent("worker", model="qwen3:1.7b")
        results = parallel(["task 1", "task 2", "task 3"], agent)

        # Different agents for different tasks
        results = parallel(
            ["research X", "code Y", "review Z"],
            agents=[researcher, coder, reviewer],
        )

        # Dict-based tasks. Only "purpose" is read at present; extra keys
        # such as "max_steps" are accepted but ignored.
        results = parallel([
            {"purpose": "research X", "max_steps": 10},
            {"purpose": "code Y", "max_steps": 20},
        ], agent)
    """
    # Normalize tasks
    normalized: list[dict] = []
    for t in tasks:
        if isinstance(t, str):
            normalized.append({"purpose": t})
        else:
            normalized.append(t)

    # Normalize agents
    if agents is None:
        agent_list = [Agent("worker")] * len(normalized)
    elif isinstance(agents, Agent):
        agent_list = [agents] * len(normalized)
    else:
        if len(agents) < len(normalized):
            # Cycle agents
            agent_list = [agents[i % len(agents)] for i in range(len(normalized))]
        else:
            agent_list = agents

    states = initial_states or [None] * len(normalized)

    # Thread safety: detect backend type for concurrency limit
    # Local backends (Ollama, llama-cpp) share one GPU/CPU, so serialize
    # Cloud/API backends can parallelize
    if max_workers is None:
        sample_agent = agent_list[0] if agent_list else None
        if sample_agent and hasattr(sample_agent, 'llm'):
            backend_type = type(sample_agent.llm).__name__
            if backend_type in ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend"):
                workers = 1  # Local model: serialize to avoid contention
            else:
                workers = min(len(normalized), 8)
        else:
            workers = min(len(normalized), 8)
    else:
        workers = max_workers

    logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers")

    def _run_one(idx: int) -> TaskResult:
        task = normalized[idx]
        agent = agent_list[idx]
        state = states[idx]
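        # The global lock guards the shared replay/optimizer state. It wraps the
        # whole run, so tasks currently execute one at a time regardless of `workers`.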
        with _parallel_lock:
            return agent.run(task["purpose"], state=state)

    results: list[TaskResult | None] = [None] * len(normalized)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(_run_one, i): i
            for i in range(len(normalized))
        }
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
                logger.info(f"Parallel: Task {idx} completed, success={results[idx].success}")
            except Exception as e:
                logger.error(f"Parallel: Task {idx} failed: {e}")
                results[idx] = TaskResult(
                    trajectory=Trajectory(
                        task_description=normalized[idx]["purpose"],
                        purpose=normalized[idx]["purpose"],
                    ),
                    final_state=State(data={"_error": str(e)}),
                )

    return results


# ═══════════════════════════════════════════════════════════════════════════
# 4. CONVERSATION β€” Agent-to-agent messaging (AutoGen-style)
# ═══════════════════════════════════════════════════════════════════════════

@dataclass
class Message:
    """A message in an agent conversation."""
    sender: str
    content: str
    timestamp: float = field(default_factory=time.time)
    metadata: dict[str, Any] = field(default_factory=dict)


class Conversation:
    """
    Multi-agent conversation: AutoGen's talking, with Φ self-improvement.

    Agents take turns speaking. Each agent sees the full conversation history
    and contributes its perspective. The conversation runs for a fixed N rounds;
    there is no early stop on convergence.

    Every agent's turn feeds the Φ loop, so agents learn from conversations.

    Usage:
        researcher = Agent("researcher", model="qwen3:1.7b")
        coder = Agent("coder", model="phi4-mini")
        reviewer = Agent("reviewer", model="qwen3:1.7b")

        chat = Conversation([researcher, coder, reviewer])
        result = chat.run("Build a web scraper for news articles", rounds=5)

        # Access conversation history
        for msg in chat.history:
            print(f"{msg.sender}: {msg.content[:100]}")
    """

    def __init__(
        self,
        agents: list[Agent],
        moderator: Agent | LLMBackend | None = None,
        speaker_selection: str = "round_robin",  # "round_robin" or "auto" (reversed order on odd rounds); other values act as "round_robin"
    ):
        self.agents = {a.name: a for a in agents}
        self.agent_order = [a.name for a in agents]
        self.moderator = moderator
        self.speaker_selection = speaker_selection
        self.history: list[Message] = []

    def run(
        self,
        topic: str,
        rounds: int = 3,
        initial_context: str = "",
    ) -> State:
        """
        Run a conversation about a topic for N rounds.

        Returns final State with conversation results.
        """
        self.history = [Message(sender="system", content=f"Topic: {topic}")]
        if initial_context:
            self.history.append(Message(sender="system", content=initial_context))

        logger.info(f"Conversation: Starting '{topic}' with {list(self.agents.keys())}")

        for round_num in range(rounds):
            logger.info(f"Conversation: Round {round_num + 1}/{rounds}")

            for agent_name in self._get_speaker_order(round_num):
                agent = self.agents[agent_name]

                # Build the conversation state for this agent
                conv_text = self._format_history()
                state = State(
                    data={
                        "conversation": conv_text,
                        "topic": topic,
                        "round": round_num + 1,
                        "role": agent_name,
                    },
                    summary=f"Conversation round {round_num + 1}. Topic: {topic}\n\n{conv_text}",
                )

                # Agent responds (this feeds the Φ loop!)
                purpose = (
                    f"You are '{agent_name}' in a team discussion about: {topic}. "
                    f"Read the conversation so far and contribute your expert perspective. "
                    f"Be concise and actionable."
                )
                result = agent.run(purpose, state=state)

                # Extract the agent's contribution
                response = result.final_state.data.get(
                    "_last_result",
                    result.final_state.summary or "(no response)",
                )

                self.history.append(Message(
                    sender=agent_name,
                    content=response,
                    metadata={
                        "round": round_num + 1,
                        "phi": result.final_phi,
                        "success": result.success,
                    },
                ))

                logger.info(f"  {agent_name}: {response[:100]}...")

        # Build final state with full conversation
        return State(
            data={
                "topic": topic,
                "rounds": rounds,
                "messages": [
                    {"sender": m.sender, "content": m.content}
                    for m in self.history
                ],
                "final_summary": self.history[-1].content if self.history else "",
            },
            summary=self._format_history(),
        )

    def _get_speaker_order(self, round_num: int) -> list[str]:
        """Determine speaking order for a round."""
        if self.speaker_selection == "round_robin":
            return self.agent_order
        elif self.speaker_selection == "auto":
            # Reverse every other round for variety
            order = list(self.agent_order)
            if round_num % 2 == 1:
                order.reverse()
            return order
        return self.agent_order

    def _format_history(self) -> str:
        """Format conversation history as text."""
        lines = []
        for msg in self.history:
            if msg.sender == "system":
                lines.append(f"[System] {msg.content}")
            else:
                lines.append(f"[{msg.sender}] {msg.content}")
        return "\n\n".join(lines)


# ═══════════════════════════════════════════════════════════════════════════
# 5. KNOWLEDGE β€” RAG-as-a-tool (LlamaIndex-style)
# ═══════════════════════════════════════════════════════════════════════════

class KnowledgeStore:
    """
    Knowledge-aware agents: LlamaIndex's RAG, as a simple Tool.

    Chunks documents, embeds them, retrieves relevant chunks for queries.
    Plugs into any Agent as a tool, so the agent decides when to retrieve.

    No external dependencies. Uses the same trigram embedding as ExperienceReplay.
    For production, swap in sentence-transformers via EmbeddingBackend.

    Usage:
        # From files
        kb = KnowledgeStore.from_directory("./docs", glob="*.md")

        # From strings
        kb = KnowledgeStore.from_texts([
            "Python was created by Guido van Rossum.",
            "Python 3.12 added PEP 695 type aliases.",
        ])

        # As a tool for any agent
        agent = Agent("assistant", tools=[kb.as_tool()])
        result = agent.run("What PEP was added in Python 3.12?")

        # Direct query
        results = kb.query("type aliases", top_k=3)
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, top_k: int = 5):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k
        self._chunks: list[dict[str, Any]] = []  # {text, embedding, source, index}

    def add_text(self, text: str, source: str = "unknown") -> int:
        """Add a text document; it is chunked and embedded automatically."""
        chunks = self._chunk_text(text)
        count = 0
        for chunk in chunks:
            embedding = self._embed(chunk)
            self._chunks.append({
                "text": chunk,
                "embedding": embedding,
                "source": source,
                "index": len(self._chunks),
            })
            count += 1
        return count

    def add_file(self, path: str) -> int:
        """Add a file to the knowledge store."""
        with open(path, "r", errors="ignore") as f:
            text = f.read()
        return self.add_text(text, source=os.path.basename(path))

    @classmethod
    def from_texts(cls, texts: list[str], **kwargs) -> "KnowledgeStore":
        """Create from a list of text strings."""
        store = cls(**kwargs)
        for i, text in enumerate(texts):
            store.add_text(text, source=f"text_{i}")
        return store

    @classmethod
    def from_directory(cls, path: str, glob: str = "*.txt", **kwargs) -> "KnowledgeStore":
        """Create from all matching files in a directory."""
        store = cls(**kwargs)
        p = Path(path)
        for file in sorted(p.glob(glob)):
            store.add_file(str(file))
        logger.info(f"KnowledgeStore: Loaded {len(store._chunks)} chunks from {path}")
        return store

    def query(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
        """Retrieve the most relevant chunks for a query."""
        k = top_k or self.top_k
        if not self._chunks:
            return []

        query_emb = self._embed(query)
        scored = []
        for chunk in self._chunks:
            sim = self._cosine_sim(query_emb, chunk["embedding"])
            scored.append((sim, chunk))
        scored.sort(key=lambda x: -x[0])

        return [
            {"text": c["text"], "source": c["source"], "score": round(s, 3)}
            for s, c in scored[:k]
        ]

    def as_tool(self, name: str = "knowledge_search", description: str | None = None) -> Tool:
        """
        Convert this KnowledgeStore into a Tool that any Agent can use.

        This is the LlamaIndex QueryEngineTool pattern: RAG as a tool.
        The agent decides WHEN to retrieve (agentic RAG), rather than
        always retrieving (traditional RAG pipeline).
        """
        desc = description or (
            f"Search the knowledge base ({len(self._chunks)} chunks). "
            f"Use this to find specific information from documents."
        )
        store = self

        class _KnowledgeTool(Tool):
            name_attr = name
            description_attr = desc
            parameters = {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query: use specific terms, not questions",
                    }
                },
                "required": ["query"],
            }

            def __init__(self_tool):
                self_tool.name = name
                self_tool.description = desc

            def execute(self_tool, query: str) -> str:
                results = store.query(query)
                if not results:
                    return "No relevant documents found."
                parts = []
                for i, r in enumerate(results, 1):
                    parts.append(f"[{i}] (score={r['score']}, source={r['source']})\n{r['text']}")
                return "\n\n".join(parts)

        return _KnowledgeTool()

    @property
    def size(self) -> int:
        return len(self._chunks)

    # --- Internal ---

    def _chunk_text(self, text: str) -> list[str]:
        """Split text into overlapping chunks."""
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []

        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Guard: if chunk_overlap >= chunk_size the step would be <= 0 and never advance.
            start += max(1, self.chunk_size - self.chunk_overlap)
        return chunks

    @staticmethod
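    def _embed(text: str) -> list[float]:
        """Lightweight trigram embedding (same as ExperienceReplay).

        Note: built-in hash() of str is salted per process unless PYTHONHASHSEED
        is set, so these vectors are only comparable within a single process.
        """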
        dim = 128
        vec = [0.0] * dim
        text_lower = text.lower()
        for i in range(len(text_lower) - 2):
            trigram = text_lower[i:i + 3]
            h = hash(trigram) % dim
            vec[h] += 1.0
        magnitude = math.sqrt(sum(x * x for x in vec))
        if magnitude > 0:
            vec = [x / magnitude for x in vec]
        return vec

    @staticmethod
    def _cosine_sim(a: list[float], b: list[float]) -> float:
        if not a or not b or len(a) != len(b):
            return 0.0
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a))
        mag_b = math.sqrt(sum(x * x for x in b))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        return dot / (mag_a * mag_b)


# ═══════════════════════════════════════════════════════════════════════════
# CREATIVE ALIASES β€” Purpose Agent's own names (primary)
# Old names kept for backward compatibility
# ═══════════════════════════════════════════════════════════════════════════

# New names (use these)
Spark = Agent           # A spark of intelligence: the atomic agent unit
Flow = Graph            # Data flows through nodes: workflow engine
swarm = parallel        # Agents working concurrently like a swarm
Council = Conversation  # Agents deliberate like a council
Vault = KnowledgeStore  # Knowledge vault: RAG as a tool
BEGIN = START
DONE_SIGNAL = END