Rohan03 committed on
Commit
8f83eeb
·
verified ·
1 Parent(s): b0e5b32

Sprint 2: durable execution tests (T2.1-T2.5)

Browse files
Files changed (1) hide show
  1. tests/test_sprint2_checkpoint.py +195 -0
tests/test_sprint2_checkpoint.py ADDED
@@ -0,0 +1,195 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Sprint 2 Tests — Durable Execution.
4
+
5
+ T2.1 Interrupt after node 2 in 5-node flow; resume at node 3
6
+ T2.2 Crash during tool; idempotent tool doesn't run twice on resume
7
+ T2.3 HITL checkpoint pauses and resumes with modified state
8
+ T2.4 SQLite checkpointer survives (simulated) process restart
9
+ T2.5 JSONL event log reconstructs run transcript
10
+ """
11
+ import sys
12
+ import os
13
+ import json
14
+ import tempfile
15
+ import time
16
+
17
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
18
+
19
PASS = 0
FAIL = 0


def check(name: str, condition: object, detail: str = "") -> None:
    """Record one test outcome and print a one-line result.

    Args:
        name: Label printed next to the pass/fail mark.
        condition: Evaluated for truthiness; a truthy value counts as a pass.
        detail: Optional text appended after the label, on failure only.

    Side effects:
        Increments the module-level PASS or FAIL counter and prints one line
        to stdout.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✓ {name}")
    else:
        FAIL += 1
        # Only add the ": detail" suffix when there is something to report.
        print(f" ✗ {name}" + (f": {detail}" if detail else ""))
30
+
31
+
32
+ from purpose_agent.runtime.events import PAEvent, EventKind, create_event
33
+ from purpose_agent.runtime.state import RunState, RunStatus, NodeState
34
+ from purpose_agent.runtime.checkpoint import (
35
+ InMemoryCheckpointer, JSONLCheckpointer, SQLiteCheckpointer,
36
+ )
37
+
38
+
39
# ═══ T2.1: Interrupt and resume ═══
# Runs two nodes of a flow, snapshots the state, reloads it through the
# checkpointer and verifies execution can continue from node 3.
print("T2.1: Interrupt after node 2, resume at node 3")

state = RunState(run_id="flow1", purpose="5 node flow", max_steps=5)
checkpointer = InMemoryCheckpointer()

# Simulate nodes 1-2 completing; one AGENT_FINISHED event is saved per node.
for i in range(1, 3):
    state.mark_node_started(f"node_{i}")
    state.mark_node_completed(f"node_{i}", output={"result": f"done_{i}"})
    state.current_step = i
    checkpointer.save_event(
        create_event("flow1", EventKind.AGENT_FINISHED, seq=i, node=f"node_{i}")
    )

# Save checkpoint (simulating interrupt)
checkpointer.save_snapshot("flow1", state)

# "Restart" — load from checkpoint
restored = checkpointer.load_latest("flow1")
check("State restored", restored is not None)

# The remaining checks dereference the restored state. Skip them when the
# restore failed so the failure is tallied by check() above instead of the
# whole script aborting with AttributeError before the final report.
if restored is not None:
    check("Correct step (2)", restored.current_step == 2)
    check("2 nodes completed", len(restored.completed_nodes) == 2)
    check("Node 3 not started", "node_3" not in restored.completed_nodes)

    # Resume from node 3
    restored.mark_node_started("node_3")
    check("Resume at node 3", restored.current_node == "node_3")
65
+
66
+
67
# ═══ T2.2: Idempotent tool calls ═══
# Verifies that a tool result cached under an idempotency key is found again
# when the same (tool, input) pair is looked up after a simulated resume.
print("\nT2.2: Idempotent tool — no double execution")

state2 = RunState(run_id="tool_test")
state2.current_step = 1

# First execution — tool runs
key = state2.get_idempotency_key("calculator", "2+2")
check("No cached result initially", not state2.has_cached_result(key))

# Tool executes, result cached
state2.cache_result(key, "4")
check("Result cached", state2.has_cached_result(key))
check("Cached value correct", state2.get_cached_result(key) == "4")

# On "resume" — same key, should use cached result.
# The key is recomputed from the same arguments rather than reused, so this
# also checks that key derivation is stable for identical inputs.
key2 = state2.get_idempotency_key("calculator", "2+2")
check("Same key on resume", key == key2)
check("Uses cached (idempotent)", state2.has_cached_result(key2))
86
+
87
+
88
# ═══ T2.3: HITL checkpoint pause/resume ═══
# Walks a run through RUNNING -> PAUSED -> (human edit) -> RUNNING, saving a
# snapshot at each transition and reloading it through the checkpointer.
print("\nT2.3: HITL pause and resume with modified state")

hitl_run = "hitl_test"

state3 = RunState(run_id=hitl_run, status=RunStatus.RUNNING)
state3.data = {"task": "review code", "approval": None}
checkpointer.save_snapshot(hitl_run, state3)

# The run stops and waits for a human decision.
state3.status = RunStatus.PAUSED
checkpointer.save_snapshot(hitl_run, state3)

# The "human" picks up the latest snapshot and records an approval.
# NOTE(review): if the in-memory checkpointer hands back the same object it
# was given rather than a copy, `loaded` aliases `state3` and these checks
# cannot detect a lost write — confirm against the checkpointer.
loaded = checkpointer.load_latest(hitl_run)
check("Loaded paused state", loaded.status == RunStatus.PAUSED)
loaded.data["approval"] = "approved"
loaded.status = RunStatus.RUNNING
checkpointer.save_snapshot(hitl_run, loaded)

