File size: 14,396 Bytes
cf0a8ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# MERGED: OpenCode (deep KV physics) + CC (surface coverage)
# All tests hermetic: no GPU, no TCP, no downloaded weights required
from __future__ import annotations

import pytest

# Optional dep guard — skip entire module if sentence_transformers not installed
# (CompressionCoordinator requires SemanticDedupEngine which requires sentence_transformers)
try:
    import sentence_transformers  # noqa: F401
    _HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
    # ImportError (not just its subclass ModuleNotFoundError) also covers a
    # present-but-broken install whose own transitive imports fail — either
    # way these tests cannot run, so skip the whole module.
    pytest.skip("sentence_transformers not installed", allow_module_level=True)

from typing import Any

from apohara_context_forge.compression.coordinator import CompressionCoordinator
from apohara_context_forge.config import settings
from apohara_context_forge.models import CompressionDecision, ContextMatch


# ---- Hermetic doubles (zero real model loads) ------------------------------------


class FakeDedup:
    """Deterministic stand-in for SemanticDedupEngine.

    ``count_prefix_tokens`` answers from a precomputed per-string dict when a
    value was registered via ``set_count``, and otherwise falls back to the
    cheap ``len(s) // 4`` proxy so tests never need a real tokenizer.
    ``find_shared_prefix`` mirrors the real engine's char-level scan,
    backing off to the last word boundary on a mid-word mismatch.
    """

    def __init__(self) -> None:
        # Registered (text -> token count) overrides for deterministic tests.
        self._counts: dict[str, int] = {}

    def set_count(self, text: str, count: int) -> None:
        self._counts[text] = count

    def count_prefix_tokens(self, prefix: str) -> int:
        # Registered value wins; otherwise use the length-based proxy.
        return self._counts.get(prefix, len(prefix) // 4)

    def find_shared_prefix(self, a: str, b: str) -> str:
        limit = min(len(a), len(b))
        idx = 0
        while idx < limit and a[idx] == b[idx]:
            idx += 1
        if idx == limit:
            # One string is a full prefix of the other — no word backoff.
            return a[:idx]
        cut = a.rfind(" ", 0, idx)
        # Back off to the last space when one exists past position 0,
        # else keep the raw char-level prefix.
        return a[:cut] if cut > 0 else a[:idx]


class FakeContextRegistry:
    """Hermetic stand-in for the real context registry.

    Holds pre-canned ``ContextMatch`` objects and hands them back from
    ``find_similar`` sorted descending by similarity, matching the real
    registry's contract. Exposes a ``dedup`` attribute so the coordinator's
    sharing rule can run without loading a real embedder, and records every
    ``find_similar`` call for later assertions.
    """

    def __init__(self, dedup: FakeDedup | None = None) -> None:
        self._matches: list[ContextMatch] = []
        # Reuse the caller's dedup double when given; build one otherwise.
        self.dedup: FakeDedup = FakeDedup() if dedup is None else dedup
        # Every (context, threshold) pair passed to find_similar.
        self.find_similar_calls: list[tuple[str, float | None]] = []

    def set_matches(self, matches: list[ContextMatch]) -> None:
        self._matches = list(matches)

    async def find_similar(
        self, context: str, threshold: float | None = None
    ) -> list[ContextMatch]:
        self.find_similar_calls.append((context, threshold))
        # Negated key ≡ reverse=True; both are stable for equal similarities.
        return sorted(self._matches, key=lambda m: -m.similarity)


class FakeCompressor:
    """Async double for ``ContextCompressor.compress``.

    Records every ``(context, rate)`` pair it receives so tests can assert
    exactly which slice of the input was compressed, then returns either the
    canned ``response`` supplied at construction or a deterministic
    length-tagged placeholder, always paired with a 0.5 ratio.
    """

    def __init__(self, response: str | None = None) -> None:
        # Chronological record of (context, rate) arguments.
        self.calls: list[tuple[str, float]] = []
        self._response = response

    async def compress(self, context: str, rate: float = 0.5) -> tuple[str, float]:
        self.calls.append((context, rate))
        if self._response is None:
            body = f"COMPRESSED({len(context)})"
        else:
            body = self._response
        return body, 0.5


def make_coordinator(
    *,
    registry: FakeContextRegistry | None = None,
    compressor: FakeCompressor | None = None,
    dedup: FakeDedup | None = None,
) -> CompressionCoordinator:
    """Assemble a ``CompressionCoordinator`` wired entirely to hermetic fakes."""
    wiring = {"registry": registry, "compressor": compressor, "dedup": dedup}
    return CompressionCoordinator(**wiring)


# ---- Strategy branch tests --------------------------------------------------------


async def test_apc_reuse_strategy_when_strong_match_long_prefix_short_ctx():
    """Strong match + long shared prefix + short context → pure APC reuse."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "hello agent_a body short enough to skip compression"
    fake_dedup.set_count(ctx, 400)  # below COMPRESS_MIN_CONTEXT_TOKENS=500

    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_a",
                similarity=0.95,
                shared_prefix="hello agent_a body",
                shared_prefix_tokens=300,  # above APC_REUSE_MIN_SHARED_PREFIX_TOKENS=200
            )
        ]
    )

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_a", ctx)

    assert decision.strategy == "apc_reuse"
    assert decision.final_context == ctx
    assert decision.shared_prefix == "hello agent_a body"
    assert decision.original_tokens == 400
    assert decision.final_tokens == 400
    assert decision.tokens_saved == 0
    # The compressor must stay untouched on the apc_reuse path.
    assert fake_compressor.calls == []


async def test_compress_and_reuse_when_strong_match_long_prefix_long_ctx():
    """Strong match + long prefix + long context → keep prefix, compress tail."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="COMPRESSED_TAIL")

    prefix = "agent shared prefix portion"
    tail = " tail body that is much longer and should be compressed"
    ctx = prefix + tail
    fake_dedup.set_count(ctx, 800)
    fake_dedup.set_count("COMPRESSED_TAIL", 50)

    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_b",
                similarity=0.95,
                shared_prefix=prefix,
                shared_prefix_tokens=300,
            )
        ]
    )

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_b", ctx)

    assert decision.strategy == "compress_and_reuse"
    assert decision.final_context.startswith(prefix)
    assert decision.final_context.endswith("COMPRESSED_TAIL")
    # Exactly one compressor call, and only the unique tail may reach it —
    # compressing the full context would defeat downstream KV-prefix reuse.
    assert fake_compressor.calls == [(tail, settings.CONTEXTFORGE_COMPRESSION_RATE)]
    assert decision.final_tokens == 350  # 300 prefix tokens + 50 compressed tail
    assert decision.tokens_saved == 800 - 350
    assert decision.shared_prefix == prefix

