PrismML Deploy committed on
Commit
0a882d1
·
1 Parent(s): 0633a27

Fix: metrics push (summary not commit_message), crash resilience + watchdog

Browse files
Files changed (2) hide show
  1. entrypoint.sh +10 -2
  2. metrics_pusher.py +54 -31
entrypoint.sh CHANGED
@@ -99,8 +99,16 @@ echo '{"ts":null,"gpus":[]}' > /tmp/gpu-stats.json
99
  echo '# waiting for first metrics scrape...' > /tmp/llama-metrics.txt
100
  echo '{"updated_at":null,"summary_24h":{"requests":0,"unique_users":0},"summary_7d":{"requests":0,"unique_users":0},"summary_total":{"requests":0,"unique_users":0},"requests_by_hour":[],"requests_by_day":[],"top_users":[]}' > /tmp/analytics.json
101
 
102
- # ── Start metrics pusher ──────────────────────────────────────────────────────
103
- python3 /app/metrics_pusher.py &
 
 
 
 
 
 
 
 
104
 
105
  echo ""
106
  echo "=== Bonsai-demo ==="
 
99
  echo '# waiting for first metrics scrape...' > /tmp/llama-metrics.txt
100
  echo '{"updated_at":null,"summary_24h":{"requests":0,"unique_users":0},"summary_7d":{"requests":0,"unique_users":0},"summary_total":{"requests":0,"unique_users":0},"requests_by_hour":[],"requests_by_day":[],"top_users":[]}' > /tmp/analytics.json
101
 
102
# ── Start metrics pusher with watchdog ────────────────────────────────────────
# Supervise metrics_pusher.py: whenever it exits (crash or clean), wait 5s and
# relaunch it, forever. Runs in the background so the entrypoint continues.
start_metrics_pusher() {
  local pusher=/app/metrics_pusher.py
  while :; do
    echo "[watchdog] Starting metrics_pusher.py..."
    python3 "$pusher" || true
    echo "[watchdog] metrics_pusher.py exited — restarting in 5s..."
    sleep 5
  done
}
start_metrics_pusher &
112
 
113
  echo ""
114
  echo "=== Bonsai-demo ==="
metrics_pusher.py CHANGED
@@ -192,7 +192,7 @@ def hf_push(local_path):
192
  if not METRICS_REPO or not HF_TOKEN: return
193
  dest = f"metrics/{local_path.name}"
194
  content = base64.b64encode(local_path.read_bytes()).decode()
195
- payload = json.dumps({"commit_message": f"update {local_path.name}",
196
  "files": [{"path": dest, "encoding": "base64", "content": content}]}).encode()
