#!/usr/bin/env python3
"""
Scrapes llama-server /metrics every METRICS_INTERVAL seconds and nvidia-smi
every GPU_INTERVAL seconds.
Appends one JSON line per snapshot to /tmp/metrics-YYYY-MM-DD.jsonl.
Pushes the daily file to an HF Dataset repo every METRICS_PUSH_INTERVAL seconds.
Also writes /tmp/gpu-stats.json for the live dashboard GPU panel.
"""
import base64
import json
import os
import subprocess
import threading
import time
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path

BASE_PORT = 7861
LOG_DIR = Path("/tmp")
GPU_STATS_FILE = Path("/tmp/gpu-stats.json")
LLAMA_METRICS_FILE = Path("/tmp/llama-metrics.txt")
GPU_INTERVAL_SECS = int(os.environ.get("GPU_INTERVAL", "10"))
SNAPSHOT_SECS = int(os.environ.get("METRICS_INTERVAL", "3"))
NGINX_LOG = Path("/tmp/nginx-access.log")
ANALYTICS_FILE = Path("/tmp/analytics.json")
PUSH_SECS = int(os.environ.get("METRICS_PUSH_INTERVAL", "1200"))
MAX_JSONL_LINES = 10
METRICS_REPO = os.environ.get("METRICS_REPO", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")


def now_utc():
    return datetime.now(timezone.utc)


def detect_backends():
    """Probe consecutive ports from BASE_PORT; stop at the first unhealthy one."""
    backends = []
    for i in range(16):
        url = f"http://127.0.0.1:{BASE_PORT + i}"
        try:
            urllib.request.urlopen(f"{url}/health", timeout=1).close()
            backends.append(url)
        except Exception:
            break
    return backends or [f"http://127.0.0.1:{BASE_PORT}"]


def parse_prometheus(text):
    """Parse Prometheus exposition text into {metric_name: value}, dropping labels."""
    values = {}
    for line in text.splitlines():
        if line.startswith("#") or not line.strip():
            continue
        parts = line.split()
        if len(parts) < 2:
            continue
        name = parts[0].split("{")[0].replace("llamacpp:", "")
        try:
            values[name] = float(parts[1])
        except ValueError:
            pass
    return values


# Previous counter values, persisted across scrape() calls for delta-based rates.
_prev_tokens = {"predicted": 0.0, "prompt": 0.0, "ts": 0.0}


def scrape(backends):
    # Gauges that should be averaged across backends; everything else
    # (counters, requests_processing, requests_deferred, ...) is summed.
    GAUGES_AVG = {"prompt_tokens_seconds", "predicted_tokens_seconds", "kv_cache_usage_ratio"}
    combined = {}
    for url in backends:
        try:
            with urllib.request.urlopen(f"{url}/metrics", timeout=5) as r:
                text = r.read().decode()
            for k, v in parse_prometheus(text).items():
                combined.setdefault(k, []).append(v)
        except Exception as e:
            print(f"[metrics] scrape error {url}: {e}")
    if not combined:
        return None
    result = {k: sum(v) / len(v) if k in GAUGES_AVG else sum(v) for k, v in combined.items()}
    result["backend_count"] = len(backends)

    # Compute delta-based current rates (go to 0 when idle)
    now = time.time()
    elapsed = now - _prev_tokens["ts"] if _prev_tokens["ts"] else SNAPSHOT_SECS
    if elapsed > 0 and _prev_tokens["ts"]:
        cur_pred = result.get("tokens_predicted_total", 0)
        cur_prompt = result.get("prompt_tokens_total", 0)
        result["gen_rate_current"] = max(0, cur_pred - _prev_tokens["predicted"]) / elapsed
        result["prompt_rate_current"] = max(0, cur_prompt - _prev_tokens["prompt"]) / elapsed
    else:
        result["gen_rate_current"] = 0.0
        result["prompt_rate_current"] = 0.0
    _prev_tokens["predicted"] = result.get("tokens_predicted_total", 0)
    _prev_tokens["prompt"] = result.get("prompt_tokens_total", 0)
    _prev_tokens["ts"] = now

    # Write aggregated metrics as Prometheus text for the dashboard
    active = result.get("requests_processing", 0)
    per_slot = result["gen_rate_current"] / max(active, 1) if active else 0.0
    lines = []
    for k, v in result.items():
        if k in ("backend_count", "gen_rate_current", "prompt_rate_current"):
            continue
        lines.append(f"llamacpp:{k} {v}")
    lines.append(f"bonsai:gen_rate_current {result['gen_rate_current']:.3f}")
    lines.append(f"bonsai:prompt_rate_current {result['prompt_rate_current']:.3f}")
    lines.append(f"bonsai:gen_rate_per_slot {per_slot:.3f}")
    lines.append(f"bonsai:backend_count {result['backend_count']}")
    LLAMA_METRICS_FILE.write_text("\n".join(lines) + "\n")
    return result
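
# Illustrative only: with two healthy backends, the /tmp/llama-metrics.txt file
# written by scrape() might look roughly like this (the exact llamacpp: metric
# names depend on what each llama-server build exposes on /metrics):
#
#   llamacpp:prompt_tokens_total 18432.0
#   llamacpp:tokens_predicted_total 91250.0
#   llamacpp:requests_processing 2.0
#   bonsai:gen_rate_current 41.667
#   bonsai:prompt_rate_current 0.000
#   bonsai:gen_rate_per_slot 20.833
#   bonsai:backend_count 2
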
def scrape_gpus():
    try:
        out = subprocess.check_output([
            "nvidia-smi",
            "--query-gpu=index,name,utilization.gpu,utilization.memory,"
            "memory.used,memory.total,temperature.gpu,power.draw,power.limit,clocks.sm",
            "--format=csv,noheader,nounits",
        ], timeout=5, stderr=subprocess.DEVNULL).decode()
        gpus = []
        for line in out.strip().splitlines():
            p = [x.strip() for x in line.split(",")]
            if len(p) < 10:
                continue
            try:
                gpus.append({
                    "index": int(p[0]), "name": p[1],
                    "util_gpu": float(p[2]), "util_mem": float(p[3]),
                    "mem_used_mib": float(p[4]), "mem_total_mib": float(p[5]),
                    "temp_c": float(p[6]), "power_w": float(p[7]),
                    "power_limit_w": float(p[8]), "clock_sm_mhz": float(p[9]),
                })
            except (ValueError, IndexError):
                pass
        return gpus
    except Exception:
        return []


def compute_analytics():
    """Parse the nginx access log and write analytics.json with request counts
    and 5-minute latency stats (avg/p50/p90)."""
    try:
        if not NGINX_LOG.exists():
            return
        now = datetime.now(timezone.utc)
        cutoff_24h = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=24)
        cutoff_7d = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=7)
        cutoff_5m = now - timedelta(minutes=5)
        by_hour = {}
        by_day = {}
        total = 0
        latencies_5m = []
        for line in NGINX_LOG.read_text(errors="replace").splitlines():
            parts = line.split("|")
            if len(parts) < 6:
                continue
            ts_str, method, uri, status = parts[0], parts[1], parts[2], parts[3]
            req_time = parts[5]  # nginx $request_time in seconds
            if method != "POST" or not uri.startswith("/v1/"):
                continue
            if not status.startswith("2"):
                continue
            try:
                ts = datetime.fromisoformat(ts_str)
                if ts.tzinfo is None:
                    ts = ts.replace(tzinfo=timezone.utc)
            except Exception:
                continue
            total += 1
            hour_key = ts.strftime("%Y-%m-%dT%H")
            day_key = ts.strftime("%Y-%m-%d")
            by_hour[hour_key] = by_hour.get(hour_key, 0) + 1
            by_day[day_key] = by_day.get(day_key, 0) + 1
            if ts >= cutoff_5m:
                try:
                    latencies_5m.append(float(req_time) * 1000)
                except ValueError:
                    pass
        hours_24 = sorted(k for k in by_hour if k >= cutoff_24h.strftime("%Y-%m-%dT%H"))
        days_7 = sorted(k for k in by_day if k >= cutoff_7d.strftime("%Y-%m-%d"))
        req_24h = sum(by_hour[h] for h in hours_24)
        req_7d = sum(by_day[d] for d in days_7)
        avg_latency_ms = sum(latencies_5m) / len(latencies_5m) if latencies_5m else 0
        p50_latency_ms = sorted(latencies_5m)[len(latencies_5m) // 2] if latencies_5m else 0
        p90_latency_ms = sorted(latencies_5m)[int(len(latencies_5m) * 0.9)] if latencies_5m else 0
        analytics = {
            "updated_at": now.isoformat(),
            "summary_24h": {"requests": req_24h, "unique_users": 0},
            "summary_7d": {"requests": req_7d, "unique_users": 0},
            "summary_total": {"requests": total, "unique_users": 0},
            "requests_by_hour": [{"hour": h + ":00", "requests": by_hour[h]} for h in hours_24],
            "requests_by_day": [{"day": d, "requests": by_day[d]} for d in days_7],
            "top_users": [],
            "latency_5m": {
                "avg_ms": round(avg_latency_ms),
                "p50_ms": round(p50_latency_ms),
                "p90_ms": round(p90_latency_ms),
                "sample_count": len(latencies_5m),
            },
        }
        ANALYTICS_FILE.write_text(json.dumps(analytics))
    except Exception as e:
        print(f"[analytics] error: {e}")
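
# compute_analytics() assumes a "|"-delimited nginx log_format along these
# lines (a sketch, not taken from an actual nginx.conf; field 5 is unused by
# the parser, so $body_bytes_sent is just a plausible filler):
#
#   log_format metrics '$time_iso8601|$request_method|$request_uri|$status|'
#                      '$body_bytes_sent|$request_time';
#   access_log /tmp/nginx-access.log metrics;
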
def hf_push(local_path):
    if not METRICS_REPO or not HF_TOKEN:
        return
    dest = f"metrics/{local_path.name}"
    # Trim to the last MAX_JSONL_LINES entries before pushing
    try:
        lines = local_path.read_text().strip().splitlines()
        trimmed = "\n".join(lines[-MAX_JSONL_LINES:]) + "\n"
    except Exception:
        trimmed = local_path.read_bytes().decode()
    content = base64.b64encode(trimmed.encode()).decode()
    # The Hub commit endpoint expects NDJSON: one "header" operation line,
    # then one "file" operation line per uploaded file.
    payload = "\n".join([
        json.dumps({"key": "header", "value": {"summary": f"update {local_path.name}"}}),
        json.dumps({"key": "file", "value": {"path": dest, "encoding": "base64", "content": content}}),
    ]).encode()
    req = urllib.request.Request(
        f"https://huggingface.co/api/datasets/{METRICS_REPO}/commit/main",
        data=payload, method="POST",
        headers={"Authorization": f"Bearer {HF_TOKEN}",
                 "Content-Type": "application/x-ndjson"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            print(f"[metrics] pushed {local_path.name} → {METRICS_REPO}/{dest} ({r.status})")
    except Exception as e:
        print(f"[metrics] push failed: {e}")


def gpu_loop():
    """Fast loop: update gpu-stats.json every GPU_INTERVAL_SECS seconds."""
    print(f"[gpu] polling every {GPU_INTERVAL_SECS}s")
    while True:
        try:
            gpus = scrape_gpus()
            if gpus:
                ts = now_utc().isoformat()
                GPU_STATS_FILE.write_text(json.dumps({"ts": ts, "gpus": gpus}))
        except Exception as e:
            print(f"[gpu] error: {e}")
        time.sleep(GPU_INTERVAL_SECS)


def hf_pull():
    """Pull the latest metrics from the HF Dataset on startup to survive restarts."""
    if not METRICS_REPO or not HF_TOKEN:
        return
    print(f"[metrics] pulling saved data from {METRICS_REPO}...")
    try:
        # List files in the metrics/ folder
        req = urllib.request.Request(
            f"https://huggingface.co/api/datasets/{METRICS_REPO}/tree/main/metrics",
            headers={"Authorization": f"Bearer {HF_TOKEN}"})
        with urllib.request.urlopen(req, timeout=15) as r:
            files = json.loads(r.read().decode())
        for f in files:
            name = f.get("path", "").split("/")[-1]
            if not name.endswith(".jsonl"):
                continue
            local = LOG_DIR / name
            url = f"https://huggingface.co/datasets/{METRICS_REPO}/resolve/main/metrics/{name}"
            req = urllib.request.Request(url, headers={"Authorization": f"Bearer {HF_TOKEN}"})
            with urllib.request.urlopen(req, timeout=15) as r:
                local.write_bytes(r.read())
            print(f"[metrics] restored {name} ({local.stat().st_size} bytes)")
    except Exception as e:
        print(f"[metrics] pull failed (starting fresh): {e}")


def wait_for_backends():
    """Block until at least one llama-server is healthy (retries every 5s)."""
    print("[metrics] waiting for llama-server to be ready...")
    while True:
        for i in range(16):
            url = f"http://127.0.0.1:{BASE_PORT + i}"
            try:
                with urllib.request.urlopen(f"{url}/health", timeout=2) as r:
                    if r.status == 200:
                        print(f"[metrics] backend ready: {url}")
                        return
            except Exception:
                pass
        time.sleep(5)


def metrics_loop():
    """Slow loop: snapshot llama metrics every SNAPSHOT_SECS seconds, append
    JSONL, and push to HF every PUSH_SECS seconds."""
    print(f"[metrics] snapshot={SNAPSHOT_SECS}s push={PUSH_SECS}s repo={METRICS_REPO or '(local only)'}")
    hf_pull()
    wait_for_backends()
    last_push, backends, first_push_done = 0.0, [], False
    while True:
        try:
            if not backends:
                backends = detect_backends()
                print(f"[metrics] backends: {backends}")
            ts = now_utc()
            data = scrape(backends)
            compute_analytics()
            if data is None:
                print("[metrics] scrape returned no data — will retry next tick")
                backends = []
            else:
                try:
                    gpus = json.loads(GPU_STATS_FILE.read_text()).get("gpus", []) if GPU_STATS_FILE.exists() else []
                except Exception:
                    gpus = []
                gpu_s = {f"gpu{g['index']}_util": g["util_gpu"] for g in gpus}
                gpu_s.update({f"gpu{g['index']}_mem_used_mib": g["mem_used_mib"] for g in gpus})
                row = {"ts": ts.isoformat(), **data, **gpu_s}
                day = ts.strftime("%Y-%m-%d")
                path = LOG_DIR / f"metrics-{day}.jsonl"
                with open(path, "a") as f:
                    f.write(json.dumps(row) + "\n")
                gpu_str = " ".join(
                    f"GPU{g['index']} {g['util_gpu']:.0f}% {g['mem_used_mib']/1024:.1f}GB {g['temp_c']:.0f}°C"
                    for g in gpus)
                print(f"[metrics] {ts.strftime('%H:%M:%S')} "
                      f"gen={data.get('predicted_tokens_seconds', 0):.0f} tok/s "
                      f"active={data.get('requests_processing', 0):.0f} {gpu_str}")
                if not first_push_done or time.time() - last_push >= PUSH_SECS:
                    hf_push(path)
                    last_push = time.time()
                    first_push_done = True
        except Exception as e:
            print(f"[metrics] loop error (will retry): {e}")
            backends = []
        time.sleep(SNAPSHOT_SECS)
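
# Illustrative only: one JSONL snapshot row appended by metrics_loop() might
# look like this (the key set varies with the metrics each backend reports and
# with the GPU count):
#
#   {"ts": "2024-01-01T12:00:03+00:00", "requests_processing": 2.0,
#    "tokens_predicted_total": 91250.0, "gen_rate_current": 41.7,
#    "prompt_rate_current": 0.0, "backend_count": 2,
#    "gpu0_util": 97.0, "gpu0_mem_used_mib": 21504.0}
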
def run_with_restart(name, func):
    """Run a function in a loop, restarting on crash with backoff."""
    backoff = 1
    while True:
        try:
            func()
        except Exception as e:
            print(f"[{name}] CRASHED: {e} — restarting in {backoff}s")
            time.sleep(backoff)
            backoff = min(backoff * 2, 60)
        else:
            break


def main():
    gpu_thread = threading.Thread(target=run_with_restart, args=("gpu", gpu_loop), daemon=True)
    gpu_thread.start()
    run_with_restart("metrics", metrics_loop)


if __name__ == "__main__":
    main()
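
# Example invocation (values and the script filename are illustrative;
# METRICS_REPO/HF_TOKEN may be left unset for local-only logging):
#
#   METRICS_INTERVAL=3 METRICS_PUSH_INTERVAL=1200 GPU_INTERVAL=10 \
#   METRICS_REPO=your-org/your-metrics-dataset HF_TOKEN=hf_... \
#   python3 metrics.py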