| """ |
| GPU Acceleration Module for RAGAnything |
| |
| Provides GPU-accelerated operations for: |
| - Batch embeddings with automatic GPU detection |
| - Optimized vector similarity search |
| - Concurrent API calls with GPU batching |
| - Memory-efficient GPU utilization |
| |
| Author: RAGAnything Performance Team |
| """ |
|
|
| import asyncio |
| import logging |
| import time |
from typing import List, Optional, Callable, Any, Tuple, Dict
from dataclasses import dataclass
| import numpy as np |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# GPU_AVAILABLE means PyTorch is importable; CUDA_AVAILABLE means a CUDA
# device is actually usable. Both are resolved once at import time.
GPU_AVAILABLE = False
CUDA_AVAILABLE = False
|
|
| try: |
| import torch |
| GPU_AVAILABLE = True |
| CUDA_AVAILABLE = torch.cuda.is_available() |
| if CUDA_AVAILABLE: |
| logger.info(f"GPU acceleration available: {torch.cuda.get_device_name(0)}") |
| logger.info(f"CUDA version: {torch.version.cuda}") |
| logger.info(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB") |
| except ImportError: |
| logger.warning("PyTorch not available. GPU acceleration disabled.") |
| torch = None |
|
|
|
|
| @dataclass |
| class GPUConfig: |
| """Configuration for GPU acceleration""" |
|
|
| enable_gpu: bool = True |
| """Enable GPU acceleration if available""" |
|
|
| device: str = "cuda" |
| """Device to use: 'cuda', 'cuda:0', 'cpu'""" |
|
|
| batch_size: int = 64 |
| """Batch size for GPU operations""" |
|
|
| max_gpu_memory_gb: float = 4.0 |
| """Maximum GPU memory to use (GB)""" |
|
|
| prefetch_batches: int = 2 |
| """Number of batches to prefetch""" |
|
|
| pin_memory: bool = True |
| """Use pinned memory for faster CPU->GPU transfers""" |
|
|
| num_workers: int = 2 |
| """Number of worker threads for data loading""" |
|
|
| mixed_precision: bool = True |
| """Use FP16 for embeddings to save memory""" |
|
|
| optimize_for_inference: bool = True |
| """Apply inference-specific optimizations""" |
|
|
|
|
| class GPUEmbeddingAccelerator: |
| """ |
| GPU-accelerated embedding generation |
| |
| Features: |
| - Automatic batching for GPU |
| - Memory-efficient processing |
| - Mixed precision support |
| - Concurrent API calls with batching |
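
    Example (a sketch; ``my_embedding_func`` stands in for any callable
    mapping ``List[str]`` to ``List[List[float]]``):
        ```python
        accelerator = GPUEmbeddingAccelerator(my_embedding_func)
        vectors = await accelerator.embed_batch(["doc one", "doc two"])
        print(accelerator.get_stats()["embeddings_per_second"])
        ```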
| """ |
|
|
| def __init__( |
| self, |
| embedding_func: Callable, |
| config: Optional[GPUConfig] = None, |
| logger: Optional[logging.Logger] = None |
| ): |
| """ |
| Initialize GPU embedding accelerator |
| |
| Args: |
| embedding_func: Base embedding function to accelerate |
| config: GPU configuration |
| logger: Optional logger |
| """ |
| self.embedding_func = embedding_func |
| self.config = config or GPUConfig() |
| self.logger = logger or logging.getLogger(__name__) |
|
|
| |
        if GPU_AVAILABLE and CUDA_AVAILABLE and self.config.enable_gpu:
            self.device = torch.device(self.config.device)
            self.use_gpu = True
            self.logger.info(f"Using GPU device: {self.device}")
        else:
            # torch may be None when PyTorch is not installed, so fall back
            # to a plain string identifier in that case
            self.device = torch.device("cpu") if torch is not None else "cpu"
            self.use_gpu = False
            self.logger.info("Using CPU for embeddings")
|
|
| |
| self.stats = { |
| "total_embeddings": 0, |
| "total_batches": 0, |
| "total_time": 0.0, |
| "gpu_time": 0.0, |
| "cpu_time": 0.0, |
| } |
|
|
| async def embed_batch(self, texts: List[str]) -> List[List[float]]: |
| """ |
| Generate embeddings for a batch of texts with GPU acceleration |
| |
| Args: |
| texts: List of texts to embed |
| |
| Returns: |
| List of embedding vectors |
| """ |
| if not texts: |
| return [] |
|
|
| start_time = time.time() |
|
|
        try:
            # Delegate to the wrapped embedding function (API call or local
            # model), then post-process on the GPU when one is available
            embeddings = await self._call_embedding_func(texts)

            if self.use_gpu:
                embeddings = self._optimize_embeddings_gpu(embeddings)

            # Update running statistics
            elapsed = time.time() - start_time
            self.stats["total_embeddings"] += len(texts)
            self.stats["total_batches"] += 1
            self.stats["total_time"] += elapsed
|
|
| if self.use_gpu: |
| self.stats["gpu_time"] += elapsed |
| else: |
| self.stats["cpu_time"] += elapsed |
|
|
| return embeddings |
|
|
| except Exception as e: |
| self.logger.error(f"Error in embed_batch: {e}", exc_info=True) |
| raise |
|
|
    async def _call_embedding_func(self, texts: List[str]) -> List[List[float]]:
        """Invoke the wrapped embedding function, sync or async"""
        if asyncio.iscoroutinefunction(self.embedding_func):
            return await self.embedding_func(texts)
        else:
            # Run synchronous embedding functions in the default executor
            # so they do not block the event loop
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self.embedding_func, texts)
|
|
    def _optimize_embeddings_gpu(self, embeddings: List[List[float]]) -> List[List[float]]:
        """L2-normalize embeddings on the GPU, optionally via FP16"""
        try:
            # Move the embeddings to the target device in one transfer
            tensor = torch.tensor(embeddings, dtype=torch.float32, device=self.device)

            # Optionally downcast to FP16 to halve GPU memory usage
            if self.config.mixed_precision:
                tensor = tensor.half()

            # L2-normalize so dot products equal cosine similarities
            tensor = torch.nn.functional.normalize(tensor, p=2, dim=1)

            # Upcast back to FP32 before returning to Python lists
            if self.config.mixed_precision:
                tensor = tensor.float()

            return tensor.cpu().tolist()
|
|
| except Exception as e: |
| self.logger.warning(f"GPU optimization failed, using original embeddings: {e}") |
| return embeddings |
|
|
| async def embed_documents_chunked( |
| self, |
| texts: List[str], |
| chunk_size: Optional[int] = None |
| ) -> List[List[float]]: |
| """ |
| Embed large document collections in chunks with GPU batching |
| |
| Args: |
| texts: List of all texts to embed |
| chunk_size: Size of each chunk (defaults to config batch_size) |
| |
| Returns: |
| List of all embeddings |
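
        Example (a sketch; ``accelerator`` is an existing
        GPUEmbeddingAccelerator and ``docs`` a placeholder corpus):
            ```python
            docs = [f"document {i}" for i in range(1000)]
            vectors = await accelerator.embed_documents_chunked(docs, chunk_size=128)
            assert len(vectors) == len(docs)
            ```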
| """ |
| chunk_size = chunk_size or self.config.batch_size |
|
|
| self.logger.info(f"Embedding {len(texts)} texts in chunks of {chunk_size}") |
|
|
| all_embeddings = [] |
|
|
| |
        for i in range(0, len(texts), chunk_size):
            chunk = texts[i:i + chunk_size]

            chunk_embeddings = await self.embed_batch(chunk)
            all_embeddings.extend(chunk_embeddings)

            # Log progress every 10 chunks and once at the end
            chunks_done = i // chunk_size + 1
            if chunks_done % 10 == 0 or i + chunk_size >= len(texts):
                progress = min(i + chunk_size, len(texts))
                self.logger.info(f"Embedded {progress}/{len(texts)} texts")
|
|
| return all_embeddings |
|
|
| async def embed_concurrent( |
| self, |
| text_batches: List[List[str]], |
| max_concurrent: int = 4 |
| ) -> List[List[List[float]]]: |
| """ |
| Embed multiple batches concurrently with GPU |
| |
| Args: |
| text_batches: List of text batches |
| max_concurrent: Maximum concurrent embedding calls |
| |
| Returns: |
| List of embedding batches |
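
        Example (a sketch; useful when the underlying embedding API is
        rate-limited and ``max_concurrent`` caps in-flight requests):
            ```python
            batches = [["a", "b"], ["c", "d"], ["e"]]
            results = await accelerator.embed_concurrent(batches, max_concurrent=2)
            # results[i] holds the embeddings for batches[i]
            ```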
| """ |
| semaphore = asyncio.Semaphore(max_concurrent) |
|
|
| async def embed_with_semaphore(batch: List[str]) -> List[List[float]]: |
| async with semaphore: |
| return await self.embed_batch(batch) |
|
|
| |
| tasks = [embed_with_semaphore(batch) for batch in text_batches] |
|
|
| |
        # return_exceptions=True lets us report which batch failed before
        # re-raising the underlying error
        results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
| |
| final_results = [] |
| for i, result in enumerate(results): |
| if isinstance(result, Exception): |
| self.logger.error(f"Batch {i} failed: {result}") |
| raise result |
| final_results.append(result) |
|
|
| return final_results |
|
|
| def get_stats(self) -> Dict[str, Any]: |
| """Get embedding statistics""" |
| avg_time_per_embedding = ( |
| self.stats["total_time"] / self.stats["total_embeddings"] |
| if self.stats["total_embeddings"] > 0 else 0 |
| ) |
|
|
| return { |
| **self.stats, |
| "use_gpu": self.use_gpu, |
| "device": str(self.device), |
| "avg_time_per_embedding_ms": avg_time_per_embedding * 1000, |
| "embeddings_per_second": ( |
| self.stats["total_embeddings"] / self.stats["total_time"] |
| if self.stats["total_time"] > 0 else 0 |
| ), |
| } |
|
|
    def clear_gpu_cache(self):
        """Release unused cached GPU memory back to the driver"""
        if self.use_gpu:
            torch.cuda.empty_cache()
            self.logger.info("GPU cache cleared")
|
|
|
|
| class GPUVectorSearchAccelerator: |
| """ |
| GPU-accelerated vector similarity search |
| |
| Features: |
| - Batch similarity computation on GPU |
| - Fast top-k selection |
| - Memory-efficient operations |
| """ |
|
|
| def __init__( |
| self, |
| config: Optional[GPUConfig] = None, |
| logger: Optional[logging.Logger] = None |
| ): |
| """ |
| Initialize GPU vector search accelerator |
| |
| Args: |
| config: GPU configuration |
| logger: Optional logger |
| """ |
| self.config = config or GPUConfig() |
| self.logger = logger or logging.getLogger(__name__) |
|
|
| |
        if GPU_AVAILABLE and CUDA_AVAILABLE and self.config.enable_gpu:
            self.device = torch.device(self.config.device)
            self.use_gpu = True
        else:
            # torch may be None when PyTorch is not installed
            self.device = torch.device("cpu") if torch is not None else "cpu"
            self.use_gpu = False
|
|
| def compute_similarities_batch( |
| self, |
| query_vectors: List[List[float]], |
| database_vectors: List[List[float]], |
| top_k: int = 10 |
| ) -> List[Tuple[List[int], List[float]]]: |
| """ |
| Compute top-k similarities for multiple queries |
| |
| Args: |
| query_vectors: List of query vectors |
| database_vectors: List of database vectors |
| top_k: Number of top results to return per query |
| |
| Returns: |
| List of (indices, similarities) tuples for each query |
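
        Example (a sketch with tiny 3-dimensional vectors):
            ```python
            searcher = GPUVectorSearchAccelerator()
            queries = [[1.0, 0.0, 0.0]]
            database = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]
            [(indices, scores)] = searcher.compute_similarities_batch(
                queries, database, top_k=1
            )
            # indices == [0] and scores[0] is close to 1.0
            ```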
| """ |
        if not query_vectors or not database_vectors:
            return []

        # Without PyTorch there is nothing to try on the GPU; use the
        # NumPy fallback directly
        if torch is None:
            return self._compute_similarities_cpu(query_vectors, database_vectors, top_k)

        try:
            # Move both vector sets to the target device
            queries = torch.tensor(query_vectors, dtype=torch.float32, device=self.device)
            database = torch.tensor(database_vectors, dtype=torch.float32, device=self.device)

            # L2-normalize so the matrix product yields cosine similarities
            queries = torch.nn.functional.normalize(queries, p=2, dim=1)
            database = torch.nn.functional.normalize(database, p=2, dim=1)

            # One matmul computes every query-database similarity at once
            similarities = torch.mm(queries, database.t())

            # Clamp top_k to the database size before selection
            top_k = min(top_k, database.shape[0])
            top_scores, top_indices = torch.topk(similarities, k=top_k, dim=1)

            # Transfer results back to the CPU as plain Python lists
            results = []
            for i in range(len(query_vectors)):
                indices = top_indices[i].cpu().tolist()
                scores = top_scores[i].cpu().tolist()
                results.append((indices, scores))
|
|
| return results |
|
|
| except Exception as e: |
| self.logger.error(f"Error in GPU similarity search: {e}", exc_info=True) |
| |
| return self._compute_similarities_cpu(query_vectors, database_vectors, top_k) |
|
|
    def _compute_similarities_cpu(
        self,
        query_vectors: List[List[float]],
        database_vectors: List[List[float]],
        top_k: int
    ) -> List[Tuple[List[int], List[float]]]:
        """NumPy fallback for similarity computation"""
        queries = np.array(query_vectors)
        database = np.array(database_vectors)

        # L2-normalize; the epsilon guards against zero-norm vectors
        eps = 1e-12
        queries = queries / np.maximum(np.linalg.norm(queries, axis=1, keepdims=True), eps)
        database = database / np.maximum(np.linalg.norm(database, axis=1, keepdims=True), eps)

        # Cosine similarity via a single matrix product
        similarities = np.dot(queries, database.T)

        # Per-query top-k: argpartition finds an unordered top-k set,
        # then argsort orders it by descending similarity
        results = []
        for sim_row in similarities:
            top_k_actual = min(top_k, len(sim_row))
            top_indices = np.argpartition(sim_row, -top_k_actual)[-top_k_actual:]
            top_indices = top_indices[np.argsort(-sim_row[top_indices])]
            top_scores = sim_row[top_indices].tolist()
            results.append((top_indices.tolist(), top_scores))
|
|
| return results |
|
|
|
|
| def create_gpu_accelerated_embedding_func( |
| base_embedding_func: Callable, |
| config: Optional[GPUConfig] = None |
| ) -> Callable: |
| """ |
| Create a GPU-accelerated version of an embedding function |
| |
| Args: |
| base_embedding_func: Original embedding function |
| config: GPU configuration |
| |
| Returns: |
| Accelerated embedding function |
| |
| Example: |
| ```python |
| # Original embedding function |
| async def my_embedding_func(texts: List[str]) -> List[List[float]]: |
| # ... API call or model inference ... |
| return embeddings |
| |
| # Create GPU-accelerated version |
| gpu_embedding_func = create_gpu_accelerated_embedding_func( |
| my_embedding_func, |
| config=GPUConfig(batch_size=64) |
| ) |
| |
| # Use in RAGAnything |
| rag = await create_rag_anything( |
| llm_model_func=my_llm, |
| embedding_func=gpu_embedding_func |
| ) |
| ``` |
| """ |
| accelerator = GPUEmbeddingAccelerator(base_embedding_func, config) |
|
|
    async def accelerated_func(texts: List[str]) -> List[List[float]]:
        return await accelerator.embed_batch(texts)

    # Expose the accelerator so callers can inspect stats or clear the cache
    accelerated_func.accelerator = accelerator
|
|
| return accelerated_func |
|
|
|
|
| def check_gpu_availability() -> Dict[str, Any]: |
| """ |
| Check GPU availability and return system information |
| |
| Returns: |
| Dictionary with GPU information |
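
    Example:
        ```python
        info = check_gpu_availability()
        if info["cuda_available"]:
            print(f"{info['device_count']} CUDA device(s) detected")
        ```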
| """ |
    info = {
        "pytorch_available": GPU_AVAILABLE,
        "cuda_available": CUDA_AVAILABLE,
        "cuda_version": None,
        "device_count": 0,
        "devices": [],
    }
|
|
| if GPU_AVAILABLE and CUDA_AVAILABLE: |
| info["device_count"] = torch.cuda.device_count() |
| info["cuda_version"] = torch.version.cuda |
|
|
| for i in range(torch.cuda.device_count()): |
| props = torch.cuda.get_device_properties(i) |
| info["devices"].append({ |
| "id": i, |
| "name": props.name, |
| "total_memory_gb": props.total_memory / 1e9, |
| "compute_capability": f"{props.major}.{props.minor}", |
| }) |
|
|
| return info |
|
|
|
|
| |
| __all__ = [ |
| "GPUConfig", |
| "GPUEmbeddingAccelerator", |
| "GPUVectorSearchAccelerator", |
| "create_gpu_accelerated_embedding_func", |
| "check_gpu_availability", |
| "GPU_AVAILABLE", |
| "CUDA_AVAILABLE", |
| ] |
|
|