contextforge-demo / agents /demo_agents.py
Pablo
ContextForge v0.1.0 - shared context compiler for multi-agent LLM systems
6d9c72b
"""5 concrete demo agents simulating a RAG pipeline."""
import asyncio
import logging
from typing import Any
from agents.base_agent import BaseAgent
logger = logging.getLogger(__name__)
AGENT_CONFIGS = [
{
"id": "retriever",
"role": "retrieve relevant documents from the corpus",
"context_overlap": 0.6,
"thinking": False, # speed-critical, no CoT needed
},
{
"id": "reranker",
"role": "rerank retrieved documents by relevance",
"context_overlap": 0.7,
"thinking": False, # deterministic ranking, no CoT needed
},
{
"id": "summarizer",
"role": "summarize retrieved documents into coherent context",
"context_overlap": 0.6,
"thinking": False, # structured output, no CoT needed
},
{
"id": "critic",
"role": "verify factual accuracy and flag hallucinations",
"context_overlap": 0.5,
"thinking": True, # reasoning-heavy, CoT improves accuracy
},
{
"id": "responder",
"role": "generate final user-facing response",
"context_overlap": 0.4,
"thinking": True, # quality-critical final output
},
]
class RetrieverAgent(BaseAgent):
"""Agent 1: Retrieves relevant documents."""
def __init__(self):
super().__init__("retriever", "retrieve relevant documents", thinking=False)
async def process(self, input_data: Any) -> dict[str, Any]:
shared_context = self._build_shared_context(input_data)
try:
await self.call_contextforge_register(shared_context)
decision = await self.call_contextforge_optimize(shared_context)
except Exception as e:
logger.warning(f"ContextForge unavailable, using raw context: {e}")
decision = {"strategy": "passthrough", "original_tokens": len(shared_context.split())}
result = f"[{self.agent_id}] Retrieved docs for query: {input_data.get('query', 'unknown')}"
return {
"agent_id": self.agent_id,
"result": result,
"strategy": decision.get("strategy", "passthrough"),
"tokens_before": decision.get("original_tokens", 0),
"tokens_after": decision.get("final_tokens", 0),
}
def _build_shared_context(self, input_data: Any) -> str:
return f"""System: You are a retriever agent.
Query: {input_data.get('query', '')}
Knowledge base: Document 1 about AI, Document 2 about ML, Document 3 about NLP.
Role: {self.role}
Instruction: Retrieve the most relevant documents."""
class RerankerAgent(BaseAgent):
"""Agent 2: Reranks documents by relevance."""
def __init__(self):
super().__init__("reranker", "rerank by relevance", thinking=False)
async def process(self, input_data: Any) -> dict[str, Any]:
prev_output = input_data.get("retriever_output", "")
shared_context = self._build_shared_context(input_data, prev_output)
try:
await self.call_contextforge_register(shared_context)
decision = await self.call_contextforge_optimize(shared_context)
except Exception as e:
logger.warning(f"ContextForge unavailable: {e}")
decision = {"strategy": "passthrough", "original_tokens": len(shared_context.split())}
result = f"[{self.agent_id}] Reranked documents by relevance scores"
return {
"agent_id": self.agent_id,
"result": result,
"strategy": decision.get("strategy", "passthrough"),
"tokens_before": decision.get("original_tokens", 0),
"tokens_after": decision.get("final_tokens", 0),
}
def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
return f"""System: You are a reranker agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Rerank documents by relevance scores."""
class SummarizerAgent(BaseAgent):
"""Agent 3: Summarizes retrieved documents."""
def __init__(self):
super().__init__("summarizer", "summarize retrieved docs", thinking=False)
async def process(self, input_data: Any) -> dict[str, Any]:
prev_output = input_data.get("reranker_output", "")
shared_context = self._build_shared_context(input_data, prev_output)
try:
await self.call_contextforge_register(shared_context)
decision = await self.call_contextforge_optimize(shared_context)
except Exception as e:
logger.warning(f"ContextForge unavailable: {e}")
decision = {"strategy": "passthrough", "original_tokens": len(shared_context.split())}
result = f"[{self.agent_id}] Summarized documents into key points"
return {
"agent_id": self.agent_id,
"result": result,
"strategy": decision.get("strategy", "passthrough"),
"tokens_before": decision.get("original_tokens", 0),
"tokens_after": decision.get("final_tokens", 0),
}
def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
return f"""System: You are a summarizer agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Summarize the retrieved documents into key points."""
class CriticAgent(BaseAgent):
"""Agent 4: Verifies factual accuracy."""
def __init__(self):
super().__init__("critic", "verify factual accuracy", thinking=True)
async def process(self, input_data: Any) -> dict[str, Any]:
prev_output = input_data.get("summarizer_output", "")
shared_context = self._build_shared_context(input_data, prev_output)
try:
await self.call_contextforge_register(shared_context)
decision = await self.call_contextforge_optimize(shared_context)
except Exception as e:
logger.warning(f"ContextForge unavailable: {e}")
decision = {"strategy": "passthrough", "original_tokens": len(shared_context.split())}
result = f"[{self.agent_id}] Verified factual accuracy of summary"
return {
"agent_id": self.agent_id,
"result": result,
"strategy": decision.get("strategy", "passthrough"),
"tokens_before": decision.get("original_tokens", 0),
"tokens_after": decision.get("final_tokens", 0),
}
def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
return f"""System: You are a critic agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Verify factual accuracy and identify issues."""
class ResponderAgent(BaseAgent):
"""Agent 5: Generates final response."""
def __init__(self):
super().__init__("responder", "generate final response", thinking=True)
async def process(self, input_data: Any) -> dict[str, Any]:
prev_output = input_data.get("critic_output", "")
shared_context = self._build_shared_context(input_data, prev_output)
try:
await self.call_contextforge_register(shared_context)
decision = await self.call_contextforge_optimize(shared_context)
except Exception as e:
logger.warning(f"ContextForge unavailable: {e}")
decision = {"strategy": "passthrough", "original_tokens": len(shared_context.split())}
result = f"[{self.agent_id}] Generated final response to query"
return {
"agent_id": self.agent_id,
"result": result,
"strategy": decision.get("strategy", "passthrough"),
"tokens_before": decision.get("original_tokens", 0),
"tokens_after": decision.get("final_tokens", 0),
}
def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
return f"""System: You are a responder agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Generate the final response based on all prior agent outputs."""
def create_agents() -> list[BaseAgent]:
"""Create all 5 demo agents."""
return [
RetrieverAgent(),
RerankerAgent(),
SummarizerAgent(),
CriticAgent(),
ResponderAgent(),
]