Pablo commited on
Commit
be03608
·
1 Parent(s): bd7899d

PBKVPredictor: 2nd-order Markov model, 19 tests (stub → production)

Browse files
Files changed (1) hide show
  1. tests/test_pbkv_predictor.py +249 -9
tests/test_pbkv_predictor.py CHANGED
@@ -1,13 +1,20 @@
1
- """Tests for PBKVPredictor — TASK-013."""
2
- import pytest
3
  import json
 
4
  import tempfile
5
  from pathlib import Path
6
- from contextforge.scheduling.pbkv_predictor import PBKVPredictor, WorkflowStepRecord, PredictionResult
 
 
 
 
 
7
 
8
 
9
  class TestPBKVPredictor:
10
- """Tests for PBKV predictor stub."""
 
 
11
 
12
  @pytest.mark.asyncio
13
  async def test_log_workflow_step(self):
@@ -28,7 +35,7 @@ class TestPBKVPredictor:
28
 
29
  @pytest.mark.asyncio
30
  async def test_predict_next_agents_returns_prediction_result(self):
31
- """predict_next_agents() returns PredictionResult."""
32
  with tempfile.TemporaryDirectory() as tmpdir:
33
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
34
 
@@ -42,7 +49,9 @@ class TestPBKVPredictor:
42
  cla_group=i % 2,
43
  )
44
 
45
- result = await predictor.predict_next_agents("agent_0", current_step=3, num_predictions=2)
 
 
46
 
47
  assert isinstance(result, PredictionResult)
48
  assert isinstance(result.predicted_agents, list)
@@ -54,7 +63,9 @@ class TestPBKVPredictor:
54
  with tempfile.TemporaryDirectory() as tmpdir:
55
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
56
 
57
- result = await predictor.predict_next_agents("agent_0", current_step=0, num_predictions=3)
 
 
58
 
59
  assert isinstance(result, PredictionResult)
60
  # Empty history → confidence 0, returns current agent as fallback
@@ -62,7 +73,7 @@ class TestPBKVPredictor:
62
 
63
  @pytest.mark.asyncio
64
  async def test_get_prefetch_candidates(self):
65
- """get_prefetch_candidates() returns list of block IDs."""
66
  with tempfile.TemporaryDirectory() as tmpdir:
67
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
68
 
@@ -110,4 +121,233 @@ class TestPBKVPredictor:
110
  stats = predictor.get_stats()
111
  assert stats["history_size"] == 0
112
  assert stats["max_history_steps"] == 50
