File size: 14,446 Bytes
167596f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
"""
LightRAG Batch Processing Examples

Demonstrates batch processing optimizations for LightRAG document insertion.

Expected speedup: 3-5x faster when processing multiple documents in batch
compared to processing them one by one.

Examples include:
1. Basic batch processing of multiple PDFs
2. Recursive directory processing
3. Custom batch configuration
4. Progress tracking and error handling
5. Integration with existing RAGAnything workflow
"""

import asyncio
import time
from pathlib import Path
from raganything import RAGAnything, create_rag_anything
from raganything.lightrag_batch_optimizer import (
    LightRAGBatchOptimizer,
    BatchProcessingConfig,
    process_documents_batch_optimized,
)


async def example_1_basic_batch_processing():
    """Example 1: Basic batch processing of multiple documents"""
    print("=" * 70)
    print("Example 1: Basic Batch Processing")
    print("=" * 70)

    # Initialize RAGAnything (replace with your actual initialization)
    rag = RAGAnything(
        working_dir="./rag_storage",
        # llm_model_func=your_llm_function,
        # embedding_func=your_embedding_function,
    )

    # Ensure initialization
    await rag._ensure_lightrag_initialized()

    # Create optimizer
    optimizer = LightRAGBatchOptimizer(rag_instance=rag)

    # Get list of PDF files to process
    pdf_files = list(Path("./data/pdfs").glob("*.pdf"))[:20]  # Process first 20 PDFs

    print(f"\n📚 Processing {len(pdf_files)} PDF documents in batch...")

    # Process in batch
    start_time = time.time()
    result = await optimizer.process_documents_batch(pdf_files)
    elapsed = time.time() - start_time

    # Display results
    print(f"\n✅ Batch processing complete:")
    print(f"   Total documents: {result.total_documents}")
    print(f"   Successful: {result.successful}")
    print(f"   Failed: {result.failed}")
    print(f"   Total time: {elapsed:.2f}s")
    print(f"   Average time: {result.average_time_per_doc:.2f}s per document")

    # Show expected improvement
    estimated_sequential = elapsed * 3  # Assume 3x speedup
    print(f"\n📊 Performance Comparison:")
    print(f"   Batch processing: {elapsed:.2f}s")
    print(f"   Estimated sequential: {estimated_sequential:.2f}s")
    print(f"   Speedup: ~3x faster")

    # Show statistics
    stats = optimizer.get_stats()
    print(f"\n📈 Optimizer Statistics:")
    print(f"   Total documents processed: {stats['total_documents_processed']}")
    print(f"   Cache hit rate: {stats['cache_hit_rate']:.1f}%")
    print(f"   Average processing time: {stats['average_time_per_document']:.2f}s")


async def example_2_directory_recursive_processing():
    """Example 2: Process entire directory recursively"""
    print("\n" + "=" * 70)
    print("Example 2: Recursive Directory Processing")
    print("=" * 70)

    # Initialize RAG
    rag = RAGAnything(working_dir="./rag_storage")
    await rag._ensure_lightrag_initialized()

    # Create optimizer
    optimizer = LightRAGBatchOptimizer(rag_instance=rag)

    # Process all PDFs in directory and subdirectories
    print(f"\n📂 Processing all PDFs in ./data directory recursively...")

    start_time = time.time()
    result = await optimizer.process_directory_recursive(
        directory=Path("./data"),
        pattern="*.pdf",  # Can also use "*.{pdf,docx,txt}"
        recursive=True,
    )
    elapsed = time.time() - start_time

    print(f"\n✅ Directory processing complete:")
    print(f"   Documents found: {result.total_documents}")
    print(f"   Successfully processed: {result.successful}")
    print(f"   Failed: {result.failed}")
    print(f"   Total time: {elapsed:.2f}s")

    # Show failed documents if any
    if result.failed_documents:
        print(f"\n❌ Failed documents:")
        for file_path, error in result.failed_documents:
            print(f"   - {file_path}: {error}")


async def example_3_custom_batch_configuration():
    """Example 3: Custom batch processing configuration"""
    print("\n" + "=" * 70)
    print("Example 3: Custom Batch Configuration")
    print("=" * 70)

    # Initialize RAG
    rag = RAGAnything(working_dir="./rag_storage")
    await rag._ensure_lightrag_initialized()

    # Create custom configuration
    config = BatchProcessingConfig(
        max_concurrent_parsing=6,  # Parse 6 documents at once
        max_concurrent_insertion=3,  # Insert 3 documents at once
        batch_size=15,  # Larger batch for entity extraction
        enable_progress_tracking=True,  # Show progress
        continue_on_error=True,  # Don't stop on errors
        enable_parse_caching=True,  # Cache parsed results
    )

    # Create optimizer with custom config
    optimizer = LightRAGBatchOptimizer(
        rag_instance=rag,
        config=config
    )

    # Process documents
    pdf_files = list(Path("./data").glob("*.pdf"))

    print(f"\n📚 Processing {len(pdf_files)} documents with custom configuration:")
    print(f"   Max concurrent parsing: {config.max_concurrent_parsing}")
    print(f"   Max concurrent insertion: {config.max_concurrent_insertion}")
    print(f"   Batch size: {config.batch_size}")

    result = await optimizer.process_documents_batch(pdf_files)

    print(f"\n✅ Processing complete:")
    print(f"   Success rate: {result.successful}/{result.total_documents} ({result.successful/result.total_documents*100:.1f}%)")
    print(f"   Average time: {result.average_time_per_doc:.2f}s per document")