# Execution resumes from whatever was saved last.
final = checkpointer.load_latest(hitl_run)
approval_kept = final.data["approval"] == "approved"
check("Human modification preserved", approval_kept)
running_again = final.status == RunStatus.RUNNING
check("Status back to running", running_again)
110
+
111
+
112
# ═══ T2.4: SQLite survives "restart" ═══
# Writes a snapshot and five events through one SQLiteCheckpointer, drops it,
# then reads everything back through a second instance on the same file.
print("\nT2.4: SQLite durability")

# delete=False keeps the file on disk after the handle closes so that the
# checkpointer can reopen it by path; it is removed in the finally below.
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
    db_path = f.name

try:
    # First "process"
    cp1 = SQLiteCheckpointer(db_path)
    state4 = RunState(run_id="sqlite_test", purpose="durable", current_step=7)
    state4.data = {"progress": "midway"}
    cp1.save_snapshot("sqlite_test", state4)
    for i in range(5):
        cp1.save_event(create_event("sqlite_test", EventKind.AGENT_PROGRESS, seq=i+1))
    # "Process dies": drop the only reference to the first checkpointer.
    # NOTE(review): whether this releases the underlying database handle
    # depends on SQLiteCheckpointer — confirm.
    del cp1

    # Second "process" — new connection
    cp2 = SQLiteCheckpointer(db_path)
    restored4 = cp2.load_latest("sqlite_test")
    events4 = cp2.list_events("sqlite_test")
    check("SQLite snapshot survives", restored4 is not None)
    # Only inspect the snapshot when one came back; otherwise the failure is
    # already tallied above and dereferencing None would abort the script.
    if restored4 is not None:
        check("SQLite state correct", restored4.current_step == 7)
        check("SQLite data correct", restored4.data.get("progress") == "midway")
    check("SQLite events survive", len(events4) == 5)
    check("SQLite lists runs", "sqlite_test" in cp2.list_runs())
finally:
    os.unlink(db_path)
139
+
140
+
141
# ═══ T2.5: JSONL event log reconstruction ═══
# Saves ten events and one snapshot through a JSONLCheckpointer rooted in a
# temporary directory, then reads them back in full and partially.
print("\nT2.5: JSONL event log → transcript reconstruction")

with tempfile.TemporaryDirectory() as tmpdir:
    cp = JSONLCheckpointer(tmpdir)

    # Write events
    for i in range(10):
        cp.save_event(create_event("jsonl_test", EventKind.AGENT_PROGRESS, seq=i+1,
                                   message=f"step {i+1}"))

    # Save snapshot
    s = RunState(run_id="jsonl_test", current_step=10)
    cp.save_snapshot("jsonl_test", s)

    # Reconstruct
    events = cp.list_events("jsonl_test")
    check("JSONL has all events", len(events) == 10)
    # Compare adjacent pairs for however many events came back, rather than
    # assuming exactly ten; a short result must not raise IndexError.
    check("Events ordered",
          all(prev.seq <= nxt.seq for prev, nxt in zip(events, events[1:])))
    # Guard the [0] lookups so an empty result is a recorded failure rather
    # than a crash that skips the final report.
    check("Payloads preserved",
          bool(events) and events[0].payload.get("message") == "step 1")
    check("Snapshot loads", cp.load_latest("jsonl_test") is not None)

    # Partial replay (since_seq): the assertions below expect only events
    # with seq greater than 5 to be returned.
    partial = cp.list_events("jsonl_test", since_seq=5)
    check("Partial replay works", len(partial) == 5)
    check("Partial starts at seq 6", bool(partial) and partial[0].seq == 6)
167
+
168
+
169
# ═══ T2.x: RunState serialization roundtrip ═══
# Serializes a populated RunState with to_dict(), rebuilds it with
# from_dict(), and compares a selection of fields on the rebuilt copy.
print("\nT2.x: RunState to_dict/from_dict roundtrip")

original = RunState(
    run_id="rt_test",
    session_id="sess1",
    status=RunStatus.PAUSED,
    purpose="test roundtrip",
    current_node="node_2",
    current_step=3,
    data={"key": "value"},
    completed_nodes=["node_1"],
)
# Touch the node bookkeeping and the tool cache before serializing so both
# are part of what has to survive the roundtrip.
original.mark_node_started("node_2")
original.cache_result("tool:calc:2+2", "4")

d = original.to_dict()
restored_rt = RunState.from_dict(d)

# Scalar fields.
check("run_id roundtrip", "rt_test" == restored_rt.run_id)
check("status roundtrip", RunStatus.PAUSED == restored_rt.status)
check("current_node roundtrip", "node_2" == restored_rt.current_node)

# Container fields and the cached tool result.
check("data roundtrip", {"key": "value"} == restored_rt.data)
cache_hit = restored_rt.has_cached_result("tool:calc:2+2")
check("tool_cache roundtrip", cache_hit)
check("completed_nodes roundtrip", "node_1" in restored_rt.completed_nodes)
188
+
189
+
190
# ═══ REPORT ═══
# Prints the pass/fail totals accumulated by check() and exits with status 0
# only when nothing failed, so a shell or CI job can act on the result.
print(f"\n{'='*50}")
print(f" Sprint 2 Tests: {PASS} pass, {FAIL} fail")
print(f" {'ALL PASS ✓' if FAIL == 0 else f'{FAIL} FAILURES'}")
print(f"{'='*50}")
sys.exit(0 if FAIL == 0 else 1)