async def test_compress_when_no_long_prefix_long_ctx():
    """Long context but prefix too short for reuse → whole-context compression."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="WHOLE_COMPRESSED")

    ctx = (
        "long body that needs full-context compression because the prefix "
        "overlap is too small to bother with KV reuse"
    )
    fake_dedup.set_count(ctx, 800)
    fake_dedup.set_count("WHOLE_COMPRESSED", 100)

    # High similarity yet tiny prefix — has_long_prefix=False forces "compress".
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_c",
                similarity=0.90,
                shared_prefix="short",
                shared_prefix_tokens=50,
            )
        ]
    )

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_c", ctx)

    assert decision.strategy == "compress"
    assert decision.final_context == "WHOLE_COMPRESSED"
    assert decision.shared_prefix == ""
    # The FULL incoming context must be what gets compressed here.
    assert len(fake_compressor.calls) == 1
    assert fake_compressor.calls[0][0] == ctx
    assert decision.final_tokens == 100
    assert decision.tokens_saved == 700


async def test_passthrough_when_no_match_short_ctx():
    """No prior matches and a short context → untouched passthrough."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)  # no matches registered
    fake_compressor = FakeCompressor()

    ctx = "tiny context that is just passed through"
    fake_dedup.set_count(ctx, 200)

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_d", ctx)

    assert decision.strategy == "passthrough"
    assert decision.final_context == ctx
    assert decision.shared_prefix == ""
    assert decision.original_tokens == 200 == decision.final_tokens
    assert decision.tokens_saved == 0
    assert fake_compressor.calls == []


