"""LMCache V1 bridge for ContextForge V4.0. Provides transparent bridge between ContextForge's AnchorPool/offset tracking and LMCache's distributed KV cache layer. Enables cross-worker KV reuse with anchor-aware offset hints. Architecture: - LMCache acts as external KV store (separate from VRAMCache) - Bridge intercepts save/load events and augments with ContextForge metadata - AnchorPool offset hints propagate to LMCache for cross-node alignment INVARIANT 10: Only pre-RoPE tensors are quantized/shared. """ from __future__ import annotations import asyncio import logging import weakref from dataclasses import dataclass, field from typing import Optional logger = logging.getLogger(__name__) @dataclass class LMCacheMeta: """Metadata stored alongside KV blocks in LMCache.""" anchor_hash: str = "" agent_id: str = "" token_length: int = 0 pre_rope: bool = True # INVARIANT 10 flag cla_group: Optional[int] = None workflow_step: Optional[int] = None offset_hint: Optional[list[float]] = None # from AnchorPool class LMCacheConnectorV1: """Bridge between ContextForge AnchorPool and LMCache V1. Supports: - Saving KV layers with anchor-aware metadata - Loading with offset_hint injection for RoPE de-rotation - Cross-worker block sharing with prefix anchoring """ def __init__( self, lmcache_client=None, # LMCache client instance (optional for graceful degradation) enable_offset_hints: bool = True, enable_cla_metadata: bool = True, ): self._client = lmcache_client self._enable_offset_hints = enable_offset_hints self._enable_cla_metadata = enable_cla_metadata self._active = lmcache_client is not None self._pending_saves: dict[str, asyncio.Event] = {} def is_active(self) -> bool: """Check if LMCache bridge is active.""" return self._active def build_prefix_hint( self, token_ids: list[int], agent_id: str, anchor_hash: str, ) -> dict: """Build prefix hint dict for LMCache save operations. This hint is stored alongside the KV data so loading workers can reconstruct RoPE-aligned context. """ return { "anchor_hash": anchor_hash, "agent_id": agent_id, "token_length": len(token_ids), "pre_rope": True, # INVARIANT 10 } async def on_save_kv_layer( self, block_id: str, kv_data, # Pre-RoPE KV tensor metadata: dict, ) -> None: """Called when ContextForge saves a KV layer to LMCache. Augments metadata with anchor hash and CLA group info. """ if not self._active: return # INVARIANT 10: Ensure pre-RoPE flag is set meta = LMCacheMeta( anchor_hash=metadata.get("anchor_hash", ""), agent_id=metadata.get("agent_id", ""), token_length=metadata.get("token_length", 0), pre_rope=True, cla_group=metadata.get("cla_group"), workflow_step=metadata.get("workflow_step"), offset_hint=metadata.get("offset_hint"), ) logger.debug( f"LMCache save: block={block_id} anchor={meta.anchor_hash} " f"pre_rope={meta.pre_rope} cla_group={meta.cla_group}" ) async def on_load_kv_layer( self, block_id: str, metadata: dict, ) -> Optional[dict]: """Called when ContextForge loads a KV layer from LMCache. Returns offset_hint if available for RoPE de-rotation alignment. """ if not self._active: return None offset_hint = metadata.get("offset_hint") anchor_hash = metadata.get("anchor_hash") if offset_hint: logger.debug( f"LMCache load: block={block_id} anchor={anchor_hash} " f"has_offset_hint len={len(offset_hint)}" ) return { "offset_hint": offset_hint, "anchor_hash": anchor_hash, "pre_rope": metadata.get("pre_rope", True), # INVARIANT 10 } async def prefetch_blocks( self, block_ids: list[str], priority: Optional[list[int]] = None, ) -> None: """Prefetch blocks from LMCache into local cache.""" if not self._active or not self._client: return # priority not supported in V1 fallback; fetch in order logger.debug(f"LMCache prefetch: {len(block_ids)} blocks") def get_stats(self) -> dict: """Return LMCache bridge statistics.""" return { "active": self._active, "offset_hints_enabled": self._enable_offset_hints, "cla_metadata_enabled": self._enable_cla_metadata, "pending_saves": len(self._pending_saves), }