Spaces:
Sleeping
Sleeping
| """Tests for ContextRegistry, TTLCache, and VRAMAwareCache.""" | |
| import asyncio | |
| import pytest | |
| from unittest.mock import AsyncMock, patch | |
| from apohara_context_forge.registry._deprecated_ttl_cache import TTLCache | |
| from apohara_context_forge.registry.context_registry import ContextRegistry | |
| from apohara_context_forge.registry.vram_aware_cache import VRAMAwareCache, EvictionMode | |
@pytest.fixture
def ttl_cache():
    """Provide a new TTLCache with a 5-second default TTL for each test."""
    return TTLCache(default_ttl_seconds=5)
@pytest.fixture
def registry():
    """Provide a new, default-constructed ContextRegistry for each test."""
    return ContextRegistry()
@pytest.fixture
async def vram_cache():
    """Yield a started VRAMAwareCache and stop it when the test is finished.

    The cache is created with a 50,000,000-token budget. ``start()`` is
    awaited before the cache is handed to the test, and ``stop()`` is awaited
    as fixture teardown once the test has completed.
    """
    # NOTE(review): an async-generator fixture declared with plain
    # ``pytest.fixture`` is only driven by pytest-asyncio in auto mode --
    # confirm the project's ``asyncio_mode`` setting.
    cache = VRAMAwareCache(max_token_budget=50_000_000)
    await cache.start()
    yield cache
    await cache.stop()
@pytest.mark.asyncio
class TestTTLCache:
    """Tests for TTLCache."""

    async def test_set_and_get(self, ttl_cache):
        """A stored value is returned by a subsequent get()."""
        await ttl_cache.set("key1", "value1")
        assert await ttl_cache.get("key1") == "value1"

    async def test_get_nonexistent(self, ttl_cache):
        """get() yields None for a key that was never stored."""
        assert await ttl_cache.get("nonexistent") is None

    async def test_expiry(self, ttl_cache):
        """An entry with a 1-second TTL reads back as None after 1.1 seconds."""
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        # Sleep slightly past the TTL so the entry is expired when read.
        await asyncio.sleep(1.1)
        assert await ttl_cache.get("key1") is None

    async def test_delete(self, ttl_cache):
        """delete() reports True for an existing key and removes the entry."""
        await ttl_cache.set("key1", "value1")
        assert await ttl_cache.delete("key1") is True
        assert await ttl_cache.get("key1") is None

    async def test_evict_expired(self, ttl_cache):
        """evict_expired() returns the number of removed entries."""
        await ttl_cache.set("key1", "value1", ttl_seconds=1)
        # Sleep slightly past the TTL so the single entry is eligible.
        await asyncio.sleep(1.1)
        evicted = await ttl_cache.evict_expired()
        assert evicted == 1
        assert await ttl_cache.size() == 0

    async def test_clear(self, ttl_cache):
        """clear() leaves the cache with a size of zero."""
        await ttl_cache.set("key1", "value1")
        await ttl_cache.set("key2", "value2")
        await ttl_cache.clear()
        assert await ttl_cache.size() == 0
@pytest.mark.asyncio
class TestContextRegistry:
    """Tests for ContextRegistry.

    Note: ContextRegistry.register_agent() requires TokenCounter which has a
    production bug (AttributeError: '_use_fallback' not initialized). Skipping
    integration tests that call register_agent() - the correct method name was
    verified to be register_agent().
    """

    async def test_registry_has_register_agent_method(self, registry):
        """Verify the dual register API exists.

        - `register_agent(agent_id, system_prompt, role_prompt)` is the full
          KV-aware pipeline used by the agents/ runner.
        - `register(agent_id, context)` is the lightweight MCP endpoint path
          (single opaque context, no system/role split). Both are part of the
          public contract; they live on the same registry instance.
        """
        # Only attribute presence is checked; neither method is called here.
        assert hasattr(registry, 'register_agent')
        assert hasattr(registry, 'register')

    async def test_get_agent_context_returns_none_for_unknown(self, registry):
        """get_agent_context returns None for unknown agents."""
        assert await registry.get_agent_context("nonexistent") is None

    async def test_get_all_agents_returns_empty_list(self, registry):
        """get_all_agents returns empty list when no agents registered."""
        assert await registry.get_all_agents() == []
