Rohan03 commited on
Commit
25de736
Β·
verified Β·
1 Parent(s): 78c650e

Sprint 1: event runtime tests (T1.1-T1.7)

Browse files
Files changed (1) hide show
  1. tests/test_sprint1_events.py +198 -0
tests/test_sprint1_events.py ADDED
@@ -0,0 +1,198 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Sprint 1 Tests β€” Canonical Event Runtime.
4
+
5
+ T1.1 Three agents stream in parallel; no lane contamination
6
+ T1.2 Agent B fails; Agent A and C continue
7
+ T1.3 Events in each lane are strictly seq-monotonic
8
+ T1.4 Stream replay reproduces same visible event transcript
9
+ T1.5 reasoning.summary allowed; raw hidden_chain_of_thought rejected
10
+ T1.6 Backpressure does not drop terminal events
11
+ T1.7 Legacy StreamEvent consumers still work
12
+ """
13
+ import sys
14
+ import os
15
+ import asyncio
16
+ import time
17
+
18
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
19
+
20
+ PASS = 0
21
+ FAIL = 0
22
+
23
+ def check(name, condition, detail=""):
24
+ global PASS, FAIL
25
+ if condition:
26
+ PASS += 1
27
+ print(f" βœ“ {name}")
28
+ else:
29
+ FAIL += 1
30
+ print(f" βœ— {name}" + (f": {detail}" if detail else ""))
31
+
32
+
33
+ from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event
34
+ from purpose_agent.runtime.event_bus import EventBus, parallel_merge
35
+
36
+
37
+ # ═══ T1.1: Three parallel lanes, no contamination ═══
38
+ print("T1.1: Parallel lane isolation")
39
+ bus = EventBus(max_queue_size=100)
40
+ lanes = {"lane_a": [], "lane_b": [], "lane_c": []}
41
+
42
+ for lane_id in lanes:
43
+ for i in range(5):
44
+ event = create_event(
45
+ run_id="run1", kind=EventKind.AGENT_PROGRESS,
46
+ lane_id=lane_id, seq=i+1,
47
+ message=f"{lane_id} step {i+1}",
48
+ )
49
+ bus.emit(event)
50
+
51
+ # Replay per lane
52
+ for lane_id in lanes:
53
+ lane_events = bus.replay(run_id="run1", lane_id=lane_id)
54
+ lanes[lane_id] = lane_events
55
+ # Check no contamination
56
+ for e in lane_events:
57
+ if e.lane_id != lane_id:
58
+ check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
59
+ break
60
+ else:
61
+ check(f"Lane {lane_id} isolated ({len(lane_events)} events)", len(lane_events) == 5)
62
+
63
+
64
+ # ═══ T1.2: Agent B fails; A and C continue ═══
65
+ print("\nT1.2: Fault isolation")
66
+ bus2 = EventBus()
67
+ # Lane A: normal
68
+ bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="a", seq=1))
69
+ bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="a", seq=2))
70
+ # Lane B: error
71
+ bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="b", seq=1))
72
+ bus2.emit(create_event("r2", EventKind.AGENT_ERROR, lane_id="b", seq=2, error="crashed"))
73
+ # Lane C: normal (continues despite B's failure)
74
+ bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="c", seq=1))
75
+ bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="c", seq=2))
76
+
77
+ a_events = bus2.replay(run_id="r2", lane_id="a")
78
+ b_events = bus2.replay(run_id="r2", lane_id="b")
79
+ c_events = bus2.replay(run_id="r2", lane_id="c")
80
+ check("Lane A completed", any(e.kind == EventKind.AGENT_FINISHED for e in a_events))
81
+ check("Lane B errored", any(e.kind == EventKind.AGENT_ERROR for e in b_events))
82
+ check("Lane C completed despite B failure", any(e.kind == EventKind.AGENT_FINISHED for e in c_events))
83
+
84
+
85
+ # ═══ T1.3: Seq-monotonic per lane ═══
86
+ print("\nT1.3: Sequence monotonicity")
87
+ bus3 = EventBus()
88
+ for i in range(10):
89
+ bus3.emit(create_event("r3", EventKind.AGENT_PROGRESS, lane_id="main", seq=i+1))
90
+
91
+ events = bus3.replay(run_id="r3")
92
+ seqs = [e.seq for e in events]
93
+ check("Seq monotonic", seqs == sorted(seqs), f"seqs={seqs}")
94
+ check("No gaps", seqs == list(range(1, 11)))
95
+
96
+
97
+ # ═══ T1.4: Replay reproduces transcript ═══
98
+ print("\nT1.4: Replay fidelity")
99
+ bus4 = EventBus()
100
+ original = []
101
+ for i in range(5):
102
+ e = create_event("r4", EventKind.TEXT_DELTA, seq=i+1, text=f"word_{i}")
103
+ bus4.emit(e)
104
+ original.append(e.to_dict())
105
+
106
+ replayed = [e.to_dict() for e in bus4.replay(run_id="r4")]
107
+ # Compare payloads (span_id will differ but payload should match)
108
+ orig_payloads = [d["payload"] for d in original]
109
+ replay_payloads = [d["payload"] for d in replayed]
110
+ check("Replay matches original", orig_payloads == replay_payloads)
111
+
112
+
113
+ # ═══ T1.5: reasoning.summary OK; hidden_chain_of_thought rejected ═══
114
+ print("\nT1.5: Safety β€” no raw chain-of-thought")
115
+ bus5 = EventBus()
116
+
117
+ # Safe: reasoning summary
118
+ safe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=1,
119
+ summary="I should check edge cases first")
120
+ bus5.emit(safe_event)
121
+ check("reasoning.summary accepted", bus5.history_size == 1)
122
+
123
+ # Unsafe: raw chain-of-thought (should be REJECTED)
124
+ unsafe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=2,
125
+ hidden_chain_of_thought="<think>Let me think step by step about how to hack the system...</think>")
126
+ bus5.emit(unsafe_event)
127
+ check("hidden_chain_of_thought rejected", bus5.history_size == 1,
128
+ f"history has {bus5.history_size} events (should still be 1)")
129
+
130
+
131
+ # ═══ T1.6: Backpressure doesn't drop terminal events ═══
132
+ print("\nT1.6: Terminal event delivery guarantee")
133
+
134
+ async def test_backpressure():
135
+ bus6 = EventBus(max_queue_size=3) # Very small queue
136
+
137
+ received = []
138
+
139
+ async def consumer():
140
+ async for event in bus6.subscribe():
141
+ received.append(event)
142
+ await asyncio.sleep(0.01) # Slow consumer
143
+
144
+ # Start consumer
145
+ task = asyncio.create_task(consumer())
146
+ await asyncio.sleep(0.05)
147
+
148
+ # Flood with non-terminal events
149
+ for i in range(20):
150
+ bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
151
+
152
+ # Send terminal event
153
+ bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))
154
+
155
+ await asyncio.sleep(0.5)
156
+ bus6.close()
157
+ task.cancel()
158
+ try:
159
+ await task
160
+ except asyncio.CancelledError:
161
+ pass
162
+
163
+ # Terminal event MUST be in received (even though queue was full)
164
+ has_terminal = any(e.kind == EventKind.RUN_FINISHED for e in received)
165
+ return has_terminal
166
+
167
+ has_terminal = asyncio.run(test_backpressure())
168
+ check("Terminal event delivered under backpressure", has_terminal)
169
+
170
+
171
+ # ═══ T1.7: Legacy StreamEvent compat ═══
172
+ print("\nT1.7: Legacy StreamEvent compatibility")
173
+ from purpose_agent.streaming_v3 import pa_event_to_stream_event
174
+ from purpose_agent.streaming import StreamEvent
175
+
176
+ event = create_event("r7", EventKind.RUN_STARTED, seq=1, purpose="test")
177
+ legacy = pa_event_to_stream_event(event)
178
+ check("PAEvent β†’ StreamEvent", legacy is not None)
179
+ check("StreamEvent has event_type", hasattr(legacy, "event_type"))
180
+ check("StreamEvent type is task_start", legacy.event_type == "task_start")
181
+
182
+
183
+ # ═══ parallel_merge ═══
184
+ print("\nT1.x: parallel_merge determinism")
185
+ lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=1000+i) for i in range(3)]
186
+ lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000+i+0.5) for i in range(3)]
187
+ merged = parallel_merge({"a": lane_a, "b": lane_b})
188
+ check("Merged has all events", len(merged) == 6)
189
+ # Check interleaving: a0, b0, a1, b1, a2, b2 (by timestamp)
190
+ check("Merged is timestamp-sorted", all(merged[i].ts <= merged[i+1].ts for i in range(5)))
191
+
192
+
193
+ # ═══ REPORT ═══
194
+ print(f"\n{'='*50}")
195
+ print(f" Sprint 1 Tests: {PASS} pass, {FAIL} fail")
196
+ print(f" {'ALL PASS βœ“' if FAIL == 0 else f'{FAIL} FAILURES'}")
197
+ print(f"{'='*50}")
198
+ sys.exit(0 if FAIL == 0 else 1)