Spaces:
Sleeping
Sleeping
File size: 9,123 Bytes
"""Tests for ContextRegistry, TTLCache, and VRAMAwareCache."""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from apohara_context_forge.registry._deprecated_ttl_cache import TTLCache
from apohara_context_forge.registry.context_registry import ContextRegistry
from apohara_context_forge.registry.vram_aware_cache import VRAMAwareCache, EvictionMode
@pytest.fixture
def ttl_cache():
    """Provide a fresh TTLCache built with ``default_ttl_seconds=5``."""
    cache = TTLCache(default_ttl_seconds=5)
    return cache
@pytest.fixture
def registry():
    """Provide a fresh ContextRegistry constructed with no arguments."""
    instance = ContextRegistry()
    return instance
@pytest.fixture
async def vram_cache():
    """Yield a started VRAMAwareCache and stop it once the test is done.

    The cache is created with ``max_token_budget=50_000_000``; ``start()``
    is awaited before the test body runs and ``stop()`` afterwards.
    """
    instance = VRAMAwareCache(max_token_budget=50_000_000)
    await instance.start()
    yield instance
    # Teardown: everything after the yield runs when the test has finished.
    await instance.stop()
class TestTTLCache:
    """Behavioral checks for TTLCache (imported from ``_deprecated_ttl_cache``)."""

    async def test_set_and_get(self, ttl_cache):
        """A value stored with set() is returned by get()."""
        await ttl_cache.set("key1", "value1")
        assert await ttl_cache.get("key1") == "value1"

    async def test_get_nonexistent(self, ttl_cache):
        """get() yields None for a key that was never stored."""
        assert await ttl_cache.get("nonexistent") is None

    async def test_expiry(self, ttl_cache):
        """An entry stored with ttl_seconds=1 is gone after 1.1 seconds."""
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        # Sleep slightly longer than the TTL so the entry has expired.
        await asyncio.sleep(1.1)
        assert await ttl_cache.get("key1") is None

    async def test_delete(self, ttl_cache):
        """delete() reports True and the key is no longer retrievable."""
        await ttl_cache.set("key1", "value1")
        was_removed = await ttl_cache.delete("key1")
        assert was_removed is True
        assert await ttl_cache.get("key1") is None

    async def test_evict_expired(self, ttl_cache):
        """evict_expired() returns the number of expired entries removed."""
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        # Sleep slightly longer than the TTL so the entry is eligible.
        await asyncio.sleep(1.1)
        evicted = await ttl_cache.evict_expired()
        assert evicted == 1
        remaining = await ttl_cache.size()
        assert remaining == 0

    async def test_clear(self, ttl_cache):
        """clear() leaves the cache with a size of zero."""
        for key, value in (("key1", "value1"), ("key2", "value2")):
            await ttl_cache.set(key, value)
        await ttl_cache.clear()
        remaining = await ttl_cache.size()
        assert remaining == 0
class TestContextRegistry:
    """Surface-level checks for ContextRegistry.

    Note: ContextRegistry.register_agent() requires TokenCounter, which has a
    production bug (AttributeError: '_use_fallback' not initialized).
    Integration tests that actually call register_agent() are therefore
    skipped here; only the method name register_agent() was verified.
    """

    async def test_registry_has_register_agent_method(self, registry):
        """Both registration entry points exist on the registry.

        - ``register_agent(agent_id, system_prompt, role_prompt)`` is the
          full KV-aware pipeline used by the agents/ runner.
        - ``register(agent_id, context)`` is the lightweight MCP endpoint
          path (one opaque context, no system/role split).

        Both belong to the public contract and live on the same instance.
        """
        for method_name in ('register_agent', 'register'):
            assert hasattr(registry, method_name)

    async def test_get_agent_context_returns_none_for_unknown(self, registry):
        """Looking up an unknown agent id yields None."""
        context = await registry.get_agent_context("nonexistent")
        assert context is None

    async def test_get_all_agents_returns_empty_list(self, registry):
        """With nothing registered, get_all_agents() returns an empty list."""
        agents = await registry.get_all_agents()
        assert agents == []
