# personalgpt/metrics_pusher.py — PrismML Deploy (commit e28b544)
# Fixes: push every 20min, trim to last 10 entries, pull on startup for restart recovery
#!/usr/bin/env python3
"""
Scrapes /metrics + nvidia-smi every METRICS_INTERVAL seconds.
Appends one JSON line per snapshot to /tmp/metrics-YYYY-MM-DD.jsonl.
Pushes daily file to HF Dataset repo every METRICS_PUSH_INTERVAL seconds.
Also writes /tmp/gpu-stats.json for the live dashboard GPU panel.
"""
import base64, json, os, subprocess, threading, time, urllib.request, urllib.error
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Port of the first llama-server backend; additional backends are probed at
# consecutive ports (see detect_backends / wait_for_backends).
BASE_PORT = 7861
LOG_DIR = Path("/tmp")                               # where daily metrics-YYYY-MM-DD.jsonl files live
GPU_STATS_FILE = Path("/tmp/gpu-stats.json")         # live GPU snapshot for the dashboard panel
LLAMA_METRICS_FILE = Path("/tmp/llama-metrics.txt")  # aggregated Prometheus text for the dashboard
GPU_INTERVAL_SECS = int(os.environ.get("GPU_INTERVAL", "10"))   # nvidia-smi poll period (seconds)
SNAPSHOT_SECS = int(os.environ.get("METRICS_INTERVAL", "3"))    # /metrics scrape period (seconds)
NGINX_LOG = Path("/tmp/nginx-access.log")            # pipe-delimited access log parsed by compute_analytics
ANALYTICS_FILE = Path("/tmp/analytics.json")         # request-count / latency summary output
PUSH_SECS = int(os.environ.get("METRICS_PUSH_INTERVAL", "1200"))  # HF dataset push period (seconds)
MAX_JSONL_LINES = 10                                 # entries kept when trimming before a push
METRICS_REPO = os.environ.get("METRICS_REPO", "")    # HF dataset repo id; empty disables push/pull
HF_TOKEN = os.environ.get("HF_TOKEN", "")            # HF API token; empty disables push/pull
def now_utc():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)
def detect_backends():
    """Probe consecutive localhost ports starting at BASE_PORT for healthy backends.

    Probing stops at the first port whose /health check fails, so backends
    are assumed to occupy a contiguous port range. Returns the list of
    responding base URLs, falling back to the first port alone when nothing
    answers (so callers always get at least one URL to try).
    """
    found = []
    for offset in range(16):
        candidate = f"http://127.0.0.1:{BASE_PORT + offset}"
        try:
            urllib.request.urlopen(f"{candidate}/health", timeout=1).close()
        except Exception:
            break
        found.append(candidate)
    return found if found else [f"http://127.0.0.1:{BASE_PORT}"]
def parse_prometheus(text):
    """Parse Prometheus exposition text into a {metric_name: float} dict.

    Comment ('#'-prefixed) and blank lines are skipped, any label block
    ('{...}') and the 'llamacpp:' prefix are stripped from metric names,
    and samples whose value does not parse as a float are ignored.
    """
    samples = {}
    for raw in text.splitlines():
        if raw.startswith("#") or not raw.strip():
            continue
        fields = raw.split()
        if len(fields) < 2:
            continue
        metric = fields[0].split("{", 1)[0].replace("llamacpp:", "")
        try:
            samples[metric] = float(fields[1])
        except ValueError:
            continue
    return samples
