nothex committed on
Commit
ffa9f99
·
1 Parent(s): ca5537d

fix: stabilize celery reconnect handling

Browse files
.env.example CHANGED
@@ -45,6 +45,18 @@ HF_HUB_DISABLE_SYMLINKS_WARNING=1
45
  HF_TOKEN=****
46
 
47
  REDIS_URL=redis://localhost:6379/0
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
  MASTER_ADMIN_KEY=****
50
 
 
45
  HF_TOKEN=****
46
 
47
  REDIS_URL=redis://localhost:6379/0
48
+ # Set to false if you start Celery manually in a second terminal.
49
+ AUTO_START_CELERY=true
50
+ CELERY_TASK_ACKS_LATE=false
51
+ CELERY_TASK_REJECT_ON_WORKER_LOST=false
52
+ CELERY_WORKER_CANCEL_ON_CONNECTION_LOSS=false
53
+ CELERY_WORKER_PREFETCH_MULTIPLIER=1
54
+ CELERY_VISIBILITY_TIMEOUT_S=7200
55
+ CELERY_BROKER_HEARTBEAT_S=30
56
+ CELERY_BROKER_POOL_LIMIT=1
57
+ CELERY_REDIS_SOCKET_TIMEOUT_S=30
58
+ CELERY_REDIS_HEALTH_CHECK_INTERVAL_S=30
59
+ CELERY_RESULT_EXPIRES_S=86400
60
 
61
  MASTER_ADMIN_KEY=****
62
 
ARCHITECTURE.md CHANGED
@@ -637,6 +637,9 @@ Login
637
  | `GEMINI_API_KEY` | Google Gemini access (ingestion + vision) | Yes |
638
  | `COHERE_API_KEY` | Cohere reranking | Yes |
639
  | `REDIS_URL` | Redis connection string | Yes |
 
 
 
640
  | `MASTER_ADMIN_KEY` | Admin endpoint access | Yes |
641
  | `ALLOWED_ORIGINS` | CORS allowed origins (use `*` for dev only) | Yes |
642
  | `DOCS_ENABLED` | Enable /docs and /redoc (set `false` in prod) | No |
 
637
  | `GEMINI_API_KEY` | Google Gemini access (ingestion + vision) | Yes |
638
  | `COHERE_API_KEY` | Cohere reranking | Yes |
639
  | `REDIS_URL` | Redis connection string | Yes |
640
+ | `CELERY_WORKER_CANCEL_ON_CONNECTION_LOSS` | Whether broker disconnect cancels active ingestion tasks; keep `false` for long uploads | No |
641
+ | `CELERY_TASK_ACKS_LATE` | Whether ingestion tasks acknowledge only after completion; keep `false` to avoid redelivery loops on flaky Redis | No |
642
+ | `CELERY_TASK_REJECT_ON_WORKER_LOST` | Whether lost workers requeue tasks; keep `false` unless duplicate-safe retries are required | No |
643
  | `MASTER_ADMIN_KEY` | Admin endpoint access | Yes |
644
  | `ALLOWED_ORIGINS` | CORS allowed origins (use `*` for dev only) | Yes |
645
  | `DOCS_ENABLED` | Enable /docs and /redoc (set `false` in prod) | No |
backend/api/ingest.py CHANGED
@@ -8,6 +8,7 @@ from backend.core.auth_utils import is_guest_token, require_auth_token
8
  from backend.core.rate_limit import limiter
9
  from backend.core.tasks import process_document_task
10
  from backend.core.tasks import celery_app
 
11
 
12
  log = logging.getLogger("morpheus.api.ingest")
13
  router = APIRouter()
@@ -64,6 +65,25 @@ def _ensure_ingest_worker_available() -> None:
64
  )
65
 
66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  def _register_ingest_task(task_id: str, *, user_id: str, tmp_path: str, filename: str) -> None:
68
  with _TASK_LOCK:
69
  _ACTIVE_INGEST_TASKS[task_id] = {
@@ -172,7 +192,14 @@ async def upload(
172
  )
173
  with open(tmp_path, "wb") as f:
174
  f.write(contents)
175
- task = process_document_task.delay(tmp_path, file.filename, x_auth_token)
 
 
 
 
 
 
 
176
  _register_ingest_task(
177
  task.id,
178
  user_id=user_id,
@@ -202,7 +229,7 @@ def get_ingest_status(task_id: str):
202
  _forget_ingest_task(task_id)
203
  return {"status": "CANCELLED", "message": "Ingestion cancelled."}
204
 
205
- task_result = celery_app.AsyncResult(task_id)
206
 
207
  if task_result.state == "PENDING":
208
  return {"status": "PENDING", "message": "Waiting in queue..."}
@@ -240,7 +267,7 @@ def cancel_ingest(
240
  ):
241
  _ensure_ingest_worker_available()
242
  active = _get_user_ingest_task(task_id, user_id)
243
- task_result = celery_app.AsyncResult(task_id)
244
  if task_result.state in {"SUCCESS", "FAILURE", "REVOKED"}:
245
  _forget_ingest_task(task_id)
246
  return {"ok": True, "status": task_result.state}
@@ -251,6 +278,13 @@ def cancel_ingest(
251
  else:
252
  celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
253
  _CANCELLED_INGEST_TASKS.add(task_id)
 
 
 
 
 
 
 
254
  except Exception as exc:
255
  log.error("Could not cancel ingestion task %s: %s", task_id, exc)
256
  raise HTTPException(status_code=500, detail="Could not cancel ingestion task.") from exc
 
8
  from backend.core.rate_limit import limiter
9
  from backend.core.tasks import process_document_task
10
  from backend.core.tasks import celery_app
11
+ from backend.core.tasks import TRANSIENT_CELERY_CONNECTION_ERRORS
12
 
13
  log = logging.getLogger("morpheus.api.ingest")
14
  router = APIRouter()
 
65
  )
66
 
67
 
68
+ def _raise_reconnecting(exc: BaseException) -> None:
69
+ raise HTTPException(
70
+ status_code=503,
71
+ detail="Background ingestion queue is reconnecting. Please retry shortly.",
72
+ ) from exc
73
+
74
+
75
+ def _get_task_result(task_id: str):
76
+ try:
77
+ return celery_app.AsyncResult(task_id)
78
+ except TRANSIENT_CELERY_CONNECTION_ERRORS as exc:
79
+ log.warning(
80
+ "Celery result backend reconnecting while reading task %s: %s",
81
+ task_id,
82
+ exc,
83
+ )
84
+ _raise_reconnecting(exc)
85
+
86
+
87
  def _register_ingest_task(task_id: str, *, user_id: str, tmp_path: str, filename: str) -> None:
88
  with _TASK_LOCK:
89
  _ACTIVE_INGEST_TASKS[task_id] = {
 
192
  )
193
  with open(tmp_path, "wb") as f:
194
  f.write(contents)
195
+ try:
196
+ task = process_document_task.delay(tmp_path, file.filename, x_auth_token)
197
+ except TRANSIENT_CELERY_CONNECTION_ERRORS as exc:
198
+ log.warning(
199
+ "Failed to queue ingestion while Celery/Redis is reconnecting: %s",
200
+ exc,
201
+ )
202
+ _raise_reconnecting(exc)
203
  _register_ingest_task(
204
  task.id,
205
  user_id=user_id,
 
229
  _forget_ingest_task(task_id)
230
  return {"status": "CANCELLED", "message": "Ingestion cancelled."}
231
 
232
+ task_result = _get_task_result(task_id)
233
 
234
  if task_result.state == "PENDING":
235
  return {"status": "PENDING", "message": "Waiting in queue..."}
 
267
  ):
268
  _ensure_ingest_worker_available()
269
  active = _get_user_ingest_task(task_id, user_id)
270
+ task_result = _get_task_result(task_id)
271
  if task_result.state in {"SUCCESS", "FAILURE", "REVOKED"}:
272
  _forget_ingest_task(task_id)
273
  return {"ok": True, "status": task_result.state}
 
278
  else:
279
  celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
280
  _CANCELLED_INGEST_TASKS.add(task_id)
281
+ except TRANSIENT_CELERY_CONNECTION_ERRORS as exc:
282
+ log.warning(
283
+ "Could not cancel ingestion task %s while Celery/Redis reconnects: %s",
284
+ task_id,
285
+ exc,
286
+ )
287
+ _raise_reconnecting(exc)
288
  except Exception as exc:
289
  log.error("Could not cancel ingestion task %s: %s", task_id, exc)
290
  raise HTTPException(status_code=500, detail="Could not cancel ingestion task.") from exc
backend/core/config.py CHANGED
@@ -197,6 +197,21 @@ UPLOAD_RETRY_MAX_SLEEP_S = float(os.getenv("UPLOAD_RETRY_MAX_SLEEP_S", "20"))
197
  CELERY_VISIBILITY_TIMEOUT_S = int(os.getenv("CELERY_VISIBILITY_TIMEOUT_S", "7200"))
198
  CELERY_BROKER_HEARTBEAT_S = int(os.getenv("CELERY_BROKER_HEARTBEAT_S", "30"))
199
  CELERY_BROKER_POOL_LIMIT = int(os.getenv("CELERY_BROKER_POOL_LIMIT", "1"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200
  CELERY_REDIS_SOCKET_TIMEOUT_S = float(
201
  os.getenv("CELERY_REDIS_SOCKET_TIMEOUT_S", "30")
202
  )
 
197
  CELERY_VISIBILITY_TIMEOUT_S = int(os.getenv("CELERY_VISIBILITY_TIMEOUT_S", "7200"))
198
  CELERY_BROKER_HEARTBEAT_S = int(os.getenv("CELERY_BROKER_HEARTBEAT_S", "30"))
199
  CELERY_BROKER_POOL_LIMIT = int(os.getenv("CELERY_BROKER_POOL_LIMIT", "1"))
200
+ CELERY_TASK_ACKS_LATE = os.getenv("CELERY_TASK_ACKS_LATE", "false").strip().lower() in {
201
+ "1",
202
+ "true",
203
+ "yes",
204
+ }
205
+ CELERY_TASK_REJECT_ON_WORKER_LOST = os.getenv(
206
+ "CELERY_TASK_REJECT_ON_WORKER_LOST", "false"
207
+ ).strip().lower() in {"1", "true", "yes"}
208
+ CELERY_WORKER_CANCEL_ON_CONNECTION_LOSS = os.getenv(
209
+ "CELERY_WORKER_CANCEL_ON_CONNECTION_LOSS", "false"
210
+ ).strip().lower() in {"1", "true", "yes"}
211
+ CELERY_WORKER_PREFETCH_MULTIPLIER = int(
212
+ os.getenv("CELERY_WORKER_PREFETCH_MULTIPLIER", "1")
213
+ )
214
+ CELERY_RESULT_EXPIRES_S = int(os.getenv("CELERY_RESULT_EXPIRES_S", "86400"))
215
  CELERY_REDIS_SOCKET_TIMEOUT_S = float(
216
  os.getenv("CELERY_REDIS_SOCKET_TIMEOUT_S", "30")
217
  )
backend/core/tasks.py CHANGED
@@ -14,17 +14,47 @@ try:
14
  except Exception:
15
  Celery = None
16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  if Celery is not None:
18
  celery_app = Celery("morpheus_worker", broker=REDIS_URL, backend=REDIS_URL)
19
  celery_app.conf.update(
20
  task_track_started=True,
21
- task_acks_late=True,
22
- task_reject_on_worker_lost=True,
23
- worker_cancel_long_running_tasks_on_connection_loss=True,
 
 
 
24
  broker_connection_retry_on_startup=True,
 
25
  broker_connection_max_retries=None,
26
  broker_heartbeat=config.CELERY_BROKER_HEARTBEAT_S,
27
  broker_pool_limit=config.CELERY_BROKER_POOL_LIMIT,
 
28
  broker_transport_options={
29
  "visibility_timeout": config.CELERY_VISIBILITY_TIMEOUT_S,
30
  "socket_keepalive": True,
@@ -69,10 +99,16 @@ def _process_document_task_impl(
69
  """
70
  def update_progress(step: int, total_steps: int, msg: str):
71
  # This updates the Redis cache so FastAPI can read the current status
72
- self.update_state(
73
- state="PROCESSING",
74
- meta={"step": step, "total": total_steps, "message": msg}
75
- )
 
 
 
 
 
 
76
 
77
  try:
78
  return run_ingestion(
 
14
  except Exception:
15
  Celery = None
16
 
17
+ try:
18
+ from kombu.exceptions import OperationalError as KombuOperationalError
19
+ except Exception:
20
+ KombuOperationalError = None
21
+
22
+ try:
23
+ from redis.exceptions import ConnectionError as RedisConnectionError
24
+ from redis.exceptions import TimeoutError as RedisTimeoutError
25
+ except Exception:
26
+ RedisConnectionError = None
27
+ RedisTimeoutError = None
28
+
29
+
30
+ TRANSIENT_CELERY_CONNECTION_ERRORS = tuple(
31
+ err
32
+ for err in (
33
+ KombuOperationalError,
34
+ RedisConnectionError,
35
+ RedisTimeoutError,
36
+ ConnectionError,
37
+ TimeoutError,
38
+ )
39
+ if err is not None
40
+ )
41
+
42
  if Celery is not None:
43
  celery_app = Celery("morpheus_worker", broker=REDIS_URL, backend=REDIS_URL)
44
  celery_app.conf.update(
45
  task_track_started=True,
46
+ task_acks_late=config.CELERY_TASK_ACKS_LATE,
47
+ task_reject_on_worker_lost=config.CELERY_TASK_REJECT_ON_WORKER_LOST,
48
+ worker_cancel_long_running_tasks_on_connection_loss=(
49
+ config.CELERY_WORKER_CANCEL_ON_CONNECTION_LOSS
50
+ ),
51
+ worker_prefetch_multiplier=config.CELERY_WORKER_PREFETCH_MULTIPLIER,
52
  broker_connection_retry_on_startup=True,
53
+ broker_connection_retry=True,
54
  broker_connection_max_retries=None,
55
  broker_heartbeat=config.CELERY_BROKER_HEARTBEAT_S,
56
  broker_pool_limit=config.CELERY_BROKER_POOL_LIMIT,
57
+ result_expires=config.CELERY_RESULT_EXPIRES_S,
58
  broker_transport_options={
59
  "visibility_timeout": config.CELERY_VISIBILITY_TIMEOUT_S,
60
  "socket_keepalive": True,
 
99
  """
100
  def update_progress(step: int, total_steps: int, msg: str):
101
  # This updates the Redis cache so FastAPI can read the current status
102
+ try:
103
+ self.update_state(
104
+ state="PROCESSING",
105
+ meta={"step": step, "total": total_steps, "message": msg},
106
+ )
107
+ except TRANSIENT_CELERY_CONNECTION_ERRORS as exc:
108
+ log.warning(
109
+ "Skipped ingestion progress update because Celery/Redis is reconnecting: %s",
110
+ exc,
111
+ )
112
 
113
  try:
114
  return run_ingestion(
backend/main.py CHANGED
@@ -52,6 +52,8 @@ async def lifespan(app: FastAPI):
52
  "backend.core.tasks",
53
  "worker",
54
  "--pool=solo",
 
 
55
  "--loglevel=info",
56
  ]
57
  )
 
52
  "backend.core.tasks",
53
  "worker",
54
  "--pool=solo",
55
+ "--without-gossip",
56
+ "--without-mingle",
57
  "--loglevel=info",
58
  ]
59
  )
tests/test_ingest_api.py CHANGED
@@ -138,6 +138,21 @@ def test_get_ingest_status_requires_available_worker(monkeypatch):
138
  assert exc_info.value.status_code == 503
139
 
140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  def test_cancel_ingest_rejects_unknown_task(monkeypatch):
142
  monkeypatch.setattr(ingest_api, "celery_app", SimpleNamespace())
143
  monkeypatch.setattr(ingest_api, "process_document_task", SimpleNamespace(delay=lambda *_args: None))
@@ -181,6 +196,36 @@ def test_cancel_ingest_revokes_owned_task_and_cleans_temp_file(monkeypatch):
181
  assert not os.path.exists(tmp_path)
182
 
183
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
184
  def test_process_pdf_task_impl_preserves_original_exception_and_cleans_temp_file(monkeypatch):
185
  fd, tmp_path = tempfile.mkstemp(suffix="_guide.pdf")
186
  os.close(fd)
@@ -199,6 +244,29 @@ def test_process_pdf_task_impl_preserves_original_exception_and_cleans_temp_file
199
  assert not os.path.exists(tmp_path)
200
 
201
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
  def test_upload_accepts_markdown_and_queues_document_task(monkeypatch):
203
  _install_fake_magic(monkeypatch, mime_type="text/plain")
204
  monkeypatch.setattr(
 
138
  assert exc_info.value.status_code == 503
139
 
140
 
141
+ def test_get_ingest_status_returns_503_when_result_backend_reconnects(monkeypatch):
142
+ class ReconnectingApp:
143
+ def AsyncResult(self, _task_id):
144
+ raise ConnectionError("redis reconnecting")
145
+
146
+ monkeypatch.setattr(ingest_api, "celery_app", ReconnectingApp())
147
+ monkeypatch.setattr(ingest_api, "process_document_task", SimpleNamespace(delay=lambda *_args: None))
148
+
149
+ with pytest.raises(HTTPException) as exc_info:
150
+ ingest_api.get_ingest_status("task-1")
151
+
152
+ assert exc_info.value.status_code == 503
153
+ assert "reconnecting" in exc_info.value.detail.lower()
154
+
155
+
156
  def test_cancel_ingest_rejects_unknown_task(monkeypatch):
157
  monkeypatch.setattr(ingest_api, "celery_app", SimpleNamespace())
158
  monkeypatch.setattr(ingest_api, "process_document_task", SimpleNamespace(delay=lambda *_args: None))
 
196
  assert not os.path.exists(tmp_path)
197
 
198
 
199
+ def test_upload_returns_503_when_broker_reconnects_during_queue(monkeypatch):
200
+ _install_fake_magic(monkeypatch)
201
+ monkeypatch.setattr(
202
+ pipeline,
203
+ "_build_supabase_client",
204
+ lambda **_kwargs: FakeCountSupabase(count=0),
205
+ )
206
+ monkeypatch.setattr(ingest_api, "celery_app", SimpleNamespace())
207
+ monkeypatch.setattr(
208
+ ingest_api,
209
+ "process_document_task",
210
+ SimpleNamespace(delay=lambda *_args: (_ for _ in ()).throw(ConnectionError("redis reconnecting"))),
211
+ )
212
+
213
+ file = FakeUploadFile("guide.pdf", b"%PDF-1.4\nsmall")
214
+
215
+ with pytest.raises(HTTPException) as exc_info:
216
+ asyncio.run(
217
+ ingest_api.upload(
218
+ request=_fake_request(),
219
+ file=file,
220
+ user_id="user-1",
221
+ x_auth_token="token",
222
+ )
223
+ )
224
+
225
+ assert exc_info.value.status_code == 503
226
+ assert "reconnecting" in exc_info.value.detail.lower()
227
+
228
+
229
  def test_process_pdf_task_impl_preserves_original_exception_and_cleans_temp_file(monkeypatch):
230
  fd, tmp_path = tempfile.mkstemp(suffix="_guide.pdf")
231
  os.close(fd)
 
244
  assert not os.path.exists(tmp_path)
245
 
246
 
247
+ def test_process_pdf_task_impl_ignores_transient_progress_backend_error(monkeypatch):
248
+ fd, tmp_path = tempfile.mkstemp(suffix="_guide.pdf")
249
+ os.close(fd)
250
+ progress_callback = {}
251
+
252
+ def _run_ingestion(**kwargs):
253
+ progress_callback["callback"] = kwargs["progress_callback"]
254
+ kwargs["progress_callback"](1, 2, "Parsing")
255
+ return {"ok": True}
256
+
257
+ monkeypatch.setattr(tasks, "run_ingestion", _run_ingestion)
258
+
259
+ class ReconnectingTask:
260
+ def update_state(self, **_kwargs):
261
+ raise ConnectionError("redis reconnecting")
262
+
263
+ result = tasks._process_pdf_task_impl(ReconnectingTask(), tmp_path, "guide.pdf", "token")
264
+
265
+ assert result == {"ok": True}
266
+ assert "callback" in progress_callback
267
+ assert not os.path.exists(tmp_path)
268
+
269
+
270
  def test_upload_accepts_markdown_and_queues_document_task(monkeypatch):
271
  _install_fake_magic(monkeypatch, mime_type="text/plain")
272
  monkeypatch.setattr(
tests/test_url_ingestion.py CHANGED
@@ -9,10 +9,81 @@ from backend.core import pipeline
9
  from backend.core import url_ingestion
10
 
11
 
 
 
 
 
 
 
 
 
 
12
  class _FakeTask:
13
  id = "task-123"
14
 
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  class _FakeResponse:
17
  def __init__(self, *, url: str, content_type: str, body: bytes, headers=None, status_code: int = 200):
18
  self.url = url
@@ -88,6 +159,7 @@ def test_fetch_url_to_tempfile_extracts_readable_html(monkeypatch):
88
 
89
  def test_fetch_url_to_tempfile_uses_crawl4ai_extractor_when_enabled(monkeypatch):
90
  monkeypatch.setattr(url_ingestion.config, "URL_INGEST_EXTRACTOR", "crawl4ai")
 
91
  monkeypatch.setattr(
92
  url_ingestion,
93
  "_run_crawl4ai_single_page",
@@ -162,9 +234,7 @@ def test_fetch_url_to_tempfile_bounded_crawl_requires_allowlist(monkeypatch):
162
 
163
 
164
  def test_run_ingestion_with_url_override_persists_url_metadata(monkeypatch):
165
- from tests.test_pipeline_regressions import FakeIngestionSupabase
166
-
167
- fake_supabase = FakeIngestionSupabase()
168
  captured = {}
169
  root = Path("tests") / "_tmp_graph_hybrid" / "url_ingestion"
170
  root.mkdir(parents=True, exist_ok=True)
@@ -240,7 +310,11 @@ def test_run_url_ingest_requires_admin_and_queues_task(monkeypatch):
240
  pages=[{"final_url": url}],
241
  ),
242
  )
243
- monkeypatch.setattr(admin_api.process_document_task, "delay", lambda *args: _FakeTask())
 
 
 
 
244
 
245
  result = admin_api.run_url_ingest(
246
  admin_api.UrlIngestPayload(
 
9
  from backend.core import url_ingestion
10
 
11
 
12
+ @pytest.fixture(autouse=True)
13
+ def _isolate_url_ingestion_config(monkeypatch):
14
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_EXTRACTOR", "basic")
15
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_CRAWL4AI_ENABLED", False)
16
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_CRAWL4AI_FALLBACK", False)
17
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_DEFAULT_MODE", "single_page")
18
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_ALLOWED_HOSTS", [])
19
+
20
+
21
  class _FakeTask:
22
  id = "task-123"
23
 
24
 
25
+ class _FakeIngestionTable:
26
+ def __init__(self, supabase, name: str):
27
+ self.supabase = supabase
28
+ self.name = name
29
+ self.action = None
30
+ self.filters = {}
31
+ self.payload = None
32
+
33
+ def select(self, *_args):
34
+ self.action = "select"
35
+ return self
36
+
37
+ def delete(self):
38
+ self.action = "delete"
39
+ return self
40
+
41
+ def upsert(self, payload, on_conflict=None):
42
+ self.action = "upsert"
43
+ self.payload = payload
44
+ self.on_conflict = on_conflict
45
+ return self
46
+
47
+ def insert(self, payload):
48
+ self.action = "insert"
49
+ self.payload = payload
50
+ return self
51
+
52
+ def eq(self, key, value):
53
+ self.filters[key] = value
54
+ return self
55
+
56
+ def contains(self, key, value):
57
+ self.filters[key] = value
58
+ return self
59
+
60
+ def limit(self, value):
61
+ self.filters["limit"] = value
62
+ return self
63
+
64
+ def execute(self):
65
+ self.supabase.ops.append((self.name, self.action, dict(self.filters)))
66
+ if self.action == "insert":
67
+ self.supabase.inserts.append((self.name, self.payload))
68
+ if self.action == "upsert":
69
+ self.supabase.upserts.append(
70
+ (self.name, self.payload, getattr(self, "on_conflict", None))
71
+ )
72
+ if self.name == "ingested_files" and self.action == "select":
73
+ return SimpleNamespace(data=[])
74
+ return SimpleNamespace(data=[])
75
+
76
+
77
+ class _FakeIngestionSupabase:
78
+ def __init__(self):
79
+ self.ops = []
80
+ self.inserts = []
81
+ self.upserts = []
82
+
83
+ def table(self, name: str):
84
+ return _FakeIngestionTable(self, name)
85
+
86
+
87
  class _FakeResponse:
88
  def __init__(self, *, url: str, content_type: str, body: bytes, headers=None, status_code: int = 200):
89
  self.url = url
 
159
 
160
  def test_fetch_url_to_tempfile_uses_crawl4ai_extractor_when_enabled(monkeypatch):
161
  monkeypatch.setattr(url_ingestion.config, "URL_INGEST_EXTRACTOR", "crawl4ai")
162
+ monkeypatch.setattr(url_ingestion.config, "URL_INGEST_CRAWL4AI_ENABLED", True)
163
  monkeypatch.setattr(
164
  url_ingestion,
165
  "_run_crawl4ai_single_page",
 
234
 
235
 
236
  def test_run_ingestion_with_url_override_persists_url_metadata(monkeypatch):
237
+ fake_supabase = _FakeIngestionSupabase()
 
 
238
  captured = {}
239
  root = Path("tests") / "_tmp_graph_hybrid" / "url_ingestion"
240
  root.mkdir(parents=True, exist_ok=True)
 
310
  pages=[{"final_url": url}],
311
  ),
312
  )
313
+ monkeypatch.setattr(
314
+ admin_api,
315
+ "process_document_task",
316
+ SimpleNamespace(delay=lambda *args: _FakeTask()),
317
+ )
318
 
319
  result = admin_api.run_url_ingest(
320
  admin_api.UrlIngestPayload(