# contextforge-demo / agents/pipeline.py
# Pablo
# feat: APOHARA: Context Forge V5 — synthesis + rebrand complete
# cf0a8ed
"""Pipeline orchestrator v3.0 - wired to ContextForge registry."""
import asyncio
import logging
import time
from typing import Any, Optional
from agents.demo_agents import create_agents
from apohara_context_forge.dedup.faiss_index import FAISSContextIndex
from apohara_context_forge.dedup.lsh_engine import LSHTokenMatcher
from apohara_context_forge.metrics.vram_monitor import VRAMMonitor
from apohara_context_forge.pipeline_config import PipelineConfig
from apohara_context_forge.registry.context_registry import ContextRegistry
from apohara_context_forge.registry.vram_aware_cache import VRAMAwareCache
logger = logging.getLogger(__name__)
class Pipeline:
    """
    Orchestrates 5-agent pipeline with ContextForge v3.0 registry.

    Uses LSHTokenMatcher + FAISSContextIndex + VRAMAwareCache for:
    - Token-level SimHash deduplication (LSH)
    - O(log n) ANN semantic search (FAISS)
    - VRAM-pressure-responsive eviction (VRAMAwareCache)

    Note: ``self.metrics`` accumulates across calls to ``run()``. The
    ``summary`` section of each result therefore reports totals for every run
    made on this instance so far. ``pipeline_duration_ms`` and
    ``agent_metrics`` describe only the current run.

    Usage:
        config = PipelineConfig(model_id="Qwen/Qwen3-235B-A22B")
        pipeline = Pipeline(config=config)
        await pipeline.start()
        result = await pipeline.run("What is machine learning?")
        await pipeline.stop()
    """

    def __init__(
        self,
        config: Optional[PipelineConfig] = None,
        enable_contextforge: bool = True,
    ):
        """Validate the configuration and build the demo agents.

        Args:
            config: Pipeline configuration. A default ``PipelineConfig()`` is
                used when omitted; ``validate()`` is called on it immediately.
            enable_contextforge: When False, ``start()`` does nothing and
                ``run()`` executes the agents without touching a registry.
        """
        self._config = config or PipelineConfig()
        self._config.validate()
        self.enable_contextforge = enable_contextforge
        # Populated by start() and cleared by stop().
        self._registry: Optional[ContextRegistry] = None
        self._vram_monitor: Optional[VRAMMonitor] = None
        self.agents = create_agents()
        # Cumulative counters, shared by every run() on this instance.
        self.metrics = {
            "total_tokens_before": 0,
            "total_tokens_after": 0,
            "agent_ttft_ms": [],
            "strategies_used": {},
            "cache_hits": 0,
            "cache_misses": 0,
            "lsh_matches": 0,
        }

    async def start(self) -> None:
        """Start the VRAM monitor and the ContextForge registry.

        Does nothing when ContextForge is disabled. If building or starting
        the registry fails, the already-started VRAM monitor is stopped again
        before the exception propagates, so a failed start leaves no monitor
        running.
        """
        if not self.enable_contextforge:
            return

        self._vram_monitor = VRAMMonitor()
        await self._vram_monitor.start()

        try:
            registry = ContextRegistry(
                lsh_matcher=LSHTokenMatcher(
                    block_size=self._config.block_size,
                    hamming_threshold=self._config.hamming_threshold,
                ),
                vram_cache=VRAMAwareCache(
                    max_token_budget=self._config.vram_budget_tokens,
                ),
                faiss_index=FAISSContextIndex(dim=self._config.faiss_dim),
                vram_budget_tokens=self._config.vram_budget_tokens,
                block_size=self._config.block_size,
                hamming_threshold=self._config.hamming_threshold,
                faiss_nlist=self._config.faiss_nlist,
            )
            await registry.start()
        except BaseException:
            # Do not leak a running monitor when registry startup fails.
            monitor, self._vram_monitor = self._vram_monitor, None
            await monitor.stop()
            raise

        self._registry = registry
        logger.info(
            "Pipeline started with ContextForge v3.0 (model=%s)",
            self._config.model_id,
        )

    async def stop(self) -> None:
        """Stop the ContextForge registry and the VRAM monitor.

        The monitor is stopped even if stopping the registry raises. Both
        references are cleared first, so calling stop() again is a no-op.
        """
        registry, self._registry = self._registry, None
        monitor, self._vram_monitor = self._vram_monitor, None
        try:
            if registry is not None:
                await registry.stop()
        finally:
            if monitor is not None:
                await monitor.stop()
        logger.info("Pipeline stopped")

    async def run(self, query: str) -> dict[str, Any]:
        """Run every agent in order for *query* and return outputs plus metrics.

        Each agent receives ``input_data``: the query plus the
        ``<agent_id>_output`` of every agent that ran before it. When
        ContextForge is enabled and started, each agent is first registered
        with the registry (see ``_sync_with_contextforge``).

        Returns:
            A dict with these keys:
            ``query``; ``final_output`` (the ``responder`` agent's output, or
            ``""``); ``pipeline_duration_ms``; ``agent_metrics`` (per-agent
            outputs and timings for this run); ``summary`` (cumulative
            counters, see the class docstring); and ``contextforge``
            (registry status, or ``None`` when ContextForge is disabled).
        """
        logger.info("Starting pipeline for query: %s...", query[:50])
        input_data: dict[str, Any] = {"query": query}
        pipeline_output: dict[str, Any] = {}
        # perf_counter is monotonic, so durations survive wall-clock changes.
        start_time = time.perf_counter()

        for agent in self.agents:
            # NOTE: despite the "ttft_ms" name, this times the whole step
            # (ContextForge sync + agent.process), not time-to-first-token.
            agent_start = time.perf_counter()
            if self.enable_contextforge and self._registry is not None:
                await self._sync_with_contextforge(agent)

            result = await agent.process(input_data)
            agent_duration = (time.perf_counter() - agent_start) * 1000
            self._record_agent_result(agent, result, agent_duration, pipeline_output)
            # Make this agent's output visible to every later agent.
            input_data[f"{agent.agent_id}_output"] = result["result"]

        total_duration = (time.perf_counter() - start_time) * 1000
        return {
            "query": query,
            "final_output": pipeline_output.get("responder_output", ""),
            "pipeline_duration_ms": total_duration,
            "agent_metrics": pipeline_output,
            "summary": self._summary(),
            "contextforge": (
                self._contextforge_status() if self.enable_contextforge else None
            ),
        }

    async def _sync_with_contextforge(self, agent) -> None:
        """Register *agent* with the registry and count shared-context hits.

        Errors are logged and swallowed. A registry failure therefore only
        skips the ContextForge step for this agent; the agent still runs.
        """
        try:
            await self._registry.register_agent(
                agent.agent_id,
                self._get_system_prompt(),
                self._build_role_prompt(agent),
            )
            all_agents = await self._registry.get_all_agents()
            # Only query for shared context once at least two agents exist.
            if len(all_agents) < 2:
                return
            shared_results = await self._registry.get_shared_context(
                all_agents,
                target_agent_id=agent.agent_id,
            )
        except Exception as e:
            logger.warning("ContextForge error for %s: %s", agent.agent_id, e)
            return

        if shared_results:
            self.metrics["lsh_matches"] += 1
            self.metrics["cache_hits"] += 1
        else:
            self.metrics["cache_misses"] += 1

    def _record_agent_result(
        self,
        agent,
        result: dict[str, Any],
        duration_ms: float,
        pipeline_output: dict[str, Any],
    ) -> None:
        """Store one agent's output/metrics in *pipeline_output* and update totals."""
        strategy = result["strategy"]
        pipeline_output[f"{agent.agent_id}_output"] = result["result"]
        pipeline_output[f"{agent.agent_id}_metrics"] = {
            "ttft_ms": duration_ms,
            "strategy": strategy,
            "tokens_before": result["tokens_before"],
            "tokens_after": result["tokens_after"],
        }
        self.metrics["total_tokens_before"] += result["tokens_before"]
        self.metrics["total_tokens_after"] += result["tokens_after"]
        self.metrics["agent_ttft_ms"].append(duration_ms)
        strategies = self.metrics["strategies_used"]
        strategies[strategy] = strategies.get(strategy, 0) + 1

    def _summary(self) -> dict[str, Any]:
        """Summarize the cumulative metrics collected by this instance."""
        metrics = self.metrics
        ttfts = metrics["agent_ttft_ms"]
        before = metrics["total_tokens_before"]
        after = metrics["total_tokens_after"]
        return {
            "total_tokens_before": before,
            "total_tokens_after": after,
            # Guard: with no agents, a bare mean would divide by zero.
            "avg_ttft_ms": sum(ttfts) / len(ttfts) if ttfts else 0.0,
            "strategies": metrics["strategies_used"],
            "token_savings_pct": (
                (before - after) / before * 100 if before > 0 else 0
            ),
            "cache_hits": metrics["cache_hits"],
            "cache_misses": metrics["cache_misses"],
            "lsh_matches": metrics["lsh_matches"],
        }

    def _contextforge_status(self) -> dict[str, Any]:
        """Report VRAM pressure, eviction mode and registry size (with fallbacks)."""
        monitor = self._vram_monitor
        registry = self._registry
        return {
            "vram_pressure": monitor.get_pressure() if monitor is not None else 0.0,
            "eviction_mode": registry.get_vram_mode() if registry is not None else "unknown",
            "registry_size": registry.registry_size if registry is not None else 0,
        }

    def _build_shared_context(self, input_data: dict, agent) -> str:
        """Build the shared context string for *agent*.

        "Previous" holds the output of the most recently completed agent: the
        last ``<agent_id>_output`` key inserted into *input_data*. ``run()``
        inserts these keys in pipeline order. The agent's own output key is
        skipped, because an agent has no output of its own before it runs.
        """
        own_key = f"{agent.agent_id}_output"
        prev_output = ""
        for key in reversed(input_data):
            if key.endswith("_output") and key != own_key:
                prev_output = input_data[key]
                break
        return f"Query: {input_data.get('query', '')}\nPrevious: {prev_output}\nRole: {agent.role}"

    def _get_system_prompt(self) -> str:
        """Get the canonical system prompt (shared across all agents)."""
        return (
            "You are a helpful AI assistant. "
            "Provide accurate, detailed, and thoughtful responses. "
            "Use chain-of-thought reasoning when appropriate."
        )

    def _build_role_prompt(self, agent) -> str:
        """Build the agent-specific role prompt from its role and id."""
        return f"You are a {agent.role}. {agent.agent_id}"

    @property
    def registry(self) -> Optional[ContextRegistry]:
        """Direct access to ContextRegistry (None until start() succeeds)."""
        return self._registry
