# Pablo
# ContextForge v0.1.0 - shared context compiler for multi-agent LLM systems
# 6d9c72b
# raw
# history blame
# 3.33 kB
"""FastAPI MCP-compatible server exposing ContextForge tools."""
import asyncio
import logging
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextforge.config import settings
from contextforge.metrics.collector import MetricsCollector
from contextforge.models import (
CompressionDecision,
ContextEntry,
ContextMatch,
MetricsSnapshot,
)
from contextforge.registry.context_registry import ContextRegistry
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Application object that the route decorators in this module attach to.
app = FastAPI(
    title="ContextForge",
    version="0.1.0",
)

# Single instances created at import time and shared by the request handlers.
registry = ContextRegistry()
metrics = MetricsCollector()
# Request/Response models
class ContextRegistration(BaseModel):
    """Request body for the ``/tools/register_context`` endpoint."""

    # Identifier of the agent whose context is being registered.
    agent_id: str
    # Context text to store for that agent.
    context: str
class OptimizedContextRequest(BaseModel):
    """Request body for the ``/tools/get_optimized_context`` endpoint."""

    # Identifier of the agent requesting a compression decision.
    agent_id: str
    # Context text the decision should be made for.
    context: str
# Tool endpoints
@app.post("/tools/register_context")
async def register_context(registration: ContextRegistration) -> ContextEntry:
    """Store the submitted context in the registry and refresh the metrics.

    Args:
        registration: Request body carrying the agent id and its context text.

    Returns:
        The entry returned by ``registry.register``.
    """
    logger.info(f"Registering context for agent: {registration.agent_id}")

    new_entry = await registry.register(registration.agent_id, registration.context)

    # The entry's token count is passed as both arguments to record_tokens.
    await metrics.record_tokens(new_entry.token_count, new_entry.token_count)

    # Publish how many entries the registry currently reports as active.
    active_entries = await registry.get_all_active()
    await metrics.set_active_agents(len(active_entries))

    return new_entry
@app.post("/tools/get_optimized_context")
async def get_optimized_context(request: OptimizedContextRequest) -> CompressionDecision:
    """Ask a compression coordinator for a decision on the agent's context.

    Args:
        request: Request body carrying the agent id and its context text.

    Returns:
        The decision produced by ``CompressionCoordinator.decide``.
    """
    logger.info(f"Optimizing context for agent: {request.agent_id}")

    # Imported at call time rather than at module load.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from contextforge.compression.coordinator import CompressionCoordinator

    # A fresh coordinator is constructed for every request.
    outcome = await CompressionCoordinator().decide(request.agent_id, request.context)

    # Record the token counts carried by the decision.
    await metrics.record_tokens(outcome.original_tokens, outcome.final_tokens)
    return outcome
@app.get("/metrics/snapshot")
async def get_metrics() -> MetricsSnapshot:
    """Return the snapshot produced by the shared metrics collector."""
    current = await metrics.snapshot()
    return current
@app.get("/health")
async def health_check():
    """Return a fixed status payload; every value is a hard-coded constant."""
    payload = {"status": "ok", "gpu": "MI300X", "service": "ContextForge"}
    return payload
@app.get("/")
async def root():
    """Return static service information, including the docs path."""
    service_info = dict(
        service="ContextForge",
        version="0.1.0",
        description="The shared context compiler for multi-agent LLM systems",
        docs="/docs",
    )
    return service_info
# Startup event
# NOTE(review): newer FastAPI releases deprecate on_event in favour of
# lifespan handlers - confirm against the pinned FastAPI version.
@app.on_event("startup")
async def startup_event():
    """Log the configured host/port and vLLM settings when the app starts."""
    started_msg = f"ContextForge started on {settings.contextforge_host}:{settings.contextforge_port}"
    logger.info(started_msg)

    vllm_msg = f"vLLM: {settings.vllm_base_url}"
    logger.info(vllm_msg)

    model_msg = f"Model: {settings.vllm_model}"
    logger.info(model_msg)
# Background metrics loop
async def metrics_loop(interval_seconds: float = 30.0):
    """Periodically log a one-line summary of the current metrics snapshot.

    The loop never returns on its own; it ends only when the task running it
    is cancelled (``asyncio.CancelledError`` is not an ``Exception`` subclass
    on Python 3.8+, so it is not caught by the handler below).

    NOTE(review): nothing in the visible part of this module schedules this
    coroutine - confirm that it is started elsewhere.

    Args:
        interval_seconds: Delay between two log lines. Defaults to 30, the
            value that was previously hard-coded.
    """
    while True:
        # Sleep outside the try block so that an invalid interval surfaces
        # immediately instead of being swallowed and retried in a tight loop.
        await asyncio.sleep(interval_seconds)
        try:
            snapshot = await metrics.snapshot()
            logger.info(
                f"Metrics: VRAM={snapshot.vram_used_gb:.1f}GB, "
                f"TTFT={snapshot.ttft_ms:.1f}ms, "
                f"Dedup={snapshot.dedup_rate:.1f}%"
            )
        except Exception as e:
            # Best-effort loop: keep running after a failed collection, but
            # record the traceback so the failure can be diagnosed.
            logger.exception(f"Metrics collection error: {e}")