| """ |
| Critic Agent |
| |
| Validates generated answers for hallucination and factual accuracy. |
| Follows FAANG best practices for production RAG systems. |
| |
| Key Features: |
| - Hallucination detection |
| - Citation verification |
| - Factual consistency checking |
| - Confidence scoring |
| - Actionable feedback for self-correction |
| """ |
|
|
| from typing import List, Optional, Dict, Any, Tuple |
| from pydantic import BaseModel, Field |
| from loguru import logger |
| from enum import Enum |
| import json |
| import re |
|
|
| try: |
| import httpx |
| HTTPX_AVAILABLE = True |
| except ImportError: |
| HTTPX_AVAILABLE = False |
|
|
| from .synthesizer import SynthesisResult, Citation |
| from .reranker import RankedResult |
|
|
|
|
class IssueType(str, Enum):
    """Types of validation issues.

    Inherits from ``str`` so values serialize naturally in JSON/pydantic.
    """
    HALLUCINATION = "hallucination"            # claim with no support in the sources
    UNSUPPORTED_CLAIM = "unsupported_claim"    # partially supported claim, or an answer with no citations
    INCORRECT_CITATION = "incorrect_citation"  # [N] marker does not match source N (or N doesn't exist)
    CONTRADICTION = "contradiction"            # answer contradicts itself
    INCOMPLETE = "incomplete"                  # answer looks too short given the available sources
    FACTUAL_ERROR = "factual_error"            # NOTE(review): not emitted anywhere in this module yet
|
|
|
|
class ValidationIssue(BaseModel):
    """A single validation issue found in a generated answer."""
    # Category of the problem (see IssueType).
    issue_type: IssueType
    # How serious the issue is: 0.0 (cosmetic) .. 1.0 (critical).
    severity: float = Field(ge=0.0, le=1.0)
    # Human-readable explanation of the issue.
    description: str
    # The exact answer text the issue refers to, when identifiable.
    problematic_text: Optional[str] = None
    # Actionable fix hint; surfaced via revision suggestions.
    suggestion: Optional[str] = None
    # 1-based index of the offending citation, for citation issues only.
    citation_index: Optional[int] = None
|
|
|
|
class CriticResult(BaseModel):
    """Result of answer validation."""
    # True when all configured threshold checks passed.
    is_valid: bool
    # Weighted overall confidence (0.4/0.4/0.2 mix of the scores below).
    confidence: float
    # Every issue discovered across all validation passes.
    issues: List[ValidationIssue]

    # Per-dimension scores. hallucination_score is "badness" (higher = worse);
    # citation_accuracy and factual_consistency are "goodness" (higher = better).
    hallucination_score: float = Field(ge=0.0, le=1.0)
    citation_accuracy: float = Field(ge=0.0, le=1.0)
    factual_consistency: float = Field(ge=0.0, le=1.0)

    # Set when the answer failed validation and there are concrete issues.
    needs_revision: bool = False
    # Up to a handful of actionable hints for a self-correction pass.
    revision_suggestions: List[str] = Field(default_factory=list)
|
|
|
|
class CriticConfig(BaseModel):
    """Configuration for critic agent."""

    # Ollama model and endpoint used for the LLM-based checks.
    model: str = Field(default="llama3.2:3b")
    base_url: str = Field(default="http://localhost:11434")
    # Low temperature keeps the judge's verdicts near-deterministic.
    temperature: float = Field(default=0.1)

    # Acceptance thresholds: hallucination_score must stay strictly BELOW its
    # threshold; citation accuracy and overall confidence must meet or exceed theirs.
    hallucination_threshold: float = Field(default=0.3)
    citation_accuracy_threshold: float = Field(default=0.7)
    overall_confidence_threshold: float = Field(default=0.6)

    # Feature toggles for the individual validation passes.
    check_hallucination: bool = Field(default=True)
    check_citations: bool = Field(default=True)
    check_consistency: bool = Field(default=True)