async def run_pipeline_dry():
    """Print the agent plan and ContextForge wiring without executing anything.

    It is declared ``async`` so it can be driven by ``asyncio.run`` like the
    real pipeline; it performs no awaits itself.
    """
    planned = create_agents()

    print("\n=== ContextForge v3.0 Pipeline - Dry Run ===")
    print(f"Total agents: {len(planned)}\n")

    for position, member in enumerate(planned, start=1):
        print(f"{position}. {member.agent_id.upper()} ({member.role})")

    # Static description of the flow and wiring, printed one line per call.
    footer = (
        "\nPipeline flow:",
        " Query -> Retriever -> Reranker -> Summarizer -> Critic -> Responder",
        "\nContextForge v3.0 wiring:",
        " - LSHTokenMatcher: SimHash on Qwen3 token IDs",
        " - FAISSContextIndex: O(log n) ANN search",
        " - VRAMAwareCache: 5-mode VRAM-pressure eviction",
        "\nEach agent will:",
        " 1. Register context with ContextForge (LSH + VRAM cache)",
        " 2. Query shared context via FAISS ANN + LSH validation",
        " 3. Return result with metrics\n",
    )
    for text in footer:
        print(text)
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="ContextForge v3.0 Pipeline")
    parser.add_argument("--dry-run", action="store_true", help="Print plan without running")
    parser.add_argument("--query", default="What is machine learning?", help="Query to process")
    parser.add_argument(
        "--no-contextforge",
        action="store_true",
        help="Disable ContextForge (use raw pipeline)",
    )
    args = parser.parse_args()

    if args.dry_run:
        asyncio.run(run_pipeline_dry())
    else:
        config = PipelineConfig()
        pipeline = Pipeline(config=config, enable_contextforge=not args.no_contextforge)

        async def main():
            """Start the pipeline, run one query, and always stop it afterwards."""
            try:
                await pipeline.start()
                return await pipeline.run(args.query)
            finally:
                # Release the registry / VRAM monitor even if start() or run() raises.
                await pipeline.stop()

        result = asyncio.run(main())
        print("\n=== Pipeline Result ===")
        print(f"Token savings: {result['summary']['token_savings_pct']:.1f}%")
        print(f"Avg TTFT: {result['summary']['avg_ttft_ms']:.1f}ms")
        print(f"Strategies: {result['summary']['strategies']}")
        if result.get("contextforge"):
            print(f"VRAM pressure: {result['contextforge']['vram_pressure']:.2%}")
            print(f"Eviction mode: {result['contextforge']['eviction_mode']}")
            print(f"Registry size: {result['contextforge']['registry_size']}")