"""
Unified Capabilities -- Five competing framework philosophies in one composable layer.
LangGraph  → "I want control"        → Graph (conditional edges, cycles)
CrewAI     → "I want speed"          → parallel() (concurrent tasks, parallel fan-out)
AutoGen    → "I want agents talking" → Conversation (agent-to-agent message passing, group chat)
OpenAI SDK → "I want plug-and-play"  → Agent() one-liner factory
LlamaIndex → "I want knowledge"      → KnowledgeStore (RAG-as-a-tool, chunk + embed + retrieve)
Design principle: ZERO changes to the existing Orchestrator/Actor/PurposeFunction.
Each capability is a composable layer that calls the existing modules.
The self-improvement loop (Φ scoring → experience replay → heuristic distillation)
runs INSIDE each capability automatically -- every graph node, every parallel task,
every conversation turn feeds the same learning loop.
Usage:
# Plug-and-play (OpenAI SDK simplicity)
agent = Agent("researcher", model="qwen3:1.7b", tools=[SearchTool()])
result = agent.run("Find information about X")
# Control flow (LangGraph power)
graph = Graph()
graph.add_node("research", research_agent)
graph.add_node("write", writer_agent)
graph.add_edge("research", "write")
graph.add_conditional_edge("write", review_fn, {"pass": END, "fail": "research"})
result = graph.run(initial_state)
# Speed (CrewAI parallelism)
results = parallel([task1, task2, task3], agents=[a1, a2, a3])
# Conversation (AutoGen talking)
chat = Conversation([researcher, coder, reviewer])
result = chat.run("Build a web scraper", rounds=5)
# Knowledge (LlamaIndex RAG)
kb = KnowledgeStore.from_directory("./docs")
agent = Agent("assistant", tools=[kb.as_tool()])
result = agent.run("What does the documentation say about X?")
"""
from __future__ import annotations
import asyncio
import json
import logging
import math
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Iterator
from purpose_agent.types import (
Action, Heuristic, MemoryTier, PurposeScore, State,
Trajectory, TrajectoryStep,
)
from purpose_agent.llm_backend import LLMBackend, MockLLMBackend, ChatMessage
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.orchestrator import (
Environment, Orchestrator, SimpleEnvironment, TaskResult,
)
from purpose_agent.tools import Tool, FunctionTool, ToolResult, ToolRegistry
logger = logging.getLogger(__name__)
# Sentinels for the graph's entry and exit nodes
END = "__END__"
START = "__START__"
# Global lock for shared replay/optimizer in parallel execution
_parallel_lock = threading.Lock()
# ═══════════════════════════════════════════════════════════════════════════
# 1. PLUG-AND-PLAY -- Agent() one-liner factory (OpenAI Agents SDK simplicity)
# ═══════════════════════════════════════════════════════════════════════════
class Agent:
"""
One-liner agent factory. The simplest way to create and run an agent.
    Inspired by OpenAI Agents SDK: Agent(name, instructions, tools) → run(task).
    But ours self-improves. Every run feeds the Φ loop.
Usage:
# Minimal (uses mock for testing)
agent = Agent("helper")
result = agent.run("Do something")
# With local SLM
agent = Agent("coder", model="qwen3:1.7b", tools=[PythonExecTool()])
result = agent.run("Write a sorting algorithm")
# With cloud LLM
agent = Agent("analyst", model="gpt-4o", api_key="sk-...")
result = agent.run("Analyze this data")
# Handoff to another agent
agent_a = Agent("researcher", model="qwen3:1.7b")
agent_b = Agent("writer", model="phi4-mini", handoff_from=agent_a)
# agent_b inherits agent_a's experience replay
"""
def __init__(
self,
name: str = "agent",
instructions: str = "",
model: str | LLMBackend | None = None,
tools: list[Tool] | None = None,
api_key: str | None = None,
max_steps: int = 15,
handoff_from: "Agent | None" = None,
persistence_dir: str | None = None,
):
self.name = name
self.instructions = instructions
self.max_steps = max_steps
# Resolve LLM backend
if model is None:
self.llm = MockLLMBackend()
elif isinstance(model, str):
self.llm = self._resolve_model(model, api_key)
else:
self.llm = model
# Build available actions from tools
available_actions = {"DONE": "Signal task completion"}
self._tools = {}
for tool in (tools or []):
available_actions[tool.name] = tool.description
self._tools[tool.name] = tool
# Build environment that executes tools
self._env = _ToolEnvironment(self._tools)
# Create orchestrator
self.orch = Orchestrator(
llm=self.llm,
environment=self._env,
available_actions=available_actions,
persistence_dir=persistence_dir or f"./.purpose_agent/{name}",
)
# Handoff: inherit experience from another agent
if handoff_from:
self.orch.experience_replay = handoff_from.orch.experience_replay
self.orch.optimizer = handoff_from.orch.optimizer
self.orch.sync_memory()
# Inject custom instructions into actor's strategic memory
if instructions:
h = Heuristic(
pattern="Always", strategy=instructions, steps=[],
tier=MemoryTier.STRATEGIC, q_value=1.0,
)
self.orch.optimizer.heuristic_library.append(h)
self.orch.sync_memory()
def run(self, task: str, state: State | None = None) -> TaskResult:
"""Run a task. Returns TaskResult with trajectory, final state, success."""
return self.orch.run_task(
purpose=task,
initial_state=state or State(data={}),
max_steps=self.max_steps,
)
def __call__(self, task: str, **kwargs) -> TaskResult:
return self.run(task, **kwargs)
@staticmethod
def _resolve_model(model: str, api_key: str | None = None) -> LLMBackend:
"""Resolve a model string to an LLMBackend."""
from purpose_agent.slm_backends import SLM_REGISTRY
# Known SLM registry keys
if model in SLM_REGISTRY:
from purpose_agent.slm_backends import create_slm_backend
return create_slm_backend(model)
# Delegate to the centralized resolver for all other models (e.g. groq:, openai:)
from purpose_agent.llm_backend import resolve_backend
return resolve_backend(model, api_key)
class _ToolEnvironment(Environment):
"""Environment that executes tools based on action names."""
def __init__(self, tools: dict[str, Tool]):
self._tools = tools
def execute(self, action: Action, current_state: State) -> State:
tool = self._tools.get(action.name)
if not tool:
return State(
data={**current_state.data, "_last_result": f"Unknown tool: {action.name}"},
summary=f"Error: Unknown tool '{action.name}'",
)
        try:
            result = tool.run(**action.params)
        except Exception as e:  # a crashing tool must not kill the whole run
            return State(
                data={**current_state.data, "_last_error": str(e), "_last_tool": action.name},
                summary=f"Error: tool '{action.name}' raised: {e}",
            )
        output = "" if result.output is None else str(result.output)
        new_data = {**current_state.data, "_last_result": output, "_last_tool": action.name}
        if not result.success:
            new_data["_last_error"] = result.error
        return State(data=new_data, summary=output[:500])
def reset(self) -> State:
return State(data={})
# ═══════════════════════════════════════════════════════════════════════════
# 2. CONTROL -- Graph execution engine (LangGraph-style)
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class GraphNode:
"""A node in the execution graph."""
name: str
handler: Callable[[State], State | TaskResult] | Agent
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class GraphEdge:
"""An edge in the execution graph."""
source: str
target: str
condition: Callable[[State], bool] | None = None # None = unconditional
class Graph:
"""
    Graph-based workflow execution -- LangGraph's control, with Φ self-improvement.
    Supports conditional branching and cycles (loops); one node is active at a
    time, so use parallel() when you need fan-out/fan-in across agents.
    Every node that runs an Agent automatically feeds the Φ improvement loop.
Usage:
graph = Graph()
# Add nodes (agents or functions)
graph.add_node("research", Agent("researcher", model="qwen3:1.7b"))
graph.add_node("write", Agent("writer", model="phi4-mini"))
graph.add_node("review", lambda state: review_fn(state))
# Linear flow
graph.add_edge(START, "research")
graph.add_edge("research", "write")
# Conditional branching (cycle back on failure)
graph.add_conditional_edge("write", "review",
condition_map={"pass": END, "revise": "write"})
result = graph.run(State(data={"topic": "AI safety"}))
"""
def __init__(self):
self._nodes: dict[str, GraphNode] = {}
self._edges: list[GraphEdge] = []
        self._conditional_edges: dict[str, dict] = {}  # source → {evaluator, map}
self._entry: str | None = None
def add_node(self, name: str, handler: Callable | Agent) -> "Graph":
"""Add a node. Handler is either an Agent or a function(State) β†’ State."""
self._nodes[name] = GraphNode(name=name, handler=handler)
return self
def add_edge(self, source: str, target: str) -> "Graph":
"""Add an unconditional edge."""
self._edges.append(GraphEdge(source=source, target=target))
if source == START:
self._entry = target
return self
def add_conditional_edge(
self,
source: str,
evaluator: str | Callable[[State], str],
condition_map: dict[str, str] | None = None,
) -> "Graph":
"""
        Add a conditional edge: after the source node runs, evaluator picks the next node.
        evaluator: a function(State) → str returning a key from condition_map.
                   If it is not callable, the route key is read from
                   state.data["_route"] instead (set it in an upstream node).
        condition_map: {"key": "target_node"} -- maps evaluator output to the next
                       node. Use END as a target to terminate; a "default" key
                       catches unmatched routes (otherwise the graph ends).
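
        Example (callable evaluator; "_last_success" is set by Agent nodes):
            graph.add_conditional_edge(
                "write",
                lambda s: "pass" if s.data.get("_last_success") else "fail",
                condition_map={"pass": END, "fail": "research"},
            )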
"""
self._conditional_edges[source] = {
"evaluator": evaluator,
"map": condition_map or {},
}
return self
def run(
self,
initial_state: State | None = None,
max_iterations: int = 20,
) -> State:
"""Execute the graph from START to END."""
state = initial_state or State(data={})
if not self._entry:
# Auto-detect entry: first node added
if self._nodes:
self._entry = list(self._nodes.keys())[0]
else:
raise ValueError("Graph has no nodes")
current = self._entry
visited_count: dict[str, int] = {}
for iteration in range(max_iterations):
if current == END:
logger.info(f"Graph: Reached END after {iteration} iterations")
break
if current not in self._nodes:
raise ValueError(f"Graph: Unknown node '{current}'")
visited_count[current] = visited_count.get(current, 0) + 1
logger.info(f"Graph: Executing node '{current}' (visit #{visited_count[current]})")
# Execute node
node = self._nodes[current]
state = self._execute_node(node, state)
# Determine next node
if current in self._conditional_edges:
cond = self._conditional_edges[current]
evaluator = cond["evaluator"]
cond_map = cond["map"]
# Get routing decision
if callable(evaluator):
route_key = evaluator(state)
else:
route_key = str(state.data.get("_route", "default"))
current = cond_map.get(route_key, cond_map.get("default", END))
logger.info(f"Graph: Conditional route '{route_key}' β†’ '{current}'")
else:
# Find unconditional edge
next_node = None
for edge in self._edges:
if edge.source == current:
next_node = edge.target
break
current = next_node or END
else:
logger.warning(f"Graph: Hit max iterations ({max_iterations})")
return state
def _execute_node(self, node: GraphNode, state: State) -> State:
"""Execute a single node β€” Agent or function."""
handler = node.handler
if isinstance(handler, Agent):
# Run the agent on the current state, extract purpose from state data
purpose = state.data.get("_purpose", state.data.get("task", f"Execute {node.name}"))
result = handler.run(purpose, state=state)
# Merge agent's final state into the graph state
merged = {**state.data, **result.final_state.data}
merged["_last_node"] = node.name
merged["_last_success"] = result.success
merged["_last_phi"] = result.final_phi
return State(data=merged, summary=result.final_state.summary)
elif callable(handler):
result = handler(state)
if isinstance(result, State):
return result
elif isinstance(result, TaskResult):
return result.final_state
else:
return State(data={**state.data, "_result": str(result)})
raise ValueError(f"Invalid node handler type: {type(handler)}")
# ═══════════════════════════════════════════════════════════════════════════
# 3. SPEED -- Parallel execution (CrewAI-style)
# ═══════════════════════════════════════════════════════════════════════════
def parallel(
tasks: list[str] | list[dict[str, Any]],
agents: list[Agent] | Agent | None = None,
max_workers: int | None = None,
initial_states: list[State] | None = None,
) -> list[TaskResult]:
"""
    Run multiple tasks in parallel -- CrewAI's speed, with Φ self-improvement.
Every parallel task feeds the same improvement loop, so agents learn
even from concurrent executions.
Usage:
# Same agent, multiple tasks
agent = Agent("worker", model="qwen3:1.7b")
results = parallel(["task 1", "task 2", "task 3"], agent)
# Different agents for different tasks
results = parallel(
["research X", "code Y", "review Z"],
agents=[researcher, coder, reviewer],
)
# Dict-based tasks with metadata
results = parallel([
{"purpose": "research X", "max_steps": 10},
{"purpose": "code Y", "max_steps": 20},
], agent)
"""
# Normalize tasks
normalized: list[dict] = []
for t in tasks:
if isinstance(t, str):
normalized.append({"purpose": t})
else:
normalized.append(t)
# Normalize agents
if agents is None:
agent_list = [Agent("worker")] * len(normalized)
elif isinstance(agents, Agent):
agent_list = [agents] * len(normalized)
else:
if len(agents) < len(normalized):
# Cycle agents
agent_list = [agents[i % len(agents)] for i in range(len(normalized))]
else:
agent_list = agents
states = initial_states or [None] * len(normalized)
# Thread safety: detect backend type for concurrency limit
    # Local backends (Ollama, llama-cpp) share one GPU/CPU -- serialize.
# Cloud/API backends can parallelize
if max_workers is None:
sample_agent = agent_list[0] if agent_list else None
if sample_agent and hasattr(sample_agent, 'llm'):
backend_type = type(sample_agent.llm).__name__
if backend_type in ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend"):
                workers = 1  # Local model -- serialize to avoid contention
else:
workers = min(len(normalized), 8)
else:
workers = min(len(normalized), 8)
else:
workers = max_workers
logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers")
def _run_one(idx: int) -> TaskResult:
        task = normalized[idx]
        agent = agent_list[idx]
        state = states[idx]
        # Coarse-grained lock: shared replay/optimizer writes are not thread-safe
        # (agents may share them via reuse or handoff), so each run is serialized
        # end-to-end. Narrow this lock if your backend and stores are thread-safe.
        with _parallel_lock:
            if "max_steps" in task:  # honor per-task metadata from dict tasks
                return agent.orch.run_task(
                    purpose=task["purpose"],
                    initial_state=state or State(data={}),
                    max_steps=task["max_steps"],
                )
            return agent.run(task["purpose"], state=state)
results: list[TaskResult | None] = [None] * len(normalized)
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_idx = {
executor.submit(_run_one, i): i
for i in range(len(normalized))
}
for future in as_completed(future_to_idx):
idx = future_to_idx[future]
try:
results[idx] = future.result()
logger.info(f"Parallel: Task {idx} completed β€” success={results[idx].success}")
except Exception as e:
logger.error(f"Parallel: Task {idx} failed β€” {e}")
results[idx] = TaskResult(
trajectory=Trajectory(
task_description=normalized[idx]["purpose"],
purpose=normalized[idx]["purpose"],
),
final_state=State(data={"_error": str(e)}),
)
return results
# ═══════════════════════════════════════════════════════════════════════════
# 4. CONVERSATION -- Agent-to-agent messaging (AutoGen-style)
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class Message:
"""A message in an agent conversation."""
sender: str
content: str
timestamp: float = field(default_factory=time.time)
metadata: dict[str, Any] = field(default_factory=dict)
class Conversation:
"""
    Multi-agent conversation -- AutoGen's talking, with Φ self-improvement.
Agents take turns speaking. Each agent sees the full conversation history
and contributes its perspective. The conversation continues for N rounds
or until agents converge on a solution.
    Every agent's turn feeds the Φ loop -- agents learn from conversations.
Usage:
researcher = Agent("researcher", model="qwen3:1.7b")
coder = Agent("coder", model="phi4-mini")
reviewer = Agent("reviewer", model="qwen3:1.7b")
chat = Conversation([researcher, coder, reviewer])
result = chat.run("Build a web scraper for news articles", rounds=5)
# Access conversation history
for msg in chat.history:
print(f"{msg.sender}: {msg.content[:100]}")
"""
def __init__(
self,
agents: list[Agent],
moderator: Agent | LLMBackend | None = None,
        speaker_selection: str = "round_robin",  # "round_robin" | "auto" (reverses alternate rounds); anything else falls back to round-robin
):
self.agents = {a.name: a for a in agents}
self.agent_order = [a.name for a in agents]
self.moderator = moderator
self.speaker_selection = speaker_selection
self.history: list[Message] = []
def run(
self,
topic: str,
rounds: int = 3,
initial_context: str = "",
) -> State:
"""
Run a conversation about a topic for N rounds.
Returns final State with conversation results.
"""
self.history = [Message(sender="system", content=f"Topic: {topic}")]
if initial_context:
self.history.append(Message(sender="system", content=initial_context))
logger.info(f"Conversation: Starting '{topic}' with {list(self.agents.keys())}")
for round_num in range(rounds):
logger.info(f"Conversation: Round {round_num + 1}/{rounds}")
for agent_name in self._get_speaker_order(round_num):
agent = self.agents[agent_name]
# Build the conversation state for this agent
conv_text = self._format_history()
state = State(
data={
"conversation": conv_text,
"topic": topic,
"round": round_num + 1,
"role": agent_name,
},
summary=f"Conversation round {round_num + 1}. Topic: {topic}\n\n{conv_text}",
)
                # Agent responds (this feeds the Φ loop!)
purpose = (
f"You are '{agent_name}' in a team discussion about: {topic}. "
f"Read the conversation so far and contribute your expert perspective. "
f"Be concise and actionable."
)
result = agent.run(purpose, state=state)
# Extract the agent's contribution
response = result.final_state.data.get(
"_last_result",
result.final_state.summary or "(no response)",
)
self.history.append(Message(
sender=agent_name,
content=response,
metadata={
"round": round_num + 1,
"phi": result.final_phi,
"success": result.success,
},
))
logger.info(f" {agent_name}: {response[:100]}...")
# Build final state with full conversation
return State(
data={
"topic": topic,
"rounds": rounds,
"messages": [
{"sender": m.sender, "content": m.content}
for m in self.history
],
"final_summary": self.history[-1].content if self.history else "",
},
summary=self._format_history(),
)
def _get_speaker_order(self, round_num: int) -> list[str]:
"""Determine speaking order for a round."""
if self.speaker_selection == "round_robin":
return self.agent_order
elif self.speaker_selection == "auto":
# Reverse every other round for variety
order = list(self.agent_order)
if round_num % 2 == 1:
order.reverse()
return order
return self.agent_order
def _format_history(self) -> str:
"""Format conversation history as text."""
lines = []
for msg in self.history:
if msg.sender == "system":
lines.append(f"[System] {msg.content}")
else:
lines.append(f"[{msg.sender}] {msg.content}")
return "\n\n".join(lines)
# ═══════════════════════════════════════════════════════════════════════════
# 5. KNOWLEDGE -- RAG-as-a-tool (LlamaIndex-style)
# ═══════════════════════════════════════════════════════════════════════════
class KnowledgeStore:
"""
    Knowledge-aware agents -- LlamaIndex's RAG, as a simple Tool.
    Chunks documents, embeds them, retrieves relevant chunks for queries.
    Plugs into any Agent as a tool -- the agent decides when to retrieve.
No external dependencies. Uses the same trigram embedding as ExperienceReplay.
For production, swap in sentence-transformers via EmbeddingBackend.
Usage:
# From files
kb = KnowledgeStore.from_directory("./docs", glob="*.md")
# From strings
kb = KnowledgeStore.from_texts([
"Python was created by Guido van Rossum.",
"Python 3.12 added PEP 695 type aliases.",
])
# As a tool for any agent
agent = Agent("assistant", tools=[kb.as_tool()])
result = agent.run("What PEP was added in Python 3.12?")
# Direct query
results = kb.query("type aliases", top_k=3)
"""
def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, top_k: int = 5):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.top_k = top_k
self._chunks: list[dict[str, Any]] = [] # {text, embedding, source, index}
def add_text(self, text: str, source: str = "unknown") -> int:
"""Add a text document β€” auto-chunks and embeds."""
chunks = self._chunk_text(text)
count = 0
for chunk in chunks:
embedding = self._embed(chunk)
self._chunks.append({
"text": chunk,
"embedding": embedding,
"source": source,
"index": len(self._chunks),
})
count += 1
return count
def add_file(self, path: str) -> int:
"""Add a file to the knowledge store."""
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
return self.add_text(text, source=os.path.basename(path))
@classmethod
def from_texts(cls, texts: list[str], **kwargs) -> "KnowledgeStore":
"""Create from a list of text strings."""
store = cls(**kwargs)
for i, text in enumerate(texts):
store.add_text(text, source=f"text_{i}")
return store
@classmethod
def from_directory(cls, path: str, glob: str = "*.txt", **kwargs) -> "KnowledgeStore":
"""Create from all matching files in a directory."""
store = cls(**kwargs)
p = Path(path)
for file in sorted(p.glob(glob)):
store.add_file(str(file))
logger.info(f"KnowledgeStore: Loaded {len(store._chunks)} chunks from {path}")
return store
def query(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
"""Retrieve the most relevant chunks for a query."""
k = top_k or self.top_k
if not self._chunks:
return []
query_emb = self._embed(query)
scored = []
for chunk in self._chunks:
sim = self._cosine_sim(query_emb, chunk["embedding"])
scored.append((sim, chunk))
scored.sort(key=lambda x: -x[0])
return [
{"text": c["text"], "source": c["source"], "score": round(s, 3)}
for s, c in scored[:k]
]
def as_tool(self, name: str = "knowledge_search", description: str | None = None) -> Tool:
"""
Convert this KnowledgeStore into a Tool that any Agent can use.
        This is the LlamaIndex QueryEngineTool pattern -- RAG as a tool.
The agent decides WHEN to retrieve (agentic RAG), rather than
always retrieving (traditional RAG pipeline).
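
        Direct use (sketch; bypasses the Tool.run wrapper):
            tool = kb.as_tool()
            print(tool.execute(query="type aliases"))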
"""
desc = description or (
f"Search the knowledge base ({len(self._chunks)} chunks). "
f"Use this to find specific information from documents."
)
store = self
class _KnowledgeTool(Tool):
name_attr = name
description_attr = desc
parameters = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query β€” use specific terms, not questions",
}
},
"required": ["query"],
}
def __init__(self_tool):
self_tool.name = name
self_tool.description = desc
def execute(self_tool, query: str) -> str:
results = store.query(query)
if not results:
return "No relevant documents found."
parts = []
for i, r in enumerate(results, 1):
parts.append(f"[{i}] (score={r['score']}, source={r['source']})\n{r['text']}")
return "\n\n".join(parts)
return _KnowledgeTool()
@property
def size(self) -> int:
return len(self._chunks)
# --- Internal ---
def _chunk_text(self, text: str) -> list[str]:
"""Split text into overlapping chunks."""
if len(text) <= self.chunk_size:
return [text] if text.strip() else []
        chunks = []
        start = 0
        # Guard: a non-positive step (chunk_overlap >= chunk_size) would loop forever
        step = max(1, self.chunk_size - self.chunk_overlap)
        while start < len(text):
            chunk = text[start:start + self.chunk_size].strip()
            if chunk:
                chunks.append(chunk)
            start += step
return chunks
@staticmethod
def _embed(text: str) -> list[float]:
"""Lightweight trigram embedding (same as ExperienceReplay)."""
dim = 128
vec = [0.0] * dim
text_lower = text.lower()
for i in range(len(text_lower) - 2):
trigram = text_lower[i:i + 3]
h = hash(trigram) % dim
vec[h] += 1.0
magnitude = math.sqrt(sum(x * x for x in vec))
if magnitude > 0:
vec = [x / magnitude for x in vec]
return vec
@staticmethod
def _cosine_sim(a: list[float], b: list[float]) -> float:
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
mag_a = math.sqrt(sum(x * x for x in a))
mag_b = math.sqrt(sum(x * x for x in b))
if mag_a == 0 or mag_b == 0:
return 0.0
return dot / (mag_a * mag_b)
# ═══════════════════════════════════════════════════════════════════════════
# CREATIVE ALIASES -- Purpose Agent's own names (primary)
# Old names kept for backward compatibility
# ═══════════════════════════════════════════════════════════════════════════
# New names (use these)
Spark = Agent            # A spark of intelligence -- the atomic agent unit
Flow = Graph             # Data flows through nodes -- workflow engine
swarm = parallel         # Agents working concurrently like a swarm
Council = Conversation   # Agents deliberate like a council
Vault = KnowledgeStore   # Knowledge vault -- RAG as a tool
BEGIN = START
DONE_SIGNAL = END
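

if __name__ == "__main__":
    # Minimal smoke-test sketch: exercises the default MockLLMBackend path, the
    # in-memory KnowledgeStore, and a two-node Graph. Assumes everything runs
    # offline; Agent() may create ./.purpose_agent/demo for persistence.
    logging.basicConfig(level=logging.INFO)

    # Plug-and-play: no model given, so Agent falls back to MockLLMBackend.
    demo_agent = Agent("demo")
    print("agent success:", demo_agent.run("Say hello").success)

    # Knowledge: build a tiny store and query it directly (no agent needed).
    kb = KnowledgeStore.from_texts([
        "Python was created by Guido van Rossum.",
        "Python 3.12 added PEP 695 type aliases.",
    ])
    for hit in kb.query("type aliases", top_k=1):
        print("kb hit:", hit["score"], hit["text"])

    # Control: a linear two-node graph over plain functions.
    g = Graph()
    g.add_node("a", lambda s: State(data={**s.data, "a": True}))
    g.add_node("b", lambda s: State(data={**s.data, "b": True}))
    g.add_edge(START, "a").add_edge("a", "b")
    print("graph state:", g.run(State(data={})).data)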