| """ |
| Streaming & Async Engine — Real-time token streaming and concurrent execution. |
| |
| Adds streaming support to all modules: |
| - Actor streams its thought process as it reasons |
| - Purpose Function streams its evaluation |
| - Orchestrator streams step-by-step progress |
| |
| Async support via asyncio: |
| - All core operations have async variants |
| - Concurrent tool execution |
| - Background experience replay updates |
| |
| Pattern: sync methods remain the default. Async wrappers use asyncio.to_thread |
| for backends that don't support native async (per smolagents pattern). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import time |
| from typing import Any, AsyncIterator, Callable, Iterator |
|
|
| from purpose_agent.types import ( |
| Action, PurposeScore, State, Trajectory, TrajectoryStep, |
| ) |
| from purpose_agent.llm_backend import ChatMessage, LLMBackend |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
class StreamingMixin:
    """
    Mixin that adds streaming to any LLMBackend that doesn't natively support it.

    Falls back to returning the full response as a single chunk.
    Override generate_stream() for native streaming.
    """

    def generate_stream(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
    ) -> Iterator[str]:
        """
        Stream tokens. Default: generate full response, yield as one chunk.
        Override in subclasses for real token-level streaming.
        """
        full = self.generate(messages, temperature=temperature, max_tokens=max_tokens)
        yield full

    async def agenerate(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        stop: list[str] | None = None,
    ) -> str:
        """Async wrapper around sync generate."""
        # Pass keyword arguments through to_thread so this does not depend on
        # the positional order of the backend's generate() signature.
        return await asyncio.to_thread(
            self.generate,
            messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stop=stop,
        )

    async def agenerate_structured(
        self,
        messages: list[ChatMessage],
        schema: dict[str, Any],
        temperature: float = 0.3,
        max_tokens: int = 1024,
    ) -> dict[str, Any]:
        """Async wrapper around sync generate_structured."""
        return await asyncio.to_thread(
            self.generate_structured,
            messages,
            schema,
            temperature=temperature,
            max_tokens=max_tokens,
        )

    async def agenerate_stream(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
    ) -> AsyncIterator[str]:
        """Async streaming. Default: wrap the sync stream in an async iterator."""
        gen = self.generate_stream(messages, temperature, max_tokens)
        # Use next()'s default argument instead of catching StopIteration:
        # a StopIteration raised inside the worker thread is not delivered
        # cleanly through the awaited future, so an except clause here would
        # never fire.
        done = object()
        while True:
            token = await asyncio.to_thread(next, gen, done)
            if token is done:
                break
            yield token
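
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the package API): how a backend might opt
# into native token-level streaming by overriding generate_stream(). The
# EchoStreamBackend below is a hypothetical stand-in that "streams" a canned
# reply word by word; a real backend would also subclass LLMBackend and
# forward chunks from its provider's streaming API.
# ---------------------------------------------------------------------------


class EchoStreamBackend(StreamingMixin):
    """Toy backend used only to demonstrate the streaming contract."""

    def generate(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        stop: list[str] | None = None,
    ) -> str:
        # A real backend would call its model here; this stand-in returns a
        # fixed reply so the streaming path can be exercised without a model.
        return "this reply is streamed word by word"

    def generate_stream(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
    ) -> Iterator[str]:
        # Native streaming override: yield one "token" (here, a word) at a time.
        reply = self.generate(messages, temperature=temperature, max_tokens=max_tokens)
        for word in reply.split(" "):
            yield word + " "


async def _demo_streaming_mixin() -> None:
    """Example consumer; run with asyncio.run(_demo_streaming_mixin())."""
    backend = EchoStreamBackend()
    async for token in backend.agenerate_stream([]):
        print(token, end="", flush=True)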
|
|
|
|
|
|
class StreamEvent:
    """An event emitted during streaming orchestration."""

    def __init__(
        self,
        event_type: str,
        data: dict[str, Any] | None = None,
        step: int = 0,
        token: str = "",
    ):
        self.event_type = event_type
        self.data = data or {}
        self.step = step
        self.token = token
        self.timestamp = time.time()

    def __repr__(self) -> str:
        if self.token:
            return f"StreamEvent({self.event_type}, token='{self.token[:20]}')"
        return f"StreamEvent({self.event_type}, step={self.step})"
|
|
|
|
|
|
class AsyncOrchestrator:
    """
    Async wrapper around the synchronous Orchestrator that streams events.

    Usage:
        async for event in async_orch.run_task_stream(purpose="...", ...):
            if event.event_type == "token":
                print(event.token, end="", flush=True)
            elif event.event_type == "score":
                print(f"\\nΦ: {event.data['phi_before']:.1f} → {event.data['phi_after']:.1f}")
    """

    def __init__(self, orchestrator):
        self.orch = orchestrator
|
|
    async def run_task_stream(
        self,
        purpose: str,
        initial_state: State | None = None,
        max_steps: int = 20,
        early_stop_phi: float = 9.0,
    ) -> AsyncIterator[StreamEvent]:
        """Run a task and stream events as they happen."""

        current_state = initial_state or self.orch.environment.reset()
        self.orch.purpose_fn.reset_trajectory_stats()

        trajectory = Trajectory(task_description=purpose, purpose=purpose)
        history: list[dict[str, Any]] = []

        yield StreamEvent("task_start", {"purpose": purpose, "max_steps": max_steps})

        for step_idx in range(max_steps):
            yield StreamEvent("step_start", {"step": step_idx + 1}, step=step_idx + 1)

            # Actor decides the next action (sync call, run in a worker thread).
            action = await asyncio.to_thread(
                self.orch.actor.decide, purpose, current_state, history
            )
|
|
| yield StreamEvent("action", { |
| "name": action.name, |
| "thought": action.thought, |
| "expected_delta": action.expected_delta, |
| }, step=step_idx + 1) |
|
|
| if action.name.upper() == "DONE": |
| yield StreamEvent("done", {}, step=step_idx + 1) |
| break |
|
|
| |
| try: |
| new_state = await asyncio.to_thread( |
| self.orch.environment.execute, action, current_state |
| ) |
| except Exception as e: |
| new_state = State(data={**current_state.data, "_error": str(e)}) |
| yield StreamEvent("error", {"error": str(e)}, step=step_idx + 1) |
|
|
| |
| score = await asyncio.to_thread( |
| self.orch.purpose_fn.evaluate, current_state, action, new_state, purpose |
| ) |
|
|
| yield StreamEvent("score", { |
| "phi_before": score.phi_before, |
| "phi_after": score.phi_after, |
| "delta": score.delta, |
| "confidence": score.confidence, |
| "improved": score.improved, |
| "evidence": score.evidence, |
| }, step=step_idx + 1) |
|
|
| |
| step = TrajectoryStep( |
| state_before=current_state, action=action, state_after=new_state, |
| score=score, step_index=step_idx + 1, |
| ) |
| trajectory.steps.append(step) |
| history.append({ |
| "action": f"{action.name}({json.dumps(action.params, default=str)})", |
| "result": new_state.describe()[:200], |
| "score": f"Δ={score.delta:+.2f}", |
| }) |
|
|
| yield StreamEvent("step_end", { |
| "state_summary": new_state.describe()[:200], |
| }, step=step_idx + 1) |
|
|
| if score.phi_after >= early_stop_phi: |
| yield StreamEvent("early_stop", {"phi": score.phi_after}, step=step_idx + 1) |
| break |
|
|
| if self.orch.environment.is_terminal(new_state): |
| yield StreamEvent("terminal", {}, step=step_idx + 1) |
| break |
|
|
| current_state = new_state |
|
|
| |
| await asyncio.to_thread(self.orch.post_task, trajectory, []) |
|
|
| yield StreamEvent("task_end", { |
| "total_steps": len(trajectory.steps), |
| "cumulative_reward": trajectory.cumulative_reward, |
| "success_rate": trajectory.success_rate, |
| "final_phi": trajectory.final_phi, |
| }) |
|
|
| async def run_task(self, **kwargs): |
| """Non-streaming async task execution.""" |
| from purpose_agent.orchestrator import TaskResult |
| result = await asyncio.to_thread(self.orch.run_task, **kwargs) |
| return result |
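
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): driving the async wrapper from an asyncio
# entry point. Constructing the underlying Orchestrator is out of scope for
# this module, so these helpers take already-built instances as arguments, and
# the concurrent example assumes the sync run_task accepts the same purpose=
# keyword as run_task_stream. The event types handled below are the ones
# emitted by run_task_stream above.
# ---------------------------------------------------------------------------


async def _demo_run(orchestrator, purpose: str) -> None:
    """Stream one task's events to stdout; run via asyncio.run(_demo_run(...))."""
    async_orch = AsyncOrchestrator(orchestrator)
    async for event in async_orch.run_task_stream(purpose=purpose):
        if event.event_type == "action":
            print(f"[step {event.step}] {event.data['name']}: {event.data['thought']}")
        elif event.event_type == "score":
            print(f"    Φ {event.data['phi_before']:.1f} → {event.data['phi_after']:.1f}")
        elif event.event_type == "task_end":
            print(f"done in {event.data['total_steps']} steps")


async def _demo_concurrent(orchestrators, purposes) -> list:
    """Run several independent tasks concurrently.

    One orchestrator per task: the sync Orchestrator is not assumed to be
    thread-safe, so instances are not shared across concurrent runs.
    """
    wrappers = [AsyncOrchestrator(o) for o in orchestrators]
    return await asyncio.gather(
        *(w.run_task(purpose=p) for w, p in zip(wrappers, purposes))
    )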
|
|