# Cross-call state for scrape(): last seen cumulative token counters and the
# timestamp of the previous scrape, used to derive delta-based current rates.
_prev_tokens = {"predicted": 0.0, "prompt": 0.0, "ts": 0.0}
def scrape(backends):
    """Scrape /metrics from every backend and aggregate into a single dict.

    Gauge-style metrics in GAUGES_AVG are averaged across backends; all
    other metrics are summed. Delta-based "current" token rates are derived
    from the module-level _prev_tokens state so they drop to zero when the
    servers are idle. The aggregate is also written to LLAMA_METRICS_FILE as
    Prometheus text for the dashboard. Returns the aggregated dict, or None
    when no backend responded.
    """
    # Ratios/rates are averaged across backends; queue depths are summed.
    GAUGES_AVG = {"prompt_tokens_seconds", "predicted_tokens_seconds", "kv_cache_usage_ratio"}
    # NOTE(review): GAUGES_SUM is never referenced — every metric outside
    # GAUGES_AVG is summed by default anyway. Presumably documentation-only.
    GAUGES_SUM = {"requests_processing", "requests_deferred"}
    combined = {}
    raw_texts = []
    for url in backends:
        try:
            with urllib.request.urlopen(f"{url}/metrics", timeout=5) as r:
                text = r.read().decode()
                raw_texts.append(text)
                for k, v in parse_prometheus(text).items():
                    combined.setdefault(k, []).append(v)
        except Exception as e:
            print(f"[metrics] scrape error {url}: {e}")
    if not combined: return None
    result = {k: sum(v)/len(v) if k in GAUGES_AVG else sum(v) for k, v in combined.items()}
    result["backend_count"] = len(backends)
    # Compute delta-based current rates (go to 0 when idle)
    now = time.time()
    elapsed = now - _prev_tokens["ts"] if _prev_tokens["ts"] else SNAPSHOT_SECS
    if elapsed > 0 and _prev_tokens["ts"]:
        cur_pred = result.get("tokens_predicted_total", 0)
        cur_prmpt = result.get("prompt_tokens_total", 0)
        # max(0, ...) guards against cumulative counters going backwards
        # (e.g. after a backend restart resets them).
        result["gen_rate_current"] = max(0, cur_pred - _prev_tokens["predicted"]) / elapsed
        result["prompt_rate_current"] = max(0, cur_prmpt - _prev_tokens["prompt"]) / elapsed
    else:
        # First scrape: no baseline yet, so report zero rates.
        result["gen_rate_current"] = 0.0
        result["prompt_rate_current"] = 0.0
    _prev_tokens["predicted"] = result.get("tokens_predicted_total", 0)
    _prev_tokens["prompt"] = result.get("prompt_tokens_total", 0)
    _prev_tokens["ts"] = now
    # Write aggregated metrics as Prometheus text for the dashboard
    active = result.get("requests_processing", 0)
    per_slot = result["gen_rate_current"] / max(active, 1) if active else 0.0
    lines = []
    for k, v in result.items():
        # Derived fields are emitted below under the bonsai: prefix instead.
        if k in ("backend_count", "gen_rate_current", "prompt_rate_current"):
            continue
        lines.append(f"llamacpp:{k} {v}")
    lines.append(f"bonsai:gen_rate_current {result['gen_rate_current']:.3f}")
    lines.append(f"bonsai:prompt_rate_current {result['prompt_rate_current']:.3f}")
    lines.append(f"bonsai:gen_rate_per_slot {per_slot:.3f}")
    lines.append(f"bonsai:backend_count {result['backend_count']}")
    LLAMA_METRICS_FILE.write_text("\n".join(lines) + "\n")
    return result
def scrape_gpus():
    """Query nvidia-smi for per-GPU stats; return [] when unavailable.

    Each returned dict carries index, name, utilization, memory, temperature,
    power and SM clock for one GPU. Any failure (missing binary, timeout,
    unparseable output) yields an empty list rather than raising.
    """
    query = ("index,name,utilization.gpu,utilization.memory,"
             "memory.used,memory.total,temperature.gpu,power.draw,power.limit,clocks.sm")
    try:
        raw = subprocess.check_output(
            ["nvidia-smi", f"--query-gpu={query}", "--format=csv,noheader,nounits"],
            timeout=5, stderr=subprocess.DEVNULL).decode()
    except Exception:
        return []
    float_keys = ("util_gpu", "util_mem", "mem_used_mib", "mem_total_mib",
                  "temp_c", "power_w", "power_limit_w", "clock_sm_mhz")
    results = []
    for row in raw.strip().splitlines():
        cols = [c.strip() for c in row.split(",")]
        if len(cols) < 10:
            continue
        try:
            entry = {"index": int(cols[0]), "name": cols[1]}
            entry.update(zip(float_keys, map(float, cols[2:10])))
        except (ValueError, IndexError):
            continue
        results.append(entry)
    return results
