"""
Example: Optimized Batch Processing for RAGAnything
This example demonstrates the new optimized batch processing capabilities
that provide 2-3x faster processing for large document collections.
Features demonstrated:
- Concurrent document parsing with prefetching
- Pipeline architecture (parse + process simultaneously)
- Progress tracking with ETA estimation
- Adaptive rate limiting
- Performance statistics
"""
import asyncio
import time
from pathlib import Path
from raganything import RAGAnything
async def progress_callback(progress_data):
    """Render a single-line, in-place progress report for a batch run.

    Args:
        progress_data: Dict containing:
            - processed: Number of processed documents
            - total: Total number of documents
            - failed: Number of failed documents
            - percentage: Completion percentage
            - eta_seconds: Estimated time remaining
            - rate_docs_per_sec: Processing rate
    """
    done = progress_data['processed']
    total = progress_data['total']
    pct = progress_data['percentage']
    rate = progress_data['rate_docs_per_sec']
    eta = progress_data['eta_seconds']
    status_line = (
        f"\rProgress: {done}/{total} "
        f"({pct:.1f}%) | "
        f"Rate: {rate:.2f} docs/s | "
        f"ETA: {eta:.1f}s"
    )
    # Leading \r plus end='' redraws over the previous report instead of
    # scrolling; flush so the update is visible immediately.
    print(status_line, end='', flush=True)
