# tests/test_coordinator.py
# MERGED: OpenCode (deep KV physics) + CC (surface coverage)
# All tests hermetic: no GPU, no TCP, no downloaded weights required
from __future__ import annotations
import pytest
# Optional dep guard — skip entire module if sentence_transformers not installed
# (CompressionCoordinator requires SemanticDedupEngine which requires sentence_transformers)
try:
import sentence_transformers # noqa: F401
_HAS_SENTENCE_TRANSFORMERS = True
except ModuleNotFoundError:
pytest.skip("sentence_transformers not installed", allow_module_level=True)
from typing import Any

from apohara_context_forge.compression.coordinator import CompressionCoordinator
from apohara_context_forge.config import settings
from apohara_context_forge.models import CompressionDecision, ContextMatch

# The tests below are `async def` with no per-test marker, so mark the whole
# module. Assumes pytest-asyncio: redundant under `asyncio_mode = "auto"`,
# required under strict mode.
pytestmark = pytest.mark.asyncio
# ---- Hermetic doubles (zero real model loads) ------------------------------------
class FakeDedup:
    """Deterministic stand-in for SemanticDedupEngine.

    ``count_prefix_tokens`` returns a precomputed count registered via
    ``set_count`` when one exists for the string, else falls back to
    ``len(s) // 4`` so tests never depend on a real tokenizer.
    ``find_shared_prefix`` mirrors the real char-level loop and backs up to
    the last space, e.g. ``("hello world foo", "hello world bar")`` yields
    ``"hello world"``.
    """
def __init__(self) -> None:
self._counts: dict[str, int] = {}
def set_count(self, text: str, count: int) -> None:
self._counts[text] = count
def count_prefix_tokens(self, prefix: str) -> int:
if prefix in self._counts:
return self._counts[prefix]
return len(prefix) // 4
    def find_shared_prefix(self, a: str, b: str) -> str:
        n = min(len(a), len(b))
        i = 0
        while i < n and a[i] == b[i]:
            i += 1
        if i == n:
            # One string is a prefix of the other; keep the whole overlap.
            return a[:i]
        # Back up to the last space so the prefix never splits a word.
        j = a.rfind(" ", 0, i)
        return a[:j] if j > 0 else a[:i]
class FakeContextRegistry:
    """Stores pre-canned ``ContextMatch`` objects and returns them from
    ``find_similar`` sorted descending by similarity, mirroring the real
    registry's contract. Exposes ``dedup`` so the coordinator's sharing rule
    can be exercised without loading a real embedder.
    """
def __init__(self, dedup: FakeDedup | None = None) -> None:
self._matches: list[ContextMatch] = []
self.dedup: FakeDedup = dedup if dedup is not None else FakeDedup()
self.find_similar_calls: list[tuple[str, float | None]] = []
def set_matches(self, matches: list[ContextMatch]) -> None:
self._matches = list(matches)
async def find_similar(
self, context: str, threshold: float | None = None
) -> list[ContextMatch]:
self.find_similar_calls.append((context, threshold))
return sorted(self._matches, key=lambda m: m.similarity, reverse=True)
class FakeCompressor:
"""Async double for ``ContextCompressor.compress``. Returns a deterministic
``(body, 0.5)`` tuple and records every call's (context, rate) pair so
tests can assert which slice of the input was compressed.
"""
def __init__(self, response: str | None = None) -> None:
self.calls: list[tuple[str, float]] = []
self._response = response
async def compress(self, context: str, rate: float = 0.5) -> tuple[str, float]:
self.calls.append((context, rate))
body = (
self._response
if self._response is not None
else f"COMPRESSED({len(context)})"
)
return body, 0.5
def make_coordinator(
*,
registry: FakeContextRegistry | None = None,
compressor: FakeCompressor | None = None,
dedup: FakeDedup | None = None,
) -> CompressionCoordinator:
return CompressionCoordinator(
registry=registry,
compressor=compressor,
dedup=dedup,
)
# ---- Strategy branch tests --------------------------------------------------------
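# Decision matrix the tests below pin down (thresholds come from settings;
# this summary is inferred from the assertions, not from a spec):
#   strong match + long shared prefix + short ctx → "apc_reuse"
#   strong match + long shared prefix + long ctx  → "compress_and_reuse"
#   long ctx without a long shared prefix         → "compress"
#   everything else                               → "passthrough"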
async def test_apc_reuse_strategy_when_strong_match_long_prefix_short_ctx():
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "hello agent_a body short enough to skip compression"
dedup.set_count(incoming, 400) # < COMPRESS_MIN_CONTEXT_TOKENS=500
match = ContextMatch(
agent_id="agent_a",
similarity=0.95,
shared_prefix="hello agent_a body",
shared_prefix_tokens=300, # > APC_REUSE_MIN_SHARED_PREFIX_TOKENS=200
)
registry.set_matches([match])
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_a", incoming)
assert decision.strategy == "apc_reuse"
assert decision.final_context == incoming
assert decision.shared_prefix == "hello agent_a body"
assert decision.original_tokens == 400
assert decision.final_tokens == 400
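    # apc_reuse leaves the text byte-for-byte intact, so no prompt tokens are
    # saved here; the payoff is KV-prefix reuse downstream.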
assert decision.tokens_saved == 0
# apc_reuse must NEVER call the compressor.
assert compressor.calls == []
async def test_compress_and_reuse_when_strong_match_long_prefix_long_ctx():
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor(response="COMPRESSED_TAIL")
shared_prefix = "agent shared prefix portion"
unique_tail = " tail body that is much longer and should be compressed"
incoming = shared_prefix + unique_tail
dedup.set_count(incoming, 800)
dedup.set_count("COMPRESSED_TAIL", 50)
match = ContextMatch(
agent_id="agent_b",
similarity=0.95,
shared_prefix=shared_prefix,
shared_prefix_tokens=300,
)
registry.set_matches([match])
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_b", incoming)
assert decision.strategy == "compress_and_reuse"
assert decision.final_context.startswith(shared_prefix)
assert decision.final_context.endswith("COMPRESSED_TAIL")
# Compressor MUST be called exactly once with the unique tail — not the
# full incoming context — so KV-prefix reuse pays off downstream.
assert len(compressor.calls) == 1
assert compressor.calls[0][0] == unique_tail
assert compressor.calls[0][1] == settings.CONTEXTFORGE_COMPRESSION_RATE
assert decision.final_tokens == 350 # 300 (prefix) + 50 (compressed tail)
assert decision.tokens_saved == 800 - 350
assert decision.shared_prefix == shared_prefix
async def test_compress_when_no_long_prefix_long_ctx():
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor(response="WHOLE_COMPRESSED")
incoming = (
"long body that needs full-context compression because the prefix "
"overlap is too small to bother with KV reuse"
)
dedup.set_count(incoming, 800)
dedup.set_count("WHOLE_COMPRESSED", 100)
# Strong sim but short prefix — has_long_prefix=False forces "compress".
match = ContextMatch(
agent_id="agent_c",
similarity=0.90,
shared_prefix="short",
shared_prefix_tokens=50,
)
registry.set_matches([match])
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_c", incoming)
assert decision.strategy == "compress"
assert decision.final_context == "WHOLE_COMPRESSED"
assert decision.shared_prefix == ""
# Compressor MUST be called with the FULL incoming context.
assert len(compressor.calls) == 1
assert compressor.calls[0][0] == incoming
assert decision.final_tokens == 100
assert decision.tokens_saved == 700
async def test_passthrough_when_no_match_short_ctx():
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup) # empty matches
compressor = FakeCompressor()
incoming = "tiny context that is just passed through"
dedup.set_count(incoming, 200)
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_d", incoming)
assert decision.strategy == "passthrough"
assert decision.final_context == incoming
assert decision.shared_prefix == ""
assert decision.original_tokens == 200
assert decision.final_tokens == 200
assert decision.tokens_saved == 0
assert compressor.calls == []
async def test_passthrough_when_short_ctx_with_weak_match():
"""Strong sim + short prefix + short ctx → passthrough preserves weak prefix."""
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "short body with a weak match preserved for observability"
dedup.set_count(incoming, 200) # short — long_enough=False
match = ContextMatch(
agent_id="agent_e",
similarity=0.90,
shared_prefix="short body",
shared_prefix_tokens=50, # weak — has_long_prefix=False
)
registry.set_matches([match])
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_e", incoming)
assert decision.strategy == "passthrough"
# Weak-match prefix surfaced to the caller for observability, even though
# we did not act on it.
assert decision.shared_prefix == "short body"
assert decision.final_context == incoming
assert decision.tokens_saved == 0
assert compressor.calls == []
async def test_no_prior_contexts_returns_passthrough():
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "first context ever"
dedup.set_count(incoming, 50)
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("first_agent", incoming)
assert decision.strategy == "passthrough"
assert decision.shared_prefix == ""
assert compressor.calls == []
async def test_decide_uses_registry_dedup_engine_by_default():
"""When only `registry` is provided, coordinator MUST reuse `registry.dedup`
(identity check) so we never spin up a second embedder."""
shared_dedup = FakeDedup()
registry = FakeContextRegistry(dedup=shared_dedup)
compressor = FakeCompressor()
coord = CompressionCoordinator(registry=registry, compressor=compressor)
assert coord.dedup is registry.dedup
assert coord.dedup is shared_dedup
# Exercising decide() must not allocate a new dedup somewhere internally.
incoming = "x" # ctx_tokens proxy = 0 → passthrough
decision = await coord.decide("solo", incoming)
assert isinstance(decision, CompressionDecision)
assert coord.dedup is shared_dedup
async def test_compression_decision_strict_typing():
"""R014 strict-typing surface check: Pydantic round-trip is identity, and
`strategy` is one of the four documented literals."""
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "anything"
dedup.set_count(incoming, 100)
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_strict", incoming)
assert type(decision) is CompressionDecision
assert decision.strategy in {
"apc_reuse",
"compress",
"compress_and_reuse",
"passthrough",
}
payload: dict[str, Any] = decision.model_dump()
rebuilt = CompressionDecision.model_validate(payload)
assert rebuilt == decision
assert rebuilt.model_dump() == payload
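    # The JSON round-trip should be identity as well (Pydantic v2 surface;
    # assumes no lossy custom serializers on the model).
    assert CompressionDecision.model_validate_json(decision.model_dump_json()) == decision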
# ---- Boundary / negative tests (Q7) ----------------------------------------------
async def test_long_enough_uses_strict_greater_than_at_boundary():
"""ctx_tokens == COMPRESS_MIN_CONTEXT_TOKENS (=500) → long_enough is False."""
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "boundary body"
dedup.set_count(incoming, settings.COMPRESS_MIN_CONTEXT_TOKENS) # exactly 500
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_b1", incoming)
# No match AND long_enough=False → passthrough; compressor untouched.
assert decision.strategy == "passthrough"
assert compressor.calls == []
async def test_has_long_prefix_uses_strict_greater_than_at_boundary():
"""shared_prefix_tokens == APC_REUSE_MIN_SHARED_PREFIX_TOKENS (=200) → not long enough."""
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor(response="WHOLE")
incoming = "boundary prefix body for whole-context compression check"
dedup.set_count(incoming, 800) # long_enough=True
dedup.set_count("WHOLE", 50)
match = ContextMatch(
agent_id="agent_b2",
similarity=0.95,
shared_prefix="boundary prefix",
shared_prefix_tokens=settings.APC_REUSE_MIN_SHARED_PREFIX_TOKENS,
)
registry.set_matches([match])
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_b2", incoming)
# has_long_prefix=False (strict >), long_enough=True → compress branch
# with FULL context, not unique tail.
assert decision.strategy == "compress"
assert len(compressor.calls) == 1
assert compressor.calls[0][0] == incoming
async def test_registry_results_already_sorted_picks_top_match():
    """The coordinator trusts the registry's descending sort and takes
    matches[0] as the best candidate."""
dedup = FakeDedup()
registry = FakeContextRegistry(dedup=dedup)
compressor = FakeCompressor()
incoming = "overlap body"
dedup.set_count(incoming, 100) # short — only apc_reuse can fire
matches = [
ContextMatch(
agent_id="lower",
similarity=0.86,
shared_prefix="overlap",
shared_prefix_tokens=50, # weak — would NOT fire apc_reuse alone
),
ContextMatch(
agent_id="higher",
similarity=0.95,
shared_prefix="overlap body",
shared_prefix_tokens=300, # strong — fires apc_reuse
),
]
registry.set_matches(matches)
coord = make_coordinator(registry=registry, compressor=compressor, dedup=dedup)
decision = await coord.decide("agent_pick", incoming)
assert decision.strategy == "apc_reuse"
assert decision.shared_prefix == "overlap body"
assert compressor.calls == []
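# ---- Double self-checks (illustrative) --------------------------------------------
# Minimal sanity tests for the hermetic doubles themselves, so drift in a fake
# cannot silently invalidate the coordinator tests above. They assert only
# behavior defined in this file and assume ``ContextMatch`` accepts an empty
# shared prefix.
def test_fake_dedup_find_shared_prefix_backtracks_to_word_boundary():
    dedup = FakeDedup()
    # Diverging strings: back up to the last space, never split a word.
    assert dedup.find_shared_prefix("hello world foo", "hello world bar") == "hello world"
    # Full overlap: one string is a prefix of the other.
    assert dedup.find_shared_prefix("abc", "abcdef") == "abc"
    # Token-count fallback when no precomputed count was registered.
    assert dedup.count_prefix_tokens("hello world") == len("hello world") // 4

async def test_fake_registry_returns_matches_sorted_desc():
    registry = FakeContextRegistry()
    low = ContextMatch(agent_id="low", similarity=0.5, shared_prefix="", shared_prefix_tokens=0)
    high = ContextMatch(agent_id="high", similarity=0.9, shared_prefix="", shared_prefix_tokens=0)
    registry.set_matches([low, high])
    result = await registry.find_similar("anything")
    assert [m.agent_id for m in result] == ["high", "low"]
    assert registry.find_similar_calls == [("anything", None)]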