File size: 3,496 Bytes
8bfcf43
 
cf0a8ed
8bfcf43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Tests for KVAwareRouter — TASK-009."""
import pytest
from apohara_context_forge.routing.kv_aware_router import KVAwareRouter, RouteDecision, WorkerState


class TestKVAwareRouter:
    """Behavioural checks for the public API of ``KVAwareRouter``.

    Each test builds its own router, registers the workers it needs and
    asserts on what the router reports back (stats, anchor lookups, or the
    ``RouteDecision`` produced by ``select_worker``).
    """

    def test_register_worker(self):
        """Registering one worker is reflected as ``num_workers == 1`` in the stats."""
        mesh = KVAwareRouter(num_workers=2)
        mesh.register_worker("worker_0")
        assert mesh.get_stats()["num_workers"] == 1

    def test_get_worker_for_anchor_unknown(self):
        """Looking up an anchor that was never broadcast yields ``None``."""
        mesh = KVAwareRouter()
        assert mesh.get_worker_for_anchor("unknown_anchor") is None

    @pytest.mark.asyncio
    async def test_select_worker_returns_route_decision(self):
        """``select_worker()`` produces a ``RouteDecision`` echoing the anchor hash."""
        mesh = KVAwareRouter(num_workers=2)
        for worker_id in ("worker_0", "worker_1"):
            mesh.register_worker(worker_id)

        routed = await mesh.select_worker("anchor_hash", cla_group=1)

        assert isinstance(routed, RouteDecision)
        assert routed.anchor_hash == "anchor_hash"
        # INVARIANT 10: the decision is expected to carry pre_rope == True.
        assert routed.pre_rope == True  # noqa: E712

    @pytest.mark.asyncio
    async def test_select_worker_anchor_locality(self):
        """Two consecutive selections for one anchor hash target the same worker."""
        mesh = KVAwareRouter(num_workers=2, enable_anchor_locality=True)
        for worker_id in ("worker_0", "worker_1"):
            mesh.register_worker(worker_id)

        first = await mesh.select_worker("anchor_x", cla_group=1)
        second = await mesh.select_worker("anchor_x", cla_group=1)

        # Repeated requests for "anchor_x" must not hop between workers.
        assert first.target_worker_id == second.target_worker_id

    @pytest.mark.asyncio
    async def test_select_worker_load_balancing(self):
        """An anchor with no history is still routed to one of the registered workers.

        Only the ``worker_`` prefix of the chosen id is checked here; the test
        does not pin which of the three workers is picked.
        """
        mesh = KVAwareRouter(num_workers=3)
        for worker_id in ("worker_0", "worker_1", "worker_2"):
            mesh.register_worker(worker_id)

        routed = await mesh.select_worker("new_anchor", cla_group=None)

        assert routed.target_worker_id.startswith("worker_")

    @pytest.mark.asyncio
    async def test_update_worker_state(self):
        """A load pushed via ``update_worker_state()`` shows up in ``get_stats()``."""
        mesh = KVAwareRouter(num_workers=2)
        mesh.register_worker("worker_0")

        await mesh.update_worker_state(
            "worker_0",
            load=0.75,
            cla_group=2,
            workflow_step=5,
        )

        worker_entry = mesh.get_stats()["worker_loads"]["worker_0"]
        assert worker_entry["load"] == 0.75

    @pytest.mark.asyncio
    async def test_broadcast_new_blocks(self):
        """After ``broadcast_new_blocks()`` the anchor resolves to the broadcasting worker."""
        mesh = KVAwareRouter(num_workers=2)
        mesh.register_worker("worker_0")

        await mesh.broadcast_new_blocks("anchor_abc", ["b0", "b1"], "worker_0")

        # The anchor lookup should now point at the worker passed to the broadcast.
        assert mesh.get_worker_for_anchor("anchor_abc") == "worker_0"

    def test_get_stats_returns_worker_states(self):
        """``get_stats()`` exposes a ``worker_loads`` entry for every registered worker."""
        mesh = KVAwareRouter(num_workers=2)
        for worker_id in ("worker_0", "worker_1"):
            mesh.register_worker(worker_id)

        snapshot = mesh.get_stats()

        assert "worker_loads" in snapshot
        per_worker = snapshot["worker_loads"]
        assert "worker_0" in per_worker
        assert "worker_1" in per_worker