async def main():
    """Demonstrate RAGAnything's optimized batch-processing API.

    Runs four examples in sequence: (1) optimized batch processing of an
    explicit file list, (2) optimized recursive folder processing,
    (3) a standard-vs-optimized timing comparison, and (4) a custom
    progress-tracker callback that logs snapshots to a file.

    All ./data paths are illustrative; the files are assumed to exist —
    TODO confirm before running.
    """
    # Initialize RAGAnything
    rag = RAGAnything(
        working_dir="./rag_storage",
        rag_dir="./rag_index",
        parser="mineru",  # or "docling"
    )

    # Example 1: Process a list of documents with optimization
    print("=" * 60)
    print("Example 1: Optimized Batch Processing")
    print("=" * 60)
    documents = [
        "./data/report1.pdf",
        "./data/report2.pdf",
        "./data/research_paper.pdf",
        "./data/technical_spec.docx",
    ]
    start_time = time.time()
    result = await rag.process_documents_batch_optimized(
        file_paths=documents,
        max_concurrent_parsers=4,  # Parse up to 4 documents at once
        max_concurrent_processors=10,  # Process up to 10 chunks concurrently
        enable_progress_tracking=True,
        progress_callback=progress_callback,
    )
    print()  # New line after progress bar
    elapsed_time = time.time() - start_time

    # Display results
    print(f"\n📊 Processing Results:")
    print(f" ✅ Successful: {len(result['successful_files'])} documents")
    print(f" ❌ Failed: {len(result['failed_files'])} documents")
    print(f" ⏱️ Total time: {elapsed_time:.2f}s")

    # Display detailed statistics
    # NOTE(review): assumes result['statistics'] always carries these keys —
    # confirm against process_documents_batch_optimized's return contract.
    stats = result['statistics']
    print(f"\n📈 Performance Statistics:")
    print(f" Processing rate: {stats['processing_rate_docs_per_sec']:.2f} docs/sec")
    print(f" Parsing time: {stats['parsing_time']:.2f}s")
    print(f" Text processing: {stats['text_processing_time']:.2f}s")
    print(f" Multimodal processing: {stats['multimodal_processing_time']:.2f}s")
    print(f" Cache hit rate: {stats['cache_hit_rate']:.1f}%")

    # Show per-document results
    if result['successful_files']:
        print(f"\n✅ Successfully processed files:")
        for file_info in result['successful_files'][:5]:  # Show first 5
            print(f" - {Path(file_info['file_path']).name} "
                  f"(processing: {file_info['processing_time']:.2f}s, "
                  f"parsing: {file_info['parse_time']:.2f}s)")
    if result['failed_files']:
        print(f"\n❌ Failed files:")
        for file_info in result['failed_files']:
            print(f" - {Path(file_info['file_path']).name}: {file_info['error']}")

    # Example 2: Process an entire folder with optimization
    print("\n" + "=" * 60)
    print("Example 2: Optimized Folder Processing")
    print("=" * 60)
    folder_result = await rag.process_folder_optimized(
        folder_path="./data/documents",
        file_extensions=['.pdf', '.docx', '.pptx'],
        recursive=True,
        max_concurrent_parsers=6,
        max_concurrent_processors=12,
        progress_callback=progress_callback,
    )
    print()  # New line after progress bar
    print(f"\n📁 Folder Processing Complete:")
    print(f" Successful: {len(folder_result['successful_files'])} files")
    print(f" Failed: {len(folder_result['failed_files'])} files")
    print(f" Rate: {folder_result['statistics']['processing_rate_docs_per_sec']:.2f} docs/sec")

    # Example 3: Compare standard vs optimized processing
    print("\n" + "=" * 60)
    print("Example 3: Performance Comparison")
    print("=" * 60)
    test_docs = ["./data/test1.pdf", "./data/test2.pdf", "./data/test3.pdf"]

    # Standard processing
    print("\n🐢 Standard batch processing...")
    standard_start = time.time()
    await rag.process_folder_complete(
        folder_path="./data/test",
        max_workers=4,
        display_stats=False
    )
    standard_time = time.time() - standard_start

    # Optimized processing (on different set to avoid cache)
    print("🚀 Optimized batch processing...")
    optimized_start = time.time()
    await rag.process_documents_batch_optimized(
        file_paths=test_docs,
        max_concurrent_parsers=4,
        max_concurrent_processors=10,
        enable_progress_tracking=False,
    )
    optimized_time = time.time() - optimized_start
    print(f"\n⚡ Performance Improvement:")
    print(f" Standard: {standard_time:.2f}s")
    print(f" Optimized: {optimized_time:.2f}s")
    if standard_time > 0:
        # NOTE(review): only standard_time is guarded; optimized_time of 0
        # would raise ZeroDivisionError here — confirm this is acceptable
        # for an example script.
        speedup = (standard_time / optimized_time)
        print(f" Speedup: {speedup:.2f}x faster")

    # Example 4: Custom progress tracking
    print("\n" + "=" * 60)
    print("Example 4: Custom Progress Tracking")
    print("=" * 60)

    class CustomProgressTracker:
        # Callable progress tracker: records timestamped snapshots and
        # renders a 40-character ASCII progress bar on each update.
        def __init__(self):
            self.start_time = time.time()  # wall-clock anchor for elapsed time
            self.logs = []  # one snapshot dict per progress update

        def __call__(self, progress):
            """Record a snapshot of *progress* and redraw the progress bar."""
            elapsed = time.time() - self.start_time
            log_entry = {
                "timestamp": elapsed,
                "processed": progress['processed'],
                "total": progress['total'],
                "percentage": progress['percentage'],
                "rate": progress['rate_docs_per_sec'],
            }
            self.logs.append(log_entry)
            # Print formatted progress (\r + end='' redraws in place)
            bar_length = 40
            filled_length = int(bar_length * progress['percentage'] / 100)
            bar = '█' * filled_length + '-' * (bar_length - filled_length)
            print(f"\r|{bar}| {progress['percentage']:.1f}% "
                  f"[{progress['processed']}/{progress['total']}] "
                  f"ETA: {progress['eta_seconds']:.0f}s", end='', flush=True)

        def save_log(self, filename="processing_log.txt"):
            """Save the accumulated progress snapshots to *filename*."""
            with open(filename, 'w') as f:
                f.write("Batch Processing Log\n")
                f.write("=" * 50 + "\n")
                for entry in self.logs:
                    f.write(f"Time: {entry['timestamp']:.2f}s | "
                            f"Progress: {entry['processed']}/{entry['total']} "
                            f"({entry['percentage']:.1f}%) | "
                            f"Rate: {entry['rate']:.2f} docs/s\n")

    tracker = CustomProgressTracker()
    await rag.process_documents_batch_optimized(
        file_paths=documents,
        progress_callback=tracker,
    )
    print()  # New line
    tracker.save_log("./batch_processing_log.txt")
    print("📝 Progress log saved to batch_processing_log.txt")
    print("\n" + "=" * 60)
    print("All examples completed!")
    print("=" * 60)
if __name__ == "__main__":
    # Script entry point: drive the async example pipeline to completion.
    asyncio.run(main())