class TestVRAMAwareCache:
    """Behavioral checks for VRAMAwareCache."""

    @staticmethod
    async def _force_mode(cache, mode):
        """Assign the cache's private ``_mode`` and re-run its eviction policy."""
        cache._mode = mode
        await cache._apply_eviction_policy()

    async def test_set_and_get(self, vram_cache):
        """A value stored with set() is returned by get()."""
        await vram_cache.set("key1", "value1", token_count=100)
        assert await vram_cache.get("key1") == "value1"

    async def test_get_nonexistent(self, vram_cache):
        """get() yields None for a key that was never stored."""
        assert await vram_cache.get("nonexistent") is None

    async def test_delete(self, vram_cache):
        """delete() reports True and the key is no longer retrievable."""
        await vram_cache.set("key1", "value1", token_count=100)
        was_removed = await vram_cache.delete("key1")
        assert was_removed is True
        assert await vram_cache.get("key1") is None

    async def test_delete_nonexistent(self, vram_cache):
        """delete() reports False when the key is absent."""
        was_removed = await vram_cache.delete("nonexistent")
        assert was_removed is False

    async def test_size(self, vram_cache):
        """The size property grows by one with each new key."""
        assert vram_cache.size == 0
        entries = (("key1", "value1", 100), ("key2", "value2", 200))
        for expected_size, (key, value, tokens) in enumerate(entries, start=1):
            await vram_cache.set(key, value, token_count=tokens)
            assert vram_cache.size == expected_size

    async def test_token_tracking(self, vram_cache):
        """total_tokens follows insertions and deletions."""
        assert vram_cache.total_tokens == 0

        await vram_cache.set("key1", "value1", token_count=500)
        assert vram_cache.total_tokens == 500

        await vram_cache.set("key2", "value2", token_count=300)
        assert vram_cache.total_tokens == 800

        # Removing key1 should give its 500 tokens back.
        await vram_cache.delete("key1")
        assert vram_cache.total_tokens == 300

    async def test_clear(self, vram_cache):
        """clear() resets both the entry count and the token total."""
        for key, value, tokens in (("key1", "value1", 100), ("key2", "value2", 200)):
            await vram_cache.set(key, value, token_count=tokens)
        assert vram_cache.size == 2

        await vram_cache.clear()
        assert vram_cache.size == 0
        assert vram_cache.total_tokens == 0

    async def test_update_existing_key(self, vram_cache):
        """Re-setting a key replaces its value and its token count."""
        await vram_cache.set("key1", "value1", token_count=100)
        await vram_cache.set("key1", "value2", token_count=200)
        assert await vram_cache.get("key1") == "value2"
        assert vram_cache.total_tokens == 200

    async def test_mode_initial_relaxed(self, vram_cache):
        """Cache starts in RELAXED mode by default."""
        assert vram_cache.mode == EvictionMode.RELAXED
        assert vram_cache.is_blocked is False

    async def test_eviction_modes(self, vram_cache):
        """Only EMERGENCY mode leaves the cache blocked after the policy runs."""
        # Each mode is forced directly, then the eviction policy is applied so
        # the blocked state gets recomputed for that mode.
        expectations = (
            (EvictionMode.RELAXED, False),
            (EvictionMode.NORMAL, False),
            (EvictionMode.PRESSURE, False),
            (EvictionMode.CRITICAL, False),
            (EvictionMode.EMERGENCY, True),
        )
        for forced_mode, should_block in expectations:
            await self._force_mode(vram_cache, forced_mode)
            assert vram_cache.mode == forced_mode
            assert vram_cache.is_blocked is should_block

    async def test_blocked_mode(self, vram_cache):
        """In EMERGENCY mode, set() should return False."""
        # Force EMERGENCY mode directly.
        await self._force_mode(vram_cache, EvictionMode.EMERGENCY)
        assert vram_cache.is_blocked is True

        # While blocked, set() is rejected.
        rejected = await vram_cache.set("key1", "value1", token_count=100)
        assert rejected is False

        # Once the mode drops back, the cache unblocks.
        await self._force_mode(vram_cache, EvictionMode.RELAXED)
        assert vram_cache.is_blocked is False

        # set() is accepted again.
        accepted = await vram_cache.set("key2", "value2", token_count=100)
        assert accepted is True

    async def test_pressure_to_mode_boundaries(self):
        """Check _pressure_to_mode at the values on each side of a threshold."""
        boundaries = (
            (0.69, EvictionMode.RELAXED),
            (0.70, EvictionMode.NORMAL),
            (0.84, EvictionMode.NORMAL),
            (0.85, EvictionMode.PRESSURE),
            (0.91, EvictionMode.PRESSURE),
            (0.92, EvictionMode.CRITICAL),
            (0.95, EvictionMode.CRITICAL),
            (0.96, EvictionMode.EMERGENCY),
            (1.0, EvictionMode.EMERGENCY),
        )
        for pressure, expected_mode in boundaries:
            assert VRAMAwareCache._pressure_to_mode(pressure) == expected_mode

    async def test_emergency_unblocks_on_lower_pressure(self, vram_cache):
        """Verify is_blocked clears when the mode drops from EMERGENCY."""
        # Enter EMERGENCY directly.
        await self._force_mode(vram_cache, EvictionMode.EMERGENCY)
        assert vram_cache.is_blocked is True
        assert vram_cache.mode == EvictionMode.EMERGENCY

        # Drop to RELAXED.
        await self._force_mode(vram_cache, EvictionMode.RELAXED)
        assert vram_cache.is_blocked is False
        assert vram_cache.mode == EvictionMode.RELAXED