#!/usr/bin/env python3
"""
Sprint 1 Tests β Canonical Event Runtime.
T1.1 Three agents stream in parallel; no lane contamination
T1.2 Agent B fails; Agent A and C continue
T1.3 Events in each lane are strictly seq-monotonic
T1.4 Stream replay reproduces same visible event transcript
T1.5 reasoning.summary allowed; raw hidden_chain_of_thought rejected
T1.6 Backpressure does not drop terminal events
T1.7 Legacy StreamEvent consumers still work
"""
import sys
import os
import asyncio
import time
# Put the parent of this file's directory at the front of the import path so
# the project package resolves when this script is run directly.
_PROJECT_ROOT = os.path.join(os.path.dirname(__file__), "..")
sys.path.insert(0, _PROJECT_ROOT)

# Running pass/fail tallies, incremented by check() and reported at the end.
PASS, FAIL = 0, 0
def check(name, condition, detail=""):
global PASS, FAIL
if condition:
PASS += 1
print(f" β {name}")
else:
FAIL += 1
print(f" β {name}" + (f": {detail}" if detail else ""))
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event
from purpose_agent.runtime.event_bus import EventBus, parallel_merge
# ─── T1.1: Three parallel lanes, no contamination ───
print("T1.1: Parallel lane isolation")
bus = EventBus(max_queue_size=100)
lanes = {"lane_a": [], "lane_b": [], "lane_c": []}
# Emit round-robin across lanes so the bus history is interleaved, as it
# would be with agents streaming in parallel; emitting one whole lane at a
# time never exercises isolation on a mixed history.
for i in range(5):
    for lane_id in lanes:
        event = create_event(
            run_id="run1", kind=EventKind.AGENT_PROGRESS,
            lane_id=lane_id, seq=i + 1,
            message=f"{lane_id} step {i + 1}",
        )
        bus.emit(event)
# Replay each lane: it must contain only its own events, and all 5 of them.
for lane_id in lanes:
    lane_events = bus.replay(run_id="run1", lane_id=lane_id)
    lanes[lane_id] = lane_events
    foreign = [e.lane_id for e in lane_events if e.lane_id != lane_id]
    if foreign:
        check(f"Lane {lane_id} contamination", False, f"found {foreign[0]}")
        continue
    check(f"Lane {lane_id} isolated ({len(lane_events)} events)", len(lane_events) == 5)
# ─── T1.2: Agent B fails; A and C continue ───
print("\nT1.2: Fault isolation")
bus2 = EventBus()
# Scripted lifecycle, emitted in exactly this order:
# (lane, seq, kind, extra fields). Lane "b" ends in an error.
_fault_script = (
    ("a", 1, EventKind.AGENT_STARTED, {}),
    ("a", 2, EventKind.AGENT_FINISHED, {}),
    ("b", 1, EventKind.AGENT_STARTED, {}),
    ("b", 2, EventKind.AGENT_ERROR, {"error": "crashed"}),
    ("c", 1, EventKind.AGENT_STARTED, {}),
    ("c", 2, EventKind.AGENT_FINISHED, {}),
)
for _lane, _seq, _kind, _extra in _fault_script:
    bus2.emit(create_event("r2", _kind, lane_id=_lane, seq=_seq, **_extra))
# Replay lanes a, b, c (in that order) and check each lane's outcome.
a_events, b_events, c_events = (
    bus2.replay(run_id="r2", lane_id=lane) for lane in ("a", "b", "c")
)
check("Lane A completed",
      any(ev.kind == EventKind.AGENT_FINISHED for ev in a_events))
check("Lane B errored",
      any(ev.kind == EventKind.AGENT_ERROR for ev in b_events))
check("Lane C completed despite B failure",
      any(ev.kind == EventKind.AGENT_FINISHED for ev in c_events))
# ─── T1.3: Seq-monotonic per lane ───
print("\nT1.3: Sequence monotonicity")
bus3 = EventBus()
for i in range(10):
    bus3.emit(create_event("r3", EventKind.AGENT_PROGRESS, lane_id="main", seq=i + 1))
events = bus3.replay(run_id="r3")
seqs = [e.seq for e in events]
# The contract is *strictly* increasing: `seqs == sorted(seqs)` would accept
# duplicate seqs, and an empty replay would pass vacuously.
strictly_increasing = bool(seqs) and all(a < b for a, b in zip(seqs, seqs[1:]))
check("Seq monotonic", strictly_increasing, f"seqs={seqs}")
check("No gaps", seqs == list(range(1, 11)), f"seqs={seqs}")
# ─── T1.4: Replay reproduces transcript ───
print("\nT1.4: Replay fidelity")
bus4 = EventBus()
emitted = []
original = []
for i in range(5):
    e = create_event("r4", EventKind.TEXT_DELTA, seq=i + 1, text=f"word_{i}")
    bus4.emit(e)
    emitted.append(e)
    original.append(e.to_dict())
# Materialise once so the replay can be inspected more than once.
replayed_events = list(bus4.replay(run_id="r4"))
replayed = [e.to_dict() for e in replayed_events]
orig_payloads = [d["payload"] for d in original]
replay_payloads = [d["payload"] for d in replayed]
check("Replay matches original", orig_payloads == replay_payloads,
      f"{len(replay_payloads)} replayed vs {len(orig_payloads)} emitted")
