Spaces:
Runtime error
Runtime error
PrismML Deploy
Fixes: push every 20min, trim to last 10 entries, pull on startup for restart recovery
e28b544 | #!/usr/bin/env python3 | |
| """ | |
| Scrapes /metrics + nvidia-smi every METRICS_INTERVAL seconds. | |
| Appends one JSON line per snapshot to /tmp/metrics-YYYY-MM-DD.jsonl. | |
| Pushes daily file to HF Dataset repo every METRICS_PUSH_INTERVAL seconds. | |
| Also writes /tmp/gpu-stats.json for the live dashboard GPU panel. | |
| """ | |
| import base64, json, os, subprocess, threading, time, urllib.request, urllib.error | |
| from datetime import datetime, timezone, timedelta | |
| from pathlib import Path | |
| BASE_PORT = 7861 | |
| LOG_DIR = Path("/tmp") | |
| GPU_STATS_FILE = Path("/tmp/gpu-stats.json") | |
| LLAMA_METRICS_FILE = Path("/tmp/llama-metrics.txt") | |
| GPU_INTERVAL_SECS = int(os.environ.get("GPU_INTERVAL", "10")) | |
| SNAPSHOT_SECS = int(os.environ.get("METRICS_INTERVAL", "3")) | |
| NGINX_LOG = Path("/tmp/nginx-access.log") | |
| ANALYTICS_FILE = Path("/tmp/analytics.json") | |
| PUSH_SECS = int(os.environ.get("METRICS_PUSH_INTERVAL", "1200")) | |
| MAX_JSONL_LINES = 10 | |
| METRICS_REPO = os.environ.get("METRICS_REPO", "") | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") | |
| def now_utc(): return datetime.now(timezone.utc) | |
def detect_backends():
    """Probe consecutive ports from BASE_PORT and return healthy backend URLs.

    Probing stops at the first port whose /health check fails; if nothing
    responds, fall back to a list containing just the base port.
    """
    found = []
    for offset in range(16):
        candidate = f"http://127.0.0.1:{BASE_PORT + offset}"
        try:
            urllib.request.urlopen(f"{candidate}/health", timeout=1).close()
        except Exception:
            break
        found.append(candidate)
    if found:
        return found
    return [f"http://127.0.0.1:{BASE_PORT}"]
def parse_prometheus(text):
    """Parse Prometheus exposition text into a {metric_name: float} dict.

    Comment and blank lines are skipped, label sets ("{...}") are dropped
    from metric names, the "llamacpp:" namespace prefix is stripped, and
    samples whose value does not parse as a float are ignored.
    """
    parsed = {}
    for raw in text.splitlines():
        if raw.startswith("#") or not raw.strip():
            continue
        fields = raw.split()
        if len(fields) < 2:
            continue
        metric = fields[0].split("{")[0].replace("llamacpp:", "")
        try:
            parsed[metric] = float(fields[1])
        except ValueError:
            pass
    return parsed
