# contextforge-demo/tests/test_kv_aware_router.py
# Pablo
# feat: APOHARA: Context Forge V5 — synthesis + rebrand complete
# cf0a8ed
"""Tests for KVAwareRouter — TASK-009."""
import pytest
from apohara_context_forge.routing.kv_aware_router import KVAwareRouter, RouteDecision, WorkerState
class TestKVAwareRouter:
    """Behavioural tests for ``KVAwareRouter``.

    Each test builds its own router instance, so the tests are independent
    of one another. Coroutine-based router APIs (``select_worker``,
    ``update_worker_state``, ``broadcast_new_blocks``) are exercised through
    tests marked with ``pytest.mark.asyncio``.
    """

    def test_register_worker(self) -> None:
        """register_worker() adds the worker to the routing mesh.

        The router is created with ``num_workers=2`` but only one worker is
        registered; the stats are expected to report the registered count.
        """
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")

        stats = router.get_stats()

        assert stats["num_workers"] == 1

    def test_get_worker_for_anchor_unknown(self) -> None:
        """get_worker_for_anchor() returns None for an anchor never seen."""
        router = KVAwareRouter()

        result = router.get_worker_for_anchor("unknown_anchor")

        assert result is None

    @pytest.mark.asyncio
    async def test_select_worker_returns_route_decision(self) -> None:
        """select_worker() returns a RouteDecision echoing the anchor hash."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        router.register_worker("worker_1")

        decision = await router.select_worker("anchor_hash", cla_group=1)

        assert isinstance(decision, RouteDecision)
        assert decision.anchor_hash == "anchor_hash"
        # INVARIANT 10: the decision must carry a truthy pre_rope flag.
        # Asserted via truthiness rather than ``== True`` (PEP 8 / E712).
        assert decision.pre_rope

    @pytest.mark.asyncio
    async def test_select_worker_anchor_locality(self) -> None:
        """The same anchor_hash routes to the same worker (locality)."""
        router = KVAwareRouter(num_workers=2, enable_anchor_locality=True)
        router.register_worker("worker_0")
        router.register_worker("worker_1")

        first = await router.select_worker("anchor_x", cla_group=1)
        second = await router.select_worker("anchor_x", cla_group=1)

        # Two consecutive selections for one anchor must agree on the target.
        assert first.target_worker_id == second.target_worker_id

    @pytest.mark.asyncio
    async def test_select_worker_load_balancing(self) -> None:
        """With no anchor history, a worker is still selected.

        NOTE(review): the intent is "routes to the least loaded worker", but
        this test only checks that the chosen ID has the ``worker_`` prefix;
        it does not verify which worker was picked.
        """
        router = KVAwareRouter(num_workers=3)
        for i in range(3):
            router.register_worker(f"worker_{i}")

        decision = await router.select_worker("new_anchor", cla_group=None)

        assert decision.target_worker_id.startswith("worker_")

    @pytest.mark.asyncio
    async def test_update_worker_state(self) -> None:
        """update_worker_state() is reflected in the reported worker load.

        ``cla_group`` and ``workflow_step`` are passed as well, but only the
        ``load`` value is asserted here.
        """
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")

        await router.update_worker_state(
            "worker_0", load=0.75, cla_group=2, workflow_step=5
        )

        stats = router.get_stats()
        assert stats["worker_loads"]["worker_0"]["load"] == 0.75

    @pytest.mark.asyncio
    async def test_broadcast_new_blocks(self) -> None:
        """broadcast_new_blocks() makes the anchor resolvable to its worker."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")

        await router.broadcast_new_blocks("anchor_abc", ["b0", "b1"], "worker_0")

        # After the broadcast the anchor must map to the announcing worker.
        worker = router.get_worker_for_anchor("anchor_abc")
        assert worker == "worker_0"

    def test_get_stats_returns_worker_states(self) -> None:
        """get_stats() exposes a ``worker_loads`` entry per registered worker."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        router.register_worker("worker_1")

        stats = router.get_stats()

        assert "worker_loads" in stats
        assert "worker_0" in stats["worker_loads"]
        assert "worker_1" in stats["worker_loads"]