umanggarg committed
Commit eae79ac · 1 Parent(s): 25d2210

Parallelize Voyage embeds, checkpoint ingestion, add client timeout


- _voyage_embed now dispatches batches through an 8-worker ThreadPoolExecutor,
  preserving input order by filling a pre-sized results array by batch index
  (a standalone sketch of the pattern follows this list). Expected ~5x speedup
  on large repos (langchain, 13,594 chunks: 15 min → ~3 min).
- The Voyage client gets timeout=60 + max_retries=0 per the Provider Client Rule;
  the SDK uses plain `requests` and otherwise hangs silently (same failure
  class as the Gemma 4 incident).
- Bumped the Voyage batch size 32 → 96 (kept _BATCH_SIZE=32 for Nomic/Gemini).
- Ingestion now embeds and upserts in groups of 500, so a mid-run crash loses
  only the current group's work; a retry resumes via the existing
  find_vectors_by_hash dedup.
- Plumbed a progress(done, total) callback from the service through embed_chunks
  so the UI and HF Space logs show live per-batch progress instead of a silent
  15-minute block.
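
A minimal, standalone sketch of the order-preserving dispatch pattern from the
first bullet, under stated assumptions: embed_batch and parallel_embed are
illustrative stand-ins, not identifiers from this repo; the real code lives in
_voyage_embed below.

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def embed_batch(batch: list[str]) -> list[list[float]]:
        # Stand-in for the real Voyage call: one vector per input text.
        return [[0.0] for _ in batch]

    def parallel_embed(texts: list[str], batch_size: int = 96, workers: int = 8,
                       progress=None) -> list[list[float]]:
        batches = [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]
        results: list = [None] * len(batches)  # pre-sized: slot i holds batch i's vectors
        done = 0
        with ThreadPoolExecutor(max_workers=max(1, min(workers, len(batches)))) as pool:
            future_to_idx = {pool.submit(embed_batch, b): i for i, b in enumerate(batches)}
            for fut in as_completed(future_to_idx):
                idx = future_to_idx[fut]
                results[idx] = fut.result()  # slot by batch index, not completion order
                done += len(batches[idx])
                if progress:
                    progress(done, len(texts))
        return [vec for r in results for vec in r]  # flatten in original input order

Completion order is nondeterministic, but writing each result into its batch
slot keeps vectors[i] paired with texts[i] after the flatten.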

backend/services/ingestion_service.py CHANGED
@@ -186,27 +186,58 @@ class IngestionService:
         _emit("embedding", f"Embedding {len(chunks)} chunks...")

         print("Embedding chunks...")
-        new_vectors = self.embedder.embed_chunks(new_chunks) if new_chunks else []
-        if new_vectors:
-            print(f"  Produced {len(new_vectors)} vectors ({len(new_vectors[0])}-dim each)")
-
-        # Reconstruct the full vectors list in original chunk order.
-        # Chunks with existing vectors use the stored vector; new ones use the
-        # freshly computed one. This preserves the 1-to-1 chunks↔vectors pairing
-        # that upsert_chunks requires.
-        new_hash_to_vec = {c["text_hash"]: v for c, v in zip(new_chunks, new_vectors)}
-        vectors = [
-            existing_vectors.get(c["text_hash"]) or new_hash_to_vec[c["text_hash"]]
-            for c in chunks
-        ]

-        # ── Step 7: Store, then sweep stale chunks ────────────────────────────
-        # Upsert new chunks first — at this point old chunks are still visible.
-        # Any chunk whose source code hasn't changed will be overwritten with an
-        # identical payload (same ID, stable per repo::filepath::start_line).
-        _emit("storing", f"Storing {len(chunks)} chunks in Qdrant...")
-        print("Storing in Qdrant...")
-        written_ids = self.store.upsert_chunks(chunks, vectors)
+        # ── Step 7: Embed + upsert in checkpointed groups ─────────────────────
+        # Stream embed→upsert in groups so a crash mid-run leaves earlier
+        # chunks safely in Qdrant. Retry then skips them via the existing
+        # find_vectors_by_hash dedup path above. Without checkpoints, a 15-min
+        # ingest that dies at chunk 13000/13594 loses 100% of the work.
+        #
+        # Group size 500: big enough that Qdrant upsert overhead amortises,
+        # small enough that a crash loses at most ~500 re-embeddings on retry.
+        CHECKPOINT_SIZE = 500
+        total_new = len(new_chunks)
+        new_done = 0
+        written_ids: list = []
+
+        # Progress callback for the embedder — maps batch-level progress
+        # within a checkpoint group to an overall "chunks embedded / total"
+        # count. `new_done` snapshots the running total across groups.
+        def _embed_progress(batch_done: int, batch_total: int) -> None:
+            overall_done = new_done + batch_done
+            _emit("embedding", f"Embedded {overall_done}/{total_new} chunks...")
+
+        for group_start in range(0, len(chunks), CHECKPOINT_SIZE):
+            group = chunks[group_start : group_start + CHECKPOINT_SIZE]
+
+            # Within the group, split reused vs new. Only the new ones hit
+            # the embedding API; reused chunks pull from `existing_vectors`.
+            group_new_chunks = [c for c in group if c["text_hash"] not in existing_vectors]
+
+            if group_new_chunks:
+                group_new_vectors = self.embedder.embed_chunks(
+                    group_new_chunks, progress=_embed_progress,
+                )
+                new_done += len(group_new_chunks)
+            else:
+                group_new_vectors = []
+
+            # Stitch back into group order so each chunk lines up with its vector.
+            group_hash_to_vec = {
+                c["text_hash"]: v for c, v in zip(group_new_chunks, group_new_vectors)
+            }
+            group_vectors = [
+                existing_vectors.get(c["text_hash"]) or group_hash_to_vec[c["text_hash"]]
+                for c in group
+            ]
+
+            # Upsert this group before touching the next — that's the actual
+            # checkpoint. If the next group's embedding call dies, everything
+            # up to here is already in Qdrant.
+            _emit("storing", f"Storing checkpoint {group_start + len(group)}/{len(chunks)}...")
+            group_ids = self.store.upsert_chunks(group, group_vectors)
+            written_ids.extend(group_ids)
+            print(f"  Checkpoint {group_start + len(group)}/{len(chunks)} stored")

         # On a force re-index, delete chunks that no longer exist in the source.
         # This handles deleted files and renamed functions — their old IDs won't
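
Note on the resume path these checkpoint comments lean on: earlier in this
method, ingestion queries Qdrant for vectors whose text_hash already exists,
so chunks persisted by a completed checkpoint skip the embedding API on retry.
Roughly, under the assumption that find_vectors_by_hash takes a list of hashes
(its exact signature is not shown in this diff):

    # Sketch of the dedup lookup that makes checkpoints resumable (signature assumed).
    existing_vectors = self.store.find_vectors_by_hash([c["text_hash"] for c in chunks])
    new_chunks = [c for c in chunks if c["text_hash"] not in existing_vectors]
    # After a crash at checkpoint N, chunks from groups 1..N-1 resolve through
    # existing_vectors and are never re-embedded.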
ingestion/embedder.py CHANGED
@@ -46,6 +46,7 @@ import time
 from pathlib import Path
 import sys
 import re
+from concurrent.futures import ThreadPoolExecutor, as_completed

 import requests as http

@@ -55,8 +56,12 @@ from backend.config import settings

 _NOMIC_API_URL = "https://api-atlas.nomic.ai/v1/embedding/text"
 _GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
-_BATCH_SIZE = 32  # conservative for all providers: stays under ~10MB body
+_BATCH_SIZE = 32  # conservative default (Nomic): stays under ~10MB body
                   # and keeps each failed batch cheap to retry
+_VOYAGE_BATCH_SIZE = 96  # Voyage accepts up to 128 per call; 96 at _MAX_CHARS=8000
+                         # stays well under their 120K-token-per-request cap.
+_VOYAGE_CONCURRENCY = 8  # parallel workers for Voyage. Their paid tier is 2000 RPM
+                         # (~33 req/s); 8 workers * ~2s/batch ≈ 4 req/s — comfortable.
 _MAX_CHARS = 8000  # truncate each text before sending — embeddings degrade
                    # gracefully on truncation and models silently clip anyway

@@ -118,10 +123,21 @@ class Embedder:
         )

     def _init_voyage(self):
-        """Initialise Voyage AI client. voyage-code-3 is code-optimised 1024-dim."""
+        """Initialise Voyage AI client. voyage-code-3 is code-optimised 1024-dim.
+
+        timeout=60 is mandatory per the Provider Client Rule in CLAUDE.md — a
+        stuck request with no timeout will block an entire ingestion silently,
+        since the voyageai SDK uses plain `requests` and doesn't log each call.
+        max_retries=0 leaves retry logic to us (see _voyage_call_api), so we
+        don't double-retry on transient failures.
+        """
         try:
             import voyageai
-            self._voyage = voyageai.Client(api_key=settings.voyage_api_key)
+            self._voyage = voyageai.Client(
+                api_key=settings.voyage_api_key,
+                timeout=60,
+                max_retries=0,
+            )
         except ImportError:
             raise ImportError(
                 "voyageai package not installed. Run: pip install voyageai"
@@ -153,7 +169,11 @@ class Embedder:

     # ── Public interface ───────────────────────────────────────────────────────

-    def embed_chunks(self, chunks: list[dict]) -> list[list[float]]:
+    def embed_chunks(
+        self,
+        chunks: list[dict],
+        progress: callable = None,
+    ) -> list[list[float]]:
         """
         Embed a list of chunk dicts for indexing (document role).

@@ -165,10 +185,14 @@
         a token limit (~8192 tokens) and API gateways have a request body size
         limit (~10MB). Truncation degrades retrieval quality marginally but
         avoids 413 errors on large class definitions or contextually-enriched chunks.
+
+        progress: optional callback progress(done_chunks, total_chunks) called
+        as each batch completes. Lets callers render a live progress bar without
+        knowing the provider's internal batch size.
         """
         texts = [c["text"][:_MAX_CHARS] for c in chunks]
         if self._provider == "voyage":
-            return self._voyage_embed(texts, input_type="document")
+            return self._voyage_embed(texts, input_type="document", progress=progress)
         if self._provider == "gemini":
             return self._gemini_embed(texts, task_type="RETRIEVAL_DOCUMENT")
         return self._nomic_embed(texts, task_type="search_document")
@@ -188,23 +212,60 @@

     # ── Voyage AI implementation ───────────────────────────────────────────────

-    def _voyage_embed(self, texts: list[str], input_type: str) -> list[list[float]]:
+    def _voyage_embed(
+        self,
+        texts: list[str],
+        input_type: str,
+        progress: callable = None,
+    ) -> list[list[float]]:
         """
-        Call Voyage AI API with batching.
+        Call Voyage AI API with parallel batching.

         voyage-code-3 is specifically trained on (code, docstring) pairs
         and GitHub issues, giving it much better code retrieval than
         general-purpose text embedders.

-        Batching: Voyage API accepts up to 128 texts per call on free tier.
-        We use 96 to leave headroom for large chunks.
+        Concurrency: Voyage's paid tier is 2000 RPM (~33 req/s). We run
+        _VOYAGE_CONCURRENCY=8 workers so a 13K-chunk ingest drops from ~15 min
+        serial to ~2 min parallel. Each worker serialises its HTTP call through
+        the shared voyageai.Client (which uses a thread-safe requests.Session
+        internally), so no per-worker client is needed.
+
+        Order preservation: batches may complete out of order, so we fill a
+        pre-sized results array by batch index, then flatten in original order.
+        Without this, a chunk's vector could land on the wrong chunk payload.
+
+        progress(done_chunks, total_chunks) fires after each completed batch.
         """
-        all_embeddings: list[list[float]] = []
-        for i in range(0, len(texts), _BATCH_SIZE):
-            batch = texts[i : i + _BATCH_SIZE]
-            result = self._voyage_call_api(batch, input_type)
-            all_embeddings.extend(result)
-        return all_embeddings
+        batches = [
+            texts[i : i + _VOYAGE_BATCH_SIZE]
+            for i in range(0, len(texts), _VOYAGE_BATCH_SIZE)
+        ]
+        if not batches:
+            return []
+
+        results: list[list[list[float]] | None] = [None] * len(batches)
+        total_chunks = len(texts)
+        done_chunks = 0
+
+        workers = min(_VOYAGE_CONCURRENCY, len(batches))
+        with ThreadPoolExecutor(max_workers=workers) as pool:
+            future_to_idx = {
+                pool.submit(self._voyage_call_api, batch, input_type): idx
+                for idx, batch in enumerate(batches)
+            }
+            for fut in as_completed(future_to_idx):
+                idx = future_to_idx[fut]
+                results[idx] = fut.result()
+                done_chunks += len(batches[idx])
+                print(
+                    f"  Voyage batch {sum(1 for r in results if r is not None)}"
+                    f"/{len(batches)} done ({done_chunks}/{total_chunks} chunks)"
+                )
+                if progress:
+                    progress(done_chunks, total_chunks)
+
+        return [vec for batch_result in results for vec in batch_result]

     def _voyage_call_api(
         self,
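
Caller-side sketch of the new progress plumbing. The embedder construction is
elided, and on_progress is an illustrative name; the callback contract,
progress(done_chunks, total_chunks), is the one documented in embed_chunks above.

    def on_progress(done: int, total: int) -> None:
        print(f"embedded {done}/{total} chunks")

    vectors = embedder.embed_chunks(chunks, progress=on_progress)
    assert len(vectors) == len(chunks)  # order preserved: vectors[i] pairs with chunks[i]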