113
- assert "_pbkv_logs" in stats["log_file"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Tests for PBKVPredictor — Markov chain implementation."""
 
2
  import json
3
+ import pytest
4
  import tempfile
5
  from pathlib import Path
6
+
7
+ from contextforge.scheduling.pbkv_predictor import (
8
+ PBKVPredictor,
9
+ WorkflowStepRecord,
10
+ PredictionResult,
11
+ )
12
 
13
 
14
  class TestPBKVPredictor:
15
+ """Tests for PBKV predictor Markov chain implementation."""
16
+
17
+ # ===== Existing stub tests (backward compatibility) =====
18
 
19
  @pytest.mark.asyncio
20
  async def test_log_workflow_step(self):
 
35
 
36
  @pytest.mark.asyncio
37
  async def test_predict_next_agents_returns_prediction_result(self):
38
+ """predict_next_agents() returns PredictionResult via async path."""
39
  with tempfile.TemporaryDirectory() as tmpdir:
40
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
41
 
 
49
  cla_group=i % 2,
50
  )
51
 
52
+ result = await predictor._predict_next_agents_async(
53
+ "agent_0", current_step=3, num_predictions=2
54
+ )
55
 
56
  assert isinstance(result, PredictionResult)
57
  assert isinstance(result.predicted_agents, list)
 
63
  with tempfile.TemporaryDirectory() as tmpdir:
64
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
65
 
66
+ result = await predictor._predict_next_agents_async(
67
+ "agent_0", current_step=0, num_predictions=3
68
+ )
69
 
70
  assert isinstance(result, PredictionResult)
71
  # Empty history → confidence 0, returns current agent as fallback
 
73
 
74
  @pytest.mark.asyncio
75
  async def test_get_prefetch_candidates(self):
76
+ """get_prefetch_candidates() returns list of agent IDs."""
77
  with tempfile.TemporaryDirectory() as tmpdir:
78
  predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
79
 
 
121
  stats = predictor.get_stats()
122
  assert stats["history_size"] == 0
123
  assert stats["max_history_steps"] == 50
124
+ assert "workflow_steps.jsonl" in stats["log_file"]
125
+ assert stats["trained"] is False
126
+
127
+ # ===== Markov chain training tests =====
128
+
129
+ def test_train_from_jsonl(self):
130
+ """train_from_jsonl() builds transition table correctly."""
131
+ with tempfile.TemporaryDirectory() as tmpdir:
132
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
133
+
134
+ # Write JSONL with known sequence: A → B → C → A → B
135
+ records = [
136
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
137
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
138
+ {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
139
+ {"step_idx": 3, "agent_id": "A", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
140
+ {"step_idx": 4, "agent_id": "B", "anchor_hash": "h4", "token_length": 10, "cla_group": 1},
141
+ ]
142
+ with open(log_file, "w") as f:
143
+ for rec in records:
144
+ f.write(json.dumps(rec) + "\n")
145
+
146
+ predictor = PBKVPredictor(log_dir=tmpdir)
147
+ predictor.train_from_jsonl(tmpdir)
148
+
149
+ assert predictor._trained is True
150
+ assert predictor._all_agents == {"A", "B", "C"}
151
+ # Check 2nd-order transitions exist
152
+ assert ("A", "B") in predictor._transition_table
153
+ assert ("B", "C") in predictor._transition_table
154
+ assert ("C", "A") in predictor._transition_table
155
+ assert ("A", "B") in predictor._transition_table
156
+
157
+ def test_train_from_jsonl_with_multiple_sequences(self):
158
+ """train_from_jsonl() handles multiple sequences (empty lines)."""
159
+ with tempfile.TemporaryDirectory() as tmpdir:
160
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
161
+
162
+ # Two sequences: A→B and C→D
163
+ records = [
164
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
165
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
166
+ {},
167
+ {"step_idx": 0, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
168
+ {"step_idx": 1, "agent_id": "D", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
169
+ ]
170
+ with open(log_file, "w") as f:
171
+ for rec in records:
172
+ f.write(json.dumps(rec) + "\n")
173
+
174
+ predictor = PBKVPredictor(log_dir=tmpdir)
175
+ predictor.train_from_jsonl(tmpdir)
176
+
177
+ assert predictor._trained is True
178
+ assert predictor._all_agents == {"A", "B", "C", "D"}
179
+
180
+ def test_train_from_jsonl_missing_file(self):
181
+ """train_from_jsonl() handles missing file gracefully."""
182
+ with tempfile.TemporaryDirectory() as tmpdir:
183
+ predictor = PBKVPredictor(log_dir=tmpdir)
184
+ predictor.train_from_jsonl(str(Path(tmpdir) / "nonexistent.jsonl"))
185
+ assert predictor._trained is False
186
+
187
+ # ===== Prediction correctness tests =====
188
+
189
+ def test_predict_next_agents_sync(self):
190
+ """Synchronous predict_next_agents() returns list of agent IDs."""
191
+ with tempfile.TemporaryDirectory() as tmpdir:
192
+ predictor = PBKVPredictor(log_dir=tmpdir)
193
+
194
+ # Train with known pattern: A → B → C
195
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
196
+ records = [
197
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
198
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
199
+ {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
200
+ ]
201
+ with open(log_file, "w") as f:
202
+ for rec in records:
203
+ f.write(json.dumps(rec) + "\n")
204
+
205
+ predictor.train_from_jsonl(tmpdir)
206
+ predictions = predictor.predict_next_agents("B", top_k=2)
207
+
208
+ assert isinstance(predictions, list)
209
+ assert "C" in predictions # B → C is the trained transition
210
+ assert len(predictions) <= 2
211
+
212
+ def test_predict_next_agents_fallback_on_empty_history(self):
213
+ """predict_next_agents() falls back when no training data."""
214
+ with tempfile.TemporaryDirectory() as tmpdir:
215
+ predictor = PBKVPredictor(log_dir=tmpdir)
216
+
217
+ # No training, no history
218
+ predictions = predictor.predict_next_agents("X", top_k=3)
219
+
220
+ assert predictions == ["X"]
221
+
222
+ def test_predict_next_agents_fallback_1st_order(self):
223
+ """predict_next_agents() uses 1st-order when 2nd-order state unseen."""
224
+ with tempfile.TemporaryDirectory() as tmpdir:
225
+ predictor = PBKVPredictor(log_dir=tmpdir)
226
+
227
+ # Train: A → B → C (only 2nd-order state (A,B)→C)
228
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
229
+ records = [
230
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
231
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
232
+ {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
233
+ ]
234
+ with open(log_file, "w") as f:
235
+ for rec in records:
236
+ f.write(json.dumps(rec) + "\n")
237
+
238
+ predictor.train_from_jsonl(tmpdir)
239
+
240
+ # Query for unseen state: should fall back to 1st-order
241
+ predictions = predictor.predict_next_agents("B", top_k=1)
242
+ assert "C" in predictions
243
+
244
+ def test_predict_next_agents_top_k(self):
245
+ """predict_next_agents() respects top_k parameter."""
246
+ with tempfile.TemporaryDirectory() as tmpdir:
247
+ predictor = PBKVPredictor(log_dir=tmpdir)
248
+
249
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
250
+ records = [
251
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
252
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
253
+ {"step_idx": 2, "agent_id": "A", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
254
+ ]
255
+ with open(log_file, "w") as f:
256
+ for rec in records:
257
+ f.write(json.dumps(rec) + "\n")
258
+
259
+ predictor.train_from_jsonl(tmpdir)
260
+ predictions = predictor.predict_next_agents("B", top_k=1)
261
+
262
+ assert len(predictions) == 1
263
+
264
+ # ===== blend_alpha tests =====
265
+
266
+ def test_blend_alpha_parameter(self):
267
+ """blend_alpha is stored correctly in __init__."""
268
+ with tempfile.TemporaryDirectory() as tmpdir:
269
+ predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.7)
270
+ assert predictor._blend_alpha == 0.7
271
+
272
+ def test_blend_alpha_default(self):
273
+ """blend_alpha defaults to 0.6."""
274
+ with tempfile.TemporaryDirectory() as tmpdir:
275
+ predictor = PBKVPredictor(log_dir=tmpdir)
276
+ assert predictor._blend_alpha == 0.6
277
+
278
+ @pytest.mark.asyncio
279
+ async def test_get_eviction_priority_without_step_graph(self):
280
+ """get_eviction_priority() works without AgentStepGraph."""
281
+ with tempfile.TemporaryDirectory() as tmpdir:
282
+ predictor = PBKVPredictor(log_dir=tmpdir)
283
+
284
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
285
+ records = [
286
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
287
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
288
+ {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
289
+ ]
290
+ with open(log_file, "w") as f:
291
+ for rec in records:
292
+ f.write(json.dumps(rec) + "\n")
293
+
294
+ predictor.train_from_jsonl(tmpdir)
295
+
296
+ priority = await predictor.get_eviction_priority(["A", "B", "C"])
297
+ assert isinstance(priority, list)
298
+ assert len(priority) == 3
299
+
300
+ @pytest.mark.asyncio
301
+ async def test_get_eviction_priority_with_step_graph(self):
302
+ """get_eviction_priority() blends with AgentStepGraph."""
303
+ with tempfile.TemporaryDirectory() as tmpdir:
304
+ predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.6)
305
+
306
+ # Train with pattern
307
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
308
+ records = [
309
+ {"step_idx": 0, "agent_id": "retriever", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
310
+ {"step_idx": 1, "agent_id": "summarizer", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
311
+ ]
312
+ with open(log_file, "w") as f:
313
+ for rec in records:
314
+ f.write(json.dumps(rec) + "\n")
315
+
316
+ predictor.train_from_jsonl(tmpdir)
317
+
318
+ # Create a simple step graph
319
+ from contextforge.scheduling.step_graph import AgentStepGraph, AgentStep
320
+
321
+ graph = AgentStepGraph()
322
+ graph.add_step(AgentStep(agent_id="retriever", depends_on=[], step_index=0))
323
+ graph.add_step(AgentStep(agent_id="summarizer", depends_on=["retriever"], step_index=1))
324
+
325
+ priority = await predictor.get_eviction_priority(
326
+ ["retriever", "summarizer"], step_graph=graph
327
+ )
328
+
329
+ assert isinstance(priority, list)
330
+ assert len(priority) == 2
331
+
332
+ # ===== Stats tests =====
333
+
334
+ def test_get_stats_after_training(self):
335
+ """get_stats() reflects training state."""
336
+ with tempfile.TemporaryDirectory() as tmpdir:
337
+ predictor = PBKVPredictor(log_dir=tmpdir)
338
+
339
+ log_file = Path(tmpdir) / "workflow_steps.jsonl"
340
+ records = [
341
+ {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
342
+ {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
343
+ ]
344
+ with open(log_file, "w") as f:
345
+ for rec in records:
346
+ f.write(json.dumps(rec) + "\n")
347
+
348
+ predictor.train_from_jsonl(tmpdir)
349
+
350
+ stats = predictor.get_stats()
351
+ assert stats["trained"] is True
352
+ assert stats["transition_table_size"] > 0
353
+ assert stats["unique_agents"] == 2