| _prev_tokens = {"predicted": 0.0, "prompt": 0.0, "ts": 0.0} | |
def scrape(backends):
    """Scrape /metrics from each backend URL and aggregate into one dict.

    Counters are summed across backends; the per-backend rates/ratios listed
    in GAUGES_AVG are averaged instead.  Also derives delta-based "current"
    token rates (which drop to 0 when idle) from the module-level
    _prev_tokens state, and writes the aggregate to LLAMA_METRICS_FILE in
    Prometheus text format for the dashboard.

    Returns the aggregated metrics dict, or None if no backend responded.
    """
    # Per-backend averages (ratios / rates); anything not listed here is summed.
    GAUGES_AVG = {"prompt_tokens_seconds", "predicted_tokens_seconds", "kv_cache_usage_ratio"}
    # In-flight request gauges: summed, which is already the default path.
    GAUGES_SUM = {"requests_processing", "requests_deferred"}
    combined = {}
    for url in backends:
        try:
            with urllib.request.urlopen(f"{url}/metrics", timeout=5) as r:
                text = r.read().decode()
            for k, v in parse_prometheus(text).items():
                combined.setdefault(k, []).append(v)
        except Exception as e:
            print(f"[metrics] scrape error {url}: {e}")
    if not combined:
        return None
    result = {k: sum(v) / len(v) if k in GAUGES_AVG else sum(v) for k, v in combined.items()}
    result["backend_count"] = len(backends)
    # Compute delta-based current rates (go to 0 when idle)
    now = time.time()
    elapsed = now - _prev_tokens["ts"] if _prev_tokens["ts"] else SNAPSHOT_SECS
    if elapsed > 0 and _prev_tokens["ts"]:
        cur_pred = result.get("tokens_predicted_total", 0)
        cur_prmpt = result.get("prompt_tokens_total", 0)
        # max(0, ...) guards against counter resets when a backend restarts.
        result["gen_rate_current"] = max(0, cur_pred - _prev_tokens["predicted"]) / elapsed
        result["prompt_rate_current"] = max(0, cur_prmpt - _prev_tokens["prompt"]) / elapsed
    else:
        # First snapshot since startup: no previous sample to diff against.
        result["gen_rate_current"] = 0.0
        result["prompt_rate_current"] = 0.0
    _prev_tokens["predicted"] = result.get("tokens_predicted_total", 0)
    _prev_tokens["prompt"] = result.get("prompt_tokens_total", 0)
    _prev_tokens["ts"] = now
    # Write aggregated metrics as Prometheus text for the dashboard
    active = result.get("requests_processing", 0)
    per_slot = result["gen_rate_current"] / max(active, 1) if active else 0.0
    lines = []
    for k, v in result.items():
        if k in ("backend_count", "gen_rate_current", "prompt_rate_current"):
            continue
        lines.append(f"llamacpp:{k} {v}")
    lines.append(f"bonsai:gen_rate_current {result['gen_rate_current']:.3f}")
    lines.append(f"bonsai:prompt_rate_current {result['prompt_rate_current']:.3f}")
    lines.append(f"bonsai:gen_rate_per_slot {per_slot:.3f}")
    lines.append(f"bonsai:backend_count {result['backend_count']}")
    LLAMA_METRICS_FILE.write_text("\n".join(lines) + "\n")
    return result
def scrape_gpus():
    """Query nvidia-smi and return a list of per-GPU stat dicts ([] on any failure)."""
    fields = (
        "index,name,utilization.gpu,utilization.memory,"
        "memory.used,memory.total,temperature.gpu,power.draw,power.limit,clocks.sm"
    )
    try:
        raw = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=" + fields, "--format=csv,noheader,nounits"],
            timeout=5, stderr=subprocess.DEVNULL,
        ).decode()
        stats = []
        for row in raw.strip().splitlines():
            cols = [c.strip() for c in row.split(",")]
            if len(cols) < 10:
                continue
            try:
                stats.append({
                    "index": int(cols[0]),
                    "name": cols[1],
                    "util_gpu": float(cols[2]),
                    "util_mem": float(cols[3]),
                    "mem_used_mib": float(cols[4]),
                    "mem_total_mib": float(cols[5]),
                    "temp_c": float(cols[6]),
                    "power_w": float(cols[7]),
                    "power_limit_w": float(cols[8]),
                    "clock_sm_mhz": float(cols[9]),
                })
            except (ValueError, IndexError):
                pass
        return stats
    except Exception:
        return []
