Aksel Joonas Reedi committed on
Commit
f257894
·
2 Parent(s): 8031995648dc32

job-streaming-retry

Browse files

Add retry logic for job log streaming

Files changed (1) hide show
  1. agent/tools/jobs_tool.py +49 -9
agent/tools/jobs_tool.py CHANGED
@@ -6,6 +6,7 @@ Refactored to use official huggingface-hub library instead of custom HTTP client
6
 
7
  import asyncio
8
  import base64
 
9
  import os
10
  import re
11
  from typing import Any, Dict, Literal, Optional
@@ -346,21 +347,60 @@ class HfJobsTool:
346
  ) -> tuple[str, list[str]]:
347
  """
348
  Stream job logs until completion, printing them in real-time.
 
349
 
350
  Returns:
351
  tuple: (final_status, all_logs)
352
  """
353
  all_logs = []
 
 
 
354
 
355
- # Fetch logs - generator streams logs as they arrive and ends when job completes
356
- logs_gen = self.api.fetch_job_logs(job_id=job_id, namespace=namespace)
357
-
358
- # Stream logs in real-time
359
- for log_line in logs_gen:
360
- print("\t" + log_line)
361
- all_logs.append(log_line)
362
-
363
- # After logs complete, fetch final job status
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
364
  job_info = await _async_call(
365
  self.api.inspect_job, job_id=job_id, namespace=namespace
366
  )
 
6
 
7
  import asyncio
8
  import base64
9
+ import http.client
10
  import os
11
  import re
12
  from typing import Any, Dict, Literal, Optional
 
347
  ) -> tuple[str, list[str]]:
348
  """
349
  Stream job logs until completion, printing them in real-time.
350
+ Implements retry logic to handle connection drops during long-running jobs.
351
 
352
  Returns:
353
  tuple: (final_status, all_logs)
354
  """
355
  all_logs = []
356
+ terminal_states = {"COMPLETED", "FAILED", "CANCELED", "ERROR"}
357
+ max_retries = 100 # Allow many retries for 8h+ jobs
358
+ retry_delay = 5 # Seconds between retries
359
 
360
+ for _ in range(max_retries):
361
+ try:
362
+ # Fetch logs - generator streams logs as they arrive
363
+ logs_gen = self.api.fetch_job_logs(job_id=job_id, namespace=namespace)
364
+
365
+ # Stream logs in real-time
366
+ for log_line in logs_gen:
367
+ print("\t" + log_line)
368
+ all_logs.append(log_line)
369
+
370
+ # If we get here, streaming completed normally
371
+ break
372
+
373
+ except (
374
+ ConnectionError,
375
+ TimeoutError,
376
+ http.client.IncompleteRead,
377
+ ) as e:
378
+ # Connection dropped - check if job is still running
379
+ try:
380
+ job_info = await _async_call(
381
+ self.api.inspect_job, job_id=job_id, namespace=namespace
382
+ )
383
+ current_status = job_info.status.stage
384
+
385
+ if current_status in terminal_states:
386
+ # Job finished, no need to retry
387
+ print(f"\tJob reached terminal state: {current_status}")
388
+ break
389
+
390
+ # Job still running, retry connection
391
+ print(
392
+ f"\tConnection interrupted ({str(e)[:50]}...), reconnecting in {retry_delay}s..."
393
+ )
394
+ await asyncio.sleep(retry_delay)
395
+ continue
396
+
397
+ except (ConnectionError, TimeoutError, OSError):
398
+ # Can't even check job status, wait and retry
399
+ print(f"\tConnection error, retrying in {retry_delay}s...")
400
+ await asyncio.sleep(retry_delay)
401
+ continue
402
+
403
+ # Fetch final job status
404
  job_info = await _async_call(
405
  self.api.inspect_job, job_id=job_id, namespace=namespace
406
  )