| """ |
| Streaming Query Module with Verification Support |
| |
| This module provides streaming capabilities for RAGAnything while maintaining |
| the dual-LLM verification layer. It allows real-time token streaming to the |
| frontend while buffering the complete response for post-generation verification. |
| |
| Key Features: |
| - Real-time token streaming from LLM (Gemini, OpenAI, etc.) |
| - Complete response buffering for verification |
| - Async verification after streaming completes |
| - Verification metadata injection into stream |
| - Support for both verified and unverified streaming modes |
| |
| Architecture: |
| 1. Stream tokens to frontend in real-time |
| 2. Buffer complete response for verification |
| 3. Run verification asynchronously after completion |
| 4. Send verification metadata as final stream chunk |
| |
| Author: RAG-Anything Team |
| Version: 1.0.0 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| from typing import Dict, List, Any, Optional, AsyncGenerator, Callable |
| from dataclasses import dataclass |
| from enum import Enum |
| from lightrag.utils import logger |
|
|
|
|
| |
| |
| |
|
|
class StreamMode(Enum):
    """Enumerates the kinds of output a streaming query can be asked for.

    Each member's value is the lowercase form of its name.
    """

    # Stream tokens only.
    TOKENS_ONLY = "tokens_only"
    # Stream tokens together with verification output.
    TOKENS_WITH_VERIFICATION = "tokens_with_verification"
    # Stream tokens together with metadata.
    TOKENS_WITH_METADATA = "tokens_with_metadata"
|
|
|
|
@dataclass
class StreamingConfig:
    """Options bundle for a streaming query.

    Every field carries a default, so ``StreamingConfig()`` is a valid
    configuration on its own.
    """

    # Streaming mode to use.
    mode: StreamMode = StreamMode.TOKENS_WITH_VERIFICATION
    # Whether verification runs once streaming has finished.
    enable_verification: bool = True
    # Whether verification results are emitted as a final stream chunk.
    send_verification_metadata: bool = True
    # Whether verification runs in the background (non-blocking).
    verification_async: bool = True
    # Number of tokens to buffer before sending.
    buffer_size: int = 1
    # Whether the retrieved context is included in metadata.
    include_context: bool = False
|
|
|
|
| |
| |
| |
|
|
class StreamBuffer:
    """Accumulates streamed tokens so the full answer can be verified later.

    Tokens are appended while the stream is live; ``finalize`` joins them
    into the complete response, and a verification result can be attached
    afterwards.
    """

    def __init__(self):
        """Create an empty, not-yet-finalized buffer."""
        self.verification_result: Optional[Dict[str, Any]] = None
        self.is_complete: bool = False
        self.complete_response: str = ""
        self.tokens: List[str] = []

    def add_token(self, token: str):
        """Append one streamed token to the buffer.

        Args:
            token: Token to record.
        """
        collected = self.tokens
        collected.append(token)

    def finalize(self) -> str:
        """Join all buffered tokens and mark the buffer as complete.

        Returns:
            The concatenation of every token added so far.
        """
        joined = "".join(self.tokens)
        self.is_complete = True
        self.complete_response = joined
        return joined

    def set_verification_result(self, result: Dict[str, Any]):
        """Attach a verification result to this buffer.

        Args:
            result: Verification result dictionary.
        """
        self.verification_result = result
|
|
|
|
| |
| |
| |
|
|
class StreamingQueryHandler:
    """Handler for streaming queries with verification support.

    Forwards tokens from an LLM stream to the caller while collecting them
    in a ``StreamBuffer`` so that the complete response can be verified once
    streaming has finished.

    Attributes:
        config: StreamingConfig instance.
        verifier: AnswerVerifier instance (optional).
        modifier: AnswerModifier instance (optional); stored but not used by
            any method of this class.
    """

    def __init__(
        self,
        config: Optional[StreamingConfig] = None,
        verifier: Optional[Any] = None,
        modifier: Optional[Any] = None
    ):
        """Initialize StreamingQueryHandler.

        Args:
            config: Streaming configuration; a default ``StreamingConfig()``
                is used when omitted.
            verifier: AnswerVerifier instance for verification.
            modifier: AnswerModifier instance for improvements.
        """
        self.config = config or StreamingConfig()
        self.verifier = verifier
        self.modifier = modifier
        # Strong references to in-flight background verification tasks.
        # The event loop only keeps weak references to tasks, so a task whose
        # result is discarded can be garbage-collected before it finishes.
        self._background_tasks: set = set()

    async def stream_with_verification(
        self,
        llm_stream_func: Callable,
        query: str,
        context: str,
        original_query: Optional[str] = None,
        **llm_kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream an LLM response, then optionally verify it.

        Steps:
            1. Forward tokens to the caller as they arrive, batched by
               ``config.buffer_size`` (the default of 1 sends every token
               individually).
            2. Buffer every token to rebuild the complete response.
            3. Emit an empty token chunk with ``done=True``.
            4. If verification is enabled and a verifier is set, either
               schedule it as a background task (``verification_async``) or
               await it and emit its result.

        Args:
            llm_stream_func: Callable returning an async iterator of tokens;
                it is called with ``prompt=...`` plus ``**llm_kwargs``.
            query: Query to answer.
            context: Retrieved context.
            original_query: Original query before improvement.
            **llm_kwargs: Additional kwargs forwarded to ``llm_stream_func``.

        Yields:
            Dict with keys:
                - type: "token" | "verification" | "error"
                - content: Token string, verification dict or error dict
                - done: Completion flag. In background-verification mode the
                  "verifying" status chunk carries ``done=False`` even though
                  it is the last chunk of the stream.

        Example:
            ```python
            async for chunk in handler.stream_with_verification(
                llm_stream_func=my_gemini_stream,
                query="What is photosynthesis?",
                context="[Retrieved context]"
            ):
                if chunk["type"] == "token":
                    print(chunk["content"], end="", flush=True)
                elif chunk["type"] == "verification":
                    print(chunk["content"])
            ```
        """
        buffer = StreamBuffer()

        try:
            logger.info("Starting token streaming...")

            # Guard against zero/negative/None sizes: always send at least
            # one token per chunk.
            batch_size = max(1, self.config.buffer_size or 1)
            pending: List[str] = []

            async for token in llm_stream_func(
                prompt=self._build_prompt(query, context),
                **llm_kwargs
            ):
                buffer.add_token(token)

                if batch_size == 1:
                    # Default path: forward the token untouched.
                    yield {
                        "type": "token",
                        "content": token,
                        "done": False
                    }
                    continue

                pending.append(token)
                if len(pending) >= batch_size:
                    yield {
                        "type": "token",
                        "content": "".join(pending),
                        "done": False
                    }
                    pending.clear()

            # Flush whatever is left of a partially filled batch.
            if pending:
                yield {
                    "type": "token",
                    "content": "".join(pending),
                    "done": False
                }

            complete_response = buffer.finalize()
            logger.info(f"Streaming complete. Total response length: {len(complete_response)}")

            # Tell the consumer that token streaming is over.
            yield {
                "type": "token",
                "content": "",
                "done": True
            }

            if self.config.enable_verification and self.verifier:
                logger.info("Running post-stream verification...")

                if self.config.verification_async:
                    task = asyncio.create_task(
                        self._verify_response_async(
                            buffer,
                            query,
                            context,
                            original_query
                        )
                    )
                    # Keep the task alive until it completes, then drop it.
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)

                    if self.config.send_verification_metadata:
                        yield {
                            "type": "verification",
                            "content": {
                                "status": "verifying",
                                "message": "Verification in progress..."
                            },
                            "done": False
                        }
                else:
                    verification_result = await self._verify_response(
                        complete_response,
                        query,
                        context,
                        original_query
                    )
                    buffer.set_verification_result(verification_result)

                    if self.config.send_verification_metadata:
                        yield {
                            "type": "verification",
                            "content": verification_result,
                            "done": True
                        }

        except Exception as e:
            # Report the failure in-band so the consumer sees a final chunk.
            logger.error(f"Error during streaming: {e}", exc_info=True)
            yield {
                "type": "error",
                "content": {
                    "message": str(e),
                    "error_type": type(e).__name__
                },
                "done": True
            }

    async def stream_simple(
        self,
        llm_stream_func: Callable,
        query: str,
        context: str,
        **llm_kwargs
    ) -> AsyncGenerator[str, None]:
        """Stream plain tokens without verification or metadata.

        Args:
            llm_stream_func: Callable returning an async iterator of tokens;
                it is called with ``prompt=...`` plus ``**llm_kwargs``.
            query: Query to answer.
            context: Retrieved context.
            **llm_kwargs: Additional kwargs forwarded to ``llm_stream_func``.

        Yields:
            str: Individual tokens. If streaming fails, a final
            ``"[Error: <message>]"`` string is yielded instead of raising.

        Example:
            ```python
            async for token in handler.stream_simple(
                llm_stream_func=my_llm_stream,
                query="What is AI?",
                context="[Context]"
            ):
                print(token, end="", flush=True)
            ```
        """
        try:
            async for token in llm_stream_func(
                prompt=self._build_prompt(query, context),
                **llm_kwargs
            ):
                yield token

        except Exception as e:
            logger.error(f"Error during simple streaming: {e}", exc_info=True)
            yield f"[Error: {str(e)}]"

    def _build_prompt(self, query: str, context: str) -> str:
        """Build the answer prompt from query and context.

        Args:
            query: User query.
            context: Retrieved context.

        Returns:
            Formatted prompt string ending with the ``## Answer:`` header.
        """
        return f"""You are an expert assistant analyzing a knowledge base. Use the provided context to answer the question accurately and comprehensively.

## Context Information:
{context}

## User Question:
{query}

## Instructions:
1. Answer based ONLY on the information provided in the context above
2. If the context contains relevant information, provide a clear, detailed answer
3. Structure your response with:
- Direct answer to the question
- Supporting details and evidence from the context
- Relevant examples or specifics when available
4. If the context doesn't contain enough information to fully answer the question, state what you know and what's missing
5. Be precise and cite specific information from the context when possible
6. Use clear, professional language appropriate for the domain

## Answer:"""

    async def _verify_response(
        self,
        response: str,
        query: str,
        context: str,
        original_query: Optional[str] = None
    ) -> Dict[str, Any]:
        """Verify a complete response.

        Args:
            response: Complete LLM response.
            query: Query used.
            context: Retrieved context.
            original_query: Original query before improvement.

        Returns:
            Verification result dictionary. Without a verifier this is a
            passing placeholder; if the verifier raises, a failing result
            carrying the error message is returned instead of propagating.
        """
        if not self.verifier:
            logger.warning("Verifier not available, skipping verification")
            return {
                "passed": True,
                "score": 10.0,
                "message": "Verification not available"
            }

        try:
            verification_result = await self.verifier.verify_answer(
                query=query,
                answer=response,
                context=context,
                original_query=original_query
            )

            # Normalize to the chunk schema; note the verifier's
            # "overall_score" is exposed to consumers as "score".
            return {
                "passed": verification_result.get("passed", False),
                "score": verification_result.get("overall_score", 0.0),
                "criteria_scores": verification_result.get("criteria_scores", {}),
                "issues": verification_result.get("issues", []),
                "suggestions": verification_result.get("suggestions", []),
                "confidence": verification_result.get("confidence", 0.0)
            }

        except Exception as e:
            logger.error(f"Verification error: {e}", exc_info=True)
            return {
                "passed": False,
                "score": 0.0,
                "error": str(e)
            }

    async def _verify_response_async(
        self,
        buffer: StreamBuffer,
        query: str,
        context: str,
        original_query: Optional[str] = None
    ):
        """Verify the buffered response as a background task.

        The result is stored on ``buffer`` and its score is logged.

        Args:
            buffer: Finalized StreamBuffer to read from and store into.
            query: Query used.
            context: Retrieved context.
            original_query: Original query before improvement.
        """
        verification_result = await self._verify_response(
            buffer.complete_response,
            query,
            context,
            original_query
        )
        buffer.set_verification_result(verification_result)
        logger.info(f"Background verification complete: score={verification_result.get('score', 0):.2f}")
