"""
Integration tests for ml-intern production server.
Tests cover:
- Rate limiting across distributed instances
- Circuit breaker state transitions
- Cache hit/miss behavior
- Budget enforcement
- Session isolation
- Health check endpoints
- Graceful shutdown handling
"""
import asyncio
import hashlib
import json
import os
import sys
import time
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import redis.asyncio as aioredis
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from production_server import (
CircuitBreaker,
ConcurrencyLimiter,
CostTracker,
RedisManager,
generate_cache_key,
get_provider_from_model,
estimate_cost,
ChatRequest,
)
@pytest.fixture
def redis_manager():
    """Provide a ``RedisManager`` stand-in that never talks to Redis.

    The fixture is synchronous on purpose: nothing in the setup is awaited,
    and an ``async def`` generator behind a plain ``@pytest.fixture`` is only
    resolved for the test when pytest-asyncio runs in auto mode.  As a regular
    fixture it behaves the same in every mode.

    Returns:
        MagicMock: a mock spec'd against ``RedisManager`` where

        * ``get_cache`` resolves to ``None``,
        * ``set_cache`` / ``delete_cache`` are recordable ``AsyncMock`` objects,
        * ``check_rate_limit`` always resolves to ``(True, 0.0)``,
        * ``get_circuit_state`` always resolves to a fresh "closed" state dict,
        * ``set_circuit_state`` accepts its arguments and does nothing.
    """
    manager = MagicMock(spec=RedisManager)

    # Cache layer stubs.
    manager.get_cache = AsyncMock(return_value=None)
    manager.set_cache = AsyncMock()
    manager.delete_cache = AsyncMock()

    async def check_limit(key, rpm):
        # Same answer for every key and rate.
        return True, 0.0

    async def get_circuit(provider):
        # A new dict per call, so a caller mutating it cannot leak state.
        return {"state": "closed", "failures": 0, "last_failure": 0}

    async def set_circuit(provider, state):
        # Writes are accepted and discarded.
        pass

    manager.check_rate_limit = check_limit
    manager.get_circuit_state = get_circuit
    manager.set_circuit_state = set_circuit

    return manager
@pytest.fixture
def concurrency_limiter():
    """Return a ``ConcurrencyLimiter`` constructed with the argument ``10``."""
    limit = 10
    limiter = ConcurrencyLimiter(limit)
    return limiter
class TestProviderResolution:
    """Checks the ``(provider, model)`` pair returned by ``get_provider_from_model``."""

    @staticmethod
    def _assert_resolves(expectations):
        """Assert each ``(model_id, expected_pair)`` entry, in the order given."""
        for model_id, expected_pair in expectations:
            assert get_provider_from_model(model_id) == expected_pair

    def test_cloud_providers(self):
        # For these two the expected model string still carries the prefix.
        self._assert_resolves([
            ("anthropic/claude-opus-4", ("anthropic", "anthropic/claude-opus-4")),
            ("openai/gpt-5", ("openai", "openai/gpt-5")),
        ])

    def test_free_tier_providers(self):
        self._assert_resolves([
            ("groq/llama-3.3-70b", ("groq", "llama-3.3-70b")),
        ])

    def test_nim_provider(self):
        self._assert_resolves([
            ("nim/llama-3-8b", ("nim", "llama-3-8b")),
            ("nim/llama-3.1-405b-instruct", ("nim", "llama-3.1-405b-instruct")),
        ])

    def test_local_providers(self):
        # Here the expected model string has the prefix removed.
        self._assert_resolves([
            ("ollama/llama3.1", ("ollama", "llama3.1")),
            ("vllm/llama-3-8b", ("vllm", "llama-3-8b")),
            ("llamacpp/llama-3-8b", ("llamacpp", "llama-3-8b")),
            ("lmstudio/llama-3-8b", ("lmstudio", "llama-3-8b")),
            ("mlx/llama-3-8b", ("mlx", "llama-3-8b")),
            ("tgi/llama-3-8b", ("tgi", "llama-3-8b")),
            ("local/my-model", ("local", "my-model")),
        ])

    def test_default_provider(self):
        # A model id without a "/" is expected to resolve to huggingface.
        self._assert_resolves([
            ("some-model", ("huggingface", "some-model")),
        ])
