Pablo
feat: APOHARA: Context Forge V5 — synthesis + rebrand complete
cf0a8ed
"""
VisualKVCache — multimodal tensor registry for cross-agent image reuse.
Strategy:
1. Hash incoming images/audio by content (SHA256 of raw bytes)
2. Check VisualKVCache for existing embeddings
3. On miss: run vision encoder + store embeddings in cache
4. On hit: serve cached embeddings directly to language model
bypassing encoder entirely (disaggregated encoder pattern)
5. Batch-level DP hint: emit --mm-encoder-tp-mode data recommendation
when request batch has >= 2 images (AMD benchmark shows +15-45% gain)
"""
import asyncio
import hashlib
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class VisualEmbeddingBlock:
    """A single cached encoder output together with its bookkeeping data.

    Attributes:
        content_hash: SHA256 of the raw image/audio bytes.
        modality: One of "image", "audio" or "video".
        resolution: (width, height) for images; may be None.
        embedding: Encoder output, shape (num_patches, hidden_dim).
        encoder_model: Name of the encoder that produced the embedding,
            e.g. "Qwen3-VL-235B-A22B-Instruct".
        created_at: Creation timestamp taken from time.monotonic().
        access_count: Usage counter; starts at 0.
        estimated_vram_bytes: Estimated size of the embedding in bytes;
            starts at 0.
    """

    # Required fields (positional order is part of the constructor contract).
    content_hash: str
    modality: str
    resolution: Optional[tuple]
    embedding: np.ndarray
    encoder_model: str
    created_at: float

    # Bookkeeping fields with defaults.
    access_count: int = 0
    estimated_vram_bytes: int = 0
@dataclass
class VisualCacheResult:
    """Outcome of a visual-cache query for one piece of content.

    Attributes:
        cache_hit: Whether the content was found in the cache.
        content_hash: Hash identifying the queried content.
        embedding: The embedding array, or None.
        reuse_count: How many agents are sharing this entry.
        vram_saved_bytes: 0 on a miss, the embedding size on a hit.
        dp_mode_recommended: True if the batch has >= 2 images.
    """

    cache_hit: bool
    content_hash: str
    embedding: Optional[np.ndarray]
    reuse_count: int
    vram_saved_bytes: int
    dp_mode_recommended: bool
class QueueingController:
    """Stand-in for the queueing controller until the real one is wired in."""

    def get_minimum_stable_blocks(self) -> int:
        """Return the number of blocks that must stay cached; always 0 here."""
        minimum_stable_blocks = 0
        return minimum_stable_blocks
