#!/usr/bin/env python3
"""
Sprint 1 Tests — Canonical Event Runtime.

    T1.1  Three agents stream in parallel; no lane contamination
    T1.2  Agent B fails; Agent A and C continue
    T1.3  Events in each lane are strictly seq-monotonic
    T1.4  Stream replay reproduces same visible event transcript
    T1.5  reasoning.summary allowed; raw hidden_chain_of_thought rejected
    T1.6  Backpressure does not drop terminal events
    T1.7  Legacy StreamEvent consumers still work

Run directly as a script. Each check prints a ✓/✗ line, a summary is printed
at the end, and the process exits with status 0 only if every check passed.
"""
import sys
import os
import asyncio
import time

# Put the parent directory of this file on sys.path so the ``purpose_agent``
# package can be imported when the script is run directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

PASS = 0
FAIL = 0


def check(name, condition, detail=""):
    """Record and print the outcome of one named check.

    Increments the module-level ``PASS`` counter when *condition* is truthy,
    otherwise increments ``FAIL``. *detail*, if non-empty, is appended to the
    failure line to explain what went wrong.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✓ {name}")
    else:
        FAIL += 1
        print(f" ✗ {name}" + (f": {detail}" if detail else ""))


# These imports must come after the sys.path tweak above.
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event
from purpose_agent.runtime.event_bus import EventBus, parallel_merge

# ═══ T1.1: Three parallel lanes, no contamination ═══
print("T1.1: Parallel lane isolation")
bus = EventBus(max_queue_size=100)
lanes = {"lane_a": [], "lane_b": [], "lane_c": []}
for lane_id in lanes:
    for i in range(5):
        event = create_event(
            run_id="run1",
            kind=EventKind.AGENT_PROGRESS,
            lane_id=lane_id,
            seq=i + 1,
            message=f"{lane_id} step {i+1}",
        )
        bus.emit(event)

# Replay per lane; every replayed event must carry the lane it was asked for.
for lane_id in lanes:
    lane_events = bus.replay(run_id="run1", lane_id=lane_id)
    lanes[lane_id] = lane_events  # rebinding an existing key is safe mid-iteration
    for e in lane_events:
        if e.lane_id != lane_id:
            check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
            break
    else:
        # for-else: runs only when no contaminating event triggered the break.
        check(f"Lane {lane_id} isolated ({len(lane_events)} events)", len(lane_events) == 5)

# ═══ T1.2: Agent B fails; A and C continue ═══
print("\nT1.2: Fault isolation")
bus2 = EventBus()
bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="a", seq=1))
bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="a", seq=2))
bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="b", seq=1))
bus2.emit(create_event("r2", EventKind.AGENT_ERROR, lane_id="b", seq=2, error="crashed"))
bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="c", seq=1))
bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="c", seq=2))

a_events = bus2.replay(run_id="r2", lane_id="a")
b_events = bus2.replay(run_id="r2", lane_id="b")
c_events = bus2.replay(run_id="r2", lane_id="c")
check("Lane A completed", any(e.kind == EventKind.AGENT_FINISHED for e in a_events))
check("Lane B errored", any(e.kind == EventKind.AGENT_ERROR for e in b_events))
check("Lane C completed despite B failure", any(e.kind == EventKind.AGENT_FINISHED for e in c_events))

# ═══ T1.3: Seq-monotonic per lane ═══
print("\nT1.3: Sequence monotonicity")
bus3 = EventBus()
for i in range(10):
    bus3.emit(create_event("r3", EventKind.AGENT_PROGRESS, lane_id="main", seq=i + 1))
events = bus3.replay(run_id="r3")
seqs = [e.seq for e in events]
check("Seq monotonic", seqs == sorted(seqs), f"seqs={seqs}")
check("No gaps", seqs == list(range(1, 11)))

# ═══ T1.4: Replay reproduces transcript ═══
print("\nT1.4: Replay fidelity")
bus4 = EventBus()
original = []
for i in range(5):
    e = create_event("r4", EventKind.TEXT_DELTA, seq=i + 1, text=f"word_{i}")
    bus4.emit(e)
    original.append(e.to_dict())
replayed = [e.to_dict() for e in bus4.replay(run_id="r4")]
# Only payloads are compared: they are the visible part of the transcript.
orig_payloads = [d["payload"] for d in original]
replay_payloads = [d["payload"] for d in replayed]
check("Replay matches original", orig_payloads == replay_payloads)

# ═══ T1.5: reasoning.summary OK; hidden_chain_of_thought rejected ═══
print("\nT1.5: Safety — no raw chain-of-thought")
bus5 = EventBus()
safe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=1, summary="I should check edge cases first")
bus5.emit(safe_event)
check("reasoning.summary accepted", bus5.history_size == 1)

unsafe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=2, hidden_chain_of_thought="secret reasoning here")
bus5.emit(unsafe_event)
# Rejection is observed as the bus history not growing past the safe event.
check("hidden_chain_of_thought rejected", bus5.history_size == 1,
      f"history has {bus5.history_size} events (should still be 1)")

# ═══ T1.6: Backpressure doesn't drop terminal events ═══
print("\nT1.6: Terminal event delivery guarantee")


async def test_backpressure():
    """Flood a slow subscriber on a tiny queue; return True if RUN_FINISHED arrived.

    The bus queue holds 3 events, but 21 are emitted back-to-back while the
    consumer sleeps between reads. The producer therefore outruns the
    subscriber, and the terminal event must still get through.
    """
    bus6 = EventBus(max_queue_size=3)
    received = []
    consumer_started = asyncio.Event()

    def saw_terminal():
        return any(e.kind == EventKind.RUN_FINISHED for e in received)

    async def consumer():
        consumer_started.set()
        try:
            async for event in bus6.subscribe():
                received.append(event)
                # Deliberately slow consumer so emitted events pile up.
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # Cancellation is the expected way this consumer ends.
            pass

    task = asyncio.create_task(consumer())
    await consumer_started.wait()
    # Give the consumer time to actually enter subscribe() before emitting.
    await asyncio.sleep(0.05)

    for i in range(20):
        bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
    bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))

    # Poll until the terminal event is seen or a deadline passes. A fixed
    # sleep is slow when delivery succeeds and flaky on slow hosts.
    deadline = time.monotonic() + 2.0
    while not saw_terminal() and time.monotonic() < deadline:
        await asyncio.sleep(0.01)

    bus6.close()
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=2.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass

    return saw_terminal()


has_terminal = asyncio.run(test_backpressure())
check("Terminal event delivered under backpressure", has_terminal)

# ═══ T1.7: Legacy StreamEvent compat ═══
print("\nT1.7: Legacy StreamEvent compatibility")
from purpose_agent.streaming_v3 import pa_event_to_stream_event
from purpose_agent.streaming import StreamEvent

event = create_event("r7", EventKind.RUN_STARTED, seq=1, purpose="test")
legacy = pa_event_to_stream_event(event)
check("PAEvent → StreamEvent", legacy is not None)
check("StreamEvent has event_type", hasattr(legacy, "event_type"))
# getattr means a None or attribute-less result is a reported failure, not a crash.
check("StreamEvent type is task_start", getattr(legacy, "event_type", None) == "task_start")

# ═══ parallel_merge ═══
print("\nT1.x: parallel_merge determinism")
# Lane b's timestamps sit half a unit after lane a's, so a correct merge interleaves them.
lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=1000 + i) for i in range(3)]
lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000 + i + 0.5) for i in range(3)]
merged = parallel_merge({"a": lane_a, "b": lane_b})
check("Merged has all events", len(merged) == 6)
# Compare adjacent pairs over the actual length; a hard-coded bound would
# raise IndexError (aborting the script) if the merge lost events.
check("Merged is timestamp-sorted",
      all(merged[i].ts <= merged[i + 1].ts for i in range(len(merged) - 1)))

# ═══ REPORT ═══
print(f"\n{'='*50}")
print(f" Sprint 1 Tests: {PASS} pass, {FAIL} fail")
print(f" {'ALL PASS ✓' if FAIL == 0 else f'{FAIL} FAILURES'}")
print(f"{'='*50}")
sys.exit(0 if FAIL == 0 else 1)