"""
Sprint 1 Tests — Canonical Event Runtime.

T1.1 Three agents stream in parallel; no lane contamination
T1.2 Agent B fails; Agent A and C continue
T1.3 Events in each lane are strictly seq-monotonic
T1.4 Stream replay reproduces same visible event transcript
T1.5 reasoning.summary allowed; raw hidden_chain_of_thought rejected
T1.6 Backpressure does not drop terminal events
T1.7 Legacy StreamEvent consumers still work
T1.x parallel_merge output is complete, timestamp-ordered and repeatable
"""
| import sys |
| import os |
| import asyncio |
| import time |
|
|
# Make the package under test importable when this script is run directly by
# prepending the directory one level above this file to the import path.
sys.path[0:0] = [os.path.join(os.path.dirname(__file__), "..")]
|
|
# Running tallies of passed / failed checks; the summary at the end of the
# script prints them and derives the process exit code from FAIL.
PASS = 0
FAIL = 0


def check(name, condition, detail=""):
    """Record and print the outcome of one named check.

    Args:
        name: Label printed next to the result marker.
        condition: Truthy means the check passed.
        detail: Optional context, appended to the line only on failure.

    Side effects:
        Increments the module-level PASS or FAIL counter and prints one line.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✓ {name}")
    else:
        FAIL += 1
        # A distinct marker from the pass branch so failures are visible in
        # the log (previously both branches printed the same glyph).
        suffix = f": {detail}" if detail else ""
        print(f" ✗ {name}{suffix}")
|
|
|
|
| from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event |
| from purpose_agent.runtime.event_bus import EventBus, parallel_merge |
|
|
|
|
# ---------------------------------------------------------------------------
# T1.1: several lanes emitting into one bus must replay without cross-talk.
# ---------------------------------------------------------------------------
print("T1.1: Parallel lane isolation")
bus = EventBus(max_queue_size=100)
lanes = {"lane_a": [], "lane_b": [], "lane_c": []}

# Emit round-robin (step-major, lane-minor) so the bus history interleaves
# lanes the way concurrently streaming agents would. Emitting each lane's
# events back-to-back would leave every lane in one contiguous run and not
# really exercise per-lane filtering on replay.
for i in range(5):
    for lane_id in lanes:
        event = create_event(
            run_id="run1", kind=EventKind.AGENT_PROGRESS,
            lane_id=lane_id, seq=i+1,
            message=f"{lane_id} step {i+1}",
        )
        bus.emit(event)

# Replay each lane on its own: every returned event must carry that lane's id,
# and each lane must get back exactly the 5 events emitted for it.
for lane_id in lanes:
    lane_events = bus.replay(run_id="run1", lane_id=lane_id)
    lanes[lane_id] = lane_events
    for e in lane_events:
        if e.lane_id != lane_id:
            check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
            break
    else:
        # for/else: reached only when no foreign event triggered the break.
        check(f"Lane {lane_id} isolated ({len(lane_events)} events)", len(lane_events) == 5)
|
|
|
|
# ---------------------------------------------------------------------------
# T1.2: an error in one lane must not stop the other lanes from finishing.
# ---------------------------------------------------------------------------
print("\nT1.2: Fault isolation")
bus2 = EventBus()

# Per lane: which terminal event follows AGENT_STARTED, plus any extra payload.
_t12_plan = (
    ("a", EventKind.AGENT_FINISHED, {}),
    ("b", EventKind.AGENT_ERROR, {"error": "crashed"}),
    ("c", EventKind.AGENT_FINISHED, {}),
)
for _lane, _terminal_kind, _extra in _t12_plan:
    bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id=_lane, seq=1))
    bus2.emit(create_event("r2", _terminal_kind, lane_id=_lane, seq=2, **_extra))

a_events, b_events, c_events = (
    bus2.replay(run_id="r2", lane_id=_lane) for _lane in ("a", "b", "c")
)

check("Lane A completed", any(ev.kind == EventKind.AGENT_FINISHED for ev in a_events))
check("Lane B errored", any(ev.kind == EventKind.AGENT_ERROR for ev in b_events))
check(
    "Lane C completed despite B failure",
    any(ev.kind == EventKind.AGENT_FINISHED for ev in c_events),
)
|
|
|
|
# ---------------------------------------------------------------------------
# T1.3: events in a lane must replay with strictly increasing, gap-free seq.
# ---------------------------------------------------------------------------
print("\nT1.3: Sequence monotonicity")
bus3 = EventBus()
for i in range(10):
    bus3.emit(create_event("r3", EventKind.AGENT_PROGRESS, lane_id="main", seq=i+1))

events = bus3.replay(run_id="r3")
seqs = [e.seq for e in events]
# Strict comparison of each neighbouring pair: `seqs == sorted(seqs)` would
# also accept duplicates such as [1, 1, 2], which "strictly monotonic" forbids.
check(
    "Seq monotonic",
    all(prev < cur for prev, cur in zip(seqs, seqs[1:])),
    f"seqs={seqs}",
)
check("No gaps", seqs == list(range(1, 11)), f"seqs={seqs}")
|
|
|
|
# ---------------------------------------------------------------------------
# T1.4: replaying a run must return the same payloads, in the same order,
# as were captured right after each event was emitted.
# ---------------------------------------------------------------------------
print("\nT1.4: Replay fidelity")
bus4 = EventBus()
original = []
for seq_no in range(1, 6):
    emitted = create_event("r4", EventKind.TEXT_DELTA, seq=seq_no, text=f"word_{seq_no - 1}")
    bus4.emit(emitted)
    # Snapshot after emit, so whatever the bus stored is what gets compared.
    original.append(emitted.to_dict())

replayed = list(map(lambda ev: ev.to_dict(), bus4.replay(run_id="r4")))
orig_payloads, replay_payloads = (
    [snapshot["payload"] for snapshot in dicts] for dicts in (original, replayed)
)
check("Replay matches original", orig_payloads == replay_payloads)
|
|
|
|
# ---------------------------------------------------------------------------
# T1.5: a reasoning summary is stored, but an event carrying raw
# hidden_chain_of_thought must not reach the bus history.
# ---------------------------------------------------------------------------
print("\nT1.5: Safety β no raw chain-of-thought")
bus5 = EventBus()

safe_event = create_event(
    "r5",
    EventKind.REASONING_SUMMARY,
    seq=1,
    summary="I should check edge cases first",
)
bus5.emit(safe_event)
summary_stored = bus5.history_size == 1
check("reasoning.summary accepted", summary_stored)

unsafe_event = create_event(
    "r5",
    EventKind.REASONING_SUMMARY,
    seq=2,
    hidden_chain_of_thought="secret reasoning here",
)
bus5.emit(unsafe_event)
# History must still contain only the safe event.
unsafe_dropped = bus5.history_size == 1
check(
    "hidden_chain_of_thought rejected",
    unsafe_dropped,
    f"history has {bus5.history_size} events (should still be 1)",
)
|
|
|
|
# ---------------------------------------------------------------------------
# T1.6: with a tiny queue and a slow consumer, the terminal RUN_FINISHED event
# must still reach the subscriber.
# ---------------------------------------------------------------------------
print("\nT1.6: Terminal event delivery guarantee")


async def test_backpressure():
    """Flood a bus created with max_queue_size=3, then check RUN_FINISHED arrives.

    Returns:
        True if a RUN_FINISHED event was received by the subscriber.
    """
    wait_timeout = 1.0    # seconds to wait for the terminal event (upper bound)
    poll_interval = 0.01  # seconds between checks of what has been received

    bus6 = EventBus(max_queue_size=3)
    received = []
    consumer_started = asyncio.Event()

    async def consumer():
        consumer_started.set()
        try:
            async for event in bus6.subscribe():
                received.append(event)
                # Deliberately slow consumer so the small queue backs up.
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            pass

    def terminal_seen():
        return any(e.kind == EventKind.RUN_FINISHED for e in received)

    task = asyncio.create_task(consumer())
    await consumer_started.wait()
    # Give the subscription a moment to be in place before emitting.
    await asyncio.sleep(0.05)

    # 20 deltas emitted back-to-back: far more than a queue of 3 can hold.
    for i in range(20):
        bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))

    bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))

    # Poll until the terminal event shows up or the deadline passes, rather
    # than always sleeping a fixed second: fast on success, bounded on failure.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + wait_timeout
    while not terminal_seen() and loop.time() < deadline:
        await asyncio.sleep(poll_interval)

    bus6.close()
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=2.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass

    return terminal_seen()


has_terminal = asyncio.run(test_backpressure())
check("Terminal event delivered under backpressure", has_terminal)
|
|
|
|
# ---------------------------------------------------------------------------
# T1.7: a PAEvent must convert into a legacy StreamEvent of the right type.
# ---------------------------------------------------------------------------
print("\nT1.7: Legacy StreamEvent compatibility")
from purpose_agent.streaming_v3 import pa_event_to_stream_event
from purpose_agent.streaming import StreamEvent


event = create_event("r7", EventKind.RUN_STARTED, seq=1, purpose="test")
legacy = pa_event_to_stream_event(event)
check("PAEvent β StreamEvent", legacy is not None)
# Non-None is not enough: confirm the converter returns the legacy class itself.
check(
    "Converted object is a StreamEvent",
    isinstance(legacy, StreamEvent),
    f"got {type(legacy).__name__}",
)
check("StreamEvent has event_type", hasattr(legacy, "event_type"))
# getattr with a default so a failed conversion is reported as a failed check
# instead of aborting the whole script with AttributeError.
legacy_type = getattr(legacy, "event_type", None)
check("StreamEvent type is task_start", legacy_type == "task_start", f"got {legacy_type!r}")
|
|
|
|
# ---------------------------------------------------------------------------
# T1.x: merging per-lane streams must be complete, timestamp-ordered and
# repeatable.
# ---------------------------------------------------------------------------
print("\nT1.x: parallel_merge determinism")
lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=1000+i) for i in range(3)]
lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000+i+0.5) for i in range(3)]
merged = parallel_merge({"a": lane_a, "b": lane_b})
check("Merged has all events", len(merged) == 6)
# Bound the pairwise check by the actual length instead of a fixed range(5):
# a short result now yields a failed check rather than an IndexError, and a
# longer one is checked in full.
check(
    "Merged is timestamp-sorted",
    all(merged[i].ts <= merged[i + 1].ts for i in range(len(merged) - 1)),
)
# Determinism: merging the same events again must give the same order. Fresh
# list copies keep a merge that consumes its inputs from skewing the result.
order = [(e.lane_id, e.seq) for e in merged]
again = parallel_merge({"a": list(lane_a), "b": list(lane_b)})
again_order = [(e.lane_id, e.seq) for e in again]
check("Merge is repeatable", again_order == order, f"first={order} second={again_order}")
|
|
|
|
# ---------------------------------------------------------------------------
# Summary: print the tallies and exit non-zero when any check failed.
# ---------------------------------------------------------------------------
_rule = "=" * 50
_verdict = "ALL PASS β" if FAIL == 0 else f"{FAIL} FAILURES"
print(f"\n{_rule}")
print(f" Sprint 1 Tests: {PASS} pass, {FAIL} fail")
print(f" {_verdict}")
print(_rule)
sys.exit(1 if FAIL else 0)
|
|