| """Tests for PBKVPredictor — Markov chain implementation.""" | |
| import json | |
| import pytest | |
| import tempfile | |
| from pathlib import Path | |
| from apohara_context_forge.scheduling.pbkv_predictor import ( | |
| PBKVPredictor, | |
| WorkflowStepRecord, | |
| PredictionResult, | |
| ) | |
class TestPBKVPredictor:
    """Tests for PBKV predictor Markov chain implementation."""

    # ===== Existing stub tests (backward compatibility) =====

    # Async tests are marked explicitly (assumes the pytest-asyncio plugin is installed).
    @pytest.mark.asyncio
    async def test_log_workflow_step(self):
        """log_workflow_step() records steps in history and JSONL."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)

            await predictor.log_workflow_step(
                step_idx=0,
                agent_id="agent_retriever",
                anchor_hash="anchor_0",
                token_length=100,
                cla_group=1,
            )

            assert len(predictor._history) == 1
            assert predictor._history[0].agent_id == "agent_retriever"

    @pytest.mark.asyncio
    async def test_predict_next_agents_returns_prediction_result(self):
        """predict_next_agents() returns PredictionResult via async path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)

            # Log some steps first
            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )

            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=3, num_predictions=2
            )

            assert isinstance(result, PredictionResult)
            assert isinstance(result.predicted_agents, list)
            assert 0.0 <= result.confidence <= 1.0

    @pytest.mark.asyncio
    async def test_predict_next_agents_empty_history(self):
        """predict_next_agents() returns default when no history."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)

            result = await predictor._predict_next_agents_async(
                "agent_0", current_step=0, num_predictions=3
            )

            assert isinstance(result, PredictionResult)
            # Empty history → confidence 0, returns current agent as fallback
            assert result.confidence == 0.0

    @pytest.mark.asyncio
    async def test_get_prefetch_candidates(self):
        """get_prefetch_candidates() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=10)

            for i in range(5):
                await predictor.log_workflow_step(
                    step_idx=i,
                    agent_id=f"agent_{i % 2}",
                    anchor_hash=f"anchor_{i}",
                    token_length=100,
                    cla_group=i % 2,
                )

            candidates = await predictor.get_prefetch_candidates("agent_0", step=3)

            assert isinstance(candidates, list)

    def test_workflow_step_record(self):
        """WorkflowStepRecord dataclass works."""
        record = WorkflowStepRecord(
            step_idx=0,
            agent_id="test_agent",
            anchor_hash="anchor_x",
            token_length=100,
            cla_group=2,
        )

        assert record.step_idx == 0
        assert record.agent_id == "test_agent"
        assert record.cla_group == 2

    def test_prediction_result_defaults(self):
        """PredictionResult has correct defaults."""
        result = PredictionResult(
            predicted_agents=["a1"],
            predicted_anchor_hashes=["h1"],
            confidence=0.5,
        )

        assert result.prefetch_block_ids == []
        assert result.confidence == 0.5

    def test_get_stats(self):
        """get_stats() returns predictor statistics."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, max_history_steps=50)

            stats = predictor.get_stats()

            assert stats["history_size"] == 0
            assert stats["max_history_steps"] == 50
            assert "workflow_steps.jsonl" in stats["log_file"]
            assert stats["trained"] is False

    # ===== Markov chain training tests =====

    def test_train_from_jsonl(self):
        """train_from_jsonl() builds transition table correctly."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Write JSONL with known sequence: A → B → C → A → B
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 3, "agent_id": "A", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
                {"step_idx": 4, "agent_id": "B", "anchor_hash": "h4", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")

            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)

            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C"}
            # Check that the observed 2nd-order states exist in the transition table
            assert ("A", "B") in predictor._transition_table
            assert ("B", "C") in predictor._transition_table
            assert ("C", "A") in predictor._transition_table

    def test_train_from_jsonl_with_multiple_sequences(self):
        """train_from_jsonl() handles multiple sequences (empty separator records)."""
        with tempfile.TemporaryDirectory() as tmpdir:
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            # Two sequences, A→B and C→D, separated by an empty record
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {},
                {"step_idx": 0, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "D", "anchor_hash": "h3", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")

            predictor = PBKVPredictor(log_dir=tmpdir)
            predictor.train_from_jsonl(tmpdir)

            assert predictor._trained is True
            assert predictor._all_agents == {"A", "B", "C", "D"}

    def test_train_from_jsonl_missing_file(self):
        """train_from_jsonl() handles missing file gracefully."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            predictor.train_from_jsonl(str(Path(tmpdir) / "nonexistent.jsonl"))

            assert predictor._trained is False

    # ===== Prediction correctness tests =====

    def test_predict_next_agents_sync(self):
        """Synchronous predict_next_agents() returns list of agent IDs."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            # Train with known pattern: A → B → C
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            predictions = predictor.predict_next_agents("B", top_k=2)

            assert isinstance(predictions, list)
            assert "C" in predictions  # B → C is the trained transition
            assert len(predictions) <= 2

    def test_predict_next_agents_fallback_on_empty_history(self):
        """predict_next_agents() falls back when no training data."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            # No training, no history
            predictions = predictor.predict_next_agents("X", top_k=3)

            assert predictions == ["X"]

    def test_predict_next_agents_fallback_1st_order(self):
        """predict_next_agents() uses 1st-order when 2nd-order state unseen."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            # Train: A → B → C (only 2nd-order state (A, B) → C)
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            # Query for an unseen 2nd-order state: should fall back to 1st-order
            predictions = predictor.predict_next_agents("B", top_k=1)

            assert "C" in predictions

    def test_predict_next_agents_top_k(self):
        """predict_next_agents() respects top_k parameter."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "A", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            predictions = predictor.predict_next_agents("B", top_k=1)

            assert len(predictions) == 1

    # ===== blend_alpha tests =====

    def test_blend_alpha_parameter(self):
        """blend_alpha is stored correctly in __init__."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.7)

            assert predictor._blend_alpha == 0.7

    def test_blend_alpha_default(self):
        """blend_alpha defaults to 0.6."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            assert predictor._blend_alpha == 0.6

    @pytest.mark.asyncio
    async def test_get_eviction_priority_without_step_graph(self):
        """get_eviction_priority() works without AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
                {"step_idx": 2, "agent_id": "C", "anchor_hash": "h2", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            priority = await predictor.get_eviction_priority(["A", "B", "C"])

            assert isinstance(priority, list)
            assert len(priority) == 3

    @pytest.mark.asyncio
    async def test_get_eviction_priority_with_step_graph(self):
        """get_eviction_priority() blends with AgentStepGraph."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir, blend_alpha=0.6)

            # Train with pattern
            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "retriever", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "summarizer", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            # Create a simple step graph
            from apohara_context_forge.scheduling.step_graph import AgentStepGraph, AgentStep

            graph = AgentStepGraph()
            graph.add_step(AgentStep(agent_id="retriever", depends_on=[], step_index=0))
            graph.add_step(AgentStep(agent_id="summarizer", depends_on=["retriever"], step_index=1))

            priority = await predictor.get_eviction_priority(
                ["retriever", "summarizer"], step_graph=graph
            )

            assert isinstance(priority, list)
            assert len(priority) == 2

    # ===== Stats tests =====

    def test_get_stats_after_training(self):
        """get_stats() reflects training state."""
        with tempfile.TemporaryDirectory() as tmpdir:
            predictor = PBKVPredictor(log_dir=tmpdir)

            log_file = Path(tmpdir) / "workflow_steps.jsonl"
            records = [
                {"step_idx": 0, "agent_id": "A", "anchor_hash": "h0", "token_length": 10, "cla_group": 1},
                {"step_idx": 1, "agent_id": "B", "anchor_hash": "h1", "token_length": 10, "cla_group": 1},
            ]
            with open(log_file, "w") as f:
                for rec in records:
                    f.write(json.dumps(rec) + "\n")
            predictor.train_from_jsonl(tmpdir)

            stats = predictor.get_stats()

            assert stats["trained"] is True
            assert stats["transition_table_size"] > 0
            assert stats["unique_agents"] == 2