""" Unified Capabilities — Five competing framework philosophies in one composable layer. LangGraph → "I want control" → GraphOrchestrator (conditional edges, cycles, fan-out/fan-in) CrewAI → "I want speed" → ParallelRunner (concurrent tasks, parallel fan-out) AutoGen → "I want agents talking" → Conversation (agent-to-agent message passing, group chat) OpenAI SDK → "I want plug-and-play" → Agent() one-liner factory LlamaIndex → "I want knowledge" → KnowledgeStore (RAG-as-a-tool, chunk + embed + retrieve) Design principle: ZERO changes to the existing Orchestrator/Actor/PurposeFunction. Each capability is a composable layer that calls the existing modules. The self-improvement loop (Φ scoring → experience replay → heuristic distillation) runs INSIDE each capability automatically — every graph node, every parallel task, every conversation turn feeds the same learning loop. Usage: # Plug-and-play (OpenAI SDK simplicity) agent = Agent("researcher", model="qwen3:1.7b", tools=[SearchTool()]) result = agent.run("Find information about X") # Control flow (LangGraph power) graph = Graph() graph.add_node("research", research_agent) graph.add_node("write", writer_agent) graph.add_edge("research", "write") graph.add_conditional_edge("write", review_fn, {"pass": END, "fail": "research"}) result = graph.run(initial_state) # Speed (CrewAI parallelism) results = parallel([task1, task2, task3], agents=[a1, a2, a3]) # Conversation (AutoGen talking) chat = Conversation([researcher, coder, reviewer]) result = chat.run("Build a web scraper", rounds=5) # Knowledge (LlamaIndex RAG) kb = KnowledgeStore.from_directory("./docs") agent = Agent("assistant", tools=[kb.as_tool()]) result = agent.run("What does the documentation say about X?") """ from __future__ import annotations import asyncio import json import logging import math import os import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path from typing 
import Any, Callable, Iterator from purpose_agent.types import ( Action, Heuristic, MemoryTier, PurposeScore, State, Trajectory, TrajectoryStep, ) from purpose_agent.llm_backend import LLMBackend, MockLLMBackend, ChatMessage from purpose_agent.actor import Actor from purpose_agent.purpose_function import PurposeFunction from purpose_agent.experience_replay import ExperienceReplay from purpose_agent.optimizer import HeuristicOptimizer from purpose_agent.orchestrator import ( Environment, Orchestrator, SimpleEnvironment, TaskResult, ) from purpose_agent.tools import Tool, FunctionTool, ToolResult, ToolRegistry logger = logging.getLogger(__name__) # Sentinel for graph end node END = "__END__" START = "__START__" import threading # Global lock for shared replay/optimizer in parallel execution _parallel_lock = threading.Lock() # ═══════════════════════════════════════════════════════════════════════════ # 1. PLUG-AND-PLAY — Agent() one-liner factory (OpenAI Agents SDK simplicity) # ═══════════════════════════════════════════════════════════════════════════ class Agent: """ One-liner agent factory. The simplest way to create and run an agent. Inspired by OpenAI Agents SDK: Agent(name, instructions, tools) → run(task). But ours self-improves. Every run feeds the Φ loop. 
Usage: # Minimal (uses mock for testing) agent = Agent("helper") result = agent.run("Do something") # With local SLM agent = Agent("coder", model="qwen3:1.7b", tools=[PythonExecTool()]) result = agent.run("Write a sorting algorithm") # With cloud LLM agent = Agent("analyst", model="gpt-4o", api_key="sk-...") result = agent.run("Analyze this data") # Handoff to another agent agent_a = Agent("researcher", model="qwen3:1.7b") agent_b = Agent("writer", model="phi4-mini", handoff_from=agent_a) # agent_b inherits agent_a's experience replay """ def __init__( self, name: str = "agent", instructions: str = "", model: str | LLMBackend | None = None, tools: list[Tool] | None = None, api_key: str | None = None, max_steps: int = 15, handoff_from: "Agent | None" = None, persistence_dir: str | None = None, ): self.name = name self.instructions = instructions self.max_steps = max_steps # Resolve LLM backend if model is None: self.llm = MockLLMBackend() elif isinstance(model, str): self.llm = self._resolve_model(model, api_key) else: self.llm = model # Build available actions from tools available_actions = {"DONE": "Signal task completion"} self._tools = {} for tool in (tools or []): available_actions[tool.name] = tool.description self._tools[tool.name] = tool # Build environment that executes tools self._env = _ToolEnvironment(self._tools) # Create orchestrator self.orch = Orchestrator( llm=self.llm, environment=self._env, available_actions=available_actions, persistence_dir=persistence_dir or f"./.purpose_agent/{name}", ) # Handoff: inherit experience from another agent if handoff_from: self.orch.experience_replay = handoff_from.orch.experience_replay self.orch.optimizer = handoff_from.orch.optimizer self.orch.sync_memory() # Inject custom instructions into actor's strategic memory if instructions: h = Heuristic( pattern="Always", strategy=instructions, steps=[], tier=MemoryTier.STRATEGIC, q_value=1.0, ) self.orch.optimizer.heuristic_library.append(h) self.orch.sync_memory() def 
run(self, task: str, state: State | None = None) -> TaskResult: """Run a task. Returns TaskResult with trajectory, final state, success.""" return self.orch.run_task( purpose=task, initial_state=state or State(data={}), max_steps=self.max_steps, ) def __call__(self, task: str, **kwargs) -> TaskResult: return self.run(task, **kwargs) @staticmethod def _resolve_model(model: str, api_key: str | None = None) -> LLMBackend: """Resolve a model string to an LLMBackend.""" from purpose_agent.slm_backends import SLM_REGISTRY # Known SLM registry keys if model in SLM_REGISTRY: from purpose_agent.slm_backends import create_slm_backend return create_slm_backend(model) # Delegate to the centralized resolver for all other models (e.g. groq:, openai:) from purpose_agent.llm_backend import resolve_backend return resolve_backend(model, api_key) class _ToolEnvironment(Environment): """Environment that executes tools based on action names.""" def __init__(self, tools: dict[str, Tool]): self._tools = tools def execute(self, action: Action, current_state: State) -> State: tool = self._tools.get(action.name) if not tool: return State( data={**current_state.data, "_last_result": f"Unknown tool: {action.name}"}, summary=f"Error: Unknown tool '{action.name}'", ) result = tool.run(**action.params) new_data = {**current_state.data, "_last_result": result.output, "_last_tool": action.name} if not result.success: new_data["_last_error"] = result.error return State(data=new_data, summary=result.output[:500]) def reset(self) -> State: return State(data={}) # ═══════════════════════════════════════════════════════════════════════════ # 2. 
# ═══════════════════════════════════════════════════════════════════════════
# 2. CONTROL — Graph execution engine (LangGraph-style)
# ═══════════════════════════════════════════════════════════════════════════

