Spaces:
Runtime error
Runtime error
Ashira Pitchayapakayakul
feat: dataset-mirror β 100x faster ingest by bulk-cloning community SFT mixes
0005a16 | #!/usr/bin/env python3 | |
| """ | |
| Hermes status HTTP server for HF Space. | |
| FastAPI + uvicorn β robust port binding, auto-handles signals. | |
| Endpoints: | |
| GET / β JSON status (ledger size, episodes, daemons, disk) | |
| GET /health β simple {"ok": true} | |
| GET /logs β tail of recent boot/cron logs (debug) | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sqlite3 | |
| import subprocess | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import asyncio | |
| from typing import Any | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse, PlainTextResponse | |
| from pydantic import BaseModel | |
| app = FastAPI(title="hermes", docs_url=None, redoc_url=None) | |
| HOME = Path(os.environ.get("HOME", "/home/hermes")) | |
| LEDGER = HOME / ".surrogate/state/scrape-ledger.db" | |
| EPISODES = HOME / ".surrogate/state/surrogate-memory/episodes.jsonl" | |
| LOG_DIR = HOME / ".surrogate/logs" | |
| def _ledger_count() -> int: | |
| try: | |
| with sqlite3.connect(str(LEDGER), timeout=2) as c: | |
| return c.execute("SELECT COUNT(*) FROM scraped").fetchone()[0] | |
| except Exception: | |
| return 0 | |
| def _episodes_count() -> int: | |
| try: | |
| if EPISODES.exists(): | |
| return sum(1 for _ in EPISODES.open()) | |
| except Exception: | |
| pass | |
| return 0 | |
| def _daemons() -> int: | |
| """Count all surrogate daemons by name pattern.""" | |
| try: | |
| out = subprocess.run( | |
| ["pgrep", "-fc", | |
| "discord-bot|surrogate-dev|scrape-loop|scrape-daemon|" | |
| "agentic-crawler|skill-synthesis|hermes-cron|ollama|" | |
| "domain-scrape|qwen-coder|auto-orchestrate"], | |
| capture_output=True, text=True, timeout=2, | |
| ) | |
| return int(out.stdout.strip() or 0) | |
| except Exception: | |
| return 0 | |
| def _episodes_count_v2() -> int: | |
| """Count training pairs (current source of truth) instead of legacy episodes.""" | |
| pairs = HOME / ".surrogate/training-pairs.jsonl" | |
| try: | |
| if pairs.exists(): | |
| return sum(1 for _ in pairs.open()) | |
| except Exception: | |
| pass | |
| # Fallback to old episodes path | |
| return _episodes_count() | |
| def _training_pairs_count() -> int: | |
| pairs = HOME / ".surrogate/training-pairs.jsonl" | |
| try: | |
| if pairs.exists(): | |
| return sum(1 for _ in pairs.open()) | |
| except Exception: | |
| pass | |
| return 0 | |
| def _skill_count() -> int: | |
| skills = HOME / ".surrogate/skills" | |
| if not skills.exists(): | |
| return 0 | |
| return len(list(skills.glob("**/SKILL.md"))) | |
| def _agentic_visited() -> int: | |
| db = HOME / ".surrogate/state/agentic-frontier.db" | |
| try: | |
| with sqlite3.connect(str(db), timeout=2) as c: | |
| return c.execute("SELECT COUNT(*) FROM visited").fetchone()[0] | |
| except Exception: | |
| return 0 | |
| def _ollama_models() -> list[str]: | |
| """Quick (non-blocking) check of loaded Ollama models. Caches for 30s.""" | |
| cache = HOME / ".surrogate/state/.ollama-models-cache.json" | |
| try: | |
| import json as _json, time | |
| if cache.exists(): | |
| cached = _json.loads(cache.read_text()) | |
| if time.time() - cached.get("ts", 0) < 30: | |
| return cached.get("models", []) | |
| except Exception: | |
| pass | |
| try: | |
| import urllib.request, json as _json, time | |
| with urllib.request.urlopen("http://127.0.0.1:11434/api/tags", timeout=1.5) as r: | |
| models = [m["name"] for m in _json.load(r).get("models", [])] | |
| cache.parent.mkdir(parents=True, exist_ok=True) | |
| cache.write_text(_json.dumps({"ts": time.time(), "models": models})) | |
| return models | |
| except Exception: | |
| return [] | |
| def _dedup_count() -> int: | |
| """Total deduped hashes β single source of truth.""" | |
| db = HOME / ".surrogate/state/dedup.db" | |
| try: | |
| with sqlite3.connect(str(db), timeout=2) as c: | |
| return c.execute("SELECT COUNT(*) FROM seen_hashes").fetchone()[0] | |
| except Exception: | |
| return 0 | |
| def root() -> JSONResponse: | |
| return JSONResponse({ | |
| "service": "surrogate", | |
| "model": "axentx/surrogate-1", | |
| "status": "ok", | |
| "ts": datetime.now(timezone.utc).isoformat(), | |
| "ledger_repos": _ledger_count(), | |
| "training_pairs": _training_pairs_count(), | |
| "agentic_urls_visited": _agentic_visited(), | |
| "skills_synthesized": _skill_count(), | |
| "episodes": _episodes_count_v2(), | |
| "daemons_running": _daemons(), | |
| "models_loaded": _ollama_models(), | |
| "dedup_hashes": _dedup_count(), | |
| }) | |
| def health() -> dict: | |
| return {"ok": True} | |
| def log_tail(name: str, lines: int = 100) -> PlainTextResponse: | |
| """Tail a specific log file. Allowlist for security.""" | |
| allowed = { | |
| "boot", "cron", "cron-master", "scrape-continuous", "scrape-daemon", | |
| "scrape-keyword-tuner", "agentic-crawler", "skill-synthesis", | |
| "auto-orchestrate-loop", "training-push", "ollama", "discord-bot", | |
| "hermes-discord-bot", "surrogate-research-loop", "surrogate-research-apply", | |
| "surrogate-dev-loop", "domain-scrape-loop", "github-domain-scrape", | |
| "qwen-coder", "git-clone", "git-pull", "redis", "parquet-direct-ingest", "bulk-ingest-parallel", "rag-vector-builder", "auto-orchestrate-continuous", "dataset-enrich", "hf-dataset-discoverer", "dedup-bootstrap", "github-agentic-crawler", "ollama-pull-granite", "synthetic-data", "self-ingest", "scrape-sre-postmortems", "refresh-cve-feed", "self-heal-watchdog", "gh-actions-ticker", "llm-burst-generator", "expand-role-keywords", "kaggle-trainer", "lightning-trainer", "dataset-mirror", | |
| "ollama-pull-coder", "ollama-pull-devstral", "ollama-pull-fallback", | |
| "ollama-pull-yicoder", "ollama-pull-embed", "ollama-pull-light", | |
| } | |
| if name not in allowed: | |
| raise HTTPException(404, f"Unknown log: {name}. Allowed: {sorted(allowed)}") | |
| log_file = LOG_DIR / f"{name}.log" | |
| if not log_file.exists(): | |
| return PlainTextResponse(f"# {name}.log does not exist yet", status_code=200) | |
| try: | |
| out = subprocess.run( | |
| ["tail", "-n", str(min(lines, 500)), str(log_file)], | |
| capture_output=True, text=True, timeout=5, | |
| ) | |
| return PlainTextResponse(out.stdout) | |
| except Exception as e: | |
| raise HTTPException(500, str(e)) | |
| def logs_list() -> dict: | |
| """List all available log files.""" | |
| if not LOG_DIR.exists(): | |
| return {"logs": []} | |
| return {"logs": sorted(p.stem for p in LOG_DIR.glob("*.log"))} | |
| def dynamic_datasets(): | |
| """Expose the discoverer's running list of auto-found datasets so | |
| external runners (GitHub Actions, Oracle Free Tier, etc.) can sync it | |
| and ingest the same expanding catalog without each having to re-run | |
| the discoverer themselves.""" | |
| p = HOME / ".surrogate/state/dynamic-datasets.json" | |
| if not p.exists(): | |
| return JSONResponse({"datasets": [], "note": "dynamic-datasets.json not yet built"}, status_code=200) | |
| try: | |
| return PlainTextResponse(p.read_text(), media_type="application/json") | |
| except Exception as e: | |
| raise HTTPException(500, f"read failed: {e}") | |
| # ββ Cursor / stamp-and-move state (the "don't re-pull row 0 every time" fix) ββ | |
| # Stored as ~/.surrogate/state/cursors.db (SQLite). Each row = (slug, offset, ts). | |
| # Runners GET /cursor/{slug} before streaming, then POST /cursor/{slug}/advance | |
| # with how many rows they processed. Next runner picks up where the last left off. | |
| import sqlite3 as _sql_for_cursor | |
| _CURSOR_DB = HOME / ".surrogate/state/cursors.db" | |
| def _cursor_conn(): | |
| _CURSOR_DB.parent.mkdir(parents=True, exist_ok=True) | |
| c = _sql_for_cursor.connect(str(_CURSOR_DB), check_same_thread=False, timeout=10) | |
| c.execute("PRAGMA journal_mode=WAL") | |
| c.execute(""" | |
| CREATE TABLE IF NOT EXISTS cursors ( | |
| slug TEXT PRIMARY KEY, | |
| offset INTEGER NOT NULL DEFAULT 0, | |
| ts INTEGER NOT NULL | |
| ) | |
| """) | |
| return c | |
| def get_cursor(slug: str): | |
| """Return the next-row-to-process offset for this dataset slug. | |
| Default 0 if never seen. Runners SHOULD itertools.islice(stream, offset, offset+cap).""" | |
| try: | |
| c = _cursor_conn() | |
| row = c.execute("SELECT offset, ts FROM cursors WHERE slug = ?", (slug,)).fetchone() | |
| return {"slug": slug, "offset": row[0] if row else 0, "ts": row[1] if row else 0} | |
| except Exception as e: | |
| raise HTTPException(500, f"cursor read: {e}") | |
| class CursorAdvance(BaseModel): | |
| n: int | |
| def advance_cursor(slug: str, body: CursorAdvance): | |
| """Advance the cursor by N rows. Atomic via SQLite UPSERT. | |
| Idempotent β same call with same n yields same final offset only if | |
| sequential; concurrent calls race-add (fine, dedup catches the rest).""" | |
| try: | |
| c = _cursor_conn() | |
| ts = int(time.time()) | |
| c.execute(""" | |
| INSERT INTO cursors (slug, offset, ts) VALUES (?, ?, ?) | |
| ON CONFLICT(slug) DO UPDATE SET | |
| offset = offset + excluded.offset, | |
| ts = excluded.ts | |
| """, (slug, body.n, ts)) | |
| c.commit() | |
| new_offset = c.execute("SELECT offset FROM cursors WHERE slug = ?", (slug,)).fetchone()[0] | |
| return {"slug": slug, "advanced_by": body.n, "new_offset": new_offset, "ts": ts} | |
| except Exception as e: | |
| raise HTTPException(500, f"cursor advance: {e}") | |
| def list_cursors(limit: int = 100): | |
| """List all cursors β useful for ops dashboard.""" | |
| try: | |
| c = _cursor_conn() | |
| rows = c.execute( | |
| "SELECT slug, offset, ts FROM cursors ORDER BY ts DESC LIMIT ?", | |
| (limit,), | |
| ).fetchall() | |
| return {"cursors": [{"slug": s, "offset": o, "ts": t} for s, o, t in rows]} | |
| except Exception as e: | |
| raise HTTPException(500, f"cursor list: {e}") | |
| class ChatRequest(BaseModel): | |
| prompt: str | |
| cwd: str | None = None | |
| max_steps: int = 12 | |
| timeout_sec: int = 180 | |
| def selftest() -> dict: | |
| """Verify HF Space environment β catches Mac-mindset bugs early. | |
| Tests: critical imports, hardcoded path leaks, key file existence.""" | |
| results = {"ok": True, "checks": {}} | |
| # 1. Required imports | |
| for mod in ["datasets", "huggingface_hub", "pyarrow", "numpy", "sqlite3"]: | |
| try: | |
| __import__(mod) | |
| results["checks"][f"import_{mod}"] = True | |
| except ImportError as e: | |
| results["checks"][f"import_{mod}"] = False | |
| results["ok"] = False | |
| # 2. Critical paths exist (HF Space side) | |
| for path_str in ["~/.surrogate/bin", "~/.surrogate/state", "~/.surrogate/logs"]: | |
| p = Path(os.path.expanduser(path_str)) | |
| results["checks"][f"path_{path_str}"] = p.exists() | |
| if not p.exists(): | |
| results["ok"] = False | |
| # 3. No Mac path leaks in active scripts | |
| bad_paths = [] | |
| for f in (HOME / ".surrogate/bin").rglob("*.sh"): | |
| try: | |
| text_content = f.read_text(errors="ignore") | |
| if "/Users/Ashira" in text_content: | |
| bad_paths.append(f.name) | |
| except Exception: | |
| pass | |
| results["checks"]["no_mac_paths"] = len(bad_paths) == 0 | |
| if bad_paths: | |
| results["ok"] = False | |
| results["mac_path_leaks"] = bad_paths[:10] | |
| # 4. HF token present | |
| results["checks"]["hf_token_set"] = bool(os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")) | |
| return results | |
| async def chat(req: ChatRequest) -> JSONResponse: | |
| """Run a prompt through the surrogate CLI inside the container, return result. | |
| Used by remote Surrogate CLI clients (Mac/laptop) to delegate to Hermes brain on HF. | |
| """ | |
| if not req.prompt.strip(): | |
| raise HTTPException(status_code=400, detail="prompt is empty") | |
| surrogate_bin = HOME / ".surrogate/bin/surrogate" | |
| if not surrogate_bin.exists(): | |
| raise HTTPException(status_code=503, detail="surrogate CLI not installed in container") | |
| proc = await asyncio.create_subprocess_exec( | |
| str(surrogate_bin), "-p", req.prompt, "--max-steps", str(req.max_steps), | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| cwd=req.cwd or str(HOME), | |
| env={**os.environ, "TERM": "dumb"}, | |
| ) | |
| try: | |
| stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=req.timeout_sec) | |
| except asyncio.TimeoutError: | |
| proc.kill() | |
| await proc.wait() | |
| raise HTTPException(status_code=504, detail=f"timeout after {req.timeout_sec}s") | |
| out = stdout.decode("utf-8", errors="replace") | |
| err = stderr.decode("utf-8", errors="replace") | |
| # Strip ANSI for clean JSON output | |
| import re as _re | |
| out = _re.sub(r"\x1b\[[0-9;?]*[a-zA-Z]", "", out) | |
| out = _re.sub(r"\x1b\[\?[0-9]+[hl]", "", out) | |
| out = "\n".join(l for l in out.splitlines() if not l.strip().startswith(("βΊ", "β"))).strip() | |
| return JSONResponse({ | |
| "ok": proc.returncode == 0, | |
| "rc": proc.returncode or 0, | |
| "response": out or "(empty)", | |
| "stderr_tail": err[-1000:] if err else "", | |
| }) | |
| def logs() -> PlainTextResponse: | |
| out_lines: list[str] = [] | |
| for log_name in ("boot.log", "cron.log", "discord-bot.log", "ollama.log"): | |
| f = LOG_DIR / log_name | |
| if not f.exists(): | |
| continue | |
| try: | |
| tail = f.read_text(errors="replace").splitlines()[-10:] | |
| out_lines.append(f"βββ {log_name} βββ") | |
| out_lines.extend(tail) | |
| out_lines.append("") | |
| except Exception: | |
| pass | |
| return PlainTextResponse("\n".join(out_lines) or "(no logs)") | |
| if __name__ == "__main__": | |
| import os, sys, uvicorn | |
| port = int(os.environ.get("PORT", "7860")) | |
| print(f"[hermes] starting uvicorn on 0.0.0.0:{port}", flush=True) | |
| print(f"[hermes] python={sys.version.split()[0]} home={HOME}", flush=True) | |
| uvicorn.run(app, host="0.0.0.0", port=port, log_level="info", access_log=True) | |