197
  req = urllib.request.Request(
198
  f"https://huggingface.co/api/datasets/{METRICS_REPO}/commit/main",
@@ -208,10 +208,13 @@ def gpu_loop():
208
  """Fast loop: update gpu-stats.json every GPU_INTERVAL_SECS seconds."""
209
  print(f"[gpu] polling every {GPU_INTERVAL_SECS}s")
210
  while True:
211
- gpus = scrape_gpus()
212
- if gpus:
213
- ts = now_utc().isoformat()
214
- GPU_STATS_FILE.write_text(json.dumps({"ts": ts, "gpus": gpus}))
 
 
 
215
  time.sleep(GPU_INTERVAL_SECS)
216
 
217
  def wait_for_backends():
@@ -235,35 +238,55 @@ def metrics_loop():
235
  wait_for_backends()
236
  last_push, backends, first_push_done = 0.0, [], False
237
  while True:
238
- if not backends:
239
- backends = detect_backends()
240
- print(f"[metrics] backends: {backends}")
241
- ts = now_utc()
242
- data = scrape(backends)
243
- compute_analytics()
244
- if data is None:
245
- print(f"[metrics] scrape returned no data β€” will retry next tick")
246
- backends = [] # force re-detect next iteration
247
- else:
248
- gpus = json.loads(GPU_STATS_FILE.read_text()).get("gpus", []) if GPU_STATS_FILE.exists() else []
249
- gpu_s = {f"gpu{g['index']}_util": g["util_gpu"] for g in gpus}
250
- gpu_s.update({f"gpu{g['index']}_mem_used_mib": g["mem_used_mib"] for g in gpus})
251
- row = {"ts": ts.isoformat(), **data, **gpu_s}
252
- day = ts.strftime("%Y-%m-%d")
253
- path = LOG_DIR / f"metrics-{day}.jsonl"
254
- with open(path, "a") as f: f.write(json.dumps(row) + "\n")
255
- gpu_str = " ".join(f"GPU{g['index']} {g['util_gpu']:.0f}% {g['mem_used_mib']/1024:.1f}GB {g['temp_c']:.0f}Β°C" for g in gpus)
256
- print(f"[metrics] {ts.strftime('%H:%M:%S')} gen={data.get('predicted_tokens_seconds',0):.0f} tok/s active={data.get('requests_processing',0):.0f} {gpu_str}")
257
- if not first_push_done or time.time() - last_push >= PUSH_SECS:
258
- hf_push(path)
259
- last_push = time.time()
260
- first_push_done = True
 
 
 
 
 
 
 
261
  time.sleep(SNAPSHOT_SECS)
262
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
  def main():
264
- t = threading.Thread(target=gpu_loop, daemon=True)
265
- t.start()
266
- metrics_loop()
267
 
268
  if __name__ == "__main__":
269
  main()
 
192
  if not METRICS_REPO or not HF_TOKEN: return
193
  dest = f"metrics/{local_path.name}"
194
  content = base64.b64encode(local_path.read_bytes()).decode()
195
+ payload = json.dumps({"summary": f"update {local_path.name}",
196
  "files": [{"path": dest, "encoding": "base64", "content": content}]}).encode()
197
  req = urllib.request.Request(
198
  f"https://huggingface.co/api/datasets/{METRICS_REPO}/commit/main",
 
208
  """Fast loop: update gpu-stats.json every GPU_INTERVAL_SECS seconds."""
209
  print(f"[gpu] polling every {GPU_INTERVAL_SECS}s")
210
  while True:
211
+ try:
212
+ gpus = scrape_gpus()
213
+ if gpus:
214
+ ts = now_utc().isoformat()
215
+ GPU_STATS_FILE.write_text(json.dumps({"ts": ts, "gpus": gpus}))
216
+ except Exception as e:
217
+ print(f"[gpu] error: {e}")
218
  time.sleep(GPU_INTERVAL_SECS)
219
 
220
  def wait_for_backends():
 
238
  wait_for_backends()
239
  last_push, backends, first_push_done = 0.0, [], False
240
  while True:
241
+ try:
242
+ if not backends:
243
+ backends = detect_backends()
244
+ print(f"[metrics] backends: {backends}")
245
+ ts = now_utc()
246
+ data = scrape(backends)
247
+ compute_analytics()
248
+ if data is None:
249
+ print(f"[metrics] scrape returned no data β€” will retry next tick")
250
+ backends = []
251
+ else:
252
+ try:
253
+ gpus = json.loads(GPU_STATS_FILE.read_text()).get("gpus", []) if GPU_STATS_FILE.exists() else []
254
+ except Exception:
255
+ gpus = []
256
+ gpu_s = {f"gpu{g['index']}_util": g["util_gpu"] for g in gpus}
257
+ gpu_s.update({f"gpu{g['index']}_mem_used_mib": g["mem_used_mib"] for g in gpus})
258
+ row = {"ts": ts.isoformat(), **data, **gpu_s}
259
+ day = ts.strftime("%Y-%m-%d")
260
+ path = LOG_DIR / f"metrics-{day}.jsonl"
261
+ with open(path, "a") as f: f.write(json.dumps(row) + "\n")
262
+ gpu_str = " ".join(f"GPU{g['index']} {g['util_gpu']:.0f}% {g['mem_used_mib']/1024:.1f}GB {g['temp_c']:.0f}Β°C" for g in gpus)
263
+ print(f"[metrics] {ts.strftime('%H:%M:%S')} gen={data.get('predicted_tokens_seconds',0):.0f} tok/s active={data.get('requests_processing',0):.0f} {gpu_str}")
264
+ if not first_push_done or time.time() - last_push >= PUSH_SECS:
265
+ hf_push(path)
266
+ last_push = time.time()
267
+ first_push_done = True
268
+ except Exception as e:
269
+ print(f"[metrics] loop error (will retry): {e}")
270
+ backends = []
271
  time.sleep(SNAPSHOT_SECS)
272
 
273
# Minimum runtime (seconds) after which a crash is treated as a fresh failure
# rather than a continuation of an earlier crash streak.
_STABLE_RUN_SECS = 60

def run_with_restart(name, func):
    """Run *func*, restarting it on crash with exponential backoff.

    Parameters:
        name: label used in crash log messages (e.g. "gpu", "metrics").
        func: zero-argument callable; expected to loop indefinitely.

    Returns only if *func* returns normally. On any Exception, logs the
    crash, sleeps `backoff` seconds, and retries; backoff doubles per
    consecutive quick crash, capped at 60s. If *func* ran for at least
    _STABLE_RUN_SECS before crashing, backoff resets to 1s — otherwise a
    loop that was stable for hours would inherit the old accumulated delay.
    """
    backoff = 1
    while True:
        started = time.monotonic()  # monotonic: immune to wall-clock jumps
        try:
            func()
        except Exception as e:
            if time.monotonic() - started >= _STABLE_RUN_SECS:
                backoff = 1  # long stable run — restart quickly
            print(f"[{name}] CRASHED: {e} — restarting in {backoff}s")
            time.sleep(backoff)
            backoff = min(backoff * 2, 60)
        else:
            break
285
+
286
def main():
    """Entry point: GPU sampler on a daemon thread, metrics loop in the
    foreground — both supervised by run_with_restart."""
    threading.Thread(
        target=run_with_restart, args=("gpu", gpu_loop), daemon=True
    ).start()
    run_with_restart("metrics", metrics_loop)
290
 
291
  if __name__ == "__main__":
292
  main()