Phonex
TheTruthSchool_RAG
167596f
"""
Streaming Query Module with Verification Support
This module provides streaming capabilities for RAGAnything while maintaining
the dual-LLM verification layer. It allows real-time token streaming to the
frontend while buffering the complete response for post-generation verification.
Key Features:
- Real-time token streaming from LLM (Gemini, OpenAI, etc.)
- Complete response buffering for verification
- Async verification after streaming completes
- Verification metadata injection into stream
- Support for both verified and unverified streaming modes
Architecture:
1. Stream tokens to frontend in real-time
2. Buffer complete response for verification
3. Run verification asynchronously after completion
4. Send verification metadata as final stream chunk
Author: RAG-Anything Team
Version: 1.0.0
"""
from __future__ import annotations
import asyncio
import json
from typing import Dict, List, Any, Optional, AsyncGenerator, Callable
from dataclasses import dataclass
from enum import Enum
from lightrag.utils import logger
# =============================================================================
# Configuration Classes
# =============================================================================
class StreamMode(Enum):
"""Streaming modes"""
TOKENS_ONLY = "tokens_only" # Stream tokens, no verification
TOKENS_WITH_VERIFICATION = "tokens_with_verification" # Stream tokens + verify
TOKENS_WITH_METADATA = "tokens_with_metadata" # Include metadata chunks
@dataclass
class StreamingConfig:
"""Configuration for streaming queries
Attributes:
mode: Streaming mode
enable_verification: Whether to run verification after streaming
send_verification_metadata: Send verification results as final chunk
verification_async: Run verification in background (non-blocking)
buffer_size: Number of tokens to buffer before sending
include_context: Include retrieved context in metadata
"""
mode: StreamMode = StreamMode.TOKENS_WITH_VERIFICATION
enable_verification: bool = True
send_verification_metadata: bool = True
verification_async: bool = True
buffer_size: int = 1
include_context: bool = False
# =============================================================================
# Streaming Response Buffer
# =============================================================================
class StreamBuffer:
"""Buffer for collecting streamed tokens and managing verification
This class collects tokens as they're streamed and provides the complete
response for verification after streaming completes.
"""
def __init__(self):
"""Initialize StreamBuffer"""
self.tokens: List[str] = []
self.complete_response: str = ""
self.is_complete: bool = False
self.verification_result: Optional[Dict[str, Any]] = None
def add_token(self, token: str):
"""Add a token to the buffer
Args:
token: Token to add
"""
self.tokens.append(token)
def finalize(self) -> str:
"""Finalize buffer and return complete response
Returns:
Complete response string
"""
self.complete_response = "".join(self.tokens)
self.is_complete = True
return self.complete_response
def set_verification_result(self, result: Dict[str, Any]):
"""Store verification result
Args:
result: Verification result dictionary
"""
self.verification_result = result
# =============================================================================
# Streaming Query Handler
# =============================================================================
class StreamingQueryHandler:
"""Handler for streaming queries with verification support
This class orchestrates the streaming process, managing token streaming
to the frontend while buffering for verification.
Attributes:
config: StreamingConfig instance
verifier: AnswerVerifier instance (optional)
modifier: AnswerModifier instance (optional)
"""
def __init__(
self,
config: Optional[StreamingConfig] = None,
verifier: Optional[Any] = None,
modifier: Optional[Any] = None
):
"""Initialize StreamingQueryHandler
Args:
config: Streaming configuration
verifier: AnswerVerifier instance for verification
modifier: AnswerModifier instance for improvements
"""
self.config = config or StreamingConfig()
self.verifier = verifier
self.modifier = modifier
async def stream_with_verification(
self,
llm_stream_func: Callable,
query: str,
context: str,
original_query: Optional[str] = None,
**llm_kwargs
) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream LLM response with verification support
This is the main streaming method. It:
1. Streams tokens to frontend in real-time
2. Buffers tokens for complete response
3. Runs verification after streaming completes
4. Sends verification metadata as final chunk
Args:
llm_stream_func: Async generator function that yields tokens
query: Query to answer
context: Retrieved context
original_query: Original query before improvement
**llm_kwargs: Additional kwargs for LLM
Yields:
Dict with keys:
- type: "token" | "metadata" | "verification" | "error"
- content: Token string or metadata dict
- done: Boolean indicating if streaming is complete
Example:
```python
async for chunk in handler.stream_with_verification(
llm_stream_func=my_gemini_stream,
query="What is photosynthesis?",
context="[Retrieved context]"
):
if chunk["type"] == "token":
print(chunk["content"], end="", flush=True)
elif chunk["type"] == "verification":
print(f"\n\nVerification Score: {chunk['content']['score']}")
```
"""
buffer = StreamBuffer()
try:
# Step 1: Stream tokens to frontend
logger.info("Starting token streaming...")
async for token in llm_stream_func(
prompt=self._build_prompt(query, context),
**llm_kwargs
):
# Add token to buffer
buffer.add_token(token)
# Yield token to frontend
yield {
"type": "token",
"content": token,
"done": False
}
# Step 2: Finalize buffer
complete_response = buffer.finalize()
logger.info(f"Streaming complete. Total response length: {len(complete_response)}")
# Send completion signal
yield {
"type": "token",
"content": "",
"done": True
}
# Step 3: Run verification (if enabled)
if self.config.enable_verification and self.verifier:
logger.info("Running post-stream verification...")
if self.config.verification_async:
# Non-blocking verification
asyncio.create_task(
self._verify_response_async(
buffer,
query,
context,
original_query
)
)
# Send placeholder verification metadata
if self.config.send_verification_metadata:
yield {
"type": "verification",
"content": {
"status": "verifying",
"message": "Verification in progress..."
},
"done": False
}
else:
# Blocking verification
verification_result = await self._verify_response(
complete_response,
query,
context,
original_query
)
buffer.set_verification_result(verification_result)
# Send verification metadata
if self.config.send_verification_metadata:
yield {
"type": "verification",
"content": verification_result,
"done": True
}
except Exception as e:
logger.error(f"Error during streaming: {e}", exc_info=True)
yield {
"type": "error",
"content": {
"message": str(e),
"error_type": type(e).__name__
},
"done": True
}
async def stream_simple(
self,
llm_stream_func: Callable,
query: str,
context: str,
**llm_kwargs
) -> AsyncGenerator[str, None]:
"""Simple token streaming without verification
This is a lightweight streaming method that just yields tokens
without any verification or metadata.
Args:
llm_stream_func: Async generator function that yields tokens
query: Query to answer
context: Retrieved context
**llm_kwargs: Additional kwargs for LLM
Yields:
str: Individual tokens
Example:
```python
async for token in handler.stream_simple(
llm_stream_func=my_llm_stream,
query="What is AI?",
context="[Context]"
):
print(token, end="", flush=True)
```
"""
try:
async for token in llm_stream_func(
prompt=self._build_prompt(query, context),
**llm_kwargs
):
yield token
except Exception as e:
logger.error(f"Error during simple streaming: {e}", exc_info=True)
yield f"[Error: {str(e)}]"
def _build_prompt(self, query: str, context: str) -> str:
"""Build prompt from query and context
Args:
query: User query
context: Retrieved context
Returns:
Formatted prompt string
"""
# Enhanced prompt with better instructions for higher quality responses
return f"""You are an expert assistant analyzing a knowledge base. Use the provided context to answer the question accurately and comprehensively.
## Context Information:
{context}
## User Question:
{query}
## Instructions:
1. Answer based ONLY on the information provided in the context above
2. If the context contains relevant information, provide a clear, detailed answer
3. Structure your response with:
- Direct answer to the question
- Supporting details and evidence from the context
- Relevant examples or specifics when available
4. If the context doesn't contain enough information to fully answer the question, state what you know and what's missing
5. Be precise and cite specific information from the context when possible
6. Use clear, professional language appropriate for the domain
## Answer:"""
async def _verify_response(
self,
response: str,
query: str,
context: str,
original_query: Optional[str] = None
) -> Dict[str, Any]:
"""Verify a complete response
Args:
response: Complete LLM response
query: Query used
context: Retrieved context
original_query: Original query before improvement
Returns:
Verification result dictionary
"""
if not self.verifier:
logger.warning("Verifier not available, skipping verification")
return {
"passed": True,
"score": 10.0,
"message": "Verification not available"
}
try:
verification_result = await self.verifier.verify_answer(
query=query,
answer=response,
context=context,
original_query=original_query
)
return {
"passed": verification_result.get("passed", False),
"score": verification_result.get("overall_score", 0.0),
"criteria_scores": verification_result.get("criteria_scores", {}),
"issues": verification_result.get("issues", []),
"suggestions": verification_result.get("suggestions", []),
"confidence": verification_result.get("confidence", 0.0)
}
except Exception as e:
logger.error(f"Verification error: {e}", exc_info=True)
return {
"passed": False,
"score": 0.0,
"error": str(e)
}
async def _verify_response_async(
self,
buffer: StreamBuffer,
query: str,
context: str,
original_query: Optional[str] = None
):
"""Async verification (non-blocking background task)
Args:
buffer: StreamBuffer to store result in
query: Query used
context: Retrieved context
original_query: Original query before improvement
"""
verification_result = await self._verify_response(
buffer.complete_response,
query,
context,
original_query
)
buffer.set_verification_result(verification_result)
logger.info(f"Background verification complete: score={verification_result.get('score', 0):.2f}")
# =============================================================================
# Streaming Mixin for RAGAnything Integration
# =============================================================================
class StreamingQueryMixin:
"""Mixin providing streaming query capabilities to RAGAnything
This mixin adds streaming query methods that can be used alongside
the existing query methods. It integrates with the verification system.
Expected attributes:
- self.lightrag: LightRAG instance
- self.answer_verifier: AnswerVerifier instance (optional)
- self.answer_modifier: AnswerModifier instance (optional)
- self.config: RAGAnythingConfig instance
- self.logger: Logger instance
"""
async def aquery_stream(
self,
query: str,
mode: str = "mix",
enable_verification: bool = True,
**kwargs
) -> AsyncGenerator[Dict[str, Any], None]:
"""Streaming query with verification support
This method streams LLM responses while optionally running verification.
Perfect for real-time user interfaces.
Args:
query: User query
mode: RAG mode ("local", "global", "hybrid", "naive", "mix")
enable_verification: Whether to run verification
**kwargs: Additional query parameters
Yields:
Dict containing:
- type: "token" | "metadata" | "verification" | "error"
- content: Token or metadata
- done: Completion flag
Example:
```python
async for chunk in rag.aquery_stream(
query="What is machine learning?",
enable_verification=True
):
if chunk["type"] == "token":
print(chunk["content"], end="")
elif chunk["type"] == "verification":
print(f"\n\nQuality Score: {chunk['content']['score']}/10")
```
"""
if not hasattr(self, 'lightrag') or self.lightrag is None:
raise ValueError("LightRAG not initialized")
try:
# Import here to avoid circular dependencies
from lightrag import QueryParam
original_query = query
# Step 1: Apply query improvement if enabled
use_query_improvement = kwargs.pop(
'enable_query_improvement',
getattr(self.config, 'enable_query_improvement', False)
)
if use_query_improvement and hasattr(self, 'query_improver') and self.query_improver:
self.logger.info("Applying query improvement for streaming...")
try:
query_improvement_result = await self._apply_query_improvement(query)
improved = query_improvement_result.get("improved_query", query)
if improved and improved.strip():
query = improved
self.logger.info(f"Query improved: '{original_query}' -> '{query}'")
else:
self.logger.warning("Query improvement returned empty result, using original query")
except Exception as e:
self.logger.warning(f"Query improvement failed: {e}, using original query")
# Continue with original query on error
# Step 2: Retrieve context
self.logger.info(f"Retrieving context for streaming query: {query[:100]}...")
query_param = QueryParam(mode=mode, only_need_context=True)
context = await self.lightrag.aquery(query, param=query_param)
if not context or not context.strip():
self.logger.warning("No context retrieved for query")
yield {
"type": "error",
"content": {
"message": "I couldn't find any relevant information in the knowledge base to answer your question. Please ensure documents have been uploaded and indexed, or try rephrasing your query with different keywords.",
"suggestion": "Try uploading relevant documents first, or rephrase your question with more specific terms."
},
"done": True
}
return
# Step 3: Create streaming handler
streaming_config = StreamingConfig(
enable_verification=enable_verification and hasattr(self, 'answer_verifier'),
send_verification_metadata=True,
verification_async=False # Blocking to ensure verification completes
)
handler = StreamingQueryHandler(
config=streaming_config,
verifier=getattr(self, 'answer_verifier', None),
modifier=getattr(self, 'answer_modifier', None)
)
# Step 4: Stream response
if hasattr(self.lightrag, 'llm_model_func'):
# Create streaming wrapper for non-streaming LLM
llm_func = self.lightrag.llm_model_func
async def llm_stream_wrapper(prompt, **llm_kwargs):
"""Wrapper to simulate streaming from non-streaming LLM"""
if asyncio.iscoroutinefunction(llm_func):
response = await llm_func(prompt, **llm_kwargs)
else:
response = llm_func(prompt, **llm_kwargs)
# Simulate token-by-token streaming
# Split by words for more natural streaming
words = response.split()
for i, word in enumerate(words):
if i < len(words) - 1:
yield word + " "
else:
yield word
# Small delay to simulate real streaming
await asyncio.sleep(0.01)
async for chunk in handler.stream_with_verification(
llm_stream_func=llm_stream_wrapper,
query=query,
context=context,
original_query=original_query
):
yield chunk
else:
raise ValueError("LLM model function not available for streaming")
except Exception as e:
self.logger.error(f"Error in streaming query: {e}", exc_info=True)
yield {
"type": "error",
"content": {"message": str(e)},
"done": True
}
async def aquery_stream_simple(
self,
query: str,
mode: str = "mix",
**kwargs
) -> AsyncGenerator[str, None]:
"""Simple streaming query without verification
Lightweight streaming that just yields tokens without any
verification or metadata overhead.
Args:
query: User query
mode: RAG mode
**kwargs: Additional parameters
Yields:
str: Individual tokens
Example:
```python
async for token in rag.aquery_stream_simple(
query="Explain photosynthesis"
):
print(token, end="", flush=True)
```
"""
try:
# Get context
from lightrag import QueryParam
query_param = QueryParam(mode=mode, only_need_context=True)
context = await self.lightrag.aquery(query, param=query_param)
if not context:
yield "[No context found]"
return
# Create handler
handler = StreamingQueryHandler(
config=StreamingConfig(enable_verification=False)
)
# Stream tokens
if hasattr(self.lightrag, 'llm_model_func'):
llm_func = self.lightrag.llm_model_func
async def llm_stream_wrapper(prompt, **llm_kwargs):
if asyncio.iscoroutinefunction(llm_func):
response = await llm_func(prompt, **llm_kwargs)
else:
response = llm_func(prompt, **llm_kwargs)
words = response.split()
for i, word in enumerate(words):
if i < len(words) - 1:
yield word + " "
else:
yield word
await asyncio.sleep(0.01)
async for token in handler.stream_simple(
llm_stream_func=llm_stream_wrapper,
query=query,
context=context
):
yield token
else:
yield "[LLM not available]"
except Exception as e:
self.logger.error(f"Error in simple streaming: {e}", exc_info=True)
yield f"[Error: {str(e)}]"