""" Dual-LLM Verification and Answer Modification Module for RAG-Anything This module implements a sophisticated two-stage verification system where: 1. A generator LLM produces an initial answer 2. A verifier LLM (typically more powerful) evaluates answer quality 3. If quality is below threshold, a modifier improves the answer iteratively The system prevents hallucinations, improves factual consistency, and ensures high-quality responses through systematic verification and refinement. Usage Example: ```python from raganything.verification import ( AnswerVerifier, AnswerModifier, DualLLMPipeline, VerificationConfig ) # Initialize configuration config = VerificationConfig( verification_threshold=7.5, max_modification_iterations=3, require_all_criteria_pass=False ) # Create pipeline pipeline = DualLLMPipeline( generator_llm=generator_func, verifier_llm=verifier_func, config=config ) # Process answer with verification result = await pipeline.process_answer( query="What causes diabetes?", answer="Diabetes is caused by...", context="[Retrieved context about diabetes]" ) print(f"Final answer: {result['final_answer']}") print(f"Quality score: {result['final_score']}/10") print(f"Iterations: {result['total_iterations']}") ``` Author: RAG-Anything Team Version: 2.0.0 """ from __future__ import annotations import re import json import asyncio from typing import Dict, List, Any, Optional, Callable from dataclasses import dataclass, field from enum import Enum from lightrag.utils import logger # ============================================================================= # Configuration Classes # ============================================================================= class VerificationCriterion(Enum): """Verification criteria for answer quality assessment""" FAITHFULNESS = "faithfulness" # Supported by context COMPLETENESS = "completeness" # Addresses all query aspects ACCURACY = "accuracy" # Factually correct CLARITY = "clarity" # Clear and well-structured RELEVANCE = "relevance" # Directly answers the query COHERENCE = "coherence" # Logically consistent @dataclass class VerificationConfig: """Configuration for dual-LLM verification system Attributes: verification_threshold: Minimum score (0-10) for answer to pass max_modification_iterations: Maximum number of improvement attempts require_all_criteria_pass: Whether all criteria must pass individually individual_criterion_threshold: Min score per criterion if required enable_confidence_scoring: Enable probabilistic confidence estimation enable_detailed_feedback: Generate detailed improvement suggestions stop_on_first_pass: Stop iterations when answer first passes criteria_weights: Custom weights for each criterion (must sum to 1.0) context_truncation_length: Max context chars to send to verifier min_improvement_delta: Minimum score improvement to continue iterations """ verification_threshold: float = 7.0 max_modification_iterations: int = 2 require_all_criteria_pass: bool = False individual_criterion_threshold: float = 6.0 enable_confidence_scoring: bool = True enable_detailed_feedback: bool = True stop_on_first_pass: bool = True context_truncation_length: int = 4000 min_improvement_delta: float = 0.5 # Criteria weights (must sum to 1.0) criteria_weights: Dict[str, float] = field(default_factory=lambda: { "faithfulness": 0.35, "completeness": 0.25, "accuracy": 0.20, "relevance": 0.10, "clarity": 0.05, "coherence": 0.05 }) def __post_init__(self): """Validate configuration""" # Ensure weights sum to 1.0 total_weight = sum(self.criteria_weights.values()) if not (0.99 <= total_weight <= 1.01): logger.warning( f"Criteria weights sum to {total_weight}, normalizing to 1.0" ) # Normalize weights for key in self.criteria_weights: self.criteria_weights[key] /= total_weight # ============================================================================= # Answer Verifier # ============================================================================= class AnswerVerifier: """Advanced answer quality verifier with multi-criteria evaluation This class evaluates generated answers across multiple quality dimensions, providing detailed feedback and confidence scores. It uses structured prompting to ensure consistent, reliable verification. Attributes: verifier_llm_func: LLM function for verification (typically GPT-4 or similar) config: VerificationConfig instance """ def __init__( self, verifier_llm_func: Callable, config: Optional[VerificationConfig] = None ): """Initialize AnswerVerifier Args: verifier_llm_func: LLM function for verification config: Configuration object, if None will use defaults """ self.verifier_llm_func = verifier_llm_func self.config = config or VerificationConfig() async def verify_answer( self, query: str, answer: str, context: str, original_query: Optional[str] = None ) -> Dict[str, Any]: """Verify answer quality across multiple criteria Args: query: Query used for generation (may be improved query) answer: Generated answer to verify context: Retrieved context used for generation original_query: Original user query (if different from query) Returns: Dictionary containing: - passed: Whether answer meets quality threshold - overall_score: Weighted average score (0-10) - criteria_scores: Individual scores per criterion - confidence: Confidence in verification (0-1) - feedback: Detailed evaluation feedback - issues: List of specific issues found - suggestions: Improvement suggestions - metadata: Additional verification metadata Example: ```python result = await verifier.verify_answer( query="What causes type 2 diabetes?", answer="Type 2 diabetes is caused by insulin resistance...", context="[Medical literature about diabetes]" ) if result['passed']: print(f"Answer quality: {result['overall_score']}/10") else: print(f"Issues: {result['issues']}") ``` """ if not answer or not answer.strip(): logger.warning("Empty answer provided for verification") return self._create_failed_result("Empty answer", 0.0) try: # Build verification prompt verification_prompt = self._build_verification_prompt( query=original_query or query, answer=answer, context=context ) # Call verifier LLM logger.debug("Calling verifier LLM for answer evaluation...") response = await self._call_verifier_safely(verification_prompt) if not response: logger.warning("Empty response from verifier LLM") return self._create_default_pass_result() # Parse verification response result = self._parse_verification_response(response) # Determine if answer passes result["passed"] = self._evaluate_pass_criteria(result) # Add confidence score if enabled if self.config.enable_confidence_scoring: result["confidence"] = self._calculate_confidence(result) logger.info( f"Verification complete: score={result['overall_score']:.2f}, " f"passed={result['passed']}" ) return result except Exception as e: logger.error(f"Error during answer verification: {e}", exc_info=True) return self._create_error_result(str(e)) def _build_verification_prompt( self, query: str, answer: str, context: str ) -> str: """Build structured verification prompt with JSON schema Args: query: Original query answer: Generated answer context: Retrieved context Returns: Formatted verification prompt """ # Truncate context if too long if len(context) > self.config.context_truncation_length: context = context[:self.config.context_truncation_length] + "\n\n[... context truncated ...]" # Build criteria descriptions criteria_desc = [] for criterion, weight in self.config.criteria_weights.items(): criteria_desc.append( f" - {criterion.capitalize()}: {self._get_criterion_description(criterion)} " f"(Weight: {weight*100:.0f}%)" ) criteria_text = "\n".join(criteria_desc) prompt = f"""Evaluate the following answer for quality and correctness. QUERY: {query} RETRIEVED CONTEXT: {context} GENERATED ANSWER: {answer} EVALUATION CRITERIA: {criteria_text} For each criterion, provide: 1. A score from 0-10 (0=completely fails, 10=perfect) 2. Specific evidence from the answer/context 3. Identified issues or strengths IMPORTANT INSTRUCTIONS: - Be critical and objective in your evaluation - Check if the answer is fully supported by the context (no hallucinations) - Verify factual accuracy against the context - Identify any missing information or incomplete aspects - Note any logical inconsistencies or unclear statements - Do not be lenient - high scores should be rare and well-deserved Respond with ONLY a valid JSON object in this exact format: {{ "faithfulness": {{ "score": <0-10>, "evidence": "", "issues": ["", ""] }}, "completeness": {{ "score": <0-10>, "evidence": "", "issues": [""] }}, "accuracy": {{ "score": <0-10>, "evidence": "", "issues": [] }}, "relevance": {{ "score": <0-10>, "evidence": "", "issues": [] }}, "clarity": {{ "score": <0-10>, "evidence": "", "issues": [] }}, "coherence": {{ "score": <0-10>, "evidence": "", "issues": [] }}, "overall_feedback": "", "critical_issues": ["", ""], "suggestions": ["", ""] }} DO NOT include any text before or after the JSON object. DO NOT use markdown code blocks.""" return prompt def _get_criterion_description(self, criterion: str) -> str: """Get description for each criterion Args: criterion: Criterion name Returns: Human-readable description """ descriptions = { "faithfulness": "Answer is fully supported by the context without hallucinations", "completeness": "Answer addresses all aspects of the query comprehensively", "accuracy": "Information is factually correct and precise", "relevance": "Answer directly addresses the query without tangents", "clarity": "Answer is well-structured, clear, and easy to understand", "coherence": "Answer is logically consistent without contradictions" } return descriptions.get(criterion, "Quality assessment") async def _call_verifier_safely(self, prompt: str) -> str: """Call verifier LLM with error handling Args: prompt: Verification prompt Returns: LLM response string """ try: system_prompt = """You are an expert answer evaluator for RAG systems. Your role is to critically assess answer quality across multiple dimensions. You must be: - Objective and unbiased - Critical and demanding (high scores are rare) - Specific and evidence-based in your feedback - Focused on factual accuracy and faithfulness to context - Able to identify subtle issues like hallucinations or incompleteness Always respond with a valid JSON object. Do not add explanations outside the JSON.""" if asyncio.iscoroutinefunction(self.verifier_llm_func): response = await self.verifier_llm_func( prompt=prompt, system_prompt=system_prompt, temperature=0.2, # Lower temperature for more consistent evaluation max_tokens=1500 ) else: response = self.verifier_llm_func( prompt=prompt, system_prompt=system_prompt, temperature=0.2, max_tokens=1500 ) return response except Exception as e: logger.error(f"Error calling verifier LLM: {e}", exc_info=True) raise def _parse_verification_response(self, response: str) -> Dict[str, Any]: """Parse verification response with robust error handling Args: response: LLM response string Returns: Parsed verification result """ try: # Clean response - remove markdown code blocks cleaned = self._clean_json_response(response) # Parse JSON data = json.loads(cleaned) # Extract criterion scores criteria_scores = {} all_issues = [] all_evidence = {} for criterion in self.config.criteria_weights.keys(): if criterion in data: criterion_data = data[criterion] if isinstance(criterion_data, dict): score = float(criterion_data.get("score", 5.0)) criteria_scores[criterion] = score all_evidence[criterion] = criterion_data.get("evidence", "") all_issues.extend(criterion_data.get("issues", [])) elif isinstance(criterion_data, (int, float)): criteria_scores[criterion] = float(criterion_data) else: # Default score if missing criteria_scores[criterion] = 5.0 # Calculate weighted overall score overall_score = sum( criteria_scores[k] * self.config.criteria_weights[k] for k in criteria_scores.keys() ) # Extract feedback and suggestions feedback = data.get("overall_feedback", "No detailed feedback provided") critical_issues = data.get("critical_issues", []) suggestions = data.get("suggestions", []) # Combine all issues all_issues.extend(critical_issues) all_issues = list(set(all_issues)) # Remove duplicates return { "overall_score": overall_score, "criteria_scores": criteria_scores, "feedback": feedback, "issues": all_issues, "suggestions": suggestions, "evidence": all_evidence, "metadata": { "response_parsed": True, "criteria_evaluated": len(criteria_scores) } } except json.JSONDecodeError as e: logger.warning(f"Failed to parse verification response as JSON: {e}") # Attempt to extract scores from text return self._fallback_parse(response) except Exception as e: logger.error(f"Error parsing verification response: {e}", exc_info=True) return self._fallback_parse(response) def _fallback_parse(self, response: str) -> Dict[str, Any]: """Fallback parsing when JSON parsing fails Args: response: Raw response text Returns: Best-effort parsed result """ # Try to extract any score from text scores_found = re.findall(r'(?:score|rating)[:\s]+(\d+(?:\.\d+)?)', response, re.IGNORECASE) if scores_found: # Use average of found scores avg_score = sum(float(s) for s in scores_found) / len(scores_found) else: avg_score = 5.0 # Neutral default return { "overall_score": avg_score, "criteria_scores": {k: avg_score for k in self.config.criteria_weights.keys()}, "feedback": response[:500], "issues": ["Failed to parse structured verification response"], "suggestions": [], "evidence": {}, "metadata": { "response_parsed": False, "fallback_used": True } } def _clean_json_response(self, response: str) -> str: """Clean JSON response by removing markdown and extra content Args: response: Raw LLM response Returns: Cleaned JSON string """ # Remove markdown code blocks cleaned = re.sub(r'```json\s*', '', response) cleaned = re.sub(r'```\s*', '', cleaned) # Extract JSON object (first complete {} block) json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned, re.DOTALL) if json_match: cleaned = json_match.group(0) # Strip whitespace cleaned = cleaned.strip() return cleaned def _evaluate_pass_criteria(self, result: Dict[str, Any]) -> bool: """Determine if answer passes based on configuration Args: result: Verification result dictionary Returns: True if answer passes all criteria """ # Check overall score threshold if result["overall_score"] < self.config.verification_threshold: return False # If individual criteria must all pass if self.config.require_all_criteria_pass: for score in result["criteria_scores"].values(): if score < self.config.individual_criterion_threshold: return False return True def _calculate_confidence(self, result: Dict[str, Any]) -> float: """Calculate confidence score for verification Args: result: Verification result Returns: Confidence score (0-1) """ # Factors that affect confidence: # 1. Score variance (low variance = high confidence) # 2. Number of issues (few issues = high confidence) # 3. Whether response was properly parsed scores = list(result["criteria_scores"].values()) if not scores: return 0.5 # Calculate variance mean_score = sum(scores) / len(scores) variance = sum((s - mean_score) ** 2 for s in scores) / len(scores) # Low variance = high confidence variance_factor = max(0, 1 - (variance / 10)) # Few issues = high confidence num_issues = len(result.get("issues", [])) issues_factor = max(0, 1 - (num_issues * 0.1)) # Properly parsed = bonus confidence parse_factor = 1.0 if result.get("metadata", {}).get("response_parsed", False) else 0.8 # Combine factors confidence = (variance_factor * 0.3 + issues_factor * 0.4 + parse_factor * 0.3) return min(1.0, max(0.0, confidence)) def _create_failed_result(self, reason: str, score: float) -> Dict[str, Any]: """Create a failed verification result Args: reason: Failure reason score: Score to assign Returns: Failed result dictionary """ return { "passed": False, "overall_score": score, "criteria_scores": {k: score for k in self.config.criteria_weights.keys()}, "feedback": f"Verification failed: {reason}", "issues": [reason], "suggestions": [], "confidence": 0.0, "evidence": {}, "metadata": {"error": reason} } def _create_default_pass_result(self) -> Dict[str, Any]: """Create a default passing result (used when verifier fails) Returns: Default passing result """ threshold = self.config.verification_threshold return { "passed": True, "overall_score": threshold, "criteria_scores": {k: threshold for k in self.config.criteria_weights.keys()}, "feedback": "Verification completed with default scores (verifier unavailable)", "issues": [], "suggestions": [], "confidence": 0.5, "evidence": {}, "metadata": {"default_result": True} } def _create_error_result(self, error: str) -> Dict[str, Any]: """Create an error result (passes by default to avoid blocking) Args: error: Error message Returns: Error result dictionary """ threshold = self.config.verification_threshold return { "passed": True, # Pass by default on error "overall_score": threshold, "criteria_scores": {k: threshold for k in self.config.criteria_weights.keys()}, "feedback": f"Verification error: {error}", "issues": [], "suggestions": [], "confidence": 0.0, "evidence": {}, "metadata": {"error": error} } # ============================================================================= # Answer Modifier # ============================================================================= class AnswerModifier: """Answer modifier that improves answers based on verification feedback This class takes verification feedback and generates improved versions of answers, addressing identified issues while maintaining accuracy. Attributes: generator_llm_func: LLM function for answer modification config: VerificationConfig instance """ def __init__( self, generator_llm_func: Callable, config: Optional[VerificationConfig] = None ): """Initialize AnswerModifier Args: generator_llm_func: LLM function for modification config: Configuration object """ self.generator_llm_func = generator_llm_func self.config = config or VerificationConfig() async def modify_answer( self, query: str, answer: str, context: str, verification_result: Dict[str, Any] ) -> Dict[str, Any]: """Modify answer based on verification feedback Args: query: Original query answer: Answer that failed verification context: Retrieved context verification_result: Feedback from verifier Returns: Dictionary containing: - modified_answer: Improved answer - changes_made: List of changes - modification_successful: Whether modification completed - metadata: Additional metadata Example: ```python result = await modifier.modify_answer( query="What causes diabetes?", answer="Diabetes is caused by...", context="[Medical context]", verification_result=verification_feedback ) print(result['modified_answer']) ``` """ logger.info("Modifying answer based on verification feedback...") try: # Build modification prompt modification_prompt = self._build_modification_prompt( query=query, answer=answer, context=context, verification_result=verification_result ) # Generate improved answer response = await self._call_generator_safely(modification_prompt) if not response: logger.warning("Empty response from generator, returning original") return { "modified_answer": answer, "changes_made": [], "modification_successful": False, "metadata": {"error": "Empty response"} } # Extract modified answer modified_answer = self._extract_answer(response) # Analyze changes changes_made = self._identify_changes(answer, modified_answer) logger.info(f"Answer modification complete ({len(changes_made)} changes)") return { "modified_answer": modified_answer, "changes_made": changes_made, "modification_successful": True, "metadata": { "original_length": len(answer), "modified_length": len(modified_answer), "length_delta": len(modified_answer) - len(answer) } } except Exception as e: logger.error(f"Error during answer modification: {e}", exc_info=True) return { "modified_answer": answer, # Return original on error "changes_made": [], "modification_successful": False, "metadata": {"error": str(e)} } def _build_modification_prompt( self, query: str, answer: str, context: str, verification_result: Dict[str, Any] ) -> str: """Build modification prompt with detailed feedback Args: query: Original query answer: Current answer context: Retrieved context verification_result: Verification feedback Returns: Formatted modification prompt """ # Truncate context if needed if len(context) > self.config.context_truncation_length: context = context[:self.config.context_truncation_length] + "\n\n[... context truncated ...]" # Format issues and suggestions issues = verification_result.get("issues", []) suggestions = verification_result.get("suggestions", []) issues_text = "\n".join(f" - {issue}" for issue in issues) if issues else " - None identified" suggestions_text = "\n".join(f" - {sug}" for sug in suggestions) if suggestions else " - General improvement needed" # Format criterion scores criteria_scores = verification_result.get("criteria_scores", {}) scores_text = "\n".join( f" - {k.capitalize()}: {v:.1f}/10" for k, v in criteria_scores.items() ) prompt = f"""Improve the following answer based on verification feedback. QUERY: {query} REFERENCE CONTEXT: {context} CURRENT ANSWER: {answer} VERIFICATION FEEDBACK: Overall Score: {verification_result.get('overall_score', 0):.1f}/10 Threshold: {self.config.verification_threshold}/10 Criterion Scores: {scores_text} Identified Issues: {issues_text} Improvement Suggestions: {suggestions_text} Detailed Feedback: {verification_result.get('feedback', 'No additional feedback')} IMPROVEMENT INSTRUCTIONS: 1. Address ALL identified issues completely 2. Ensure EVERY statement is supported by the context (no hallucinations) 3. Be comprehensive - answer all aspects of the query 4. Maintain factual accuracy - verify all claims against context 5. Improve clarity and structure 6. Fix any logical inconsistencies IMPORTANT: - Only use information from the provided context - If context doesn't support a claim, remove it - Add missing information if present in context - Be specific and detailed while remaining concise - Do not apologize or explain changes - just provide the improved answer IMPROVED ANSWER:""" return prompt async def _call_generator_safely(self, prompt: str) -> str: """Call generator LLM with error handling Args: prompt: Modification prompt Returns: LLM response """ try: system_prompt = """You are an expert answer improver. Your task is to enhance answers based on verification feedback while maintaining strict factual accuracy. You must: - Only use information from the provided context - Address all identified issues - Maintain or improve answer quality - Be comprehensive yet concise - Never hallucinate or add unsupported information Provide ONLY the improved answer without explanations or preamble.""" if asyncio.iscoroutinefunction(self.generator_llm_func): response = await self.generator_llm_func( prompt=prompt, system_prompt=system_prompt, temperature=0.4, max_tokens=1500 ) else: response = self.generator_llm_func( prompt=prompt, system_prompt=system_prompt, temperature=0.4, max_tokens=1500 ) return response except Exception as e: logger.error(f"Error calling generator LLM: {e}", exc_info=True) raise def _extract_answer(self, response: str) -> str: """Extract answer from response, removing any preamble Args: response: LLM response Returns: Cleaned answer """ # Remove common preambles preambles = [ r'^(?:here is|here\'s|the)\s+(?:an?\s+)?improved answer:?\s*', r'^improved answer:?\s*', r'^answer:?\s*', ] cleaned = response for pattern in preambles: cleaned = re.sub(pattern, '', cleaned, flags=re.IGNORECASE) return cleaned.strip() def _identify_changes(self, original: str, modified: str) -> List[str]: """Identify high-level changes between answers Args: original: Original answer modified: Modified answer Returns: List of change descriptions """ changes = [] # Length changes len_diff = len(modified) - len(original) if len_diff > 50: changes.append(f"Expanded answer (+{len_diff} characters)") elif len_diff < -50: changes.append(f"Condensed answer ({len_diff} characters)") # Word count changes orig_words = len(original.split()) mod_words = len(modified.split()) word_diff = mod_words - orig_words if word_diff > 10: changes.append(f"Added {word_diff} words") elif word_diff < -10: changes.append(f"Removed {abs(word_diff)} words") # Structural changes orig_sentences = len(re.findall(r'[.!?]+', original)) mod_sentences = len(re.findall(r'[.!?]+', modified)) if mod_sentences > orig_sentences: changes.append(f"Improved structure ({mod_sentences - orig_sentences} more sentences)") # If significantly different if len_diff == 0 and word_diff == 0: changes.append("Minor refinements") elif not changes: changes.append("Modified answer content") return changes # ============================================================================= # Dual-LLM Pipeline # ============================================================================= class DualLLMPipeline: """Complete dual-LLM verification and modification pipeline This class orchestrates the full verification-modification loop, coordinating between verifier and modifier until answer quality meets standards or maximum iterations are reached. Attributes: verifier: AnswerVerifier instance modifier: AnswerModifier instance config: VerificationConfig instance """ def __init__( self, generator_llm: Callable, verifier_llm: Callable, config: Optional[VerificationConfig] = None ): """Initialize DualLLMPipeline Args: generator_llm: LLM function for answer generation/modification verifier_llm: LLM function for verification (typically more powerful) config: Configuration object """ self.config = config or VerificationConfig() self.verifier = AnswerVerifier(verifier_llm, self.config) self.modifier = AnswerModifier(generator_llm, self.config) async def process_answer( self, query: str, answer: str, context: str, max_iterations: Optional[int] = None ) -> Dict[str, Any]: """Process answer through verification-modification loop Args: query: Original query answer: Initial generated answer context: Retrieved context max_iterations: Override config max iterations Returns: Dictionary containing: - final_answer: Best answer after iterations - final_score: Final verification score - passed: Whether final answer passed - total_iterations: Number of iterations performed - iteration_history: Detailed history of all iterations - improvement_delta: Score improvement from first to last - metadata: Additional processing metadata Example: ```python result = await pipeline.process_answer( query="What is photosynthesis?", answer="Photosynthesis is a process...", context="[Biology context about photosynthesis]" ) print(f"Final answer (score {result['final_score']}/10):") print(result['final_answer']) print(f"\\nImprovement: +{result['improvement_delta']:.1f} points") ``` """ max_iter = max_iterations or self.config.max_modification_iterations logger.info(f"Starting dual-LLM pipeline (max {max_iter} iterations)...") # Initialize tracking iteration_history = [] current_answer = answer iteration = 0 # Verify initial answer verification_result = await self.verifier.verify_answer( query=query, answer=current_answer, context=context ) initial_score = verification_result["overall_score"] iteration_history.append({ "iteration": 0, "answer": current_answer, "verification": verification_result, "modification": None }) logger.info( f"Initial verification: score={initial_score:.2f}, " f"passed={verification_result['passed']}" ) # If passed and stop_on_first_pass, we're done if verification_result["passed"] and self.config.stop_on_first_pass: logger.info("Answer passed verification on first attempt") return self._create_result( final_answer=current_answer, final_verification=verification_result, iteration_history=iteration_history, initial_score=initial_score ) # Modification loop previous_score = initial_score while iteration < max_iter: iteration += 1 # Check if we should continue if verification_result["passed"] and self.config.stop_on_first_pass: logger.info(f"Answer passed verification after {iteration-1} modifications") break # Check for minimal improvement if iteration > 1: score_improvement = verification_result["overall_score"] - previous_score if score_improvement < self.config.min_improvement_delta: logger.info( f"Minimal improvement detected ({score_improvement:.2f}), " "stopping iterations" ) break previous_score = verification_result["overall_score"] logger.info(f"Iteration {iteration}: Modifying answer...") # Modify answer modification_result = await self.modifier.modify_answer( query=query, answer=current_answer, context=context, verification_result=verification_result ) if not modification_result["modification_successful"]: logger.warning("Modification failed, using previous answer") break current_answer = modification_result["modified_answer"] # Verify modified answer verification_result = await self.verifier.verify_answer( query=query, answer=current_answer, context=context ) logger.info( f"Iteration {iteration} verification: score={verification_result['overall_score']:.2f}, " f"passed={verification_result['passed']}" ) # Record iteration iteration_history.append({ "iteration": iteration, "answer": current_answer, "verification": verification_result, "modification": modification_result }) # Create final result return self._create_result( final_answer=current_answer, final_verification=verification_result, iteration_history=iteration_history, initial_score=initial_score ) def _create_result( self, final_answer: str, final_verification: Dict[str, Any], iteration_history: List[Dict[str, Any]], initial_score: float ) -> Dict[str, Any]: """Create final pipeline result Args: final_answer: Final answer string final_verification: Final verification result iteration_history: Complete iteration history initial_score: Initial verification score Returns: Complete result dictionary """ final_score = final_verification["overall_score"] improvement_delta = final_score - initial_score return { "final_answer": final_answer, "final_score": final_score, "passed": final_verification["passed"], "total_iterations": len(iteration_history) - 1, "iteration_history": iteration_history, "improvement_delta": improvement_delta, "confidence": final_verification.get("confidence", 0.0), "metadata": { "initial_score": initial_score, "final_score": final_score, "improvement_percentage": (improvement_delta / max(initial_score, 0.1)) * 100, "threshold": self.config.verification_threshold, "max_iterations_reached": len(iteration_history) - 1 >= self.config.max_modification_iterations } } # ============================================================================= # Mixin Class for Integration # ============================================================================= class DualLLMVerificationMixin: """ Mixin providing dual-LLM verification functionality to RAGAnything This mixin adds answer verification and modification capabilities using a two-LLM approach: 1. Generator LLM creates the initial answer 2. Verifier LLM evaluates answer quality across multiple criteria 3. Modifier LLM improves the answer based on verification feedback 4. Process repeats until answer passes verification or max iterations reached The mixin expects the following attributes to be present: - self.answer_verifier: AnswerVerifier instance (optional) - self.answer_modifier: AnswerModifier instance (optional) - self.verification_pipeline: DualLLMPipeline instance (optional) - self.lightrag: LightRAG instance for answer generation - self.config: RAGAnythingConfig instance - self.logger: Logger instance """ async def _generate_with_verification( self, query: str, context: str, original_query: Optional[str] = None ) -> Dict[str, Any]: """ Generate answer with dual-LLM verification This method generates an answer and then verifies it using a separate verifier LLM. If the answer doesn't pass verification, it can be iteratively improved based on feedback. Args: query: The query to answer (possibly improved) context: Retrieved context from RAG system original_query: Original user query before improvement (optional) Returns: Dict containing: - answer: The final verified answer - verification_passed: Whether verification passed - verification_score: Overall quality score (0-10) - modification_attempts: Number of modification iterations - verification_history: List of verification results per iteration - final_criteria_scores: Scores for each criterion - confidence: Confidence in the verification - metadata: Additional verification metadata Example: result = await self._generate_with_verification( query="What is the treatment for hypertension?", context="Retrieved medical context...", original_query="What is HTN treatment?" ) # result might be: # { # 'answer': 'Hypertension treatment includes...', # 'verification_passed': True, # 'verification_score': 8.5, # 'modification_attempts': 1, # 'confidence': 0.92 # } """ # Check if verification pipeline is available if not hasattr(self, 'verification_pipeline') or self.verification_pipeline is None: # Fall back to simple answer verifier if available if hasattr(self, 'answer_verifier') and self.answer_verifier is not None: return await self._verify_answer_only(query, context, original_query) else: # No verification available, generate answer without verification if hasattr(self, 'logger'): self.logger.debug( "Verification pipeline not initialized, generating without verification" ) return await self._generate_without_verification(query, context, original_query) try: if hasattr(self, 'logger'): self.logger.info( f"Generating answer with verification (query: '{query[:50]}...')" ) # Use verification pipeline for full verification-modification loop verification_result = await self.verification_pipeline.process_answer( query=query, answer=None, # Pipeline will generate initial answer context=context, max_iterations=getattr( self.config, 'max_verification_iterations', getattr(self.config, 'max_verification_retries', 2) ) if hasattr(self, 'config') else 2 ) if hasattr(self, 'logger'): self.logger.info( f"Verification complete: passed={verification_result.get('passed', False)}, " f"score={verification_result.get('final_score', 0):.2f}, " f"iterations={verification_result.get('total_iterations', 0)}" ) # Format result for consistent return structure return { 'answer': verification_result.get('final_answer', ''), 'verification_passed': verification_result.get('passed', False), 'verification_score': verification_result.get('final_score', 0), 'modification_attempts': verification_result.get('total_iterations', 0), 'verification_history': verification_result.get('iteration_history', []), 'final_criteria_scores': verification_result.get('iteration_history', [{}])[-1].get('criteria_scores', {}) if verification_result.get('iteration_history') else {}, 'confidence': verification_result.get('iteration_history', [{}])[-1].get('confidence', 0) if verification_result.get('iteration_history') else 0, 'improvement_delta': verification_result.get('improvement_delta', 0), 'metadata': { 'original_query': original_query or query, 'improved_query': query, 'verification_method': 'dual_llm_pipeline' } } except Exception as e: if hasattr(self, 'logger'): self.logger.error(f"Error in verification pipeline: {e}", exc_info=True) # Fall back to unverified answer generation return await self._generate_without_verification(query, context, original_query) async def _verify_answer_only( self, query: str, context: str, original_query: Optional[str] = None ) -> Dict[str, Any]: """ Verify answer without modification (verifier available but no modifier) Args: query: The query to answer context: Retrieved context original_query: Original query before improvement Returns: Dict with verification results (but no iterative improvement) """ try: # Generate initial answer using LightRAG answer = await self._generate_answer_from_context(query, context) if hasattr(self, 'logger'): self.logger.info("Verifying answer (modification disabled)") # Verify the answer verification_result = await self.answer_verifier.verify_answer( query=query, answer=answer, context=context, original_query=original_query ) return { 'answer': answer, 'verification_passed': verification_result.get('passed', False), 'verification_score': verification_result.get('overall_score', 0), 'modification_attempts': 0, 'verification_history': [verification_result], 'final_criteria_scores': verification_result.get('criteria_scores', {}), 'confidence': verification_result.get('confidence', 0), 'metadata': { 'original_query': original_query or query, 'improved_query': query, 'verification_method': 'verify_only', 'note': 'Answer modification not enabled' } } except Exception as e: if hasattr(self, 'logger'): self.logger.error(f"Error in answer verification: {e}", exc_info=True) # Fall back to unverified answer answer = await self._generate_answer_from_context(query, context) return { 'answer': answer, 'verification_passed': True, 'verification_score': 10.0, 'modification_attempts': 0, 'metadata': {'error': str(e), 'verification_method': 'none'} } async def _generate_without_verification( self, query: str, context: str, original_query: Optional[str] = None ) -> Dict[str, Any]: """ Generate answer without verification (fallback method) Args: query: The query to answer context: Retrieved context original_query: Original query before improvement Returns: Dict with answer but no verification info """ try: if hasattr(self, 'logger'): self.logger.debug("Generating answer without verification") answer = await self._generate_answer_from_context(query, context) return { 'answer': answer, 'verification_passed': True, 'verification_score': 10.0, 'modification_attempts': 0, 'metadata': { 'original_query': original_query or query, 'improved_query': query, 'verification_method': 'none', 'note': 'Verification not enabled' } } except Exception as e: if hasattr(self, 'logger'): self.logger.error(f"Error generating answer: {e}", exc_info=True) return { 'answer': f"Error generating answer: {str(e)}", 'verification_passed': False, 'verification_score': 0, 'modification_attempts': 0, 'metadata': {'error': str(e)} } async def _generate_answer_from_context( self, query: str, context: str ) -> str: """ Generate answer from query and context using LightRAG Args: query: The query context: Retrieved context Returns: Generated answer string """ # Check if LightRAG is available if not hasattr(self, 'lightrag') or self.lightrag is None: if hasattr(self, 'logger'): self.logger.warning("LightRAG not available for answer generation") return "Unable to generate answer: LightRAG not initialized" try: # Use LightRAG to generate answer from context from lightrag import QueryParam # Generate answer using the context query_param = QueryParam(mode="mix") answer = await self.lightrag.aquery(query, param=query_param) return answer except Exception as e: if hasattr(self, 'logger'): self.logger.error(f"Error generating answer from context: {e}", exc_info=True) return f"Error generating answer: {str(e)}"