"""
Orchestrator — The main loop tying Actor, Purpose Function, Experience Replay,
and Heuristic Optimizer together.
Implements the self-improvement loop:
┌──────────────────────────────────────────────────────────┐
│                    ORCHESTRATOR LOOP                     │
│                                                          │
│  ┌──────────┐  action   ┌─────────────┐   s_new          │
│  │  ACTOR   │ ────────► │ ENVIRONMENT │ ──────────┐      │
│  │ (+memory)│           │ (your code) │           │      │
│  └────▲─────┘           └─────────────┘           │      │
│       │                                           ▼      │
│       │ heuristics ┌────────────────┐        (s, a, s')  │
│       │◄───────────│   OPTIMIZER    │◄────────────┤      │
│       │            │ (distillation) │             │      │
│       │            └────────────────┘             │      │
│       │                                           │      │
│       │            ┌────────────────┐ Φ(s)→Φ(s')  │      │
│       │            │   PURPOSE FN   │─────────────┤      │
│       │            │ (state critic) │             │      │
│       │            └────────────────┘             │      │
│       │                                           │      │
│       │            ┌────────────────┐             │      │
│       └────────────│   EXPERIENCE   │◄────────────┘      │
│                    │ REPLAY BUFFER  │                    │
│                    └────────────────┘                    │
└──────────────────────────────────────────────────────────┘
Usage:
from purpose_agent import Orchestrator, MockLLMBackend
# 1. Define your environment
class MyEnv(Environment):
def execute(self, action, current_state):
# ... do something ...
return new_state
# 2. Create orchestrator
orch = Orchestrator(
llm=MockLLMBackend(), # or HFInferenceBackend(), OpenAICompatibleBackend()
environment=MyEnv(),
available_actions={"search": "Search for items", "move": "Move to location"},
)
# 3. Run a task
result = orch.run_task(
purpose="Find the hidden treasure in the maze",
initial_state=State(data={"position": [0, 0], "inventory": []}),
max_steps=20,
)
    # 4. The agent self-improves — run more tasks and it gets better
result2 = orch.run_task(purpose="Find the second treasure", ...)
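    # 5. (Optional) Persist learning across runs and watch each step.
    #    A sketch using the persistence_dir / on_step constructor options;
    #    the directory name and the print lambda are illustrative only.
    orch = Orchestrator(
        llm=MockLLMBackend(),
        environment=MyEnv(),
        persistence_dir="./agent_state",
        on_step=lambda step: print(step.score.delta),
    )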
"""
from __future__ import annotations
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Callable
from purpose_agent.types import (
Action,
Heuristic,
MemoryTier,
PurposeScore,
State,
Trajectory,
TrajectoryStep,
)
from purpose_agent.actor import Actor
from purpose_agent.purpose_function import PurposeFunction
from purpose_agent.experience_replay import ExperienceReplay
from purpose_agent.optimizer import HeuristicOptimizer
from purpose_agent.llm_backend import LLMBackend
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Environment Interface
# ---------------------------------------------------------------------------
class Environment(ABC):
"""
Abstract environment that the Agent acts in.
Implement this for your specific use case:
- Web navigation: wrap a browser automation tool
- Code generation: wrap a code executor
- Game: wrap a game API
- Simulated: mock environment for testing
The Orchestrator calls execute() with the agent's action and current state,
and expects a new state back.
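
    Example (a minimal sketch; CounterEnv is a hypothetical toy environment):

        class CounterEnv(Environment):
            def execute(self, action: Action, current_state: State) -> State:
                # Bump a counter when the agent chooses the "increment" action
                count = current_state.data.get("count", 0)
                if action.name == "increment":
                    count += 1
                return State(data={"count": count})

            def reset(self) -> State:
                return State(data={"count": 0})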
"""
@abstractmethod
def execute(self, action: Action, current_state: State) -> State:
"""
Execute an action in the environment and return the resulting state.
Args:
action: The action to execute
current_state: The state before the action
Returns:
The new state after the action
"""
...
def reset(self) -> State:
"""
Reset the environment and return the initial state.
Override if your environment needs resetting between tasks.
"""
return State(data={})
def is_terminal(self, state: State) -> bool:
"""
Check if the state is terminal (task complete or impossible to continue).
Override for environments with natural termination conditions.
"""
return False
class SimpleEnvironment(Environment):
"""
A simple environment backed by a user-provided execute function.
Usage:
env = SimpleEnvironment(
execute_fn=lambda action, state: new_state,
initial_state=State(data={"x": 0}),
)
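
    A terminal_fn can flag natural end states (a sketch; the "done" key and
    my_execute_fn are illustrative):

        env = SimpleEnvironment(
            execute_fn=my_execute_fn,
            terminal_fn=lambda state: state.data.get("done", False),
        )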
"""
def __init__(
self,
execute_fn: Callable[[Action, State], State],
initial_state: State | None = None,
terminal_fn: Callable[[State], bool] | None = None,
):
self._execute_fn = execute_fn
self._initial_state = initial_state or State(data={})
self._terminal_fn = terminal_fn
def execute(self, action: Action, current_state: State) -> State:
return self._execute_fn(action, current_state)
def reset(self) -> State:
return self._initial_state
def is_terminal(self, state: State) -> bool:
if self._terminal_fn:
return self._terminal_fn(state)
return False
# ---------------------------------------------------------------------------
# Task Result
# ---------------------------------------------------------------------------
class TaskResult:
"""Result of running a task through the Orchestrator."""
def __init__(self, trajectory: Trajectory, final_state: State):
self.trajectory = trajectory
self.final_state = final_state
@property
def success(self) -> bool:
"""Was the task successful? (final Ξ¦ > 7.0)"""
phi = self.trajectory.final_phi
return phi is not None and phi > 7.0
@property
def total_steps(self) -> int:
return len(self.trajectory.steps)
@property
def cumulative_reward(self) -> float:
return self.trajectory.cumulative_reward
@property
def final_phi(self) -> float | None:
return self.trajectory.final_phi
def summary(self) -> str:
lines = [
f"Task: {self.trajectory.task_description}",
f"Purpose: {self.trajectory.purpose}",
f"Steps: {self.total_steps}",
f"Success Rate: {self.trajectory.success_rate:.1%}",
f"Cumulative Reward: {self.cumulative_reward:.2f}",
f"Net Delta: {self.trajectory.total_delta:.2f}",
f"Final Ξ¦: {self.final_phi:.2f}" if self.final_phi is not None else "Final Ξ¦: N/A",
f"Task Success: {'βœ“' if self.success else 'βœ—'}",
]
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class Orchestrator:
"""
Main orchestration loop for the self-improving agent.
Ties together all modules:
- Actor: Decides actions based on state + memory
    - Purpose Function: Scores state transitions (Φ improvement)
- Experience Replay: Stores trajectories for future retrieval
- Heuristic Optimizer: Extracts winning strategies from good trajectories
Self-improvement happens via the memory feedback loop:
1. Actor uses heuristics from memory to decide actions
2. Purpose Function scores each transition
3. Experience Replay stores the full trajectory
4. Optimizer distills high-reward trajectories into new heuristics
    5. Actor's memory is updated with new heuristics → better next time
Args:
llm: Default LLM backend (used for all modules unless overridden)
critic_llm: Optional separate LLM for the Purpose Function
optimizer_llm: Optional separate LLM for the Optimizer
environment: The environment the agent acts in
available_actions: Dict of {action_name: description}
experience_buffer_size: Max trajectories in experience replay
persistence_dir: Directory for persistent storage (experience replay, heuristics)
on_step: Optional callback called after each step (for monitoring)
"""
def __init__(
self,
llm: LLMBackend,
environment: Environment,
available_actions: dict[str, str] | None = None,
critic_llm: LLMBackend | None = None,
optimizer_llm: LLMBackend | None = None,
experience_buffer_size: int = 500,
persistence_dir: str | None = None,
on_step: Callable[[TrajectoryStep], None] | None = None,
optimize_every_n_tasks: int = 1,
):
self.environment = environment
self.on_step = on_step
self.optimize_every_n_tasks = optimize_every_n_tasks
self._tasks_since_optimize = 0
# Persistence
replay_path = None
if persistence_dir:
import os
os.makedirs(persistence_dir, exist_ok=True)
replay_path = f"{persistence_dir}/experience_replay.json"
# Initialize modules
self.actor = Actor(
llm=llm,
available_actions=available_actions,
)
self.purpose_fn = PurposeFunction(
llm=critic_llm or llm,
)
self.experience_replay = ExperienceReplay(
capacity=experience_buffer_size,
persistence_path=replay_path,
)
self.optimizer = HeuristicOptimizer(
llm=optimizer_llm or llm,
)
# Load existing heuristics into Actor memory
self.sync_memory()
# ------------------------------------------------------------------
# Main Task Loop
# ------------------------------------------------------------------
def run_task(
self,
purpose: str,
initial_state: State | None = None,
max_steps: int = 20,
early_stop_phi: float = 9.0,
task_description: str | None = None,
) -> TaskResult:
"""
Run a complete task through the agent loop.
The loop for each step:
1. Actor decides an action (with thought + prediction)
        2. Environment executes the action → new state
        3. Purpose Function evaluates: Φ(s_new) vs Φ(s_old)
4. Trajectory step is recorded
5. Check termination conditions
After the task:
- Trajectory is added to Experience Replay
- If enough tasks have run, Optimizer extracts new heuristics
- Actor's memory is updated
Args:
purpose: The goal description
initial_state: Starting state (or environment.reset() if None)
max_steps: Maximum steps before forced termination
            early_stop_phi: Stop if Φ exceeds this value (goal ~achieved)
task_description: Optional description (defaults to purpose)
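
        Example (a sketch; inspecting the returned TaskResult):

            result = orch.run_task(purpose="Reach the exit", max_steps=10)
            if result.success:  # final Φ above the 7.0 success threshold
                print(result.summary())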
"""
task_desc = task_description or purpose
current_state = initial_state or self.environment.reset()
# Reset Purpose Function per-trajectory stats
self.purpose_fn.reset_trajectory_stats()
# Retrieve relevant past experiences for context
relevant_experiences = self.experience_replay.retrieve(task_desc, top_k=3)
self._inject_experience_context(relevant_experiences)
# Create trajectory
trajectory = Trajectory(
task_description=task_desc,
purpose=purpose,
)
# History for Actor context
history: list[dict[str, Any]] = []
logger.info(f"═══ Starting task: {task_desc} (max {max_steps} steps) ═══")
for step_idx in range(max_steps):
step_start = time.time()
# Step 1: Actor decides
action = self.actor.decide(
purpose=purpose,
current_state=current_state,
history=history,
)
logger.info(
f"Step {step_idx + 1}: Action={action.name}, "
f"Thought={action.thought[:100]}..."
)
# Check for DONE action
if action.name.upper() == "DONE":
logger.info("Agent signaled DONE β€” ending task")
# Still score the final state to record final Ξ¦
final_score = self.purpose_fn.evaluate(
state_before=current_state,
action=action,
state_after=current_state,
purpose=purpose,
)
trajectory.steps.append(TrajectoryStep(
state_before=current_state,
action=action,
state_after=current_state,
score=final_score,
step_index=step_idx + 1,
wall_time_s=time.time() - step_start,
))
break
# Step 2: Environment executes
try:
new_state = self.environment.execute(action, current_state)
except Exception as e:
logger.error(f"Environment execution failed: {e}")
new_state = State(
data={**current_state.data, "_error": str(e)},
summary=f"Error: {e}",
)
# Step 3: Purpose Function evaluates
score = self.purpose_fn.evaluate(
state_before=current_state,
action=action,
state_after=new_state,
purpose=purpose,
)
# Step 4: Record step
step = TrajectoryStep(
state_before=current_state,
action=action,
state_after=new_state,
score=score,
step_index=step_idx + 1,
wall_time_s=time.time() - step_start,
)
trajectory.steps.append(step)
# Update history for Actor context
history.append({
"action": f"{action.name}({json.dumps(action.params, default=str)})",
"result": new_state.describe()[:200],
"score": f"Ξ”={score.delta:+.2f}" if score else "N/A",
})
# Callback
if self.on_step:
self.on_step(step)
logger.info(
f" β†’ Ξ¦: {score.phi_before:.1f} β†’ {score.phi_after:.1f} "
f"(Ξ”={score.delta:+.2f}, conf={score.confidence:.2f})"
)
            # Advance the state before the termination checks so that
            # TaskResult.final_state reflects the last executed action
            current_state = new_state
            # Step 5: Check termination
            if score.phi_after >= early_stop_phi:
                logger.info(f"Early stop: Φ={score.phi_after:.1f} ≥ {early_stop_phi}")
                break
            if self.environment.is_terminal(current_state):
                logger.info("Environment signaled terminal state")
                break
# Post-task processing
result = TaskResult(trajectory=trajectory, final_state=current_state)
self.post_task(trajectory, relevant_experiences)
logger.info(f"═══ Task complete ═══\n{result.summary()}")
return result
# ------------------------------------------------------------------
# Post-Task: Experience Storage + Optimization
# ------------------------------------------------------------------
def post_task(
self,
trajectory: Trajectory,
used_experiences: list[Any] | None = None,
) -> None:
"""Post-task processing: store trajectory, maybe optimize, sync memory.
        Public API — called by HITLOrchestrator, AsyncOrchestrator, and
any custom orchestration wrapper after a task completes.
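
        Example (a sketch of a custom wrapper; the trajectory is built by
        your own loop):

            trajectory = Trajectory(task_description="custom run", purpose="...")
            # ... append TrajectoryStep objects as your loop executes ...
            orch.post_task(trajectory)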
"""
used_experiences = used_experiences or []
# Store in experience replay
record = self.experience_replay.add(trajectory)
# Update Q-values for retrieved experiences that were used
task_success = trajectory.success_rate > 0.5
for exp in used_experiences:
self.experience_replay.update_q_value(
exp.id, reward=1.0 if task_success else 0.0
)
# Update heuristic usage stats
for h in self.actor.strategic_memory + self.actor.procedural_memory:
self.optimizer.update_heuristic_usage(h.id, was_successful=task_success)
# Periodic optimization
self._tasks_since_optimize += 1
if self._tasks_since_optimize >= self.optimize_every_n_tasks:
self._run_optimization()
self._tasks_since_optimize = 0
def _run_optimization(self) -> None:
"""Run the heuristic optimization cycle."""
logger.info("Running optimization cycle...")
# Get best trajectories
top_trajectories = self.experience_replay.get_top_trajectories(
n=5, min_success_rate=0.3
)
if not top_trajectories:
logger.info("No qualifying trajectories for optimization")
return
# Run optimizer
self.optimizer.optimize(top_trajectories)
# Sync updated heuristics to Actor memory
self.sync_memory()
def sync_memory(self) -> None:
"""Push current heuristic library to Actor's memory tiers.
        Public API — call after manually modifying the heuristic library
(e.g., human-injected heuristics via HITL).
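
        Example (a sketch; Heuristic's constructor fields and the list-like
        heuristic_library are assumed from how they are read in this module):

            from purpose_agent.types import Heuristic, MemoryTier
            h = Heuristic(
                pattern="maze with many dead ends",
                strategy="prefer unvisited neighbors first",
                tier=MemoryTier.STRATEGIC,
            )
            orch.optimizer.heuristic_library.append(h)  # assumption: list-like
            orch.sync_memory()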
"""
self.actor.update_strategic_memory(
self.optimizer.get_heuristics_by_tier(MemoryTier.STRATEGIC)
)
self.actor.update_procedural_memory(
self.optimizer.get_heuristics_by_tier(MemoryTier.PROCEDURAL)
)
# Tool memory from heuristics
tool_heuristics = self.optimizer.get_heuristics_by_tier(MemoryTier.TOOL)
tool_tips = {h.pattern: h.strategy for h in tool_heuristics}
if tool_tips:
self.actor.update_tool_memory(tool_tips)
def _inject_experience_context(self, experiences: list[Any]) -> None:
"""
Inject retrieved experience context into Actor's procedural memory.
This is the CER (arxiv:2506.06698) retrieval injection pattern:
        relevant past trajectories → distilled into SOPs → added to Actor context.
"""
injected = []
for exp in experiences:
for h in exp.heuristics:
if h.tier == MemoryTier.PROCEDURAL:
injected.append(h)
if injected:
current = self.actor.procedural_memory or []
self.actor.procedural_memory = current + injected
logger.debug(f"Injected {len(injected)} experience-based SOPs")
# ------------------------------------------------------------------
# Inspection / Monitoring
# ------------------------------------------------------------------
@property
def stats(self) -> dict[str, Any]:
"""Get current framework statistics."""
return {
"experience_replay": self.experience_replay.stats,
"heuristic_library_size": len(self.optimizer.heuristic_library),
"heuristics_by_tier": {
tier.value: len(self.optimizer.get_heuristics_by_tier(tier))
for tier in MemoryTier
},
"tasks_since_optimize": self._tasks_since_optimize,
}
def get_heuristic_report(self) -> str:
"""Human-readable report of all learned heuristics."""
lines = ["═══ Learned Heuristics Report ═══\n"]
for tier in MemoryTier:
heuristics = self.optimizer.get_heuristics_by_tier(tier)
lines.append(f"\n{'─' * 40}")
lines.append(f" {tier.value.upper()} ({len(heuristics)} heuristics)")
lines.append(f"{'─' * 40}")
for h in heuristics:
lines.append(f"\n [{h.id}] Q={h.q_value:.3f} (used {h.times_used}x, "
f"{h.times_succeeded} successes)")
lines.append(f" Pattern: {h.pattern}")
lines.append(f" Strategy: {h.strategy}")
if h.steps:
for i, step in enumerate(h.steps, 1):
lines.append(f" {i}. {step}")
return "\n".join(lines)