# MERGED: OpenCode (deep KV physics) + CC (surface coverage)
# All tests hermetic: no GPU, no TCP, no downloaded weights required
from __future__ import annotations

import pytest

# Optional dep guard — skip entire module if sentence_transformers not installed
# (CompressionCoordinator requires SemanticDedupEngine which requires sentence_transformers)
try:
    import sentence_transformers  # noqa: F401

    _HAS_SENTENCE_TRANSFORMERS = True
except ModuleNotFoundError:
    pytest.skip("sentence_transformers not installed", allow_module_level=True)

from typing import Any

from apohara_context_forge.compression.coordinator import CompressionCoordinator
from apohara_context_forge.config import settings
from apohara_context_forge.models import CompressionDecision, ContextMatch
# ---- Hermetic doubles (zero real model loads) ------------------------------------
class FakeDedup:
    """Deterministic stand-in for SemanticDedupEngine.

    Token counts come from a per-string table populated via ``set_count``;
    unknown strings fall back to ``len(s) // 4`` so tests never depend on a
    real tokenizer. ``find_shared_prefix`` reproduces the real engine's
    char-level scan, including the back-off to the last word boundary.
    """

    def __init__(self) -> None:
        # Pinned token counts, keyed by exact string.
        self._counts: dict[str, int] = {}

    def set_count(self, text: str, count: int) -> None:
        """Pin the token count reported for ``text``."""
        self._counts[text] = count

    def count_prefix_tokens(self, prefix: str) -> int:
        """Return the pinned count for ``prefix``, else ``len(prefix) // 4``."""
        return self._counts.get(prefix, len(prefix) // 4)

    def find_shared_prefix(self, a: str, b: str) -> str:
        """Longest common prefix of ``a`` and ``b``, trimmed back to the last
        space unless one string is a full prefix of the other."""
        limit = min(len(a), len(b))
        divergence = limit
        for idx in range(limit):
            if a[idx] != b[idx]:
                divergence = idx
                break
        if divergence == limit:
            # One string is a prefix of the other — keep it whole.
            return a[:divergence]
        boundary = a.rfind(" ", 0, divergence)
        return a[:boundary] if boundary > 0 else a[:divergence]
class FakeContextRegistry:
    """Serves pre-canned ``ContextMatch`` objects from ``find_similar``,
    sorted descending by similarity to mirror the real registry's contract.
    Exposes ``dedup`` so the coordinator's engine-sharing rule is testable
    without loading a real embedder.
    """

    def __init__(self, dedup: FakeDedup | None = None) -> None:
        self._matches: list[ContextMatch] = []
        # Reuse the supplied engine, else build a private fake.
        self.dedup: FakeDedup = FakeDedup() if dedup is None else dedup
        # Every (context, threshold) pair passed to find_similar(), in order.
        self.find_similar_calls: list[tuple[str, float | None]] = []

    def set_matches(self, matches: list[ContextMatch]) -> None:
        """Replace the canned result set (defensive copy)."""
        self._matches = list(matches)

    async def find_similar(
        self, context: str, threshold: float | None = None
    ) -> list[ContextMatch]:
        """Record the call, then return matches sorted desc by similarity."""
        self.find_similar_calls.append((context, threshold))
        ranked = list(self._matches)
        ranked.sort(key=lambda match: match.similarity, reverse=True)
        return ranked
class FakeCompressor:
    """Async stand-in for ``ContextCompressor.compress``.

    Always reports a 0.5 ratio, and replies with either the canned
    ``response`` or a deterministic ``COMPRESSED(<len>)`` body. Every call's
    ``(context, rate)`` pair is recorded in ``calls`` so tests can assert
    exactly which slice of the input was compressed.
    """

    def __init__(self, response: str | None = None) -> None:
        self.calls: list[tuple[str, float]] = []
        self._response = response

    async def compress(self, context: str, rate: float = 0.5) -> tuple[str, float]:
        self.calls.append((context, rate))
        if self._response is None:
            body = f"COMPRESSED({len(context)})"
        else:
            body = self._response
        return body, 0.5
def make_coordinator(
    *,
    registry: FakeContextRegistry | None = None,
    compressor: FakeCompressor | None = None,
    dedup: FakeDedup | None = None,
) -> CompressionCoordinator:
    """Build a ``CompressionCoordinator`` wired with the given fakes."""
    wiring = {"registry": registry, "compressor": compressor, "dedup": dedup}
    return CompressionCoordinator(**wiring)
# ---- Strategy branch tests --------------------------------------------------------
async def test_apc_reuse_strategy_when_strong_match_long_prefix_short_ctx():
    """Strong match + long shared prefix + short context → apc_reuse, untouched text."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "hello agent_a body short enough to skip compression"
    # Below COMPRESS_MIN_CONTEXT_TOKENS (=500) — the context counts as short.
    fake_dedup.set_count(ctx, 400)
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_a",
                similarity=0.95,
                shared_prefix="hello agent_a body",
                # Above APC_REUSE_MIN_SHARED_PREFIX_TOKENS (=200) — a long prefix.
                shared_prefix_tokens=300,
            )
        ]
    )
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_a", ctx)
    assert result.strategy == "apc_reuse"
    assert result.final_context == ctx
    assert result.shared_prefix == "hello agent_a body"
    assert result.original_tokens == 400
    assert result.final_tokens == 400
    assert result.tokens_saved == 0
    # The whole point of apc_reuse: the compressor is never invoked.
    assert fake_compressor.calls == []
async def test_compress_and_reuse_when_strong_match_long_prefix_long_ctx():
    """Strong match + long prefix + long context → prefix kept, only the tail compressed."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="COMPRESSED_TAIL")
    prefix = "agent shared prefix portion"
    tail = " tail body that is much longer and should be compressed"
    ctx = prefix + tail
    fake_dedup.set_count(ctx, 800)
    fake_dedup.set_count("COMPRESSED_TAIL", 50)
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_b",
                similarity=0.95,
                shared_prefix=prefix,
                shared_prefix_tokens=300,
            )
        ]
    )
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_b", ctx)
    assert result.strategy == "compress_and_reuse"
    assert result.final_context.startswith(prefix)
    assert result.final_context.endswith("COMPRESSED_TAIL")
    # Exactly one compression call, and only on the unique tail — compressing
    # the full context would defeat downstream KV-prefix reuse.
    assert fake_compressor.calls == [(tail, settings.CONTEXTFORGE_COMPRESSION_RATE)]
    # 300 prefix tokens + 50 compressed-tail tokens.
    assert result.final_tokens == 350
    assert result.tokens_saved == 800 - 350
    assert result.shared_prefix == prefix
