# Batch Processing This document describes the batch processing feature for RAG-Anything, which allows you to process multiple documents in parallel for improved throughput. ## Overview The batch processing feature allows you to process multiple documents concurrently, significantly improving throughput for large document collections. It provides parallel processing, progress tracking, error handling, and flexible configuration options. ## Key Features - **Parallel Processing**: Process multiple files concurrently using thread pools - **Progress Tracking**: Real-time progress bars with `tqdm` - **Error Handling**: Comprehensive error reporting and recovery - **Flexible Input**: Support for files, directories, and recursive search - **Configurable Workers**: Adjustable number of parallel workers - **Installation Check Bypass**: Optional skip for environments with package conflicts ## Installation ```bash # Basic installation pip install raganything[all] # Required for batch processing pip install tqdm ``` ## Usage ### Basic Batch Processing ```python from raganything.batch_parser import BatchParser # Create batch parser batch_parser = BatchParser( parser_type="mineru", # or "docling" max_workers=4, show_progress=True, timeout_per_file=300, skip_installation_check=False # Set to True if having parser installation issues ) # Process multiple files result = batch_parser.process_batch( file_paths=["doc1.pdf", "doc2.docx", "folder/"], output_dir="./batch_output", parse_method="auto", recursive=True ) # Check results print(result.summary()) print(f"Success rate: {result.success_rate:.1f}%") print(f"Processing time: {result.processing_time:.2f} seconds") ``` ### Asynchronous Batch Processing ```python import asyncio from raganything.batch_parser import BatchParser async def async_batch_processing(): batch_parser = BatchParser( parser_type="mineru", max_workers=4, show_progress=True ) # Process files asynchronously result = await batch_parser.process_batch_async( file_paths=["doc1.pdf", "doc2.docx"], output_dir="./output", parse_method="auto" ) return result # Run async processing result = asyncio.run(async_batch_processing()) ``` ### Integration with RAG-Anything ```python from raganything import RAGAnything rag = RAGAnything() # Process documents with batch functionality result = rag.process_documents_batch( file_paths=["doc1.pdf", "doc2.docx"], output_dir="./output", max_workers=4, show_progress=True ) print(f"Processed {len(result.successful_files)} files successfully") ``` ### Process Documents with RAG Integration ```python # Process documents in batch and then add them to RAG result = await rag.process_documents_with_rag_batch( file_paths=["doc1.pdf", "doc2.docx"], output_dir="./output", max_workers=4, show_progress=True ) print(f"Processed {result['successful_rag_files']} files with RAG") print(f"Total processing time: {result['total_processing_time']:.2f} seconds") ``` ### Command Line Interface ```bash # Basic batch processing python -m raganything.batch_parser path/to/docs/ --output ./output --workers 4 # With specific parser python -m raganything.batch_parser path/to/docs/ --parser mineru --method auto # Without progress bar python -m raganything.batch_parser path/to/docs/ --output ./output --no-progress # Help python -m raganything.batch_parser --help ``` ## Configuration ### Environment Variables ```env # Batch processing configuration MAX_CONCURRENT_FILES=4 SUPPORTED_FILE_EXTENSIONS=.pdf,.docx,.doc,.pptx,.ppt,.xlsx,.xls,.txt,.md RECURSIVE_FOLDER_PROCESSING=true PARSER_OUTPUT_DIR=./parsed_output ``` ### BatchParser Parameters - **parser_type**: `"mineru"` or `"docling"` (default: `"mineru"`) - **max_workers**: Number of parallel workers (default: `4`) - **show_progress**: Show progress bar (default: `True`) - **timeout_per_file**: Timeout per file in seconds (default: `300`) - **skip_installation_check**: Skip parser installation check (default: `False`) ## Supported File Types - **PDF files**: `.pdf` - **Office documents**: `.doc`, `.docx`, `.ppt`, `.pptx`, `.xls`, `.xlsx` - **Images**: `.png`, `.jpg`, `.jpeg`, `.bmp`, `.tiff`, `.tif`, `.gif`, `.webp` - **Text files**: `.txt`, `.md` ## API Reference ### BatchProcessingResult ```python @dataclass class BatchProcessingResult: successful_files: List[str] # Successfully processed files failed_files: List[str] # Failed files total_files: int # Total number of files processing_time: float # Total processing time in seconds errors: Dict[str, str] # Error messages for failed files output_dir: str # Output directory used def summary(self) -> str: # Human-readable summary def success_rate(self) -> float: # Success rate as percentage ``` ### BatchParser Methods ```python class BatchParser: def __init__(self, parser_type: str = "mineru", max_workers: int = 4, ...): """Initialize batch parser""" def get_supported_extensions(self) -> List[str]: """Get list of supported file extensions""" def filter_supported_files(self, file_paths: List[str], recursive: bool = True) -> List[str]: """Filter files to only supported types""" def process_batch(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult: """Process files in batch""" async def process_batch_async(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult: """Process files in batch asynchronously""" ``` ## Performance Considerations ### Memory Usage - Each worker uses additional memory - Recommended: 2-4 workers for most systems - Monitor memory usage with large files ### CPU Usage - Parallel processing utilizes multiple cores - Optimal worker count depends on CPU cores and file sizes - I/O may become bottleneck with many small files ### Recommended Settings - **Small files** (< 1MB): Higher worker count (6-8) - **Large files** (> 100MB): Lower worker count (2-3) - **Mixed sizes**: Start with 4 workers and adjust ## Troubleshooting ### Common Issues #### Memory Errors ```python # Solution: Reduce max_workers batch_parser = BatchParser(max_workers=2) ``` #### Timeout Errors ```python # Solution: Increase timeout_per_file batch_parser = BatchParser(timeout_per_file=600) # 10 minutes ``` #### Parser Installation Issues ```python # Solution: Skip installation check batch_parser = BatchParser(skip_installation_check=True) ``` #### File Not Found Errors - Check file paths and permissions - Ensure input files exist - Verify directory access rights ### Debug Mode Enable debug logging for detailed information: ```python import logging logging.basicConfig(level=logging.DEBUG) # Create batch parser with debug logging batch_parser = BatchParser(parser_type="mineru", max_workers=2) ``` ### Error Handling The batch processor provides comprehensive error handling: ```python result = batch_parser.process_batch(file_paths=["doc1.pdf", "doc2.docx"]) # Check for errors if result.failed_files: print("Failed files:") for file_path in result.failed_files: error_message = result.errors.get(file_path, "Unknown error") print(f" - {file_path}: {error_message}") # Process only successful files for file_path in result.successful_files: print(f"Successfully processed: {file_path}") ``` ## Examples ### Process Entire Directory ```python from pathlib import Path # Process all supported files in a directory batch_parser = BatchParser(max_workers=4) directory_path = Path("./documents") result = batch_parser.process_batch( file_paths=[str(directory_path)], output_dir="./processed", recursive=True # Include subdirectories ) print(f"Processed {len(result.successful_files)} out of {result.total_files} files") ``` ### Filter Files Before Processing ```python # Get all files in directory all_files = ["doc1.pdf", "image.png", "spreadsheet.xlsx", "unsupported.xyz"] # Filter to supported files only supported_files = batch_parser.filter_supported_files(all_files) print(f"Will process {len(supported_files)} out of {len(all_files)} files") # Process only supported files result = batch_parser.process_batch( file_paths=supported_files, output_dir="./output" ) ``` ### Custom Error Handling ```python def process_with_retry(file_paths, max_retries=3): """Process files with retry logic""" for attempt in range(max_retries): result = batch_parser.process_batch(file_paths, "./output") if not result.failed_files: break # All files processed successfully print(f"Attempt {attempt + 1}: {len(result.failed_files)} files failed") file_paths = result.failed_files # Retry failed files return result ``` ## Best Practices 1. **Start with default settings** and adjust based on performance 2. **Monitor system resources** during batch processing 3. **Use appropriate worker counts** for your hardware 4. **Handle errors gracefully** with retry logic 5. **Test with small batches** before processing large collections 6. **Use skip_installation_check** if facing parser installation issues 7. **Enable progress tracking** for long-running operations 8. **Set appropriate timeouts** based on expected file processing times ## Conclusion The batch processing feature significantly improves RAG-Anything's throughput for large document collections. It provides flexible configuration options, comprehensive error handling, and seamless integration with the existing RAG-Anything pipeline.