@dataclass
class GraphNode:
    """A node in the execution graph."""
    name: str
    handler: Callable[[State], State | TaskResult] | Agent
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class GraphEdge:
    """An edge in the execution graph."""
    source: str
    target: str
    condition: Callable[[State], bool] | None = None  # None = unconditional


class Graph:
    """
    Graph-based workflow execution — LangGraph's control, with Φ self-improvement.

    Supports: conditional branching, cycles (loops), parallel fan-out/fan-in.
    Every node that runs an Agent automatically feeds the Φ improvement loop.

    Usage:
        graph = Graph()

        # Add nodes (agents or functions)
        graph.add_node("research", Agent("researcher", model="qwen3:1.7b"))
        graph.add_node("write", Agent("writer", model="phi4-mini"))
        graph.add_node("review", lambda state: review_fn(state))

        # Linear flow
        graph.add_edge(START, "research")
        graph.add_edge("research", "write")

        # Conditional branching (cycle back on failure)
        graph.add_conditional_edge("write", "review",
                                   condition_map={"pass": END, "revise": "write"})

        result = graph.run(State(data={"topic": "AI safety"}))
    """

    def __init__(self):
        self._nodes: dict[str, GraphNode] = {}
        self._edges: list[GraphEdge] = []
        self._conditional_edges: dict[str, dict] = {}  # source → {evaluator, map}
        self._entry: str | None = None

    def add_node(self, name: str, handler: Callable | Agent) -> "Graph":
        """Add a node. Handler is either an Agent or a function(State) → State."""
        self._nodes[name] = GraphNode(name=name, handler=handler)
        return self

    def add_edge(self, source: str, target: str) -> "Graph":
        """Add an unconditional edge. An edge from START sets the entry node."""
        self._edges.append(GraphEdge(source=source, target=target))
        if source == START:
            self._entry = target
        return self

    def add_conditional_edge(
        self,
        source: str,
        evaluator: str | Callable[[State], str],
        condition_map: dict[str, str] | None = None,
    ) -> "Graph":
        """
        Add a conditional edge. After source node runs, evaluator determines next node.

        evaluator: A function(State) → str (returns key from condition_map).
            If a non-callable (e.g. a node name string) is given, the current
            implementation does NOT run it — routing falls back to reading
            ``state.data["_route"]`` (default key "default"). See ``run``.
        condition_map: {"key": "target_node"} — maps evaluator output to next node.
            Use END as target to terminate; an unmatched key falls back to the
            "default" entry, then to END.
        """
        self._conditional_edges[source] = {
            "evaluator": evaluator,
            "map": condition_map or {},
        }
        return self

    def run(
        self,
        initial_state: State | None = None,
        max_iterations: int = 20,
    ) -> State:
        """
        Execute the graph from START to END.

        Args:
            initial_state: Starting State (empty State if omitted).
            max_iterations: Hard cap on node executions — bounds cycles.

        Returns:
            The final State after reaching END or exhausting the budget.

        Raises:
            ValueError: If the graph is empty or routing names an unknown node.
        """
        state = initial_state or State(data={})
        if not self._entry:
            # Auto-detect entry: first node added
            if self._nodes:
                self._entry = list(self._nodes.keys())[0]
            else:
                raise ValueError("Graph has no nodes")
        current = self._entry
        visited_count: dict[str, int] = {}
        for iteration in range(max_iterations):
            if current == END:
                logger.info(f"Graph: Reached END after {iteration} iterations")
                break
            if current not in self._nodes:
                raise ValueError(f"Graph: Unknown node '{current}'")
            visited_count[current] = visited_count.get(current, 0) + 1
            logger.info(f"Graph: Executing node '{current}' (visit #{visited_count[current]})")
            # Execute node
            node = self._nodes[current]
            state = self._execute_node(node, state)
            # Determine next node
            if current in self._conditional_edges:
                cond = self._conditional_edges[current]
                evaluator = cond["evaluator"]
                cond_map = cond["map"]
                # Get routing decision
                if callable(evaluator):
                    route_key = evaluator(state)
                else:
                    # Non-callable evaluator: route from state data instead.
                    route_key = str(state.data.get("_route", "default"))
                current = cond_map.get(route_key, cond_map.get("default", END))
                logger.info(f"Graph: Conditional route '{route_key}' → '{current}'")
            else:
                # Find unconditional edge (first match wins); no edge → END.
                next_node = None
                for edge in self._edges:
                    if edge.source == current:
                        next_node = edge.target
                        break
                current = next_node or END
        else:
            # for/else: only runs when the loop was NOT broken by reaching END.
            logger.warning(f"Graph: Hit max iterations ({max_iterations})")
        return state

    def _execute_node(self, node: GraphNode, state: State) -> State:
        """Execute a single node — Agent or function."""
        handler = node.handler
        if isinstance(handler, Agent):
            # Run the agent on the current state, extract purpose from state data
            purpose = state.data.get("_purpose", state.data.get("task", f"Execute {node.name}"))
            result = handler.run(purpose, state=state)
            # Merge agent's final state into the graph state (agent keys win).
            merged = {**state.data, **result.final_state.data}
            merged["_last_node"] = node.name
            merged["_last_success"] = result.success
            merged["_last_phi"] = result.final_phi
            return State(data=merged, summary=result.final_state.summary)
        elif callable(handler):
            result = handler(state)
            # Function handlers may return State, TaskResult, or anything else
            # (coerced to a string under "_result").
            if isinstance(result, State):
                return result
            elif isinstance(result, TaskResult):
                return result.final_state
            else:
                return State(data={**state.data, "_result": str(result)})
        raise ValueError(f"Invalid node handler type: {type(handler)}")


