"""
GPU Acceleration Module for RAGAnything
Provides GPU-accelerated operations for:
- Batch embeddings with automatic GPU detection
- Optimized vector similarity search
- Concurrent API calls with GPU batching
- Memory-efficient GPU utilization
Author: RAGAnything Performance Team
"""
import asyncio
import logging
import time
from typing import List, Optional, Callable, Any, Tuple, Dict
from dataclasses import dataclass
import numpy as np
logger = logging.getLogger(__name__)
# Try importing GPU libraries. GPU_AVAILABLE tracks whether PyTorch imported
# successfully; CUDA_AVAILABLE tracks whether a CUDA device is actually usable.
GPU_AVAILABLE = False
CUDA_AVAILABLE = False
try:
import torch
GPU_AVAILABLE = True
CUDA_AVAILABLE = torch.cuda.is_available()
if CUDA_AVAILABLE:
logger.info(f"GPU acceleration available: {torch.cuda.get_device_name(0)}")
logger.info(f"CUDA version: {torch.version.cuda}")
logger.info(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
except ImportError:
logger.warning("PyTorch not available. GPU acceleration disabled.")
torch = None
@dataclass
class GPUConfig:
"""Configuration for GPU acceleration"""
enable_gpu: bool = True
"""Enable GPU acceleration if available"""
device: str = "cuda"
"""Device to use: 'cuda', 'cuda:0', 'cpu'"""
batch_size: int = 64
"""Batch size for GPU operations"""
max_gpu_memory_gb: float = 4.0
"""Maximum GPU memory to use (GB)"""
prefetch_batches: int = 2
"""Number of batches to prefetch"""
pin_memory: bool = True
"""Use pinned memory for faster CPU->GPU transfers"""
num_workers: int = 2
"""Number of worker threads for data loading"""
mixed_precision: bool = True
"""Use FP16 for embeddings to save memory"""
optimize_for_inference: bool = True
"""Apply inference-specific optimizations"""
class GPUEmbeddingAccelerator:
"""
GPU-accelerated embedding generation
Features:
- Automatic batching for GPU
- Memory-efficient processing
- Mixed precision support
- Concurrent API calls with batching
"""
def __init__(
self,
embedding_func: Callable,
config: Optional[GPUConfig] = None,
logger: Optional[logging.Logger] = None
):
"""
Initialize GPU embedding accelerator
Args:
embedding_func: Base embedding function to accelerate
config: GPU configuration
logger: Optional logger
"""
self.embedding_func = embedding_func
self.config = config or GPUConfig()
self.logger = logger or logging.getLogger(__name__)
# Determine device
if GPU_AVAILABLE and CUDA_AVAILABLE and self.config.enable_gpu:
self.device = torch.device(self.config.device)
self.use_gpu = True
self.logger.info(f"Using GPU device: {self.device}")
        else:
            # Guard against torch being None when PyTorch failed to import
            self.device = torch.device("cpu") if GPU_AVAILABLE else "cpu"
            self.use_gpu = False
self.logger.info("Using CPU for embeddings")
# Statistics
self.stats = {
"total_embeddings": 0,
"total_batches": 0,
"total_time": 0.0,
"gpu_time": 0.0,
"cpu_time": 0.0,
}
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""
Generate embeddings for a batch of texts with GPU acceleration
Args:
texts: List of texts to embed
Returns:
List of embedding vectors
"""
if not texts:
return []
start_time = time.time()
try:
# Call the original embedding function
embeddings = await self._call_embedding_func(texts)
# Convert to GPU tensors if enabled
if self.use_gpu and GPU_AVAILABLE:
embeddings = self._optimize_embeddings_gpu(embeddings)
# Update stats
elapsed = time.time() - start_time
self.stats["total_embeddings"] += len(texts)
self.stats["total_batches"] += 1
self.stats["total_time"] += elapsed
if self.use_gpu:
self.stats["gpu_time"] += elapsed
else:
self.stats["cpu_time"] += elapsed
return embeddings
except Exception as e:
self.logger.error(f"Error in embed_batch: {e}", exc_info=True)
raise
async def _call_embedding_func(self, texts: List[str]) -> List[List[float]]:
"""Call the original embedding function"""
if asyncio.iscoroutinefunction(self.embedding_func):
return await self.embedding_func(texts)
else:
            # Run the sync function in a thread pool so it doesn't block the event loop
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self.embedding_func, texts)
def _optimize_embeddings_gpu(self, embeddings: List[List[float]]) -> List[List[float]]:
"""Optimize embeddings using GPU"""
try:
# Convert to tensor
tensor = torch.tensor(embeddings, dtype=torch.float32, device=self.device)
# Apply mixed precision if enabled
if self.config.mixed_precision:
tensor = tensor.half() # Convert to FP16
# Normalize embeddings on GPU
tensor = torch.nn.functional.normalize(tensor, p=2, dim=1)
# Convert back to CPU and list
if self.config.mixed_precision:
tensor = tensor.float() # Convert back to FP32
return tensor.cpu().tolist()
except Exception as e:
self.logger.warning(f"GPU optimization failed, using original embeddings: {e}")
return embeddings
async def embed_documents_chunked(
self,
texts: List[str],
chunk_size: Optional[int] = None
) -> List[List[float]]:
"""
Embed large document collections in chunks with GPU batching
Args:
texts: List of all texts to embed
chunk_size: Size of each chunk (defaults to config batch_size)
Returns:
List of all embeddings
"""
chunk_size = chunk_size or self.config.batch_size
self.logger.info(f"Embedding {len(texts)} texts in chunks of {chunk_size}")
all_embeddings = []
# Process in chunks
for i in range(0, len(texts), chunk_size):
chunk = texts[i:i + chunk_size]
# Get embeddings for chunk
chunk_embeddings = await self.embed_batch(chunk)
all_embeddings.extend(chunk_embeddings)
# Log progress
if (i + chunk_size) % (chunk_size * 10) == 0 or i + chunk_size >= len(texts):
progress = min(i + chunk_size, len(texts))
self.logger.info(f"Embedded {progress}/{len(texts)} texts")
return all_embeddings
async def embed_concurrent(
self,
text_batches: List[List[str]],
max_concurrent: int = 4
) -> List[List[List[float]]]:
"""
Embed multiple batches concurrently with GPU
Args:
text_batches: List of text batches
max_concurrent: Maximum concurrent embedding calls
Returns:
List of embedding batches
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def embed_with_semaphore(batch: List[str]) -> List[List[float]]:
async with semaphore:
return await self.embed_batch(batch)
# Create tasks for all batches
tasks = [embed_with_semaphore(batch) for batch in text_batches]
# Execute concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle errors
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"Batch {i} failed: {result}")
raise result
final_results.append(result)
return final_results
def get_stats(self) -> Dict[str, Any]:
"""Get embedding statistics"""
avg_time_per_embedding = (
self.stats["total_time"] / self.stats["total_embeddings"]
if self.stats["total_embeddings"] > 0 else 0
)
return {
**self.stats,
"use_gpu": self.use_gpu,
"device": str(self.device),
"avg_time_per_embedding_ms": avg_time_per_embedding * 1000,
"embeddings_per_second": (
self.stats["total_embeddings"] / self.stats["total_time"]
if self.stats["total_time"] > 0 else 0
),
}
def clear_gpu_cache(self):
"""Clear GPU cache to free memory"""
if self.use_gpu and GPU_AVAILABLE:
torch.cuda.empty_cache()
self.logger.info("GPU cache cleared")
class GPUVectorSearchAccelerator:
"""
GPU-accelerated vector similarity search
Features:
- Batch similarity computation on GPU
- Fast top-k selection
- Memory-efficient operations
"""
def __init__(
self,
config: Optional[GPUConfig] = None,
logger: Optional[logging.Logger] = None
):
"""
Initialize GPU vector search accelerator
Args:
config: GPU configuration
logger: Optional logger
"""
self.config = config or GPUConfig()
self.logger = logger or logging.getLogger(__name__)
# Determine device
if GPU_AVAILABLE and CUDA_AVAILABLE and self.config.enable_gpu:
self.device = torch.device(self.config.device)
self.use_gpu = True
        else:
            # Guard against torch being None when PyTorch failed to import
            self.device = torch.device("cpu") if GPU_AVAILABLE else "cpu"
            self.use_gpu = False
def compute_similarities_batch(
self,
query_vectors: List[List[float]],
database_vectors: List[List[float]],
top_k: int = 10
) -> List[Tuple[List[int], List[float]]]:
"""
Compute top-k similarities for multiple queries
Args:
query_vectors: List of query vectors
database_vectors: List of database vectors
top_k: Number of top results to return per query
Returns:
List of (indices, similarities) tuples for each query
"""
if not query_vectors or not database_vectors:
return []
try:
# Convert to tensors
queries = torch.tensor(query_vectors, dtype=torch.float32, device=self.device)
database = torch.tensor(database_vectors, dtype=torch.float32, device=self.device)
# Normalize vectors
queries = torch.nn.functional.normalize(queries, p=2, dim=1)
database = torch.nn.functional.normalize(database, p=2, dim=1)
# Compute cosine similarities (batch matrix multiplication)
similarities = torch.mm(queries, database.t())
# Get top-k for each query
top_k = min(top_k, database.shape[0])
top_scores, top_indices = torch.topk(similarities, k=top_k, dim=1)
# Convert to lists
results = []
for i in range(len(query_vectors)):
indices = top_indices[i].cpu().tolist()
scores = top_scores[i].cpu().tolist()
results.append((indices, scores))
return results
except Exception as e:
self.logger.error(f"Error in GPU similarity search: {e}", exc_info=True)
# Fallback to CPU
return self._compute_similarities_cpu(query_vectors, database_vectors, top_k)
def _compute_similarities_cpu(
self,
query_vectors: List[List[float]],
database_vectors: List[List[float]],
top_k: int
) -> List[Tuple[List[int], List[float]]]:
"""CPU fallback for similarity computation"""
queries = np.array(query_vectors)
database = np.array(database_vectors)
# Normalize
queries = queries / np.linalg.norm(queries, axis=1, keepdims=True)
database = database / np.linalg.norm(database, axis=1, keepdims=True)
# Compute similarities
similarities = np.dot(queries, database.T)
# Get top-k
results = []
for sim_row in similarities:
top_k_actual = min(top_k, len(sim_row))
top_indices = np.argpartition(sim_row, -top_k_actual)[-top_k_actual:]
top_indices = top_indices[np.argsort(-sim_row[top_indices])]
top_scores = sim_row[top_indices].tolist()
results.append((top_indices.tolist(), top_scores))
return results
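# Illustrative usage sketch (not part of the original module): top-k cosine
# search over a toy in-memory "database". The vector values are arbitrary
# assumptions for the example; if PyTorch is absent or the GPU path fails,
# the call transparently falls back to the NumPy path above.
def _demo_vector_search() -> None:
    searcher = GPUVectorSearchAccelerator(GPUConfig())
    database = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
    queries = [[0.9, 0.1]]
    for indices, scores in searcher.compute_similarities_batch(queries, database, top_k=2):
        print(f"top indices: {indices}, cosine scores: {scores}")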
def create_gpu_accelerated_embedding_func(
base_embedding_func: Callable,
config: Optional[GPUConfig] = None
) -> Callable:
"""
Create a GPU-accelerated version of an embedding function
Args:
base_embedding_func: Original embedding function
config: GPU configuration
Returns:
Accelerated embedding function
Example:
```python
# Original embedding function
async def my_embedding_func(texts: List[str]) -> List[List[float]]:
# ... API call or model inference ...
return embeddings
# Create GPU-accelerated version
gpu_embedding_func = create_gpu_accelerated_embedding_func(
my_embedding_func,
config=GPUConfig(batch_size=64)
)
# Use in RAGAnything
rag = await create_rag_anything(
llm_model_func=my_llm,
embedding_func=gpu_embedding_func
)
```
"""
accelerator = GPUEmbeddingAccelerator(base_embedding_func, config)
async def accelerated_func(texts: List[str]) -> List[List[float]]:
return await accelerator.embed_batch(texts)
# Attach accelerator for access to stats
accelerated_func.accelerator = accelerator
return accelerated_func
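# Illustrative sketch (not part of the original module): because the factory
# attaches the accelerator to the returned function, throughput stats are
# available after embedding. The dummy embedding function is an assumption.
async def _demo_factory_stats() -> None:
    async def dummy(texts: List[str]) -> List[List[float]]:
        return [[1.0, 0.0, 0.0] for _ in texts]

    func = create_gpu_accelerated_embedding_func(dummy, GPUConfig())
    await func(["hello", "world"])
    print(func.accelerator.get_stats()["embeddings_per_second"])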
def check_gpu_availability() -> Dict[str, Any]:
"""
Check GPU availability and return system information
Returns:
Dictionary with GPU information
"""
info = {
"pytorch_available": GPU_AVAILABLE,
"cuda_available": CUDA_AVAILABLE,
"device_count": 0,
"devices": [],
}
if GPU_AVAILABLE and CUDA_AVAILABLE:
info["device_count"] = torch.cuda.device_count()
info["cuda_version"] = torch.version.cuda
for i in range(torch.cuda.device_count()):
props = torch.cuda.get_device_properties(i)
info["devices"].append({
"id": i,
"name": props.name,
"total_memory_gb": props.total_memory / 1e9,
"compute_capability": f"{props.major}.{props.minor}",
})
return info
# Export main classes and functions
__all__ = [
"GPUConfig",
"GPUEmbeddingAccelerator",
"GPUVectorSearchAccelerator",
"create_gpu_accelerated_embedding_func",
"check_gpu_availability",
"GPU_AVAILABLE",
"CUDA_AVAILABLE",
]
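if __name__ == "__main__":
    # Minimal smoke test (illustrative): prints the detected GPU environment
    # and runs the example sketches above. Safe on CPU-only machines.
    logging.basicConfig(level=logging.INFO)
    print(check_gpu_availability())
    _demo_vector_search()
    asyncio.run(_demo_embedding_accelerator())
    asyncio.run(_demo_factory_stats())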