# The visible transcript also includes each event's kind and seq; a replay
# that relabels events must fail even if the payloads line up.
orig_order = [(e.kind, e.seq) for e in emitted]
replay_order = [(e.kind, e.seq) for e in replayed_events]
check("Replay preserves kind/seq", orig_order == replay_order,
      f"replayed={replay_order}")
# ─── T1.5: reasoning.summary OK; hidden_chain_of_thought rejected ───
print("\nT1.5: Safety — no raw chain-of-thought")
bus5 = EventBus()
# A summarised reasoning event is allowed into the bus history...
safe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=1,
                          summary="I should check edge cases first")
bus5.emit(safe_event)
check("reasoning.summary accepted", bus5.history_size == 1,
      f"history has {bus5.history_size} events (expected 1)")
# ...but one carrying raw chain-of-thought must not be recorded, so the
# history size must stay at the single safe event.
unsafe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=2,
                            hidden_chain_of_thought="secret reasoning here")
bus5.emit(unsafe_event)
check("hidden_chain_of_thought rejected", bus5.history_size == 1,
      f"history has {bus5.history_size} events (should still be 1)")
# ─── T1.6: Backpressure doesn't drop terminal events ───
print("\nT1.6: Terminal event delivery guarantee")


async def test_backpressure(timeout=1.0):
    """Flood a small-queue bus with a slow subscriber; report terminal delivery.

    The bus is created with max_queue_size=3, while 20 TEXT_DELTA events
    plus one RUN_FINISHED are emitted back-to-back to a consumer that sleeps
    10 ms per event, presumably forcing the queue over capacity.

    Args:
        timeout: Maximum seconds to wait for RUN_FINISHED before giving up.

    Returns:
        True if the consumer received a RUN_FINISHED event, else False.
    """
    bus6 = EventBus(max_queue_size=3)
    received = []
    consumer_started = asyncio.Event()

    async def consumer():
        consumer_started.set()
        try:
            async for event in bus6.subscribe():
                received.append(event)
                await asyncio.sleep(0.01)  # slow consumer -> queue pressure
        except asyncio.CancelledError:
            pass  # cancellation is an expected shutdown path below

    def got_terminal():
        return any(e.kind == EventKind.RUN_FINISHED for e in received)

    task = asyncio.create_task(consumer())
    await consumer_started.wait()
    # Short grace period so the consumer is actually iterating the
    # subscription before events start arriving.
    await asyncio.sleep(0.05)
    for i in range(20):
        bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
    bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))
    # Poll with a deadline instead of a fixed sleep: stop as soon as the
    # terminal event has been delivered.
    deadline = time.monotonic() + timeout
    while not got_terminal() and time.monotonic() < deadline:
        await asyncio.sleep(0.02)
    bus6.close()
    try:
        # Let the consumer finish on its own after close() (so anything the
        # bus flushes on close is still delivered); wait_for cancels it if it
        # is still running when the grace period expires.
        await asyncio.wait_for(task, timeout=0.5)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    return got_terminal()


has_terminal = asyncio.run(test_backpressure())
check("Terminal event delivered under backpressure", has_terminal)
# ─── T1.7: Legacy StreamEvent compat ───
print("\nT1.7: Legacy StreamEvent compatibility")
from purpose_agent.streaming_v3 import pa_event_to_stream_event
from purpose_agent.streaming import StreamEvent
event = create_event("r7", EventKind.RUN_STARTED, seq=1, purpose="test")
legacy = pa_event_to_stream_event(event)
check("PAEvent → StreamEvent", legacy is not None)
check("Result is a StreamEvent", isinstance(legacy, StreamEvent),
      f"got {type(legacy).__name__}")
check("StreamEvent has event_type", hasattr(legacy, "event_type"))
# getattr turns a None conversion or a missing attribute into a reported
# failure instead of an AttributeError that would abort the remaining tests.
legacy_type = getattr(legacy, "event_type", None)
check("StreamEvent type is task_start", legacy_type == "task_start",
      f"got {legacy_type!r}")
# ─── parallel_merge ───
print("\nT1.x: parallel_merge determinism")
lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=1000 + i)
          for i in range(3)]
lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000 + i + 0.5)
          for i in range(3)]
# Merge fresh copies of the lane lists so a merge that consumes or mutates
# its inputs cannot affect the second merge below.
merged = list(parallel_merge({"a": list(lane_a), "b": list(lane_b)}))
check("Merged has all events", len(merged) == len(lane_a) + len(lane_b),
      f"got {len(merged)}")
# Compare adjacent pairs directly; indexing a hard-coded range(5) would raise
# IndexError (aborting the suite) if the merge dropped events.
check("Merged is timestamp-sorted",
      all(x.ts <= y.ts for x, y in zip(merged, merged[1:])))
# Determinism: merging the same input again must yield the same order.
remerged = list(parallel_merge({"a": list(lane_a), "b": list(lane_b)}))
check("Merge is deterministic",
      [(e.lane_id, e.seq) for e in merged] == [(e.lane_id, e.seq) for e in remerged])
# ─── REPORT ───
# Summarise the tallies kept by check(). The exit status (0 = all passed,
# 1 = at least one failure) lets a caller such as CI gate on this script.
print(f"\n{'=' * 50}")
print(f" Sprint 1 Tests: {PASS} pass, {FAIL} fail")
print(f" {'ALL PASS ✓' if FAIL == 0 else f'{FAIL} FAILURES'}")
print(f"{'=' * 50}")
sys.exit(0 if FAIL == 0 else 1)