|
|
|
|
| |
| |
| |
|
|
class StreamingQueryMixin:
    """Mixin providing streaming query capabilities to RAGAnything.

    Adds streaming query methods that can be used alongside the existing
    query methods and integrates with the verification system.

    Expected attributes on the host class:
        - self.lightrag: LightRAG instance
        - self.answer_verifier: AnswerVerifier instance (optional)
        - self.answer_modifier: AnswerModifier instance (optional)
        - self.config: RAGAnythingConfig instance
        - self.logger: Logger instance
        - self.query_improver / self._apply_query_improvement: only used
          when query improvement is enabled
    """

    @staticmethod
    def _simulated_llm_stream(llm_func: Callable) -> Callable:
        """Wrap ``llm_func`` so its response can be consumed as a token stream.

        Args:
            llm_func: Sync or async callable invoked as
                ``llm_func(prompt, **llm_kwargs)``.

        Returns:
            An async-generator function ``(prompt, **llm_kwargs)`` yielding
            string pieces whose concatenation equals the model response.
        """

        async def llm_stream_wrapper(prompt, **llm_kwargs):
            """Simulate streaming from a (possibly non-streaming) LLM."""
            import inspect
            import re

            # Call first and inspect the result: this also covers callable
            # objects and wrappers that return awaitables without being
            # coroutine functions themselves.
            response = llm_func(prompt, **llm_kwargs)
            if inspect.isawaitable(response):
                response = await response

            # The model already streams: forward its pieces untouched.
            if hasattr(response, "__aiter__"):
                async for piece in response:
                    yield piece
                return

            if not response:
                return

            text = response if isinstance(response, str) else str(response)

            # Each piece is a word plus its trailing whitespace (or a leading
            # whitespace run), so joining the pieces reproduces the response
            # exactly, newlines and indentation included.
            for piece in re.findall(r"\S+\s*|\s+", text):
                yield piece
                # Small pause between pieces to mimic token-by-token arrival.
                await asyncio.sleep(0.01)

        return llm_stream_wrapper

    async def aquery_stream(
        self,
        query: str,
        mode: str = "mix",
        enable_verification: bool = True,
        **kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Streaming query with verification support.

        Retrieves context through LightRAG, streams the LLM answer and, when
        enabled and a verifier is available, emits a verification chunk after
        the answer (verification is awaited, not run in the background).

        Args:
            query: User query.
            mode: RAG mode ("local", "global", "hybrid", "naive", "mix").
            enable_verification: Whether to run verification.
            **kwargs: Additional query parameters. Only
                ``enable_query_improvement`` is consumed here; it overrides
                ``self.config.enable_query_improvement``.

        Yields:
            Dict containing:
                - type: "token" | "verification" | "error"
                - content: Token or metadata
                - done: Completion flag

        Raises:
            ValueError: On first iteration, if ``self.lightrag`` is missing
                or None. All later failures are reported as "error" chunks.

        Example:
            ```python
            async for chunk in rag.aquery_stream(
                query="What is machine learning?",
                enable_verification=True
            ):
                if chunk["type"] == "token":
                    print(chunk["content"], end="")
                elif chunk["type"] == "verification":
                    print(chunk["content"])
            ```
        """
        if getattr(self, 'lightrag', None) is None:
            raise ValueError("LightRAG not initialized")

        try:
            from lightrag import QueryParam

            original_query = query

            use_query_improvement = kwargs.pop(
                'enable_query_improvement',
                getattr(self.config, 'enable_query_improvement', False)
            )

            if use_query_improvement and getattr(self, 'query_improver', None):
                self.logger.info("Applying query improvement for streaming...")
                try:
                    query_improvement_result = await self._apply_query_improvement(query)
                    improved = query_improvement_result.get("improved_query", query)
                    if improved and improved.strip():
                        query = improved
                        self.logger.info(f"Query improved: '{original_query}' -> '{query}'")
                    else:
                        self.logger.warning("Query improvement returned empty result, using original query")
                except Exception as e:
                    # Best effort: fall back to the query as given.
                    self.logger.warning(f"Query improvement failed: {e}, using original query")

            self.logger.info(f"Retrieving context for streaming query: {query[:100]}...")
            query_param = QueryParam(mode=mode, only_need_context=True)
            context = await self.lightrag.aquery(query, param=query_param)

            if not context or not context.strip():
                self.logger.warning("No context retrieved for query")
                yield {
                    "type": "error",
                    "content": {
                        "message": "I couldn't find any relevant information in the knowledge base to answer your question. Please ensure documents have been uploaded and indexed, or try rephrasing your query with different keywords.",
                        "suggestion": "Try uploading relevant documents first, or rephrase your question with more specific terms."
                    },
                    "done": True
                }
                return

            verifier = getattr(self, 'answer_verifier', None)
            streaming_config = StreamingConfig(
                enable_verification=bool(enable_verification and verifier),
                send_verification_metadata=True,
                # Await verification so its result is part of this stream.
                verification_async=False
            )

            handler = StreamingQueryHandler(
                config=streaming_config,
                verifier=verifier,
                modifier=getattr(self, 'answer_modifier', None)
            )

            llm_func = getattr(self.lightrag, 'llm_model_func', None)
            if llm_func is None:
                raise ValueError("LLM model function not available for streaming")

            async for chunk in handler.stream_with_verification(
                llm_stream_func=self._simulated_llm_stream(llm_func),
                query=query,
                context=context,
                original_query=original_query
            ):
                yield chunk

        except Exception as e:
            self.logger.error(f"Error in streaming query: {e}", exc_info=True)
            yield {
                "type": "error",
                "content": {
                    "message": str(e),
                    "error_type": type(e).__name__
                },
                "done": True
            }

    async def aquery_stream_simple(
        self,
        query: str,
        mode: str = "mix",
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Simple streaming query without verification.

        Lightweight streaming that just yields tokens without any
        verification or metadata overhead.

        Args:
            query: User query.
            mode: RAG mode.
            **kwargs: Additional parameters (accepted but not used here).

        Yields:
            str: Individual tokens. Problems are reported in-band as
            bracketed strings: "[No context found]", "[LLM not available]"
            or "[Error: <message>]".

        Example:
            ```python
            async for token in rag.aquery_stream_simple(
                query="Explain photosynthesis"
            ):
                print(token, end="", flush=True)
            ```
        """
        try:
            from lightrag import QueryParam

            query_param = QueryParam(mode=mode, only_need_context=True)
            context = await self.lightrag.aquery(query, param=query_param)

            if not context:
                yield "[No context found]"
                return

            handler = StreamingQueryHandler(
                config=StreamingConfig(enable_verification=False)
            )

            llm_func = getattr(self.lightrag, 'llm_model_func', None)
            if llm_func is None:
                yield "[LLM not available]"
                return

            async for token in handler.stream_simple(
                llm_stream_func=self._simulated_llm_stream(llm_func),
                query=query,
                context=context
            ):
                yield token

        except Exception as e:
            self.logger.error(f"Error in simple streaming: {e}", exc_info=True)
            yield f"[Error: {str(e)}]"
|
|