File size: 8,327 Bytes
6d9c72b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""5 concrete demo agents simulating a RAG pipeline."""
import asyncio
import logging
from typing import Any

from agents.base_agent import BaseAgent

logger = logging.getLogger(__name__)

# Static per-agent settings for the five-stage demo pipeline, one entry per
# agent, in pipeline order (retriever -> reranker -> summarizer -> critic ->
# responder). Each entry is a dict with keys, in this order:
#   "id"              -- agent identifier (same ids the agent classes below use)
#   "role"            -- human-readable description of the agent's job
#   "context_overlap" -- fractional value (0.4-0.7 here); not read anywhere in
#                        this module, so its exact meaning is defined by the
#                        consumer -- TODO confirm
#   "thinking"        -- chain-of-thought (CoT) toggle; matches the ``thinking``
#                        flag each agent class passes to BaseAgent
# NOTE(review): these role strings are longer than the ones the agent classes
# pass to BaseAgent (e.g. "retrieve relevant documents from the corpus" here vs
# "retrieve relevant documents" in RetrieverAgent) -- confirm which one is
# authoritative.
AGENT_CONFIGS = [
    {
        "id": agent_id,
        "role": role_text,
        "context_overlap": overlap,
        "thinking": use_cot,
    }
    for agent_id, role_text, overlap, use_cot in (
        # Speed-critical first stage: CoT latency is not worth it.
        ("retriever", "retrieve relevant documents from the corpus", 0.6, False),
        # Ranking is deterministic, so CoT adds nothing.
        ("reranker", "rerank retrieved documents by relevance", 0.7, False),
        # Output is structured; CoT is unnecessary.
        ("summarizer", "summarize retrieved documents into coherent context", 0.6, False),
        # Reasoning-heavy verification: CoT improves accuracy.
        ("critic", "verify factual accuracy and flag hallucinations", 0.5, True),
        # Final user-facing answer: quality matters most, so CoT is enabled.
        ("responder", "generate final user-facing response", 0.4, True),
    )
]


class RetrieverAgent(BaseAgent):
    """Agent 1: Retrieves relevant documents.

    Simulated agent: it sends its prompt context to ContextForge (register +
    optimize) and returns a canned result string together with the chosen
    optimization strategy and before/after token counts.
    """

    def __init__(self):
        super().__init__("retriever", "retrieve relevant documents", thinking=False)

    async def process(self, input_data: Any) -> dict[str, Any]:
        """Run the retriever stage for one query.

        Args:
            input_data: Pipeline input. It is read with ``.get("query")``, so
                it is expected to be a mapping.

        Returns:
            A dict with ``agent_id``, ``result``, ``strategy``,
            ``tokens_before`` and ``tokens_after``.
        """
        shared_context = self._build_shared_context(input_data)

        try:
            await self.call_contextforge_register(shared_context)
            decision = await self.call_contextforge_optimize(shared_context)
        except Exception as e:
            # Best-effort: the demo keeps running when ContextForge is down.
            logger.warning("ContextForge unavailable, using raw context: %s", e)
            # Passthrough leaves the context unchanged, so the "after" count
            # must equal the "before" count. Previously "final_tokens" was
            # missing here and tokens_after was reported as 0. Note this is a
            # whitespace word count, not a model-tokenizer count.
            word_count = len(shared_context.split())
            decision = {
                "strategy": "passthrough",
                "original_tokens": word_count,
                "final_tokens": word_count,
            }

        result = f"[{self.agent_id}] Retrieved docs for query: {input_data.get('query', 'unknown')}"
        return {
            "agent_id": self.agent_id,
            "result": result,
            "strategy": decision.get("strategy", "passthrough"),
            "tokens_before": decision.get("original_tokens", 0),
            "tokens_after": decision.get("final_tokens", 0),
        }

    def _build_shared_context(self, input_data: Any) -> str:
        """Build the prompt context (query, toy knowledge base, role, instruction)."""
        return f"""System: You are a retriever agent.
