"""
Dual-LLM Verification and Answer Modification Module for RAG-Anything

This module implements a sophisticated two-stage verification system where:
1. A generator LLM produces an initial answer
2. A verifier LLM (typically more powerful) evaluates answer quality
3. If quality is below threshold, a modifier improves the answer iteratively

The system prevents hallucinations, improves factual consistency, and ensures
high-quality responses through systematic verification and refinement.

Usage Example:
    ```python
    from raganything.verification import (
        AnswerVerifier,
        AnswerModifier,
        DualLLMPipeline,
        VerificationConfig
    )

    # Initialize configuration
    config = VerificationConfig(
        verification_threshold=7.5,
        max_modification_iterations=3,
        require_all_criteria_pass=False
    )

    # Create pipeline
    pipeline = DualLLMPipeline(
        generator_llm=generator_func,
        verifier_llm=verifier_func,
        config=config
    )

    # Process answer with verification
    result = await pipeline.process_answer(
        query="What causes diabetes?",
        answer="Diabetes is caused by...",
        context="[Retrieved context about diabetes]"
    )

    print(f"Final answer: {result['final_answer']}")
    print(f"Quality score: {result['final_score']}/10")
    print(f"Iterations: {result['total_iterations']}")
    ```

Author: RAG-Anything Team
Version: 2.0.0
"""
|
|
from __future__ import annotations

import asyncio
import inspect
import json
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Callable

from lightrag.utils import logger
|
|
|
|
| |
| |
| |
|
|
class VerificationCriterion(Enum):
    """Quality dimensions along which a generated answer is assessed.

    Each member's value is the lowercase string name of the criterion; the
    same strings appear as keys of the default criteria weights and of the
    JSON object requested from the verifier.
    """

    # Answer is supported by the retrieved context (no hallucinations).
    FAITHFULNESS = "faithfulness"
    # Answer covers every aspect of the query.
    COMPLETENESS = "completeness"
    # Stated information is factually correct.
    ACCURACY = "accuracy"
    # Answer is well structured and easy to understand.
    CLARITY = "clarity"
    # Answer addresses the query directly.
    RELEVANCE = "relevance"
    # Answer is logically consistent.
    COHERENCE = "coherence"
|
|
|
|
@dataclass
class VerificationConfig:
    """Configuration for the dual-LLM verification system.

    Attributes:
        verification_threshold: Minimum overall score (0-10) for an answer to pass
        max_modification_iterations: Maximum number of improvement attempts
        require_all_criteria_pass: Whether every criterion must pass individually
        individual_criterion_threshold: Minimum score per criterion when
            ``require_all_criteria_pass`` is enabled
        enable_confidence_scoring: Enable confidence estimation on results
        enable_detailed_feedback: Generate detailed improvement suggestions
        stop_on_first_pass: Stop iterating as soon as an answer passes
        context_truncation_length: Max context characters sent to the LLMs
        min_improvement_delta: Minimum score gain required to keep iterating
        criteria_weights: Weight per criterion. Weights that do not sum to
            roughly 1.0 are rescaled so that they do.

    Raises:
        ValueError: If ``criteria_weights`` does not have a positive sum
            (it cannot be normalized).
    """

    verification_threshold: float = 7.0
    max_modification_iterations: int = 2
    require_all_criteria_pass: bool = False
    individual_criterion_threshold: float = 6.0
    enable_confidence_scoring: bool = True
    enable_detailed_feedback: bool = True
    stop_on_first_pass: bool = True
    context_truncation_length: int = 4000
    min_improvement_delta: float = 0.5

    criteria_weights: Dict[str, float] = field(default_factory=lambda: {
        "faithfulness": 0.35,
        "completeness": 0.25,
        "accuracy": 0.20,
        "relevance": 0.10,
        "clarity": 0.05,
        "coherence": 0.05
    })

    def __post_init__(self):
        """Validate the criteria weights and normalize them to sum to 1.0."""
        total_weight = sum(self.criteria_weights.values())

        # A non-positive sum cannot be rescaled; fail with a clear message
        # instead of a ZeroDivisionError (or silently flipped signs).
        if total_weight <= 0:
            raise ValueError(
                f"criteria_weights must have a positive sum, got {total_weight}"
            )

        if not (0.99 <= total_weight <= 1.01):
            logger.warning(
                f"Criteria weights sum to {total_weight}, normalizing to 1.0"
            )
            # Build a new dict so the mapping supplied by the caller is not
            # modified behind their back.
            self.criteria_weights = {
                name: weight / total_weight
                for name, weight in self.criteria_weights.items()
            }
