Spaces:
Sleeping
Sleeping
"""Tests for KVAwareRouter — TASK-009."""
| import pytest | |
| from apohara_context_forge.routing.kv_aware_router import KVAwareRouter, RouteDecision, WorkerState | |
class TestKVAwareRouter:
    """Tests for KV-aware routing.

    Coroutine tests carry an explicit ``pytest.mark.asyncio`` marker so they
    are actually awaited when pytest-asyncio runs in strict mode; the marker
    is redundant but harmless in auto mode.
    """

    def test_register_worker(self) -> None:
        """register_worker() adds worker to routing mesh."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        stats = router.get_stats()
        # The expectation is the number of registered workers (1), not the
        # ``num_workers`` value (2) passed to the constructor.
        assert stats["num_workers"] == 1

    def test_get_worker_for_anchor_unknown(self) -> None:
        """get_worker_for_anchor() returns None for unknown anchor."""
        router = KVAwareRouter()
        result = router.get_worker_for_anchor("unknown_anchor")
        assert result is None

    @pytest.mark.asyncio
    async def test_select_worker_returns_route_decision(self) -> None:
        """select_worker() returns RouteDecision."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        router.register_worker("worker_1")
        decision = await router.select_worker("anchor_hash", cla_group=1)
        assert isinstance(decision, RouteDecision)
        assert decision.anchor_hash == "anchor_hash"
        # Identity check: the flag must be the boolean True, not merely truthy.
        assert decision.pre_rope is True  # INVARIANT 10

    @pytest.mark.asyncio
    async def test_select_worker_anchor_locality(self) -> None:
        """Same anchor_hash routes to same worker (locality)."""
        router = KVAwareRouter(num_workers=2, enable_anchor_locality=True)
        router.register_worker("worker_0")
        router.register_worker("worker_1")
        d1 = await router.select_worker("anchor_x", cla_group=1)
        d2 = await router.select_worker("anchor_x", cla_group=1)
        # Both should route to same worker
        assert d1.target_worker_id == d2.target_worker_id

    @pytest.mark.asyncio
    async def test_select_worker_load_balancing(self) -> None:
        """With no anchor history, routes to least loaded worker."""
        router = KVAwareRouter(num_workers=3)
        for i in range(3):
            router.register_worker(f"worker_{i}")
        decision = await router.select_worker("new_anchor", cla_group=None)
        # NOTE(review): this only checks that the chosen id has the "worker_"
        # prefix; it does not verify that the least loaded worker was picked.
        assert decision.target_worker_id.startswith("worker_")

    @pytest.mark.asyncio
    async def test_update_worker_state(self) -> None:
        """update_worker_state() updates worker load and CLA groups."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        await router.update_worker_state(
            "worker_0", load=0.75, cla_group=2, workflow_step=5
        )
        stats = router.get_stats()
        # Only the load is asserted here; cla_group and workflow_step are
        # passed through but not checked.
        assert stats["worker_loads"]["worker_0"]["load"] == 0.75

    @pytest.mark.asyncio
    async def test_broadcast_new_blocks(self) -> None:
        """broadcast_new_blocks() updates routing table."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        await router.broadcast_new_blocks("anchor_abc", ["b0", "b1"], "worker_0")
        # Verify anchor now maps to worker
        worker = router.get_worker_for_anchor("anchor_abc")
        assert worker == "worker_0"

    def test_get_stats_returns_worker_states(self) -> None:
        """get_stats() returns worker loads and CLA groups."""
        router = KVAwareRouter(num_workers=2)
        router.register_worker("worker_0")
        router.register_worker("worker_1")
        stats = router.get_stats()
        assert "worker_loads" in stats
        assert "worker_0" in stats["worker_loads"]
        assert "worker_1" in stats["worker_loads"]