| """ |
| Batch processing functionality for RAGAnything |
| |
| Contains methods for processing multiple documents in batch mode |
| """ |
|
|
| import asyncio |
| import logging |
| from pathlib import Path |
| from typing import List, Dict, Any, Optional, TYPE_CHECKING, Callable |
| import time |
|
|
| from .batch_parser import BatchParser, BatchProcessingResult |
| from .batch_optimizer import BatchOptimizer, ProgressTracker |
|
|
| if TYPE_CHECKING: |
| from .config import RAGAnythingConfig |
|
|
|
|


class BatchMixin:
    """BatchMixin class containing batch processing functionality for RAGAnything"""

    # Attributes provided by the host RAGAnything class
    config: "RAGAnythingConfig"
    logger: logging.Logger

    # Method stubs implemented by the host class; declared here for type checkers
    async def _ensure_lightrag_initialized(self) -> None: ...
    async def process_document_complete(self, file_path: str, **kwargs) -> None: ...

    async def process_folder_complete(
        self,
        folder_path: str,
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        display_stats: Optional[bool] = None,
        split_by_character: Optional[str] = None,
        split_by_character_only: bool = False,
        file_extensions: Optional[List[str]] = None,
        recursive: Optional[bool] = None,
        max_workers: Optional[int] = None,
    ):
        """
        Process all supported files in a folder

        Args:
            folder_path: Path to the folder containing files to process
            output_dir: Directory for parsed outputs (defaults to config)
            parse_method: Parsing method to use (defaults to config)
            display_stats: Whether to display processing statistics (defaults to True)
            split_by_character: Character to split text by (optional)
            split_by_character_only: Whether to split only by the given character
            file_extensions: List of file extensions to process (defaults to config)
            recursive: Whether to search subfolders recursively (defaults to config)
            max_workers: Maximum number of files processed concurrently (defaults to config)
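
        Example:
            A minimal usage sketch (illustrative; assumes an initialized
            RAGAnything instance `rag` and a local `./documents` folder):

            ```python
            await rag.process_folder_complete(
                folder_path="./documents",
                recursive=True,
                max_workers=4,
            )
            ```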
| """ |
| if output_dir is None: |
| output_dir = self.config.parser_output_dir |
| if parse_method is None: |
| parse_method = self.config.parse_method |
| if display_stats is None: |
| display_stats = True |
| if file_extensions is None: |
| file_extensions = self.config.supported_file_extensions |
| if recursive is None: |
| recursive = self.config.recursive_folder_processing |
| if max_workers is None: |
| max_workers = self.config.max_concurrent_files |
|
|
| await self._ensure_lightrag_initialized() |
|
|
| |
| folder_path_obj = Path(folder_path) |
| if not folder_path_obj.exists(): |
| raise FileNotFoundError(f"Folder not found: {folder_path}") |
|
|
| |
| files_to_process = [] |
| for file_ext in file_extensions: |
| if recursive: |
| pattern = f"**/*{file_ext}" |
| else: |
| pattern = f"*{file_ext}" |
| files_to_process.extend(folder_path_obj.glob(pattern)) |
|
|
| if not files_to_process: |
| self.logger.warning(f"No supported files found in {folder_path}") |
| return |

        self.logger.info(
            f"Found {len(files_to_process)} files to process in {folder_path}"
        )

        # Ensure the output directory exists
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Bound concurrency so at most max_workers files are processed at once
        semaphore = asyncio.Semaphore(max_workers)
        tasks = []

        async def process_single_file(file_path: Path):
            async with semaphore:
                try:
                    await self.process_document_complete(
                        str(file_path),
                        output_dir=output_dir,
                        parse_method=parse_method,
                        split_by_character=split_by_character,
                        split_by_character_only=split_by_character_only,
                    )
                    return True, str(file_path), None
                except Exception as e:
                    self.logger.error(f"Failed to process {file_path}: {str(e)}")
                    return False, str(file_path), str(e)

        # Schedule one task per file and wait for all of them to finish
        for file_path in files_to_process:
            task = asyncio.create_task(process_single_file(file_path))
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successes from failures; gather may also return raw exceptions
        successful_files = []
        failed_files = []
        for result in results:
            if isinstance(result, Exception):
                failed_files.append(("unknown", str(result)))
            else:
                success, file_path, error = result
                if success:
                    successful_files.append(file_path)
                else:
                    failed_files.append((file_path, error))

        if display_stats:
            self.logger.info("Processing complete!")
            self.logger.info(f" Successful: {len(successful_files)} files")
            self.logger.info(f" Failed: {len(failed_files)} files")
            if failed_files:
                self.logger.warning("Failed files:")
                for file_path, error in failed_files:
                    self.logger.warning(f" - {file_path}: {error}")

    def process_documents_batch(
        self,
        file_paths: List[str],
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        max_workers: Optional[int] = None,
        recursive: Optional[bool] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Process multiple documents in batch using the new BatchParser

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            BatchProcessingResult: Results of the batch processing
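
        Example:
            Illustrative sketch (assumes an initialized RAGAnything instance
            `rag`; the paths are placeholders):

            ```python
            result = rag.process_documents_batch(
                file_paths=["doc1.pdf", "reports/"],
                max_workers=4,
            )
            print(f"Parsed {len(result.successful_files)} files")
            ```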
| """ |
| |
| if output_dir is None: |
| output_dir = self.config.parser_output_dir |
| if parse_method is None: |
| parse_method = self.config.parse_method |
| if max_workers is None: |
| max_workers = self.config.max_concurrent_files |
| if recursive is None: |
| recursive = self.config.recursive_folder_processing |
|
|
| |
| batch_parser = BatchParser( |
| parser_type=self.config.parser, |
| max_workers=max_workers, |
| show_progress=show_progress, |
| skip_installation_check=True, |
| ) |
|
|
| |
| return batch_parser.process_batch( |
| file_paths=file_paths, |
| output_dir=output_dir, |
| parse_method=parse_method, |
| recursive=recursive, |
| **kwargs, |
| ) |

    async def process_documents_batch_async(
        self,
        file_paths: List[str],
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        max_workers: Optional[int] = None,
        recursive: Optional[bool] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Asynchronously process multiple documents in batch

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            BatchProcessingResult: Results of the batch processing
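
        Example:
            Illustrative sketch (assumes an initialized RAGAnything instance
            `rag` and a running event loop):

            ```python
            result = await rag.process_documents_batch_async(
                file_paths=["doc1.pdf", "doc2.pdf"],
                show_progress=False,
            )
            ```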
| """ |
| |
| if output_dir is None: |
| output_dir = self.config.parser_output_dir |
| if parse_method is None: |
| parse_method = self.config.parse_method |
| if max_workers is None: |
| max_workers = self.config.max_concurrent_files |
| if recursive is None: |
| recursive = self.config.recursive_folder_processing |
|
|
| |
| batch_parser = BatchParser( |
| parser_type=self.config.parser, |
| max_workers=max_workers, |
| show_progress=show_progress, |
| skip_installation_check=True, |
| ) |
|
|
| |
| return await batch_parser.process_batch_async( |
| file_paths=file_paths, |
| output_dir=output_dir, |
| parse_method=parse_method, |
| recursive=recursive, |
| **kwargs, |
| ) |
|
|

    def get_supported_file_extensions(self) -> List[str]:
        """
        Get list of supported file extensions for batch processing
        """
        batch_parser = BatchParser(parser_type=self.config.parser)
        return batch_parser.get_supported_extensions()

    def filter_supported_files(
        self, file_paths: List[str], recursive: Optional[bool] = None
    ) -> List[str]:
        """
        Filter file paths to only include supported file types

        Args:
            file_paths: List of file paths to filter
            recursive: Whether to process directories recursively

        Returns:
            List of supported file paths
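
        Example:
            Illustrative sketch (assumes an initialized RAGAnything instance
            `rag`; the file names are placeholders):

            ```python
            candidates = ["doc.pdf", "notes.txt", "archive.zip"]
            supported = rag.filter_supported_files(candidates)
            ```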
| """ |
| if recursive is None: |
| recursive = self.config.recursive_folder_processing |
|
|
| batch_parser = BatchParser(parser_type=self.config.parser) |
| return batch_parser.filter_supported_files(file_paths, recursive) |
|

    async def process_documents_with_rag_batch(
        self,
        file_paths: List[str],
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        max_workers: Optional[int] = None,
        recursive: Optional[bool] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Process documents in batch and then add them to RAG

        This method combines document parsing and RAG insertion:
        1. First, parse all documents using batch processing
        2. Then, process each successfully parsed document with RAG

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            Dict containing both parse results and RAG processing results
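
        Example:
            Illustrative sketch (assumes an initialized RAGAnything instance
            `rag`; key names match this method's return value):

            ```python
            result = await rag.process_documents_with_rag_batch(
                file_paths=["doc1.pdf", "doc2.pdf"],
            )
            print(f"RAG-processed: {result['successful_rag_files']}")
            print(f"Failed: {result['failed_rag_files']}")
            ```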
| """ |
| start_time = time.time() |
|
|
| |
| if output_dir is None: |
| output_dir = self.config.parser_output_dir |
| if parse_method is None: |
| parse_method = self.config.parse_method |
| if max_workers is None: |
| max_workers = self.config.max_concurrent_files |
| if recursive is None: |
| recursive = self.config.recursive_folder_processing |
|
|
| self.logger.info("Starting batch processing with RAG integration") |
|
|
| |
| parse_result = self.process_documents_batch( |
| file_paths=file_paths, |
| output_dir=output_dir, |
| parse_method=parse_method, |
| max_workers=max_workers, |
| recursive=recursive, |
| show_progress=show_progress, |
| **kwargs, |
| ) |
|
|
| |
| |
| await self._ensure_lightrag_initialized() |
|
|
| |
| rag_results = {} |
|
|
| if parse_result.successful_files: |
| self.logger.info( |
| f"Processing {len(parse_result.successful_files)} files with RAG" |
| ) |
|
|
| |
| for file_path in parse_result.successful_files: |
| try: |
| |
| await self.process_document_complete( |
| file_path, |
| output_dir=output_dir, |
| parse_method=parse_method, |
| **kwargs, |
| ) |
|
|
| |
| |
| rag_results[file_path] = {"status": "success", "processed": True} |
|
|
| except Exception as e: |
| self.logger.error( |
| f"Failed to process {file_path} with RAG: {str(e)}" |
| ) |
| rag_results[file_path] = { |
| "status": "failed", |
| "error": str(e), |
| "processed": False, |
| } |
|
|
| processing_time = time.time() - start_time |
|
|
| return { |
| "parse_result": parse_result, |
| "rag_results": rag_results, |
| "total_processing_time": processing_time, |
| "successful_rag_files": len( |
| [r for r in rag_results.values() if r["processed"]] |
| ), |
| "failed_rag_files": len( |
| [r for r in rag_results.values() if not r["processed"]] |
| ), |
| } |
|
|
| |
| |
| |
|

    async def process_documents_batch_optimized(
        self,
        file_paths: List[str],
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        max_concurrent_parsers: int = 4,
        max_concurrent_processors: int = 10,
        enable_progress_tracking: bool = True,
        progress_callback: Optional[Callable] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Process documents with advanced optimizations for speed

        This method provides significant performance improvements:
        - Concurrent document parsing with prefetching (2-3x faster)
        - Pipeline architecture (parse + process in parallel)
        - Adaptive rate limiting for API calls
        - Progress tracking with ETA estimation
        - Intelligent caching

        Args:
            file_paths: List of file paths to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_concurrent_parsers: Maximum concurrent document parsers (default: 4)
            max_concurrent_processors: Maximum concurrent processors (default: 10)
            enable_progress_tracking: Whether to track and report progress
            progress_callback: Optional callback for progress updates
            **kwargs: Additional processing parameters

        Returns:
            Dict with successful_files, failed_files, and detailed statistics

        Example:
            ```python
            def progress_update(progress):
                print(f"Progress: {progress['percentage']:.1f}%")

            result = await rag.process_documents_batch_optimized(
                file_paths=["doc1.pdf", "doc2.pdf"],
                progress_callback=progress_update,
            )
            ```
        """
        if output_dir is None:
            output_dir = self.config.parser_output_dir
        if parse_method is None:
            parse_method = self.config.parse_method

        self.logger.info(
            f"Starting optimized batch processing for {len(file_paths)} documents"
        )

        # Configure the optimizer with the requested concurrency limits
        optimizer = BatchOptimizer(
            max_concurrent_parsers=max_concurrent_parsers,
            max_concurrent_processors=max_concurrent_processors,
            prefetch_buffer_size=5,
            enable_adaptive_rate=True,
            enable_progress_tracking=enable_progress_tracking,
            logger=self.logger,
        )

        if progress_callback:
            optimizer.set_progress_callback(progress_callback)

        # Delegate the actual pipeline execution to the optimizer
        result = await optimizer.process_documents_batch_optimized(
            rag_instance=self,
            file_paths=file_paths,
            output_dir=output_dir,
            parse_method=parse_method,
            **kwargs,
        )

        return result

    async def process_folder_optimized(
        self,
        folder_path: str,
        output_dir: Optional[str] = None,
        parse_method: Optional[str] = None,
        file_extensions: Optional[List[str]] = None,
        recursive: bool = True,
        max_concurrent_parsers: int = 4,
        max_concurrent_processors: int = 10,
        progress_callback: Optional[Callable] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Process all files in a folder with optimization

        Args:
            folder_path: Path to the folder to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            file_extensions: File extensions to process
            recursive: Whether to process subfolders
            max_concurrent_parsers: Maximum concurrent document parsers
            max_concurrent_processors: Maximum concurrent processors
            progress_callback: Progress callback function
            **kwargs: Additional processing parameters

        Returns:
            Processing results and statistics
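
        Example:
            Illustrative sketch (assumes an initialized RAGAnything instance
            `rag` and a local `./documents` folder; the result keys follow
            the Returns note of process_documents_batch_optimized):

            ```python
            result = await rag.process_folder_optimized(
                folder_path="./documents",
                max_concurrent_parsers=2,
            )
            print(f"Done: {len(result['successful_files'])} files")
            ```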
| """ |
| if output_dir is None: |
| output_dir = self.config.parser_output_dir |
| if parse_method is None: |
| parse_method = self.config.parse_method |
| if file_extensions is None: |
| file_extensions = self.config.supported_file_extensions |
|
|
| folder_path_obj = Path(folder_path) |
| if not folder_path_obj.exists(): |
| raise FileNotFoundError(f"Folder not found: {folder_path}") |
|
|
| |
| files_to_process = [] |
| for file_ext in file_extensions: |
| pattern = f"**/*{file_ext}" if recursive else f"*{file_ext}" |
| files_to_process.extend(folder_path_obj.glob(pattern)) |
|
|
| if not files_to_process: |
| self.logger.warning(f"No supported files found in {folder_path}") |
| return {"successful_files": [], "failed_files": [], "statistics": {}} |
|
|
| file_paths_str = [str(f) for f in files_to_process] |
|
|
| return await self.process_documents_batch_optimized( |
| file_paths=file_paths_str, |
| output_dir=output_dir, |
| parse_method=parse_method, |
| max_concurrent_parsers=max_concurrent_parsers, |
| max_concurrent_processors=max_concurrent_processors, |
| progress_callback=progress_callback, |
| **kwargs |
| ) |
|
|