# ═══════════════════════════════════════════════════════════════════════════
# 3. SPEED — Parallel execution (CrewAI-style)
# ═══════════════════════════════════════════════════════════════════════════

def parallel(
    tasks: list[str] | list[dict[str, Any]],
    agents: list[Agent] | Agent | None = None,
    max_workers: int | None = None,
    initial_states: list[State] | None = None,
) -> list[TaskResult]:
    """
    Run multiple tasks in parallel — CrewAI's speed, with Φ self-improvement.

    Every parallel task feeds the same improvement loop, so agents learn even
    from concurrent executions.

    Usage:
        # Same agent, multiple tasks
        agent = Agent("worker", model="qwen3:1.7b")
        results = parallel(["task 1", "task 2", "task 3"], agent)

        # Different agents for different tasks
        results = parallel(
            ["research X", "code Y", "review Z"],
            agents=[researcher, coder, reviewer],
        )

        # Dict-based tasks with metadata
        results = parallel([
            {"purpose": "research X", "max_steps": 10},
            {"purpose": "code Y", "max_steps": 20},
        ], agent)
    """
    # Normalize tasks: plain strings become {"purpose": str} dicts.
    normalized: list[dict] = []
    for t in tasks:
        if isinstance(t, str):
            normalized.append({"purpose": t})
        else:
            normalized.append(t)

    # Normalize agents: None → fresh default worker shared across tasks;
    # single Agent → reused for every task; short list → cycled.
    if agents is None:
        agent_list = [Agent("worker")] * len(normalized)
    elif isinstance(agents, Agent):
        agent_list = [agents] * len(normalized)
    else:
        if len(agents) < len(normalized):
            # Cycle agents
            agent_list = [agents[i % len(agents)] for i in range(len(normalized))]
        else:
            agent_list = agents

    states = initial_states or [None] * len(normalized)

    # Thread safety: detect backend type for concurrency limit
    # Local backends (Ollama, llama-cpp) share one GPU/CPU — serialize
    # Cloud/API backends can parallelize
    if max_workers is None:
        sample_agent = agent_list[0] if agent_list else None
        if sample_agent and hasattr(sample_agent, 'llm'):
            backend_type = type(sample_agent.llm).__name__
            if backend_type in ("OllamaBackend", "LlamaCppBackend", "MockLLMBackend"):
                workers = 1  # Local model — serialize to avoid contention
            else:
                workers = min(len(normalized), 8)
        else:
            workers = min(len(normalized), 8)
    else:
        workers = max_workers

    logger.info(f"Parallel: Running {len(normalized)} tasks with {workers} workers")

    def _run_one(idx: int) -> TaskResult:
        task = normalized[idx]
        agent = agent_list[idx]
        state = states[idx]
        # Lock around shared replay/optimizer writes
        # NOTE(review): the lock is held for the ENTIRE agent.run() call, so
        # all tasks are effectively serialized regardless of `workers`. If the
        # intent is only to guard replay/optimizer writes, the lock should be
        # narrowed to those writes inside the orchestrator — confirm intent.
        with _parallel_lock:
            return agent.run(task["purpose"], state=state)

    results: list[TaskResult | None] = [None] * len(normalized)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(_run_one, i): i for i in range(len(normalized))
        }
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
                logger.info(f"Parallel: Task {idx} completed — success={results[idx].success}")
            except Exception as e:
                # A failed task yields a placeholder TaskResult carrying the
                # error in its final state, so result order/length is preserved.
                logger.error(f"Parallel: Task {idx} failed — {e}")
                results[idx] = TaskResult(
                    trajectory=Trajectory(
                        task_description=normalized[idx]["purpose"],
                        purpose=normalized[idx]["purpose"],
                    ),
                    final_state=State(data={"_error": str(e)}),
                )
    return results


