import json import hmac import os import re import shutil import socket import subprocess import time from datetime import date from pathlib import Path from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qsl import pandas as pd import requests import streamlit as st import streamlit.components.v1 as components from tools.backtesting_runner import ( load_price_data_from_csv_text, load_price_data_from_yfinance, run_backtest, ) HF_PORT = int(os.getenv("PORT", "7860")) OPENCLAW_PORT = int(os.getenv("OPENCLAW_PORT", "18789")) VAULT_PATH = os.getenv("VAULT_PATH", "/app/vault") OPENCLAW_BIN_ENV = os.getenv("OPENCLAW_BIN", "openclaw") CONFIG_PATH = Path(os.getenv("OPENCLAW_CONFIG_PATH", "openclaw.json")) NANOCLAW_CONFIG_PATH = Path(os.getenv("NANOCLAW_CONFIG_PATH", "nanoclaw.json")) NANOBOT_CONFIG_PATH = Path(os.getenv("NANOBOT_CONFIG_PATH", "nanobot.json")) PICOCLAW_CONFIG_PATH = Path(os.getenv("PICOCLAW_CONFIG_PATH", "picoclaw.json")) ZEROCLAW_CONFIG_PATH = Path(os.getenv("ZEROCLAW_CONFIG_PATH", "zeroclaw.json")) NULLCLAW_CONFIG_PATH = Path(os.getenv("NULLCLAW_CONFIG_PATH", "nullclaw.json")) NEMOCLAW_CONFIG_PATH = Path(os.getenv("NEMOCLAW_CONFIG_PATH", "nemoclaw.json")) ENV_EXAMPLE_PATH = Path("config/openclaw.env.example") LOG_MAX_LINES = 300 OPENCLAW_STANDARD_UI_URL = os.getenv( "OPENCLAW_STANDARD_UI_URL", "http://127.0.0.1:18789/openclaw/" ) OPENCLAW_STANDARD_UI_PUBLIC_URL = os.getenv("OPENCLAW_STANDARD_UI_PUBLIC_URL", "/openclaw/") OPENCLAW_GATEWAY_TOKEN = os.getenv("OPENCLAW_GATEWAY_TOKEN", "") OPENCLAW_STATE_CONFIG_PATH = Path(os.getenv("OPENCLAW_STATE_CONFIG_PATH", "/app/.openclaw/state/openclaw.json")) OPENCLAW_UI_QUERY_KEYS = [ k.strip() for k in os.getenv("OPENCLAW_UI_QUERY_KEYS", "token,auth,access_token").split(",") if k.strip() ] GATEWAY_BOOT_LOG_PATH = Path(os.getenv("OPENCLAW_GATEWAY_LOG_PATH", "/tmp/openclaw-gateway.log")) GATEWAY_ERR_LOG_PATH = Path(os.getenv("OPENCLAW_GATEWAY_ERR_LOG_PATH", "/tmp/openclaw-gateway.err.log")) EXTERNAL_GATEWAY_MANAGED = os.getenv("EXTERNAL_GATEWAY_MANAGED", "0") == "1" OPENCLAW_PROXY_LOCAL_URL = os.getenv("OPENCLAW_PROXY_LOCAL_URL", "http://127.0.0.1:7860/openclaw/") SUPERVISOR_LOG_PATH = Path("/tmp/supervisord.log") STREAMLIT_ERR_LOG_PATH = Path("/tmp/streamlit.err.log") STREAMLIT_LOG_PATH = Path("/tmp/streamlit.log") CADDY_ERR_LOG_PATH = Path("/tmp/caddy.err.log") CADDY_LOG_PATH = Path("/tmp/caddy.log") NANOCLAW_LOG_PATH = Path(os.getenv("NANOCLAW_LOG_PATH", "/tmp/nanoclaw.log")) NANOCLAW_ERR_LOG_PATH = Path(os.getenv("NANOCLAW_ERR_LOG_PATH", "/tmp/nanoclaw.err.log")) NANOCLAW_PORT = int(os.getenv("NANOCLAW_PORT", "18889")) NANOCLAW_BASE_PATH = os.getenv("NANOCLAW_BASE_PATH", "/nanoclaw") NANOCLAW_ENABLED = os.getenv("NANOCLAW_ENABLED", "1") == "1" NANOCLAW_PROXY_LOCAL_URL = os.getenv( "NANOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NANOCLAW_BASE_PATH}/" ) NANOBOT_LOG_PATH = Path(os.getenv("NANOBOT_LOG_PATH", "/tmp/nanobot.log")) NANOBOT_ERR_LOG_PATH = Path(os.getenv("NANOBOT_ERR_LOG_PATH", "/tmp/nanobot.err.log")) NANOBOT_PORT = int(os.getenv("NANOBOT_PORT", "18790")) NANOBOT_BASE_PATH = os.getenv("NANOBOT_BASE_PATH", "/nanobot") NANOBOT_ENABLED = os.getenv("NANOBOT_ENABLED", "1") == "1" NANOBOT_PROXY_LOCAL_URL = os.getenv( "NANOBOT_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NANOBOT_BASE_PATH}/" ) PICOCLAW_LOG_PATH = Path(os.getenv("PICOCLAW_LOG_PATH", "/tmp/picoclaw.log")) PICOCLAW_ERR_LOG_PATH = Path(os.getenv("PICOCLAW_ERR_LOG_PATH", "/tmp/picoclaw.err.log")) PICOCLAW_PORT = int(os.getenv("PICOCLAW_PORT", "18792")) PICOCLAW_BASE_PATH = os.getenv("PICOCLAW_BASE_PATH", "/picoclaw") PICOCLAW_ENABLED = os.getenv("PICOCLAW_ENABLED", "1") == "1" PICOCLAW_PROXY_LOCAL_URL = os.getenv( "PICOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{PICOCLAW_BASE_PATH}/" ) ZEROCLAW_LOG_PATH = Path(os.getenv("ZEROCLAW_LOG_PATH", "/tmp/zeroclaw.log")) ZEROCLAW_ERR_LOG_PATH = Path(os.getenv("ZEROCLAW_ERR_LOG_PATH", "/tmp/zeroclaw.err.log")) ZEROCLAW_PORT = int(os.getenv("ZEROCLAW_PORT", "42617")) ZEROCLAW_BASE_PATH = os.getenv("ZEROCLAW_BASE_PATH", "/zeroclaw") ZEROCLAW_ENABLED = os.getenv("ZEROCLAW_ENABLED", "1") == "1" ZEROCLAW_PROXY_LOCAL_URL = os.getenv( "ZEROCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{ZEROCLAW_BASE_PATH}/" ) NULLCLAW_LOG_PATH = Path(os.getenv("NULLCLAW_LOG_PATH", "/tmp/nullclaw.log")) NULLCLAW_ERR_LOG_PATH = Path(os.getenv("NULLCLAW_ERR_LOG_PATH", "/tmp/nullclaw.err.log")) NULLCLAW_PORT = int(os.getenv("NULLCLAW_PORT", "3000")) NULLCLAW_BASE_PATH = os.getenv("NULLCLAW_BASE_PATH", "/nullclaw") NULLCLAW_ENABLED = os.getenv("NULLCLAW_ENABLED", "1") == "1" NULLCLAW_PROXY_LOCAL_URL = os.getenv( "NULLCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NULLCLAW_BASE_PATH}/" ) NEMOCLAW_LOG_PATH = Path(os.getenv("NEMOCLAW_LOG_PATH", "/tmp/nemoclaw.log")) NEMOCLAW_ERR_LOG_PATH = Path(os.getenv("NEMOCLAW_ERR_LOG_PATH", "/tmp/nemoclaw.err.log")) NEMOCLAW_PORT = int(os.getenv("NEMOCLAW_PORT", "18793")) NEMOCLAW_BASE_PATH = os.getenv("NEMOCLAW_BASE_PATH", "/nemoclaw") NEMOCLAW_ENABLED = os.getenv("NEMOCLAW_ENABLED", "1") == "1" NEMOCLAW_PROXY_LOCAL_URL = os.getenv( "NEMOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NEMOCLAW_BASE_PATH}/" ) IRONCLAW_LOG_PATH = Path(os.getenv("IRONCLAW_LOG_PATH", "/tmp/ironclaw.log")) IRONCLAW_ERR_LOG_PATH = Path(os.getenv("IRONCLAW_ERR_LOG_PATH", "/tmp/ironclaw.err.log")) IRONCLAW_ENABLED = os.getenv("IRONCLAW_ENABLED", "1") == "1" IRONCLAW_CONFIG_PATH = Path(os.getenv("IRONCLAW_CONFIG_PATH", "ironclaw.json")) STREAMLIT_AUTH_ENABLED = os.getenv("STREAMLIT_AUTH_ENABLED", "1") == "1" STREAMLIT_AUTH_USERNAME = os.getenv("STREAMLIT_AUTH_USERNAME", "").strip() STREAMLIT_AUTH_PASSWORD = os.getenv("STREAMLIT_AUTH_PASSWORD", "").strip() def resolve_openclaw_bin() -> str | None: if shutil.which(OPENCLAW_BIN_ENV): return OPENCLAW_BIN_ENV if shutil.which("openclaw"): return "openclaw" if shutil.which("clawdbot"): return "clawdbot" return None def init_state() -> None: st.session_state.setdefault("gateway_process", None) st.session_state.setdefault("gateway_logs", []) st.session_state.setdefault("config_editor_text", load_config_text()) st.session_state.setdefault("nanoclaw_config_text", load_nanoclaw_config_text()) st.session_state.setdefault("nanobot_config_text", load_nanobot_config_text()) st.session_state.setdefault("picoclaw_config_text", load_picoclaw_config_text()) st.session_state.setdefault("zeroclaw_config_text", load_zeroclaw_config_text()) st.session_state.setdefault("nullclaw_config_text", load_nullclaw_config_text()) st.session_state.setdefault("nemoclaw_config_text", load_nemoclaw_config_text()) st.session_state.setdefault("ironclaw_config_text", load_ironclaw_config_text()) st.session_state.setdefault("auto_started", False) st.session_state.setdefault("auto_start_attempted", False) st.session_state.setdefault("backtest_result", None) st.session_state.setdefault("backtest_data", None) st.session_state.setdefault("backtest_params", None) st.session_state.setdefault("backtest_error", "") st.session_state.setdefault("gateway_boot_log_offset_out", 0) st.session_state.setdefault("gateway_boot_log_offset_err", 0) def load_config_text() -> str: if CONFIG_PATH.exists(): return CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_config_json() -> dict: try: return json.loads(load_config_text()) except json.JSONDecodeError: return {} def load_nanoclaw_config_text() -> str: if NANOCLAW_CONFIG_PATH.exists(): return NANOCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_nanoclaw_config_json() -> dict: try: return json.loads(load_nanoclaw_config_text()) except json.JSONDecodeError: return {} def load_nanobot_config_text() -> str: if NANOBOT_CONFIG_PATH.exists(): return NANOBOT_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_nanobot_config_json() -> dict: try: return json.loads(load_nanobot_config_text()) except json.JSONDecodeError: return {} def load_picoclaw_config_text() -> str: if PICOCLAW_CONFIG_PATH.exists(): return PICOCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_picoclaw_config_json() -> dict: try: return json.loads(load_picoclaw_config_text()) except json.JSONDecodeError: return {} def load_zeroclaw_config_text() -> str: if ZEROCLAW_CONFIG_PATH.exists(): return ZEROCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_zeroclaw_config_json() -> dict: try: return json.loads(load_zeroclaw_config_text()) except json.JSONDecodeError: return {} def load_nullclaw_config_text() -> str: if NULLCLAW_CONFIG_PATH.exists(): return NULLCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_nullclaw_config_json() -> dict: try: return json.loads(load_nullclaw_config_text()) except json.JSONDecodeError: return {} def load_nemoclaw_config_text() -> str: if NEMOCLAW_CONFIG_PATH.exists(): return NEMOCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_nemoclaw_config_json() -> dict: try: return json.loads(load_nemoclaw_config_text()) except json.JSONDecodeError: return {} def load_ironclaw_config_text() -> str: if IRONCLAW_CONFIG_PATH.exists(): return IRONCLAW_CONFIG_PATH.read_text(encoding="utf-8") return "{}" def load_ironclaw_config_json() -> dict: try: return json.loads(load_ironclaw_config_text()) except json.JSONDecodeError: return {} def gateway_process() -> subprocess.Popen | None: proc = st.session_state.get("gateway_process") if proc is None: return None if proc.poll() is not None: st.session_state["gateway_process"] = None return None return proc def append_logs(lines: list[str]) -> None: if not lines: return st.session_state["gateway_logs"].extend(lines) st.session_state["gateway_logs"] = st.session_state["gateway_logs"][-LOG_MAX_LINES:] def pull_logs() -> None: proc = gateway_process() if proc is None or proc.stdout is None: return lines = [] while True: try: line = proc.stdout.readline() except BlockingIOError: break if not line: break lines.append(line.rstrip()) append_logs(lines) pull_boot_logs() def pull_boot_logs() -> None: merged_new_lines = [] sources = [ (GATEWAY_BOOT_LOG_PATH, "gateway_boot_log_offset_out"), (GATEWAY_ERR_LOG_PATH, "gateway_boot_log_offset_err"), ] for path, offset_key in sources: if not path.exists(): continue try: with path.open("r", encoding="utf-8", errors="ignore") as fh: fh.seek(st.session_state[offset_key]) new_content = fh.read() st.session_state[offset_key] = fh.tell() except Exception: continue if new_content: merged_new_lines.extend([ln for ln in new_content.splitlines() if ln.strip()]) if merged_new_lines: append_logs(merged_new_lines) def tail_text_file(path: Path, max_lines: int = 40) -> str: if not path.exists(): return f"{path} not found." try: lines = path.read_text(encoding="utf-8", errors="ignore").splitlines() except Exception as exc: return f"Failed to read {path}: {exc}" return "\n".join(lines[-max_lines:]) if lines else "(empty)" def start_gateway() -> tuple[bool, str]: if EXTERNAL_GATEWAY_MANAGED: return True, "Gateway is managed by supervisor from Docker startup." if gateway_process() is not None: return True, "Gateway is already running." binary = resolve_openclaw_bin() if binary is None: return False, "OpenClaw binary not found (expected `openclaw` or `clawdbot`)." candidate_cmds = [ [binary, "gateway", "run", "--port", str(OPENCLAW_PORT), "--allow-unconfigured"], [binary, "gateway", "--port", str(OPENCLAW_PORT), "--allow-unconfigured"], ] if binary == "clawdbot": candidate_cmds = [ [binary, "gateway", "--port", str(OPENCLAW_PORT), "--vault-path", VAULT_PATH], [binary, "gateway", "run", "--port", str(OPENCLAW_PORT), "--vault-path", VAULT_PATH], ] last_error = "unknown startup failure" for cmd in candidate_cmds: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) if proc.stdout is not None: os.set_blocking(proc.stdout.fileno(), False) append_logs([f"$ {' '.join(cmd)}"]) # Verify the process stays alive and opens the gateway port. deadline = time.time() + 12 opened = False while time.time() < deadline: if proc.poll() is not None: break with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(0.3) if sock.connect_ex(("127.0.0.1", OPENCLAW_PORT)) == 0: opened = True break time.sleep(0.2) if opened: st.session_state["gateway_process"] = proc append_logs(["Gateway process started."]) return True, "Gateway started." if proc.poll() is None: # Process is still alive but port not open yet; keep it and let logs/refresh catch readiness. st.session_state["gateway_process"] = proc append_logs(["Gateway is still initializing (port not open yet)."]) return True, "Gateway is initializing. Refresh in a few seconds." # Collect some logs for diagnosis before trying next command. fail_lines = [] if proc.stdout is not None: try: while True: line = proc.stdout.readline() if not line: break fail_lines.append(line.rstrip()) except BlockingIOError: pass if fail_lines: append_logs(fail_lines[-10:]) last_error = fail_lines[-1] else: exit_code = proc.poll() last_error = f"command exited with code {exit_code if exit_code is not None else 'unknown'}" try: proc.terminate() except Exception: pass return False, f"Failed to start gateway ({last_error})." def stop_gateway() -> tuple[bool, str]: if EXTERNAL_GATEWAY_MANAGED: return False, "Gateway is managed by supervisor; stop it via container config." proc = gateway_process() if proc is None: return True, "Gateway is not running." proc.terminate() try: proc.wait(timeout=10) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=5) st.session_state["gateway_process"] = None append_logs(["Gateway process stopped."]) return True, "Gateway stopped." def parse_expected_env_vars(config_data: dict) -> list[str]: vars_found: set[str] = set() for provider in config_data.get("providers", []): base_url = provider.get("base_url") if isinstance(base_url, str): vars_found.update(re.findall(r"\$\{([A-Z0-9_]+)\}", base_url)) for _, val in provider.get("headers", {}).items(): if isinstance(val, str): vars_found.update(re.findall(r"\$\{([A-Z0-9_]+)\}", val)) for tool in config_data.get("tools", {}).values(): for env_name in tool.get("env", []): vars_found.add(env_name) if ENV_EXAMPLE_PATH.exists(): for line in ENV_EXAMPLE_PATH.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue vars_found.add(stripped.split("=", 1)[0].strip()) return sorted(vars_found) def test_gateway(query: str) -> str: try: resp = requests.post( f"http://127.0.0.1:{OPENCLAW_PORT}/ask", json={"query": query}, timeout=30, ) resp.raise_for_status() return json.dumps(resp.json(), indent=2) except Exception as exc: return f"Gateway request failed: {exc}" def is_gateway_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", OPENCLAW_PORT)) == 0 def is_nanoclaw_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", NANOCLAW_PORT)) == 0 def is_nanobot_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", NANOBOT_PORT)) == 0 def is_picoclaw_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", PICOCLAW_PORT)) == 0 def is_zeroclaw_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", ZEROCLAW_PORT)) == 0 def is_nullclaw_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", NULLCLAW_PORT)) == 0 def is_nemoclaw_port_open(timeout: float = 0.5) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) return sock.connect_ex(("127.0.0.1", NEMOCLAW_PORT)) == 0 def supervisorctl(action: str, program: str) -> tuple[bool, str]: cmd = shutil.which("supervisorctl") if not cmd: return False, "supervisorctl not available in container." try: result = subprocess.run( [cmd, action, program], check=True, capture_output=True, text=True, ) return True, result.stdout.strip() or result.stderr.strip() except subprocess.CalledProcessError as exc: msg = exc.stderr.strip() or exc.stdout.strip() or str(exc) return False, msg def infer_llm_api_base() -> str: candidates = [ os.getenv("LLM_SPACE_OPENAI_URL", "").strip(), os.getenv("LLM_SPACE_WEBCHAT_URL", "").strip(), ] for url in candidates: if not url: continue if "/v1/chat/completions" in url: return url.split("/v1/chat/completions", 1)[0] if "/v1/web-chat/completions" in url: return url.split("/v1/web-chat/completions", 1)[0] return url.rstrip("/") return "https://researchengineering-agi.hf.space" def llm_list_models(api_base: str) -> dict: resp = requests.get(f"{api_base}/models", timeout=15) resp.raise_for_status() return resp.json() def llm_switch_model(api_base: str, model_name: str) -> dict: resp = requests.post( f"{api_base}/switch-model", json={"model_name": model_name}, timeout=120, ) resp.raise_for_status() return resp.json() def llm_chat_completion(api_base: str, prompt: str, max_tokens: int, temperature: float) -> dict: resp = requests.post( f"{api_base}/v1/chat/completions", json={ "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": temperature, }, timeout=300, ) resp.raise_for_status() return resp.json() def with_token(url: str, token: str, query_key: str = "token") -> str: if not token: return url parts = urlsplit(url) query = dict(parse_qsl(parts.query, keep_blank_values=True)) query.setdefault(query_key, token) return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)) def with_token_all_keys(url: str, token: str) -> str: if not token: return url parts = urlsplit(url) query = dict(parse_qsl(parts.query, keep_blank_values=True)) keys = OPENCLAW_UI_QUERY_KEYS or ["token", "auth", "access_token"] for key in keys: query.setdefault(key, token) return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)) def mask_token_in_url(url: str) -> str: parts = urlsplit(url) query = dict(parse_qsl(parts.query, keep_blank_values=True)) for key in ("token", "auth", "access_token"): token = query.get(key, "") if token: query[key] = f"{token[:4]}...{token[-4:]}" if len(token) > 8 else "***" return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)) def resolve_gateway_token() -> str: if OPENCLAW_GATEWAY_TOKEN: return OPENCLAW_GATEWAY_TOKEN try: state_cfg = json.loads(OPENCLAW_STATE_CONFIG_PATH.read_text(encoding="utf-8")) return ( state_cfg.get("gateway", {}) .get("auth", {}) .get("token", "") ) except Exception: return "" def _collect_auth_tokens(obj: object) -> list[str]: found: list[str] = [] if isinstance(obj, dict): # Direct auth token object: {"auth": {"token": "..."}} auth_obj = obj.get("auth") if isinstance(auth_obj, dict): tok = auth_obj.get("token") if isinstance(tok, str) and tok.strip(): found.append(tok.strip()) for value in obj.values(): found.extend(_collect_auth_tokens(value)) elif isinstance(obj, list): for item in obj: found.extend(_collect_auth_tokens(item)) return found def resolve_ui_token() -> str: # 1) Explicit env override always wins. if OPENCLAW_GATEWAY_TOKEN: return OPENCLAW_GATEWAY_TOKEN # 2) Collect candidate tokens from runtime state. candidates: list[str] = [] try: state_cfg = json.loads(OPENCLAW_STATE_CONFIG_PATH.read_text(encoding="utf-8")) candidates.extend(_collect_auth_tokens(state_cfg)) gw_tok = ( state_cfg.get("gateway", {}) .get("auth", {}) .get("token", "") ) if isinstance(gw_tok, str) and gw_tok.strip(): candidates.append(gw_tok.strip()) except Exception: pass # Deduplicate while preserving order. deduped = [] seen = set() for t in candidates: if t not in seen: seen.add(t) deduped.append(t) # 3) Validate token candidates against browser UI endpoint. for token in deduped: try: for key in OPENCLAW_UI_QUERY_KEYS: probe_url = with_token(OPENCLAW_STANDARD_UI_URL, token, key) resp = requests.get(probe_url, timeout=2, allow_redirects=True) if resp.status_code < 400: return token except Exception: continue # 4) Fallback to legacy gateway token resolution. return resolve_gateway_token() def resolve_ui_query_key(token: str) -> str: if not token: return OPENCLAW_UI_QUERY_KEYS[0] if OPENCLAW_UI_QUERY_KEYS else "token" for key in OPENCLAW_UI_QUERY_KEYS: try: probe_url = with_token(OPENCLAW_STANDARD_UI_URL, token, key) resp = requests.get(probe_url, timeout=2, allow_redirects=True) if resp.status_code < 400: return key except Exception: continue return OPENCLAW_UI_QUERY_KEYS[0] if OPENCLAW_UI_QUERY_KEYS else "token" def build_strategy_frame(data: pd.DataFrame, fast_period: int, slow_period: int) -> pd.DataFrame: frame = pd.DataFrame(index=data.index) frame["Close"] = data["Close"] frame["FastMA"] = data["Close"].rolling(window=fast_period).mean() frame["SlowMA"] = data["Close"].rolling(window=slow_period).mean() frame["Signal"] = 0 frame.loc[frame["FastMA"] > frame["SlowMA"], "Signal"] = 1 frame["SignalChange"] = frame["Signal"].diff().fillna(0) return frame def render_backtest_visuals(result, data: pd.DataFrame, params: dict) -> None: st.success(f"Backtest completed with {result.engine} ({result.input_rows} bars).") metrics = result.metrics or {} if metrics: metric_cols = st.columns(max(1, min(4, len(metrics)))) for idx, (metric_name, metric_value) in enumerate(metrics.items()): col = metric_cols[idx % len(metric_cols)] val = f"{metric_value:.3f}" if isinstance(metric_value, float) else str(metric_value) col.metric(metric_name, val) viz = build_strategy_frame(data, params["fast_period"], params["slow_period"]) st.markdown("**Price + Moving Averages**") st.line_chart(viz[["Close", "FastMA", "SlowMA"]], use_container_width=True) signal_events = viz[viz["SignalChange"] != 0][["Close", "SignalChange"]].copy() if not signal_events.empty: signal_events["Event"] = signal_events["SignalChange"].map({1: "BUY", -1: "SELL"}).fillna("HOLD") st.markdown("**Crossover Events**") st.dataframe(signal_events[["Event", "Close"]], use_container_width=True) if isinstance(result.equity_curve, pd.DataFrame) and not result.equity_curve.empty: equity = result.equity_curve.copy() if "Equity" not in equity.columns: numeric_cols = equity.select_dtypes(include=["number"]).columns if len(numeric_cols) > 0: equity = equity.rename(columns={numeric_cols[0]: "Equity"}) if "Equity" in equity.columns: st.markdown("**Equity Curve**") st.line_chart(equity["Equity"], use_container_width=True) drawdown = (equity["Equity"] / equity["Equity"].cummax() - 1.0) * 100.0 st.markdown("**Drawdown (%)**") st.area_chart(drawdown, use_container_width=True) returns = equity["Equity"].pct_change().dropna() if not returns.empty: st.markdown("**Return Distribution**") hist = pd.DataFrame({"return": returns}) hist["bin"] = pd.cut(hist["return"], bins=30) hist = hist.groupby("bin", observed=False).size().rename("count").reset_index() hist["bin"] = hist["bin"].astype(str) st.bar_chart(hist.set_index("bin")["count"], use_container_width=True) if isinstance(result.trades, pd.DataFrame) and not result.trades.empty: st.markdown("**Trades**") st.dataframe(result.trades, use_container_width=True) def login_enabled() -> bool: return STREAMLIT_AUTH_ENABLED def has_login_credentials() -> bool: return bool(STREAMLIT_AUTH_USERNAME and STREAMLIT_AUTH_PASSWORD) def validate_login(username: str, password: str) -> bool: return hmac.compare_digest(username, STREAMLIT_AUTH_USERNAME) and hmac.compare_digest( password, STREAMLIT_AUTH_PASSWORD ) def render_login_form() -> None: st.markdown( """