Spaces:
Running
Running
#!/usr/bin/env python3
"""Hermes Bot — HuggingFace Space Entry Point.

Serves as the main HTTP entry point on port 7860.
Routes traffic to:
- hermes-web-ui BFF (Node.js on :6060) for the WebUI interface and /api, /v1 calls
- WebUI SPA static files under /webui (root "/" redirects there); deploy overview at /deploy
- Local diagnostics under /api/diagnostics/* and the SSE log stream at /api/logs/stream
"""
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import logging | |
| import psutil | |
| import mimetypes | |
| import urllib.request | |
| import urllib.error | |
| import urllib.parse | |
| from datetime import datetime, timezone | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| from socketserver import ThreadingMixIn | |
| from pathlib import Path | |
| from urllib.parse import urlparse, parse_qs | |
| from queue import Queue, Empty | |
| from io import BytesIO | |
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles each request in a daemon thread.

    Binding is made restart-friendly (SO_REUSEADDR / SO_REUSEPORT) and the
    number of concurrently *running* request handlers is capped to prevent
    DoS / RAM exhaustion.
    """

    daemon_threads = True
    allow_reuse_address = True
    allow_reuse_port = True  # Allow multiple binds during restart

    # Cap on simultaneously running request handlers.
    _semaphore = threading.Semaphore(50)

    def server_bind(self):
        """Set SO_REUSEADDR and SO_REUSEPORT before binding."""
        import socket
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            # BUGFIX: use the portable constant when the platform defines it
            # instead of always hardcoding the Linux-specific option number 15.
            self.socket.setsockopt(
                socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT", 15), 1
            )
        except (AttributeError, OSError):
            pass  # SO_REUSEPORT unsupported on this platform — best effort
        super().server_bind()

    def process_request_thread(self, request, client_address):
        # BUGFIX: hold the semaphore for the WHOLE lifetime of the handler
        # thread. The previous code acquired it in process_request(), which
        # under ThreadingMixIn only spawns the thread and returns immediately,
        # so the limit never actually applied to in-flight requests.
        with self._semaphore:
            super().process_request_thread(request, client_address)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
HERMES_HOME = os.path.expanduser("~/.hermes")           # per-user runtime state
DATA_DIR = "/data/hermes"                               # HF Space persistent volume
LOG_DIR = os.path.join(HERMES_HOME, "logs")
LOG_FILE = os.path.join(LOG_DIR, "gateway.log")         # tailed for the SSE stream
CONFIG_FILE = os.path.join(HERMES_HOME, "config.yaml")  # gateway YAML config
ENV_FILE = os.path.join(HERMES_HOME, ".env")            # dotenv-style secrets
DASHBOARD_HTML = "/app/dashboard.html"
WEBUI_CLIENT_DIR = "/app/webui-client"                  # built SPA static files
# Backend URLs
WEBUI_BFF_URL = "http://127.0.0.1:6060"   # Node.js WebUI BFF
GATEWAY_URL = "http://127.0.0.1:8642"     # Hermes gateway API
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("entry")
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
_gateway_start_time = time.time()   # process start — used for uptime_ms in status
_log_subscribers: list[Queue] = []  # one Queue per connected SSE client
_log_tail_offset = 0                # current read position within LOG_FILE
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _mask_key(key: str) -> str: | |
| if not key or len(key) < 10: | |
| return "····" | |
| return key[:7] + "····" + key[-4:] | |
| def _load_env() -> dict[str, str]: | |
| env = {} | |
| try: | |
| with open(ENV_FILE) as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line or line.startswith("#"): | |
| continue | |
| if "=" in line: | |
| k, _, v = line.partition("=") | |
| env[k.strip()] = v.strip().strip("\"'") | |
| except FileNotFoundError: | |
| pass | |
| return env | |
| def _load_config() -> dict: | |
| try: | |
| import yaml | |
| with open(CONFIG_FILE) as f: | |
| return yaml.safe_load(f) or {} | |
| except ImportError: | |
| cfg: dict = {} | |
| try: | |
| with open(CONFIG_FILE) as f: | |
| for line in f: | |
| stripped = line.strip() | |
| if not stripped or stripped.startswith("#"): | |
| continue | |
| if ":" in stripped and not line.startswith(" "): | |
| k, _, v = stripped.partition(":") | |
| k, v = k.strip(), v.strip() | |
| if v: | |
| cfg[k] = v | |
| except FileNotFoundError: | |
| pass | |
| return cfg | |
| except FileNotFoundError: | |
| return {} | |
| def _get_sessions_count() -> int: | |
| sessions_dir = os.path.join(HERMES_HOME, "sessions") | |
| if not os.path.isdir(sessions_dir): | |
| return 0 | |
| count = 0 | |
| for _ in Path(sessions_dir).rglob("*.json"): | |
| count += 1 | |
| return count | |
| def _get_session_list() -> list[dict]: | |
| sessions_dir = os.path.join(HERMES_HOME, "sessions") | |
| sessions = [] | |
| if not os.path.isdir(sessions_dir): | |
| return sessions | |
| for f in sorted(Path(sessions_dir).rglob("*.json"), key=os.path.getmtime, reverse=True)[:20]: | |
| try: | |
| data = json.loads(f.read_text()) | |
| name = f.stem | |
| msgs = len(data.get("trajectory", data.get("messages", []))) | |
| mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%m-%d %H:%M") | |
| platform = data.get("platform", data.get("channel", "")) | |
| sessions.append({ | |
| "id": f.stem, | |
| "name": name[:40], | |
| "messages": msgs, | |
| "last": mtime, | |
| "platform": platform, | |
| "active": (time.time() - f.stat().st_mtime) < 3600, | |
| }) | |
| except Exception: | |
| sessions.append({"id": f.stem, "name": f.stem[:40], "messages": 0, "last": "--", "platform": "", "active": False}) | |
| return sessions | |
def _proxy_request(url: str, method: str = "GET", headers: dict = None,
                   body: bytes = None, timeout: int = 120) -> tuple[int, dict, bytes]:
    """Proxy a request to a backend URL and return (status, response_headers, body).

    Hop-by-hop headers are stripped from the response. Backend connection
    failures never raise: they are reported as a 502 with a JSON error body.
    """
    hop_by_hop = {"transfer-encoding", "connection", "keep-alive", "server", "date"}
    request = urllib.request.Request(url, data=body, method=method)
    for name, value in (headers or {}).items():
        try:
            request.add_header(name, value)
        except Exception:
            pass  # skip malformed header values rather than failing the request
    status, out_headers, payload = 502, {}, b""
    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            status = resp.status
            payload = resp.read()
            out_headers = {k: v for k, v in resp.getheaders()
                           if k.lower() not in hop_by_hop}
    except urllib.error.HTTPError as e:
        # An HTTP error is still a valid backend response — relay it.
        status = e.code
        try:
            payload = e.read()
        except Exception:
            payload = b""
        out_headers = {k: v for k, v in e.headers.items()
                       if k.lower() not in hop_by_hop}
    except Exception as e:
        # Connection-level failure: keep the 502 default, report as JSON.
        logger.warning("Proxy error: %s -> %s", url, e)
        payload = json.dumps({"error": str(e)}).encode()
    return status, out_headers, payload
| # --------------------------------------------------------------------------- | |
| # Log tailer | |
| # --------------------------------------------------------------------------- | |
| def _parse_log_line(line: str) -> dict | None: | |
| m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?\s*(.*)", line) | |
| if not m: | |
| m = re.match(r"\[?(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\]?\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?.*?:\s*(.*)", line) | |
| if not m: | |
| return None | |
| return { | |
| "time": m.group(1), | |
| "level": m.group(2).upper(), | |
| "msg": m.group(3).strip(), | |
| "hl": "feishu" in m.group(3).lower() or "connected" in m.group(3).lower(), | |
| } | |
def _log_tailer():
    """Daemon loop: follow LOG_FILE and fan out parsed lines to subscribers.

    Polls every 0.5s from the last read offset (_log_tail_offset), parses any
    new lines, and pushes each batch onto every queue in ``_log_subscribers``
    (one queue per connected SSE client). A queue whose put fails (e.g. full)
    is treated as a dead client and removed.

    NOTE(review): log rotation/truncation is not handled — if the file
    shrinks, the stale offset would seek past EOF; confirm whether the
    gateway ever rotates this log.
    """
    global _log_tail_offset
    while True:
        try:
            if not os.path.isfile(LOG_FILE):
                # Log file not created yet — wait and retry.
                time.sleep(2)
                continue
            with open(LOG_FILE, "r", errors="replace") as f:
                f.seek(_log_tail_offset)
                new_lines = f.readlines()
                _log_tail_offset = f.tell()
            if new_lines:
                parsed = []
                for line in new_lines:
                    p = _parse_log_line(line.strip())
                    if p:
                        parsed.append(p)
                if parsed:
                    # Fan out; collect queues that reject the batch.
                    dead = []
                    for q in _log_subscribers:
                        try:
                            q.put_nowait(parsed)
                        except Exception:
                            dead.append(q)
                    for q in dead:
                        _log_subscribers.remove(q)
            time.sleep(0.5)
        except Exception as e:
            # Keep the tailer alive no matter what; back off briefly.
            logger.debug("Log tailer error: %s", e)
            time.sleep(1)
| # --------------------------------------------------------------------------- | |
| # Persistent storage setup | |
| # --------------------------------------------------------------------------- | |
def _ensure_persistent_storage():
    """Create persistent data dirs under /data and symlink them into ~/.hermes.

    HF Spaces only persist /data across restarts, so each state directory
    lives there and ``~/.hermes/<dir>`` points at it. If symlinking fails
    (e.g. read-only FS), fall back to a plain local directory.
    """
    # Single source of truth — previously this tuple was duplicated in both
    # loops, inviting drift when a new directory is added.
    subdirs = ("sessions", "memories", "uploads", "logs", "palace", "skills", "weixin")
    for d in subdirs:
        os.makedirs(os.path.join(DATA_DIR, d), exist_ok=True)
    hermes = Path(HERMES_HOME)
    hermes.mkdir(parents=True, exist_ok=True)
    for d in subdirs:
        target = hermes / d
        if not target.exists():
            try:
                target.symlink_to(os.path.join(DATA_DIR, d))
                logger.info("Symlink: %s -> %s", d, os.path.join(DATA_DIR, d))
            except OSError:
                # Symlink unsupported/forbidden — keep state locally instead.
                target.mkdir(exist_ok=True)
                logger.warning("Could not symlink %s, using local dir", d)
| # --------------------------------------------------------------------------- | |
| # HTTP Handler — Reverse Proxy + Dashboard | |
| # --------------------------------------------------------------------------- | |
class ProxyHandler(BaseHTTPRequestHandler):
    """Routes traffic between WebUI BFF (:6060), Gateway (:8642), and static files.

    GET  /                     -> 302 redirect to the WebUI chat
    GET  /deploy               -> static deploy overview page
    GET  /api/diagnostics/*    -> local diagnostics (Bearer-token protected)
    GET  /api/logs/stream      -> SSE stream of parsed gateway log lines
    /webui*, /assets/*, ...    -> WebUI SPA static files (BFF as fallback)
    /api/*, /v1/*, /upload ... -> reverse-proxied to the WebUI BFF
    """

    def log_message(self, fmt, *args):
        pass  # silence per-request access logs

    # ── Auth / response helpers ──

    def _check_auth(self) -> bool:
        """Check Bearer token auth for sensitive endpoints."""
        auth = self.headers.get("Authorization", "")
        # NOTE(review): hardcoded fallback token — ensure AUTH_TOKEN is set in
        # production so this default never applies.
        expected = os.environ.get("AUTH_TOKEN", "hermes-bot-2026")
        return auth == f"Bearer {expected}"

    def _send_json(self, data: dict, status=200):
        """Serialize *data* as UTF-8 JSON and send it with CORS enabled."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_html(self, path: str):
        """Send a local HTML file, or 404 if it does not exist."""
        try:
            with open(path, "rb") as f:
                body = f.read()
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _read_body(self) -> bytes:
        """Read the request body per Content-Length (b"" when absent)."""
        length = int(self.headers.get("Content-Length", 0))
        return self.rfile.read(length) if length > 0 else b""

    def _send_file(self, filepath: str, content_type: str = None):
        """Send a local static file with a guessed MIME type and 1-day cache."""
        try:
            with open(filepath, "rb") as f:
                body = f.read()
            if content_type is None:
                content_type = mimetypes.guess_type(filepath)[0] or "application/octet-stream"
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Cache-Control", "public, max-age=86400")
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _send_proxy_response(self, status: int, headers: dict, body: bytes):
        """Relay a backend response to the client, always adding CORS headers."""
        self.send_response(status)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Hermes-Session-Id, X-Hermes-Profile, X-Hermes-Session-Token")
        # Pop BOTH capitalizations so neither is re-sent by the loop below.
        content_type = headers.pop("Content-Type", headers.pop("content-type", "application/json"))
        self.send_header("Content-Type", content_type)
        content_length = headers.pop("Content-Length", headers.pop("content-length", None))
        if content_length is None:
            content_length = str(len(body))
        self.send_header("Content-Length", content_length)
        for k, v in headers.items():
            self.send_header(k, v)
        self.end_headers()
        self.wfile.write(body)

    # ── Proxy forwarders ──

    def _forward_to_webui(self, path: str, method: str):
        """Forward request to WebUI BFF on :6060."""
        url = f"{WEBUI_BFF_URL}{path}"
        headers = {}
        # Forward key headers only (not hop-by-hop or host headers).
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id",
                  "X-Hermes-Profile", "X-Hermes-Session-Token"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = self._read_body() if method in ("POST", "PUT", "PATCH", "DELETE") else None
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    def _forward_to_gateway(self, path: str, method: str):
        """Forward request to Gateway API on :8642."""
        url = f"{GATEWAY_URL}{path}"
        headers = {}
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = self._read_body() if method in ("POST", "PUT", "PATCH", "DELETE") else None
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    # ── Path helpers ──

    def _rewrite_webui_path(self, path: str) -> str:
        """Rewrite /webui/* to /* for the BFF proxy.

        BUGFIX: the previous code sliced ``path[5:]`` but "/webui" is six
        characters, so "/webui/chat" became "i/chat". Strip the full prefix.
        """
        if path in ("/webui", "/webui/"):
            return "/"
        if path.startswith("/webui/"):
            return path[len("/webui"):]
        return path

    def _target_with_query(self, path: str, query: str) -> str:
        """Re-attach the query string (urlparse strips the '?' separator)."""
        return f"{path}?{query}" if query else path

    # ── OPTIONS (CORS preflight) ──

    def do_OPTIONS(self):
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Hermes-Session-Id, X-Hermes-Profile, X-Hermes-Session-Token")
        self.send_header("Access-Control-Max-Age", "86400")
        self.send_header("Content-Length", "0")
        self.end_headers()

    # ── GET routes ──

    def do_GET(self):
        parsed = urlparse(self.path)
        path = parsed.path
        # ── Root → redirect to WebUI Chat ──
        if path in ("/", "/index.html"):
            self.send_response(302)
            self.send_header("Location", "/webui#/hermes/chat")
            self.send_header("Content-Length", "0")
            self.end_headers()
            return
        # ── Deploy overview ──
        if path == "/deploy":
            return self._send_html("/app/deploy.html")
        # ── Diagnostic: gateway log dump (auth required) ──
        if path == "/api/diagnostics/gateway-log":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_gateway_log()
        # ── Diagnostic: gateway restart (auth required) ──
        if path == "/api/diagnostics/restart-gateway":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_restart_gateway()
        # ── SSE log stream (auth required) ──
        if path == "/api/logs/stream":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_sse()
        # ── WebUI SPA: /webui, /webui/* → serve from webui-client static dir ──
        if path.startswith("/webui"):
            if path in ("/webui", "/webui/"):
                index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
                if os.path.isfile(index_path):
                    return self._send_html(index_path)
            # Try static file
            static_path = os.path.join(WEBUI_CLIENT_DIR, path[len("/webui"):].lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # SPA fallback: unknown client-side route → index.html
            index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
            if os.path.isfile(index_path):
                return self._send_html(index_path)
            # Final fallback: proxy to BFF. BUGFIX: was path[5:], which left a
            # stray "i" on the forwarded path.
            return self._forward_to_webui(self._rewrite_webui_path(path), "GET")
        # ── WebUI static assets (SPA references absolute /assets/*, /favicon*) ──
        if path.startswith("/assets/") or path.startswith("/favicon") or path == "/logo.png":
            static_path = os.path.join(WEBUI_CLIENT_DIR, path.lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # If not found locally, try proxying to BFF
            return self._forward_to_webui(self.path, "GET")
        # ── WebUI API routes: /api/*, /v1/* → proxy to WebUI BFF ──
        if path.startswith("/api/") or path.startswith("/v1/"):
            return self._forward_to_webui(self.path, "GET")
        # ── /health → proxy to BFF ──
        if path == "/health":
            return self._forward_to_webui("/health", "GET")
        # ── /upload, /webhook → proxy to BFF ──
        if path in ("/upload", "/webhook"):
            return self._forward_to_webui(self.path, "GET")
        # ── /socket.io/* → proxy to BFF (WebSocket upgrade handled separately) ──
        if path.startswith("/socket.io/"):
            return self._forward_to_webui(self.path, "GET")
        self.send_error(404)

    # ── Mutating routes: all proxied to the WebUI BFF ──
    # BUGFIX (all four methods): the query string is re-attached with "?" —
    # previously `bff_path + parsed.query` produced e.g. "/api/foox=1".

    def do_POST(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path in ("/upload", "/webhook") or path.startswith("/webui"):
            return self._forward_to_webui(self._target_with_query(bff_path, parsed.query), "POST")
        self.send_error(404)

    def do_PUT(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(self._target_with_query(bff_path, parsed.query), "PUT")
        self.send_error(404)

    def do_DELETE(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(self._target_with_query(bff_path, parsed.query), "DELETE")
        self.send_error(404)

    def do_PATCH(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(self._target_with_query(bff_path, parsed.query), "PATCH")
        self.send_error(404)

    # ── Handlers ──

    def _get_status(self) -> dict:
        """Build the dashboard status snapshot (gateway health, config, RAM)."""
        env = _load_env()
        cfg = _load_config()
        ram = psutil.virtual_memory()
        is_running = False
        pid = "N/A"
        # Check gateway health via HTTP rather than trusting pid files.
        try:
            status, _, body = _proxy_request(f"{GATEWAY_URL}/health", "GET", timeout=3)
            is_running = (status == 200)
        except Exception:
            pass
        model = cfg.get("model", env.get("LLM_MODEL", "unknown"))
        provider = cfg.get("provider", "openrouter")
        fallback = cfg.get("fallback_model", {})
        fallback_model = fallback.get("model", "") if isinstance(fallback, dict) else ""
        return {
            "running": is_running,
            "pid": pid,
            "model": model,
            "provider": provider,
            "fallback_model": fallback_model,
            "platform": "飞书 Feishu",
            "platform_mode": "WebSocket",
            "sessions": _get_sessions_count(),
            "messages": 0,
            "ram": f"{ram.percent:.0f}%",
            "uptime_ms": int((time.time() - _gateway_start_time) * 1000),
            "config": {
                "OPENROUTER_API_KEY": _mask_key(env.get("OPENROUTER_API_KEY", "")),
                "FEISHU_APP_ID": env.get("FEISHU_APP_ID", ""),
                "FEISHU_APP_SECRET": _mask_key(env.get("FEISHU_APP_SECRET", "")),
                "terminal": cfg.get("terminal", {}).get("backend", "local") if isinstance(cfg.get("terminal"), dict) else "local",
                "timezone": cfg.get("timezone", "北京时间 (UTC+8)"),
                "max_turns": cfg.get("max_turns", "90"),
                "memory": cfg.get("memory", {}).get("provider", "none") if isinstance(cfg.get("memory"), dict) else "none",
                "mcp_servers": list(cfg.get("mcp_servers", {}).keys()) if isinstance(cfg.get("mcp_servers"), dict) else [],
                "compress": cfg.get("compress", {}).get("enabled", False) if isinstance(cfg.get("compress"), dict) else False,
            },
        }

    def _handle_sse(self):
        """Stream parsed log lines as Server-Sent Events until disconnect."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        q: Queue = Queue(maxsize=100)
        _log_subscribers.append(q)
        try:
            # Seed the client with recent history before live updates.
            history = self._get_log_history_inner(limit=100)
            if history:
                self.wfile.write(f"data: {json.dumps(history, ensure_ascii=False)}\n\n".encode())
                self.wfile.flush()
            while True:
                try:
                    lines = q.get(timeout=30)
                    payload = f"data: {json.dumps(lines, ensure_ascii=False)}\n\n"
                    self.wfile.write(payload.encode())
                    self.wfile.flush()
                except Empty:
                    # SSE comment line keeps the connection alive through proxies.
                    self.wfile.write(":heartbeat\n\n".encode())
                    self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass  # client disconnected
        finally:
            if q in _log_subscribers:
                _log_subscribers.remove(q)

    def _handle_gateway_log(self):
        """Return raw gateway.log contents for diagnostics."""
        log_path = LOG_FILE
        if not os.path.isfile(log_path):
            # Also check the data dir directly (symlink may not exist yet).
            alt = "/data/hermes/logs/gateway.log"
            if os.path.isfile(alt):
                log_path = alt
        if not os.path.isfile(log_path):
            self._send_json({"error": "gateway.log not found", "checked": [LOG_FILE, "/data/hermes/logs/gateway.log"]})
            return
        try:
            with open(log_path, "r", errors="replace") as f:
                content = f.read()
            body = content.encode("utf-8") if content else b"(empty)"
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(body)
        except Exception as e:
            self._send_json({"error": str(e)})

    def _handle_restart_gateway(self):
        """Trigger gateway restart via watchdog signal file."""
        try:
            signal_file = "/tmp/hermes-gateway-restart"
            with open(signal_file, "w") as f:
                f.write(str(time.time()))
            self._send_json({"ok": True, "message": "Restart signal sent"})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)})

    def _get_log_history(self, query: str = "") -> list:
        """Parse ?limit=N from *query* and return that many recent log entries."""
        params = parse_qs(query)
        limit = int(params.get("limit", ["100"])[0])
        return self._get_log_history_inner(limit=limit)

    def _get_log_history_inner(self, limit: int = 100) -> list:
        """Return up to *limit* parsed entries from the tail of LOG_FILE."""
        if not os.path.isfile(LOG_FILE):
            return []
        try:
            with open(LOG_FILE, "r", errors="replace") as f:
                lines = f.readlines()[-limit:]
            result = []
            for line in lines:
                p = _parse_log_line(line.strip())
                if p:
                    result.append(p)
            return result
        except Exception:
            return []
| # --------------------------------------------------------------------------- | |
| # Python-based Gateway Watchdog (robust, lives inside entry.py) | |
| # --------------------------------------------------------------------------- | |
def _gateway_watchdog(interval: int = 30):
    """Monitor gateway process liveness and auto-restart it when it dies.

    Runs as a daemon thread inside entry.py so it survives SIGTERM to the
    shell-level watchdog in start.sh. Restarts use exponential backoff
    (30s up to 5 min). A live-but-broken ("zombie") gateway — detected via
    "Executor shutdown" RuntimeErrors in the log tail — is force-killed and
    restarted as well.

    Args:
        interval: Seconds between liveness checks.
    """
    pid_file = os.path.join(HERMES_HOME, "gateway.pid")
    alt_pid_file = "/tmp/hermes-gateway.pid"
    lock_file = os.path.join(HERMES_HOME, ".gateway_runtime_lock")
    takeover_file = os.path.join(HERMES_HOME, ".gateway_takeover")
    gw_log = "/data/hermes/logs/gateway.log"

    def _find_gateway_pid() -> int | None:
        """Find the gateway PID from pid files or by process cmdline."""
        for pf in (pid_file, alt_pid_file):
            try:
                with open(pf) as f:
                    pid = int(f.read().strip())
                if pid > 0 and psutil.pid_exists(pid):
                    proc = psutil.Process(pid)
                    # Guard against recycled PIDs: cmdline must look like ours.
                    if "hermes" in " ".join(proc.cmdline()).lower() or "gateway" in " ".join(proc.cmdline()).lower():
                        return pid
            except Exception:
                pass
        # Fallback: search by process cmdline
        try:
            for proc in psutil.process_iter(["pid", "cmdline"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        return proc.info["pid"]
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception:
            pass
        return None

    def _kill_all_gateways():
        """Force-kill ALL existing gateway processes and wait for ports to free.

        Prevents the 'another gateway is already using this Feishu app_id'
        error and port conflicts when restarting after a crash.

        Returns:
            Number of processes killed.
        """
        killed = []
        # Find ALL hermes gateway processes (not just the one we track)
        try:
            for proc in psutil.process_iter(["pid", "cmdline", "create_time"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        pid = proc.info["pid"]
                        logger.info("[Watchdog] Force-killing residual gateway PID=%d", pid)
                        proc.kill()  # SIGKILL — no graceful shutdown, just die
                        killed.append(pid)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception as e:
            logger.warning("[Watchdog] Error scanning processes: %s", e)
        # Also kill any lingering servers on ports 8642/8643
        for port in (8642, 8643):
            try:
                for conn in psutil.net_connections(kind='inet'):
                    if conn.laddr.port == port and conn.status == 'LISTEN':
                        try:
                            owner = psutil.Process(conn.pid)
                            if conn.pid not in killed:
                                logger.info("[Watchdog] Force-killing process %d on port %d", conn.pid, port)
                                owner.kill()
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass
            except Exception:
                pass
        # Wait for processes to actually die and ports to be released
        if killed:
            time.sleep(3)
            for pid in killed:
                try:
                    p = psutil.Process(pid)
                    if p.is_running():
                        logger.warning("[Watchdog] PID %d still running after kill, waiting...", pid)
                        time.sleep(5)
                except psutil.NoSuchProcess:
                    pass
        # Clean ALL stale state files
        for f in (pid_file, alt_pid_file, lock_file, takeover_file):
            try:
                os.remove(f)
            except Exception:
                pass
        # Also clean any stale feishu lock files.
        # FIX: import glob once (previously re-imported inside the loop).
        import glob as _glob
        for pattern in ("feishu*.lock", "*.feishu_lock"):
            for lock_f in _glob.glob(os.path.join(HERMES_HOME, pattern)):
                try:
                    os.remove(lock_f)
                    logger.info("[Watchdog] Removed stale lock: %s", lock_f)
                except Exception:
                    pass
        return len(killed)

    def _start_gateway():
        """Start the gateway process with full cleanup of any residual state."""
        # Force-kill ALL residual gateway processes first
        killed = _kill_all_gateways()
        if killed:
            logger.info("[Watchdog] Killed %d residual gateway process(es) before restart", killed)
        log_fh = None
        try:
            log_fh = open(gw_log, "a")
            log_fh.write(f"\n--- [Python Watchdog] Starting gateway at {datetime.now().isoformat()} ---\n")
            log_fh.flush()
        except Exception:
            pass
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env["HERMES_ACCEPT_HOOKS"] = "1"
        proc = subprocess.Popen(
            [sys.executable, "-u", "-m", "hermes_cli.main", "gateway", "run", "-v", "--replace"],
            stdout=log_fh if log_fh else subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
            env=env,
            start_new_session=True,  # decouple from entry.py signals
        )
        # BUGFIX: close the parent's copy of the log handle — the child holds
        # its own duplicate, and keeping ours open leaked one fd per restart.
        if log_fh is not None:
            try:
                log_fh.close()
            except Exception:
                pass
        try:
            with open(alt_pid_file, "w") as f:
                f.write(str(proc.pid))
        except Exception:
            pass
        return proc

    logger.info("[Watchdog] Python gateway watchdog started (interval=%ds)", interval)
    # On startup: check if gateway is already running; if not, start it immediately
    initial_pid = _find_gateway_pid()
    if initial_pid is None:
        logger.info("[Watchdog] No gateway running on startup — launching gateway now")
        _start_gateway()
        time.sleep(15)  # Give it time to fully initialize
    else:
        logger.info("[Watchdog] Gateway already running (PID=%d) on startup", initial_pid)
        time.sleep(5)  # Brief settle time
    last_restart = 0
    restart_backoff = 30
    while True:
        try:
            time.sleep(interval)
            gw_pid = _find_gateway_pid()
            if gw_pid is not None:
                # Process is alive — but check if gateway is FUNCTIONAL (not zombie).
                # A zombie gateway has a live process but a dead asyncio executor,
                # causing "Executor shutdown has been called" on every send.
                try:
                    _zombie = False
                    if os.path.isfile(gw_log):
                        with open(gw_log, "r", errors="replace") as _lf:
                            for _line in _lf.readlines()[-50:]:
                                if "Executor shutdown" in _line and "RuntimeError" in _line:
                                    _zombie = True
                                    break
                    if _zombie:
                        logger.error("[Watchdog] Gateway is ZOMBIE (executor shutdown) — force-restarting...")
                        killed = _kill_all_gateways()
                        if not killed:
                            try:
                                os.kill(gw_pid, 9)
                            except Exception:
                                pass
                        time.sleep(3)
                        # Fall through to restart below
                    else:
                        restart_backoff = 30
                        continue
                except Exception:
                    # If log check fails, assume gateway is fine
                    restart_backoff = 30
                    continue
            else:
                logger.warning("[Watchdog] Gateway process is DEAD — restarting...")
            # Gateway is dead or zombie — restart with backoff.
            # FIX: removed a duplicate "DEAD — restarting" log here that fired a
            # second time on the dead path and incorrectly on the zombie path.
            now = time.time()
            if now - last_restart < restart_backoff:
                continue
            proc = _start_gateway()
            last_restart = now
            restart_backoff = min(restart_backoff * 2, 300)  # exponential backoff, max 5min
            # Wait and verify
            time.sleep(15)
            try:
                if proc.poll() is None:
                    logger.info("[Watchdog] Gateway restarted successfully (PID=%d)", proc.pid)
                    restart_backoff = 30  # reset on success
                else:
                    logger.error("[Watchdog] Gateway exited immediately with code %d", proc.returncode)
            except Exception as e:
                logger.error("[Watchdog] Failed to verify restart: %s", e)
        except Exception as e:
            logger.error("[Watchdog] Error: %s", e)
            time.sleep(10)
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """Entry point: bind :7860, start background threads, then serve forever."""
    logger.info("=== Hermes Bot — HuggingFace Space Entry ===")
    logger.info("WebUI at /webui (hermes-web-ui)")
    logger.info("Dashboard at /")
    # Setup persistent storage
    _ensure_persistent_storage()
    logger.info("Persistent storage ready at %s", DATA_DIR)
    # Start HTTP proxy server FIRST (before background checks)
    # HF Space requires the EXPOSE'd port to be bound quickly,
    # otherwise it shows a 404 page even though the container is RUNNING.
    try:
        server = ThreadingHTTPServer(("0.0.0.0", 7860), ProxyHandler)
        logger.info("Proxy listening on :7860")
    except OSError as e:
        logger.error("FATAL: Cannot bind port 7860: %s", e)
        sys.exit(1)
    # Start log tailer thread (feeds the /api/logs/stream SSE endpoint)
    tailer = threading.Thread(target=_log_tailer, daemon=True)
    tailer.start()
    logger.info("Log tailer started")
    # Start Python-based gateway watchdog (survives shell death)
    watchdog = threading.Thread(target=_gateway_watchdog, args=(30,), daemon=True, name="gateway-watchdog")
    watchdog.start()
    logger.info("Python gateway watchdog started")
    # Check if backend services are reachable (non-blocking, best-effort)
    # This runs in a background thread so it doesn't delay serve_forever()
    def _wait_for_backends():
        # Poll each backend's /health up to 10 times, 2s apart; log first success.
        for attempt in range(10):
            try:
                urllib.request.urlopen(f"{GATEWAY_URL}/health", timeout=2)
                logger.info("Gateway reachable at %s", GATEWAY_URL)
                break
            except Exception:
                time.sleep(2)
        for attempt in range(10):
            try:
                urllib.request.urlopen(f"{WEBUI_BFF_URL}/health", timeout=2)
                logger.info("WebUI BFF reachable at %s", WEBUI_BFF_URL)
                break
            except Exception:
                time.sleep(2)
    threading.Thread(target=_wait_for_backends, daemon=True).start()
    # Start serving (this blocks forever)
    try:
        logger.info(" / → Dashboard")
        logger.info(" /webui → hermes-web-ui")
        logger.info(" /api/* → proxy to WebUI BFF (:6060)")
        logger.info(" /v1/* → proxy to WebUI BFF (:6060)")
        server.serve_forever()
    except Exception as e:
        logger.error("FATAL: Proxy server error: %s", e)
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()