async def test_compress_when_no_long_prefix_long_ctx():
    """Long context whose best match has only a tiny prefix → full-context compression."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="WHOLE_COMPRESSED")
    ctx = (
        "long body that needs full-context compression because the prefix "
        "overlap is too small to bother with KV reuse"
    )
    fake_dedup.set_count(ctx, 800)
    fake_dedup.set_count("WHOLE_COMPRESSED", 100)
    # High similarity but a short shared prefix — has_long_prefix=False
    # pushes the coordinator onto the "compress" branch.
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_c",
                similarity=0.90,
                shared_prefix="short",
                shared_prefix_tokens=50,
            )
        ]
    )
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_c", ctx)
    assert result.strategy == "compress"
    assert result.final_context == "WHOLE_COMPRESSED"
    assert result.shared_prefix == ""
    # The FULL incoming context must be what got compressed.
    assert len(fake_compressor.calls) == 1
    assert fake_compressor.calls[0][0] == ctx
    assert result.final_tokens == 100
    assert result.tokens_saved == 700
async def test_passthrough_when_no_match_short_ctx():
    """No similar context + short input → untouched passthrough."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)  # no matches registered
    fake_compressor = FakeCompressor()
    ctx = "tiny context that is just passed through"
    fake_dedup.set_count(ctx, 200)
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_d", ctx)
    assert result.strategy == "passthrough"
    assert result.final_context == ctx
    assert result.shared_prefix == ""
    assert result.original_tokens == 200
    assert result.final_tokens == 200
    assert result.tokens_saved == 0
    assert fake_compressor.calls == []
async def test_passthrough_when_short_ctx_with_weak_match():
    """Strong sim + short prefix + short ctx → passthrough preserves weak prefix."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "short body with a weak match preserved for observability"
    fake_dedup.set_count(ctx, 200)  # below threshold — long_enough=False
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_e",
                similarity=0.90,
                shared_prefix="short body",
                shared_prefix_tokens=50,  # below threshold — has_long_prefix=False
            )
        ]
    )
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_e", ctx)
    assert result.strategy == "passthrough"
    # The weak match's prefix is still surfaced to the caller for
    # observability, even though the coordinator did not act on it.
    assert result.shared_prefix == "short body"
    assert result.final_context == ctx
    assert result.tokens_saved == 0
    assert fake_compressor.calls == []
async def test_no_prior_contexts_returns_passthrough():
    """The very first context ever seen gets passthrough with no prefix."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "first context ever"
    fake_dedup.set_count(ctx, 50)
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("first_agent", ctx)
    assert result.strategy == "passthrough"
    assert result.shared_prefix == ""
    assert fake_compressor.calls == []
