"""Compression coordinator - decision engine for ContextForge.""" import asyncio import logging from typing import Literal from contextforge.config import settings from contextforge.dedup.dedup_engine import SemanticDedupEngine from contextforge.models import CompressionDecision logger = logging.getLogger(__name__) class CompressionCoordinator: """ Decision engine - the brain of ContextForge. Logic: IF similarity >= 0.85 AND shared_prefix > 200 tokens → "apc_reuse" IF similarity < 0.85 AND context > 500 tokens → "compress" IF similarity >= 0.85 AND context > 500 tokens → "compress_and_reuse" ELSE → "passthrough" """ def __init__(self): self._dedup = SemanticDedupEngine() self._min_tokens = settings.contextforge_min_tokens_to_compress async def decide(self, agent_id: str, context: str) -> CompressionDecision: """Make compression decision for an agent's context.""" from contextforge.registry.context_registry import ContextRegistry registry = ContextRegistry() original_tokens = len(context.split()) # Find similar contexts matches = await registry.find_similar(context) if not matches: return CompressionDecision( strategy="passthrough", original_tokens=original_tokens, final_tokens=original_tokens, savings_pct=0.0, ) best_match = matches[0] similarity = best_match.similarity shared_prefix = best_match.shared_prefix shared_tokens = len(shared_prefix.split()) if shared_prefix else 0 # Decision logic if similarity >= 0.85 and shared_tokens > 200: # APC reuse - share the prefix directly return CompressionDecision( strategy="apc_reuse", shared_prefix=shared_prefix, original_tokens=original_tokens, final_tokens=shared_tokens, savings_pct=((original_tokens - shared_tokens) / original_tokens * 100) if original_tokens > 0 else 0.0, ) elif similarity < 0.85 and original_tokens > 500: # Compress only from contextforge.compression.compressor import ContextCompressor compressor = ContextCompressor() compressed, ratio = await compressor.compress(context, settings.contextforge_compression_rate) final_tokens = len(compressed.split()) return CompressionDecision( strategy="compress", compressed_context=compressed, original_tokens=original_tokens, final_tokens=final_tokens, savings_pct=((original_tokens - final_tokens) / original_tokens * 100) if original_tokens > 0 else 0.0, ) elif similarity >= 0.85 and original_tokens > 500: # Both reuse and compress from contextforge.compression.compressor import ContextCompressor compressor = ContextCompressor() compressed, ratio = await compressor.compress(context, settings.contextforge_compression_rate) final_tokens = len(compressed.split()) return CompressionDecision( strategy="compress_and_reuse", shared_prefix=shared_prefix, compressed_context=compressed, original_tokens=original_tokens, final_tokens=final_tokens, savings_pct=((original_tokens - final_tokens) / original_tokens * 100) if original_tokens > 0 else 0.0, ) else: return CompressionDecision( strategy="passthrough", original_tokens=original_tokens, final_tokens=original_tokens, savings_pct=0.0, )