class VisualKVCache:
    """Content-addressed cache of encoder embeddings keyed by content hash.

    Entries are held in an ``OrderedDict`` whose order tracks recency of use:
    a hit in :meth:`lookup` and every :meth:`store` place the entry at the
    end, and eviction pops from the front.

    NOTE: several names and log messages say "LFU", but the victim is chosen
    by position in the ordered dict (least recently used). ``access_count``
    is recorded on each block and is not consulted when evicting.

    Args:
        max_entries: Entry-count limit enforced when storing a new entry.
        max_vram_bytes: Budget for the sum of ``estimated_vram_bytes``.
        queueing_controller: Optional object exposing
            ``get_minimum_stable_blocks()``; while the cache holds no more
            than that many entries, eviction is refused (INV-11).
    """

    def __init__(
        self,
        max_entries: int = 100,
        max_vram_bytes: int = 4 * 1024**3,  # 4 GB default
        queueing_controller: Optional["QueueingController"] = None,
    ):
        self.max_entries = max_entries
        self.max_vram_bytes = max_vram_bytes
        self.queueing_controller = queueing_controller
        # Recency-ordered: move_to_end on access, popitem(last=False) to evict.
        self._cache: "OrderedDict[str, VisualEmbeddingBlock]" = OrderedDict()
        # Metrics
        self._hits = 0
        self._misses = 0
        self._vram_saved_bytes = 0
        self._dp_mode_recommendations = 0
        self._rehash_count = 0

    def lookup(
        self, content_hash: str, modality: str = "image"
    ) -> Optional["VisualEmbeddingBlock"]:
        """Return the cached block for ``content_hash``, or None on a miss.

        A hit bumps the block's ``access_count``, marks it most recently
        used, and adds its ``estimated_vram_bytes`` to the saved-bytes
        metric. A miss only increments the miss counter.

        Args:
            content_hash: Key of the entry to fetch.
            modality: Accepted for interface compatibility; not used here.
        """
        block = self._cache.get(content_hash)
        if block is None:
            self._misses += 1
            logger.debug(f"VisualKVCache miss for hash={content_hash[:16]}...")
            return None
        # Mark as most recently used so it is evicted last.
        self._cache.move_to_end(content_hash)
        block.access_count += 1
        self._hits += 1
        self._vram_saved_bytes += block.estimated_vram_bytes
        logger.debug(
            f"VisualKVCache hit for hash={content_hash[:16]}..., "
            f"access_count={block.access_count}"
        )
        return block

    def store(
        self,
        content_hash: str,
        modality: str,
        embedding: np.ndarray,
        resolution: Optional[tuple] = None,
        encoder_model: str = "Qwen3-VL-235B-A22B-Instruct",
    ) -> "VisualEmbeddingBlock":
        """Insert (or replace) the embedding stored under ``content_hash``.

        Older entries are evicted first so that the VRAM budget and the
        entry limit are respected where eviction is permitted. If eviction
        is refused (empty cache or queueing-controller floor), the entry is
        stored anyway and the limits may be exceeded.

        Args:
            content_hash: Key under which the block is stored.
            modality: Modality label recorded on the block.
            embedding: Array to cache; any number of dimensions.
            resolution: Optional (width, height) recorded on the block.
            encoder_model: Encoder name recorded on the block.

        Returns:
            The newly created block (``access_count`` starts at 0).
        """
        estimated_vram_bytes = self._estimate_vram_bytes(embedding)
        block = VisualEmbeddingBlock(
            content_hash=content_hash,
            modality=modality,
            resolution=resolution,
            embedding=embedding,
            encoder_model=encoder_model,
            created_at=time.monotonic(),
            access_count=0,
            estimated_vram_bytes=estimated_vram_bytes,
        )
        # An overwrite replaces the old embedding, so drop it before sizing
        # the cache; otherwise its bytes would be counted against the budget
        # and could force eviction of unrelated entries.
        self._cache.pop(content_hash, None)
        self._evict_if_needed(estimated_vram_bytes)
        self._evict_to_entry_limit()
        # Plain assignment appends at the end (most recently used position).
        self._cache[content_hash] = block
        logger.debug(
            f"VisualKVCache stored hash={content_hash[:16]}..., "
            f"entries={len(self._cache)}, vram_bytes={estimated_vram_bytes}"
        )
        return block

    @staticmethod
    def _estimate_vram_bytes(embedding: np.ndarray) -> int:
        """Estimate the embedding's size as element count times item size.

        Works for any rank. An item size of 0 is treated as 4 bytes.
        """
        itemsize = embedding.dtype.itemsize
        dtype_size = itemsize if itemsize > 0 else 4
        return int(embedding.size) * dtype_size

    def _evict_to_entry_limit(self) -> None:
        """Evict until there is a free slot under ``max_entries``.

        Stops as soon as an eviction is refused so the loop cannot spin
        forever (e.g. ``max_entries <= 0`` or a controller-imposed floor).
        """
        while len(self._cache) >= self.max_entries:
            if self._evict_lfu() is None:
                break

    def _evict_if_needed(self, incoming_vram_bytes: int) -> None:
        """Evict entries until ``incoming_vram_bytes`` fits in the budget.

        Gives up when the cache is empty or an eviction is refused, so the
        budget may still be exceeded afterwards.
        """
        current_vram = sum(b.estimated_vram_bytes for b in self._cache.values())
        while current_vram + incoming_vram_bytes > self.max_vram_bytes and self._cache:
            evicted = self._evict_lfu()
            if evicted is None:
                break
            current_vram -= evicted.estimated_vram_bytes

    def _evict_lfu(self) -> Optional["VisualEmbeddingBlock"]:
        """Evict the entry at the front of the ordered dict.

        Despite the name, the front entry is the least recently used one.

        Returns:
            The evicted block, or None when the cache is empty or the
            queueing controller's minimum-stable-blocks floor forbids it.
        """
        if not self._cache:
            return None
        # INV-11: With queueing_controller, respect minimum_stable_blocks
        if self.queueing_controller is not None:
            min_stable = self.queueing_controller.get_minimum_stable_blocks()
            if len(self._cache) <= min_stable:
                logger.debug(
                    f"Skipping eviction: cache size {len(self._cache)} <= "
                    f"minimum_stable_blocks {min_stable}"
                )
                return None
        content_hash, evicted_block = self._cache.popitem(last=False)
        logger.debug(
            f"Evicted LFU block hash={content_hash[:16]}..., "
            f"access_count={evicted_block.access_count}"
        )
        return evicted_block

    def compute_content_hash(self, raw_bytes: bytes) -> str:
        """SHA256 hex digest of raw image/audio bytes. INV-13."""
        return hashlib.sha256(raw_bytes).hexdigest()

    def get_dp_mode_recommendation(
        self,
        batch_image_count: int,
        image_resolution: tuple = (512, 512),
        encoder_depth: int = 27,
    ) -> bool:
        """Decide whether data-parallel (DP) encoder mode should be used.

        Checked in this order:
          1. ``batch_image_count >= 10`` and both dimensions ``<= 256``:
             False (diminishing returns, +9.5%).
          2. ``batch_image_count >= 2``: True (AMD benchmark: +15-45% at
             3+ images).
          3. Both dimensions ``>= 512``: True (AMD: +14.6% avg at 512px).
          4. ``encoder_depth >= 45``: True (InternVL: +15-17% avg gain).
          5. Otherwise False.

        The ``dp_mode_recommendations`` metric is incremented only when DP
        mode is actually recommended (the call returns True).

        Args:
            batch_image_count: Number of images in the request batch.
            image_resolution: (width, height) of the images.
            encoder_depth: Depth of the vision encoder.
        """
        w, h = image_resolution
        # Diminishing returns: advise against DP mode, so nothing is counted.
        if batch_image_count >= 10 and w <= 256 and h <= 256:
            return False
        recommend = (
            batch_image_count >= 2
            or (w >= 512 and h >= 512)
            or encoder_depth >= 45
        )
        if recommend:
            self._dp_mode_recommendations += 1
        return recommend

    def get_cache_stats(self) -> dict:
        """Return the metrics as a dict (intended for Prometheus export).

        Keys: visual_cache_hits, visual_cache_misses, visual_cache_hit_rate,
        visual_vram_saved_bytes, visual_cache_entries,
        dp_mode_recommendations. The hit rate is 0.0 before any lookup.
        """
        total_requests = self._hits + self._misses
        hit_rate = self._hits / total_requests if total_requests > 0 else 0.0
        return {
            "visual_cache_hits": self._hits,
            "visual_cache_misses": self._misses,
            "visual_cache_hit_rate": hit_rate,
            "visual_vram_saved_bytes": self._vram_saved_bytes,
            "visual_cache_entries": len(self._cache),
            "dp_mode_recommendations": self._dp_mode_recommendations,
        }

    def clear(self) -> None:
        """Clear all cached entries and reset every metric counter."""
        self._cache.clear()
        self._hits = 0
        self._misses = 0
        self._vram_saved_bytes = 0
        self._dp_mode_recommendations = 0
        self._rehash_count = 0
        logger.info("VisualKVCache cleared")