"""Tests for PBKVPredictor — Markov chain implementation."""
import json
import pytest
import tempfile
from pathlib import Path
from apohara_context_forge.scheduling.pbkv_predictor import (
PBKVPredictor,
WorkflowStepRecord,
PredictionResult,
)
class TestPBKVPredictor:
"""Tests for PBKV predictor Markov chain implementation."""

    # ===== Existing stub tests (backward compatibility) =====

    @pytest.mark.asyncio
    async def test_log_workflow_step(self):
        """log_workflow_step() records steps in history and JSONL."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            await predictor.log_workflow_step(
                step_idx=0,
                agent_id="agent_retriever",
                anchor_hash="anchor_0",
                token_length=100,
                cla_group=1,
            )
            assert len(predictor._history) == 1
            assert predictor._history[0].agent_id == "agent_retriever"

    @pytest.mark.asyncio
    async def test_predict_next_agents_returns_prediction_result(self):
        """predict_next_agents() returns PredictionResult via async path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            # Log some steps first
            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )
            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=3, num_predictions=2
            )
            assert isinstance(result, PredictionResult)
            assert isinstance(result.predicted_agents, list)
            assert 0.0 <= result.confidence <= 1.0

    @pytest.mark.asyncio
    async def test_predict_next_agents_empty_history(self):
        """predict_next_agents() returns default when no history."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=0, num_predictions=3
            )
            assert isinstance(result, PredictionResult)
            # Empty history → confidence 0, returns current agent as fallback
            assert result.confidence == 0.0

    @pytest.mark.asyncio
    async def test_get_prefetch_candidates(self):
        """get_prefetch_candidates() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)
            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )
            candidates = await predictor.get_prefetch_candidates("agent_0", step=3)
            assert isinstance(candidates, list)

    def test_workflow_step_record(self):
        """WorkflowStepRecord dataclass works."""
        record = WorkflowStepRecord(
            step_idx=0,
            agent_id="test_agent",
            anchor_hash="anchor_x",
            token_length=100,
            cla_group=2,
        )
        assert record.step_idx == 0
        assert record.agent_id == "test_agent"
        assert record.cla_group == 2

    def test_prediction_result_defaults(self):
        """PredictionResult has correct defaults."""
        result = PredictionResult(
            predicted_agents=["a1"],
            predicted_anchor_hashes=["h1"],
            confidence=0.5,
        )
        assert result.prefetch_block_ids == []
        assert result.confidence == 0.5

    def test_get_stats(self):
        """get_stats() returns predictor statistics."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=50)
            stats = predictor.get_stats()
            assert stats["history_size"] == 0
            assert stats["max_history_steps"] == 50
            assert "workflow_steps.jsonl" in stats["log_file"]
            assert stats["trained"] is False

    # ===== Markov chain training tests =====
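    # The training tests below build workflow_steps.jsonl fixtures by hand: one JSON
    # object per line with the keys step_idx, agent_id, anchor_hash, token_length and
    # cla_group, mirroring the WorkflowStepRecord fields used above.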

    def test_train_from_jsonl(self):
        """train_from_jsonl() builds the transition table correctly."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Write JSONL with known sequence: A → B → C → A → B
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 3, "agent_id": "A", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
                {"step_idx": 4, "agent_id": "B", "anchor_hash": "h4", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)
            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C"}
            # Check that the observed 2nd-order transitions exist
            assert ("A", "B") in predictor._transition_table
            assert ("B", "C") in predictor._transition_table
            assert ("C", "A") in predictor._transition_table

    def test_train_from_jsonl_with_multiple_sequences(self):
        """train_from_jsonl() handles multiple sequences separated by empty records."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Two sequences, A→B and C→D, separated by an empty record
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {},
                {"step_idx": 0, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "D", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)
            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C", "D"}

    def test_train_from_jsonl_missing_file(self):
        """train_from_jsonl() handles missing file gracefully."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(str(Path(tmpdir) / "nonexistent.jsonl"))
            assert predictor._trained is False

    # ===== Prediction correctness tests =====

    def test_predict_next_agents_sync(self):
        """Synchronous predict_next_agents() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # Train with known pattern: A → B → C
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            predictions = predictor.predict_next_agents("B", top_k=2)
            assert isinstance(predictions, list)
            assert "C" in predictions  # B → C is the trained transition
            assert len(predictions) <= 2

    def test_predict_next_agents_fallback_on_empty_history(self):
        """predict_next_agents() falls back when no training data."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # No training, no history
            predictions = predictor.predict_next_agents("X", top_k=3)
            assert predictions == ["X"]

    def test_predict_next_agents_fallback_1st_order(self):
        """predict_next_agents() uses 1st-order when 2nd-order state unseen."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            # Train: A → B → C (only 2nd-order state (A, B) → C)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            # Query for unseen state: should fall back to 1st-order
            predictions = predictor.predict_next_agents("B", top_k=1)
            assert "C" in predictions

    def test_predict_next_agents_top_k(self):
        """predict_next_agents() respects top_k parameter."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "A", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            predictions = predictor.predict_next_agents("B", top_k=1)
            assert len(predictions) == 1

    # ===== blend_alpha tests =====

    def test_blend_alpha_parameter(self):
        """blend_alpha is stored correctly in __init__."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.7)
            assert predictor._blend_alpha == 0.7

    def test_blend_alpha_default(self):
        """blend_alpha defaults to 0.6."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            assert predictor._blend_alpha == 0.6

    @pytest.mark.asyncio
    async def test_get_eviction_priority_without_step_graph(self):
        """get_eviction_priority() works without AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            priority = await predictor.get_eviction_priority(["A", "B", "C"])
            assert isinstance(priority, list)
            assert len(priority) == 3
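
    # NOTE: How get_eviction_priority() blends Markov-chain predictions with the
    # AgentStepGraph structure is internal to PBKVPredictor. A plausible reading of
    # blend_alpha (an assumption, not asserted by these tests) is
    #   score(agent) = blend_alpha * markov_score(agent) + (1 - blend_alpha) * graph_score(agent)
    # The test below only checks the shape of the result, not the blend itself.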

    @pytest.mark.asyncio
    async def test_get_eviction_priority_with_step_graph(self):
        """get_eviction_priority() blends with AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.6)
            # Train with pattern
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "retriever", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "summarizer", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            # Create a simple step graph
            from apohara_context_forge.scheduling.step_graph import AgentStepGraph, AgentStep
            graph = AgentStepGraph()
            graph.add_step(AgentStep(agent_id="retriever", depends_on=[], step_index=0))
            graph.add_step(AgentStep(agent_id="summarizer", depends_on=["retriever"], step_index=1))
            priority = await predictor.get_eviction_priority(
                ["retriever", "summarizer"], step_graph=graph
            )
            assert isinstance(priority, list)
            assert len(priority) == 2

    # ===== Stats tests =====

    def test_get_stats_after_training(self):
        """get_stats() reflects training state."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)
            stats = predictor.get_stats()
            assert stats["trained"] is True
            assert stats["transition_table_size"] > 0
            assert stats["unique_agents"] == 2