PrismML Deploy committed on
Commit
e28b544
·
1 Parent(s): 0a882d1

Fixes: push every 20min, trim to last 10 entries, pull on startup for restart recovery

Browse files
Files changed (1) hide show
  1. metrics_pusher.py +35 -2
metrics_pusher.py CHANGED
@@ -17,7 +17,8 @@ GPU_INTERVAL_SECS = int(os.environ.get("GPU_INTERVAL", "10"))
17
  SNAPSHOT_SECS = int(os.environ.get("METRICS_INTERVAL", "3"))
18
  NGINX_LOG = Path("/tmp/nginx-access.log")
19
  ANALYTICS_FILE = Path("/tmp/analytics.json")
20
- PUSH_SECS = int(os.environ.get("METRICS_PUSH_INTERVAL", "600"))
 
21
  METRICS_REPO = os.environ.get("METRICS_REPO", "")
22
  HF_TOKEN = os.environ.get("HF_TOKEN", "")
23
 
@@ -191,7 +192,13 @@ def compute_analytics():
191
  def hf_push(local_path):
192
  if not METRICS_REPO or not HF_TOKEN: return
193
  dest = f"metrics/{local_path.name}"
194
- content = base64.b64encode(local_path.read_bytes()).decode()
 
 
 
 
 
 
195
  payload = json.dumps({"summary": f"update {local_path.name}",
196
  "files": [{"path": dest, "encoding": "base64", "content": content}]}).encode()
197
  req = urllib.request.Request(
@@ -217,6 +224,31 @@ def gpu_loop():
217
  print(f"[gpu] error: {e}")
218
  time.sleep(GPU_INTERVAL_SECS)
219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  def wait_for_backends():
221
  """Block until at least one llama-server is healthy (retries every 5s)."""
222
  print("[metrics] waiting for llama-server to be ready...")
@@ -235,6 +267,7 @@ def wait_for_backends():
235
  def metrics_loop():
236
  """Slow loop: scrape llama metrics, append JSONL, push to HF every SNAPSHOT_SECS seconds."""
237
  print(f"[metrics] snapshot={SNAPSHOT_SECS}s push={PUSH_SECS}s repo={METRICS_REPO or '(local only)'}")
 
238
  wait_for_backends()
239
  last_push, backends, first_push_done = 0.0, [], False
240
  while True:
 
17
  SNAPSHOT_SECS = int(os.environ.get("METRICS_INTERVAL", "3"))
18
  NGINX_LOG = Path("/tmp/nginx-access.log")
19
  ANALYTICS_FILE = Path("/tmp/analytics.json")
20
+ PUSH_SECS = int(os.environ.get("METRICS_PUSH_INTERVAL", "1200"))
21
+ MAX_JSONL_LINES = 10
22
  METRICS_REPO = os.environ.get("METRICS_REPO", "")
23
  HF_TOKEN = os.environ.get("HF_TOKEN", "")
24
 
 
192
  def hf_push(local_path):
193
  if not METRICS_REPO or not HF_TOKEN: return
194
  dest = f"metrics/{local_path.name}"
195
+ # Trim to last MAX_JSONL_LINES entries before pushing
196
+ try:
197
+ lines = local_path.read_text().strip().splitlines()
198
+ trimmed = "\n".join(lines[-MAX_JSONL_LINES:]) + "\n"
199
+ except Exception:
200
+ trimmed = local_path.read_bytes().decode()
201
+ content = base64.b64encode(trimmed.encode()).decode()
202
  payload = json.dumps({"summary": f"update {local_path.name}",
203
  "files": [{"path": dest, "encoding": "base64", "content": content}]}).encode()
204
  req = urllib.request.Request(
 
224
  print(f"[gpu] error: {e}")
225
  time.sleep(GPU_INTERVAL_SECS)
226
 
227
+ def hf_pull():
228
+ """Pull the latest metrics from HF Dataset on startup to survive restarts."""
229
+ if not METRICS_REPO or not HF_TOKEN:
230
+ return
231
+ print(f"[metrics] pulling saved data from {METRICS_REPO}...")
232
+ try:
233
+ # List files in the metrics/ folder
234
+ req = urllib.request.Request(
235
+ f"https://huggingface.co/api/datasets/{METRICS_REPO}/tree/main/metrics",
236
+ headers={"Authorization": f"Bearer {HF_TOKEN}"})
237
+ with urllib.request.urlopen(req, timeout=15) as r:
238
+ files = json.loads(r.read().decode())
239
+ for f in files:
240
+ name = f.get("path", "").split("/")[-1]
241
+ if not name.endswith(".jsonl"):
242
+ continue
243
+ local = LOG_DIR / name
244
+ url = f"https://huggingface.co/datasets/{METRICS_REPO}/resolve/main/metrics/{name}"
245
+ req = urllib.request.Request(url, headers={"Authorization": f"Bearer {HF_TOKEN}"})
246
+ with urllib.request.urlopen(req, timeout=15) as r:
247
+ local.write_bytes(r.read())
248
+ print(f"[metrics] restored {name} ({local.stat().st_size} bytes)")
249
+ except Exception as e:
250
+ print(f"[metrics] pull failed (starting fresh): {e}")
251
+
252
  def wait_for_backends():
253
  """Block until at least one llama-server is healthy (retries every 5s)."""
254
  print("[metrics] waiting for llama-server to be ready...")
 
267
  def metrics_loop():
268
  """Slow loop: scrape llama metrics, append JSONL, push to HF every SNAPSHOT_SECS seconds."""
269
  print(f"[metrics] snapshot={SNAPSHOT_SECS}s push={PUSH_SECS}s repo={METRICS_REPO or '(local only)'}")
270
+ hf_pull()
271
  wait_for_backends()
272
  last_push, backends, first_push_done = 0.0, [], False
273
  while True: