Henri Bonamy commited on
Commit
ff2fb51
·
1 Parent(s): af1a664

Log streaming enabled

Browse files
Files changed (1) hide show
  1. agent/tools/jobs_tool.py +81 -18
agent/tools/jobs_tool.py CHANGED
@@ -12,12 +12,9 @@ from huggingface_hub import HfApi
12
  from huggingface_hub.utils import HfHubHTTPError
13
 
14
  from agent.tools.types import ToolResult
15
- from agent.tools.utilities import (
16
- format_job_details,
17
- format_jobs_table,
18
- format_scheduled_job_details,
19
- format_scheduled_jobs_table,
20
- )
21
 
22
  # Hardware flavors
23
  CPU_FLAVORS = ["cpu-basic", "cpu-upgrade", "cpu-performance", "cpu-xl"]
@@ -60,7 +57,6 @@ OperationType = Literal[
60
  ]
61
 
62
  # Constants
63
- DEFAULT_LOG_WAIT_SECONDS = 10
64
  UV_DEFAULT_IMAGE = "ghcr.io/astral-sh/uv:python3.12-bookworm"
65
 
66
 
@@ -316,7 +312,7 @@ Call this tool with:
316
  {{
317
  "operation": "uv",
318
  "args": {{
319
- "script": "import random\\nprint(42 + random.randint(1, 5))"
320
  }}
321
  }}
322
  ```
@@ -345,7 +341,7 @@ Call this tool with:
345
 
346
  ## Tips
347
 
348
- - Jobs default to non-detached mode (tail logs for up to {DEFAULT_LOG_WAIT_SECONDS}s or until completion). Set `detach: true` to return immediately.
349
  - Prefer array commands to avoid shell parsing surprises
350
  - To access private Hub assets, include `secrets: {{ "HF_TOKEN": "$HF_TOKEN" }}` to inject your auth token.
351
  """
@@ -356,6 +352,33 @@ Call this tool with:
356
  help_text = f"Help for operation: {operation}\n\nCall with appropriate arguments. Use the main help for examples."
357
  return {"formatted": help_text, "totalResults": 1, "resultsShared": 1}
358
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
359
  async def _run_job(self, args: Dict[str, Any]) -> ToolResult:
360
  """Run a job using HfApi.run_job()"""
361
  try:
@@ -382,14 +405,28 @@ To check logs, call this tool with `{{"operation": "logs", "args": {{"job_id": "
382
  To inspect, call this tool with `{{"operation": "inspect", "args": {{"job_id": "{job.id}"}}}}`"""
383
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
384
 
385
- # Not detached - return job info
386
- response = f"""Job started: {job.id}
 
387
 
388
- **Status:** {job.status.stage}
389
- **View logs at:** {job.url}
 
 
390
 
391
- Note: Logs are being collected. Check the job page for real-time logs.
392
- """
 
 
 
 
 
 
 
 
 
 
 
393
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
394
 
395
  except Exception as e:
@@ -422,13 +459,39 @@ Note: Logs are being collected. Check the job page for real-time logs.
422
  namespace=args.get("namespace") or self.namespace,
423
  )
424
 
425
- response = f"""UV Job started: {job.id}
 
 
426
 
 
427
  **Status:** {job.status.stage}
428
  **View at:** {job.url}
429
 
430
- To check logs, call this tool with `{{"operation": "logs", "args": {{"job_id": "{job.id}"}}}}`
431
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
432
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
433
 
434
  except Exception as e:
 
12
  from huggingface_hub.utils import HfHubHTTPError
13
 
14
  from agent.tools.types import ToolResult
15
+ from agent.tools.utilities import (format_job_details, format_jobs_table,
16
+ format_scheduled_job_details,
17
+ format_scheduled_jobs_table)
 
 
 
18
 
19
  # Hardware flavors
20
  CPU_FLAVORS = ["cpu-basic", "cpu-upgrade", "cpu-performance", "cpu-xl"]
 
57
  ]
58
 
59
  # Constants
 
60
  UV_DEFAULT_IMAGE = "ghcr.io/astral-sh/uv:python3.12-bookworm"
61
 
62
 
 
312
  {{
313
  "operation": "uv",
314
  "args": {{
315
+ "script": "import random\\nprint(42 + random.randint(1, 5))",
316
  }}
317
  }}
318
  ```
 
341
 
342
  ## Tips
343
 
344
+ - Jobs default to non-detached mode (stream logs until completion). Set `detach: true` to return immediately.
345
  - Prefer array commands to avoid shell parsing surprises
346
  - To access private Hub assets, include `secrets: {{ "HF_TOKEN": "$HF_TOKEN" }}` to inject your auth token.
347
  """
 
352
  help_text = f"Help for operation: {operation}\n\nCall with appropriate arguments. Use the main help for examples."
353
  return {"formatted": help_text, "totalResults": 1, "resultsShared": 1}
354
 
355
+ async def _wait_for_job_completion(
356
+ self, job_id: str, namespace: Optional[str] = None
357
+ ) -> tuple[str, list[str]]:
358
+ """
359
+ Stream job logs until completion, printing them in real-time.
360
+
361
+ Returns:
362
+ tuple: (final_status, all_logs)
363
+ """
364
+ all_logs = []
365
+
366
+ # Fetch logs - generator streams logs as they arrive and ends when job completes
367
+ logs_gen = self.api.fetch_job_logs(job_id=job_id, namespace=namespace)
368
+
369
+ # Stream logs in real-time
370
+ for log_line in logs_gen:
371
+ print("\t" + log_line)
372
+ all_logs.append(log_line)
373
+
374
+ # After logs complete, fetch final job status
375
+ job_info = await _async_call(
376
+ self.api.inspect_job, job_id=job_id, namespace=namespace
377
+ )
378
+ final_status = job_info.status.stage
379
+
380
+ return final_status, all_logs
381
+
382
  async def _run_job(self, args: Dict[str, Any]) -> ToolResult:
383
  """Run a job using HfApi.run_job()"""
384
  try:
 
405
  To inspect, call this tool with `{{"operation": "inspect", "args": {{"job_id": "{job.id}"}}}}`"""
406
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
407
 
408
+ # Not detached - wait for completion and stream logs
409
+ print(f"Job started: {job.id}")
410
+ print(f"Streaming logs...\n---\n")
411
 
412
+ final_status, all_logs = await self._wait_for_job_completion(
413
+ job_id=job.id,
414
+ namespace=args.get("namespace") or self.namespace,
415
+ )
416
 
417
+ # Format all logs for the agent
418
+ log_text = "\n".join(all_logs) if all_logs else "(no logs)"
419
+
420
+ response = f"""Job completed!
421
+
422
+ **Job ID:** {job.id}
423
+ **Final Status:** {final_status}
424
+ **View at:** {job.url}
425
+
426
+ **Logs:**
427
+ ```
428
+ {log_text}
429
+ ```"""
430
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
431
 
432
  except Exception as e:
 
459
  namespace=args.get("namespace") or self.namespace,
460
  )
461
 
462
+ # If detached, return immediately
463
+ if args.get("detach", False):
464
+ response = f"""UV Job started successfully!
465
 
466
+ **Job ID:** {job.id}
467
  **Status:** {job.status.stage}
468
  **View at:** {job.url}
469
 
470
+ To check logs, call this tool with `{{"operation": "logs", "args": {{"job_id": "{job.id}"}}}}`"""
471
+ return {"formatted": response, "totalResults": 1, "resultsShared": 1}
472
+
473
+ # Not detached - wait for completion and stream logs
474
+ print(f"UV Job started: {job.id}")
475
+ print(f"Streaming logs...\n---\n")
476
+
477
+ final_status, all_logs = await self._wait_for_job_completion(
478
+ job_id=job.id,
479
+ namespace=args.get("namespace") or self.namespace,
480
+ )
481
+
482
+ # Format all logs for the agent
483
+ log_text = "\n".join(all_logs) if all_logs else "(no logs)"
484
+
485
+ response = f"""UV Job completed!
486
+
487
+ **Job ID:** {job.id}
488
+ **Final Status:** {final_status}
489
+ **View at:** {job.url}
490
+
491
+ **Logs:**
492
+ ```
493
+ {log_text}
494
+ ```"""
495
  return {"formatted": response, "totalResults": 1, "resultsShared": 1}
496
 
497
  except Exception as e: