| """ |
| Synthesizer Agent |
| |
| Generates grounded answers with proper citations. |
| Follows FAANG best practices for production RAG systems. |
| |
| Key Features: |
| - Structured answer generation with citations |
| - Multi-source synthesis |
| - Confidence estimation |
| - Abstention when information is insufficient |
| - Support for different answer formats (prose, list, table) |
| """ |
|
|
| from typing import List, Optional, Dict, Any, Literal |
| from pydantic import BaseModel, Field |
| from loguru import logger |
| from enum import Enum |
| import json |
| import re |
|
|
| try: |
| import httpx |
| HTTPX_AVAILABLE = True |
| except ImportError: |
| HTTPX_AVAILABLE = False |
|
|
| from .reranker import RankedResult |
| from .query_planner import QueryPlan, QueryIntent |
|
|
|
|
class AnswerFormat(str, Enum):
    """Presentation styles the synthesizer can request for an answer.

    Members subclass ``str`` so each one compares equal to its plain string
    value.
    """

    # A flowing paragraph.
    PROSE = "prose"
    # One bullet per key point.
    BULLET_POINTS = "bullet_points"
    # A markdown table, used when items are being compared.
    TABLE = "table"
    # Numbered steps.
    STEP_BY_STEP = "step_by_step"
|
|
|
|
class Citation(BaseModel):
    """One numbered source that an answer can point at with ``[N]``."""

    # Number shown to the model for this source in the context (starts at 1).
    index: int
    # Identifier of the retrieved chunk this citation refers to.
    chunk_id: str
    # Identifier of the document the chunk belongs to.
    document_id: str
    # Page value copied unchanged from the retrieval result, if any.
    # NOTE(review): the context header prints page + 1, so this looks
    # zero-based - confirm against the retriever.
    page: Optional[int] = None
    # Leading excerpt of the chunk text (at most 150 characters plus "...").
    text_snippet: str
    # Relevance score copied from the retrieval result.
    relevance_score: float
|
|
|
|
class SynthesisResult(BaseModel):
    """Outcome of one synthesis call: the answer plus its provenance."""

    # Generated answer text, or the fixed abstention sentence.
    answer: str
    # Citations the answer actually references.
    citations: List[Citation]
    # Overall confidence in the answer; 0.0 when the agent abstained.
    confidence: float
    # Presentation format that was requested for the answer.
    format: AnswerFormat

    # Number of distinct sources referenced by the answer.
    num_sources_used: int
    # True when the agent declined to answer.
    abstained: bool = False
    # Human-readable explanation; only set when ``abstained`` is True.
    abstain_reason: Optional[str] = None

    # Context string handed to the model; the agent only fills this in when
    # the context is short (under 2000 characters), otherwise it stays None.
    raw_context: Optional[str] = None
|
|
|
|
class SynthesizerConfig(BaseModel):
    """Tunable settings for :class:`SynthesizerAgent`."""

    # --- Generation backend -------------------------------------------------
    # Model name sent in the generate request.
    model: str = "llama3.2:3b"
    # Base URL of the generation server; the agent POSTs to
    # ``{base_url}/api/generate``.
    # NOTE(review): endpoint and default port look like Ollama - confirm.
    base_url: str = "http://localhost:11434"
    # Sampling temperature forwarded in the request options.
    temperature: float = 0.2
    # Generation length cap, forwarded as the ``num_predict`` option.
    max_tokens: int = 1024

    # --- Citation expectations ----------------------------------------------
    # Whether answers are expected to carry citations, and how many.
    require_citations: bool = True
    min_citations: int = 1
    # Template for a citation marker.
    # NOTE(review): presumably formatted with the citation index - confirm
    # where this is consumed.
    citation_format: str = "[{index}]"

    # --- Abstention ---------------------------------------------------------
    # When True, the agent abstains if the mean relevance score of the
    # results is below ``confidence_threshold`` or if fewer than
    # ``min_sources`` results were supplied.
    abstain_on_low_confidence: bool = True
    confidence_threshold: float = 0.4
    min_sources: int = 1

    # --- Context budget -----------------------------------------------------
    # Maximum number of chunk-text characters packed into the prompt context.
    max_context_length: int = 4000
|
|
|
|
class SynthesizerAgent:
    """
    Generates grounded answers with citations from ranked retrieval results.

    Pipeline implemented by :meth:`synthesize`:
    1. Abstain early when there are no results, or (if enabled) when the
       results are too weak or too few.
    2. Pick an answer format from the query plan or from query keywords.
    3. Pack the results into a numbered context within a character budget.
    4. Ask the generation server for an answer (or build an extractive
       fallback when ``httpx`` is not installed).
    5. Keep only the citations the answer references and score confidence.
    """

    SYNTHESIS_PROMPT = """You are a precise document question-answering assistant.
Generate an answer to the query based ONLY on the provided context.

RULES:
1. Only use information from the provided context
2. Cite sources using [N] notation where N matches the source number (e.g., [1], [2])
3. If the context doesn't contain enough information, say "I cannot answer this question based on the available information."
4. Be precise, accurate, and concise
5. Include at least one citation for factual claims
6. Do not make up information not in the context

CONTEXT:
{context}

QUERY: {query}

FORMAT: {format_instruction}

ANSWER:"""

    FORMAT_INSTRUCTIONS = {
        AnswerFormat.PROSE: "Write a clear, flowing paragraph with proper citations.",
        AnswerFormat.BULLET_POINTS: "Use bullet points for each key point, with citations.",
        AnswerFormat.TABLE: "Format as a markdown table if comparing items.",
        AnswerFormat.STEP_BY_STEP: "Number each step clearly with citations.",
    }

    # Sentence returned on abstention. It is the same sentence the prompt
    # tells the model to emit, so a model-side refusal can be recognised.
    ABSTAIN_MESSAGE = "I cannot answer this question based on the available information."

    # Contexts at or above this many characters are not echoed back in
    # SynthesisResult.raw_context.
    _RAW_CONTEXT_MAX_CHARS = 2000

    # Matches "[3]" as well as grouped markers such as "[1, 2]". The digit
    # count is bounded so a pathological run of digits can never reach int().
    _CITATION_PATTERN = re.compile(r"\[(\d{1,9}(?:\s*,\s*\d{1,9})*)\]")

    # Keyword heuristics for format detection, tried in this order against
    # the lower-cased query. Word boundaries stop "vs" from firing on
    # "canvas" and "list" from firing on "specialist".
    _FORMAT_KEYWORDS = (
        (
            AnswerFormat.BULLET_POINTS,
            re.compile(r"\blist(?:s|ed|ings?)?\b|\bwhat are all\b|\benumerate"),
        ),
        (
            AnswerFormat.TABLE,
            re.compile(r"\bcompare|\bdifference|\bvs\b"),
        ),
        (
            AnswerFormat.STEP_BY_STEP,
            re.compile(r"\bhow to\b|\bsteps\b|\bprocess"),
        ),
    )

    def __init__(self, config: Optional[SynthesizerConfig] = None):
        """
        Initialize Synthesizer Agent.

        Args:
            config: Synthesizer configuration; defaults are used when omitted.
        """
        self.config = config or SynthesizerConfig()
        logger.info(f"SynthesizerAgent initialized (model={self.config.model})")

    def synthesize(
        self,
        query: str,
        results: List[RankedResult],
        plan: Optional[QueryPlan] = None,
        format_override: Optional[AnswerFormat] = None,
    ) -> SynthesisResult:
        """
        Generate answer from ranked results.

        Args:
            query: User's question
            results: Ranked retrieval results
            plan: Optional query plan used to pick the answer format
            format_override: Override auto-detected format

        Returns:
            SynthesisResult with answer and citations. The result is an
            abstention (``abstained=True``, confidence 0.0) when there are no
            results, when the abstention thresholds are not met, when the
            model returns an empty answer, or when the model itself replies
            with the abstention sentence and cites nothing.

        Raises:
            httpx.HTTPError: If ``httpx`` is installed and the generation
                request fails or returns an error status.
        """
        if not results:
            return self._abstain("No relevant sources found")

        if self.config.abstain_on_low_confidence:
            avg_confidence = sum(r.relevance_score for r in results) / len(results)
            if avg_confidence < self.config.confidence_threshold:
                return self._abstain(
                    f"Low confidence ({avg_confidence:.2f}) in available sources"
                )
            if len(results) < self.config.min_sources:
                return self._abstain(
                    f"Insufficient sources ({len(results)} < {self.config.min_sources})"
                )

        answer_format = format_override or self._detect_format(query, plan)
        context, citations = self._build_context(results)

        if HTTPX_AVAILABLE:
            raw_answer = self._generate_answer(query, context, answer_format)
        else:
            raw_answer = self._simple_answer(query, results)

        # An empty generation is not an answer; report it as an abstention
        # instead of attaching a confidence score to nothing.
        if not raw_answer:
            return self._abstain("Model returned an empty answer")

        used_citations = self._extract_used_citations(raw_answer, citations)

        # The prompt tells the model to reply with the abstention sentence
        # when the context is insufficient; honour that as an abstention.
        if not used_citations and self._is_abstention(raw_answer):
            return self._abstain(
                "Model reported insufficient information in the provided context"
            )

        if (
            self.config.require_citations
            and len(used_citations) < self.config.min_citations
        ):
            logger.warning(
                f"Answer cites {len(used_citations)} source(s); "
                f"at least {self.config.min_citations} expected"
            )

        confidence = self._calculate_confidence(results, used_citations)

        return SynthesisResult(
            answer=raw_answer,
            citations=used_citations,
            confidence=confidence,
            format=answer_format,
            num_sources_used=len(used_citations),
            abstained=False,
            raw_context=context if len(context) < self._RAW_CONTEXT_MAX_CHARS else None,
        )

    def synthesize_multi_hop(
        self,
        query: str,
        sub_results: Dict[str, List[RankedResult]],
        plan: QueryPlan,
    ) -> SynthesisResult:
        """
        Synthesize answer from multiple sub-query results.

        Results of all sub-queries are merged, ordered by relevance score and
        de-duplicated by ``chunk_id``.

        Args:
            query: Original query
            sub_results: Results for each sub-query, keyed by sub-query id
            plan: Query plan with sub-queries

        Returns:
            Synthesized answer combining all sources
        """
        merged = [result for results in sub_results.values() for result in results]

        # Sort before de-duplicating so that, when the same chunk was
        # retrieved by several sub-queries, its best-scored copy is kept.
        # The sort is stable, so ties keep their encounter order.
        merged.sort(key=lambda r: r.relevance_score, reverse=True)

        seen_chunk_ids = set()
        unique_results = []
        for result in merged:
            if result.chunk_id in seen_chunk_ids:
                continue
            seen_chunk_ids.add(result.chunk_id)
            unique_results.append(result)

        if plan.requires_aggregation:
            return self._synthesize_aggregation(query, unique_results, plan)

        return self.synthesize(query, unique_results, plan)

    def _abstain(self, reason: str) -> SynthesisResult:
        """Create an abstention result carrying ``reason``."""
        return SynthesisResult(
            answer=self.ABSTAIN_MESSAGE,
            citations=[],
            confidence=0.0,
            format=AnswerFormat.PROSE,
            num_sources_used=0,
            abstained=True,
            abstain_reason=reason,
        )

    def _is_abstention(self, answer: str) -> bool:
        """Return True when ``answer`` contains the abstention sentence."""
        # Compare case-insensitively and ignore the final period so small
        # punctuation differences in the model output do not matter.
        marker = self.ABSTAIN_MESSAGE.rstrip(".").casefold()
        return marker in answer.casefold()

    def _detect_format(
        self,
        query: str,
        plan: Optional[QueryPlan],
    ) -> AnswerFormat:
        """
        Auto-detect best answer format.

        The plan's intent wins when it maps to a format; otherwise keyword
        heuristics on the query are tried, falling back to prose.
        """
        if plan:
            if plan.intent == QueryIntent.COMPARISON:
                return AnswerFormat.TABLE
            if plan.intent == QueryIntent.PROCEDURAL:
                return AnswerFormat.STEP_BY_STEP
            if plan.intent == QueryIntent.LIST:
                return AnswerFormat.BULLET_POINTS

        query_lower = query.lower()
        for answer_format, pattern in self._FORMAT_KEYWORDS:
            if pattern.search(query_lower):
                return answer_format

        return AnswerFormat.PROSE

    def _build_context(
        self,
        results: List[RankedResult],
    ) -> tuple[str, List[Citation]]:
        """
        Build context string and citation list.

        Results are numbered from 1 in the order given. Packing stops once
        the character budget (``max_context_length``, counted over chunk text
        only) is exhausted; the last chunk may be truncated to fit.

        Returns:
            The context string and one Citation per chunk included in it.
        """
        budget = self.config.max_context_length
        context_parts = []
        citations = []
        total_length = 0

        for i, result in enumerate(results, 1):
            chunk_text = result.text
            if total_length + len(chunk_text) > budget:
                remaining = budget - total_length
                # A tail shorter than ~100 characters is not worth including.
                if remaining <= 100:
                    break
                chunk_text = chunk_text[:remaining] + "..."

            header = f"[{i}]"
            if result.page is not None:
                header += f" (Page {result.page + 1})"
            if result.source_path:
                header += f" - {result.source_path}"

            context_parts.append(f"{header}:\n{chunk_text}\n")
            total_length += len(chunk_text)

            citations.append(Citation(
                index=i,
                chunk_id=result.chunk_id,
                document_id=result.document_id,
                page=result.page,
                text_snippet=chunk_text[:150] + ("..." if len(chunk_text) > 150 else ""),
                relevance_score=result.relevance_score,
            ))

        return "\n".join(context_parts), citations

    def _generate_answer(
        self,
        query: str,
        context: str,
        answer_format: AnswerFormat,
    ) -> str:
        """
        Generate answer using LLM.

        Sends a single non-streaming POST to ``{base_url}/api/generate``.

        Returns:
            The stripped "response" field of the reply, or "" when it is
            missing or null.

        Raises:
            httpx.HTTPError: On transport failure or an error status code.
        """
        format_instruction = self.FORMAT_INSTRUCTIONS.get(
            answer_format,
            self.FORMAT_INSTRUCTIONS[AnswerFormat.PROSE],
        )

        prompt = self.SYNTHESIS_PROMPT.format(
            context=context,
            query=query,
            format_instruction=format_instruction,
        )

        # Tolerate a configured base URL that ends with a slash.
        url = f"{self.config.base_url.rstrip('/')}/api/generate"

        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                url,
                json={
                    "model": self.config.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": self.config.temperature,
                        "num_predict": self.config.max_tokens,
                    },
                },
            )
            response.raise_for_status()
            payload = response.json()

        # "response" may be absent or null; treat both as an empty answer.
        return (payload.get("response") or "").strip()

    def _simple_answer(
        self,
        query: str,
        results: List[RankedResult],
    ) -> str:
        """
        Simple answer without LLM (fallback).

        Quotes the first 200 characters of up to three top results, each
        prefixed with its ``[N]`` marker. ``query`` is accepted for interface
        symmetry but is not used.
        """
        if not results:
            return "No information found."

        answer_parts = ["Based on the available sources:\n"]
        for i, result in enumerate(results[:3], 1):
            # Only mark the excerpt as truncated when it actually was.
            suffix = "..." if len(result.text) > 200 else ""
            answer_parts.append(f"[{i}] {result.text[:200]}{suffix}")

        return "\n\n".join(answer_parts)

    def _extract_used_citations(
        self,
        answer: str,
        all_citations: List[Citation],
    ) -> List[Citation]:
        """
        Extract citations actually used in the answer.

        Recognises single markers (``[2]``) and comma-grouped markers
        (``[1, 3]``). Numbers that do not correspond to a known citation are
        ignored.

        Returns:
            The referenced citations, in their original order.
        """
        known_indices = {c.index for c in all_citations}
        used_indices = set()

        for group in self._CITATION_PATTERN.findall(answer):
            for token in group.split(","):
                idx = int(token)  # int() tolerates surrounding whitespace
                if idx in known_indices:
                    used_indices.add(idx)

        return [c for c in all_citations if c.index in used_indices]

    def _calculate_confidence(
        self,
        results: List[RankedResult],
        used_citations: List[Citation],
    ) -> float:
        """
        Calculate overall confidence in the answer.

        Weighted blend of the mean relevance score of the cited sources
        (50%), the number of cited sources (30%, saturating at three) and a
        fixed consistency placeholder (20%). When nothing is cited, the mean
        score of all results and a neutral count factor of 0.5 are used.

        Returns:
            Confidence clamped to the range [0.0, 1.0].
        """
        if not results:
            return 0.0

        if used_citations:
            source_confidence = (
                sum(c.relevance_score for c in used_citations) / len(used_citations)
            )
            source_count_factor = min(len(used_citations) / 3, 1.0)
        else:
            source_confidence = sum(r.relevance_score for r in results) / len(results)
            source_count_factor = 0.5

        # Constant placeholder: cross-source consistency is not measured here.
        consistency_factor = 0.8

        confidence = (
            0.5 * source_confidence +
            0.3 * source_count_factor +
            0.2 * consistency_factor
        )

        return min(max(confidence, 0.0), 1.0)

    def _synthesize_aggregation(
        self,
        query: str,
        results: List[RankedResult],
        plan: QueryPlan,
    ) -> SynthesisResult:
        """Synthesize aggregation-style answer, always as bullet points."""
        return self.synthesize(
            query,
            results,
            plan,
            format_override=AnswerFormat.BULLET_POINTS,
        )
|
|