#!/usr/bin/env python3
"""Hermes Bot — HuggingFace Space Entry Point.

Serves as the main HTTP entry point on port 7860. Routes traffic to:
- hermes-web-ui BFF (Node.js on :6060) for the WebUI interface
- Original dashboard on :7860/ and /deploy
- Gateway health on /api/gateway-status
"""
import json
import os
import re
import subprocess
import sys
import threading
import time
import logging
import psutil
import mimetypes
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from queue import Queue, Empty
from io import BytesIO


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """Threaded HTTP server with aggressive socket reuse for fast restarts."""

    daemon_threads = True
    allow_reuse_address = True
    allow_reuse_port = True  # Allow multiple binds during restart (3.11+)
    # Limit concurrent connections to prevent DoS / RAM exhaustion
    _semaphore = threading.Semaphore(50)

    def server_bind(self):
        """Set SO_REUSEADDR and SO_REUSEPORT before binding."""
        import socket
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            # Use the named constant where the platform exposes it; fall back
            # to the Linux numeric value (15) the original hard-coded.
            self.socket.setsockopt(
                socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT", 15), 1
            )
        except (AttributeError, OSError):
            pass  # platform without SO_REUSEPORT — best effort only
        super().server_bind()

    def process_request(self, request, client_address):
        # Gate each request on the semaphore so at most 50 run concurrently.
        with self._semaphore:
            return super().process_request(request, client_address)


# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
HERMES_HOME = os.path.expanduser("~/.hermes")
DATA_DIR = "/data/hermes"
LOG_DIR = os.path.join(HERMES_HOME, "logs")
LOG_FILE = os.path.join(LOG_DIR, "gateway.log")
CONFIG_FILE = os.path.join(HERMES_HOME, "config.yaml")
ENV_FILE = os.path.join(HERMES_HOME, ".env")
DASHBOARD_HTML = "/app/dashboard.html"
WEBUI_CLIENT_DIR = "/app/webui-client"