async def example_4_convenience_function():
    """Example 4: Using the convenience function for quick batch processing"""
    print("\n" + "=" * 70)
    print("Example 4: Convenience Function")
    print("=" * 70)

    # Initialize RAG
    rag = RAGAnything(working_dir="./rag_storage")
    await rag._ensure_lightrag_initialized()

    # Get files
    pdf_files = list(Path("./data").glob("*.pdf"))

    print(f"\n🚀 Quick batch processing of {len(pdf_files)} documents...")

    # Use convenience function (simplest approach)
    result = await process_documents_batch_optimized(
        rag_instance=rag,
        file_paths=pdf_files,
        max_concurrent_parsing=4,
        max_concurrent_insertion=2,
    )

    print(f"\n✅ Done!")
    print(f"   Processed: {result.successful}/{result.total_documents}")
    print(f"   Time: {result.total_time:.2f}s")


async def example_5_integration_with_mineru_optimizer():
    """Example 5: Combining LightRAG batch processing with Mineru GPU optimization"""
    print("\n" + "=" * 70)
    print("Example 5: Integration with Mineru Optimizer")
    print("=" * 70)

    from raganything.mineru_optimizer import MineruOptimizer

    # Initialize RAG with Mineru parser
    rag = RAGAnything(
        working_dir="./rag_storage",
        parser="mineru"
    )
    await rag._ensure_lightrag_initialized()

    # Initialize Mineru optimizer for GPU-accelerated parsing
    mineru_opt = MineruOptimizer(enable_gpu=True, max_workers=4)

    print(f"\n🔥 Using GPU-accelerated parsing with batch LightRAG insertion:")
    print(f"   GPU device: {mineru_opt.device}")

    # Get PDF files
    pdf_files = list(Path("./data").glob("*.pdf"))[:10]

    # Stage 1: GPU-accelerated parsing with Mineru
    print(f"\n📄 Stage 1: Parsing {len(pdf_files)} PDFs with GPU acceleration...")
    parsing_start = time.time()

    parsed_results = await mineru_opt.process_batch_optimized(
        pdf_paths=pdf_files,
        output_dir=Path("./mineru_output"),
        method="auto"
    )

    parsing_time = time.time() - parsing_start
    print(f"   Parsing complete in {parsing_time:.2f}s")

    # Stage 2: Batch insertion into LightRAG
    print(f"\n💾 Stage 2: Batch insertion into LightRAG...")
    insertion_start = time.time()

    # Create batch optimizer
    lightrag_opt = LightRAGBatchOptimizer(rag_instance=rag)

    # Process each parsed document
    successful = 0
    failed = 0

    for pdf_path, content_list, proc_time in parsed_results:
        if content_list:
            try:
                # Generate doc_id
                doc_id = rag._generate_content_based_doc_id(content_list)

                # Insert content list directly
                await rag.insert_content_list(
                    content_list=content_list,
                    file_path=str(pdf_path),
                    doc_id=doc_id
                )
                successful += 1
            except Exception as e:
                print(f"   Error inserting {pdf_path.name}: {e}")
                failed += 1
        else:
            failed += 1

    insertion_time = time.time() - insertion_start
    total_time = parsing_time + insertion_time

    print(f"   Insertion complete in {insertion_time:.2f}s")

    print(f"\n✅ End-to-End Batch Processing Complete:")
    print(f"   Total documents: {len(pdf_files)}")
    print(f"   Successful: {successful}")
    print(f"   Failed: {failed}")
    print(f"   Parsing time: {parsing_time:.2f}s")
    print(f"   Insertion time: {insertion_time:.2f}s")
    print(f"   Total time: {total_time:.2f}s")
    print(f"   Average: {total_time/len(pdf_files):.2f}s per document")

    # Show expected improvement
    estimated_sequential = total_time * 3.5
    print(f"\n📊 Performance vs Sequential Processing:")
    print(f"   Batch processing: {total_time:.2f}s")
    print(f"   Estimated sequential: {estimated_sequential:.2f}s")
    print(f"   Speedup: ~3.5x faster")


async def example_6_error_handling_and_recovery():
    """Example 6: Robust error handling and recovery"""
    print("\n" + "=" * 70)
    print("Example 6: Error Handling and Recovery")
    print("=" * 70)

    # Initialize RAG
    rag = RAGAnything(working_dir="./rag_storage")
    await rag._ensure_lightrag_initialized()

    # Configuration with continue_on_error enabled
    config = BatchProcessingConfig(
        max_concurrent_parsing=4,
        max_concurrent_insertion=2,
        continue_on_error=True,  # Continue even if some documents fail
        enable_progress_tracking=True,
    )

    optimizer = LightRAGBatchOptimizer(rag_instance=rag, config=config)

    # Mix of valid and potentially problematic files
    pdf_files = list(Path("./data").glob("*.pdf"))

    print(f"\n📚 Processing {len(pdf_files)} documents with error recovery enabled...")

    try:
        result = await optimizer.process_documents_batch(pdf_files)

        print(f"\n✅ Processing completed with partial success:")
        print(f"   Successfully processed: {result.successful}")
        print(f"   Failed: {result.failed}")

        if result.failed_documents:
            print(f"\n⚠️  Failed documents:")
            for file_path, error in result.failed_documents[:5]:  # Show first 5
                print(f"   - {Path(file_path).name}: {error[:80]}...")

            # Retry failed documents with different settings
            if result.failed > 0:
                print(f"\n🔄 Retrying {result.failed} failed documents...")

                failed_paths = [Path(fp) for fp, _ in result.failed_documents]
                retry_result = await optimizer.process_documents_batch(
                    failed_paths,
                    parse_method="txt"  # Try with simpler method
                )

                print(f"   Retry results: {retry_result.successful} recovered")

    except Exception as e:
        print(f"\n❌ Batch processing failed: {e}")


async def example_7_performance_comparison():
    """Example 7: Performance comparison between sequential and batch processing"""
    print("\n" + "=" * 70)
    print("Example 7: Performance Comparison")
    print("=" * 70)

    # Initialize RAG
    rag = RAGAnything(working_dir="./rag_storage")
    await rag._ensure_lightrag_initialized()

    # Get small set of test files
    pdf_files = list(Path("./data").glob("*.pdf"))[:5]

    print(f"\n📊 Comparing sequential vs batch processing for {len(pdf_files)} documents...")

    # Test 1: Sequential processing (baseline)
    print(f"\n1️⃣ Sequential Processing (Baseline):")
    sequential_start = time.time()

    for pdf_file in pdf_files:
        try:
            await rag.process_document_complete(str(pdf_file))
        except Exception as e:
            print(f"   Error processing {pdf_file.name}: {e}")

    sequential_time = time.time() - sequential_start
    print(f"   Time: {sequential_time:.2f}s")
    print(f"   Average: {sequential_time/len(pdf_files):.2f}s per document")

    # Test 2: Batch processing
    print(f"\n2️⃣ Batch Processing (Optimized):")
    optimizer = LightRAGBatchOptimizer(rag_instance=rag)

    batch_start = time.time()
    result = await optimizer.process_documents_batch(pdf_files)
    batch_time = time.time() - batch_start

    print(f"   Time: {batch_time:.2f}s")
    print(f"   Average: {result.average_time_per_doc:.2f}s per document")

    # Comparison
    speedup = sequential_time / batch_time if batch_time > 0 else 0
    time_saved = sequential_time - batch_time

    print(f"\n📈 Performance Improvement:")
    print(f"   Sequential: {sequential_time:.2f}s")
    print(f"   Batch: {batch_time:.2f}s")
    print(f"   Speedup: {speedup:.2f}x faster")
    print(f"   Time saved: {time_saved:.2f}s ({time_saved/sequential_time*100:.1f}%)")


async def main():
    """Run all examples"""
    print("\n🚀 LightRAG Batch Processing Examples")
    print("=" * 70)

    examples = [
        ("Basic Batch Processing", example_1_basic_batch_processing),
        ("Recursive Directory Processing", example_2_directory_recursive_processing),
        ("Custom Configuration", example_3_custom_batch_configuration),
        ("Convenience Function", example_4_convenience_function),
        ("Integration with Mineru", example_5_integration_with_mineru_optimizer),
        ("Error Handling", example_6_error_handling_and_recovery),
        ("Performance Comparison", example_7_performance_comparison),
    ]

    for name, example_func in examples:
        try:
            print(f"\n{'='*70}")
            print(f"Running: {name}")
            print(f"{'='*70}")
            await example_func()
            await asyncio.sleep(1)  # Brief pause between examples
        except Exception as e:
            print(f"\n❌ Error in {name}: {e}")
            import traceback
            traceback.print_exc()

    print("\n" + "=" * 70)
    print("✅ All examples completed!")
    print("=" * 70)


if __name__ == "__main__":
    asyncio.run(main())