gzdaniel committed on
Commit 6559f77 · 1 Parent(s): 33f0232

Update doc: concurrent explain
docs/LightRAG_concurrent_explain.md CHANGED
@@ -1,281 +1,114 @@
1
- # LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis
2
 
3
  LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article provides an in-depth analysis of the concurrent control mechanisms at document level, chunk level, and LLM request level, helping you understand why specific concurrent behaviors occur.
4
 
5
- ## Overview
6
-
7
- LightRAG's concurrent control is divided into three layers:
8
-
9
- 1. **Document-level concurrency**: Controls the number of documents processed simultaneously
10
- 2. **Chunk-level concurrency**: Controls the number of chunks processed simultaneously within a single document
11
- 3. **LLM request-level concurrency**: Controls the global concurrent number of LLM requests
12
-
13
- ## 1. Document-Level Concurrent Control
14
 
15
  **Control Parameter**: `max_parallel_insert`
16
 
17
- Document-level concurrency is controlled by the `max_parallel_insert` parameter, with a default value of 2.
18
-
19
- ```python
20
- # lightrag/lightrag.py
21
- max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
22
- ```
23
-
24
- ### Implementation Mechanism
25
-
26
- In the `apipeline_process_enqueue_documents` method, a semaphore is used to control document concurrency:
27
-
28
- ```python
29
- # lightrag/lightrag.py - apipeline_process_enqueue_documents method
30
- async def process_document(
31
- doc_id: str,
32
- status_doc: DocProcessingStatus,
33
- split_by_character: str | None,
34
- split_by_character_only: bool,
35
- pipeline_status: dict,
36
- pipeline_status_lock: asyncio.Lock,
37
- semaphore: asyncio.Semaphore, # Document-level semaphore
38
- ) -> None:
39
- """Process single document"""
40
- async with semaphore: # 🔥 Document-level concurrent control
41
- # ... Process all chunks of a single document
42
-
43
- # Create document-level semaphore
44
- semaphore = asyncio.Semaphore(self.max_parallel_insert) # Default 2
45
-
46
- # Create processing tasks for each document
47
- doc_tasks = []
48
- for doc_id, status_doc in to_process_docs.items():
49
- doc_tasks.append(
50
- process_document(
51
- doc_id, status_doc, split_by_character, split_by_character_only,
52
- pipeline_status, pipeline_status_lock, semaphore
53
- )
54
- )
55
-
56
- # Wait for all documents to complete processing
57
- await asyncio.gather(*doc_tasks)
58
- ```
59
 
60
- ## 2. Chunk-Level Concurrent Control
61
 
62
  **Control Parameter**: `llm_model_max_async`
63
 
64
- **Key Point**: Each document independently creates its own chunk semaphore!
65
-
66
- ```python
67
- # lightrag/lightrag.py
68
- llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
69
- ```
70
-
71
- ### Implementation Mechanism
72
-
73
- In the `extract_entities` function, **each document independently creates** its own chunk semaphore:
74
-
75
- ```python
76
- # lightrag/operate.py - extract_entities function
77
- async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
78
- # 🔥 Key: Each document independently creates this semaphore!
79
- llm_model_max_async = global_config.get("llm_model_max_async", 4)
80
- semaphore = asyncio.Semaphore(llm_model_max_async) # Chunk semaphore for each document
81
-
82
- async def _process_with_semaphore(chunk):
83
- async with semaphore: # 🔥 Chunk concurrent control within document
84
- return await _process_single_content(chunk)
85
-
86
- # Create tasks for each chunk
87
- tasks = []
88
- for c in ordered_chunks:
89
- task = asyncio.create_task(_process_with_semaphore(c))
90
- tasks.append(task)
91
-
92
- # Wait for all chunks to complete processing
93
- done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
94
- chunk_results = [task.result() for task in tasks]
95
- return chunk_results
96
- ```
97
-
98
- ### Important Inference: System Overall Chunk Concurrency
99
-
100
- Since each document independently creates chunk semaphores, the theoretical chunk concurrency of the system is:
101
-
102
- **Theoretical Chunk Concurrency = max_parallel_insert × llm_model_max_async**
103
 
104
  For example:
105
  - `max_parallel_insert = 2` (process 2 documents simultaneously)
106
  - `llm_model_max_async = 4` (maximum 4 chunk concurrency per document)
107
- - **Theoretical result**: Maximum 2 × 4 = 8 chunks simultaneously in "processing" state
108
-
109
- ## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)
110
-
111
- **Control Parameter**: `llm_model_max_async` (globally shared)
112
-
113
- **Key**: Although there might be 8 chunks "in processing", all LLM requests share the same global priority queue!
114
-
115
- ```python
116
- # lightrag/lightrag.py - __post_init__ method
117
- self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
118
- partial(
119
- self.llm_model_func,
120
- hashing_kv=hashing_kv,
121
- **self.llm_model_kwargs,
122
- )
123
- )
124
- # 🔥 Global LLM queue size = llm_model_max_async = 4
125
- ```
126
-
127
- ### Priority Queue Implementation
128
-
129
- ```python
130
- # lightrag/utils.py - priority_limit_async_func_call function
131
- def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
132
- def final_decro(func):
133
- queue = asyncio.PriorityQueue(maxsize=max_queue_size)
134
- tasks = set()
135
-
136
- async def worker():
137
- """Worker that processes tasks in the priority queue"""
138
- while not shutdown_event.is_set():
139
- try:
140
- priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
141
- result = await func(*args, **kwargs) # 🔥 Actual LLM call
142
- if not future.done():
143
- future.set_result(result)
144
- except Exception as e:
145
- # Error handling...
146
- finally:
147
- queue.task_done()
148
-
149
- # 🔥 Create fixed number of workers (max_size), this is the real concurrency limit
150
- for _ in range(max_size):
151
- task = asyncio.create_task(worker())
152
- tasks.add(task)
153
- ```
154
 
155
- ## 4. Chunk Internal Processing Mechanism (Serial)
156
 
157
- ### Why Serial?
158
 
159
- Internal processing of each chunk strictly follows this serial execution order:
160
 
161
- ```python
162
- # lightrag/operate.py - _process_single_content function
163
- async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
164
- # Step 1: Initial entity extraction
165
- hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
166
- final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)
167
 
168
- # Process initial extraction results
169
- maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)
170
 
171
- # Step 2: Gleaning phase
172
- for now_glean_index in range(entity_extract_max_gleaning):
173
- # 🔥 Serial wait for gleaning results
174
- glean_result = await use_llm_func_with_cache(
175
- continue_prompt, use_llm_func,
176
- llm_response_cache=llm_response_cache,
177
- history_messages=history, cache_type="extract"
178
- )
179
-
180
- # Process gleaning results
181
- glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
182
-
183
- # Merge results...
184
-
185
- # Step 3: Determine whether to continue loop
186
- if now_glean_index == entity_extract_max_gleaning - 1:
187
- break
188
-
189
- # 🔥 Serial wait for loop decision results
190
- if_loop_result = await use_llm_func_with_cache(
191
- if_loop_prompt, use_llm_func,
192
- llm_response_cache=llm_response_cache,
193
- history_messages=history, cache_type="extract"
194
- )
195
-
196
- if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
197
- break
198
-
199
- return maybe_nodes, maybe_edges
200
- ```
201
-
202
- ## 5. Complete Concurrent Hierarchy Diagram
203
- ![lightrag_indexing.png](assets%2Flightrag_indexing.png)
204
-
205
- ### Chunk Internal Processing (Serial)
206
- ```
207
- Initial Extraction → Gleaning → Loop Decision → Complete
208
- ```
209
-
210
- ## 6. Real-World Scenario Analysis
211
-
212
- ### Scenario 1: Single Document with Multiple Chunks
213
- Assume 1 document with 6 chunks:
214
-
215
- - **Document level**: Only 1 document, not limited by `max_parallel_insert`
216
- - **Chunk level**: Maximum 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
217
- - **LLM level**: Global maximum 4 LLM requests concurrent
218
-
219
- **Expected behavior**: 4 chunks process concurrently, remaining 2 chunks wait.
220
-
221
- ### Scenario 2: Multiple Documents with Multiple Chunks
222
- Assume 3 documents, each with 10 chunks:
223
-
224
- - **Document level**: Maximum 2 documents processed simultaneously
225
- - **Chunk level**: Maximum 4 chunks per document processed simultaneously
226
- - **Theoretical Chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
227
- - **Actual LLM concurrency**: Only 4 LLM requests actually execute
228
-
229
- **Actual state distribution**:
230
- ```
231
- # Possible system state:
232
- Document 1: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
233
- Document 2: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
234
- Document 3: Waiting for document-level semaphore
235
-
236
- Total:
237
- - 8 chunks in "processing" state
238
- - 4 LLM requests actually executing
239
- - 4 chunks waiting for LLM response
240
- ```
241
-
242
- ## 7. Performance Optimization Recommendations
243
-
244
- ### Understanding the Bottleneck
245
-
246
- The real bottleneck is the global LLM queue, not the chunk semaphores!
247
-
248
- ### Adjustment Strategies
249
 
250
- **Strategy 1: Increase LLM Concurrent Capacity**
251
 
252
- ```bash
253
- # Environment variable configuration
254
- export MAX_PARALLEL_INSERT=2 # Keep document concurrency
255
- export MAX_ASYNC=8 # 🔥 Increase LLM request concurrency
256
  ```
257
 
258
- **Strategy 2: Balance Document and LLM Concurrency**
259
 
260
- ```python
261
- rag = LightRAG(
262
- max_parallel_insert=3, # Moderately increase document concurrency
263
- llm_model_max_async=12, # Significantly increase LLM concurrency
264
- entity_extract_max_gleaning=0, # Reduce serial steps within chunks
265
- )
266
- ```
267
 
268
- ## 8. Summary
269
 
270
- Key characteristics of LightRAG's multi-document concurrent processing mechanism:
271
 
272
- ### Concurrent Layers
273
- 1. **Inter-document competition**: Controlled by `max_parallel_insert`, default 2 documents concurrent
274
- 2. **Theoretical Chunk concurrency**: Each document independently creates semaphores, total = max_parallel_insert × llm_model_max_async
275
- 3. **Actual LLM concurrency**: All chunks share global LLM queue, controlled by `llm_model_max_async`
276
- 4. **Intra-chunk serial**: Multiple LLM requests within each chunk execute strictly serially
277
 
278
- ### Key Insights
279
- - **Theoretical vs Actual**: System may have many chunks "in processing", but only few are actually executing LLM requests
280
- - **Real Bottleneck**: Global LLM request queue is the performance bottleneck, not chunk semaphores
281
- - **Optimization Focus**: Increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`
 
1
+ ## LightRAG Multi-Document Processing: Concurrent Control Strategy
2
 
3
  LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article provides an in-depth analysis of the concurrent control mechanisms at document level, chunk level, and LLM request level, helping you understand why specific concurrent behaviors occur.
4
 
5
+ ### 1. Document-Level Concurrent Control
6
 
7
  **Control Parameter**: `max_parallel_insert`
8
 
9
+ This parameter controls the number of documents processed simultaneously, preventing excessive parallelism from overwhelming system resources and extending the processing time of individual files. Document-level concurrency is governed by the `max_parallel_insert` attribute within LightRAG, which defaults to 2 and is configurable via the `MAX_PARALLEL_INSERT` environment variable.
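The mechanism can be sketched with a plain `asyncio.Semaphore`, mirroring the pattern used in `apipeline_process_enqueue_documents` (a minimal sketch with illustrative names and a sleep standing in for real work, not the actual implementation):

```python
import asyncio

MAX_PARALLEL_INSERT = 2  # default value of max_parallel_insert

async def process_document(doc_id: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:  # document-level concurrency control
        await asyncio.sleep(0.01)  # stand-in for chunking, extraction, merging
        return doc_id

async def main() -> list:
    semaphore = asyncio.Semaphore(MAX_PARALLEL_INSERT)
    docs = [f"doc-{i}" for i in range(5)]
    # all documents are scheduled, but at most 2 run inside the semaphore at once
    return await asyncio.gather(*(process_document(d, semaphore) for d in docs))

results = asyncio.run(main())
print(results)
```

`asyncio.gather` preserves submission order, so the results come back in document order even though completion is throttled.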
10
 
11
+ ### 2. Chunk-Level Concurrent Control
12
 
13
  **Control Parameter**: `llm_model_max_async`
14
 
15
+ This parameter controls the number of chunks processed simultaneously during the extraction stage of a document. It prevents a high volume of concurrent requests from monopolizing LLM processing resources, which would impede the efficient parallel processing of multiple files, while still fully leveraging the LLM's concurrency capabilities when processing an individual document. Chunk-level concurrency is governed by the `llm_model_max_async` attribute within LightRAG, which defaults to 4 and is configurable via the `MAX_ASYNC` environment variable.
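The per-document gate can be sketched as follows (illustrative names only; in LightRAG this logic lives in `extract_entities` in lightrag/operate.py):

```python
import asyncio

LLM_MODEL_MAX_ASYNC = 4  # default, configurable via MAX_ASYNC

async def extract_from_document(chunks: list) -> list:
    # each call (one per document) creates its own semaphore
    semaphore = asyncio.Semaphore(LLM_MODEL_MAX_ASYNC)

    async def process_with_semaphore(chunk: str) -> str:
        async with semaphore:  # at most 4 chunks of this document at once
            await asyncio.sleep(0.005)  # stand-in for the extraction LLM call
            return chunk.upper()

    return await asyncio.gather(*(process_with_semaphore(c) for c in chunks))

out = asyncio.run(extract_from_document([f"chunk-{i}" for i in range(6)]))
print(out)
```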
16
 
17
+ In the `extract_entities` function, **each document independently creates** its own chunk semaphore, so the theoretical chunk concurrency of the system is:
+ $$
+ \text{Chunk Concurrency} = \text{max\_parallel\_insert} \times \text{llm\_model\_max\_async}
+ $$
21
  For example:
22
  - `max_parallel_insert = 2` (process 2 documents simultaneously)
23
  - `llm_model_max_async = 4` (maximum 4 chunk concurrency per document)
24
+ - Theoretical chunk-level concurrency: 2 × 4 = 8
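The interaction of the two layers can be simulated: with equal-length mock chunk tasks, the measured peak equals `max_parallel_insert × llm_model_max_async` (a sketch under those assumptions, with illustrative names):

```python
import asyncio

MAX_PARALLEL_INSERT = 2
LLM_MODEL_MAX_ASYNC = 4

chunk_active = 0  # chunks currently inside a chunk semaphore
chunk_peak = 0    # highest observed chunk concurrency

async def process_chunk(chunk_sem: asyncio.Semaphore) -> None:
    global chunk_active, chunk_peak
    async with chunk_sem:  # per-document chunk limit
        chunk_active += 1
        chunk_peak = max(chunk_peak, chunk_active)
        await asyncio.sleep(0.01)  # stand-in for the extraction LLM call
        chunk_active -= 1

async def process_document(doc_sem: asyncio.Semaphore) -> None:
    async with doc_sem:  # document-level limit
        chunk_sem = asyncio.Semaphore(LLM_MODEL_MAX_ASYNC)  # created per document
        await asyncio.gather(*(process_chunk(chunk_sem) for _ in range(10)))

async def main() -> None:
    doc_sem = asyncio.Semaphore(MAX_PARALLEL_INSERT)
    await asyncio.gather(*(process_document(doc_sem) for _ in range(3)))

asyncio.run(main())
print(chunk_peak)  # 2 active documents × 4 chunk slots each = 8
```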
25
 
26
+ ### 3. Graph-Level Concurrent Control
27
 
28
+ **Control Parameter**: `llm_model_max_async * 2`
29
 
30
+ This parameter controls the number of entities and relations processed simultaneously during the merging stage within a document. The purpose is to prevent a high volume of concurrent requests from monopolizing LLM processing resources, which would impede the efficient parallel processing of multiple files. Graph-level concurrency is governed by the `llm_model_max_async` attribute within LightRAG, which defaults to 4 and is configurable via the `MAX_ASYNC` environment variable. The same graph-level parallelism setting also governs the entity-relationship rebuild phase that follows document deletion.
31
 
32
+ Because the entity-relationship merging phase does not require an LLM interaction for every operation, its parallelism is set to double the LLM's parallelism. This improves machine utilization while preventing excessive queuing and resource contention for the LLM.
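The sizing rule above can be sketched like this (hypothetical names; only the `llm_model_max_async * 2` factor comes from the text):

```python
import asyncio

LLM_MODEL_MAX_ASYNC = 4  # MAX_ASYNC

async def merge_entity(name: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:  # graph-level (merge-stage) concurrency control
        # many merge operations resolve without an LLM call, so the
        # semaphore is sized at twice the LLM concurrency
        await asyncio.sleep(0.005)
        return name

async def merge_stage(items: list) -> list:
    semaphore = asyncio.Semaphore(LLM_MODEL_MAX_ASYNC * 2)  # = 8
    return await asyncio.gather(*(merge_entity(i, semaphore) for i in items))

merged = asyncio.run(merge_stage([f"ent-{i}" for i in range(12)]))
print(merged)
```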
33
 
34
+ ### 4. LLM-Level Concurrent Control
 
35
 
36
+ **Control Parameter**: `llm_model_max_async`
 
37
 
38
+ This parameter governs the **overall concurrency** of LLM requests dispatched by the entire LightRAG system, encompassing the document extraction stage, the merging stage, and user query handling.
39
+
40
+ LLM request prioritization is managed via a global priority queue, which **systematically prioritizes user queries** over merging-related requests, and merging-related requests over extraction-related requests. This strategic prioritization **minimizes user query latency**.
41
+
42
+ LLM-level concurrency is governed by the `llm_model_max_async` attribute within LightRAG, which defaults to 4 and is configurable via the `MAX_ASYNC` environment variable.
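A toy model of the prioritized dispatch described above (the real implementation is `priority_limit_async_func_call`; the tuple layout and priority values here are illustrative assumptions, with a lower value meaning served first):

```python
import asyncio

# lower priority value = served first (query > merge > extract ordering)
QUERY, MERGE, EXTRACT = 0, 1, 2

async def llm_worker(queue: asyncio.PriorityQueue, served: list) -> None:
    while True:
        priority, count, name = await queue.get()
        served.append(name)  # stand-in for the actual LLM call
        queue.task_done()

async def main() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    served: list = []
    # enqueue in "wrong" order: extraction first, user query last
    queue.put_nowait((EXTRACT, 0, "extract-chunk"))
    queue.put_nowait((MERGE, 1, "merge-entity"))
    queue.put_nowait((QUERY, 2, "user-query"))
    # a fixed pool of workers is the real concurrency limit
    workers = [asyncio.create_task(llm_worker(queue, served)) for _ in range(4)]
    await queue.join()  # wait until all queued requests are processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return served

order = asyncio.run(main())
print(order)
```

Despite the submission order, the user query is dequeued first, then the merge request, then the extraction request.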
43
+
44
+ ### 5. Complete Concurrent Hierarchy Diagram
45
+
46
+ ```mermaid
47
+ graph TD
48
+ classDef doc fill:#e6f3ff,stroke:#5b9bd5,stroke-width:2px;
49
+ classDef chunk fill:#fbe5d6,stroke:#ed7d31,stroke-width:1px;
50
+ classDef merge fill:#e2f0d9,stroke:#70ad47,stroke-width:2px;
51
+
52
+ A["Multiple Documents<br>max_parallel_insert = 2"] --> A1
53
+ A --> B1
54
+
55
+ A1[DocA: split to n chunks] --> A_chunk;
56
+ B1[DocB: split to m chunks] --> B_chunk;
57
+
58
+ subgraph A_chunk[Extraction Stage]
59
+ A_chunk_title[Entity Relation Extraction<br>llm_model_max_async = 4];
60
+ A_chunk_title --> A_chunk1[Chunk A1]:::chunk;
61
+ A_chunk_title --> A_chunk2[Chunk A2]:::chunk;
62
+ A_chunk_title --> A_chunk3[Chunk A3]:::chunk;
63
+ A_chunk_title --> A_chunk4[Chunk A4]:::chunk;
64
+ A_chunk1 & A_chunk2 & A_chunk3 & A_chunk4 --> A_chunk_done([Extraction Complete]);
65
+ end
66
+
67
+ subgraph B_chunk[Extraction Stage]
68
+ B_chunk_title[Entity Relation Extraction<br>llm_model_max_async = 4];
69
+ B_chunk_title --> B_chunk1[Chunk B1]:::chunk;
70
+ B_chunk_title --> B_chunk2[Chunk B2]:::chunk;
71
+ B_chunk_title --> B_chunk3[Chunk B3]:::chunk;
72
+ B_chunk_title --> B_chunk4[Chunk B4]:::chunk;
73
+ B_chunk1 & B_chunk2 & B_chunk3 & B_chunk4 --> B_chunk_done([Extraction Complete]);
74
+ end
75
+ A_chunk -.->|LLM Request| LLM_Queue;
76
+
77
+ A_chunk --> A_merge;
78
+ B_chunk --> B_merge;
79
+
80
+ subgraph A_merge[Merge Stage]
81
+ A_merge_title[Entity Relation Merging<br>llm_model_max_async * 2 = 8];
82
+ A_merge_title --> A1_entity[Ent a1]:::merge;
83
+ A_merge_title --> A2_entity[Ent a2]:::merge;
84
+ A_merge_title --> A3_entity[Rel a3]:::merge;
85
+ A_merge_title --> A4_entity[Rel a4]:::merge;
86
+ A1_entity & A2_entity & A3_entity & A4_entity --> A_done([Merge Complete])
87
+ end
88
+
89
+ subgraph B_merge[Merge Stage]
90
+ B_merge_title[Entity Relation Merging<br>llm_model_max_async * 2 = 8];
91
+ B_merge_title --> B1_entity[Ent b1]:::merge;
92
+ B_merge_title --> B2_entity[Ent b2]:::merge;
93
+ B_merge_title --> B3_entity[Rel b3]:::merge;
94
+ B_merge_title --> B4_entity[Rel b4]:::merge;
95
+ B1_entity & B2_entity & B3_entity & B4_entity --> B_done([Merge Complete])
96
+ end
97
+
98
+ A_merge -.->|LLM Request| LLM_Queue["LLM Request Prioritized Queue<br>llm_model_max_async = 4"];
99
+ B_merge -.->|LLM Request| LLM_Queue;
100
+ B_chunk -.->|LLM Request| LLM_Queue;
102
  ```
103
 
104
+ > The extraction and merge stages share a global prioritized LLM queue, regulated by `llm_model_max_async`. While numerous entity and relation extraction and merging operations may be "actively processing", **only a limited number will concurrently execute LLM requests**; the remainder are queued, awaiting their turn.
105
 
106
+ ### 6. Performance Optimization Recommendations
107
 
108
+ * **Increase the LLM concurrency setting based on the capabilities of your LLM server or API provider**
109
 
110
+ During the file processing phase, the performance and concurrency capability of the LLM are the critical bottleneck. When deploying LLMs locally, the service's concurrency capacity must adequately account for LightRAG's context length requirements. LightRAG recommends that LLMs support a minimum context length of 32KB, so server concurrency should be calculated against this benchmark. For API providers, LightRAG retries a request up to three times if it is rejected due to concurrent request limits. Backend logs can be used to determine whether LLM retries are occurring, indicating that `MAX_ASYNC` exceeds the API provider's limits.
111
 
112
+ * **Align Parallel Document Insertion Settings with LLM Concurrency Configurations**
 
 
 
 
113
 
114
+ The recommended number of parallel document processing tasks is about 1/4 of the LLM's concurrency, with a minimum of 2 and a maximum of 10. Setting it higher typically does not accelerate overall document processing, because even a small number of concurrently processed documents can saturate the LLM's parallel capacity, while excessive document parallelism significantly increases the processing time of each individual document. Since LightRAG commits processing results on a file-by-file basis, a large number of concurrent files would require caching a substantial amount of intermediate data, and in the event of a system error, every document still mid-processing would need to be reprocessed, increasing error-handling costs. For instance, setting `MAX_PARALLEL_INSERT` to 3 is appropriate when `MAX_ASYNC` is configured to 12.
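The sizing rule above (one quarter of LLM concurrency, clamped to the range 2–10) can be expressed as a small helper (a hypothetical function for illustration, not part of LightRAG):

```python
def recommended_parallel_insert(max_async: int) -> int:
    """Recommended MAX_PARALLEL_INSERT: 1/4 of MAX_ASYNC, clamped to [2, 10]."""
    return max(2, min(10, max_async // 4))

print(recommended_parallel_insert(12))  # -> 3, matching the example above
print(recommended_parallel_insert(4))   # -> 2, the floor
print(recommended_parallel_insert(64))  # -> 10, the ceiling
```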
 
 
 
docs/assets/lightrag_indexing.png DELETED

Git LFS Details

  • SHA256: d6aa96d364e7172712a83b03ae9b3c73eb55c4d34c0f269f42503d8c30718b23
  • Pointer size: 131 Bytes
  • Size of remote file: 187 kB
docs/zh/LightRAG_concurrent_explain_zh.md DELETED
@@ -1,277 +0,0 @@