async def test_passthrough_when_short_ctx_with_weak_match():
    """Strong sim + short prefix + short ctx → passthrough preserves weak prefix."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "short body with a weak match preserved for observability"
    fake_dedup.set_count(ctx, 200)  # short — long_enough=False

    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_e",
                similarity=0.90,
                shared_prefix="short body",
                shared_prefix_tokens=50,  # weak — has_long_prefix=False
            )
        ]
    )

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_e", ctx)

    assert decision.strategy == "passthrough"
    # Even though the weak match was not acted on, its prefix is surfaced
    # to the caller for observability.
    assert decision.shared_prefix == "short body"
    assert decision.final_context == ctx
    assert decision.tokens_saved == 0
    assert fake_compressor.calls == []


async def test_no_prior_contexts_returns_passthrough():
    """The very first context an agent submits must pass through untouched."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "first context ever"
    fake_dedup.set_count(ctx, 50)

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("first_agent", ctx)

    assert decision.strategy == "passthrough"
    assert decision.shared_prefix == ""
    assert fake_compressor.calls == []


async def test_decide_uses_registry_dedup_engine_by_default():
    """When only `registry` is provided, coordinator MUST reuse `registry.dedup`
    (identity check) so we never spin up a second embedder."""
    shared = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=shared)
    fake_compressor = FakeCompressor()

    coord = CompressionCoordinator(registry=fake_registry, compressor=fake_compressor)

    assert coord.dedup is fake_registry.dedup
    assert coord.dedup is shared

    # Running decide() must not swap in a freshly built dedup engine.
    decision = await coord.decide("solo", "x")  # len("x") // 4 == 0 → passthrough
    assert isinstance(decision, CompressionDecision)
    assert coord.dedup is shared


async def test_compression_decision_strict_typing():
    """R014 strict-typing surface check: Pydantic round-trip is identity, and
    `strategy` is one of the four documented literals."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "anything"
    fake_dedup.set_count(ctx, 100)

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_strict", ctx)

    assert type(decision) is CompressionDecision
    valid_strategies = {"apc_reuse", "compress", "compress_and_reuse", "passthrough"}
    assert decision.strategy in valid_strategies

    dumped: dict[str, Any] = decision.model_dump()
    roundtripped = CompressionDecision.model_validate(dumped)
    assert roundtripped == decision
    assert roundtripped.model_dump() == dumped


# ---- Boundary / negative tests (Q7) ----------------------------------------------


async def test_long_enough_uses_strict_greater_than_at_boundary():
    """ctx_tokens == COMPRESS_MIN_CONTEXT_TOKENS (=500) → long_enough is False."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "boundary body"
    fake_dedup.set_count(ctx, settings.COMPRESS_MIN_CONTEXT_TOKENS)  # exactly 500

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_b1", ctx)

    # No match AND long_enough=False → passthrough; compressor untouched.
    assert decision.strategy == "passthrough"
    assert fake_compressor.calls == []


async def test_has_long_prefix_uses_strict_greater_than_at_boundary():
    """shared_prefix_tokens == APC_REUSE_MIN_SHARED_PREFIX_TOKENS (=200) → not long enough."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="WHOLE")

    ctx = "boundary prefix body for whole-context compression check"
    fake_dedup.set_count(ctx, 800)  # long_enough=True
    fake_dedup.set_count("WHOLE", 50)

    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_b2",
                similarity=0.95,
                shared_prefix="boundary prefix",
                shared_prefix_tokens=settings.APC_REUSE_MIN_SHARED_PREFIX_TOKENS,
            )
        ]
    )

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_b2", ctx)

    # has_long_prefix=False (strict >) while long_enough=True → compress
    # branch with the FULL context, not the unique tail.
    assert decision.strategy == "compress"
    assert len(fake_compressor.calls) == 1
    assert fake_compressor.calls[0][0] == ctx


async def test_registry_results_already_sorted_picks_top_match():
    """Coordinator trusts registry-sort-desc — picks matches[0] as best."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()

    ctx = "overlap body"
    fake_dedup.set_count(ctx, 100)  # short — only apc_reuse can fire

    weak = ContextMatch(
        agent_id="lower",
        similarity=0.86,
        shared_prefix="overlap",
        shared_prefix_tokens=50,  # weak — would NOT fire apc_reuse alone
    )
    strong = ContextMatch(
        agent_id="higher",
        similarity=0.95,
        shared_prefix="overlap body",
        shared_prefix_tokens=300,  # strong — fires apc_reuse
    )
    fake_registry.set_matches([weak, strong])

    coord = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    decision = await coord.decide("agent_pick", ctx)

    assert decision.strategy == "apc_reuse"
    assert decision.shared_prefix == "overlap body"
    assert fake_compressor.calls == []