class TestCostEstimation:
    """Checks ``estimate_cost`` for two paid providers and the zero-cost ones."""

    def test_anthropic_cost(self):
        estimated = estimate_cost("anthropic", "claude-opus-4", 1000000, 1000000)
        # Within one unit (exclusive) of 90.0.
        deviation = estimated - 90.0
        assert -1 < deviation < 1

    def test_openai_cost(self):
        estimated = estimate_cost("openai", "gpt-5", 1000000, 1000000)
        # Within one unit (exclusive) of 12.5.
        deviation = estimated - 12.5
        assert -1 < deviation < 1

    def test_free_providers_zero_cost(self):
        zero_cost_providers = (
            "groq",
            "nim",
            "ollama",
            "vllm",
            "llamacpp",
            "lmstudio",
            "mlx",
            "tgi",
            "local",
            "huggingface",
        )
        for provider in zero_cost_providers:
            estimated = estimate_cost(provider, "test-model", 1000000, 1000000)
            # Exact equality: these providers must report precisely 0.0.
            assert estimated == 0.0, f"Provider {provider} should have zero cost"
class TestCacheKeyGeneration:
    """Checks which ``ChatRequest`` fields influence ``generate_cache_key``."""

    @staticmethod
    def _request(text, **extra):
        """Build a single-user-message request for the groq llama model.

        ``extra`` is forwarded verbatim to ``ChatRequest`` (e.g. ``temperature``
        or ``stream``), so fields not named here keep their defaults.
        """
        return ChatRequest(
            model="groq/llama-3.3-70b",
            messages=[{"role": "user", "content": text}],
            **extra,
        )

    def test_deterministic_keys(self):
        # Two separately constructed but identical requests.
        first = self._request("Hello", temperature=0.7)
        second = self._request("Hello", temperature=0.7)
        assert generate_cache_key(first) == generate_cache_key(second)

    def test_different_content_different_keys(self):
        # Only the message content differs.
        first = self._request("Hello")
        second = self._request("World")
        assert generate_cache_key(first) != generate_cache_key(second)

    def test_stream_not_in_cache_key(self):
        # Only the ``stream`` flag differs; the keys must still match.
        first = self._request("Hello", stream=False)
        second = self._request("Hello", stream=True)
        assert generate_cache_key(first) == generate_cache_key(second)
class TestCircuitBreaker:
    """Drives ``CircuitBreaker`` for provider "groq" against a mocked manager.

    Several tests replace ``redis_manager.get_circuit_state`` with an
    ``AsyncMock`` so that the breaker reads a chosen state.
    """

    @staticmethod
    def _stub_state(manager, state, failures, last_failure):
        """Make ``manager.get_circuit_state`` resolve to the given state dict."""
        manager.get_circuit_state = AsyncMock(return_value={
            "state": state,
            "failures": failures,
            "last_failure": last_failure,
        })

    @pytest.mark.asyncio
    async def test_initially_closed(self, redis_manager):
        breaker = CircuitBreaker(redis_manager, "groq")
        assert await breaker.can_execute()

    @pytest.mark.asyncio
    async def test_opens_after_threshold(self, redis_manager):
        breaker = CircuitBreaker(redis_manager, "groq")
        failures_to_record = 5
        for _ in range(failures_to_record):
            await breaker.record_failure()
        # The open state is stubbed only after the failures were recorded.
        self._stub_state(redis_manager, "open", 5, time.time())
        allowed = await breaker.can_execute()
        assert not allowed

    @pytest.mark.asyncio
    async def test_half_open_after_timeout(self, redis_manager):
        breaker = CircuitBreaker(redis_manager, "groq")
        # Open circuit whose last failure lies 120 seconds in the past.
        self._stub_state(redis_manager, "open", 5, time.time() - 120)
        assert await breaker.can_execute()

    @pytest.mark.asyncio
    async def test_closes_on_success(self, redis_manager):
        breaker = CircuitBreaker(redis_manager, "groq")
        self._stub_state(redis_manager, "half-open", 0, 0)
        await breaker.record_success()
        # The closed state is stubbed before asking whether calls may proceed.
        self._stub_state(redis_manager, "closed", 0, 0)
        assert await breaker.can_execute()
class TestBudgetTracking:
    """Checks ``CostTracker.can_spend`` around the configured budget."""

    @staticmethod
    def _tracker(budget, already_spent=None):
        """Create a tracker for "session-1", optionally presetting ``spent_usd``."""
        tracker = CostTracker("session-1", budget_usd=budget)
        if already_spent is not None:
            tracker.spent_usd = already_spent
        return tracker

    def test_can_spend_within_budget(self):
        ledger = self._tracker(10.0)
        assert ledger.can_spend(5.0)

    def test_cannot_exceed_budget(self):
        # 8.0 spent of 10.0: a further 3.0 must be refused.
        ledger = self._tracker(10.0, already_spent=8.0)
        assert not ledger.can_spend(3.0)

    def test_exact_budget_boundary(self):
        # Landing exactly on the budget is allowed; one cent more is not.
        ledger = self._tracker(10.0, already_spent=5.0)
        assert ledger.can_spend(5.0)
        assert not ledger.can_spend(5.01)

    def test_zero_budget(self):
        ledger = self._tracker(0.0)
        assert not ledger.can_spend(0.01)