Query: {input_data.get('query', '')}
Knowledge base: Document 1 about AI, Document 2 about ML, Document 3 about NLP.
Role: {self.role}
Instruction: Retrieve the most relevant documents."""


class RerankerAgent(BaseAgent):
    """Agent 2: Reranks documents by relevance.

    Simulated agent: builds a prompt from the retriever's output and the query,
    sends it to ContextForge (register + optimize), and returns a canned result
    plus the optimization strategy and before/after token counts.
    """

    def __init__(self):
        super().__init__("reranker", "rerank by relevance", thinking=False)

    async def process(self, input_data: Any) -> dict[str, Any]:
        """Run the reranker stage.

        Args:
            input_data: Pipeline input, expected to be a mapping. It is read
                via ``.get``: ``"retriever_output"`` (default ``""``) and
                ``"query"``.

        Returns:
            A dict with ``agent_id``, ``result``, ``strategy``,
            ``tokens_before`` and ``tokens_after``.
        """
        prev_output = input_data.get("retriever_output", "")
        shared_context = self._build_shared_context(input_data, prev_output)

        try:
            await self.call_contextforge_register(shared_context)
            decision = await self.call_contextforge_optimize(shared_context)
        except Exception as e:
            # Best-effort: the demo keeps running when ContextForge is down.
            logger.warning("ContextForge unavailable: %s", e)
            # Passthrough leaves the context unchanged, so before == after.
            # Previously "final_tokens" was missing and tokens_after reported 0.
            # Note this is a whitespace word count, not a model-tokenizer count.
            word_count = len(shared_context.split())
            decision = {
                "strategy": "passthrough",
                "original_tokens": word_count,
                "final_tokens": word_count,
            }

        result = f"[{self.agent_id}] Reranked documents by relevance scores"
        return {
            "agent_id": self.agent_id,
            "result": result,
            "strategy": decision.get("strategy", "passthrough"),
            "tokens_before": decision.get("original_tokens", 0),
            "tokens_after": decision.get("final_tokens", 0),
        }

    def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
        """Build the prompt context from the previous stage's output and the query."""
        return f"""System: You are a reranker agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Rerank documents by relevance scores."""


class SummarizerAgent(BaseAgent):
    """Agent 3: Summarizes retrieved documents.

    Simulated agent: builds a prompt from the reranker's output and the query,
    sends it to ContextForge (register + optimize), and returns a canned result
    plus the optimization strategy and before/after token counts.
    """

    def __init__(self):
        super().__init__("summarizer", "summarize retrieved docs", thinking=False)

    async def process(self, input_data: Any) -> dict[str, Any]:
        """Run the summarizer stage.

        Args:
            input_data: Pipeline input, expected to be a mapping. It is read
                via ``.get``: ``"reranker_output"`` (default ``""``) and
                ``"query"``.

        Returns:
            A dict with ``agent_id``, ``result``, ``strategy``,
            ``tokens_before`` and ``tokens_after``.
        """
        prev_output = input_data.get("reranker_output", "")
        shared_context = self._build_shared_context(input_data, prev_output)

        try:
            await self.call_contextforge_register(shared_context)
            decision = await self.call_contextforge_optimize(shared_context)
        except Exception as e:
            # Best-effort: the demo keeps running when ContextForge is down.
            logger.warning("ContextForge unavailable: %s", e)
            # Passthrough leaves the context unchanged, so before == after.
            # Previously "final_tokens" was missing and tokens_after reported 0.
            # Note this is a whitespace word count, not a model-tokenizer count.
            word_count = len(shared_context.split())
            decision = {
                "strategy": "passthrough",
                "original_tokens": word_count,
                "final_tokens": word_count,
            }

        result = f"[{self.agent_id}] Summarized documents into key points"
        return {
            "agent_id": self.agent_id,
            "result": result,
            "strategy": decision.get("strategy", "passthrough"),
            "tokens_before": decision.get("original_tokens", 0),
            "tokens_after": decision.get("final_tokens", 0),
        }

    def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
        """Build the prompt context from the previous stage's output and the query."""
        return f"""System: You are a summarizer agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Summarize the retrieved documents into key points."""


