| """ |
| Unified Capabilities β Five competing framework philosophies in one composable layer. |
| |
| LangGraph β "I want control" β GraphOrchestrator (conditional edges, cycles, fan-out/fan-in) |
| CrewAI β "I want speed" β ParallelRunner (concurrent tasks, parallel fan-out) |
| AutoGen β "I want agents talking" β Conversation (agent-to-agent message passing, group chat) |
| OpenAI SDK β "I want plug-and-play" β Agent() one-liner factory |
| LlamaIndex β "I want knowledge" β KnowledgeStore (RAG-as-a-tool, chunk + embed + retrieve) |
| |
| Design principle: ZERO changes to the existing Orchestrator/Actor/PurposeFunction. |
| Each capability is a composable layer that calls the existing modules. |
| The self-improvement loop (Ξ¦ scoring β experience replay β heuristic distillation) |
| runs INSIDE each capability automatically β every graph node, every parallel task, |
| every conversation turn feeds the same learning loop. |
| |
| Usage: |
| # Plug-and-play (OpenAI SDK simplicity) |
| agent = Agent("researcher", model="qwen3:1.7b", tools=[SearchTool()]) |
| result = agent.run("Find information about X") |
| |
| # Control flow (LangGraph power) |
| graph = Graph() |
| graph.add_node("research", research_agent) |
| graph.add_node("write", writer_agent) |
| graph.add_edge("research", "write") |
| graph.add_conditional_edge("write", review_fn, {"pass": END, "fail": "research"}) |
| result = graph.run(initial_state) |
| |
| # Speed (CrewAI parallelism) |
| results = parallel([task1, task2, task3], agents=[a1, a2, a3]) |
| |
| # Conversation (AutoGen talking) |
| chat = Conversation([researcher, coder, reviewer]) |
| result = chat.run("Build a web scraper", rounds=5) |
| |
| # Knowledge (LlamaIndex RAG) |
| kb = KnowledgeStore.from_directory("./docs") |
| agent = Agent("assistant", tools=[kb.as_tool()]) |
| result = agent.run("What does the documentation say about X?") |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import math |
| import os |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Callable, Iterator |
|
|
| from purpose_agent.types import ( |
| Action, Heuristic, MemoryTier, PurposeScore, State, |
| Trajectory, TrajectoryStep, |
| ) |
| from purpose_agent.llm_backend import LLMBackend, MockLLMBackend, ChatMessage |
| from purpose_agent.actor import Actor |
| from purpose_agent.purpose_function import PurposeFunction |
| from purpose_agent.experience_replay import ExperienceReplay |
| from purpose_agent.optimizer import HeuristicOptimizer |
| from purpose_agent.orchestrator import ( |
| Environment, Orchestrator, SimpleEnvironment, TaskResult, |
| ) |
| from purpose_agent.tools import Tool, FunctionTool, ToolResult, ToolRegistry |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| END = "__END__" |
| START = "__START__" |
|
|
|
|
| import threading |
|
|
| |
| _parallel_lock = threading.Lock() |
|
|
| |
| |
| |
|
|
| class Agent: |
| """ |
| One-liner agent factory. The simplest way to create and run an agent. |
| |
| Inspired by OpenAI Agents SDK: Agent(name, instructions, tools) β run(task). |
| But ours self-improves. Every run feeds the Ξ¦ loop. |
| |
| Usage: |
| # Minimal (uses mock for testing) |
| agent = Agent("helper") |
| result = agent.run("Do something") |
| |
| # With local SLM |
| agent = Agent("coder", model="qwen3:1.7b", tools=[PythonExecTool()]) |
| result = agent.run("Write a sorting algorithm") |
| |
| # With cloud LLM |
| agent = Agent("analyst", model="gpt-4o", api_key="sk-...") |
| result = agent.run("Analyze this data") |
| |
| # Handoff to another agent |
| agent_a = Agent("researcher", model="qwen3:1.7b") |
| agent_b = Agent("writer", model="phi4-mini", handoff_from=agent_a) |
| # agent_b inherits agent_a's experience replay |
| """ |
|
|
| def __init__( |
| self, |
| name: str = "agent", |
| instructions: str = "", |
| model: str | LLMBackend | None = None, |
| tools: list[Tool] | None = None, |
| api_key: str | None = None, |
| max_steps: int = 15, |
| handoff_from: "Agent | None" = None, |
| persistence_dir: str | None = None, |
| ): |
| self.name = name |
| self.instructions = instructions |
| self.max_steps = max_steps |
|
|
| |
| if model is None: |
| self.llm = MockLLMBackend() |
| elif isinstance(model, str): |
| self.llm = self._resolve_model(model, api_key) |
| else: |
| self.llm = model |
|
|
| |
| available_actions = {"DONE": "Signal task completion"} |
| self._tools = {} |
| for tool in (tools or []): |
| available_actions[tool.name] = tool.description |
| self._tools[tool.name] = tool |
|
|
| |
| self._env = _ToolEnvironment(self._tools) |
|
|
| |
| self.orch = Orchestrator( |
| llm=self.llm, |
| environment=self._env, |
| available_actions=available_actions, |
| persistence_dir=persistence_dir or f"./.purpose_agent/{name}", |
| ) |
|
|
| |
| if handoff_from: |
| self.orch.experience_replay = handoff_from.orch.experience_replay |
| self.orch.optimizer = handoff_from.orch.optimizer |
| self.orch.sync_memory() |
|
|
| |
| if instructions: |
| h = Heuristic( |
| pattern="Always", strategy=instructions, steps=[], |
| tier=MemoryTier.STRATEGIC, q_value=1.0, |
| ) |
| self.orch.optimizer.heuristic_library.append(h) |
| self.orch.sync_memory() |
|
|
| def run(self, task: str, state: State | None = None) -> TaskResult: |
| """Run a task. Returns TaskResult with trajectory, final state, success.""" |
| return self.orch.run_task( |
| purpose=task, |
| initial_state=state or State(data={}), |
| max_steps=self.max_steps, |
| ) |
|
|
| def __call__(self, task: str, **kwargs) -> TaskResult: |
| return self.run(task, **kwargs) |
|
|
| @staticmethod |
| def _resolve_model(model: str, api_key: str | None = None) -> LLMBackend: |
| """Resolve a model string to an LLMBackend.""" |
| from purpose_agent.slm_backends import SLM_REGISTRY |
| |
| |
| if model in SLM_REGISTRY: |
| from purpose_agent.slm_backends import create_slm_backend |
| return create_slm_backend(model) |
|
|
| |
| from purpose_agent.llm_backend import resolve_backend |
| return resolve_backend(model, api_key) |
|
|
|
|
| class _ToolEnvironment(Environment): |
| """Environment that executes tools based on action names.""" |
|
|
| def __init__(self, tools: dict[str, Tool]): |
| self._tools = tools |
|
|
| def execute(self, action: Action, current_state: State) -> State: |
| tool = self._tools.get(action.name) |
| if not tool: |
| return State( |
| data={**current_state.data, "_last_result": f"Unknown tool: {action.name}"}, |
| summary=f"Error: Unknown tool '{action.name}'", |
| ) |
| result = tool.run(**action.params) |
| new_data = {**current_state.data, "_last_result": result.output, "_last_tool": action.name} |
| if not result.success: |
| new_data["_last_error"] = result.error |
| return State(data=new_data, summary=result.output[:500]) |
|
|
| def reset(self) -> State: |
| return State(data={}) |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class GraphNode: |
| """A node in the execution graph.""" |
| name: str |
| handler: Callable[[State], State | TaskResult] | Agent |
| metadata: dict[str, Any] = field(default_factory=dict) |
|
|
|
|
| @dataclass |
| class GraphEdge: |
| """An edge in the execution graph.""" |
| source: str |
| target: str |
| condition: Callable[[State], bool] | None = None |
|
|
|
|
| class Graph: |
| """ |
| Graph-based workflow execution β LangGraph's control, with Ξ¦ self-improvement. |
| |
| Supports: conditional branching, cycles (loops), parallel fan-out/fan-in. |
| Every node that runs an Agent automatically feeds the Ξ¦ improvement loop. |
| |
| Usage: |
| graph = Graph() |
| |
| # Add nodes (agents or functions) |
| graph.add_node("research", Agent("researcher", model="qwen3:1.7b")) |
| graph.add_node("write", Agent("writer", model="phi4-mini")) |
| graph.add_node("review", lambda state: review_fn(state)) |
| |
| # Linear flow |
| graph.add_edge(START, "research") |
| graph.add_edge("research", "write") |
| |
| # Conditional branching (cycle back on failure) |
| graph.add_conditional_edge("write", "review", |
| condition_map={"pass": END, "revise": "write"}) |
| |
| result = graph.run(State(data={"topic": "AI safety"})) |
| """ |
|
|
| def __init__(self): |
| self._nodes: dict[str, GraphNode] = {} |
| self._edges: list[GraphEdge] = [] |
| self._conditional_edges: dict[str, dict] = {} |
| self._entry: str | None = None |
|
|
| def add_node(self, name: str, handler: Callable | Agent) -> "Graph": |
| """Add a node. Handler is either an Agent or a function(State) β State.""" |
| self._nodes[name] = GraphNode(name=name, handler=handler) |
| return self |
|
|
| def add_edge(self, source: str, target: str) -> "Graph": |
| """Add an unconditional edge.""" |
| self._edges.append(GraphEdge(source=source, target=target)) |
| if source == START: |
| self._entry = target |
| return self |
|
|
| def add_conditional_edge( |
| self, |
| source: str, |
| evaluator: str | Callable[[State], str], |
| condition_map: dict[str, str] | None = None, |
| ) -> "Graph": |
| """ |
| Add a conditional edge. After source node runs, evaluator determines next node. |
| |
| evaluator: A function(State) β str (returns key from condition_map) |
| OR a node name that will be run to produce the routing decision |
| condition_map: {"key": "target_node"} β maps evaluator output to next node. |
| Use END as target to terminate. |
| """ |
| self._conditional_edges[source] = { |
| "evaluator": evaluator, |
| "map": condition_map or {}, |
| } |
| return self |
|
|
| def run( |
| self, |
| initial_state: State | None = None, |
| max_iterations: int = 20, |
| ) -> State: |
| """Execute the graph from START to END.""" |
| state = initial_state or State(data={}) |
|
|
| if not self._entry: |
| |
| if self._nodes: |
| self._entry = list(self._nodes.keys())[0] |
| else: |
| raise ValueError("Graph has no nodes") |
|
|
| current = self._entry |
| visited_count: dict[str, int] = {} |
|
|
| for iteration in range(max_iterations): |
| if current == END: |
| logger.info(f"Graph: Reached END after {iteration} iterations") |
| break |
|
|
| if current not in self._nodes: |
| raise ValueError(f"Graph: Unknown node '{current}'") |
|
|
| visited_count[current] = visited_count.get(current, 0) + 1 |
| logger.info(f"Graph: Executing node '{current}' (visit #{visited_count[current]})") |
|
|
| |
| node = self._nodes[current] |
| state = self._execute_node(node, state) |
|
|
| |
| if current in self._conditional_edges: |
| cond = self._conditional_edges[current] |
| evaluator = cond["evaluator"] |
| cond_map = cond["map"] |
|
|
| |
| if callable(evaluator): |
| route_key = evaluator(state) |
| else: |
| route_key = str(state.data.get("_route", "default")) |
|
|
| current = cond_map.get(route_key, cond_map.get("default", END)) |
| logger.info(f"Graph: Conditional route '{route_key}' β '{current}'") |
| else: |
| |
| next_node = None |
| for edge in self._edges: |
| if edge.source == current: |
| next_node = edge.target |
| break |
| current = next_node or END |
| else: |
| logger.warning(f"Graph: Hit max iterations ({max_iterations})") |
|
|
| return state |
|
|
| def _execute_node(self, node: GraphNode, state: State) -> State: |
| """Execute a single node β Agent or function.""" |
| handler = node.handler |
|
|
| if isinstance(handler, Agent): |
| |
| purpose = state.data.get("_purpose", state.data.get("task", f"Execute {node.name}")) |
| result = handler.run(purpose, state=state) |
| |
| merged = {**state.data, **result.final_state.data} |
| merged["_last_node"] = node.name |
| merged["_last_success"] = result.success |
| merged["_last_phi"] = result.final_phi |
| return State(data=merged, summary=result.final_state.summary) |
|
|
| elif callable(handler): |
| result = handler(state) |
| if isinstance(result, State): |
| return result |
| elif isinstance(result, TaskResult): |
| return result.final_state |
| else: |
| return State(data={**state.data, "_result": str(result)}) |
|
|
| raise ValueError(f"Invalid node handler type: {type(handler)}") |
|
|
|
|
| |
| |
| |
|
|
| def parallel( |
| tasks: list[str] | list[dict[str, Any]], |
| agents: list[Agent] | Agent | None = None, |
| max_workers: int | None = None, |
| initial_states: list[State] | None = None, |
| ) -> list[TaskResult]: |
| """ |
| Run multiple tasks in parallel β CrewAI's speed, with Ξ¦ self-improvement. |
| |
| Every parallel task feeds the same improvement loop, so agents learn |
| even from concurrent executions. |
| |
| Usage: |
| # Same agent, multiple tasks |
| agent = Agent("worker", model="qwen3:1.7b") |
| results = parallel(["task 1", "task 2", "task 3"], agent) |
| |
| # Different agents for different tasks |
| results = parallel( |
| ["research X", "code Y", "review Z"], |
| agents=[researcher, coder, reviewer], |
| ) |
| |
| # Dict-based tasks with metadata |
| results = parallel([ |
| {"purpose": "research X", "max_steps": 10}, |
| {"purpose": "code Y", "max_steps": 20}, |
| ], agent) |
| """ |
| |
| normalized: list[dict] = [] |
| for t in tasks: |
| if isinstance(t, str): |
| normalized.append({"purpose": t}) |
| else: |
| normalized.append(t) |
|
|
| |
| if agents is None: |
| agent_list = [Agent("worker")] * len(normalized) |
| elif isinstance(agents, Agent): |
| agent_list = [agents] * len(normalized) |
| else: |
| if len(agents) < len(normalized): |
| |
| agent_list = [agents[i % len(agents)] for i in range(len(normalized))] |
| else: |
| agent_list = agents |
|
|
| states = initial_states or [None] * len(normalized) |
|
|
| |
| |
| |
| if max_workers is None: |
| sample_agent = agent_list[0] if agent_list else None |
| if sample_agent and hasattr(sample_agent, 'llm'): |
| backend_type = type(sample_agent.llm).__name__ |
| if backend_type in ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend"): |
| workers = 1 |
| else: |
| workers = min(len(normalized), 8) |
| else: |
| workers = min(len(normalized), 8) |
| else: |
| workers = max_workers |
|
|
| logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers") |
|
|
| def _run_one(idx: int) -> TaskResult: |
| task = normalized[idx] |
| agent = agent_list[idx] |
| state = states[idx] |
| |
| with _parallel_lock: |
| return agent.run(task["purpose"], state=state) |
|
|
| results: list[TaskResult | None] = [None] * len(normalized) |
|
|
| with ThreadPoolExecutor(max_workers=workers) as executor: |
| future_to_idx = { |
| executor.submit(_run_one, i): i |
| for i in range(len(normalized)) |
| } |
| for future in as_completed(future_to_idx): |
| idx = future_to_idx[future] |
| try: |
| results[idx] = future.result() |
| logger.info(f"Parallel: Task {idx} completed β success={results[idx].success}") |
| except Exception as e: |
| logger.error(f"Parallel: Task {idx} failed β {e}") |
| results[idx] = TaskResult( |
| trajectory=Trajectory( |
| task_description=normalized[idx]["purpose"], |
| purpose=normalized[idx]["purpose"], |
| ), |
| final_state=State(data={"_error": str(e)}), |
| ) |
|
|
| return results |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class Message: |
| """A message in an agent conversation.""" |
| sender: str |
| content: str |
| timestamp: float = field(default_factory=time.time) |
| metadata: dict[str, Any] = field(default_factory=dict) |
|
|
|
|
| class Conversation: |
| """ |
| Multi-agent conversation β AutoGen's talking, with Ξ¦ self-improvement. |
| |
| Agents take turns speaking. Each agent sees the full conversation history |
| and contributes its perspective. The conversation continues for N rounds |
| or until agents converge on a solution. |
| |
| Every agent's turn feeds the Ξ¦ loop β agents learn from conversations. |
| |
| Usage: |
| researcher = Agent("researcher", model="qwen3:1.7b") |
| coder = Agent("coder", model="phi4-mini") |
| reviewer = Agent("reviewer", model="qwen3:1.7b") |
| |
| chat = Conversation([researcher, coder, reviewer]) |
| result = chat.run("Build a web scraper for news articles", rounds=5) |
| |
| # Access conversation history |
| for msg in chat.history: |
| print(f"{msg.sender}: {msg.content[:100]}") |
| """ |
|
|
| def __init__( |
| self, |
| agents: list[Agent], |
| moderator: Agent | LLMBackend | None = None, |
| speaker_selection: str = "round_robin", |
| ): |
| self.agents = {a.name: a for a in agents} |
| self.agent_order = [a.name for a in agents] |
| self.moderator = moderator |
| self.speaker_selection = speaker_selection |
| self.history: list[Message] = [] |
|
|
| def run( |
| self, |
| topic: str, |
| rounds: int = 3, |
| initial_context: str = "", |
| ) -> State: |
| """ |
| Run a conversation about a topic for N rounds. |
| |
| Returns final State with conversation results. |
| """ |
| self.history = [Message(sender="system", content=f"Topic: {topic}")] |
| if initial_context: |
| self.history.append(Message(sender="system", content=initial_context)) |
|
|
| logger.info(f"Conversation: Starting '{topic}' with {list(self.agents.keys())}") |
|
|
| for round_num in range(rounds): |
| logger.info(f"Conversation: Round {round_num + 1}/{rounds}") |
|
|
| for agent_name in self._get_speaker_order(round_num): |
| agent = self.agents[agent_name] |
|
|
| |
| conv_text = self._format_history() |
| state = State( |
| data={ |
| "conversation": conv_text, |
| "topic": topic, |
| "round": round_num + 1, |
| "role": agent_name, |
| }, |
| summary=f"Conversation round {round_num + 1}. Topic: {topic}\n\n{conv_text}", |
| ) |
|
|
| |
| purpose = ( |
| f"You are '{agent_name}' in a team discussion about: {topic}. " |
| f"Read the conversation so far and contribute your expert perspective. " |
| f"Be concise and actionable." |
| ) |
| result = agent.run(purpose, state=state) |
|
|
| |
| response = result.final_state.data.get( |
| "_last_result", |
| result.final_state.summary or "(no response)", |
| ) |
|
|
| self.history.append(Message( |
| sender=agent_name, |
| content=response, |
| metadata={ |
| "round": round_num + 1, |
| "phi": result.final_phi, |
| "success": result.success, |
| }, |
| )) |
|
|
| logger.info(f" {agent_name}: {response[:100]}...") |
|
|
| |
| return State( |
| data={ |
| "topic": topic, |
| "rounds": rounds, |
| "messages": [ |
| {"sender": m.sender, "content": m.content} |
| for m in self.history |
| ], |
| "final_summary": self.history[-1].content if self.history else "", |
| }, |
| summary=self._format_history(), |
| ) |
|
|
| def _get_speaker_order(self, round_num: int) -> list[str]: |
| """Determine speaking order for a round.""" |
| if self.speaker_selection == "round_robin": |
| return self.agent_order |
| elif self.speaker_selection == "auto": |
| |
| order = list(self.agent_order) |
| if round_num % 2 == 1: |
| order.reverse() |
| return order |
| return self.agent_order |
|
|
| def _format_history(self) -> str: |
| """Format conversation history as text.""" |
| lines = [] |
| for msg in self.history: |
| if msg.sender == "system": |
| lines.append(f"[System] {msg.content}") |
| else: |
| lines.append(f"[{msg.sender}] {msg.content}") |
| return "\n\n".join(lines) |
|
|
|
|
| |
| |
| |
|
|
| class KnowledgeStore: |
| """ |
| Knowledge-aware agents β LlamaIndex's RAG, as a simple Tool. |
| |
| Chunks documents, embeds them, retrieves relevant chunks for queries. |
| Plugs into any Agent as a tool β the agent decides when to retrieve. |
| |
| No external dependencies. Uses the same trigram embedding as ExperienceReplay. |
| For production, swap in sentence-transformers via EmbeddingBackend. |
| |
| Usage: |
| # From files |
| kb = KnowledgeStore.from_directory("./docs", glob="*.md") |
| |
| # From strings |
| kb = KnowledgeStore.from_texts([ |
| "Python was created by Guido van Rossum.", |
| "Python 3.12 added PEP 695 type aliases.", |
| ]) |
| |
| # As a tool for any agent |
| agent = Agent("assistant", tools=[kb.as_tool()]) |
| result = agent.run("What PEP was added in Python 3.12?") |
| |
| # Direct query |
| results = kb.query("type aliases", top_k=3) |
| """ |
|
|
| def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, top_k: int = 5): |
| self.chunk_size = chunk_size |
| self.chunk_overlap = chunk_overlap |
| self.top_k = top_k |
| self._chunks: list[dict[str, Any]] = [] |
|
|
| def add_text(self, text: str, source: str = "unknown") -> int: |
| """Add a text document β auto-chunks and embeds.""" |
| chunks = self._chunk_text(text) |
| count = 0 |
| for chunk in chunks: |
| embedding = self._embed(chunk) |
| self._chunks.append({ |
| "text": chunk, |
| "embedding": embedding, |
| "source": source, |
| "index": len(self._chunks), |
| }) |
| count += 1 |
| return count |
|
|
| def add_file(self, path: str) -> int: |
| """Add a file to the knowledge store.""" |
| with open(path, "r", errors="ignore") as f: |
| text = f.read() |
| return self.add_text(text, source=os.path.basename(path)) |
|
|
| @classmethod |
| def from_texts(cls, texts: list[str], **kwargs) -> "KnowledgeStore": |
| """Create from a list of text strings.""" |
| store = cls(**kwargs) |
| for i, text in enumerate(texts): |
| store.add_text(text, source=f"text_{i}") |
| return store |
|
|
| @classmethod |
| def from_directory(cls, path: str, glob: str = "*.txt", **kwargs) -> "KnowledgeStore": |
| """Create from all matching files in a directory.""" |
| store = cls(**kwargs) |
| p = Path(path) |
| for file in sorted(p.glob(glob)): |
| store.add_file(str(file)) |
| logger.info(f"KnowledgeStore: Loaded {len(store._chunks)} chunks from {path}") |
| return store |
|
|
| def query(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]: |
| """Retrieve the most relevant chunks for a query.""" |
| k = top_k or self.top_k |
| if not self._chunks: |
| return [] |
|
|
| query_emb = self._embed(query) |
| scored = [] |
| for chunk in self._chunks: |
| sim = self._cosine_sim(query_emb, chunk["embedding"]) |
| scored.append((sim, chunk)) |
| scored.sort(key=lambda x: -x[0]) |
|
|
| return [ |
| {"text": c["text"], "source": c["source"], "score": round(s, 3)} |
| for s, c in scored[:k] |
| ] |
|
|
| def as_tool(self, name: str = "knowledge_search", description: str | None = None) -> Tool: |
| """ |
| Convert this KnowledgeStore into a Tool that any Agent can use. |
| |
| This is the LlamaIndex QueryEngineTool pattern β RAG as a tool. |
| The agent decides WHEN to retrieve (agentic RAG), rather than |
| always retrieving (traditional RAG pipeline). |
| """ |
| desc = description or ( |
| f"Search the knowledge base ({len(self._chunks)} chunks). " |
| f"Use this to find specific information from documents." |
| ) |
| store = self |
|
|
| class _KnowledgeTool(Tool): |
| name_attr = name |
| description_attr = desc |
| parameters = { |
| "type": "object", |
| "properties": { |
| "query": { |
| "type": "string", |
| "description": "Search query β use specific terms, not questions", |
| } |
| }, |
| "required": ["query"], |
| } |
|
|
| def __init__(self_tool): |
| self_tool.name = name |
| self_tool.description = desc |
|
|
| def execute(self_tool, query: str) -> str: |
| results = store.query(query) |
| if not results: |
| return "No relevant documents found." |
| parts = [] |
| for i, r in enumerate(results, 1): |
| parts.append(f"[{i}] (score={r['score']}, source={r['source']})\n{r['text']}") |
| return "\n\n".join(parts) |
|
|
| return _KnowledgeTool() |
|
|
| @property |
| def size(self) -> int: |
| return len(self._chunks) |
|
|
| |
|
|
| def _chunk_text(self, text: str) -> list[str]: |
| """Split text into overlapping chunks.""" |
| if len(text) <= self.chunk_size: |
| return [text] if text.strip() else [] |
|
|
| chunks = [] |
| start = 0 |
| while start < len(text): |
| end = start + self.chunk_size |
| chunk = text[start:end].strip() |
| if chunk: |
| chunks.append(chunk) |
| start += self.chunk_size - self.chunk_overlap |
| return chunks |
|
|
| @staticmethod |
| def _embed(text: str) -> list[float]: |
| """Lightweight trigram embedding (same as ExperienceReplay).""" |
| dim = 128 |
| vec = [0.0] * dim |
| text_lower = text.lower() |
| for i in range(len(text_lower) - 2): |
| trigram = text_lower[i:i + 3] |
| h = hash(trigram) % dim |
| vec[h] += 1.0 |
| magnitude = math.sqrt(sum(x * x for x in vec)) |
| if magnitude > 0: |
| vec = [x / magnitude for x in vec] |
| return vec |
|
|
| @staticmethod |
| def _cosine_sim(a: list[float], b: list[float]) -> float: |
| if not a or not b or len(a) != len(b): |
| return 0.0 |
| dot = sum(x * y for x, y in zip(a, b)) |
| mag_a = math.sqrt(sum(x * x for x in a)) |
| mag_b = math.sqrt(sum(x * x for x in b)) |
| if mag_a == 0 or mag_b == 0: |
| return 0.0 |
| return dot / (mag_a * mag_b) |
|
|
|
|
| |
| |
| |
| |
|
|
| |
| Spark = Agent |
| Flow = Graph |
| swarm = parallel |
| Council = Conversation |
| Vault = KnowledgeStore |
| BEGIN = START |
| DONE_SIGNAL = END |
|
|