hermes-bot / entry.py
Z User
feat: dark-themed real-time dashboard with SSE logs, config panel, interactive controls
8f68434
raw
history blame
18.6 kB
#!/usr/bin/env python3
"""Hermes Agent — HuggingFace Space Entry Point.
Serves a real-time monitoring dashboard on port 7860 and runs the
Hermes Gateway (Feishu WebSocket bot) in a background thread.
"""
import json
import os
import re
import subprocess
import sys
import threading
import time
import logging
import psutil
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from queue import Queue, Empty
from io import BytesIO
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
HERMES_HOME = os.path.expanduser("~/.hermes")
DATA_DIR = "/data/hermes"
LOG_DIR = os.path.join(HERMES_HOME, "logs")
LOG_FILE = os.path.join(LOG_DIR, "gateway.log")
CONFIG_FILE = os.path.join(HERMES_HOME, "config.yaml")
ENV_FILE = os.path.join(HERMES_HOME, ".env")
DASHBOARD_HTML = "/app/dashboard.html"
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("entry")
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
_gateway_thread = None
_gateway_process = None
_gateway_start_time = time.time()
_log_subscribers: list[Queue] = []
_log_tail_offset = 0
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mask_key(key: str) -> str:
"""Mask API key for display: sk-or-v1-abc...xyz → sk-or-v1-ab••••wxyz"""
if not key or len(key) < 10:
return "••••••••"
return key[:7] + "••••" + key[-4:]
def _load_env() -> dict[str, str]:
"""Read .env file into dict."""
env = {}
try:
with open(ENV_FILE) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, _, v = line.partition("=")
env[k.strip()] = v.strip().strip("\"'")
except FileNotFoundError:
pass
return env
def _load_config() -> dict:
"""Read config.yaml into dict (simple parser)."""
cfg: dict = {}
try:
with open(CONFIG_FILE) as f:
current_section = None
for line in f:
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Check for section headers
if ":" in stripped and not line.startswith(" "):
k, _, v = stripped.partition(":")
k = k.strip()
v = v.strip()
if v:
if current_section:
cfg[current_section + "." + k] = v
else:
cfg[k] = v
# Check for nested section
if stripped.endswith(":") and not stripped.startswith(" ") and ":" not in stripped[:-1]:
current_section = stripped[:-1].strip()
except FileNotFoundError:
pass
return cfg
def _get_sessions_count() -> int:
"""Count session files."""
sessions_dir = os.path.join(HERMES_HOME, "sessions")
if not os.path.isdir(sessions_dir):
return 0
count = 0
for _ in Path(sessions_dir).rglob("*.json"):
count += 1
return count
def _get_session_list() -> list[dict]:
"""Get list of recent sessions."""
sessions_dir = os.path.join(HERMES_HOME, "sessions")
sessions = []
if not os.path.isdir(sessions_dir):
return sessions
for f in sorted(Path(sessions_dir).rglob("*.json"), key=os.path.getmtime, reverse=True)[:20]:
try:
data = json.loads(f.read_text())
name = f.stem
msgs = len(data.get("trajectory", data.get("messages", [])))
mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%m-%d %H:%M")
platform = data.get("platform", data.get("channel", ""))
sessions.append({
"id": f.stem,
"name": name[:40],
"messages": msgs,
"last": mtime,
"platform": platform,
"active": (time.time() - f.stat().st_mtime) < 3600,
})
except Exception:
sessions.append({"id": f.stem, "name": f.stem[:40], "messages": 0, "last": "--", "platform": "", "active": False})
return sessions
# ---------------------------------------------------------------------------
# Log tailer — reads log file and pushes to SSE subscribers
# ---------------------------------------------------------------------------
def _parse_log_line(line: str) -> dict | None:
"""Parse a log line like: 2026-04-27 22:19:12 [INFO] ..."""
m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?\s*(.*)", line)
if not m:
# Try other format: [timestamp] [LEVEL] name: msg
m = re.match(r"\[?(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\]?\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?.*?:\s*(.*)", line)
if not m:
return None
return {
"time": m.group(1),
"level": m.group(2).upper(),
"msg": m.group(3).strip(),
"hl": "feishu" in m.group(3).lower() or "connected" in m.group(3).lower(),
}
def _log_tailer():
"""Background thread: tails gateway log and pushes to subscribers."""
global _log_tail_offset
while True:
try:
if not os.path.isfile(LOG_FILE):
time.sleep(2)
continue
with open(LOG_FILE, "r", errors="replace") as f:
# Seek to where we left off
f.seek(_log_tail_offset)
new_lines = f.readlines()
_log_tail_offset = f.tell()
if new_lines:
parsed = []
for line in new_lines:
p = _parse_log_line(line.strip())
if p:
parsed.append(p)
if parsed:
# Push to all subscribers
dead = []
for q in _log_subscribers:
try:
q.put_nowait(parsed)
except Exception:
dead.append(q)
for q in dead:
_log_subscribers.remove(q)
time.sleep(0.5)
except Exception as e:
logger.debug("Log tailer error: %s", e)
time.sleep(1)
# ---------------------------------------------------------------------------
# Persistent storage setup
# ---------------------------------------------------------------------------
def _ensure_persistent_storage():
"""Create data dirs and symlinks."""
for d in ("sessions", "memories", "uploads", "logs", "palace", "skills"):
os.makedirs(os.path.join(DATA_DIR, d), exist_ok=True)
hermes = Path(HERMES_HOME)
hermes.mkdir(parents=True, exist_ok=True)
for d in ("sessions", "memories", "uploads", "logs", "palace", "skills"):
target = hermes / d
if not target.exists():
try:
target.symlink_to(os.path.join(DATA_DIR, d))
logger.info("Symlink: %s -> %s", d, os.path.join(DATA_DIR, d))
except OSError:
# Symlink failed (maybe in Docker build), just copy the dir structure
target.mkdir(exist_ok=True)
logger.warning("Could not symlink %s, using local dir", d)
# ---------------------------------------------------------------------------
# HTTP Handler — Dashboard + API
# ---------------------------------------------------------------------------
class DashboardHandler(BaseHTTPRequestHandler):
"""Serves dashboard HTML and REST API endpoints."""
def log_message(self, fmt, *args):
pass # silence request logs
def _send_json(self, data: dict, status=200):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
def _send_html(self, path: str):
try:
with open(path, "rb") as f:
body = f.read()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
except FileNotFoundError:
self.send_error(404)
def _read_body(self) -> bytes:
length = int(self.headers.get("Content-Length", 0))
return self.rfile.read(length) if length > 0 else b""
# ── GET routes ──
def do_GET(self):
parsed = urlparse(self.path)
# Dashboard
if parsed.path in ("/", "/index.html"):
return self._send_html(DASHBOARD_HTML)
# SSE log stream
if parsed.path == "/api/logs/stream":
return self._handle_sse()
# Status
if parsed.path == "/api/status":
return self._send_json(self._get_status())
# Sessions
if parsed.path == "/api/sessions":
return self._send_json(_get_session_list())
# Log history (REST, not SSE)
if parsed.path == "/api/logs":
return self._send_json(self._get_log_history(parsed.query))
self.send_error(404)
# ── POST routes ──
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == "/api/restart":
return self._handle_restart()
if parsed.path == "/api/model":
return self._handle_change_model()
if parsed.path == "/api/reset":
return self._send_json({"ok": False, "error": "Not implemented in Space mode"})
self.send_error(404)
# ── Handlers ──
def _get_status(self) -> dict:
env = _load_env()
cfg = _load_config()
ram = psutil.virtual_memory()
is_running = False
pid = "N/A"
# Check gateway process
if _gateway_process and _gateway_process.poll() is None:
is_running = True
pid = str(_gateway_process.pid)
else:
# Try to find hermes gateway process
for proc in psutil.process_iter(["pid", "cmdline"]):
try:
cmdline = " ".join(proc.info.get("cmdline") or [])
if "hermes" in cmdline and "gateway" in cmdline:
is_running = True
pid = str(proc.info["pid"])
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
model = cfg.get("model", env.get("LLM_MODEL", "unknown"))
provider = cfg.get("provider", "openrouter")
return {
"running": is_running,
"pid": pid,
"model": model,
"provider": provider,
"platform": "飞书 Feishu",
"platform_mode": "WebSocket",
"sessions": _get_sessions_count(),
"messages": 0,
"ram": f"{ram.percent:.0f}%",
"uptime_ms": int((time.time() - _gateway_start_time) * 1000),
"config": {
"OPENROUTER_API_KEY": _mask_key(env.get("OPENROUTER_API_KEY", "")),
"FEISHU_APP_ID": env.get("FEISHU_APP_ID", ""),
"FEISHU_APP_SECRET": _mask_key(env.get("FEISHU_APP_SECRET", "")),
"terminal": cfg.get("terminal", {}).get("backend", "local") if isinstance(cfg.get("terminal"), dict) else "local",
"timezone": cfg.get("timezone", "Asia/Shanghai"),
"max_turns": cfg.get("max_turns", "90"),
"memory": cfg.get("memory", {}).get("provider", "none") if isinstance(cfg.get("memory"), dict) else "none",
"no_mcp": cfg.get("no_mcp", False),
"compress": cfg.get("compress", {}).get("enabled", False) if isinstance(cfg.get("compress"), dict) else False,
},
}
def _handle_sse(self):
"""Server-Sent Events for real-time log streaming."""
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
q: Queue = Queue(maxsize=100)
_log_subscribers.append(q)
try:
# First, send recent log history
history = self._get_log_history_inner(limit=100)
if history:
self.wfile.write(f"data: {json.dumps(history, ensure_ascii=False)}\n\n".encode())
self.wfile.flush()
# Then stream new logs
while True:
try:
lines = q.get(timeout=30)
payload = f"data: {json.dumps(lines, ensure_ascii=False)}\n\n"
self.wfile.write(payload.encode())
self.wfile.flush()
except Empty:
# Send heartbeat
self.wfile.write(":heartbeat\n\n".encode())
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError, OSError):
pass
finally:
if q in _log_subscribers:
_log_subscribers.remove(q)
def _get_log_history(self, query: str = "") -> list:
params = parse_qs(query)
limit = int(params.get("limit", ["100"])[0])
return self._get_log_history_inner(limit=limit)
def _get_log_history_inner(self, limit: int = 100) -> list:
"""Read last N lines from log file."""
if not os.path.isfile(LOG_FILE):
return []
try:
with open(LOG_FILE, "r", errors="replace") as f:
lines = f.readlines()[-limit:]
result = []
for line in lines:
p = _parse_log_line(line.strip())
if p:
result.append(p)
return result
except Exception:
return []
def _handle_restart(self):
"""Restart the gateway process."""
global _gateway_process, _gateway_start_time
try:
if _gateway_process and _gateway_process.poll() is None:
_gateway_process.terminate()
_gateway_process.wait(timeout=10)
_gateway_start_time = time.time()
env = os.environ.copy()
env["HERMES_ACCEPT_HOOKS"] = "1"
env["MEMPALACE_PALACE_PATH"] = os.path.join(DATA_DIR, "palace")
_gateway_process = subprocess.Popen(
[sys.executable, "-m", "hermes_cli.main", "gateway", "run", "-v"],
stdout=open(LOG_FILE, "a"),
stderr=subprocess.STDOUT,
env=env,
cwd="/app/hermes-agent",
)
logger.info("Gateway restarted (PID: %d)", _gateway_process.pid)
self._send_json({"ok": True, "pid": _gateway_process.pid})
except Exception as e:
self._send_json({"ok": False, "error": str(e)}, 500)
def _handle_change_model(self):
"""Change the LLM model in config.yaml."""
try:
body = json.loads(self._read_body())
model = body.get("model", "")
if not model:
return self._send_json({"ok": False, "error": "No model specified"})
# Update config.yaml
config_path = CONFIG_FILE
if not os.path.isfile(config_path):
return self._send_json({"ok": False, "error": "config.yaml not found"})
with open(config_path, "r") as f:
content = f.read()
# Replace model line
new_content = re.sub(
r"^model:.*$",
f"model: {model}",
content,
flags=re.MULTILINE,
)
with open(config_path, "w") as f:
f.write(new_content)
logger.info("Model changed to: %s", model)
self._send_json({"ok": True, "model": model})
except Exception as e:
self._send_json({"ok": False, "error": str(e)}, 500)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
logger.info("=== Hermes Agent — HuggingFace Space Entry ===")
# Setup persistent storage
_ensure_persistent_storage()
logger.info("Persistent storage ready at %s", DATA_DIR)
# Start log tailer thread
tailer = threading.Thread(target=_log_tailer, daemon=True)
tailer.start()
logger.info("Log tailer started")
# Start Hermes Gateway in subprocess (not thread, for isolation)
global _gateway_process, _gateway_start_time
_gateway_start_time = time.time()
env = os.environ.copy()
env["HERMES_ACCEPT_HOOKS"] = "1"
env["MEMPALACE_PALACE_PATH"] = os.path.join(DATA_DIR, "palace")
os.makedirs(LOG_DIR, exist_ok=True)
log_fh = open(LOG_FILE, "a")
_gateway_process = subprocess.Popen(
[sys.executable, "-m", "hermes_cli.main", "gateway", "run", "-v"],
stdout=log_fh,
stderr=subprocess.STDOUT,
env=env,
cwd="/app/hermes-agent",
)
logger.info("Gateway started (PID: %d)", _gateway_process.pid)
# Start dashboard HTTP server
server = HTTPServer(("0.0.0.0", 7860), DashboardHandler)
logger.info("Dashboard listening on :7860")
server.serve_forever()
if __name__ == "__main__":
main()