# Batch Processing
This document describes the batch processing feature for RAG-Anything, which allows you to process multiple documents in parallel for improved throughput.
## Overview
Batch processing parses multiple documents concurrently with a pool of workers, which raises throughput on large document collections. It also provides progress tracking, per-file error reporting, and configurable worker counts and timeouts.
## Key Features
- **Parallel Processing**: Process multiple files concurrently using thread pools
- **Progress Tracking**: Real-time progress bars with `tqdm`
- **Error Handling**: Comprehensive error reporting and recovery
- **Flexible Input**: Support for files, directories, and recursive search
- **Configurable Workers**: Adjustable number of parallel workers
- **Installation Check Bypass**: Optional skip for environments with package conflicts
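To make the parallel model concrete, here is a minimal standard-library sketch of thread-pool batch processing with per-file error capture. It is not RAG-Anything's implementation: `parse_one` and `run_batch` are hypothetical names, and `parse_one` merely stands in for a real parser call.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple


def parse_one(path: str) -> str:
    """Hypothetical stand-in for a real parser call."""
    if path.endswith(".xyz"):
        raise ValueError(f"unsupported file type: {path}")
    return f"parsed:{path}"


def run_batch(
    paths: List[str],
    worker: Callable[[str], str] = parse_one,
    max_workers: int = 4,
) -> Tuple[List[str], Dict[str, str]]:
    """Run `worker` over `paths` concurrently; return (successes, errors)."""
    successes: List[str] = []
    errors: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its path so failures can be attributed.
        futures = {pool.submit(worker, p): p for p in paths}
        for future in as_completed(futures):
            path = futures[future]
            try:
                future.result()
                successes.append(path)
            except Exception as exc:  # one bad file must not abort the batch
                errors[path] = str(exc)
    return sorted(successes), errors
```

The key design point is that exceptions are caught per future, so a single failing file ends up in `errors` while the remaining files still complete.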
## Installation
```bash
# Basic installation
pip install 'raganything[all]'  # quotes keep shells such as zsh from expanding the brackets
# Required for batch processing
pip install tqdm
```
## Usage
### Basic Batch Processing
```python
from raganything.batch_parser import BatchParser
# Create batch parser
batch_parser = BatchParser(
    parser_type="mineru",  # or "docling"
    max_workers=4,
    show_progress=True,
    timeout_per_file=300,
    skip_installation_check=False  # Set to True if having parser installation issues
)

# Process multiple files
result = batch_parser.process_batch(
    file_paths=["doc1.pdf", "doc2.docx", "folder/"],
    output_dir="./batch_output",
    parse_method="auto",
    recursive=True
)
# Check results
print(result.summary())
print(f"Success rate: {result.success_rate:.1f}%")
print(f"Processing time: {result.processing_time:.2f} seconds")
```
### Asynchronous Batch Processing
```python
import asyncio
from raganything.batch_parser import BatchParser

async def async_batch_processing():
    batch_parser = BatchParser(
        parser_type="mineru",
        max_workers=4,
        show_progress=True
    )
    # Process files asynchronously
    result = await batch_parser.process_batch_async(
        file_paths=["doc1.pdf", "doc2.docx"],
        output_dir="./output",
        parse_method="auto"
    )
    return result

# Run async processing
result = asyncio.run(async_batch_processing())
```
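The reason to use the async variant is that the event loop stays free for other coroutines while files are parsed. How `process_batch_async` achieves this internally is not described here; the sketch below only illustrates the general pattern of offloading a blocking call with `asyncio.to_thread` (Python 3.9+). `blocking_batch` and `heartbeat` are hypothetical stand-ins.

```python
import asyncio
import time
from typing import List


def blocking_batch(paths: List[str]) -> List[str]:
    """Hypothetical blocking batch call; sleeps to simulate parsing work."""
    time.sleep(0.05)
    return [f"parsed:{p}" for p in paths]


async def heartbeat(ticks: List[int]) -> None:
    """Other coroutine work that keeps running while the batch is parsed."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def main() -> tuple:
    ticks: List[int] = []
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so heartbeat() is scheduled concurrently instead of waiting.
    parsed, _ = await asyncio.gather(
        asyncio.to_thread(blocking_batch, ["doc1.pdf", "doc2.docx"]),
        heartbeat(ticks),
    )
    return parsed, ticks
```

Calling `asyncio.run(main())` returns the parsed list together with the ticks recorded while the batch was running.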
### Integration with RAG-Anything
```python
from raganything import RAGAnything
rag = RAGAnything()
# Process documents with batch functionality
result = rag.process_documents_batch(
    file_paths=["doc1.pdf", "doc2.docx"],
    output_dir="./output",
    max_workers=4,
    show_progress=True
)
print(f"Processed {len(result.successful_files)} files successfully")
```
### Process Documents with RAG Integration
```python
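import asyncio

async def main():
    # Process documents in batch and then add them to RAG.
    # `rag` is the RAGAnything instance created in the previous example.
    result = await rag.process_documents_with_rag_batch(
        file_paths=["doc1.pdf", "doc2.docx"],
        output_dir="./output",
        max_workers=4,
        show_progress=True
    )
    print(f"Processed {result['successful_rag_files']} files with RAG")
    print(f"Total processing time: {result['total_processing_time']:.2f} seconds")

# `await` is only valid inside a coroutine, so run it through asyncio
asyncio.run(main())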
```
### Command Line Interface
```bash
# Basic batch processing
python -m raganything.batch_parser path/to/docs/ --output ./output --workers 4
# With specific parser
python -m raganything.batch_parser path/to/docs/ --parser mineru --method auto
# Without progress bar
python -m raganything.batch_parser path/to/docs/ --output ./output --no-progress
# Help
python -m raganything.batch_parser --help
```
## Configuration
### Environment Variables
```env
# Batch processing configuration
MAX_CONCURRENT_FILES=4
SUPPORTED_FILE_EXTENSIONS=.pdf,.docx,.doc,.pptx,.ppt,.xlsx,.xls,.txt,.md
RECURSIVE_FOLDER_PROCESSING=true
PARSER_OUTPUT_DIR=./parsed_output
```
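If you want to read the same variables in your own scripts, a sketch along these lines may help. How RAG-Anything itself parses them is not shown in this document, so `BatchEnvConfig` and `load_batch_config` are hypothetical names, and the fallback values are simply the example values above.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass
class BatchEnvConfig:
    """Hypothetical container for the variables listed above."""
    max_concurrent_files: int
    supported_extensions: List[str]
    recursive: bool
    parser_output_dir: str


def load_batch_config(env: Mapping[str, str] = os.environ) -> BatchEnvConfig:
    """Parse the batch settings from `env`, falling back to the example values."""
    raw_exts = env.get(
        "SUPPORTED_FILE_EXTENSIONS",
        ".pdf,.docx,.doc,.pptx,.ppt,.xlsx,.xls,.txt,.md",
    )
    raw_recursive = env.get("RECURSIVE_FOLDER_PROCESSING", "true")
    return BatchEnvConfig(
        max_concurrent_files=int(env.get("MAX_CONCURRENT_FILES", "4")),
        # Normalize " .PDF" to ".pdf" and drop empty entries.
        supported_extensions=[
            e.strip().lower() for e in raw_exts.split(",") if e.strip()
        ],
        recursive=raw_recursive.strip().lower() in {"1", "true", "yes"},
        parser_output_dir=env.get("PARSER_OUTPUT_DIR", "./parsed_output"),
    )
```

Passing a plain dictionary instead of `os.environ` makes the parsing easy to test without touching the real environment.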
### BatchParser Parameters
- **parser_type**: `"mineru"` or `"docling"` (default: `"mineru"`)
- **max_workers**: Number of parallel workers (default: `4`)
- **show_progress**: Show progress bar (default: `True`)
- **timeout_per_file**: Timeout per file in seconds (default: `300`)
- **skip_installation_check**: Skip parser installation check (default: `False`)
## Supported File Types
- **PDF files**: `.pdf`
- **Office documents**: `.doc`, `.docx`, `.ppt`, `.pptx`, `.xls`, `.xlsx`
- **Images**: `.png`, `.jpg`, `.jpeg`, `.bmp`, `.tiff`, `.tif`, `.gif`, `.webp`
- **Text files**: `.txt`, `.md`
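The extension list above can be applied to a mix of files and directories roughly as follows. This is an illustrative sketch and not the code behind `filter_supported_files`; `collect_supported` is a hypothetical helper, and paths that are not directories are checked by extension only, without verifying that they exist.

```python
from pathlib import Path
from typing import Iterable, List

# Extensions copied from the list above.
SUPPORTED = {
    ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".xls", ".xlsx",
    ".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".tif", ".gif", ".webp",
    ".txt", ".md",
}


def collect_supported(paths: Iterable[str], recursive: bool = True) -> List[str]:
    """Expand directories and keep only files with a supported extension."""
    found: List[str] = []
    for raw in paths:
        path = Path(raw)
        if path.is_dir():
            # "**/*" descends into subdirectories, "*" stays at the top level.
            pattern = "**/*" if recursive else "*"
            candidates = sorted(p for p in path.glob(pattern) if p.is_file())
        else:
            candidates = [path]
        # Compare case-insensitively so "report.PDF" is accepted.
        found.extend(str(p) for p in candidates if p.suffix.lower() in SUPPORTED)
    return found
```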
## API Reference
### BatchProcessingResult
```python
@dataclass
class BatchProcessingResult:
    successful_files: List[str]  # Successfully processed files
    failed_files: List[str]      # Failed files
    total_files: int             # Total number of files
    processing_time: float       # Total processing time in seconds
    errors: Dict[str, str]       # Error messages for failed files
    output_dir: str              # Output directory used

    def summary(self) -> str: ...  # Human-readable summary

    @property  # read as an attribute, e.g. result.success_rate
    def success_rate(self) -> float: ...  # Success rate as percentage
```
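To show how these fields relate, here is a runnable stand-in with the same field names. It is illustrative only: `ResultSketch` is a hypothetical class, the summary wording is invented, and `success_rate` is assumed to be a property because the usage example formats `result.success_rate` without calling it.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ResultSketch:
    """Stand-in with the same fields as BatchProcessingResult (illustrative only)."""
    successful_files: List[str]
    failed_files: List[str]
    total_files: int
    processing_time: float
    errors: Dict[str, str] = field(default_factory=dict)
    output_dir: str = "./output"

    @property
    def success_rate(self) -> float:
        """Percentage of files that succeeded; 0.0 for an empty batch."""
        if self.total_files == 0:
            return 0.0
        return len(self.successful_files) / self.total_files * 100

    def summary(self) -> str:
        """One-line summary (format is an assumption, not the library's)."""
        return (
            f"{len(self.successful_files)}/{self.total_files} files succeeded "
            f"({self.success_rate:.1f}%) in {self.processing_time:.2f}s"
        )
```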
### BatchParser Methods
```python
class BatchParser:
    def __init__(self, parser_type: str = "mineru", max_workers: int = 4, ...):
        """Initialize batch parser"""

    def get_supported_extensions(self) -> List[str]:
        """Get list of supported file extensions"""

    def filter_supported_files(self, file_paths: List[str], recursive: bool = True) -> List[str]:
        """Filter files to only supported types"""

    def process_batch(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult:
        """Process files in batch"""

    async def process_batch_async(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult:
        """Process files in batch asynchronously"""
```
## Performance Considerations
### Memory Usage
- Each worker uses additional memory
- Recommended: 2-4 workers for most systems
- Monitor memory usage with large files
### CPU Usage
- Parallel processing utilizes multiple cores
- Optimal worker count depends on CPU cores and file sizes
- I/O may become the bottleneck with many small files
### Recommended Settings
- **Small files** (< 1MB): Higher worker count (6-8)
- **Large files** (> 100MB): Lower worker count (2-3)
- **Mixed sizes**: Start with 4 workers and adjust
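These rules of thumb can be encoded as a small helper that picks a starting `max_workers` value. `suggest_workers` is a hypothetical function, and using the median file size as the "typical" size is an assumption of this sketch; treat the result as a starting point to tune, not a guarantee.

```python
from statistics import median
from typing import List

MB = 1024 * 1024


def suggest_workers(file_sizes_bytes: List[int]) -> int:
    """Map the typical (median) file size to a starting worker count."""
    if not file_sizes_bytes:
        return 4  # nothing to measure: fall back to the default
    typical = median(file_sizes_bytes)
    if typical < 1 * MB:
        return 6  # low end of the 6-8 range for small files
    if typical > 100 * MB:
        return 2  # low end of the 2-3 range for large files
    return 4  # mixed or mid-sized files: start at 4 and adjust
```

File sizes can be gathered with `os.path.getsize` before constructing the parser.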
## Troubleshooting
### Common Issues
#### Memory Errors
```python
# Solution: Reduce max_workers
batch_parser = BatchParser(max_workers=2)
```
#### Timeout Errors
```python
# Solution: Increase timeout_per_file
batch_parser = BatchParser(timeout_per_file=600) # 10 minutes
```
#### Parser Installation Issues
```python
# Solution: Skip installation check
batch_parser = BatchParser(skip_installation_check=True)
```
#### File Not Found Errors
- Check file paths and permissions
- Ensure input files exist
- Verify directory access rights
### Debug Mode
Enable debug logging for detailed information:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Create batch parser with debug logging
batch_parser = BatchParser(parser_type="mineru", max_workers=2)
```
### Error Handling
Failures are reported per file: `failed_files` lists the paths that failed, and `errors` maps each of those paths to its error message.
```python
result = batch_parser.process_batch(
    file_paths=["doc1.pdf", "doc2.docx"],
    output_dir="./output"
)

# Check for errors
if result.failed_files:
    print("Failed files:")
    for file_path in result.failed_files:
        error_message = result.errors.get(file_path, "Unknown error")
        print(f"  - {file_path}: {error_message}")

# Process only successful files
for file_path in result.successful_files:
    print(f"Successfully processed: {file_path}")
```
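With large batches it is often more useful to see which causes repeat than to read failures one by one. The sketch below groups failed paths by error message, using plain lists and dictionaries shaped like `failed_files` and `errors`; `group_failures` is a hypothetical helper and the sample data is made up.

```python
import json
from collections import defaultdict
from typing import Dict, List


def group_failures(failed_files: List[str], errors: Dict[str, str]) -> Dict[str, List[str]]:
    """Group failed paths by error message so repeated causes stand out."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for path in failed_files:
        grouped[errors.get(path, "Unknown error")].append(path)
    return dict(grouped)


# Made-up sample data in the same shape as result.failed_files / result.errors.
report = group_failures(
    ["a.pdf", "b.pdf", "c.docx"],
    {"a.pdf": "timeout", "b.pdf": "timeout"},
)
print(json.dumps(report, indent=2))
```

Because the report is a plain dictionary, it can be written to a JSON file and attached to a bug report or a retry job.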
## Examples
### Process Entire Directory
```python
from pathlib import Path
from raganything.batch_parser import BatchParser

# Process all supported files in a directory
batch_parser = BatchParser(max_workers=4)
directory_path = Path("./documents")
result = batch_parser.process_batch(
    file_paths=[str(directory_path)],
    output_dir="./processed",
    recursive=True  # Include subdirectories
)
print(f"Processed {len(result.successful_files)} out of {result.total_files} files")
```
### Filter Files Before Processing
```python
# Candidate files (hard-coded here for illustration)
all_files = ["doc1.pdf", "image.png", "spreadsheet.xlsx", "unsupported.xyz"]

# Filter to supported files only
supported_files = batch_parser.filter_supported_files(all_files)
print(f"Will process {len(supported_files)} out of {len(all_files)} files")

# Process only supported files
result = batch_parser.process_batch(
    file_paths=supported_files,
    output_dir="./output"
)
```
### Custom Error Handling
```python
def process_with_retry(file_paths, max_retries=3):
    """Process files with retry logic"""
    for attempt in range(max_retries):
        result = batch_parser.process_batch(file_paths, "./output")
        if not result.failed_files:
            break  # All files processed successfully
        print(f"Attempt {attempt + 1}: {len(result.failed_files)} files failed")
        file_paths = result.failed_files  # Retry failed files
    return result
```
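Note that the function above returns only the result of the last attempt, so files that succeeded in earlier attempts are not listed in it. A variant that accumulates successes across attempts could look like the sketch below. It is written against a generic batch function rather than `BatchParser`, so `retry_collecting` and `make_flaky_batch` are hypothetical names, and the flaky batch exists only to simulate transient failures.

```python
from typing import Callable, Dict, List, Tuple

# A batch function returns (successful_paths, {failed_path: error_message}).
BatchFn = Callable[[List[str]], Tuple[List[str], Dict[str, str]]]


def retry_collecting(
    run_batch: BatchFn, file_paths: List[str], max_retries: int = 3
) -> Tuple[List[str], Dict[str, str]]:
    """Retry only the failures, keeping successes from every attempt."""
    all_ok: List[str] = []
    errors: Dict[str, str] = {}
    pending = list(file_paths)
    for _ in range(max_retries):
        if not pending:
            break  # nothing left to retry
        ok, errors = run_batch(pending)
        all_ok.extend(ok)
        pending = list(errors)  # next attempt covers failed paths only
    return all_ok, errors


def make_flaky_batch(fail_once: set) -> BatchFn:
    """Hypothetical batch function: paths in `fail_once` fail on first sight."""
    seen: set = set()

    def run(paths: List[str]) -> Tuple[List[str], Dict[str, str]]:
        ok: List[str] = []
        failed: Dict[str, str] = {}
        for p in paths:
            if p in fail_once and p not in seen:
                seen.add(p)
                failed[p] = "transient error"
            else:
                ok.append(p)
        return ok, failed

    return run
```

To use this with `BatchParser`, the batch function would wrap `process_batch` and return `(result.successful_files, result.errors)`.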
## Best Practices
1. **Start with default settings** and adjust based on performance
2. **Monitor system resources** during batch processing
3. **Use appropriate worker counts** for your hardware
4. **Handle errors gracefully** with retry logic
5. **Test with small batches** before processing large collections
6. **Use `skip_installation_check`** if facing parser installation issues
7. **Enable progress tracking** for long-running operations
8. **Set appropriate timeouts** based on expected file processing times
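For the advice on testing with small batches, one simple approach is to split the file list into fixed-size chunks and run the first chunk as a trial before committing to the rest. `chunked` below is a hypothetical helper, not part of RAG-Anything.

```python
from typing import Iterator, List, Sequence


def chunked(paths: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `paths` with at most `size` items each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(paths), size):
        yield list(paths[start:start + size])
```

Each chunk can then be passed as `file_paths` to `process_batch`, checking the failures of one chunk before starting the next.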
## Conclusion
Batch processing raises RAG-Anything's throughput on large document collections by parsing files in parallel. Worker count, timeouts, and progress display are configurable, failures are reported per file, and the results feed directly into the existing RAG-Anything pipeline.