# File: purpose-agent/tests/test_sprint1_events.py
# Page residue (avatar caption): Rohan03
# Last commit: Fix Issue 2: Make backpressure test T1.6 more robust with consumer_started sync
# Commit: 59578ae (verified)
#!/usr/bin/env python3
"""
Sprint 1 Tests — Canonical Event Runtime.
T1.1 Three agents stream in parallel; no lane contamination
T1.2 Agent B fails; Agent A and C continue
T1.3 Events in each lane are strictly seq-monotonic
T1.4 Stream replay reproduces same visible event transcript
T1.5 reasoning.summary allowed; raw hidden_chain_of_thought rejected
T1.6 Backpressure does not drop terminal events
T1.7 Legacy StreamEvent consumers still work
"""
import sys
import os
import asyncio
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# Running tallies of passed / failed checks; mutated only by check().
PASS = 0
FAIL = 0


def check(name: str, condition: bool, detail: str = "") -> None:
    """Record the outcome of one assertion and print a one-line result.

    Args:
        name: Label printed next to the pass/fail marker.
        condition: Outcome of the assertion; a truthy value counts as a pass.
        detail: Optional explanation, appended to the line on failure only.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✓ {name}")
    else:
        FAIL += 1
        # The detail suffix is omitted entirely when no detail was supplied.
        suffix = f": {detail}" if detail else ""
        print(f" ✗ {name}" + suffix)
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event
from purpose_agent.runtime.event_bus import EventBus, parallel_merge
# ═══ T1.1: Three parallel lanes, no contamination ═══
print("T1.1: Parallel lane isolation")
bus = EventBus(max_queue_size=100)
lanes = {"lane_a": [], "lane_b": [], "lane_c": []}

# Emit five progress events on every lane through the single shared bus.
for lane_id in lanes:
    for i in range(5):
        event = create_event(
            run_id="run1", kind=EventKind.AGENT_PROGRESS,
            lane_id=lane_id, seq=i + 1,
            message=f"{lane_id} step {i + 1}",
        )
        bus.emit(event)

# Replay per lane: every replayed event must carry the lane it was requested for.
for lane_id in lanes:
    lane_events = bus.replay(run_id="run1", lane_id=lane_id)
    lanes[lane_id] = lane_events
    for e in lane_events:
        if e.lane_id != lane_id:
            # One foreign event is enough to fail the lane; stop scanning.
            check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
            break
    else:
        # for/else: reached only when the scan finished without a break,
        # i.e. no foreign event was found in this lane.
        check(f"Lane {lane_id} isolated ({len(lane_events)} events)", len(lane_events) == 5)
# ═══ T1.2: Agent B fails; A and C continue ═══
print("\nT1.2: Fault isolation")
bus2 = EventBus()

# (lane, terminal event kind, extra payload) in the order the events are emitted.
_lane_outcomes = (
    ("a", EventKind.AGENT_FINISHED, {}),
    ("b", EventKind.AGENT_ERROR, {"error": "crashed"}),
    ("c", EventKind.AGENT_FINISHED, {}),
)
for _lane, _final_kind, _extra in _lane_outcomes:
    # Every lane starts at seq=1 and ends at seq=2 with its own terminal kind.
    bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id=_lane, seq=1))
    bus2.emit(create_event("r2", _final_kind, lane_id=_lane, seq=2, **_extra))

# Pull each lane's transcript back out of the bus independently.
a_events, b_events, c_events = (
    bus2.replay(run_id="r2", lane_id=_name) for _name in ("a", "b", "c")
)

check("Lane A completed", any(e.kind == EventKind.AGENT_FINISHED for e in a_events))
check("Lane B errored", any(e.kind == EventKind.AGENT_ERROR for e in b_events))
check(
    "Lane C completed despite B failure",
    any(e.kind == EventKind.AGENT_FINISHED for e in c_events),
)
# ═══ T1.3: Seq-monotonic per lane ═══
print("\nT1.3: Sequence monotonicity")
bus3 = EventBus()

# Emit seq 1..10 on a single lane.
for i in range(10):
    bus3.emit(create_event("r3", EventKind.AGENT_PROGRESS, lane_id="main", seq=i + 1))

events = bus3.replay(run_id="r3")
seqs = [e.seq for e in events]

# Strictly increasing: each seq must be greater than its predecessor, so a
# duplicated seq fails here (a plain sorted() comparison would accept it).
check(
    "Seq monotonic",
    all(prev < nxt for prev, nxt in zip(seqs, seqs[1:])),
    f"seqs={seqs}",
)
check("No gaps", seqs == list(range(1, 11)))
# ═══ T1.4: Replay reproduces transcript ═══
print("\nT1.4: Replay fidelity")
bus4 = EventBus()

# Snapshot each event's dict form at emit time so it can be compared with
# what replay() hands back later.
original = []
for i in range(5):
    e = create_event("r4", EventKind.TEXT_DELTA, seq=i + 1, text=f"word_{i}")
    bus4.emit(e)
    original.append(e.to_dict())

replayed = [e.to_dict() for e in bus4.replay(run_id="r4")]

# Only the "payload" entries are compared, in order.
orig_payloads = [d["payload"] for d in original]
replay_payloads = [d["payload"] for d in replayed]
check("Replay matches original", orig_payloads == replay_payloads)
# ═══ T1.5: reasoning.summary OK; hidden_chain_of_thought rejected ═══
print("\nT1.5: Safety — no raw chain-of-thought")
bus5 = EventBus()

# A reasoning summary with an ordinary `summary` payload should be stored.
safe_event = create_event(
    "r5", EventKind.REASONING_SUMMARY, seq=1,
    summary="I should check edge cases first",
)
bus5.emit(safe_event)
check("reasoning.summary accepted", bus5.history_size == 1)

# Same event kind, but carrying a `hidden_chain_of_thought` payload: the test
# expects the history size to stay at 1 after emitting it.
unsafe_event = create_event(
    "r5", EventKind.REASONING_SUMMARY, seq=2,
    hidden_chain_of_thought="secret reasoning here",
)
bus5.emit(unsafe_event)
check(
    "hidden_chain_of_thought rejected",
    bus5.history_size == 1,
    f"history has {bus5.history_size} events (should still be 1)",
)
# ═══ T1.6: Backpressure doesn't drop terminal events ═══
print("\nT1.6: Terminal event delivery guarantee")


async def test_backpressure():
    """Flood a small-queue bus and report whether RUN_FINISHED was received.

    A deliberately slow subscriber is attached to a bus created with
    ``max_queue_size=3``; 20 TEXT_DELTA events and then one RUN_FINISHED event
    are emitted without yielding to the event loop in between.

    Returns:
        True if the subscriber received at least one RUN_FINISHED event.
    """
    bus6 = EventBus(max_queue_size=3)
    received = []
    consumer_started = asyncio.Event()

    async def consumer():
        # Signal that the consumer task is running before it begins iterating.
        consumer_started.set()
        try:
            async for event in bus6.subscribe():
                received.append(event)
                # Slow consumer: pause after every event so the producer outpaces it.
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # Cancellation is the expected shutdown path for this helper task.
            pass

    task = asyncio.create_task(consumer())
    await consumer_started.wait()
    # Extra grace period after the consumer has started, before producing.
    await asyncio.sleep(0.05)

    # Burst of 20 events against a queue size of 3, followed by the terminal event.
    for i in range(20):
        bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
    bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))

    # Give the slow consumer time to work through whatever was queued.
    await asyncio.sleep(1.0)
    bus6.close()
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=2.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass

    has_terminal = any(e.kind == EventKind.RUN_FINISHED for e in received)
    return has_terminal


has_terminal = asyncio.run(test_backpressure())
check("Terminal event delivered under backpressure", has_terminal)
# ═══ T1.7: Legacy StreamEvent compat ═══
print("\nT1.7: Legacy StreamEvent compatibility")
from purpose_agent.streaming_v3 import pa_event_to_stream_event
from purpose_agent.streaming import StreamEvent

event = create_event("r7", EventKind.RUN_STARTED, seq=1, purpose="test")
legacy = pa_event_to_stream_event(event)
check("PAEvent → StreamEvent", legacy is not None)
check("StreamEvent has event_type", hasattr(legacy, "event_type"))
# getattr with a default keeps a failed conversion (None / missing attribute)
# a reported failure rather than an AttributeError that aborts the script.
legacy_type = getattr(legacy, "event_type", None)
check(
    "StreamEvent type is task_start",
    legacy_type == "task_start",
    f"event_type={legacy_type!r}",
)
# ═══ parallel_merge ═══
print("\nT1.x: parallel_merge determinism")
# Two lanes of three events each; lane b's timestamps are offset by 0.5 so the
# two lanes interleave when ordered by ts.
lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=1000+i) for i in range(3)]
lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000+i+0.5) for i in range(3)]
merged = parallel_merge({"a": lane_a, "b": lane_b})
check("Merged has all events", len(merged) == 6)
# Compare adjacent pairs over the actual length of the result, so a short
# result is reported by the size check above instead of raising IndexError here.
check(
    "Merged is timestamp-sorted",
    all(merged[i].ts <= merged[i + 1].ts for i in range(len(merged) - 1)),
)
# ═══ REPORT ═══
# Print the pass/fail totals and exit non-zero when any check failed.
rule = "=" * 50
verdict = "ALL PASS ✓" if FAIL == 0 else f"{FAIL} FAILURES"
print(f"\n{rule}")
print(f" Sprint 1 Tests: {PASS} pass, {FAIL} fail")
print(f" {verdict}")
print(rule)
sys.exit(0 if FAIL == 0 else 1)