|
|
|
|
class CriticAgent:
    """
    Validates generated answers for quality and accuracy.

    Capabilities:
    1. Hallucination detection (LLM judge, with a lexical heuristic fallback)
    2. Citation verification
    3. Factual consistency checking
    4. Actionable revision suggestions
    """

    # Prompt asking the LLM to label each claim in the answer as
    # SUPPORTED / PARTIALLY_SUPPORTED / UNSUPPORTED against the sources.
    HALLUCINATION_PROMPT = """Analyze this answer for hallucination - information NOT supported by the provided sources.

SOURCES:
{sources}

ANSWER:
{answer}

For each claim in the answer, determine if it is:
1. SUPPORTED - Directly supported by the sources
2. PARTIALLY_SUPPORTED - Somewhat supported but with additions
3. UNSUPPORTED - Not found in sources (hallucination)

Respond with JSON:
{{
    "claims": [
        {{"text": "claim text", "status": "SUPPORTED|PARTIALLY_SUPPORTED|UNSUPPORTED", "source_index": 1 or null}}
    ],
    "hallucination_score": 0.0-1.0,
    "issues": ["list of specific issues found"]
}}"""

    # Prompt asking the LLM to verify each inline [N] citation against source N.
    CITATION_PROMPT = """Verify that each citation in this answer correctly references the source material.

SOURCES:
{sources}

ANSWER WITH CITATIONS:
{answer}

For each citation [N], check if the claim it supports is actually in source N.

Respond with JSON:
{{
    "citation_checks": [
        {{"citation_index": 1, "is_accurate": true/false, "reason": "explanation"}}
    ],
    "overall_accuracy": 0.0-1.0
}}"""

    def __init__(self, config: Optional[CriticConfig] = None):
        """
        Initialize Critic Agent.

        Args:
            config: Critic configuration; defaults to CriticConfig().
        """
        self.config = config or CriticConfig()
        logger.info(f"CriticAgent initialized (model={self.config.model})")

    def validate(
        self,
        synthesis_result: SynthesisResult,
        sources: List[RankedResult],
    ) -> CriticResult:
        """
        Validate a synthesized answer.

        Args:
            synthesis_result: The generated answer with citations
            sources: Source chunks used for generation

        Returns:
            CriticResult with validation details
        """
        issues: List[ValidationIssue] = []
        hallucination_score = 0.0
        citation_accuracy = 1.0
        factual_consistency = 1.0

        # An abstained answer makes no claims, so there is nothing to check.
        if synthesis_result.abstained:
            return CriticResult(
                is_valid=True,
                confidence=1.0,
                issues=[],
                hallucination_score=0.0,
                citation_accuracy=1.0,
                factual_consistency=1.0,
            )

        # Hallucination check: prefer the LLM judge, but fall back to the
        # lexical heuristic when httpx (and hence the LLM) is unavailable.
        # BUGFIX: previously the whole check was silently skipped without httpx.
        if self.config.check_hallucination:
            if HTTPX_AVAILABLE:
                h_score, h_issues = self._check_hallucination(
                    synthesis_result.answer,
                    sources,
                )
            else:
                h_score, h_issues = self._heuristic_hallucination_check(
                    synthesis_result.answer,
                    sources,
                )
            hallucination_score = h_score
            issues.extend(h_issues)

        # Citation check. BUGFIX: this now runs even when the answer carries no
        # citations, so _check_citations can flag the missing-citation case —
        # the old `and synthesis_result.citations` guard made that branch
        # unreachable and gave citation-free answers a perfect accuracy of 1.0.
        if self.config.check_citations:
            c_accuracy, c_issues = self._check_citations(
                synthesis_result.answer,
                synthesis_result.citations,
                sources,
            )
            citation_accuracy = c_accuracy
            issues.extend(c_issues)

        if self.config.check_consistency:
            f_score, f_issues = self._check_consistency(
                synthesis_result.answer,
                sources,
            )
            factual_consistency = f_score
            issues.extend(f_issues)

        # Weighted aggregate: hallucination and citations dominate (0.4 each),
        # consistency contributes the remaining 0.2.
        confidence = (
            0.4 * (1 - hallucination_score) +
            0.4 * citation_accuracy +
            0.2 * factual_consistency
        )

        # Every threshold must pass for the answer to be accepted as-is.
        is_valid = (
            hallucination_score < self.config.hallucination_threshold and
            citation_accuracy >= self.config.citation_accuracy_threshold and
            confidence >= self.config.overall_confidence_threshold
        )

        needs_revision = not is_valid and len(issues) > 0
        revision_suggestions = self._generate_revision_suggestions(issues) if needs_revision else []

        return CriticResult(
            is_valid=is_valid,
            confidence=confidence,
            issues=issues,
            hallucination_score=hallucination_score,
            citation_accuracy=citation_accuracy,
            factual_consistency=factual_consistency,
            needs_revision=needs_revision,
            revision_suggestions=revision_suggestions,
        )

    @staticmethod
    def _coerce_score(value: Any, default: float) -> float:
        """Coerce an LLM-reported score to a float clamped to [0.0, 1.0].

        LLMs occasionally emit scores as strings ("0.3") or out-of-range
        numbers; ``default`` is returned when the value is not numeric at all.
        """
        try:
            score = float(value)
        except (TypeError, ValueError):
            return default
        return min(max(score, 0.0), 1.0)

    def _check_hallucination(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Check for hallucination using the LLM judge.

        Falls back to the lexical heuristic on any transport or parse error.

        Returns:
            (hallucination_score in [0, 1], list of issues)
        """
        source_text = self._format_sources(sources)
        prompt = self.HALLUCINATION_PROMPT.format(
            sources=source_text,
            answer=answer,
        )

        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.post(
                    f"{self.config.base_url}/api/generate",
                    json={
                        "model": self.config.model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "temperature": self.config.temperature,
                            "num_predict": 1024,
                        },
                    },
                )
                response.raise_for_status()
                result = response.json()

            response_text = result.get("response", "")
            data = self._parse_json_response(response_text)

            # Coerce/clamp: the model may return "0.3" or values outside [0, 1],
            # which would break the `<` threshold comparison downstream.
            hallucination_score = self._coerce_score(
                data.get("hallucination_score"), 0.0
            )

            issues = []
            for claim in data.get("claims", []):
                status = claim.get("status")
                if status == "UNSUPPORTED":
                    issues.append(ValidationIssue(
                        issue_type=IssueType.HALLUCINATION,
                        severity=0.8,
                        description=f"Unsupported claim: {claim.get('text', '')}",
                        problematic_text=claim.get("text"),
                        suggestion="Remove or find supporting source",
                    ))
                elif status == "PARTIALLY_SUPPORTED":
                    issues.append(ValidationIssue(
                        issue_type=IssueType.UNSUPPORTED_CLAIM,
                        severity=0.4,
                        description=f"Partially supported: {claim.get('text', '')}",
                        problematic_text=claim.get("text"),
                        suggestion="Verify claim against source",
                    ))

            return hallucination_score, issues

        except Exception as e:
            # Best-effort by design: degrade to the heuristic, never crash validation.
            logger.warning(f"Hallucination check failed: {e}")
            return self._heuristic_hallucination_check(answer, sources)

    def _heuristic_hallucination_check(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Lexical fallback: flag capitalized words in the answer that never
        appear in any source as likely hallucinated entities.

        Crude (no NER; misses lowercase facts), but dependency-free.

        Returns:
            (hallucination_score in [0, 1], list of issues, at most 3)
        """
        # Capitalized single words approximate named entities.
        answer_words = set(re.findall(r'\b[A-Z][a-z]+\b', answer))
        source_words = set(re.findall(r'\b[A-Z][a-z]+\b', " ".join(s.text for s in sources)))

        # Discourse / sentence-starter words that are capitalized but not entities.
        common_words = {"The", "This", "That", "However", "Therefore", "Additionally", "Based", "According"}
        unsupported_entities = (answer_words - source_words) - common_words

        # Report at most 3; sorted so the output is deterministic
        # (iterating a raw set made the reported entities order-dependent).
        issues = [
            ValidationIssue(
                issue_type=IssueType.HALLUCINATION,
                severity=0.5,
                description=f"Entity '{entity}' not found in sources",
                problematic_text=entity,
            )
            for entity in sorted(unsupported_entities)[:3]
        ]

        # Fraction of answer "entities" with no source support.
        score = len(unsupported_entities) / len(answer_words) if answer_words else 0.0
        return min(score, 1.0), issues

    def _check_citations(
        self,
        answer: str,
        citations: List[Citation],
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Verify citation accuracy (LLM judge, with a structural regex fallback).

        Returns:
            (accuracy in [0, 1], list of issues)
        """
        if not citations:
            # A fully uncited answer cannot be traced back to its sources.
            return 0.0, [ValidationIssue(
                issue_type=IssueType.UNSUPPORTED_CLAIM,
                severity=0.6,
                description="Answer contains no citations",
                suggestion="Add citations to support claims",
            )]

        source_text = self._format_sources(sources)

        if HTTPX_AVAILABLE:
            try:
                prompt = self.CITATION_PROMPT.format(
                    sources=source_text,
                    answer=answer,
                )

                with httpx.Client(timeout=30.0) as client:
                    response = client.post(
                        f"{self.config.base_url}/api/generate",
                        json={
                            "model": self.config.model,
                            "prompt": prompt,
                            "stream": False,
                            "options": {
                                "temperature": self.config.temperature,
                                "num_predict": 512,
                            },
                        },
                    )
                    response.raise_for_status()
                    result = response.json()

                response_text = result.get("response", "")
                data = self._parse_json_response(response_text)

                # Coerce/clamp in case the model returns a string or junk value.
                accuracy = self._coerce_score(data.get("overall_accuracy"), 1.0)

                issues = []
                for check in data.get("citation_checks", []):
                    if not check.get("is_accurate", True):
                        issues.append(ValidationIssue(
                            issue_type=IssueType.INCORRECT_CITATION,
                            severity=0.6,
                            description=f"Citation [{check.get('citation_index')}]: {check.get('reason', 'Inaccurate')}",
                            citation_index=check.get("citation_index"),
                            suggestion="Verify citation matches source",
                        ))

                return accuracy, issues

            except Exception as e:
                # Fall through to the structural check below.
                logger.warning(f"Citation check failed: {e}")

        # Fallback: structurally verify every inline [N] points at a real source.
        used_citations = {int(m) for m in re.findall(r'\[(\d+)\]', answer)}

        if not used_citations:
            # Citation objects exist but none appear inline; weak, neutral signal.
            return 0.5, []

        valid_indices = set(range(1, len(sources) + 1))
        invalid = used_citations - valid_indices

        issues = [
            ValidationIssue(
                issue_type=IssueType.INCORRECT_CITATION,
                severity=0.7,
                description=f"Citation [{idx}] references non-existent source",
                citation_index=idx,
            )
            for idx in sorted(invalid)
        ]

        # used_citations is guaranteed non-empty here, so the division is safe.
        accuracy = 1.0 - (len(invalid) / len(used_citations))
        return accuracy, issues

    def _check_consistency(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Check for internal contradictions and obvious incompleteness.

        Returns:
            (consistency score in [0, 1], list of issues)
        """
        issues = []

        for contradiction in self._detect_contradictions(answer):
            issues.append(ValidationIssue(
                issue_type=IssueType.CONTRADICTION,
                severity=0.7,
                description=contradiction,
            ))

        # A very short answer despite available sources suggests incompleteness.
        if len(answer) < 50 and len(sources) > 0:
            issues.append(ValidationIssue(
                issue_type=IssueType.INCOMPLETE,
                severity=0.4,
                description="Answer may be incomplete given available sources",
                suggestion="Expand answer to include more relevant information",
            ))

        # Each issue costs 0.2, floored at 0.0.
        score = 1.0 - (0.2 * len(issues))
        return max(score, 0.0), issues

    def _detect_contradictions(self, text: str) -> List[str]:
        """Detect internal contradictions in *text*.

        NOTE(review): placeholder. The original sentence scan looked for
        discourse markers ("however", "but", "although") but never recorded
        anything, so it was behaviorally an empty check; the dead loop has
        been removed. Replace with a real implementation (e.g. NLI-based)
        when one is available.
        """
        return []

    def _format_sources(self, sources: List[RankedResult]) -> str:
        """Format sources as numbered snippets, each truncated to 500 chars."""
        parts = []
        for i, source in enumerate(sources, 1):
            parts.append(f"[{i}] {source.text[:500]}")
        return "\n\n".join(parts)

    def _parse_json_response(self, text: str) -> Dict[str, Any]:
        """Extract a JSON object from an LLM response; returns {} on failure.

        Tries the raw text first, then the outermost {...} span — models often
        wrap their JSON in prose or code fences, and the greedy span alone can
        fail when trailing braces from prose are captured.
        """
        candidates = [text]
        match = re.search(r'\{[\s\S]*\}', text)
        if match:
            candidates.append(match.group())
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            # Guard against top-level arrays/scalars: callers expect a dict.
            if isinstance(parsed, dict):
                return parsed
        return {}

    def _generate_revision_suggestions(
        self,
        issues: List[ValidationIssue],
    ) -> List[str]:
        """Generate up to 5 unique, actionable revision suggestions.

        Deduplicates while preserving issue order — the previous
        `list(set(...))` produced a nondeterministic suggestion order.
        """
        suggestions = []
        for issue in issues:
            if issue.suggestion:
                suggestions.append(issue.suggestion)
            elif issue.issue_type == IssueType.HALLUCINATION:
                suggestions.append(
                    f"Remove or verify: {issue.problematic_text or 'unsupported claim'}"
                )
            elif issue.issue_type == IssueType.INCORRECT_CITATION:
                suggestions.append(
                    f"Fix citation [{issue.citation_index}] to match source"
                )

        return list(dict.fromkeys(suggestions))[:5]
|
|