async def test_decide_uses_registry_dedup_engine_by_default():
    """When only `registry` is provided, coordinator MUST reuse `registry.dedup`
    (identity check) so we never spin up a second embedder."""
    shared_engine = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=shared_engine)
    fake_compressor = FakeCompressor()
    coordinator = CompressionCoordinator(
        registry=fake_registry, compressor=fake_compressor
    )
    assert coordinator.dedup is fake_registry.dedup
    assert coordinator.dedup is shared_engine
    # decide() must not swap in a freshly allocated dedup engine internally.
    ctx = "x"  # ctx_tokens proxy = 0 → passthrough
    result = await coordinator.decide("solo", ctx)
    assert isinstance(result, CompressionDecision)
    assert coordinator.dedup is shared_engine
async def test_compression_decision_strict_typing():
    """R014 strict-typing surface check: Pydantic round-trip is identity, and
    `strategy` is one of the four documented literals."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "anything"
    fake_dedup.set_count(ctx, 100)
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_strict", ctx)
    assert type(result) is CompressionDecision
    valid_strategies = {"apc_reuse", "compress", "compress_and_reuse", "passthrough"}
    assert result.strategy in valid_strategies
    # Round-trip through a plain dict must be lossless.
    dumped: dict[str, Any] = result.model_dump()
    restored = CompressionDecision.model_validate(dumped)
    assert restored == result
    assert restored.model_dump() == dumped
# ---- Boundary / negative tests (Q7) ----------------------------------------------
async def test_long_enough_uses_strict_greater_than_at_boundary():
    """ctx_tokens == COMPRESS_MIN_CONTEXT_TOKENS (=500) → long_enough is False."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "boundary body"
    # Sit exactly on the threshold; strict `>` means this is NOT long enough.
    fake_dedup.set_count(ctx, settings.COMPRESS_MIN_CONTEXT_TOKENS)
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_b1", ctx)
    # No match and long_enough=False → passthrough; compressor untouched.
    assert result.strategy == "passthrough"
    assert fake_compressor.calls == []
async def test_has_long_prefix_uses_strict_greater_than_at_boundary():
    """shared_prefix_tokens == APC_REUSE_MIN_SHARED_PREFIX_TOKENS (=200) → not long enough."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor(response="WHOLE")
    ctx = "boundary prefix body for whole-context compression check"
    fake_dedup.set_count(ctx, 800)  # long_enough=True
    fake_dedup.set_count("WHOLE", 50)
    fake_registry.set_matches(
        [
            ContextMatch(
                agent_id="agent_b2",
                similarity=0.95,
                shared_prefix="boundary prefix",
                # Exactly at the threshold; strict `>` rejects it.
                shared_prefix_tokens=settings.APC_REUSE_MIN_SHARED_PREFIX_TOKENS,
            )
        ]
    )
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_b2", ctx)
    # has_long_prefix=False (strict >) with long_enough=True → "compress"
    # branch over the FULL context, not just a unique tail.
    assert result.strategy == "compress"
    assert len(fake_compressor.calls) == 1
    assert fake_compressor.calls[0][0] == ctx
async def test_registry_results_already_sorted_picks_top_match():
    """Coordinator trusts registry-sort-desc — picks matches[0] as best."""
    fake_dedup = FakeDedup()
    fake_registry = FakeContextRegistry(dedup=fake_dedup)
    fake_compressor = FakeCompressor()
    ctx = "overlap body"
    fake_dedup.set_count(ctx, 100)  # short context — only apc_reuse can fire
    weak = ContextMatch(
        agent_id="lower",
        similarity=0.86,
        shared_prefix="overlap",
        shared_prefix_tokens=50,  # too weak to trigger apc_reuse by itself
    )
    strong = ContextMatch(
        agent_id="higher",
        similarity=0.95,
        shared_prefix="overlap body",
        shared_prefix_tokens=300,  # strong enough to trigger apc_reuse
    )
    # Registered out of order on purpose — the fake registry sorts desc.
    fake_registry.set_matches([weak, strong])
    coordinator = make_coordinator(
        registry=fake_registry, compressor=fake_compressor, dedup=fake_dedup
    )
    result = await coordinator.decide("agent_pick", ctx)
    assert result.strategy == "apc_reuse"
    assert result.shared_prefix == "overlap body"
    assert fake_compressor.calls == []