#!/usr/bin/env python3 """Hermes Agent — HuggingFace Space Entry Point. Serves a real-time monitoring dashboard on port 7860 and runs the Hermes Gateway (Feishu WebSocket bot) in a background thread. """ import json import os import re import subprocess import sys import threading import time import logging import psutil from datetime import datetime, timezone from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path from urllib.parse import urlparse, parse_qs from queue import Queue, Empty from io import BytesIO # --------------------------------------------------------------------------- # Paths # --------------------------------------------------------------------------- HERMES_HOME = os.path.expanduser("~/.hermes") DATA_DIR = "/data/hermes" LOG_DIR = os.path.join(HERMES_HOME, "logs") LOG_FILE = os.path.join(LOG_DIR, "gateway.log") CONFIG_FILE = os.path.join(HERMES_HOME, "config.yaml") ENV_FILE = os.path.join(HERMES_HOME, ".env") DASHBOARD_HTML = "/app/dashboard.html" # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("entry") # --------------------------------------------------------------------------- # Global state # --------------------------------------------------------------------------- _gateway_thread = None _gateway_process = None _gateway_start_time = time.time() _log_subscribers: list[Queue] = [] _log_tail_offset = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mask_key(key: str) -> str: """Mask API key for display: sk-or-v1-abc...xyz → sk-or-v1-ab••••wxyz""" if not key or len(key) < 10: return "••••••••" return key[:7] + "••••" + key[-4:] def _load_env() -> dict[str, str]: """Read .env file into dict.""" env = {} try: with open(ENV_FILE) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: k, _, v = line.partition("=") env[k.strip()] = v.strip().strip("\"'") except FileNotFoundError: pass return env def _load_config() -> dict: """Read config.yaml into dict (simple parser).""" cfg: dict = {} try: with open(CONFIG_FILE) as f: current_section = None for line in f: stripped = line.strip() if not stripped or stripped.startswith("#"): continue # Check for section headers if ":" in stripped and not line.startswith(" "): k, _, v = stripped.partition(":") k = k.strip() v = v.strip() if v: if current_section: cfg[current_section + "." + k] = v else: cfg[k] = v # Check for nested section if stripped.endswith(":") and not stripped.startswith(" ") and ":" not in stripped[:-1]: current_section = stripped[:-1].strip() except FileNotFoundError: pass return cfg def _get_sessions_count() -> int: """Count session files.""" sessions_dir = os.path.join(HERMES_HOME, "sessions") if not os.path.isdir(sessions_dir): return 0 count = 0 for _ in Path(sessions_dir).rglob("*.json"): count += 1 return count def _get_session_list() -> list[dict]: """Get list of recent sessions.""" sessions_dir = os.path.join(HERMES_HOME, "sessions") sessions = [] if not os.path.isdir(sessions_dir): return sessions for f in sorted(Path(sessions_dir).rglob("*.json"), key=os.path.getmtime, reverse=True)[:20]: try: data = json.loads(f.read_text()) name = f.stem msgs = len(data.get("trajectory", data.get("messages", []))) mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%m-%d %H:%M") platform = data.get("platform", data.get("channel", "")) sessions.append({ "id": f.stem, "name": name[:40], "messages": msgs, "last": mtime, "platform": platform, "active": (time.time() - f.stat().st_mtime) < 3600, }) except Exception: sessions.append({"id": f.stem, "name": f.stem[:40], "messages": 0, "last": "--", "platform": "", "active": False}) return sessions # --------------------------------------------------------------------------- # Log tailer — reads log file and pushes to SSE subscribers # --------------------------------------------------------------------------- def _parse_log_line(line: str) -> dict | None: """Parse a log line like: 2026-04-27 22:19:12 [INFO] ...""" m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?\s*(.*)", line) if not m: # Try other format: [timestamp] [LEVEL] name: msg m = re.match(r"\[?(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\]?\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?.*?:\s*(.*)", line) if not m: return None return { "time": m.group(1), "level": m.group(2).upper(), "msg": m.group(3).strip(), "hl": "feishu" in m.group(3).lower() or "connected" in m.group(3).lower(), } def _log_tailer(): """Background thread: tails gateway log and pushes to subscribers.""" global _log_tail_offset while True: try: if not os.path.isfile(LOG_FILE): time.sleep(2) continue with open(LOG_FILE, "r", errors="replace") as f: # Seek to where we left off f.seek(_log_tail_offset) new_lines = f.readlines() _log_tail_offset = f.tell() if new_lines: parsed = [] for line in new_lines: p = _parse_log_line(line.strip()) if p: parsed.append(p) if parsed: # Push to all subscribers dead = [] for q in _log_subscribers: try: q.put_nowait(parsed) except Exception: dead.append(q) for q in dead: _log_subscribers.remove(q) time.sleep(0.5) except Exception as e: logger.debug("Log tailer error: %s", e) time.sleep(1) # --------------------------------------------------------------------------- # Persistent storage setup # --------------------------------------------------------------------------- def _ensure_persistent_storage(): """Create data dirs and symlinks.""" for d in ("sessions", "memories", "uploads", "logs", "palace", "skills"): os.makedirs(os.path.join(DATA_DIR, d), exist_ok=True) hermes = Path(HERMES_HOME) hermes.mkdir(parents=True, exist_ok=True) for d in ("sessions", "memories", "uploads", "logs", "palace", "skills"): target = hermes / d if not target.exists(): try: target.symlink_to(os.path.join(DATA_DIR, d)) logger.info("Symlink: %s -> %s", d, os.path.join(DATA_DIR, d)) except OSError: # Symlink failed (maybe in Docker build), just copy the dir structure target.mkdir(exist_ok=True) logger.warning("Could not symlink %s, using local dir", d) # --------------------------------------------------------------------------- # HTTP Handler — Dashboard + API # --------------------------------------------------------------------------- class DashboardHandler(BaseHTTPRequestHandler): """Serves dashboard HTML and REST API endpoints.""" def log_message(self, fmt, *args): pass # silence request logs def _send_json(self, data: dict, status=200): body = json.dumps(data, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(body) def _send_html(self, path: str): try: with open(path, "rb") as f: body = f.read() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) except FileNotFoundError: self.send_error(404) def _read_body(self) -> bytes: length = int(self.headers.get("Content-Length", 0)) return self.rfile.read(length) if length > 0 else b"" # ── GET routes ── def do_GET(self): parsed = urlparse(self.path) # Dashboard if parsed.path in ("/", "/index.html"): return self._send_html(DASHBOARD_HTML) # SSE log stream if parsed.path == "/api/logs/stream": return self._handle_sse() # Status if parsed.path == "/api/status": return self._send_json(self._get_status()) # Sessions if parsed.path == "/api/sessions": return self._send_json(_get_session_list()) # Log history (REST, not SSE) if parsed.path == "/api/logs": return self._send_json(self._get_log_history(parsed.query)) self.send_error(404) # ── POST routes ── def do_POST(self): parsed = urlparse(self.path) if parsed.path == "/api/restart": return self._handle_restart() if parsed.path == "/api/model": return self._handle_change_model() if parsed.path == "/api/reset": return self._send_json({"ok": False, "error": "Not implemented in Space mode"}) self.send_error(404) # ── Handlers ── def _get_status(self) -> dict: env = _load_env() cfg = _load_config() ram = psutil.virtual_memory() is_running = False pid = "N/A" # Check gateway process if _gateway_process and _gateway_process.poll() is None: is_running = True pid = str(_gateway_process.pid) else: # Try to find hermes gateway process for proc in psutil.process_iter(["pid", "cmdline"]): try: cmdline = " ".join(proc.info.get("cmdline") or []) if "hermes" in cmdline and "gateway" in cmdline: is_running = True pid = str(proc.info["pid"]) break except (psutil.NoSuchProcess, psutil.AccessDenied): pass model = cfg.get("model", env.get("LLM_MODEL", "unknown")) provider = cfg.get("provider", "openrouter") return { "running": is_running, "pid": pid, "model": model, "provider": provider, "platform": "飞书 Feishu", "platform_mode": "WebSocket", "sessions": _get_sessions_count(), "messages": 0, "ram": f"{ram.percent:.0f}%", "uptime_ms": int((time.time() - _gateway_start_time) * 1000), "config": { "OPENROUTER_API_KEY": _mask_key(env.get("OPENROUTER_API_KEY", "")), "FEISHU_APP_ID": env.get("FEISHU_APP_ID", ""), "FEISHU_APP_SECRET": _mask_key(env.get("FEISHU_APP_SECRET", "")), "terminal": cfg.get("terminal", {}).get("backend", "local") if isinstance(cfg.get("terminal"), dict) else "local", "timezone": cfg.get("timezone", "Asia/Shanghai"), "max_turns": cfg.get("max_turns", "90"), "memory": cfg.get("memory", {}).get("provider", "none") if isinstance(cfg.get("memory"), dict) else "none", "no_mcp": cfg.get("no_mcp", False), "compress": cfg.get("compress", {}).get("enabled", False) if isinstance(cfg.get("compress"), dict) else False, }, } def _handle_sse(self): """Server-Sent Events for real-time log streaming.""" self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() q: Queue = Queue(maxsize=100) _log_subscribers.append(q) try: # First, send recent log history history = self._get_log_history_inner(limit=100) if history: self.wfile.write(f"data: {json.dumps(history, ensure_ascii=False)}\n\n".encode()) self.wfile.flush() # Then stream new logs while True: try: lines = q.get(timeout=30) payload = f"data: {json.dumps(lines, ensure_ascii=False)}\n\n" self.wfile.write(payload.encode()) self.wfile.flush() except Empty: # Send heartbeat self.wfile.write(":heartbeat\n\n".encode()) self.wfile.flush() except (BrokenPipeError, ConnectionResetError, OSError): pass finally: if q in _log_subscribers: _log_subscribers.remove(q) def _get_log_history(self, query: str = "") -> list: params = parse_qs(query) limit = int(params.get("limit", ["100"])[0]) return self._get_log_history_inner(limit=limit) def _get_log_history_inner(self, limit: int = 100) -> list: """Read last N lines from log file.""" if not os.path.isfile(LOG_FILE): return [] try: with open(LOG_FILE, "r", errors="replace") as f: lines = f.readlines()[-limit:] result = [] for line in lines: p = _parse_log_line(line.strip()) if p: result.append(p) return result except Exception: return [] def _handle_restart(self): """Restart the gateway process.""" global _gateway_process, _gateway_start_time try: if _gateway_process and _gateway_process.poll() is None: _gateway_process.terminate() _gateway_process.wait(timeout=10) _gateway_start_time = time.time() env = os.environ.copy() env["HERMES_ACCEPT_HOOKS"] = "1" env["MEMPALACE_PALACE_PATH"] = os.path.join(DATA_DIR, "palace") _gateway_process = subprocess.Popen( [sys.executable, "-m", "hermes_cli.main", "gateway", "run", "-v"], stdout=open(LOG_FILE, "a"), stderr=subprocess.STDOUT, env=env, cwd="/app/hermes-agent", ) logger.info("Gateway restarted (PID: %d)", _gateway_process.pid) self._send_json({"ok": True, "pid": _gateway_process.pid}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) def _handle_change_model(self): """Change the LLM model in config.yaml.""" try: body = json.loads(self._read_body()) model = body.get("model", "") if not model: return self._send_json({"ok": False, "error": "No model specified"}) # Update config.yaml config_path = CONFIG_FILE if not os.path.isfile(config_path): return self._send_json({"ok": False, "error": "config.yaml not found"}) with open(config_path, "r") as f: content = f.read() # Replace model line new_content = re.sub( r"^model:.*$", f"model: {model}", content, flags=re.MULTILINE, ) with open(config_path, "w") as f: f.write(new_content) logger.info("Model changed to: %s", model) self._send_json({"ok": True, "model": model}) except Exception as e: self._send_json({"ok": False, "error": str(e)}, 500) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): logger.info("=== Hermes Agent — HuggingFace Space Entry ===") # Setup persistent storage _ensure_persistent_storage() logger.info("Persistent storage ready at %s", DATA_DIR) # Start log tailer thread tailer = threading.Thread(target=_log_tailer, daemon=True) tailer.start() logger.info("Log tailer started") # Start Hermes Gateway in subprocess (not thread, for isolation) global _gateway_process, _gateway_start_time _gateway_start_time = time.time() env = os.environ.copy() env["HERMES_ACCEPT_HOOKS"] = "1" env["MEMPALACE_PALACE_PATH"] = os.path.join(DATA_DIR, "palace") os.makedirs(LOG_DIR, exist_ok=True) log_fh = open(LOG_FILE, "a") _gateway_process = subprocess.Popen( [sys.executable, "-m", "hermes_cli.main", "gateway", "run", "-v"], stdout=log_fh, stderr=subprocess.STDOUT, env=env, cwd="/app/hermes-agent", ) logger.info("Gateway started (PID: %d)", _gateway_process.pid) # Start dashboard HTTP server server = HTTPServer(("0.0.0.0", 7860), DashboardHandler) logger.info("Dashboard listening on :7860") server.serve_forever() if __name__ == "__main__": main()