"""
Example: Optimized Batch Processing for RAGAnything

This example demonstrates the new optimized batch processing capabilities
that provide 2-3x faster processing for large document collections.

Features demonstrated:
- Concurrent document parsing with prefetching
- Pipeline architecture (parse + process simultaneously)
- Progress tracking with ETA estimation
- Adaptive rate limiting
- Performance statistics
"""

import asyncio
import time
from pathlib import Path
from raganything import RAGAnything

async def progress_callback(progress_data):
    """Render a single-line, in-place progress display on stdout.

    Args:
        progress_data: Dict with the keys:
            - processed: Number of documents completed so far
            - total: Total number of documents in the batch
            - failed: Number of documents that failed
            - percentage: Completion percentage (0-100)
            - eta_seconds: Estimated seconds remaining
            - rate_docs_per_sec: Current throughput in docs/sec
    """
    done = progress_data['processed']
    total = progress_data['total']
    pct = progress_data['percentage']
    rate = progress_data['rate_docs_per_sec']
    eta = progress_data['eta_seconds']
    # Leading \r rewrites the same terminal line; flush so it shows immediately.
    line = (
        f"\rProgress: {done}/{total} "
        f"({pct:.1f}%) | "
        f"Rate: {rate:.2f} docs/s | "
        f"ETA: {eta:.1f}s"
    )
    print(line, end='', flush=True)