@pytest.mark.asyncio
class TestVRAMAwareCache:
    """Tests for VRAMAwareCache."""

    async def test_set_and_get(self, vram_cache):
        """A stored value is returned by a subsequent get()."""
        await vram_cache.set("key1", "value1", token_count=100)
        assert await vram_cache.get("key1") == "value1"

    async def test_get_nonexistent(self, vram_cache):
        """get() yields None for a key that was never stored."""
        assert await vram_cache.get("nonexistent") is None

    async def test_delete(self, vram_cache):
        """delete() reports True for an existing key and removes the entry."""
        await vram_cache.set("key1", "value1", token_count=100)
        assert await vram_cache.delete("key1") is True
        assert await vram_cache.get("key1") is None

    async def test_delete_nonexistent(self, vram_cache):
        """delete() reports False when the key is not present."""
        assert await vram_cache.delete("nonexistent") is False

    async def test_size(self, vram_cache):
        """size counts the entries currently stored."""
        assert vram_cache.size == 0
        await vram_cache.set("key1", "value1", token_count=100)
        assert vram_cache.size == 1
        await vram_cache.set("key2", "value2", token_count=200)
        assert vram_cache.size == 2

    async def test_token_tracking(self, vram_cache):
        """total_tokens follows the token counts of stored and deleted entries."""
        assert vram_cache.total_tokens == 0
        await vram_cache.set("key1", "value1", token_count=500)
        assert vram_cache.total_tokens == 500
        await vram_cache.set("key2", "value2", token_count=300)
        assert vram_cache.total_tokens == 800
        await vram_cache.delete("key1")
        assert vram_cache.total_tokens == 300

    async def test_clear(self, vram_cache):
        """clear() resets both the entry count and the token total."""
        await vram_cache.set("key1", "value1", token_count=100)
        await vram_cache.set("key2", "value2", token_count=200)
        assert vram_cache.size == 2
        await vram_cache.clear()
        assert vram_cache.size == 0
        assert vram_cache.total_tokens == 0

    async def test_update_existing_key(self, vram_cache):
        """Re-setting a key replaces its value and its token count."""
        await vram_cache.set("key1", "value1", token_count=100)
        await vram_cache.set("key1", "value2", token_count=200)
        assert await vram_cache.get("key1") == "value2"
        # The token total reflects only the latest entry, not the sum of both.
        assert vram_cache.total_tokens == 200

    async def test_mode_initial_relaxed(self, vram_cache):
        """Cache starts in RELAXED mode by default."""
        assert vram_cache.mode == EvictionMode.RELAXED
        assert vram_cache.is_blocked is False

    async def test_eviction_modes(self, vram_cache):
        """Only EMERGENCY mode leaves the cache blocked after the policy runs."""
        # The mode is forced through the private attribute and the policy is
        # applied by hand, so the blocked flag is checked for each mode.
        unblocked_modes = (
            EvictionMode.RELAXED,
            EvictionMode.NORMAL,
            EvictionMode.PRESSURE,
            EvictionMode.CRITICAL,
        )
        for mode in unblocked_modes:
            vram_cache._mode = mode
            await vram_cache._apply_eviction_policy()
            assert vram_cache.mode == mode
            assert vram_cache.is_blocked is False

        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.mode == EvictionMode.EMERGENCY
        assert vram_cache.is_blocked is True

    async def test_blocked_mode(self, vram_cache):
        """In EMERGENCY mode, set() should return False."""
        # Force EMERGENCY mode directly.
        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is True

        # While blocked, set() must be refused.
        stored = await vram_cache.set("key1", "value1", token_count=100)
        assert stored is False

        # Dropping back to RELAXED must clear the block.
        vram_cache._mode = EvictionMode.RELAXED
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is False

        # set() must be accepted again.
        stored = await vram_cache.set("key2", "value2", token_count=100)
        assert stored is True

    async def test_pressure_to_mode_boundaries(self):
        """Test exact boundary values for _pressure_to_mode."""
        # Each pair is (pressure, mode expected for that pressure); values sit
        # on either side of every threshold exercised by this test.
        boundaries = (
            (0.69, EvictionMode.RELAXED),
            (0.70, EvictionMode.NORMAL),
            (0.84, EvictionMode.NORMAL),
            (0.85, EvictionMode.PRESSURE),
            (0.91, EvictionMode.PRESSURE),
            (0.92, EvictionMode.CRITICAL),
            (0.95, EvictionMode.CRITICAL),
            (0.96, EvictionMode.EMERGENCY),
            (1.0, EvictionMode.EMERGENCY),
        )
        for pressure, expected_mode in boundaries:
            actual_mode = VRAMAwareCache._pressure_to_mode(pressure)
            assert actual_mode == expected_mode, pressure

    async def test_emergency_unblocks_on_lower_pressure(self, vram_cache):
        """Verify is_blocked clears when pressure drops from EMERGENCY."""
        # Enter EMERGENCY directly.
        vram_cache._mode = EvictionMode.EMERGENCY
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is True
        assert vram_cache.mode == EvictionMode.EMERGENCY

        # Drop to RELAXED.
        vram_cache._mode = EvictionMode.RELAXED
        await vram_cache._apply_eviction_policy()
        assert vram_cache.is_blocked is False
        assert vram_cache.mode == EvictionMode.RELAXED