# Backend URLs
WEBUI_BFF_URL = "http://127.0.0.1:6060"
GATEWAY_URL = "http://127.0.0.1:8642" # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("entry") # --------------------------------------------------------------------------- # Global state # --------------------------------------------------------------------------- _gateway_start_time = time.time() _log_subscribers: list[Queue] = [] _log_tail_offset = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mask_key(key: str) -> str: if not key or len(key) < 10: return "····" return key[:7] + "····" + key[-4:] def _load_env() -> dict[str, str]: env = {} try: with open(ENV_FILE) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: k, _, v = line.partition("=") env[k.strip()] = v.strip().strip("\"'") except FileNotFoundError: pass return env def _load_config() -> dict: try: import yaml with open(CONFIG_FILE) as f: return yaml.safe_load(f) or {} except ImportError: cfg: dict = {} try: with open(CONFIG_FILE) as f: for line in f: stripped = line.strip() if not stripped or stripped.startswith("#"): continue if ":" in stripped and not line.startswith(" "): k, _, v = stripped.partition(":") k, v = k.strip(), v.strip() if v: cfg[k] = v except FileNotFoundError: pass return cfg except FileNotFoundError: return {} def _get_sessions_count() -> int: sessions_dir = os.path.join(HERMES_HOME, "sessions") if not os.path.isdir(sessions_dir): return 0 count = 0 for _ in Path(sessions_dir).rglob("*.json"): count += 1 return count def _get_session_list() -> list[dict]: sessions_dir = os.path.join(HERMES_HOME, "sessions") sessions = [] if not os.path.isdir(sessions_dir): return 
sessions for f in sorted(Path(sessions_dir).rglob("*.json"), key=os.path.getmtime, reverse=True)[:20]: try: data = json.loads(f.read_text()) name = f.stem msgs = len(data.get("trajectory", data.get("messages", []))) mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%m-%d %H:%M") platform = data.get("platform", data.get("channel", "")) sessions.append({ "id": f.stem, "name": name[:40], "messages": msgs, "last": mtime, "platform": platform, "active": (time.time() - f.stat().st_mtime) < 3600, }) except Exception: sessions.append({"id": f.stem, "name": f.stem[:40], "messages": 0, "last": "--", "platform": "", "active": False}) return sessions def _proxy_request(url: str, method: str = "GET", headers: dict = None, body: bytes = None, timeout: int = 120) -> tuple[int, dict, bytes]: """Proxy a request to a backend URL and return (status, response_headers, body).""" req = urllib.request.Request(url, data=body, method=method) if headers: for k, v in headers.items(): try: req.add_header(k, v) except Exception: pass resp_body = b"" resp_headers = {} resp_status = 502 try: with urllib.request.urlopen(req, timeout=timeout) as resp: resp_status = resp.status resp_body = resp.read() # Collect response headers (skip hop-by-hop headers) skip = {"transfer-encoding", "connection", "keep-alive", "server", "date"} for key, val in resp.getheaders(): if key.lower() not in skip: resp_headers[key] = val except urllib.error.HTTPError as e: resp_status = e.code try: resp_body = e.read() except Exception: resp_body = b"" for key, val in list(e.headers.items()): skip = {"transfer-encoding", "connection", "keep-alive", "server", "date"} if key.lower() not in skip: resp_headers[key] = val except Exception as e: logger.warning("Proxy error: %s -> %s", url, e) resp_body = json.dumps({"error": str(e)}).encode() return resp_status, resp_headers, resp_body # --------------------------------------------------------------------------- # Log tailer # 
--------------------------------------------------------------------------- def _parse_log_line(line: str) -> dict | None: m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?\s*(.*)", line) if not m: m = re.match(r"\[?(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\]?\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?.*?:\s*(.*)", line) if not m: return None return { "time": m.group(1), "level": m.group(2).upper(), "msg": m.group(3).strip(), "hl": "feishu" in m.group(3).lower() or "connected" in m.group(3).lower(), } def _log_tailer(): global _log_tail_offset while True: try: if not os.path.isfile(LOG_FILE): time.sleep(2) continue with open(LOG_FILE, "r", errors="replace") as f: f.seek(_log_tail_offset) new_lines = f.readlines() _log_tail_offset = f.tell() if new_lines: parsed = [] for line in new_lines: p = _parse_log_line(line.strip()) if p: parsed.append(p) if parsed: dead = [] for q in _log_subscribers: try: q.put_nowait(parsed) except Exception: dead.append(q) for q in dead: _log_subscribers.remove(q) time.sleep(0.5) except Exception as e: logger.debug("Log tailer error: %s", e) time.sleep(1) # --------------------------------------------------------------------------- # Persistent storage setup # --------------------------------------------------------------------------- def _ensure_persistent_storage(): for d in ("sessions", "memories", "uploads", "logs", "palace", "skills", "weixin"): os.makedirs(os.path.join(DATA_DIR, d), exist_ok=True) hermes = Path(HERMES_HOME) hermes.mkdir(parents=True, exist_ok=True) for d in ("sessions", "memories", "uploads", "logs", "palace", "skills", "weixin"): target = hermes / d if not target.exists(): try: target.symlink_to(os.path.join(DATA_DIR, d)) logger.info("Symlink: %s -> %s", d, os.path.join(DATA_DIR, d)) except OSError: target.mkdir(exist_ok=True) logger.warning("Could not symlink %s, using local dir", d) # --------------------------------------------------------------------------- # HTTP 
# Handler — Reverse Proxy + Dashboard
# ---------------------------------------------------------------------------


class ProxyHandler(BaseHTTPRequestHandler):
    """Routes traffic between WebUI BFF, Gateway, and Dashboard."""

    def log_message(self, fmt, *args):
        pass  # silence request logs

    def _check_auth(self) -> bool:
        """Check Bearer token auth for sensitive endpoints."""
        import hmac
        auth = self.headers.get("Authorization", "")
        expected = os.environ.get("AUTH_TOKEN", "hermes-bot-2026")
        # Constant-time comparison to avoid timing side channels on the token.
        return hmac.compare_digest(auth, f"Bearer {expected}")

    def _send_json(self, data: dict, status: int = 200):
        """Serialize *data* and send it as a JSON response with CORS."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_html(self, path: str):
        """Send a local HTML file, or 404 if it does not exist."""
        try:
            with open(path, "rb") as f:
                body = f.read()
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _read_body(self) -> bytes:
        """Read the request body per Content-Length (b"" when absent)."""
        length = int(self.headers.get("Content-Length", 0))
        return self.rfile.read(length) if length > 0 else b""

    def _send_file(self, filepath: str, content_type: str = None):
        """Send a local static file with guessed MIME type and caching."""
        try:
            with open(filepath, "rb") as f:
                body = f.read()
            if content_type is None:
                content_type = (mimetypes.guess_type(filepath)[0]
                                or "application/octet-stream")
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Cache-Control", "public, max-age=86400")
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _send_proxy_response(self, status: int, headers: dict, body: bytes):
        """Relay a backend response to the client, always adding CORS headers."""
        self.send_response(status)
        # Always add CORS
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods",
                         "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers",
                         "Authorization, Content-Type, X-Hermes-Session-Id, "
                         "X-Hermes-Profile, X-Hermes-Session-Token")
        content_type = headers.pop(
            "Content-Type", headers.pop("content-type", "application/json"))
        self.send_header("Content-Type", content_type)
        content_length = headers.pop(
            "Content-Length", headers.pop("content-length", None))
        if content_length is None:
            content_length = str(len(body))
        self.send_header("Content-Length", content_length)
        for k, v in headers.items():
            self.send_header(k, v)
        self.end_headers()
        self.wfile.write(body)

    def _forward_to_webui(self, path: str, method: str):
        """Forward request to WebUI BFF on :6060."""
        url = f"{WEBUI_BFF_URL}{path}"
        headers = {}
        # Forward key headers
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id",
                  "X-Hermes-Profile", "X-Hermes-Session-Token"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = (self._read_body()
                if method in ("POST", "PUT", "PATCH", "DELETE") else None)
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    def _forward_to_gateway(self, path: str, method: str):
        """Forward request to Gateway API on :8642."""
        url = f"{GATEWAY_URL}{path}"
        headers = {}
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = (self._read_body()
                if method in ("POST", "PUT", "PATCH", "DELETE") else None)
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    # ── OPTIONS (CORS preflight) ──
    def do_OPTIONS(self):
        """Answer CORS preflight for every route."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods",
                         "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers",
                         "Authorization, Content-Type, X-Hermes-Session-Id, "
                         "X-Hermes-Profile, X-Hermes-Session-Token")
        self.send_header("Access-Control-Max-Age", "86400")
        self.send_header("Content-Length", "0")
        self.end_headers()

    # ── GET routes ──
    def do_GET(self):
        parsed = urlparse(self.path)
        path = parsed.path
        # ── Root → redirect to WebUI Chat ──
        if path in ("/", "/index.html"):
            self.send_response(302)
            self.send_header("Location", "/webui#/hermes/chat")
            self.send_header("Content-Length", "0")
            self.end_headers()
            return
        # ── Deploy overview ──
        if path == "/deploy":
            return self._send_html("/app/deploy.html")
        # ── Diagnostic: gateway log dump (auth required) ──
        if path == "/api/diagnostics/gateway-log":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_gateway_log()
        # ── Diagnostic: gateway restart (auth required) ──
        if path == "/api/diagnostics/restart-gateway":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_restart_gateway()
        # ── SSE log stream (auth required) ──
        if path == "/api/logs/stream":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_sse()
        # ── WebUI SPA: /webui, /webui/* → serve from webui-client static dir ──
        if path.startswith("/webui"):
            # Map /webui → /, /webui/xxx → /xxx for the static dir
            if path in ("/webui", "/webui/"):
                index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
                if os.path.isfile(index_path):
                    return self._send_html(index_path)
            # Try static file
            static_path = os.path.join(
                WEBUI_CLIENT_DIR, path[len("/webui"):].lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # SPA fallback
            index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
            if os.path.isfile(index_path):
                return self._send_html(index_path)
            # Final fallback: proxy to BFF with the /webui prefix stripped
            # (the old path[5:] slice left a stray "i" on the path).
            return self._forward_to_webui(self._rewrite_webui_path(path), "GET")
        # ── WebUI static assets (referenced by SPA as /assets/*, /favicon*) ──
        # These must be served from webui-client dir because the SPA uses
        # absolute paths
        if (path.startswith("/assets/") or path.startswith("/favicon")
                or path == "/logo.png"):
            static_path = os.path.join(WEBUI_CLIENT_DIR, path.lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # If not found locally, try proxying to BFF
            return self._forward_to_webui(self.path, "GET")
        # ── WebUI API routes: /api/*, /v1/* → proxy to WebUI BFF ──
        if path.startswith("/api/") or path.startswith("/v1/"):
            return self._forward_to_webui(self.path, "GET")
        # ── /health → proxy to BFF ──
        if path == "/health":
            return self._forward_to_webui("/health", "GET")
        # ── /upload, /webhook → proxy to BFF ──
        if path in ("/upload", "/webhook"):
            return self._forward_to_webui(self.path, "GET")
        # ── /socket.io/* → proxy to BFF (WebSocket upgrade handled separately) ──
        if path.startswith("/socket.io/"):
            return self._forward_to_webui(self.path, "GET")
        self.send_error(404)

    # ── POST routes ──
    def _rewrite_webui_path(self, path: str) -> str:
        """Rewrite /webui/* to /* for BFF proxy.

        Slices by len("/webui") so "/webui/chat" → "/chat"; the previous
        hard-coded [5:] slice left a stray "i" on every rewritten path and
        made the "/webui" → "/" branch unreachable.
        """
        if path == "/webui":
            return "/"
        if path.startswith("/webui"):
            return path[len("/webui"):] or "/"
        return path

    @staticmethod
    def _with_query(path: str, query: str) -> str:
        """Re-attach a query string; urlparse() strips the '?' separator,
        so plain concatenation would mangle the forwarded URL."""
        return f"{path}?{query}" if query else path

    def do_POST(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        # All /api/*, /v1/*, /upload, /webhook → proxy to WebUI BFF
        if (path.startswith("/api/") or path.startswith("/v1/")
                or path in ("/upload", "/webhook") or path.startswith("/webui")):
            return self._forward_to_webui(
                self._with_query(bff_path, parsed.query), "POST")
        self.send_error(404)

    def do_PUT(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if (path.startswith("/api/") or path.startswith("/v1/")
                or path.startswith("/webui")):
            return self._forward_to_webui(
                self._with_query(bff_path, parsed.query), "PUT")
        self.send_error(404)

    def do_DELETE(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if (path.startswith("/api/") or path.startswith("/v1/")
                or path.startswith("/webui")):
            return self._forward_to_webui(
                self._with_query(bff_path, parsed.query), "DELETE")
        self.send_error(404)

    def do_PATCH(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if (path.startswith("/api/") or path.startswith("/v1/")
                or path.startswith("/webui")):
            return self._forward_to_webui(
                self._with_query(bff_path, parsed.query), "PATCH")
        self.send_error(404)

    # ── Handlers ──
    def _get_status(self) -> dict:
        """Assemble the dashboard status payload (gateway health + config)."""
        env = _load_env()
        cfg = _load_config()
        ram = psutil.virtual_memory()
        is_running = False
        pid = "N/A"
        # Check gateway health via HTTP
        try:
            status, _, _ = _proxy_request(f"{GATEWAY_URL}/health", "GET", timeout=3)
            is_running = (status == 200)
        except Exception:
            pass
        model = cfg.get("model", env.get("LLM_MODEL", "unknown"))
        provider = cfg.get("provider", "openrouter")
        fallback = cfg.get("fallback_model", {})
        fallback_model = (fallback.get("model", "")
                          if isinstance(fallback, dict) else "")
        return {
            "running": is_running,
            "pid": pid,
            "model": model,
            "provider": provider,
            "fallback_model": fallback_model,
            "platform": "飞书 Feishu",
            "platform_mode": "WebSocket",
            "sessions": _get_sessions_count(),
            "messages": 0,
            "ram": f"{ram.percent:.0f}%",
            "uptime_ms": int((time.time() - _gateway_start_time) * 1000),
            "config": {
                "OPENROUTER_API_KEY": _mask_key(env.get("OPENROUTER_API_KEY", "")),
                "FEISHU_APP_ID": env.get("FEISHU_APP_ID", ""),
                "FEISHU_APP_SECRET": _mask_key(env.get("FEISHU_APP_SECRET", "")),
                "terminal": (cfg.get("terminal", {}).get("backend", "local")
                             if isinstance(cfg.get("terminal"), dict) else "local"),
                "timezone": cfg.get("timezone", "北京时间 (UTC+8)"),
                "max_turns": cfg.get("max_turns", "90"),
                "memory": (cfg.get("memory", {}).get("provider", "none")
                           if isinstance(cfg.get("memory"), dict) else "none"),
                "mcp_servers": (list(cfg.get("mcp_servers", {}).keys())
                                if isinstance(cfg.get("mcp_servers"), dict) else []),
                "compress": (cfg.get("compress", {}).get("enabled", False)
                             if isinstance(cfg.get("compress"), dict) else False),
            },
        }

    def _handle_sse(self):
        """Stream parsed log lines to the client as Server-Sent Events."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        q: Queue = Queue(maxsize=100)
        _log_subscribers.append(q)
        try:
            # Seed the stream with recent history first.
            history = self._get_log_history_inner(limit=100)
            if history:
                self.wfile.write(
                    f"data: {json.dumps(history, ensure_ascii=False)}\n\n".encode())
                self.wfile.flush()
            while True:
                try:
                    lines = q.get(timeout=30)
                    payload = f"data: {json.dumps(lines, ensure_ascii=False)}\n\n"
                    self.wfile.write(payload.encode())
                    self.wfile.flush()
                except Empty:
                    # SSE comment line keeps idle connections alive through proxies.
                    self.wfile.write(":heartbeat\n\n".encode())
                    self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass  # client disconnected
        finally:
            if q in _log_subscribers:
                _log_subscribers.remove(q)

    def _handle_gateway_log(self):
        """Return raw gateway.log contents for diagnostics."""
        log_path = LOG_FILE
        if not os.path.isfile(log_path):
            # Also check the data dir directly
            alt = "/data/hermes/logs/gateway.log"
            if os.path.isfile(alt):
                log_path = alt
        if not os.path.isfile(log_path):
            # NOTE(review): responds 200 with an error payload — confirm the
            # dashboard client expects this rather than a 404.
            self._send_json({"error": "gateway.log not found",
                             "checked": [LOG_FILE, "/data/hermes/logs/gateway.log"]})
            return
        try:
            with open(log_path, "r", errors="replace") as f:
                content = f.read()
            body = content.encode("utf-8") if content else b"(empty)"
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(body)
        except Exception as e:
            self._send_json({"error": str(e)})

    def _handle_restart_gateway(self):
        """Trigger gateway restart via watchdog signal file."""
        try:
            signal_file = "/tmp/hermes-gateway-restart"
            with open(signal_file, "w") as f:
                f.write(str(time.time()))
            self._send_json({"ok": True, "message": "Restart signal sent"})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)})

    def _get_log_history(self, query: str = "") -> list:
        """Parse ?limit= from *query* and return recent parsed log lines."""
        params = parse_qs(query)
        limit = int(params.get("limit", ["100"])[0])
        return self._get_log_history_inner(limit=limit)

    def _get_log_history_inner(self, limit: int = 100) -> list:
        """Read the last *limit* lines of gateway.log, parsed; [] on any error."""
        if not os.path.isfile(LOG_FILE):
            return []
        try:
            with open(LOG_FILE, "r", errors="replace") as f:
                lines = f.readlines()[-limit:]
            result = []
            for line in lines:
                p = _parse_log_line(line.strip())
                if p:
                    result.append(p)
            return result
        except Exception:
            return []


# ---------------------------------------------------------------------------
# Python-based Gateway Watchdog (robust, lives inside entry.py)
# ---------------------------------------------------------------------------


def _gateway_watchdog(interval: int = 30):
    """Monitor gateway process liveness and auto-restart if it dies.

    This runs as a daemon thread inside entry.py so it survives SIGTERM to
    the shell-level watchdog in start.sh.
    """
    pid_file = os.path.join(HERMES_HOME, "gateway.pid")
    alt_pid_file = "/tmp/hermes-gateway.pid"
    lock_file = os.path.join(HERMES_HOME, ".gateway_runtime_lock")
    takeover_file = os.path.join(HERMES_HOME, ".gateway_takeover")
    gw_log = "/data/hermes/logs/gateway.log"

    def _find_gateway_pid() -> int | None:
        """Find the gateway PID from pid files or by process name."""
        for pf in (pid_file, alt_pid_file):
            try:
                with open(pf) as f:
                    pid = int(f.read().strip())
                if pid > 0 and psutil.pid_exists(pid):
                    proc = psutil.Process(pid)
                    cmdline = " ".join(proc.cmdline()).lower()
                    if "hermes" in cmdline or "gateway" in cmdline:
                        return pid
            except Exception:
                pass  # stale/unreadable pid file — try the next source
        # Fallback: search by process name
        try:
            for proc in psutil.process_iter(["pid", "cmdline"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        return proc.info["pid"]
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception:
            pass
        return None

    def _kill_all_gateways():
        """Force-kill ALL existing gateway processes and wait for ports to free.

        This prevents the 'another gateway is already using this Feishu
        app_id' error and port conflicts when restarting after a crash.
        """
        killed = []
        # Find ALL hermes gateway processes (not just the one we track)
        try:
            for proc in psutil.process_iter(["pid", "cmdline", "create_time"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        pid = proc.info["pid"]
                        logger.info("[Watchdog] Force-killing residual gateway PID=%d", pid)
                        proc.kill()  # SIGKILL — no graceful shutdown, just die
                        killed.append(pid)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception as e:
            logger.warning("[Watchdog] Error scanning processes: %s", e)
        # Also kill any lingering aiohttp servers on port 8642/8643
        for port in (8642, 8643):
            try:
                for conn in psutil.net_connections(kind='inet'):
                    if conn.laddr.port == port and conn.status == 'LISTEN':
                        try:
                            owner = psutil.Process(conn.pid)
                            if conn.pid not in killed:
                                logger.info("[Watchdog] Force-killing process %d on port %d",
                                            conn.pid, port)
                                owner.kill()
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass
            except Exception:
                pass
        # Wait for processes to actually die and ports to be released
        if killed:
            time.sleep(3)
            for pid in killed:
                try:
                    p = psutil.Process(pid)
                    if p.is_running():
                        logger.warning("[Watchdog] PID %d still running after kill, waiting...", pid)
                        time.sleep(5)
                except psutil.NoSuchProcess:
                    pass
        # Clean ALL stale state files
        for f in (pid_file, alt_pid_file, lock_file, takeover_file):
            try:
                os.remove(f)
            except Exception:
                pass
        # Also clean any .feishu_lock or similar files
        import glob as _glob
        for pattern in ("feishu*.lock", "*.feishu_lock"):
            for lock_f in _glob.glob(os.path.join(HERMES_HOME, pattern)):
                try:
                    os.remove(lock_f)
                    logger.info("[Watchdog] Removed stale lock: %s", lock_f)
                except Exception:
                    pass
        return len(killed)

    def _start_gateway():
        """Start the gateway process with full cleanup of any residual state."""
        # Force-kill ALL residual gateway processes first
        killed = _kill_all_gateways()
        if killed:
            logger.info("[Watchdog] Killed %d residual gateway process(es) before restart",
                        killed)
        log_fh = None
        try:
            # NOTE: intentionally not closed here — the child process inherits
            # this handle as its stdout/stderr for the lifetime of the gateway.
            log_fh = open(gw_log, "a")
            log_fh.write(f"\n--- [Python Watchdog] Starting gateway at "
                         f"{datetime.now().isoformat()} ---\n")
            log_fh.flush()
        except Exception:
            pass
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env["HERMES_ACCEPT_HOOKS"] = "1"
        proc = subprocess.Popen(
            [sys.executable, "-u", "-m", "hermes_cli.main",
             "gateway", "run", "-v", "--replace"],
            stdout=log_fh if log_fh else subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
            env=env,
            start_new_session=True,  # decouple from entry.py signals
        )
        try:
            with open(alt_pid_file, "w") as f:
                f.write(str(proc.pid))
        except Exception:
            pass
        return proc

    logger.info("[Watchdog] Python gateway watchdog started (interval=%ds)", interval)
    # On startup: check if gateway is already running; if not, start it immediately
    initial_pid = _find_gateway_pid()
    if initial_pid is None:
        logger.info("[Watchdog] No gateway running on startup — launching gateway now")
        _start_gateway()
        time.sleep(15)  # Give it time to fully initialize
    else:
        logger.info("[Watchdog] Gateway already running (PID=%d) on startup", initial_pid)
        time.sleep(5)  # Brief settle time
    last_restart = 0
    restart_backoff = 30
    while True:
        try:
            time.sleep(interval)
            gw_pid = _find_gateway_pid()
            if gw_pid is not None:
                # Process is alive — but check if gateway is FUNCTIONAL (not
                # zombie). A zombie gateway has a live process but dead asyncio
                # executor, causing "Executor shutdown has been called" on
                # every send.
                try:
                    _zombie = False
                    if os.path.isfile(gw_log):
                        # NOTE(review): scans the log tail with no timestamp
                        # check, so old "Executor shutdown" lines can re-trigger
                        # restarts until they scroll past the last 50 lines.
                        with open(gw_log, "r", errors="replace") as _lf:
                            for _line in _lf.readlines()[-50:]:
                                if "Executor shutdown" in _line and "RuntimeError" in _line:
                                    _zombie = True
                                    break
                    if _zombie:
                        logger.error("[Watchdog] Gateway is ZOMBIE (executor shutdown) "
                                     "— force-restarting...")
                        killed = _kill_all_gateways()
                        if not killed:
                            try:
                                os.kill(gw_pid, 9)
                            except Exception:
                                pass
                        time.sleep(3)
                        # Fall through to restart below
                    else:
                        restart_backoff = 30
                        continue
                except Exception:
                    # If log check fails, assume gateway is fine
                    restart_backoff = 30
                    continue
            else:
                logger.warning("[Watchdog] Gateway process is DEAD — restarting...")
            # Gateway is dead or zombie — restart with backoff
            now = time.time()
            if now - last_restart < restart_backoff:
                continue
            proc = _start_gateway()
            last_restart = now
            restart_backoff = min(restart_backoff * 2, 300)  # exponential backoff, max 5min
            # Wait and verify
            time.sleep(15)
            try:
                if proc.poll() is None:
                    logger.info("[Watchdog] Gateway restarted successfully (PID=%d)", proc.pid)
                    restart_backoff = 30  # reset on success
                else:
                    logger.error("[Watchdog] Gateway exited immediately with code %d",
                                 proc.returncode)
            except Exception as e:
                logger.error("[Watchdog] Failed to verify restart: %s", e)
        except Exception as e:
            logger.error("[Watchdog] Error: %s", e)
            time.sleep(10)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main():
    """Boot the entry process: storage, proxy server, log tailer, watchdog."""
    logger.info("=== Hermes Bot — HuggingFace Space Entry ===")
    logger.info("WebUI at /webui (hermes-web-ui)")
    logger.info("Dashboard at /")
    # Setup persistent storage
    _ensure_persistent_storage()
    logger.info("Persistent storage ready at %s", DATA_DIR)
    # Start HTTP proxy server FIRST (before background checks)
    # HF Space requires the EXPOSE'd port to be bound quickly,
    # otherwise it shows a 404 page even though the container is RUNNING.
    try:
        server = ThreadingHTTPServer(("0.0.0.0", 7860), ProxyHandler)
        logger.info("Proxy listening on :7860")
    except OSError as e:
        logger.error("FATAL: Cannot bind port 7860: %s", e)
        sys.exit(1)
    # Start log tailer thread
    tailer = threading.Thread(target=_log_tailer, daemon=True)
    tailer.start()
    logger.info("Log tailer started")
    # Start Python-based gateway watchdog (survives shell death)
    watchdog = threading.Thread(target=_gateway_watchdog, args=(30,),
                                daemon=True, name="gateway-watchdog")
    watchdog.start()
    logger.info("Python gateway watchdog started")

    # Check if backend services are reachable (non-blocking, best-effort)
    # This runs in a background thread so it doesn't delay serve_forever()
    def _wait_for_backends():
        for attempt in range(10):
            try:
                urllib.request.urlopen(f"{GATEWAY_URL}/health", timeout=2)
                logger.info("Gateway reachable at %s", GATEWAY_URL)
                break
            except Exception:
                time.sleep(2)
        for attempt in range(10):
            try:
                urllib.request.urlopen(f"{WEBUI_BFF_URL}/health", timeout=2)
                logger.info("WebUI BFF reachable at %s", WEBUI_BFF_URL)
                break
            except Exception:
                time.sleep(2)

    threading.Thread(target=_wait_for_backends, daemon=True).start()
    # Start serving (this blocks forever)
    try:
        logger.info(" / → Dashboard")
        logger.info(" /webui → hermes-web-ui")
        logger.info(" /api/* → proxy to WebUI BFF (:6060)")
        logger.info(" /v1/* → proxy to WebUI BFF (:6060)")
        server.serve_forever()
    except Exception as e:
        logger.error("FATAL: Proxy server error: %s", e)
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()