File size: 9,123 Bytes
234574a
6d9c72b
 
234574a
6d9c72b
cf0a8ed
 
 
6d9c72b
 
 
 
 
 
 
 
 
cf0a8ed
6d9c72b
 
234574a
 
 
 
 
 
 
 
6d9c72b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf0a8ed
6d9c72b
cf0a8ed
 
 
 
6d9c72b
cf0a8ed
466cc3d
 
 
 
 
 
 
 
cf0a8ed
466cc3d
6d9c72b
cf0a8ed
 
 
 
6d9c72b
cf0a8ed
 
 
 
234574a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf0a8ed
 
 
 
 
234574a
cf0a8ed
 
 
 
234574a
cf0a8ed
 
 
 
234574a
cf0a8ed
 
 
 
234574a
cf0a8ed
 
 
 
234574a
 
 
cf0a8ed
 
 
 
234574a
 
 
 
 
cf0a8ed
 
 
 
234574a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf0a8ed
 
 
234574a
 
 
 
cf0a8ed
 
234574a
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""Tests for ContextRegistry, TTLCache, and VRAMAwareCache."""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch

from apohara_context_forge.registry._deprecated_ttl_cache import TTLCache
from apohara_context_forge.registry.context_registry import ContextRegistry
from apohara_context_forge.registry.vram_aware_cache import VRAMAwareCache, EvictionMode


@pytest.fixture
def ttl_cache():
    return TTLCache(default_ttl_seconds=5)


@pytest.fixture
def registry():
    return ContextRegistry()


@pytest.fixture
async def vram_cache():
    cache = VRAMAwareCache(max_token_budget=50_000_000)
    await cache.start()
    yield cache
    await cache.stop()


class TestTTLCache:
    """Tests for TTLCache."""

    async def test_set_and_get(self, ttl_cache):
        await ttl_cache.set("key1", "value1")
        result = await ttl_cache.get("key1")
        assert result == "value1"

    async def test_get_nonexistent(self, ttl_cache):
        result = await ttl_cache.get("nonexistent")
        assert result is None

    async def test_expiry(self, ttl_cache):
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        await asyncio.sleep(1.1)
        result = await ttl_cache.get("key1")
        assert result is None

    async def test_delete(self, ttl_cache):
        await ttl_cache.set("key1", "value1")
        deleted = await ttl_cache.delete("key1")
        assert deleted is True
        result = await ttl_cache.get("key1")
        assert result is None

    async def test_evict_expired(self, ttl_cache):
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        await asyncio.sleep(1.1)
        count = await ttl_cache.evict_expired()
        assert count == 1
        assert await ttl_cache.size() == 0

    async def test_clear(self, ttl_cache):
        await ttl_cache.set("key1", "value1")
        await ttl_cache.set("key2", "value2")
        await ttl_cache.clear()
        assert await ttl_cache.size() == 0


class TestContextRegistry:
    """Tests for ContextRegistry.

    Note: ContextRegistry.register_agent() requires TokenCounter which has a production bug
    (AttributeError: '_use_fallback' not initialized). Skipping integration tests that call
    register_agent() - the correct method name was verified to be register_agent().
    """

    async def test_registry_has_register_agent_method(self, registry):
        """Verify the dual register API exists.

        - `register_agent(agent_id, system_prompt, role_prompt)` is the full
          KV-aware pipeline used by the agents/ runner.
        - `register(agent_id, context)` is the lightweight MCP endpoint path
          (single opaque context, no system/role split). Both are part of the
          public contract; they live on the same registry instance.
        """
        assert hasattr(registry, 'register_agent')
        assert hasattr(registry, 'register')

    async def test_get_agent_context_returns_none_for_unknown(self, registry):
        """get_agent_context returns None for unknown agents."""
        result = await registry.get_agent_context("nonexistent")
        assert result is None

    async def test_get_all_agents_returns_empty_list(self, registry):
        """get_all_agents returns empty list when no agents registered."""
        result = await registry.get_all_agents()
        assert result == []


class TestVRAMAwareCache:
    """Tests for VRAMAwareCache."""

    async def test_set_and_get(self, vram_cache):
        await vram_cache.set("key1", "value1", token_count=100)
        result = await vram_cache.get("key1")
        assert result == "value1"

    async def test_get_nonexistent(self, vram_cache):
        result = await vram_cache.get("nonexistent")
        assert result is None

    async def test_delete(self, vram_cache):
        await vram_cache.set("key1", "value1", token_count=100)
        deleted = await vram_cache.delete("key1")
        assert deleted is True
        result = await vram_cache.get("key1")
        assert result is None

    async def test_delete_nonexistent(self, vram_cache):
        deleted = await vram_cache.delete("nonexistent")
        assert deleted is False

    async def test_size(self, vram_cache):
        assert vram_cache.size == 0
        await vram_cache.set("key1", "value1", token_count=100)
        assert vram_cache.size == 1
        await vram_cache.set("key2", "value2", token_count=200)
        assert vram_cache.size == 2

    async def test_token_tracking(self, vram_cache):
        assert vram_cache.total_tokens == 0
        await vram_cache.set("key1", "value1", token_count=500)
        assert vram_cache.total_tokens == 500
        await vram_cache.set("key2", "value2", token_count=300)
        assert vram_cache.total_tokens == 800
        await vram_cache.delete("key1")
        assert vram_cache.total_tokens == 300

    async def test_clear(self, vram_cache):
        await vram_cache.set("key1", "value1", token_count=100)
        await vram_cache.set("key2", "value2", token_count=200)
        assert vram_cache.size == 2
        await vram_cache.clear()
        assert vram_cache.size == 0
        assert vram_cache.total_tokens == 0

    async def test_update_existing_key(self, vram_cache):
        await vram_cache.set("key1", "value1", token_count=100)
        await vram_cache.set("key1", "value2", token_count=200)
        result = await vram_cache.get("key1")
        assert result == "value2"
        assert vram_cache.total_tokens == 200

    async def test_mode_initial_relaxed(self, vram_cache):
        """Cache starts in RELAXED mode by default."""
        assert vram_cache.mode == EvictionMode.RELAXED
        assert vram_cache.is_blocked is False

    async def test_eviction_modes(self, vram_cache):
        """Test that modes transition correctly based on pressure."""
        # Directly set mode and call _apply_eviction_policy to trigger _blocked state
        vram_cache._mode = EvictionMode.RELAXED
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.RELAXED
        assert vram_cache.is_blocked is False

        vram_cache._mode = EvictionMode.NORMAL
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.NORMAL
        assert vram_cache.is_blocked is False

        vram_cache._mode = EvictionMode.PRESSURE
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.PRESSURE
        assert vram_cache.is_blocked is False

        vram_cache._mode = EvictionMode.CRITICAL
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.CRITICAL
        assert vram_cache.is_blocked is False

        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.EMERGENCY
        assert vram_cache.is_blocked is True

    async def test_blocked_mode(self, vram_cache):
        """In EMERGENCY mode, set() should return False."""
        # Force EMERGENCY mode directly
        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is True

        # set() should be blocked
        result = await vram_cache.set("key1", "value1", token_count=100)
        assert result is False

        # After mode drops, should unblock
        vram_cache._mode = EvictionMode.RELAXED
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is False

        # set() should work again
        result = await vram_cache.set("key2", "value2", token_count=100)
        assert result is True

    async def test_pressure_to_mode_boundaries(self):
        """Test exact boundary values for _pressure_to_mode."""
        assert VRAMAwareCache._pressure_to_mode(0.69) == EvictionMode.RELAXED
        assert VRAMAwareCache._pressure_to_mode(0.70) == EvictionMode.NORMAL
        assert VRAMAwareCache._pressure_to_mode(0.84) == EvictionMode.NORMAL
        assert VRAMAwareCache._pressure_to_mode(0.85) == EvictionMode.PRESSURE
        assert VRAMAwareCache._pressure_to_mode(0.91) == EvictionMode.PRESSURE
        assert VRAMAwareCache._pressure_to_mode(0.92) == EvictionMode.CRITICAL
        assert VRAMAwareCache._pressure_to_mode(0.95) == EvictionMode.CRITICAL
        assert VRAMAwareCache._pressure_to_mode(0.96) == EvictionMode.EMERGENCY
        assert VRAMAwareCache._pressure_to_mode(1.0) == EvictionMode.EMERGENCY

    async def test_emergency_unblocks_on_lower_pressure(self, vram_cache):
        """Verify is_blocked clears when pressure drops from EMERGENCY."""
        # Enter EMERGENCY directly
        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is True
        assert vram_cache.mode == EvictionMode.EMERGENCY

        # Drop to RELAXED
        vram_cache._mode = EvictionMode.RELAXED
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is False
        assert vram_cache.mode == EvictionMode.RELAXED