File size: 6,600 Bytes
4fbc241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Comprehensive tests for the trace simulator and sub-components."""
from __future__ import annotations

from llmserve_env.models import QuantizationTier, ServeAction, WorkloadSnapshot
from server.kv_cache_simulator import KVCacheSimulator
from server.speculative_decoder import SpeculativeDecoder
from server.trace_simulator import TraceSimulator


def _make_action(**overrides) -> ServeAction:
    """Build a ``ServeAction`` from baseline settings.

    Any keyword passed in ``overrides`` replaces the baseline value of the
    same name (or is forwarded as an additional field) before the action
    is constructed.
    """
    settings = {
        "batch_cap": 32,
        "kv_budget_fraction": 1.0,
        "speculation_depth": 0,
        "quantization_tier": QuantizationTier.FP16,
        "prefill_decode_split": False,
        "priority_routing": False,
        # Caller-supplied values win over the baseline above.
        **overrides,
    }
    return ServeAction(**settings)


def _make_workload(**overrides) -> WorkloadSnapshot:
    """Build a ``WorkloadSnapshot`` from baseline settings.

    Any keyword passed in ``overrides`` replaces the baseline value of the
    same name (or is forwarded as an additional field) before the snapshot
    is constructed.
    """
    settings = {
        "arrival_rate": 10.0,
        "queue_depth": 20,
        "mean_prompt_length": 128.0,
        "prompt_length_bucket": 1,
        "priority_fraction": 0.0,
        "phase": "steady",
        # Caller-supplied values win over the baseline above.
        **overrides,
    }
    return WorkloadSnapshot(**settings)


# ─── TraceSimulator ───────────────────────────────────────────────

class TestTraceSimulatorSmoke:
    """Sanity checks: ``simulate_step`` yields usable metrics for valid input."""

    def test_returns_metrics_snapshot(self):
        """A default action/workload pair produces positive headline metrics."""
        simulator = TraceSimulator()
        action = _make_action()
        workload = _make_workload()
        result = simulator.simulate_step("static_workload", action, workload)
        assert result.throughput_tps > 0
        assert result.p50_ttft_ms > 0
        # Tail latency can never be below the median.
        assert result.p99_ttft_ms >= result.p50_ttft_ms
        assert result.gpu_memory_used_gb > 0
        assert result.estimated_cost_per_1k > 0

    def test_all_tasks_produce_metrics(self):
        """Every task id reports a throughput of at least 1.0."""
        simulator = TraceSimulator()
        task_ids = ("static_workload", "bursty_workload", "adversarial_multitenant")
        for task_id in task_ids:
            result = simulator.simulate_step(task_id, _make_action(), _make_workload())
            assert result.throughput_tps >= 1.0

    def test_varied_actions_no_crash(self):
        """Sweep batch cap, KV budget and speculation depth on one simulator."""
        simulator = TraceSimulator()
        # Same iteration order as three nested loops: batch, then kv, then spec.
        grid = [
            (batch_cap, kv_fraction, depth)
            for batch_cap in (1, 8, 64, 256, 512)
            for kv_fraction in (0.1, 0.5, 1.0)
            for depth in (0, 2, 8)
        ]
        for batch_cap, kv_fraction, depth in grid:
            action = _make_action(
                batch_cap=batch_cap,
                kv_budget_fraction=kv_fraction,
                speculation_depth=depth,
            )
            result = simulator.simulate_step("static_workload", action, _make_workload())
            assert result.throughput_tps >= 1.0
            assert result.requests_served >= 0


class TestTraceSimulatorMonotonicity:
    """Raising ``batch_cap`` is expected not to reduce throughput."""

    def test_throughput_increases_with_batch(self):
        """Throughput is non-decreasing across batch caps 4, 32, 128, 512."""
        simulator = TraceSimulator()
        workload = _make_workload(queue_depth=200, arrival_rate=50.0)
        throughputs = [
            simulator.simulate_step(
                "static_workload", _make_action(batch_cap=batch_cap), workload
            ).throughput_tps
            for batch_cap in (4, 32, 128, 512)
        ]
        # Compare each measurement with its successor; ties are allowed.
        for current, following in zip(throughputs, throughputs[1:]):
            assert current <= following, f"Throughput decreased: {throughputs}"


class TestTraceSimulatorOOM:
    """Heavy batch and full KV budget: reported GPU memory stays bounded."""

    def test_high_load_caps_memory(self):
        """Reported GPU memory does not exceed 38.0 GB under a very heavy load."""
        oom_cap_gb = 38.0  # OOM cap
        simulator = TraceSimulator()
        heavy_action = _make_action(batch_cap=512, kv_budget_fraction=1.0)
        heavy_workload = _make_workload(
            queue_depth=500,
            arrival_rate=200.0,
            mean_prompt_length=4096.0,
        )
        result = simulator.simulate_step(
            "adversarial_multitenant", heavy_action, heavy_workload
        )
        assert result.gpu_memory_used_gb <= oom_cap_gb


class TestTraceSimulatorQuantization:
    """Lower-precision tiers should not cost more or run slower than FP16."""

    def test_int8_cheaper_than_fp16(self):
        """INT8 cost per 1k is at most the FP16 cost on the same workload."""
        simulator = TraceSimulator()
        workload = _make_workload()
        # The generator runs FP16 first, then INT8, on the same simulator.
        fp16_result, int8_result = (
            simulator.simulate_step(
                "static_workload", _make_action(quantization_tier=tier), workload
            )
            for tier in (QuantizationTier.FP16, QuantizationTier.INT8)
        )
        assert int8_result.estimated_cost_per_1k <= fp16_result.estimated_cost_per_1k

    def test_int4_faster_than_fp16(self):
        """INT4 throughput is at least the FP16 throughput on the same workload."""
        simulator = TraceSimulator()
        workload = _make_workload()
        # The generator runs FP16 first, then INT4, on the same simulator.
        fp16_result, int4_result = (
            simulator.simulate_step(
                "static_workload", _make_action(quantization_tier=tier), workload
            )
            for tier in (QuantizationTier.FP16, QuantizationTier.INT4)
        )
        assert int4_result.throughput_tps >= fp16_result.throughput_tps


# ─── KVCacheSimulator ─────────────────────────────────────────────

class TestKVCacheSimulator:
    """Checks on the (occupancy, evictions) pair returned by ``apply``."""

    def test_low_load_no_evictions(self):
        """A small queue with short prompts and full budget evicts nothing."""
        cache = KVCacheSimulator()
        occupancy, evicted = cache.apply(
            queue_depth=5,
            mean_prompt_length=64.0,
            kv_budget_fraction=1.0,
        )
        assert evicted == 0
        assert 0.0 <= occupancy <= 1.0

    def test_high_load_causes_evictions(self):
        """A large queue with long prompts and a 0.1 budget evicts and fills up."""
        cache = KVCacheSimulator()
        occupancy, evicted = cache.apply(
            queue_depth=500,
            mean_prompt_length=4096.0,
            kv_budget_fraction=0.1,
        )
        assert evicted > 0
        assert occupancy == 1.0

    def test_full_budget_less_evictions(self):
        """A 1.0 budget never evicts more than a 0.1 budget for the same load."""
        cache = KVCacheSimulator()
        load = {"queue_depth": 100, "mean_prompt_length": 512.0}
        evictions = []
        # Constrained budget first, then the full budget, on the same instance.
        for budget in (0.1, 1.0):
            _, evicted = cache.apply(**load, kv_budget_fraction=budget)
            evictions.append(evicted)
        constrained, unconstrained = evictions
        assert unconstrained <= constrained


# ─── SpeculativeDecoder ───────────────────────────────────────────

class TestSpeculativeDecoder:
    """Checks on the (acceptance, itl) pair returned by ``estimate``."""

    def test_no_speculation(self):
        """Depth 0 gives zero acceptance and an ITL value of exactly 1.0."""
        decoder = SpeculativeDecoder()
        acceptance_rate, itl_factor = decoder.estimate("static_workload", 0, 128.0)
        assert acceptance_rate == 0.0
        assert itl_factor == 1.0

    def test_static_has_high_acceptance(self):
        """The static workload at depth 4 accepts more than 40% of drafts."""
        decoder = SpeculativeDecoder()
        acceptance_rate, _ = decoder.estimate("static_workload", 4, 128.0)
        # depth=4 yields ~0.49 with depth decay
        assert acceptance_rate > 0.4

    def test_adversarial_has_low_acceptance(self):
        """The adversarial workload at depth 4 accepts fewer than half of drafts."""
        decoder = SpeculativeDecoder()
        acceptance_rate, _ = decoder.estimate("adversarial_multitenant", 4, 4096.0)
        assert acceptance_rate < 0.5

    def test_itl_speedup_bounded(self):
        """At depth 8 the ITL value stays within [0.5, 1.0]."""
        decoder = SpeculativeDecoder()
        _, itl_factor = decoder.estimate("static_workload", 8, 128.0)
        assert 0.5 <= itl_factor <= 1.0