Rohan03 commited on
Commit
0d16e44
·
verified ·
1 Parent(s): 7eacaee

v0.2.0: Add purpose_agent/streaming.py

Browse files
Files changed (1) hide show
  1. purpose_agent/streaming.py +244 -0
purpose_agent/streaming.py ADDED
@@ -0,0 +1,244 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Streaming & Async Engine — Real-time token streaming and concurrent execution.
3
+
4
+ Adds streaming support to all modules:
5
+ - Actor streams its thought process as it reasons
6
+ - Purpose Function streams its evaluation
7
+ - Orchestrator streams step-by-step progress
8
+
9
+ Async support via asyncio:
10
+ - All core operations have async variants
11
+ - Concurrent tool execution
12
+ - Background experience replay updates
13
+
14
+ Pattern: sync methods remain the default. Async wrappers use asyncio.to_thread
15
+ for backends that don't support native async (per smolagents pattern).
16
+ """
17
+
18
+ from __future__ import annotations
19
+
20
+ import asyncio
21
+ import json
22
+ import logging
23
+ import time
24
+ from typing import Any, AsyncIterator, Callable, Iterator
25
+
26
+ from purpose_agent.types import (
27
+ Action, PurposeScore, State, Trajectory, TrajectoryStep,
28
+ )
29
+ from purpose_agent.llm_backend import ChatMessage, LLMBackend
30
+
31
+ logger = logging.getLogger(__name__)
32
+
33
+
34
+ # ---------------------------------------------------------------------------
35
+ # Streaming Mixin — adds generate_stream to any LLMBackend
36
+ # ---------------------------------------------------------------------------
37
+
38
class StreamingMixin:
    """
    Mixin that adds streaming and async wrappers to any LLMBackend that
    doesn't natively support them.

    Falls back to returning the full response as a single chunk.
    Override generate_stream() for native token-level streaming.
    """

    def generate_stream(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
    ) -> Iterator[str]:
        """
        Stream tokens. Default: generate full response, yield as one chunk.
        Override in subclasses for real token-level streaming.
        """
        full = self.generate(messages, temperature=temperature, max_tokens=max_tokens)
        yield full

    async def agenerate(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        stop: list[str] | None = None,
    ) -> str:
        """Async wrapper around sync generate (runs it in a worker thread)."""
        # Keyword arguments (matching the sync call in generate_stream) keep
        # this correct even if a backend declares its optional parameters in
        # a different order.
        return await asyncio.to_thread(
            self.generate,
            messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stop=stop,
        )

    async def agenerate_structured(
        self,
        messages: list[ChatMessage],
        schema: dict[str, Any],
        temperature: float = 0.3,
        max_tokens: int = 1024,
    ) -> dict[str, Any]:
        """Async wrapper around sync generate_structured."""
        return await asyncio.to_thread(
            self.generate_structured,
            messages,
            schema,
            temperature=temperature,
            max_tokens=max_tokens,
        )

    async def agenerate_stream(
        self,
        messages: list[ChatMessage],
        temperature: float = 0.7,
        max_tokens: int = 2048,
    ) -> AsyncIterator[str]:
        """Async streaming. Default: drain the sync generator in a worker thread.

        Uses next(gen, sentinel) instead of catching StopIteration: a
        StopIteration raised inside asyncio.to_thread would resurface as a
        RuntimeError when awaited inside this coroutine (PEP 479), so it
        could never be caught here.
        """
        gen = self.generate_stream(messages, temperature, max_tokens)
        sentinel = object()  # unique marker meaning "generator exhausted"
        while True:
            token = await asyncio.to_thread(next, gen, sentinel)
            if token is sentinel:
                break
            yield token
99
+
100
+
101
+ # ---------------------------------------------------------------------------
102
+ # Event types for streaming orchestration
103
+ # ---------------------------------------------------------------------------
104
+
105
class StreamEvent:
    """A single event emitted while an orchestration run is being streamed.

    Attributes:
        event_type: category tag, e.g. "step_start", "token", "score",
            "step_end", "task_end".
        data: optional payload dict (never None; empty when not supplied).
        step: 1-based step index the event belongs to (0 when not step-bound).
        token: text chunk for token events, empty string otherwise.
        timestamp: wall-clock time (time.time()) captured at construction.
    """

    def __init__(
        self,
        event_type: str,
        data: dict[str, Any] | None = None,
        step: int = 0,
        token: str = "",
    ):
        self.event_type = event_type
        self.data = data or {}
        self.step = step
        self.token = token
        self.timestamp = time.time()

    def __repr__(self) -> str:
        # Token events show a 20-char preview; everything else shows the step.
        if not self.token:
            return f"StreamEvent({self.event_type}, step={self.step})"
        return f"StreamEvent({self.event_type}, token='{self.token[:20]}')"
125
+
126
+
127
+ # ---------------------------------------------------------------------------
128
+ # Async Orchestrator — streams events during task execution
129
+ # ---------------------------------------------------------------------------
130
+
131
class AsyncOrchestrator:
    """
    Async wrapper around the synchronous Orchestrator that streams events.

    Usage:
        async for event in async_orch.run_task_stream(purpose="...", ...):
            if event.event_type == "token":
                print(event.token, end="", flush=True)
            elif event.event_type == "score":
                print(f"\\nΦ: {event.data['phi_before']:.1f} → {event.data['phi_after']:.1f}")
    """

    def __init__(self, orchestrator):
        # The wrapped synchronous Orchestrator. All blocking work is pushed
        # through asyncio.to_thread so the event loop is never stalled.
        self.orch = orchestrator

    async def run_task_stream(
        self,
        purpose: str,
        initial_state: State | None = None,
        max_steps: int = 20,
        early_stop_phi: float = 9.0,
    ) -> AsyncIterator[StreamEvent]:
        """Run a task and stream events as they happen.

        Args:
            purpose: natural-language description of the task/purpose.
            initial_state: starting State; environment.reset() when None.
            max_steps: hard cap on decide/execute/score iterations.
            early_stop_phi: stop once a step's phi_after reaches this score.

        Yields:
            StreamEvent objects in order: "task_start"; then per step
            "step_start", "action", optional "error", "score", "step_end",
            with "done"/"early_stop"/"terminal" when triggered; finally
            "task_end".
        """

        # NOTE(review): `or` means a falsy initial_state would also trigger
        # reset() — assumes State instances are always truthy; confirm.
        current_state = initial_state or self.orch.environment.reset()
        self.orch.purpose_fn.reset_trajectory_stats()

        trajectory = Trajectory(task_description=purpose, purpose=purpose)
        history: list[dict[str, Any]] = []

        yield StreamEvent("task_start", {"purpose": purpose, "max_steps": max_steps})

        for step_idx in range(max_steps):
            yield StreamEvent("step_start", {"step": step_idx + 1}, step=step_idx + 1)

            # Actor decides (run in thread to not block the loop).
            action = await asyncio.to_thread(
                self.orch.actor.decide, purpose, current_state, history
            )

            yield StreamEvent("action", {
                "name": action.name,
                "thought": action.thought,
                "expected_delta": action.expected_delta,
            }, step=step_idx + 1)

            # The actor signals completion by emitting a DONE action.
            if action.name.upper() == "DONE":
                yield StreamEvent("done", {}, step=step_idx + 1)
                break

            # Environment executes; failures become an "_error" state rather
            # than aborting the stream, so scoring/recording still happen.
            try:
                new_state = await asyncio.to_thread(
                    self.orch.environment.execute, action, current_state
                )
            except Exception as e:
                new_state = State(data={**current_state.data, "_error": str(e)})
                yield StreamEvent("error", {"error": str(e)}, step=step_idx + 1)

            # Purpose Function scores the (state, action, state') transition.
            score = await asyncio.to_thread(
                self.orch.purpose_fn.evaluate, current_state, action, new_state, purpose
            )

            yield StreamEvent("score", {
                "phi_before": score.phi_before,
                "phi_after": score.phi_after,
                "delta": score.delta,
                "confidence": score.confidence,
                "improved": score.improved,
                "evidence": score.evidence,
            }, step=step_idx + 1)

            # Record the step in the trajectory and the actor-visible history.
            step = TrajectoryStep(
                state_before=current_state, action=action, state_after=new_state,
                score=score, step_index=step_idx + 1,
            )
            trajectory.steps.append(step)
            history.append({
                "action": f"{action.name}({json.dumps(action.params, default=str)})",
                "result": new_state.describe()[:200],
                "score": f"Δ={score.delta:+.2f}",
            })

            yield StreamEvent("step_end", {
                "state_summary": new_state.describe()[:200],
            }, step=step_idx + 1)

            # Early exit: good-enough score, or the environment says stop.
            if score.phi_after >= early_stop_phi:
                yield StreamEvent("early_stop", {"phi": score.phi_after}, step=step_idx + 1)
                break

            if self.orch.environment.is_terminal(new_state):
                yield StreamEvent("terminal", {}, step=step_idx + 1)
                break

            current_state = new_state

        # Post-task bookkeeping (experience replay etc.) off the event loop.
        await asyncio.to_thread(self.orch._post_task, trajectory, [])

        yield StreamEvent("task_end", {
            "total_steps": len(trajectory.steps),
            "cumulative_reward": trajectory.cumulative_reward,
            "success_rate": trajectory.success_rate,
            "final_phi": trajectory.final_phi,
        })

    async def run_task(self, **kwargs):
        """Non-streaming async task execution (delegates to the sync Orchestrator)."""
        # The original imported TaskResult here but never used it; dropped.
        return await asyncio.to_thread(self.orch.run_task, **kwargs)