class TestConcurrencyLimiter:
    """Checks ``ConcurrencyLimiter`` acquire/release and blocking at the limit."""

    @pytest.mark.asyncio
    async def test_acquire_release(self):
        """An acquire below the limit completes, and release does not raise."""
        limiter = ConcurrencyLimiter(2)
        # Bounded wait: a limiter that wrongly blocks fails the test instead
        # of hanging the whole run.
        await asyncio.wait_for(limiter.acquire(), timeout=2.0)
        limiter.release()

    @pytest.mark.asyncio
    async def test_blocks_at_limit(self):
        """A second acquire stays pending until the held slot is released."""
        limiter = ConcurrencyLimiter(1)
        await limiter.acquire()

        waiter = asyncio.create_task(limiter.acquire())
        try:
            # Give the waiter a chance to run; it must still be pending.
            await asyncio.sleep(0.1)
            assert not waiter.done(), (
                "second acquire should block while the only slot is held"
            )

            # Releasing the slot must let the pending acquire finish.
            limiter.release()
            await asyncio.wait_for(waiter, timeout=2.0)
        finally:
            # Do not leave a pending task behind when an assertion fails.
            if not waiter.done():
                waiter.cancel()
class TestRateLimiting:
    """Checks the ``(allowed, retry_after)`` result shape of ``check_rate_limit``.

    NOTE(review): both tests stub ``check_rate_limit`` on a bare ``MagicMock``
    and then call that stub, so no rate-limiting code from the server runs.
    """

    @pytest.mark.asyncio
    async def test_token_bucket_allows_requests(self):
        manager = MagicMock()
        # Stub: request allowed, nothing to wait for.
        manager.check_rate_limit = AsyncMock(return_value=(True, 0.0))
        outcome = await manager.check_rate_limit("groq:session-1", 40)
        allowed, retry = outcome
        assert allowed
        assert retry == 0.0

    @pytest.mark.asyncio
    async def test_token_bucket_denies_when_empty(self):
        manager = MagicMock()
        # Stub: request denied with a 1.5 retry hint.
        manager.check_rate_limit = AsyncMock(return_value=(False, 1.5))
        outcome = await manager.check_rate_limit("groq:session-1", 40)
        allowed, retry = outcome
        assert not allowed
        assert retry > 0
class TestEndToEndFlow:
    """Walks one request through rate limit, budget, circuit breaker and spend."""

    @pytest.mark.asyncio
    async def test_full_request_flow(self, redis_manager):
        """Run the four gating steps in order for a groq request.

        Args:
            redis_manager: mocked manager fixture used for the rate-limit
                check and as the backing store of the circuit breaker.
        """
        session_id = str(uuid.uuid4())
        provider = "groq"
        model = "llama-3.3-70b-versatile"

        # 1. Rate limit for this provider/session pair.
        allowed, _ = await redis_manager.check_rate_limit(f"{provider}:{session_id}", 30)
        assert allowed

        # 2. Budget check against the estimated cost.
        tracker = CostTracker(session_id, budget_usd=10.0)
        estimated_cost = estimate_cost(provider, model, 1000, 500)
        assert tracker.can_spend(estimated_cost)

        # 3. Circuit breaker must let the call through.
        cb = CircuitBreaker(redis_manager, provider)
        assert await cb.can_execute()

        # 4. Record the outcome.
        spent_before = tracker.spent_usd
        tracker.record_spend(estimated_cost)
        await cb.record_success()

        # TestCostEstimation.test_free_providers_zero_cost pins groq at exactly
        # 0.0, so requiring ``spent_usd > 0`` here would contradict it.  Check
        # that the recorded spend grew by the estimate instead; this holds for
        # free and paid providers alike.
        assert estimated_cost >= 0
        assert tracker.spent_usd == pytest.approx(spent_before + estimated_cost)
        assert tracker.spent_usd <= tracker.budget_usd
if __name__ == "__main__":
    # Forward pytest's exit code so that running this file directly reports
    # test failures to the shell or CI instead of always exiting with 0.
    sys.exit(pytest.main([__file__, "-v"]))