"""LLMLingua-2 async wrapper - runs in ThreadPoolExecutor."""
import asyncio
import logging
from typing import Literal
from llmlingua import LLMLingua
logger = logging.getLogger(__name__)


class ContextCompressor:
    """Async wrapper around LLMLingua-2 prompt compression."""

    def __init__(self, model_name: str = "microsoft/llmlingua-2-xlm-roberta-large-meetingbank"):
        self._model_name = model_name
        self._model: PromptCompressor | None = None
        self._lock = asyncio.Lock()  # guards one-time model loading

    async def load(self) -> None:
        """Lazily load the compression model (double-checked locking, loads once)."""
        if self._model is None:
            async with self._lock:
                if self._model is None:  # re-check: another task may have won the race
                    logger.info("Loading compressor: %s", self._model_name)
                    loop = asyncio.get_running_loop()
                    # Model loading is slow and blocking; keep it off the event loop.
                    self._model = await loop.run_in_executor(
                        None,
                        lambda: PromptCompressor(
                            model_name=self._model_name, use_llmlingua2=True
                        ),
                    )

    async def compress(self, context: str, rate: float = 0.5) -> tuple[str, float]:
        """
        Compress context at the given rate (fraction of tokens to keep).

        Returns (compressed_text, actual_rate), where actual_rate is the
        fraction of whitespace-delimited tokens retained, comparable to
        the requested rate.
        """
        await self.load()
        loop = asyncio.get_running_loop()

        def sync_compress() -> str:
            assert self._model is not None  # guaranteed by load()
            result = self._model.compress_prompt(
                context,
                rate=rate,
                force_tokens=[".", "!", "?", ",", "\n"],  # preserve punctuation/structure
            )
            return result["compressed_prompt"]

        compressed = await loop.run_in_executor(None, sync_compress)
        # Whitespace split is a rough proxy for true token counts.
        original_tokens = len(context.split())
        compressed_tokens = len(compressed.split())
        actual_rate = compressed_tokens / original_tokens if original_tokens > 0 else 1.0
        logger.debug(
            "Compressed %d -> %d tokens (requested rate=%.2f, actual=%.2f)",
            original_tokens, compressed_tokens, rate, actual_rate,
        )
        return compressed, actual_rate

    async def compress_batch(
        self, contexts: list[str], rate: float = 0.5
    ) -> list[tuple[str, float]]:
        """Compress multiple contexts sequentially (the model handles one prompt at a time)."""
        results: list[tuple[str, float]] = []
        for ctx in contexts:
            compressed, ratio = await self.compress(ctx, rate)
            results.append((compressed, ratio))
        return results
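

# --- Usage sketch (illustrative; not part of the ContextForge API) ---
# A minimal demo of the wrapper above. The dummy context text and the
# rate of 0.3 are arbitrary choices for demonstration only.
if __name__ == "__main__":

    async def _demo() -> None:
        compressor = ContextCompressor()
        context = "Agent A summarized the retrieval results for the planner. " * 40
        compressed, actual_rate = await compressor.compress(context, rate=0.3)
        print(f"kept ~{actual_rate:.0%} of tokens")
        print(compressed[:200])

        # Batch compression returns one (text, rate) pair per input.
        batch = await compressor.compress_batch(
            ["first agent context", "second agent context"], rate=0.5
        )
        for text, r in batch:
            print(r, text)

    asyncio.run(_demo())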