"""Tests for PBKVPredictor — Markov chain implementation."""

import json
import tempfile
from pathlib import Path

import pytest

from apohara_context_forge.scheduling.pbkv_predictor import (
    PBKVPredictor,
    WorkflowStepRecord,
    PredictionResult,
)


class TestPBKVPredictor:
    """Tests for PBKV predictor Markov chain implementation."""

    # ===== Existing stub tests (backward compatibility) =====

    @pytest.mark.asyncio
    async def test_log_workflow_step(self):
        """log_workflow_step() records steps in history and JSONL."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            await predictor.log_workflow_step(
                step_idx=0,
                agent_id="agent_retriever",
                anchor_hash="anchor_0",
                token_length=100,
                cla_group=1,
            )
            assert len(predictor._history) == 1
            assert predictor._history[0].agent_id == "agent_retriever"

    @pytest.mark.asyncio
    async def test_predict_next_agents_returns_prediction_result(self):
        """predict_next_agents() returns PredictionResult via async path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            # Log some steps first
            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )
            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=3, num_predictions=2
            )
            assert isinstance(result, PredictionResult)
            assert isinstance(result.predicted_agents, list)
            assert 0.0 <= result.confidence <= 1.0

    @pytest.mark.asyncio
    async def test_predict_next_agents_empty_history(self):
        """predict_next_agents() returns default when no history."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=0, num_predictions=3
            )
            assert isinstance(result, PredictionResult)
            # Empty history → confidence 0, returns current agent as fallback
            assert result.confidence == 0.0

    @pytest.mark.asyncio
    async def test_get_prefetch_candidates(self):
        """get_prefetch_candidates() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )
            candidates = await predictor.get_prefetch_candidates("agent_0", step=3)
            assert isinstance(candidates, list)

    def test_workflow_step_record(self):
        """WorkflowStepRecord dataclass works."""
        record = WorkflowStepRecord(
            step_idx=0,
            agent_id="test_agent",
            anchor_hash="anchor_x",
            token_length=100,
            cla_group=2,
        )
        assert record.step_idx == 0
        assert record.agent_id == "test_agent"
        assert record.cla_group == 2

    def test_prediction_result_defaults(self):
        """PredictionResult has correct defaults."""
        result = PredictionResult(
            predicted_agents=["a1"],
            predicted_anchor_hashes=["h1"],
            confidence=0.5,
        )
        assert result.prefetch_block_ids == []
        assert result.confidence == 0.5

    def test_get_stats(self):
        """get_stats() returns predictor statistics."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=50)
            stats = predictor.get_stats()
            assert stats["history_size"] == 0
            assert stats["max_history_steps"] == 50
            assert "workflow_steps.jsonl" in stats["log_file"]
            assert stats["trained"] is False

    # ===== Markov chain training tests =====
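    # The trained model is a 2nd-order chain: transition-table keys are
    # (prev_agent, curr_agent) tuples, as the assertions below rely on.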

    def test_train_from_jsonl(self):
        """train_from_jsonl() builds transition table correctly."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Write JSONL with known sequence: A → B → C → A → B
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 3, "agent_id": "A", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
                {"step_idx": 4, "agent_id": "B", "anchor_hash": "h4", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)
            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C"}
            # Sequence A→B→C→A→B yields the 2nd-order states
            # (A, B)→C, (B, C)→A, and (C, A)→B.
            assert ("A", "B") in predictor._transition_table
            assert ("B", "C") in predictor._transition_table
            assert ("C", "A") in predictor._transition_table

    def test_train_from_jsonl_with_multiple_sequences(self):
        """train_from_jsonl() handles multiple sequences (empty records as separators)."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Two sequences, A→B and C→D, split by an empty record
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {},
                {"step_idx": 0, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "D", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)
            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C", "D"}

    def test_train_from_jsonl_missing_file(self):
        """train_from_jsonl() handles missing file gracefully."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(str(Path(tmpdir) / "nonexistent.jsonl"))
            assert predictor._trained is False

    # ===== Prediction correctness tests =====
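    # Fallback order exercised below: 2nd-order state, then 1st-order,
    # then echoing the current agent when nothing has been trained.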

    def test_predict_next_agents_sync(self):
        """Synchronous predict_next_agents() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # Train with known pattern: A → B → C
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            predictions = predictor.predict_next_agents("B", top_k=2)
            assert isinstance(predictions, list)
            assert "C" in predictions  # B → C is the trained transition
            assert len(predictions) <= 2

    def test_predict_next_agents_fallback_on_empty_history(self):
        """predict_next_agents() falls back when no training data."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # No training, no history
            predictions = predictor.predict_next_agents("X", top_k=3)
            assert predictions == ["X"]

    def test_predict_next_agents_fallback_1st_order(self):
        """predict_next_agents() uses 1st-order when 2nd-order state unseen."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # Train: A → B → C (only 2nd-order state (A, B) → C)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            # Query for an unseen 2nd-order state: should fall back to 1st-order
            predictions = predictor.predict_next_agents("B", top_k=1)
            assert "C" in predictions

    def test_predict_next_agents_top_k(self):
        """predict_next_agents() respects the top_k parameter."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "A", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            predictions = predictor.predict_next_agents("B", top_k=1)
            assert len(predictions) == 1

    # ===== blend_alpha tests =====
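    # blend_alpha weights the Markov-chain signal against AgentStepGraph
    # structure when ranking eviction candidates (see the tests below).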

    def test_blend_alpha_parameter(self):
        """blend_alpha is stored correctly in __init__."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.7)
            assert predictor._blend_alpha == 0.7

    def test_blend_alpha_default(self):
        """blend_alpha defaults to 0.6."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            assert predictor._blend_alpha == 0.6

    @pytest.mark.asyncio
    async def test_get_eviction_priority_without_step_graph(self):
        """get_eviction_priority() works without AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            priority = await predictor.get_eviction_priority(["A", "B", "C"])
            assert isinstance(priority, list)
            assert len(priority) == 3

    @pytest.mark.asyncio
    async def test_get_eviction_priority_with_step_graph(self):
        """get_eviction_priority() blends with AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.6)
            # Train with a known retriever → summarizer pattern
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "retriever", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "summarizer", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            # Create a simple step graph
            from apohara_context_forge.scheduling.step_graph import AgentStepGraph, AgentStep

            graph = AgentStepGraph()
            graph.add_step(AgentStep(agent_id="retriever", depends_on=[], step_index=0))
            graph.add_step(AgentStep(agent_id="summarizer", depends_on=["retriever"], step_index=1))
            priority = await predictor.get_eviction_priority(
                ["retriever", "summarizer"], step_graph=graph
            )
            assert isinstance(priority, list)
            assert len(priority) == 2

    # ===== Stats tests =====

    def test_get_stats_after_training(self):
        """get_stats() reflects training state."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            stats = predictor.get_stats()
            assert stats["trained"] is True
            assert stats["transition_table_size"] > 0
            assert stats["unique_agents"] == 2