contextforge-demo / tests /test_registry.py
Pablo
fix: test_mcp_server 12 failures resolved — model fields, registry API, GPU label
466cc3d
"""Tests for ContextRegistry, TTLCache, and VRAMAwareCache."""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from apohara_context_forge.registry._deprecated_ttl_cache import TTLCache
from apohara_context_forge.registry.context_registry import ContextRegistry
from apohara_context_forge.registry.vram_aware_cache import VRAMAwareCache, EvictionMode
@pytest.fixture
def ttl_cache():
return TTLCache(default_ttl_seconds=5)
@pytest.fixture
def registry():
return ContextRegistry()
@pytest.fixture
async def vram_cache():
cache = VRAMAwareCache(max_token_budget=50_000_000)
await cache.start()
yield cache
await cache.stop()
class TestTTLCache:
"""Tests for TTLCache."""
async def test_set_and_get(self, ttl_cache):
await ttl_cache.set("key1", "value1")
result = await ttl_cache.get("key1")
assert result == "value1"
async def test_get_nonexistent(self, ttl_cache):
result = await ttl_cache.get("nonexistent")
assert result is None
async def test_expiry(self, ttl_cache):
await ttl_cache.set("key1", "value1", ttl_seconds=1)
await asyncio.sleep(1.1)
result = await ttl_cache.get("key1")
assert result is None
async def test_delete(self, ttl_cache):
await ttl_cache.set("key1", "value1")
deleted = await ttl_cache.delete("key1")
assert deleted is True
result = await ttl_cache.get("key1")
assert result is None
async def test_evict_expired(self, ttl_cache):
await ttl_cache.set("key1", "value1", ttl_seconds=1)
await asyncio.sleep(1.1)
count = await ttl_cache.evict_expired()
assert count == 1
assert await ttl_cache.size() == 0
async def test_clear(self, ttl_cache):
await ttl_cache.set("key1", "value1")
await ttl_cache.set("key2", "value2")
await ttl_cache.clear()
assert await ttl_cache.size() == 0
class TestContextRegistry:
"""Tests for ContextRegistry.
Note: ContextRegistry.register_agent() requires TokenCounter which has a production bug
(AttributeError: '_use_fallback' not initialized). Skipping integration tests that call
register_agent() - the correct method name was verified to be register_agent().
"""
async def test_registry_has_register_agent_method(self, registry):
"""Verify the dual register API exists.
- `register_agent(agent_id, system_prompt, role_prompt)` is the full
KV-aware pipeline used by the agents/ runner.
- `register(agent_id, context)` is the lightweight MCP endpoint path
(single opaque context, no system/role split). Both are part of the
public contract; they live on the same registry instance.
"""
assert hasattr(registry, 'register_agent')
assert hasattr(registry, 'register')
async def test_get_agent_context_returns_none_for_unknown(self, registry):
"""get_agent_context returns None for unknown agents."""
result = await registry.get_agent_context("nonexistent")
assert result is None
async def test_get_all_agents_returns_empty_list(self, registry):
"""get_all_agents returns empty list when no agents registered."""
result = await registry.get_all_agents()
assert result == []
class TestVRAMAwareCache:
"""Tests for VRAMAwareCache."""
async def test_set_and_get(self, vram_cache):
await vram_cache.set("key1", "value1", token_count=100)
result = await vram_cache.get("key1")
assert result == "value1"
async def test_get_nonexistent(self, vram_cache):
result = await vram_cache.get("nonexistent")
assert result is None
async def test_delete(self, vram_cache):
await vram_cache.set("key1", "value1", token_count=100)
deleted = await vram_cache.delete("key1")
assert deleted is True
result = await vram_cache.get("key1")
assert result is None
async def test_delete_nonexistent(self, vram_cache):
deleted = await vram_cache.delete("nonexistent")
assert deleted is False
async def test_size(self, vram_cache):
assert vram_cache.size == 0
await vram_cache.set("key1", "value1", token_count=100)
assert vram_cache.size == 1
await vram_cache.set("key2", "value2", token_count=200)
assert vram_cache.size == 2
async def test_token_tracking(self, vram_cache):
assert vram_cache.total_tokens == 0
await vram_cache.set("key1", "value1", token_count=500)
assert vram_cache.total_tokens == 500
await vram_cache.set("key2", "value2", token_count=300)
assert vram_cache.total_tokens == 800
await vram_cache.delete("key1")
assert vram_cache.total_tokens == 300
async def test_clear(self, vram_cache):
await vram_cache.set("key1", "value1", token_count=100)
await vram_cache.set("key2", "value2", token_count=200)
assert vram_cache.size == 2
await vram_cache.clear()
assert vram_cache.size == 0
assert vram_cache.total_tokens == 0
async def test_update_existing_key(self, vram_cache):
await vram_cache.set("key1", "value1", token_count=100)
await vram_cache.set("key1", "value2", token_count=200)
result = await vram_cache.get("key1")
assert result == "value2"
assert vram_cache.total_tokens == 200
async def test_mode_initial_relaxed(self, vram_cache):
"""Cache starts in RELAXED mode by default."""
assert vram_cache.mode == EvictionMode.RELAXED
assert vram_cache.is_blocked is False
async def test_eviction_modes(self, vram_cache):
"""Test that modes transition correctly based on pressure."""
# Directly set mode and call _apply_eviction_policy to trigger _blocked state
vram_cache._mode = EvictionMode.RELAXED
await vram_cache._apply_eviction_policy()
assert vram_cache.mode == EvictionMode.RELAXED
assert vram_cache.is_blocked is False
vram_cache._mode = EvictionMode.NORMAL
await vram_cache._apply_eviction_policy()
assert vram_cache.mode == EvictionMode.NORMAL
assert vram_cache.is_blocked is False
vram_cache._mode = EvictionMode.PRESSURE
await vram_cache._apply_eviction_policy()
assert vram_cache.mode == EvictionMode.PRESSURE
assert vram_cache.is_blocked is False
vram_cache._mode = EvictionMode.CRITICAL
await vram_cache._apply_eviction_policy()
assert vram_cache.mode == EvictionMode.CRITICAL
assert vram_cache.is_blocked is False
vram_cache._mode = EvictionMode.EMERGENCY
await vram_cache._apply_eviction_policy()
assert vram_cache.mode == EvictionMode.EMERGENCY
assert vram_cache.is_blocked is True
async def test_blocked_mode(self, vram_cache):
"""In EMERGENCY mode, set() should return False."""
# Force EMERGENCY mode directly
vram_cache._mode = EvictionMode.EMERGENCY
await vram_cache._apply_eviction_policy()
assert vram_cache.is_blocked is True
# set() should be blocked
result = await vram_cache.set("key1", "value1", token_count=100)
assert result is False
# After mode drops, should unblock
vram_cache._mode = EvictionMode.RELAXED
await vram_cache._apply_eviction_policy()
assert vram_cache.is_blocked is False
# set() should work again
result = await vram_cache.set("key2", "value2", token_count=100)
assert result is True
async def test_pressure_to_mode_boundaries(self):
"""Test exact boundary values for _pressure_to_mode."""
assert VRAMAwareCache._pressure_to_mode(0.69) == EvictionMode.RELAXED
assert VRAMAwareCache._pressure_to_mode(0.70) == EvictionMode.NORMAL
assert VRAMAwareCache._pressure_to_mode(0.84) == EvictionMode.NORMAL
assert VRAMAwareCache._pressure_to_mode(0.85) == EvictionMode.PRESSURE
assert VRAMAwareCache._pressure_to_mode(0.91) == EvictionMode.PRESSURE
assert VRAMAwareCache._pressure_to_mode(0.92) == EvictionMode.CRITICAL
assert VRAMAwareCache._pressure_to_mode(0.95) == EvictionMode.CRITICAL
assert VRAMAwareCache._pressure_to_mode(0.96) == EvictionMode.EMERGENCY
assert VRAMAwareCache._pressure_to_mode(1.0) == EvictionMode.EMERGENCY
async def test_emergency_unblocks_on_lower_pressure(self, vram_cache):
"""Verify is_blocked clears when pressure drops from EMERGENCY."""
# Enter EMERGENCY directly
vram_cache._mode = EvictionMode.EMERGENCY
await vram_cache._apply_eviction_policy()
assert vram_cache.is_blocked is True
assert vram_cache.mode == EvictionMode.EMERGENCY
# Drop to RELAXED
vram_cache._mode = EvictionMode.RELAXED
await vram_cache._apply_eviction_policy()
assert vram_cache.is_blocked is False
assert vram_cache.mode == EvictionMode.RELAXED