class CriticAgent(BaseAgent):
    """Agent 4: Verifies factual accuracy.

    Simulated agent: builds a prompt from the summarizer's output and the
    query, sends it to ContextForge (register + optimize), and returns a canned
    result plus the optimization strategy and before/after token counts.
    """

    def __init__(self):
        super().__init__("critic", "verify factual accuracy", thinking=True)

    async def process(self, input_data: Any) -> dict[str, Any]:
        """Run the critic stage.

        Args:
            input_data: Pipeline input, expected to be a mapping. It is read
                via ``.get``: ``"summarizer_output"`` (default ``""``) and
                ``"query"``.

        Returns:
            A dict with ``agent_id``, ``result``, ``strategy``,
            ``tokens_before`` and ``tokens_after``.
        """
        prev_output = input_data.get("summarizer_output", "")
        shared_context = self._build_shared_context(input_data, prev_output)

        try:
            await self.call_contextforge_register(shared_context)
            decision = await self.call_contextforge_optimize(shared_context)
        except Exception as e:
            # Best-effort: the demo keeps running when ContextForge is down.
            logger.warning("ContextForge unavailable: %s", e)
            # Passthrough leaves the context unchanged, so before == after.
            # Previously "final_tokens" was missing and tokens_after reported 0.
            # Note this is a whitespace word count, not a model-tokenizer count.
            word_count = len(shared_context.split())
            decision = {
                "strategy": "passthrough",
                "original_tokens": word_count,
                "final_tokens": word_count,
            }

        result = f"[{self.agent_id}] Verified factual accuracy of summary"
        return {
            "agent_id": self.agent_id,
            "result": result,
            "strategy": decision.get("strategy", "passthrough"),
            "tokens_before": decision.get("original_tokens", 0),
            "tokens_after": decision.get("final_tokens", 0),
        }

    def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
        """Build the prompt context from the previous stage's output and the query."""
        return f"""System: You are a critic agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Verify factual accuracy and identify issues."""


class ResponderAgent(BaseAgent):
    """Agent 5: Generates final response.

    Simulated agent: builds a prompt from the critic's output and the query,
    sends it to ContextForge (register + optimize), and returns a canned result
    plus the optimization strategy and before/after token counts.
    """

    def __init__(self):
        super().__init__("responder", "generate final response", thinking=True)

    async def process(self, input_data: Any) -> dict[str, Any]:
        """Run the responder stage.

        Args:
            input_data: Pipeline input, expected to be a mapping. It is read
                via ``.get``: ``"critic_output"`` (default ``""``) and
                ``"query"``.

        Returns:
            A dict with ``agent_id``, ``result``, ``strategy``,
            ``tokens_before`` and ``tokens_after``.
        """
        prev_output = input_data.get("critic_output", "")
        shared_context = self._build_shared_context(input_data, prev_output)

        try:
            await self.call_contextforge_register(shared_context)
            decision = await self.call_contextforge_optimize(shared_context)
        except Exception as e:
            # Best-effort: the demo keeps running when ContextForge is down.
            logger.warning("ContextForge unavailable: %s", e)
            # Passthrough leaves the context unchanged, so before == after.
            # Previously "final_tokens" was missing and tokens_after reported 0.
            # Note this is a whitespace word count, not a model-tokenizer count.
            word_count = len(shared_context.split())
            decision = {
                "strategy": "passthrough",
                "original_tokens": word_count,
                "final_tokens": word_count,
            }

        result = f"[{self.agent_id}] Generated final response to query"
        return {
            "agent_id": self.agent_id,
            "result": result,
            "strategy": decision.get("strategy", "passthrough"),
            "tokens_before": decision.get("original_tokens", 0),
            "tokens_after": decision.get("final_tokens", 0),
        }

    def _build_shared_context(self, input_data: Any, prev_output: str) -> str:
        """Build the prompt context from the previous stage's output and the query."""
        return f"""System: You are a responder agent.
Previous: {prev_output}
Query: {input_data.get('query', '')}
Role: {self.role}
Instruction: Generate the final response based on all prior agent outputs."""


def create_agents() -> list[BaseAgent]:
    """Instantiate the five demo agents in pipeline order.

    Returns:
        A new list holding one fresh instance each of RetrieverAgent,
        RerankerAgent, SummarizerAgent, CriticAgent and ResponderAgent
        (Agent 1 through Agent 5), in that order.
    """
    # Order matters: each later stage reads the previous stage's output key.
    pipeline_stages = (
        RetrieverAgent,
        RerankerAgent,
        SummarizerAgent,
        CriticAgent,
        ResponderAgent,
    )
    return [stage_cls() for stage_cls in pipeline_stages]