|
|
|
|
| |
| |
| |
|
|
class AnswerVerifier:
    """Answer quality verifier with multi-criteria evaluation.

    The verifier asks an LLM to score an answer on every criterion listed in
    ``config.criteria_weights``, parses the JSON reply, combines the scores
    into a weighted overall score and decides whether the answer passes.

    Attributes:
        verifier_llm_func: Callable used for verification. It is invoked with
            the keyword arguments ``prompt``, ``system_prompt``,
            ``temperature`` and ``max_tokens`` and may be sync or async.
        config: VerificationConfig instance
    """

    # Neutral score used when the verifier omits a criterion or reports a
    # value that cannot be interpreted as a number.
    _DEFAULT_SCORE = 5.0
    # Bounds of the scoring scale requested in the verification prompt.
    _MIN_SCORE = 0.0
    _MAX_SCORE = 10.0

    def __init__(
        self,
        verifier_llm_func: Callable,
        config: Optional[VerificationConfig] = None
    ):
        """Initialize AnswerVerifier.

        Args:
            verifier_llm_func: LLM function for verification
            config: Configuration object; defaults are used when None
        """
        self.verifier_llm_func = verifier_llm_func
        self.config = config or VerificationConfig()

    async def verify_answer(
        self,
        query: str,
        answer: str,
        context: str,
        original_query: Optional[str] = None
    ) -> Dict[str, Any]:
        """Verify answer quality across the configured criteria.

        Args:
            query: Query used for generation (may be an improved query)
            answer: Generated answer to verify
            context: Retrieved context used for generation
            original_query: Original user query; takes precedence over
                ``query`` in the prompt when given

        Returns:
            Dictionary containing:
            - passed: Whether the answer meets the quality threshold
            - overall_score: Weighted score (0-10)
            - criteria_scores: Individual scores per criterion
            - confidence: Confidence in the verification (0-1); only set
              when ``config.enable_confidence_scoring`` is true or a
              canned result is returned
            - feedback: Evaluation feedback
            - issues: List of specific issues found
            - suggestions: Improvement suggestions
            - evidence: Evidence text per criterion
            - metadata: Additional verification metadata

            An empty answer yields a failed result. An empty verifier
            response or any exception yields a *passing* result at the
            threshold score, so a broken verifier never blocks the caller.
        """
        if not answer or not answer.strip():
            logger.warning("Empty answer provided for verification")
            return self._create_failed_result("Empty answer", 0.0)

        try:
            verification_prompt = self._build_verification_prompt(
                query=original_query or query,
                answer=answer,
                context=context
            )

            logger.debug("Calling verifier LLM for answer evaluation...")
            response = await self._call_verifier_safely(verification_prompt)

            if not response:
                logger.warning("Empty response from verifier LLM")
                return self._create_default_pass_result()

            result = self._parse_verification_response(response)
            result["passed"] = self._evaluate_pass_criteria(result)

            if self.config.enable_confidence_scoring:
                result["confidence"] = self._calculate_confidence(result)

            logger.info(
                f"Verification complete: score={result['overall_score']:.2f}, "
                f"passed={result['passed']}"
            )

            return result

        except Exception as e:
            # Deliberate best-effort: report the error but let the answer pass.
            logger.error(f"Error during answer verification: {e}", exc_info=True)
            return self._create_error_result(str(e))

    def _build_verification_prompt(
        self,
        query: str,
        answer: str,
        context: str
    ) -> str:
        """Build the structured verification prompt with its JSON schema.

        Args:
            query: Original query
            answer: Generated answer
            context: Retrieved context; None is treated as empty and text
                longer than ``config.context_truncation_length`` is cut

        Returns:
            Formatted verification prompt
        """
        context = context or ""
        if len(context) > self.config.context_truncation_length:
            context = context[:self.config.context_truncation_length] + "\n\n[... context truncated ...]"

        criteria_text = "\n".join(
            f" - {criterion.capitalize()}: {self._get_criterion_description(criterion)} "
            f"(Weight: {weight*100:.0f}%)"
            for criterion, weight in self.config.criteria_weights.items()
        )

        prompt = f"""Evaluate the following answer for quality and correctness.

QUERY:
{query}

RETRIEVED CONTEXT:
{context}

GENERATED ANSWER:
{answer}

EVALUATION CRITERIA:
{criteria_text}

For each criterion, provide:
1. A score from 0-10 (0=completely fails, 10=perfect)
2. Specific evidence from the answer/context
3. Identified issues or strengths

IMPORTANT INSTRUCTIONS:
- Be critical and objective in your evaluation
- Check if the answer is fully supported by the context (no hallucinations)
- Verify factual accuracy against the context
- Identify any missing information or incomplete aspects
- Note any logical inconsistencies or unclear statements
- Do not be lenient - high scores should be rare and well-deserved

Respond with ONLY a valid JSON object in this exact format:
{{
"faithfulness": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": ["<issue 1>", "<issue 2>"]
}},
"completeness": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": ["<issue 1>"]
}},
"accuracy": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": []
}},
"relevance": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": []
}},
"clarity": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": []
}},
"coherence": {{
"score": <0-10>,
"evidence": "<specific quote or observation>",
"issues": []
}},
"overall_feedback": "<comprehensive evaluation summary>",
"critical_issues": ["<critical issue 1>", "<critical issue 2>"],
"suggestions": ["<improvement suggestion 1>", "<improvement suggestion 2>"]
}}

DO NOT include any text before or after the JSON object. DO NOT use markdown code blocks."""

        return prompt

    def _get_criterion_description(self, criterion: str) -> str:
        """Return the human-readable description of a criterion.

        Args:
            criterion: Criterion name

        Returns:
            Description text, or "Quality assessment" for unknown names
        """
        descriptions = {
            "faithfulness": "Answer is fully supported by the context without hallucinations",
            "completeness": "Answer addresses all aspects of the query comprehensively",
            "accuracy": "Information is factually correct and precise",
            "relevance": "Answer directly addresses the query without tangents",
            "clarity": "Answer is well-structured, clear, and easy to understand",
            "coherence": "Answer is logically consistent without contradictions"
        }
        return descriptions.get(criterion, "Quality assessment")

    async def _call_verifier_safely(self, prompt: str) -> str:
        """Call the verifier LLM, logging and re-raising any failure.

        Args:
            prompt: Verification prompt

        Returns:
            LLM response string
        """
        system_prompt = """You are an expert answer evaluator for RAG systems. Your role is to critically assess answer quality across multiple dimensions.

You must be:
- Objective and unbiased
- Critical and demanding (high scores are rare)
- Specific and evidence-based in your feedback
- Focused on factual accuracy and faithfulness to context
- Able to identify subtle issues like hallucinations or incompleteness

Always respond with a valid JSON object. Do not add explanations outside the JSON."""

        try:
            response = self.verifier_llm_func(
                prompt=prompt,
                system_prompt=system_prompt,
                temperature=0.2,
                max_tokens=1500
            )
            # Inspect the returned value rather than the callable: this also
            # covers callable objects with an async __call__ and wrappers
            # that return a coroutine, which a function-level check misses.
            if inspect.isawaitable(response):
                response = await response
            return response

        except Exception as e:
            logger.error(f"Error calling verifier LLM: {e}", exc_info=True)
            raise

    @classmethod
    def _coerce_score(cls, raw: Any) -> float:
        """Convert a raw score to a float clamped to the 0-10 scale.

        Missing, boolean, non-numeric and NaN values map to the neutral
        default score.
        """
        if raw is None or isinstance(raw, bool):
            return cls._DEFAULT_SCORE
        try:
            score = float(raw)
        except (TypeError, ValueError):
            return cls._DEFAULT_SCORE
        if score != score:  # NaN never compares equal to itself
            return cls._DEFAULT_SCORE
        return min(cls._MAX_SCORE, max(cls._MIN_SCORE, score))

    @staticmethod
    def _as_text_list(raw: Any) -> List[str]:
        """Normalize an ``issues``/``suggestions`` value to a list of strings."""
        if raw is None:
            return []
        if isinstance(raw, str):
            return [raw] if raw.strip() else []
        if isinstance(raw, (list, tuple)):
            return [str(item) for item in raw if item is not None and item != ""]
        return [str(raw)]

    def _parse_verification_response(self, response: str) -> Dict[str, Any]:
        """Parse the verifier's JSON reply into a result dictionary.

        Falls back to :meth:`_fallback_parse` when the reply is not a JSON
        object or contains none of the configured criteria.

        Args:
            response: LLM response string

        Returns:
            Parsed verification result (without ``passed``/``confidence``)
        """
        try:
            data = json.loads(self._clean_json_response(response))

            weights = self.config.criteria_weights
            if not isinstance(data, dict) or not any(name in data for name in weights):
                # e.g. a JSON array, or an inner fragment of a truncated reply
                logger.warning("Verifier response contains no criterion scores")
                return self._fallback_parse(response)

            criteria_scores = {}
            all_issues = []
            all_evidence = {}

            for criterion in weights:
                entry = data.get(criterion)
                if isinstance(entry, dict):
                    criteria_scores[criterion] = self._coerce_score(entry.get("score"))
                    all_evidence[criterion] = entry.get("evidence", "")
                    all_issues.extend(self._as_text_list(entry.get("issues")))
                else:
                    # Bare number, unusable value, or criterion missing entirely.
                    criteria_scores[criterion] = self._coerce_score(entry)

            overall_score = sum(
                score * weights[name] for name, score in criteria_scores.items()
            )

            feedback = data.get("overall_feedback", "No detailed feedback provided")
            suggestions = self._as_text_list(data.get("suggestions"))

            all_issues.extend(self._as_text_list(data.get("critical_issues")))
            # Order-preserving de-duplication keeps the result deterministic.
            all_issues = list(dict.fromkeys(all_issues))

            return {
                "overall_score": overall_score,
                "criteria_scores": criteria_scores,
                "feedback": feedback,
                "issues": all_issues,
                "suggestions": suggestions,
                "evidence": all_evidence,
                "metadata": {
                    "response_parsed": True,
                    "criteria_evaluated": len(criteria_scores)
                }
            }

        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse verification response as JSON: {e}")
            return self._fallback_parse(response)

        except Exception as e:
            logger.error(f"Error parsing verification response: {e}", exc_info=True)
            return self._fallback_parse(response)

    def _fallback_parse(self, response: str) -> Dict[str, Any]:
        """Best-effort parsing when the reply is not usable JSON.

        Averages every number on the 0-10 scale that follows the word
        "score" or "rating" (quoted JSON keys included) and assigns that
        average to all criteria; defaults to 5.0 when none is found.

        Args:
            response: Raw response text

        Returns:
            Best-effort parsed result
        """
        candidates = re.findall(
            r'(?:score|rating)["\'\s:=]+(\d+(?:\.\d+)?)', response, re.IGNORECASE
        )
        # Ignore numbers outside the requested scale (e.g. "score: 85").
        usable = [
            value for value in map(float, candidates)
            if self._MIN_SCORE <= value <= self._MAX_SCORE
        ]
        avg_score = sum(usable) / len(usable) if usable else self._DEFAULT_SCORE

        return {
            "overall_score": avg_score,
            "criteria_scores": {k: avg_score for k in self.config.criteria_weights},
            "feedback": response[:500],
            "issues": ["Failed to parse structured verification response"],
            "suggestions": [],
            "evidence": {},
            "metadata": {
                "response_parsed": False,
                "fallback_used": True
            }
        }

    def _clean_json_response(self, response: str) -> str:
        """Extract the JSON object from a raw LLM reply.

        Markdown code fences are removed, then the first position at which
        a complete JSON object can be decoded is returned. Using the JSON
        decoder instead of brace matching keeps braces inside string values
        and arbitrarily deep nesting from confusing the extraction.

        Args:
            response: Raw LLM response

        Returns:
            The JSON object text, or the fence-stripped reply when no
            object can be decoded
        """
        cleaned = re.sub(r'```(?:json)?\s*', '', response, flags=re.IGNORECASE).strip()

        decoder = json.JSONDecoder()
        start = cleaned.find("{")
        while start != -1:
            try:
                _, end = decoder.raw_decode(cleaned, start)
            except json.JSONDecodeError:
                start = cleaned.find("{", start + 1)
            else:
                return cleaned[start:end]

        return cleaned

    def _evaluate_pass_criteria(self, result: Dict[str, Any]) -> bool:
        """Decide whether a verification result passes.

        Args:
            result: Verification result dictionary

        Returns:
            True if the overall score reaches the threshold and, when
            ``require_all_criteria_pass`` is set, every criterion reaches
            the individual threshold
        """
        if result["overall_score"] < self.config.verification_threshold:
            return False

        if self.config.require_all_criteria_pass:
            minimum = self.config.individual_criterion_threshold
            return all(score >= minimum for score in result["criteria_scores"].values())

        return True

    def _calculate_confidence(self, result: Dict[str, Any]) -> float:
        """Estimate how much the verification itself can be trusted.

        Combines three factors: agreement between criterion scores (low
        variance), the number of reported issues, and whether the reply
        was parsed as structured JSON.

        Args:
            result: Verification result

        Returns:
            Confidence score (0-1); 0.5 when no criterion scores exist
        """
        scores = list(result["criteria_scores"].values())
        if not scores:
            return 0.5

        mean_score = sum(scores) / len(scores)
        variance = sum((s - mean_score) ** 2 for s in scores) / len(scores)
        variance_factor = max(0, 1 - (variance / 10))

        # Every reported issue costs 10%, bottoming out at zero.
        num_issues = len(result.get("issues", []))
        issues_factor = max(0, 1 - (num_issues * 0.1))

        parsed = result.get("metadata", {}).get("response_parsed", False)
        parse_factor = 1.0 if parsed else 0.8

        confidence = (variance_factor * 0.3 + issues_factor * 0.4 + parse_factor * 0.3)
        return min(1.0, max(0.0, confidence))

    def _uniform_result(
        self,
        passed: bool,
        score: float,
        feedback: str,
        issues: List[str],
        confidence: float,
        metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build a canned result that gives every criterion the same score."""
        return {
            "passed": passed,
            "overall_score": score,
            "criteria_scores": {k: score for k in self.config.criteria_weights},
            "feedback": feedback,
            "issues": issues,
            "suggestions": [],
            "confidence": confidence,
            "evidence": {},
            "metadata": metadata
        }

    def _create_failed_result(self, reason: str, score: float) -> Dict[str, Any]:
        """Create a failed verification result.

        Args:
            reason: Failure reason
            score: Score to assign to the answer and to every criterion

        Returns:
            Failed result dictionary
        """
        return self._uniform_result(
            passed=False,
            score=score,
            feedback=f"Verification failed: {reason}",
            issues=[reason],
            confidence=0.0,
            metadata={"error": reason}
        )

    def _create_default_pass_result(self) -> Dict[str, Any]:
        """Create a passing result at the threshold score.

        Used when the verifier returns nothing, so that an unavailable
        verifier does not block the answer.

        Returns:
            Default passing result
        """
        return self._uniform_result(
            passed=True,
            score=self.config.verification_threshold,
            feedback="Verification completed with default scores (verifier unavailable)",
            issues=[],
            confidence=0.5,
            metadata={"default_result": True}
        )

    def _create_error_result(self, error: str) -> Dict[str, Any]:
        """Create an error result (passes by default to avoid blocking).

        Args:
            error: Error message

        Returns:
            Error result dictionary
        """
        return self._uniform_result(
            passed=True,
            score=self.config.verification_threshold,
            feedback=f"Verification error: {error}",
            issues=[],
            confidence=0.0,
            metadata={"error": error}
        )
|
|
|
|
| |
| |
| |
|
|
class AnswerModifier:
    """Answer modifier that improves answers based on verification feedback.

    Takes the feedback produced by a verifier, asks an LLM for an improved
    version of the answer and reports a coarse summary of what changed.

    Attributes:
        generator_llm_func: Callable used for modification. It is invoked
            with the keyword arguments ``prompt``, ``system_prompt``,
            ``temperature`` and ``max_tokens`` and may be sync or async.
        config: VerificationConfig instance
    """

    # Leading label such as "Answer:", "Improved answer:" or
    # "Here is the improved answer:". The label must be followed by a colon
    # or a line break so that answers which merely start with the word
    # "Answer..." ("Answers vary...") are left untouched.
    _PREAMBLE_PATTERN = re.compile(
        r"^\s*(?:(?:here\s+is|here's)\s+)?(?:(?:the|an?)\s+)?(?:improved\s+)?answer"
        r"[ \t]*(?::|\r?\n)\s*",
        re.IGNORECASE,
    )

    def __init__(
        self,
        generator_llm_func: Callable,
        config: Optional[VerificationConfig] = None
    ):
        """Initialize AnswerModifier.

        Args:
            generator_llm_func: LLM function for modification
            config: Configuration object; defaults are used when None
        """
        self.generator_llm_func = generator_llm_func
        self.config = config or VerificationConfig()

    async def modify_answer(
        self,
        query: str,
        answer: str,
        context: str,
        verification_result: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Modify an answer based on verification feedback.

        Args:
            query: Original query
            answer: Answer that failed verification
            context: Retrieved context
            verification_result: Feedback from the verifier

        Returns:
            Dictionary containing:
            - modified_answer: Improved answer (the original answer when
              modification did not succeed)
            - changes_made: List of change descriptions
            - modification_successful: Whether a new answer was produced
            - metadata: Length statistics, or an ``error`` entry on failure
        """
        logger.info("Modifying answer based on verification feedback...")

        try:
            modification_prompt = self._build_modification_prompt(
                query=query,
                answer=answer,
                context=context,
                verification_result=verification_result
            )

            response = await self._call_generator_safely(modification_prompt)

            if not response:
                logger.warning("Empty response from generator, returning original")
                return self._unchanged_result(answer, "Empty response")

            modified_answer = self._extract_answer(response)

            if not modified_answer:
                # The reply was only a preamble/whitespace; an empty answer
                # must not replace the existing one.
                logger.warning("Empty response from generator, returning original")
                return self._unchanged_result(answer, "Empty response")

            changes_made = self._identify_changes(answer, modified_answer)

            logger.info(f"Answer modification complete ({len(changes_made)} changes)")

            return {
                "modified_answer": modified_answer,
                "changes_made": changes_made,
                "modification_successful": True,
                "metadata": {
                    "original_length": len(answer),
                    "modified_length": len(modified_answer),
                    "length_delta": len(modified_answer) - len(answer)
                }
            }

        except Exception as e:
            logger.error(f"Error during answer modification: {e}", exc_info=True)
            return self._unchanged_result(answer, str(e))

    @staticmethod
    def _unchanged_result(answer: str, error: str) -> Dict[str, Any]:
        """Build the result returned when no modified answer is available."""
        return {
            "modified_answer": answer,
            "changes_made": [],
            "modification_successful": False,
            "metadata": {"error": error}
        }

    def _build_modification_prompt(
        self,
        query: str,
        answer: str,
        context: str,
        verification_result: Dict[str, Any]
    ) -> str:
        """Build the modification prompt including the verifier's feedback.

        Args:
            query: Original query
            answer: Current answer
            context: Retrieved context; None is treated as empty and text
                longer than ``config.context_truncation_length`` is cut
            verification_result: Verification feedback

        Returns:
            Formatted modification prompt
        """
        context = context or ""
        if len(context) > self.config.context_truncation_length:
            context = context[:self.config.context_truncation_length] + "\n\n[... context truncated ...]"

        issues = verification_result.get("issues", [])
        suggestions = verification_result.get("suggestions", [])

        issues_text = "\n".join(f" - {issue}" for issue in issues) if issues else " - None identified"
        suggestions_text = "\n".join(f" - {sug}" for sug in suggestions) if suggestions else " - General improvement needed"

        criteria_scores = verification_result.get("criteria_scores", {})
        scores_text = "\n".join(
            f" - {k.capitalize()}: {v:.1f}/10"
            for k, v in criteria_scores.items()
        )

        prompt = f"""Improve the following answer based on verification feedback.

QUERY:
{query}

REFERENCE CONTEXT:
{context}

CURRENT ANSWER:
{answer}

VERIFICATION FEEDBACK:
Overall Score: {verification_result.get('overall_score', 0):.1f}/10
Threshold: {self.config.verification_threshold}/10

Criterion Scores:
{scores_text}

Identified Issues:
{issues_text}

Improvement Suggestions:
{suggestions_text}

Detailed Feedback:
{verification_result.get('feedback', 'No additional feedback')}

IMPROVEMENT INSTRUCTIONS:
1. Address ALL identified issues completely
2. Ensure EVERY statement is supported by the context (no hallucinations)
3. Be comprehensive - answer all aspects of the query
4. Maintain factual accuracy - verify all claims against context
5. Improve clarity and structure
6. Fix any logical inconsistencies

IMPORTANT:
- Only use information from the provided context
- If context doesn't support a claim, remove it
- Add missing information if present in context
- Be specific and detailed while remaining concise
- Do not apologize or explain changes - just provide the improved answer

IMPROVED ANSWER:"""

        return prompt

    async def _call_generator_safely(self, prompt: str) -> str:
        """Call the generator LLM, logging and re-raising any failure.

        Args:
            prompt: Modification prompt

        Returns:
            LLM response
        """
        system_prompt = """You are an expert answer improver. Your task is to enhance answers based on verification feedback while maintaining strict factual accuracy.

You must:
- Only use information from the provided context
- Address all identified issues
- Maintain or improve answer quality
- Be comprehensive yet concise
- Never hallucinate or add unsupported information

Provide ONLY the improved answer without explanations or preamble."""

        try:
            response = self.generator_llm_func(
                prompt=prompt,
                system_prompt=system_prompt,
                temperature=0.4,
                max_tokens=1500
            )
            # Inspect the returned value rather than the callable: this also
            # covers callable objects with an async __call__ and wrappers
            # that return a coroutine, which a function-level check misses.
            if inspect.isawaitable(response):
                response = await response
            return response

        except Exception as e:
            logger.error(f"Error calling generator LLM: {e}", exc_info=True)
            raise

    def _extract_answer(self, response: str) -> str:
        """Strip a leading "Improved answer:"-style label from the reply.

        Args:
            response: LLM response

        Returns:
            The reply without the label and without surrounding whitespace
        """
        return self._PREAMBLE_PATTERN.sub("", response, count=1).strip()

    def _identify_changes(self, original: str, modified: str) -> List[str]:
        """Describe the high-level differences between two answers.

        The summary is heuristic: it compares character, word and sentence
        counts only.

        Args:
            original: Original answer
            modified: Modified answer

        Returns:
            List of change descriptions; empty when the texts are identical
        """
        if original == modified:
            return []

        changes = []

        len_diff = len(modified) - len(original)
        if len_diff > 50:
            changes.append(f"Expanded answer (+{len_diff} characters)")
        elif len_diff < -50:
            changes.append(f"Condensed answer ({len_diff} characters)")

        word_diff = len(modified.split()) - len(original.split())
        if word_diff > 10:
            changes.append(f"Added {word_diff} words")
        elif word_diff < -10:
            changes.append(f"Removed {abs(word_diff)} words")

        # Runs of terminal punctuation approximate the sentence count.
        orig_sentences = len(re.findall(r'[.!?]+', original))
        mod_sentences = len(re.findall(r'[.!?]+', modified))
        if mod_sentences > orig_sentences:
            changes.append(f"Improved structure ({mod_sentences - orig_sentences} more sentences)")

        if len_diff == 0 and word_diff == 0:
            changes.append("Minor refinements")
        elif not changes:
            changes.append("Modified answer content")

        return changes
|
|
|
|
| |
| |
| |
|
|
class DualLLMPipeline:
    """Complete dual-LLM verification and modification pipeline.

    Orchestrates the verify/modify loop: the answer is verified, and while
    it does not pass it is modified and verified again, until it passes,
    stops improving, modification fails, or the iteration limit is reached.

    Attributes:
        verifier: AnswerVerifier instance
        modifier: AnswerModifier instance
        config: VerificationConfig instance
    """

    def __init__(
        self,
        generator_llm: Callable,
        verifier_llm: Callable,
        config: Optional[VerificationConfig] = None
    ):
        """Initialize DualLLMPipeline.

        Args:
            generator_llm: LLM function for answer modification
            verifier_llm: LLM function for verification
            config: Configuration object; defaults are used when None
        """
        self.config = config or VerificationConfig()
        self.verifier = AnswerVerifier(verifier_llm, self.config)
        self.modifier = AnswerModifier(generator_llm, self.config)

    async def process_answer(
        self,
        query: str,
        answer: str,
        context: str,
        max_iterations: Optional[int] = None
    ) -> Dict[str, Any]:
        """Run an answer through the verification-modification loop.

        Args:
            query: Original query
            answer: Initial generated answer
            context: Retrieved context
            max_iterations: Overrides ``config.max_modification_iterations``
                when not None; ``0`` verifies without modifying

        Returns:
            Dictionary containing:
            - final_answer: Best answer seen across all iterations
            - final_score: Verification score of that answer
            - passed: Whether that answer passed
            - total_iterations: Number of modifications that were verified
            - iteration_history: Detailed history of all iterations
            - improvement_delta: final_score minus the initial score
            - confidence: Verification confidence of the final answer
            - metadata: Additional processing metadata

        Example:
            ```python
            result = await pipeline.process_answer(
                query="What is photosynthesis?",
                answer="Photosynthesis is a process...",
                context="[Biology context about photosynthesis]"
            )

            print(f"Final answer (score {result['final_score']}/10):")
            print(result['final_answer'])
            print(f"\\nImprovement: +{result['improvement_delta']:.1f} points")
            ```
        """
        # ``is None`` rather than ``or``: an explicit 0 is a valid override.
        if max_iterations is None:
            max_iter = self.config.max_modification_iterations
        else:
            max_iter = max(0, max_iterations)

        logger.info(f"Starting dual-LLM pipeline (max {max_iter} iterations)...")

        iteration_history = []
        current_answer = answer
        iteration = 0

        verification_result = await self.verifier.verify_answer(
            query=query,
            answer=current_answer,
            context=context
        )

        initial_score = verification_result["overall_score"]

        iteration_history.append({
            "iteration": 0,
            "answer": current_answer,
            "verification": verification_result,
            "modification": None
        })

        logger.info(
            f"Initial verification: score={initial_score:.2f}, "
            f"passed={verification_result['passed']}"
        )

        if verification_result["passed"] and self.config.stop_on_first_pass:
            logger.info("Answer passed verification on first attempt")
            return self._create_result(
                final_answer=current_answer,
                final_verification=verification_result,
                iteration_history=iteration_history,
                initial_score=initial_score,
                max_iterations=max_iter
            )

        previous_score = initial_score

        while iteration < max_iter:
            iteration += 1

            if verification_result["passed"] and self.config.stop_on_first_pass:
                logger.info(f"Answer passed verification after {iteration-1} modifications")
                break

            # From the second round on, give up when the previous
            # modification did not move the score enough.
            if iteration > 1:
                score_improvement = verification_result["overall_score"] - previous_score
                if score_improvement < self.config.min_improvement_delta:
                    logger.info(
                        f"Minimal improvement detected ({score_improvement:.2f}), "
                        "stopping iterations"
                    )
                    break

            previous_score = verification_result["overall_score"]

            logger.info(f"Iteration {iteration}: Modifying answer...")

            modification_result = await self.modifier.modify_answer(
                query=query,
                answer=current_answer,
                context=context,
                verification_result=verification_result
            )

            if not modification_result["modification_successful"]:
                logger.warning("Modification failed, using previous answer")
                break

            current_answer = modification_result["modified_answer"]

            verification_result = await self.verifier.verify_answer(
                query=query,
                answer=current_answer,
                context=context
            )

            logger.info(
                f"Iteration {iteration} verification: score={verification_result['overall_score']:.2f}, "
                f"passed={verification_result['passed']}"
            )

            iteration_history.append({
                "iteration": iteration,
                "answer": current_answer,
                "verification": verification_result,
                "modification": modification_result
            })

        # A modification can make the answer worse; return the best verified
        # answer rather than blindly returning the last one.
        best = self._select_best_iteration(iteration_history)
        if best is not iteration_history[-1]:
            logger.info(
                f"Returning answer from iteration {best['iteration']} "
                "(later modifications scored lower)"
            )

        return self._create_result(
            final_answer=best["answer"],
            final_verification=best["verification"],
            iteration_history=iteration_history,
            initial_score=initial_score,
            max_iterations=max_iter
        )

    @staticmethod
    def _select_best_iteration(iteration_history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Pick the history entry with the best verification outcome.

        Passing entries rank above failing ones, then higher overall score
        wins; on a tie the later (more refined) entry is chosen.

        Args:
            iteration_history: Non-empty list of iteration records

        Returns:
            The selected iteration record
        """
        def rank(entry: Dict[str, Any]):
            verification = entry["verification"]
            return (
                bool(verification.get("passed", False)),
                verification.get("overall_score", 0.0),
            )

        best = iteration_history[0]
        for entry in iteration_history[1:]:
            if rank(entry) >= rank(best):
                best = entry
        return best

    def _create_result(
        self,
        final_answer: str,
        final_verification: Dict[str, Any],
        iteration_history: List[Dict[str, Any]],
        initial_score: float,
        max_iterations: Optional[int] = None
    ) -> Dict[str, Any]:
        """Create the final pipeline result.

        Args:
            final_answer: Final answer string
            final_verification: Verification result of the final answer
            iteration_history: Complete iteration history
            initial_score: Initial verification score
            max_iterations: Iteration limit that was in effect; defaults to
                ``config.max_modification_iterations``

        Returns:
            Complete result dictionary
        """
        final_score = final_verification["overall_score"]
        improvement_delta = final_score - initial_score
        total_iterations = len(iteration_history) - 1
        limit = (
            self.config.max_modification_iterations
            if max_iterations is None
            else max_iterations
        )

        return {
            "final_answer": final_answer,
            "final_score": final_score,
            "passed": final_verification["passed"],
            "total_iterations": total_iterations,
            "iteration_history": iteration_history,
            "improvement_delta": improvement_delta,
            "confidence": final_verification.get("confidence", 0.0),
            "metadata": {
                "initial_score": initial_score,
                "final_score": final_score,
                # Floor the divisor so an initial score of 0 does not divide by zero.
                "improvement_percentage": (improvement_delta / max(initial_score, 0.1)) * 100,
                "threshold": self.config.verification_threshold,
                "max_iterations_reached": total_iterations >= limit
            }
        }
|
|
|
|
| |
| |
| |
|
|
class DualLLMVerificationMixin:
    """
    Mixin providing dual-LLM verification functionality to RAGAnything

    This mixin adds answer verification and modification capabilities using
    a two-LLM approach:
    1. Generator LLM creates the initial answer
    2. Verifier LLM evaluates answer quality across multiple criteria
    3. Modifier LLM improves the answer based on verification feedback
    4. Process repeats until answer passes verification or max iterations reached

    The mixin expects the following attributes to be present:
    - self.answer_verifier: AnswerVerifier instance (optional)
    - self.answer_modifier: AnswerModifier instance (optional)
    - self.verification_pipeline: DualLLMPipeline instance (optional)
    - self.lightrag: LightRAG instance for answer generation
    - self.config: RAGAnythingConfig instance
    - self.logger: Logger instance

    Optional attributes that are missing or ``None`` are tolerated: the mixin
    degrades to verify-only or unverified generation instead of raising.

    Every result dictionary returned by the ``_generate_*`` / ``_verify_*``
    methods contains at least the keys ``answer``, ``verification_passed``,
    ``verification_score``, ``modification_attempts``,
    ``verification_history``, ``final_criteria_scores``, ``confidence`` and
    ``metadata``.
    """

    def _log_verification(self, level: str, message: str, **kwargs: Any) -> None:
        """Emit ``message`` on ``self.logger`` at ``level`` if a logger is attached.

        Args:
            level: Name of the logger method to call (``"debug"``, ``"info"``,
                ``"warning"``, ``"error"``).
            message: Already-formatted log message.
            **kwargs: Forwarded to the logger method (e.g. ``exc_info=True``).
        """
        log = getattr(self, 'logger', None)
        if log is not None:
            getattr(log, level)(message, **kwargs)

    async def _generate_with_verification(
        self,
        query: str,
        context: str,
        original_query: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate answer with dual-LLM verification

        Dispatches to the most capable path that is configured:

        1. ``self.verification_pipeline`` set: run the full pipeline.
        2. Otherwise, ``self.answer_verifier`` set: generate once and verify
           (no modification).
        3. Otherwise: generate without verification.

        Any exception raised while running the pipeline is logged and the
        method falls back to unverified generation.

        Args:
            query: The query to answer (possibly improved)
            context: Retrieved context from RAG system
            original_query: Original user query before improvement (optional)

        Returns:
            Dict containing:
                - answer: The final verified answer
                - verification_passed: Whether verification passed
                - verification_score: Overall quality score (0-10)
                - modification_attempts: Number of modification iterations
                - verification_history: List of verification results per iteration
                - final_criteria_scores: Scores for each criterion
                - confidence: Confidence in the verification
                - improvement_delta: Score change reported by the pipeline
                  (pipeline path only)
                - metadata: Additional verification metadata

        Example:
            result = await self._generate_with_verification(
                query="What is the treatment for hypertension?",
                context="Retrieved medical context...",
                original_query="What is HTN treatment?"
            )
            # result might be:
            # {
            #     'answer': 'Hypertension treatment includes...',
            #     'verification_passed': True,
            #     'verification_score': 8.5,
            #     'modification_attempts': 1,
            #     'confidence': 0.92
            # }
        """
        pipeline = getattr(self, 'verification_pipeline', None)
        if pipeline is None:
            # No full pipeline: use verify-only when a verifier exists,
            # otherwise plain generation.
            if getattr(self, 'answer_verifier', None) is not None:
                return await self._verify_answer_only(query, context, original_query)

            self._log_verification(
                'debug',
                "Verification pipeline not initialized, generating without verification"
            )
            return await self._generate_without_verification(query, context, original_query)

        try:
            self._log_verification(
                'info',
                f"Generating answer with verification (query: '{query[:50]}...')"
            )

            # Iteration budget: prefer `max_verification_iterations`, then
            # `max_verification_retries`, then 2 (also used when there is no
            # config object at all).
            config = getattr(self, 'config', None)
            max_iterations = getattr(
                config, 'max_verification_iterations',
                getattr(config, 'max_verification_retries', 2)
            )

            # NOTE(review): `answer=None` is handed to the pipeline; this
            # presumably makes it produce the initial answer itself --
            # confirm against DualLLMPipeline.process_answer.
            pipeline_result = await pipeline.process_answer(
                query=query,
                answer=None,
                context=context,
                max_iterations=max_iterations
            )

            self._log_verification(
                'info',
                f"Verification complete: passed={pipeline_result.get('passed', False)}, "
                f"score={pipeline_result.get('final_score', 0):.2f}, "
                f"iterations={pipeline_result.get('total_iterations', 0)}"
            )

            # History entries nest the verifier output under "verification";
            # the per-criterion scores live there, not on the entry itself.
            # Entry-level keys are still honoured as a fallback.
            history = pipeline_result.get('iteration_history') or []
            last_entry = history[-1] if history else {}
            if not isinstance(last_entry, dict):
                last_entry = {}
            last_verification = last_entry.get('verification')
            if not isinstance(last_verification, dict):
                last_verification = {}

            criteria_scores = last_verification.get(
                'criteria_scores', last_entry.get('criteria_scores', {})
            )
            # The pipeline result exposes the final confidence at top level;
            # fall back to the last verification record if it is absent.
            confidence = pipeline_result.get(
                'confidence',
                last_verification.get('confidence', last_entry.get('confidence', 0))
            )

            return {
                'answer': pipeline_result.get('final_answer', ''),
                'verification_passed': pipeline_result.get('passed', False),
                'verification_score': pipeline_result.get('final_score', 0),
                'modification_attempts': pipeline_result.get('total_iterations', 0),
                'verification_history': pipeline_result.get('iteration_history', []),
                'final_criteria_scores': criteria_scores,
                'confidence': confidence,
                'improvement_delta': pipeline_result.get('improvement_delta', 0),
                'metadata': {
                    'original_query': original_query or query,
                    'improved_query': query,
                    'verification_method': 'dual_llm_pipeline'
                }
            }

        except Exception as e:
            # Best-effort boundary: a broken pipeline must not prevent the
            # user from getting an (unverified) answer.
            self._log_verification(
                'error', f"Error in verification pipeline: {e}", exc_info=True
            )
            return await self._generate_without_verification(query, context, original_query)

    async def _verify_answer_only(
        self,
        query: str,
        context: str,
        original_query: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Verify answer without modification (verifier available but no modifier)

        The answer is generated exactly once. If verification fails with an
        exception, that same answer is returned marked as unverified
        (``verification_method == 'none'``) with the error text in
        ``metadata['error']``; it is not regenerated.

        Args:
            query: The query to answer
            context: Retrieved context
            original_query: Original query before improvement

        Returns:
            Dict with verification results (but no iterative improvement)
        """
        # Generate outside the try block so a verifier failure can reuse the
        # answer instead of paying for a second generation call.
        answer = await self._generate_answer_from_context(query, context)

        try:
            self._log_verification('info', "Verifying answer (modification disabled)")

            verification_result = await self.answer_verifier.verify_answer(
                query=query,
                answer=answer,
                context=context,
                original_query=original_query
            )

            return {
                'answer': answer,
                'verification_passed': verification_result.get('passed', False),
                'verification_score': verification_result.get('overall_score', 0),
                'modification_attempts': 0,
                'verification_history': [verification_result],
                'final_criteria_scores': verification_result.get('criteria_scores', {}),
                'confidence': verification_result.get('confidence', 0),
                'metadata': {
                    'original_query': original_query or query,
                    'improved_query': query,
                    'verification_method': 'verify_only',
                    'note': 'Answer modification not enabled'
                }
            }

        except Exception as e:
            self._log_verification(
                'error', f"Error in answer verification: {e}", exc_info=True
            )

            # Fail open, mirroring `_generate_without_verification`: the
            # answer is returned as-is with the "not verified" defaults.
            return {
                'answer': answer,
                'verification_passed': True,
                'verification_score': 10.0,
                'modification_attempts': 0,
                'verification_history': [],
                'final_criteria_scores': {},
                'confidence': 0,
                'metadata': {
                    'original_query': original_query or query,
                    'improved_query': query,
                    'error': str(e),
                    'verification_method': 'none'
                }
            }

    async def _generate_without_verification(
        self,
        query: str,
        context: str,
        original_query: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate answer without verification (fallback method)

        Because no verification is performed, the result reports
        ``verification_passed=True`` and ``verification_score=10.0`` with
        ``metadata['verification_method'] == 'none'``; ``confidence`` is 0.

        Args:
            query: The query to answer
            context: Retrieved context
            original_query: Original query before improvement

        Returns:
            Dict with answer but no verification info. If generation raises,
            the ``answer`` field carries the error text and
            ``verification_passed`` is False.
        """
        try:
            self._log_verification('debug', "Generating answer without verification")

            answer = await self._generate_answer_from_context(query, context)

            return {
                'answer': answer,
                'verification_passed': True,
                'verification_score': 10.0,
                'modification_attempts': 0,
                'verification_history': [],
                'final_criteria_scores': {},
                'confidence': 0,
                'metadata': {
                    'original_query': original_query or query,
                    'improved_query': query,
                    'verification_method': 'none',
                    'note': 'Verification not enabled'
                }
            }

        except Exception as e:
            self._log_verification(
                'error', f"Error generating answer: {e}", exc_info=True
            )

            return {
                'answer': f"Error generating answer: {str(e)}",
                'verification_passed': False,
                'verification_score': 0,
                'modification_attempts': 0,
                'verification_history': [],
                'final_criteria_scores': {},
                'confidence': 0,
                'metadata': {'error': str(e)}
            }

    async def _generate_answer_from_context(
        self,
        query: str,
        context: str
    ) -> str:
        """
        Generate answer for ``query`` using LightRAG

        Args:
            query: The query
            context: Retrieved context. Accepted for interface symmetry but
                not forwarded: only ``query`` and a ``QueryParam(mode="mix")``
                are passed to ``self.lightrag.aquery``.

        Returns:
            Generated answer string. This method does not raise: when
            LightRAG is unavailable or the query fails, a human-readable
            error message is returned in place of an answer.
        """
        if getattr(self, 'lightrag', None) is None:
            self._log_verification(
                'warning', "LightRAG not available for answer generation"
            )
            return "Unable to generate answer: LightRAG not initialized"

        try:
            # Imported lazily so the module can be loaded without LightRAG's
            # query API being importable.
            from lightrag import QueryParam

            query_param = QueryParam(mode="mix")
            return await self.lightrag.aquery(query, param=query_param)

        except Exception as e:
            self._log_verification(
                'error', f"Error generating answer from context: {e}", exc_info=True
            )
            return f"Error generating answer: {str(e)}"
|
|