def compute_analytics():
    """Parse nginx access log and write analytics.json with request counts + avg latency.

    Expects NGINX_LOG lines as pipe-delimited fields:
    timestamp|method|uri|status|<unused>|request_time[...]. Only successful
    (2xx) POSTs to /v1/* are counted. Writes hourly/daily request counts and
    a 5-minute latency summary to ANALYTICS_FILE. Best-effort: any error is
    logged and swallowed so the calling loop keeps running.
    """
    try:
        if not NGINX_LOG.exists():
            return
        now = datetime.now(timezone.utc)
        # Window cutoffs: last 24 whole hours, last 7 whole days, last 5 minutes.
        cutoff_24h = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=24)
        cutoff_7d = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=7)
        cutoff_5m = now - timedelta(minutes=5)
        by_hour = {}
        by_day = {}
        total = 0
        latencies_5m = []
        for line in NGINX_LOG.read_text(errors='replace').splitlines():
            parts = line.split('|')
            if len(parts) < 6: continue
            ts_str, method, uri, status = parts[0], parts[1], parts[2], parts[3]
            req_time = parts[5]  # nginx $request_time in seconds
            # Count only successful API inference calls.
            if method != 'POST' or not uri.startswith('/v1/'): continue
            if not status.startswith('2'): continue
            try:
                ts = datetime.fromisoformat(ts_str)
                # Naive timestamps are assumed UTC — TODO confirm nginx log
                # format actually emits UTC here.
                if ts.tzinfo is None:
                    ts = ts.replace(tzinfo=timezone.utc)
            except Exception:
                continue
            total += 1
            hour_key = ts.strftime('%Y-%m-%dT%H')
            day_key = ts.strftime('%Y-%m-%d')
            by_hour[hour_key] = by_hour.get(hour_key, 0) + 1
            by_day[day_key] = by_day.get(day_key, 0) + 1
            if ts >= cutoff_5m:
                try:
                    latencies_5m.append(float(req_time) * 1000)  # seconds -> ms
                except ValueError:
                    pass
        # String comparison works here because the keys are zero-padded
        # ISO-style timestamps, which sort lexicographically.
        hours_24 = sorted(k for k in by_hour if k >= cutoff_24h.strftime('%Y-%m-%dT%H'))
        days_7 = sorted(k for k in by_day if k >= cutoff_7d.strftime('%Y-%m-%d'))
        req_24h = sum(by_hour[h] for h in hours_24)
        req_7d = sum(by_day[d] for d in days_7)
        avg_latency_ms = sum(latencies_5m) / len(latencies_5m) if latencies_5m else 0
        # Nearest-rank percentiles over the 5-minute latency window.
        p50_latency_ms = sorted(latencies_5m)[len(latencies_5m)//2] if latencies_5m else 0
        p90_latency_ms = sorted(latencies_5m)[int(len(latencies_5m)*0.9)] if latencies_5m else 0
        analytics = {
            "updated_at": now.isoformat(),
            # unique_users is not tracked yet; always 0.
            "summary_24h": {"requests": req_24h, "unique_users": 0},
            "summary_7d": {"requests": req_7d, "unique_users": 0},
            "summary_total": {"requests": total, "unique_users": 0},
            "requests_by_hour": [{"hour": h + ":00", "requests": by_hour[h]} for h in hours_24],
            "requests_by_day": [{"day": d, "requests": by_day[d]} for d in days_7],
            "top_users": [],
            "latency_5m": {
                "avg_ms": round(avg_latency_ms),
                "p50_ms": round(p50_latency_ms),
                "p90_ms": round(p90_latency_ms),
                "sample_count": len(latencies_5m),
            },
        }
        ANALYTICS_FILE.write_text(json.dumps(analytics))
    except Exception as e:
        print(f"[analytics] error: {e}")
def hf_push(local_path):
    """Push one metrics JSONL file to the HF Dataset repo via the commit API.

    The file is trimmed to the last MAX_JSONL_LINES entries before upload so
    the remote copy stays small. No-op when METRICS_REPO or HF_TOKEN is
    unset; push failures are logged and swallowed so the metrics loop keeps
    running.
    """
    if not METRICS_REPO or not HF_TOKEN: return
    dest = f"metrics/{local_path.name}"
    # Trim to last MAX_JSONL_LINES entries before pushing
    try:
        lines = local_path.read_text().strip().splitlines()
        trimmed = "\n".join(lines[-MAX_JSONL_LINES:]) + "\n"
    except Exception:
        # Lossy fallback: errors="replace" so an undecodable byte cannot
        # re-raise the very UnicodeDecodeError this branch is handling.
        trimmed = local_path.read_bytes().decode(errors="replace")
    content = base64.b64encode(trimmed.encode()).decode()
    payload = json.dumps({"summary": f"update {local_path.name}",
                          "files": [{"path": dest, "encoding": "base64", "content": content}]}).encode()
    req = urllib.request.Request(
        f"https://huggingface.co/api/datasets/{METRICS_REPO}/commit/main",
        data=payload, method="POST",
        headers={"Authorization": f"Bearer {HF_TOKEN}", "Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            # Separator restored: filename and repo path previously ran
            # together in this log line.
            print(f"[metrics] pushed {local_path.name} -> {METRICS_REPO}/{dest} ({r.status})")
    except Exception as e:
        print(f"[metrics] push failed: {e}")
def gpu_loop():
    """Fast loop: refresh gpu-stats.json every GPU_INTERVAL_SECS seconds.

    Skips the write when nvidia-smi returns nothing; errors are logged and
    the loop keeps polling.
    """
    print(f"[gpu] polling every {GPU_INTERVAL_SECS}s")
    while True:
        try:
            snapshot = scrape_gpus()
            if snapshot:
                payload = {"ts": now_utc().isoformat(), "gpus": snapshot}
                GPU_STATS_FILE.write_text(json.dumps(payload))
        except Exception as exc:
            print(f"[gpu] error: {exc}")
        time.sleep(GPU_INTERVAL_SECS)
def hf_pull():
    """Restore saved metrics JSONL files from the HF Dataset on startup.

    Downloads every metrics/*.jsonl file from the repo into LOG_DIR so a
    container restart does not lose history. No-op without repo/token;
    failures are logged and the pusher simply starts fresh.
    """
    if not (METRICS_REPO and HF_TOKEN):
        return
    print(f"[metrics] pulling saved data from {METRICS_REPO}...")
    auth_header = {"Authorization": f"Bearer {HF_TOKEN}"}
    try:
        # List the repo's metrics/ folder to discover saved files.
        listing = urllib.request.Request(
            f"https://huggingface.co/api/datasets/{METRICS_REPO}/tree/main/metrics",
            headers=auth_header)
        with urllib.request.urlopen(listing, timeout=15) as resp:
            entries = json.loads(resp.read().decode())
        for entry in entries:
            name = entry.get("path", "").split("/")[-1]
            if not name.endswith(".jsonl"):
                continue
            target = LOG_DIR / name
            file_url = f"https://huggingface.co/datasets/{METRICS_REPO}/resolve/main/metrics/{name}"
            download = urllib.request.Request(file_url, headers=auth_header)
            with urllib.request.urlopen(download, timeout=15) as resp:
                target.write_bytes(resp.read())
            print(f"[metrics] restored {name} ({target.stat().st_size} bytes)")
    except Exception as e:
        print(f"[metrics] pull failed (starting fresh): {e}")
def wait_for_backends():
    """Block until at least one llama-server answers /health with 200.

    Scans the 16 candidate ports starting at BASE_PORT, retrying the whole
    sweep every 5 seconds until one succeeds.
    """
    print("[metrics] waiting for llama-server to be ready...")
    while True:
        for offset in range(16):
            candidate = f"http://127.0.0.1:{BASE_PORT + offset}"
            try:
                with urllib.request.urlopen(f"{candidate}/health", timeout=2) as resp:
                    if resp.status == 200:
                        print(f"[metrics] backend ready: {candidate}")
                        return
            except Exception:
                continue
        time.sleep(5)
def metrics_loop():
    """Slow loop: scrape every SNAPSHOT_SECS, append JSONL, push to HF every PUSH_SECS.

    On startup, restores prior data from HF (restart recovery) and blocks
    until a backend is healthy. Each tick: scrape all backends, refresh
    analytics, merge in the latest GPU snapshot, append one JSON row to the
    daily file, and push that file to HF on the first successful tick and
    then at most once every PUSH_SECS seconds.
    """
    print(f"[metrics] snapshot={SNAPSHOT_SECS}s push={PUSH_SECS}s repo={METRICS_REPO or '(local only)'}")
    hf_pull()
    wait_for_backends()
    last_push, backends, first_push_done = 0.0, [], False
    while True:
        try:
            # Re-discover backends whenever the list was cleared by a failure.
            if not backends:
                backends = detect_backends()
                print(f"[metrics] backends: {backends}")
            ts = now_utc()
            data = scrape(backends)
            compute_analytics()
            if data is None:
                # Clearing backends forces re-detection on the next tick.
                print(f"[metrics] scrape returned no data — will retry next tick")
                backends = []
            else:
                # Merge the most recent GPU snapshot written by gpu_loop.
                try:
                    gpus = json.loads(GPU_STATS_FILE.read_text()).get("gpus", []) if GPU_STATS_FILE.exists() else []
                except Exception:
                    gpus = []
                gpu_s = {f"gpu{g['index']}_util": g["util_gpu"] for g in gpus}
                gpu_s.update({f"gpu{g['index']}_mem_used_mib": g["mem_used_mib"] for g in gpus})
                row = {"ts": ts.isoformat(), **data, **gpu_s}
                day = ts.strftime("%Y-%m-%d")
                path = LOG_DIR / f"metrics-{day}.jsonl"
                with open(path, "a") as f: f.write(json.dumps(row) + "\n")
                gpu_str = " ".join(f"GPU{g['index']} {g['util_gpu']:.0f}% {g['mem_used_mib']/1024:.1f}GB {g['temp_c']:.0f}°C" for g in gpus)
                print(f"[metrics] {ts.strftime('%H:%M:%S')} gen={data.get('predicted_tokens_seconds',0):.0f} tok/s active={data.get('requests_processing',0):.0f} {gpu_str}")
                # Push immediately on the first successful snapshot so a fresh
                # container shows up in the dataset quickly, then rate-limit.
                if not first_push_done or time.time() - last_push >= PUSH_SECS:
                    hf_push(path)
                    last_push = time.time()
                    first_push_done = True
        except Exception as e:
            print(f"[metrics] loop error (will retry): {e}")
            backends = []
        time.sleep(SNAPSHOT_SECS)
def run_with_restart(name, func):
    """Call func repeatedly until it returns normally, backing off on crashes.

    Any exception is logged under *name* and func is restarted after an
    exponentially growing delay (1s doubling up to a 60s cap). A normal
    return ends the loop.
    """
    delay = 1
    while True:
        try:
            func()
        except Exception as exc:
            print(f"[{name}] CRASHED: {exc} — restarting in {delay}s")
            time.sleep(delay)
            delay = min(delay * 2, 60)
        else:
            return
def main():
    """Start the GPU poller on a daemon thread, then run the metrics loop forever."""
    threading.Thread(target=run_with_restart, args=("gpu", gpu_loop), daemon=True).start()
    run_with_restart("metrics", metrics_loop)


if __name__ == "__main__":
    main()