async def main():
    """Run the optimized batch-processing examples end to end.

    Demonstrates four scenarios against a single RAGAnything instance:
    1. Optimized batch processing of an explicit document list
    2. Optimized recursive folder processing
    3. A timing comparison of standard vs. optimized processing
    4. A custom progress-tracking callback that logs progress to a file

    NOTE(review): assumes the referenced paths under ./data exist; the
    result dicts' schema (successful_files / failed_files / statistics)
    is taken from the RAGAnything API — verify against the library docs.
    """
    # Initialize RAGAnything
    rag = RAGAnything(
        working_dir="./rag_storage",
        rag_dir="./rag_index",
        parser="mineru",  # or "docling"
    )

    # Example 1: Process a list of documents with optimization
    print("=" * 60)
    print("Example 1: Optimized Batch Processing")
    print("=" * 60)

    documents = [
        "./data/report1.pdf",
        "./data/report2.pdf",
        "./data/research_paper.pdf",
        "./data/technical_spec.docx",
    ]

    start_time = time.time()

    result = await rag.process_documents_batch_optimized(
        file_paths=documents,
        max_concurrent_parsers=4,        # Parse up to 4 documents at once
        max_concurrent_processors=10,     # Process up to 10 chunks concurrently
        enable_progress_tracking=True,
        progress_callback=progress_callback,
    )

    print()  # New line after progress bar

    elapsed_time = time.time() - start_time

    # Display results
    print(f"\n📊 Processing Results:")
    print(f"  ✅ Successful: {len(result['successful_files'])} documents")
    print(f"  ❌ Failed: {len(result['failed_files'])} documents")
    print(f"  ⏱️  Total time: {elapsed_time:.2f}s")

    # Display detailed statistics
    stats = result['statistics']
    print(f"\n📈 Performance Statistics:")
    print(f"  Processing rate: {stats['processing_rate_docs_per_sec']:.2f} docs/sec")
    print(f"  Parsing time: {stats['parsing_time']:.2f}s")
    print(f"  Text processing: {stats['text_processing_time']:.2f}s")
    print(f"  Multimodal processing: {stats['multimodal_processing_time']:.2f}s")
    print(f"  Cache hit rate: {stats['cache_hit_rate']:.1f}%")

    # Show per-document results
    if result['successful_files']:
        print(f"\n✅ Successfully processed files:")
        for file_info in result['successful_files'][:5]:  # Show first 5
            print(f"  - {Path(file_info['file_path']).name} "
                  f"(processing: {file_info['processing_time']:.2f}s, "
                  f"parsing: {file_info['parse_time']:.2f}s)")

    if result['failed_files']:
        print(f"\n❌ Failed files:")
        for file_info in result['failed_files']:
            print(f"  - {Path(file_info['file_path']).name}: {file_info['error']}")

    # Example 2: Process an entire folder with optimization
    print("\n" + "=" * 60)
    print("Example 2: Optimized Folder Processing")
    print("=" * 60)

    folder_result = await rag.process_folder_optimized(
        folder_path="./data/documents",
        file_extensions=['.pdf', '.docx', '.pptx'],
        recursive=True,
        max_concurrent_parsers=6,
        max_concurrent_processors=12,
        progress_callback=progress_callback,
    )

    print()  # New line after progress bar

    print(f"\n📁 Folder Processing Complete:")
    print(f"  Successful: {len(folder_result['successful_files'])} files")
    print(f"  Failed: {len(folder_result['failed_files'])} files")
    print(f"  Rate: {folder_result['statistics']['processing_rate_docs_per_sec']:.2f} docs/sec")

    # Example 3: Compare standard vs optimized processing
    print("\n" + "=" * 60)
    print("Example 3: Performance Comparison")
    print("=" * 60)

    test_docs = ["./data/test1.pdf", "./data/test2.pdf", "./data/test3.pdf"]

    # Standard processing
    print("\n🐢 Standard batch processing...")
    standard_start = time.time()
    await rag.process_folder_complete(
        folder_path="./data/test",
        max_workers=4,
        display_stats=False
    )
    standard_time = time.time() - standard_start

    # Optimized processing (on different set to avoid cache)
    print("🚀 Optimized batch processing...")
    optimized_start = time.time()
    await rag.process_documents_batch_optimized(
        file_paths=test_docs,
        max_concurrent_parsers=4,
        max_concurrent_processors=10,
        enable_progress_tracking=False,
    )
    optimized_time = time.time() - optimized_start

    print(f"\n⚡ Performance Improvement:")
    print(f"  Standard: {standard_time:.2f}s")
    print(f"  Optimized: {optimized_time:.2f}s")
    # FIX: the divisor is optimized_time, so it is the value that must be
    # positive; the original only checked standard_time and could raise
    # ZeroDivisionError. Checking both preserves the original skip behavior.
    if standard_time > 0 and optimized_time > 0:
        speedup = (standard_time / optimized_time)
        print(f"  Speedup: {speedup:.2f}x faster")

    # Example 4: Custom progress tracking
    print("\n" + "=" * 60)
    print("Example 4: Custom Progress Tracking")
    print("=" * 60)

    class CustomProgressTracker:
        """Callable progress callback that records every update for later export."""

        def __init__(self):
            self.start_time = time.time()  # wall-clock origin for timestamps
            self.logs = []                 # one dict per progress update received

        def __call__(self, progress):
            """Progress callback: record the update and redraw the progress bar."""
            elapsed = time.time() - self.start_time
            log_entry = {
                "timestamp": elapsed,
                "processed": progress['processed'],
                "total": progress['total'],
                "percentage": progress['percentage'],
                "rate": progress['rate_docs_per_sec'],
            }
            self.logs.append(log_entry)

            # Print formatted progress
            bar_length = 40
            filled_length = int(bar_length * progress['percentage'] / 100)
            bar = '█' * filled_length + '-' * (bar_length - filled_length)

            print(f"\r|{bar}| {progress['percentage']:.1f}% "
                  f"[{progress['processed']}/{progress['total']}] "
                  f"ETA: {progress['eta_seconds']:.0f}s", end='', flush=True)

        def save_log(self, filename="processing_log.txt"):
            """Save progress log to file"""
            # Explicit encoding avoids platform-dependent defaults.
            with open(filename, 'w', encoding="utf-8") as f:
                f.write("Batch Processing Log\n")
                f.write("=" * 50 + "\n")
                for entry in self.logs:
                    f.write(f"Time: {entry['timestamp']:.2f}s | "
                           f"Progress: {entry['processed']}/{entry['total']} "
                           f"({entry['percentage']:.1f}%) | "
                           f"Rate: {entry['rate']:.2f} docs/s\n")

    tracker = CustomProgressTracker()

    await rag.process_documents_batch_optimized(
        file_paths=documents,
        progress_callback=tracker,
    )

    print()  # New line
    tracker.save_log("./batch_processing_log.txt")
    print("📝 Progress log saved to batch_processing_log.txt")

    print("\n" + "=" * 60)
    print("All examples completed!")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())