def compute_analytics():
    """Parse nginx access log and write analytics.json with request counts + avg latency."""
    try:
        if not NGINX_LOG.exists():
            return
        now = datetime.now(timezone.utc)
        # Window boundaries: 24 hourly buckets, 7 daily buckets, 5-minute latency sample.
        min_hour = (now.replace(minute=0, second=0, microsecond=0)
                    - timedelta(hours=24)).strftime('%Y-%m-%dT%H')
        min_day = (now.replace(hour=0, minute=0, second=0, microsecond=0)
                   - timedelta(days=7)).strftime('%Y-%m-%d')
        recent_cutoff = now - timedelta(minutes=5)
        hour_counts = {}
        day_counts = {}
        total_requests = 0
        recent_latencies = []
        for entry in NGINX_LOG.read_text(errors='replace').splitlines():
            cols = entry.split('|')
            if len(cols) < 6:
                continue
            stamp, verb, path, code = cols[0], cols[1], cols[2], cols[3]
            duration = cols[5]  # nginx $request_time in seconds
            # Only count successful POSTs to the API.
            if verb != 'POST' or not path.startswith('/v1/'):
                continue
            if not code.startswith('2'):
                continue
            try:
                when = datetime.fromisoformat(stamp)
            except Exception:
                continue
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)
            total_requests += 1
            hour_key = when.strftime('%Y-%m-%dT%H')
            day_key = when.strftime('%Y-%m-%d')
            hour_counts[hour_key] = hour_counts.get(hour_key, 0) + 1
            day_counts[day_key] = day_counts.get(day_key, 0) + 1
            if when >= recent_cutoff:
                try:
                    recent_latencies.append(float(duration) * 1000)
                except ValueError:
                    pass
        # ISO-formatted keys sort chronologically, so string comparison works.
        kept_hours = sorted(h for h in hour_counts if h >= min_hour)
        kept_days = sorted(d for d in day_counts if d >= min_day)
        ordered = sorted(recent_latencies)
        n = len(ordered)
        analytics = {
            "updated_at": now.isoformat(),
            "summary_24h": {"requests": sum(hour_counts[h] for h in kept_hours), "unique_users": 0},
            "summary_7d": {"requests": sum(day_counts[d] for d in kept_days), "unique_users": 0},
            "summary_total": {"requests": total_requests, "unique_users": 0},
            "requests_by_hour": [{"hour": h + ":00", "requests": hour_counts[h]} for h in kept_hours],
            "requests_by_day": [{"day": d, "requests": day_counts[d]} for d in kept_days],
            "top_users": [],
            "latency_5m": {
                "avg_ms": round(sum(ordered) / n) if n else 0,
                "p50_ms": round(ordered[n // 2]) if n else 0,
                "p90_ms": round(ordered[int(n * 0.9)]) if n else 0,
                "sample_count": n,
            },
        }
        ANALYTICS_FILE.write_text(json.dumps(analytics))
    except Exception as e:
        print(f"[analytics] error: {e}")
def hf_push(local_path):
    """Push a trimmed copy of *local_path* to the HF dataset via the commit API.

    No-op when METRICS_REPO / HF_TOKEN are not configured; push failures are
    logged rather than raised (best-effort).
    """
    if not METRICS_REPO or not HF_TOKEN:
        return
    dest = f"metrics/{local_path.name}"
    # Trim to last MAX_JSONL_LINES entries before pushing
    try:
        entries = local_path.read_text().strip().splitlines()
        body = "\n".join(entries[-MAX_JSONL_LINES:]) + "\n"
    except Exception:
        body = local_path.read_bytes().decode()
    encoded = base64.b64encode(body.encode()).decode()
    commit = {
        "summary": f"update {local_path.name}",
        "files": [{"path": dest, "encoding": "base64", "content": encoded}],
    }
    request = urllib.request.Request(
        f"https://huggingface.co/api/datasets/{METRICS_REPO}/commit/main",
        data=json.dumps(commit).encode(),
        method="POST",
        headers={"Authorization": f"Bearer {HF_TOKEN}", "Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as r:
            print(f"[metrics] pushed {local_path.name} → {METRICS_REPO}/{dest} ({r.status})")
    except Exception as e:
        print(f"[metrics] push failed: {e}")
def gpu_loop():
    """Fast loop: update gpu-stats.json every GPU_INTERVAL_SECS seconds."""
    print(f"[gpu] polling every {GPU_INTERVAL_SECS}s")
    while True:
        try:
            snapshot = scrape_gpus()
            # Keep the last good snapshot on disk when nvidia-smi fails.
            if snapshot:
                payload = {"ts": now_utc().isoformat(), "gpus": snapshot}
                GPU_STATS_FILE.write_text(json.dumps(payload))
        except Exception as e:
            print(f"[gpu] error: {e}")
        time.sleep(GPU_INTERVAL_SECS)
def hf_pull():
    """Pull the latest metrics from HF Dataset on startup to survive restarts."""
    if not METRICS_REPO or not HF_TOKEN:
        return
    print(f"[metrics] pulling saved data from {METRICS_REPO}...")
    auth = {"Authorization": f"Bearer {HF_TOKEN}"}
    try:
        # List files in the metrics/ folder of the dataset repo.
        listing = urllib.request.Request(
            f"https://huggingface.co/api/datasets/{METRICS_REPO}/tree/main/metrics",
            headers=auth,
        )
        with urllib.request.urlopen(listing, timeout=15) as resp:
            tree = json.loads(resp.read().decode())
        for node in tree:
            filename = node.get("path", "").split("/")[-1]
            if not filename.endswith(".jsonl"):
                continue
            target = LOG_DIR / filename
            download = urllib.request.Request(
                f"https://huggingface.co/datasets/{METRICS_REPO}/resolve/main/metrics/{filename}",
                headers=auth,
            )
            with urllib.request.urlopen(download, timeout=15) as resp:
                target.write_bytes(resp.read())
            print(f"[metrics] restored {filename} ({target.stat().st_size} bytes)")
    except Exception as e:
        # Best-effort: a failed pull just means counters restart from empty.
        print(f"[metrics] pull failed (starting fresh): {e}")
def wait_for_backends():
    """Block until at least one llama-server is healthy (retries every 5s)."""
    print("[metrics] waiting for llama-server to be ready...")
    while True:
        for offset in range(16):
            candidate = f"http://127.0.0.1:{BASE_PORT + offset}"
            try:
                with urllib.request.urlopen(f"{candidate}/health", timeout=2) as resp:
                    healthy = resp.status == 200
            except Exception:
                continue
            if healthy:
                print(f"[metrics] backend ready: {candidate}")
                return
        time.sleep(5)
def metrics_loop():
    """Slow loop: every SNAPSHOT_SECS, scrape llama metrics and append a JSONL
    row; push the daily file to HF every PUSH_SECS seconds (and once on the
    first successful snapshot)."""
    print(f"[metrics] snapshot={SNAPSHOT_SECS}s push={PUSH_SECS}s repo={METRICS_REPO or '(local only)'}")
    # Restore previously pushed JSONL files, then wait for a live backend.
    hf_pull()
    wait_for_backends()
    last_push, backends, first_push_done = 0.0, [], False
    while True:
        try:
            # Re-detect backends whenever the cached list has been invalidated.
            if not backends:
                backends = detect_backends()
                print(f"[metrics] backends: {backends}")
            ts = now_utc()
            data = scrape(backends)
            compute_analytics()
            if data is None:
                # Every backend failed to answer — force re-detection next tick.
                print(f"[metrics] scrape returned no data — will retry next tick")
                backends = []
            else:
                # Merge the latest GPU snapshot (best-effort; [] when missing/unreadable).
                try:
                    gpus = json.loads(GPU_STATS_FILE.read_text()).get("gpus", []) if GPU_STATS_FILE.exists() else []
                except Exception:
                    gpus = []
                # Flatten per-GPU utilization and memory into top-level row keys.
                gpu_s = {f"gpu{g['index']}_util": g["util_gpu"] for g in gpus}
                gpu_s.update({f"gpu{g['index']}_mem_used_mib": g["mem_used_mib"] for g in gpus})
                row = {"ts": ts.isoformat(), **data, **gpu_s}
                day = ts.strftime("%Y-%m-%d")
                # One file per UTC day, one JSON object per line.
                path = LOG_DIR / f"metrics-{day}.jsonl"
                with open(path, "a") as f:
                    f.write(json.dumps(row) + "\n")
                gpu_str = " ".join(f"GPU{g['index']} {g['util_gpu']:.0f}% {g['mem_used_mib']/1024:.1f}GB {g['temp_c']:.0f}°C" for g in gpus)
                print(f"[metrics] {ts.strftime('%H:%M:%S')} gen={data.get('predicted_tokens_seconds',0):.0f} tok/s active={data.get('requests_processing',0):.0f} {gpu_str}")
                # Push immediately on the first successful snapshot, then every PUSH_SECS.
                if not first_push_done or time.time() - last_push >= PUSH_SECS:
                    hf_push(path)
                    last_push = time.time()
                    first_push_done = True
        except Exception as e:
            # Any unexpected error: log, drop cached backends, and keep the loop alive.
            print(f"[metrics] loop error (will retry): {e}")
            backends = []
        time.sleep(SNAPSHOT_SECS)
def run_with_restart(name, func):
    """Call func() until it returns cleanly, restarting after crashes.

    Each crash is logged and retried after an exponentially growing delay
    (capped at 60s); a normal return from func() ends the loop.
    """
    backoff = 1
    while True:
        try:
            func()
        except Exception as e:
            print(f"[{name}] CRASHED: {e} — restarting in {backoff}s")
            time.sleep(backoff)
            backoff = min(backoff * 2, 60)
        else:
            return
def main():
    """Start the GPU poller on a daemon thread, then run the metrics loop here."""
    poller = threading.Thread(target=run_with_restart, args=("gpu", gpu_loop), daemon=True)
    poller.start()
    # The metrics loop owns the main thread; its restart wrapper never exits.
    run_with_restart("metrics", metrics_loop)


if __name__ == "__main__":
    main()