# ═══════════════════════════════════════════════════════════════════════════
# 4. CONVERSATION — Agent-to-agent messaging (AutoGen-style)
# ═══════════════════════════════════════════════════════════════════════════

@dataclass
class Message:
    """A message in an agent conversation."""
    sender: str
    content: str
    timestamp: float = field(default_factory=time.time)
    metadata: dict[str, Any] = field(default_factory=dict)


class Conversation:
    """
    Multi-agent conversation — AutoGen's talking, with Φ self-improvement.

    Agents take turns speaking. Each agent sees the full conversation history
    and contributes its perspective. The conversation continues for N rounds
    or until agents converge on a solution.

    Every agent's turn feeds the Φ loop — agents learn from conversations.

    Usage:
        researcher = Agent("researcher", model="qwen3:1.7b")
        coder = Agent("coder", model="phi4-mini")
        reviewer = Agent("reviewer", model="qwen3:1.7b")

        chat = Conversation([researcher, coder, reviewer])
        result = chat.run("Build a web scraper for news articles", rounds=5)

        # Access conversation history
        for msg in chat.history:
            print(f"{msg.sender}: {msg.content[:100]}")
    """

    def __init__(
        self,
        agents: list[Agent],
        moderator: Agent | LLMBackend | None = None,
        speaker_selection: str = "round_robin",  # "round_robin", "auto", "manual"
    ):
        # Agents are keyed by name — duplicate names would silently collide.
        self.agents = {a.name: a for a in agents}
        self.agent_order = [a.name for a in agents]
        # NOTE(review): `moderator` is stored but not used anywhere in this
        # class's visible code — confirm whether it is consumed elsewhere.
        self.moderator = moderator
        self.speaker_selection = speaker_selection
        self.history: list[Message] = []

    def run(
        self,
        topic: str,
        rounds: int = 3,
        initial_context: str = "",
    ) -> State:
        """
        Run a conversation about a topic for N rounds.
        Returns final State with conversation results.
        """
        # History is reset on every run; the topic is the first system message.
        self.history = [Message(sender="system", content=f"Topic: {topic}")]
        if initial_context:
            self.history.append(Message(sender="system", content=initial_context))
        logger.info(f"Conversation: Starting '{topic}' with {list(self.agents.keys())}")
        for round_num in range(rounds):
            logger.info(f"Conversation: Round {round_num + 1}/{rounds}")
            for agent_name in self._get_speaker_order(round_num):
                agent = self.agents[agent_name]
                # Build the conversation state for this agent
                conv_text = self._format_history()
                state = State(
                    data={
                        "conversation": conv_text,
                        "topic": topic,
                        "round": round_num + 1,
                        "role": agent_name,
                    },
                    summary=f"Conversation round {round_num + 1}. Topic: {topic}\n\n{conv_text}",
                )
                # Agent responds (this feeds the Φ loop!)
                purpose = (
                    f"You are '{agent_name}' in a team discussion about: {topic}. "
                    f"Read the conversation so far and contribute your expert perspective. "
                    f"Be concise and actionable."
                )
                result = agent.run(purpose, state=state)
                # Extract the agent's contribution: prefer the last tool result,
                # fall back to the state summary, then a placeholder.
                response = result.final_state.data.get(
                    "_last_result",
                    result.final_state.summary or "(no response)",
                )
                self.history.append(Message(
                    sender=agent_name,
                    content=response,
                    metadata={
                        "round": round_num + 1,
                        "phi": result.final_phi,
                        "success": result.success,
                    },
                ))
                logger.info(f" {agent_name}: {response[:100]}...")
        # Build final state with full conversation
        return State(
            data={
                "topic": topic,
                "rounds": rounds,
                "messages": [
                    {"sender": m.sender, "content": m.content} for m in self.history
                ],
                "final_summary": self.history[-1].content if self.history else "",
            },
            summary=self._format_history(),
        )

    def _get_speaker_order(self, round_num: int) -> list[str]:
        """Determine speaking order for a round."""
        if self.speaker_selection == "round_robin":
            return self.agent_order
        elif self.speaker_selection == "auto":
            # Reverse every other round for variety
            order = list(self.agent_order)
            if round_num % 2 == 1:
                order.reverse()
            return order
        # Any other selection mode (e.g. "manual") falls back to insertion order.
        return self.agent_order

    def _format_history(self) -> str:
        """Format conversation history as text."""
        lines = []
        for msg in self.history:
            if msg.sender == "system":
                lines.append(f"[System] {msg.content}")
            else:
                lines.append(f"[{msg.sender}] {msg.content}")
        return "\n\n".join(lines)


# ═══════════════════════════════════════════════════════════════════════════
# 5. KNOWLEDGE — RAG-as-a-tool (LlamaIndex-style)
# ═══════════════════════════════════════════════════════════════════════════

class KnowledgeStore:
    """
    Knowledge-aware agents — LlamaIndex's RAG, as a simple Tool.

    Chunks documents, embeds them, retrieves relevant chunks for queries.
    Plugs into any Agent as a tool — the agent decides when to retrieve.

    No external dependencies. Uses the same trigram embedding as
    ExperienceReplay. For production, swap in sentence-transformers via
    EmbeddingBackend.

    Usage:
        # From files
        kb = KnowledgeStore.from_directory("./docs", glob="*.md")

        # From strings
        kb = KnowledgeStore.from_texts([
            "Python was created by Guido van Rossum.",
            "Python 3.12 added PEP 695 type aliases.",
        ])

        # As a tool for any agent
        agent = Agent("assistant", tools=[kb.as_tool()])
        result = agent.run("What PEP was added in Python 3.12?")

        # Direct query
        results = kb.query("type aliases", top_k=3)
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, top_k: int = 5):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k
        self._chunks: list[dict[str, Any]] = []  # {text, embedding, source, index}

    def add_text(self, text: str, source: str = "unknown") -> int:
        """Add a text document — auto-chunks and embeds. Returns chunk count."""
        chunks = self._chunk_text(text)
        count = 0
        for chunk in chunks:
            embedding = self._embed(chunk)
            self._chunks.append({
                "text": chunk,
                "embedding": embedding,
                "source": source,
                "index": len(self._chunks),
            })
            count += 1
        return count

    def add_file(self, path: str) -> int:
        """Add a file to the knowledge store. Undecodable bytes are ignored."""
        with open(path, "r", errors="ignore") as f:
            text = f.read()
        return self.add_text(text, source=os.path.basename(path))

    @classmethod
    def from_texts(cls, texts: list[str], **kwargs) -> "KnowledgeStore":
        """Create from a list of text strings."""
        store = cls(**kwargs)
        for i, text in enumerate(texts):
            store.add_text(text, source=f"text_{i}")
        return store

    @classmethod
    def from_directory(cls, path: str, glob: str = "*.txt", **kwargs) -> "KnowledgeStore":
        """Create from all matching files in a directory (non-recursive glob)."""
        store = cls(**kwargs)
        p = Path(path)
        for file in sorted(p.glob(glob)):
            store.add_file(str(file))
        logger.info(f"KnowledgeStore: Loaded {len(store._chunks)} chunks from {path}")
        return store

    def query(self, query: str, top_k: int | None = None) -> list[dict[str, Any]]:
        """Retrieve the most relevant chunks for a query, best-first."""
        k = top_k or self.top_k
        if not self._chunks:
            return []
        query_emb = self._embed(query)
        scored = []
        for chunk in self._chunks:
            sim = self._cosine_sim(query_emb, chunk["embedding"])
            scored.append((sim, chunk))
        # Sort by similarity descending; key avoids comparing the chunk dicts.
        scored.sort(key=lambda x: -x[0])
        return [
            {"text": c["text"], "source": c["source"], "score": round(s, 3)}
            for s, c in scored[:k]
        ]

    def as_tool(self, name: str = "knowledge_search", description: str | None = None) -> Tool:
        """
        Convert this KnowledgeStore into a Tool that any Agent can use.

        This is the LlamaIndex QueryEngineTool pattern — RAG as a tool.
        The agent decides WHEN to retrieve (agentic RAG), rather than always
        retrieving (traditional RAG pipeline).
        """
        desc = description or (
            f"Search the knowledge base ({len(self._chunks)} chunks). "
            f"Use this to find specific information from documents."
        )
        # Closure capture: the inner Tool class queries this store instance.
        store = self

        class _KnowledgeTool(Tool):
            # NOTE(review): name_attr/description_attr are class attributes
            # that are never read in this file — the instance attributes set in
            # __init__ are what callers see. Confirm whether the Tool base
            # class consumes these, otherwise they look like dead code.
            name_attr = name
            description_attr = desc
            parameters = {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query — use specific terms, not questions",
                    }
                },
                "required": ["query"],
            }

            # `self_tool` avoids shadowing the enclosing method's `self`.
            def __init__(self_tool):
                self_tool.name = name
                self_tool.description = desc

            def execute(self_tool, query: str) -> str:
                results = store.query(query)
                if not results:
                    return "No relevant documents found."
                parts = []
                for i, r in enumerate(results, 1):
                    parts.append(f"[{i}] (score={r['score']}, source={r['source']})\n{r['text']}")
                return "\n\n".join(parts)

        return _KnowledgeTool()

    @property
    def size(self) -> int:
        # Number of stored chunks (not documents).
        return len(self._chunks)

    # --- Internal ---

    def _chunk_text(self, text: str) -> list[str]:
        """Split text into overlapping chunks of ``chunk_size`` characters."""
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Step back by chunk_overlap so adjacent chunks share context.
            start += self.chunk_size - self.chunk_overlap
        return chunks

    @staticmethod
    def _embed(text: str) -> list[float]:
        """
        Lightweight trigram embedding (same as ExperienceReplay).

        Hashes each lowercase character trigram into a 128-dim bag vector,
        then L2-normalizes.

        NOTE(review): builtin hash() on str is randomized per process
        (PYTHONHASHSEED), so embeddings are only comparable within one
        process — fine for this in-memory store, but would break any
        persisted/cross-process use. Confirm this matches ExperienceReplay.
        """
        dim = 128
        vec = [0.0] * dim
        text_lower = text.lower()
        for i in range(len(text_lower) - 2):
            trigram = text_lower[i:i + 3]
            h = hash(trigram) % dim
            vec[h] += 1.0
        magnitude = math.sqrt(sum(x * x for x in vec))
        if magnitude > 0:
            vec = [x / magnitude for x in vec]
        return vec

    @staticmethod
    def _cosine_sim(a: list[float], b: list[float]) -> float:
        """Cosine similarity; 0.0 for empty, mismatched, or zero vectors."""
        if not a or not b or len(a) != len(b):
            return 0.0
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a))
        mag_b = math.sqrt(sum(x * x for x in b))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        return dot / (mag_a * mag_b)


# ═══════════════════════════════════════════════════════════════════════════
# CREATIVE ALIASES — Purpose Agent's own names (primary)
# Old names kept for backward compatibility
# ═══════════════════════════════════════════════════════════════════════════

# New names (use these)
Spark = Agent           # A spark of intelligence — the atomic agent unit
Flow = Graph            # Data flows through nodes — workflow engine
swarm = parallel        # Agents working concurrently like a swarm
Council = Conversation  # Agents deliberate like a council
Vault = KnowledgeStore  # Knowledge vault — RAG as a tool
BEGIN = START
DONE_SIGNAL = END