# AGI_Assistant / app.py
# Dmitry Beresnev
# Add and configure NemoClaw, IronClaw, and complete claw stack configs
# 974e338
import json
import hmac
import os
import re
import shutil
import socket
import subprocess
import time
from datetime import date
from pathlib import Path
from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qsl
import pandas as pd
import requests
import streamlit as st
import streamlit.components.v1 as components
from tools.backtesting_runner import (
load_price_data_from_csv_text,
load_price_data_from_yfinance,
run_backtest,
)
# --- Core runtime settings -------------------------------------------------
# Every value below is read once at import time; most can be overridden
# through the environment variable named in the corresponding os.getenv call.
HF_PORT = int(os.getenv("PORT", "7860"))
OPENCLAW_PORT = int(os.getenv("OPENCLAW_PORT", "18789"))
VAULT_PATH = os.getenv("VAULT_PATH", "/app/vault")
OPENCLAW_BIN_ENV = os.getenv("OPENCLAW_BIN", "openclaw")

# --- Per-service JSON config files (relative paths by default) -------------
CONFIG_PATH = Path(os.getenv("OPENCLAW_CONFIG_PATH", "openclaw.json"))
NANOCLAW_CONFIG_PATH = Path(os.getenv("NANOCLAW_CONFIG_PATH", "nanoclaw.json"))
NANOBOT_CONFIG_PATH = Path(os.getenv("NANOBOT_CONFIG_PATH", "nanobot.json"))
PICOCLAW_CONFIG_PATH = Path(os.getenv("PICOCLAW_CONFIG_PATH", "picoclaw.json"))
ZEROCLAW_CONFIG_PATH = Path(os.getenv("ZEROCLAW_CONFIG_PATH", "zeroclaw.json"))
NULLCLAW_CONFIG_PATH = Path(os.getenv("NULLCLAW_CONFIG_PATH", "nullclaw.json"))
NEMOCLAW_CONFIG_PATH = Path(os.getenv("NEMOCLAW_CONFIG_PATH", "nemoclaw.json"))
# Example env file whose KEY=value lines are scanned for expected variable names.
ENV_EXAMPLE_PATH = Path("config/openclaw.env.example")
# Upper bound on the number of gateway log lines kept in session state.
LOG_MAX_LINES = 300

# --- OpenClaw standard UI / token handling ----------------------------------
OPENCLAW_STANDARD_UI_URL = os.getenv(
    "OPENCLAW_STANDARD_UI_URL", "http://127.0.0.1:18789/openclaw/"
)
OPENCLAW_STANDARD_UI_PUBLIC_URL = os.getenv("OPENCLAW_STANDARD_UI_PUBLIC_URL", "/openclaw/")
# An explicit token from the environment takes precedence over state-file tokens.
OPENCLAW_GATEWAY_TOKEN = os.getenv("OPENCLAW_GATEWAY_TOKEN", "")
OPENCLAW_STATE_CONFIG_PATH = Path(os.getenv("OPENCLAW_STATE_CONFIG_PATH", "/app/.openclaw/state/openclaw.json"))
# Query-parameter names under which the token is appended to UI URLs
# (comma-separated in the environment; blank entries are dropped).
OPENCLAW_UI_QUERY_KEYS = [
    k.strip()
    for k in os.getenv("OPENCLAW_UI_QUERY_KEYS", "token,auth,access_token").split(",")
    if k.strip()
]

# --- Gateway log files and supervisor integration ---------------------------
GATEWAY_BOOT_LOG_PATH = Path(os.getenv("OPENCLAW_GATEWAY_LOG_PATH", "/tmp/openclaw-gateway.log"))
GATEWAY_ERR_LOG_PATH = Path(os.getenv("OPENCLAW_GATEWAY_ERR_LOG_PATH", "/tmp/openclaw-gateway.err.log"))
# When "1", start/stop requests from this UI are refused or reported as no-ops.
EXTERNAL_GATEWAY_MANAGED = os.getenv("EXTERNAL_GATEWAY_MANAGED", "0") == "1"
# NOTE(review): the default hard-codes port 7860 rather than using HF_PORT —
# confirm the proxy always listens on 7860 even when PORT is overridden.
OPENCLAW_PROXY_LOCAL_URL = os.getenv("OPENCLAW_PROXY_LOCAL_URL", "http://127.0.0.1:7860/openclaw/")
SUPERVISOR_LOG_PATH = Path("/tmp/supervisord.log")
STREAMLIT_ERR_LOG_PATH = Path("/tmp/streamlit.err.log")
STREAMLIT_LOG_PATH = Path("/tmp/streamlit.log")
CADDY_ERR_LOG_PATH = Path("/tmp/caddy.err.log")
CADDY_LOG_PATH = Path("/tmp/caddy.log")
# --- Companion services ------------------------------------------------------
# Each service follows the same pattern: stdout/stderr log paths, a TCP port,
# a URL base path, an enable flag ("1" means enabled) and a local proxy URL
# built from the base path. All are overridable via the named env variables.

# NanoClaw
NANOCLAW_LOG_PATH = Path(os.getenv("NANOCLAW_LOG_PATH", "/tmp/nanoclaw.log"))
NANOCLAW_ERR_LOG_PATH = Path(os.getenv("NANOCLAW_ERR_LOG_PATH", "/tmp/nanoclaw.err.log"))
NANOCLAW_PORT = int(os.getenv("NANOCLAW_PORT", "18889"))
NANOCLAW_BASE_PATH = os.getenv("NANOCLAW_BASE_PATH", "/nanoclaw")
NANOCLAW_ENABLED = os.getenv("NANOCLAW_ENABLED", "1") == "1"
NANOCLAW_PROXY_LOCAL_URL = os.getenv(
    "NANOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NANOCLAW_BASE_PATH}/"
)

# NanoBot
NANOBOT_LOG_PATH = Path(os.getenv("NANOBOT_LOG_PATH", "/tmp/nanobot.log"))
NANOBOT_ERR_LOG_PATH = Path(os.getenv("NANOBOT_ERR_LOG_PATH", "/tmp/nanobot.err.log"))
NANOBOT_PORT = int(os.getenv("NANOBOT_PORT", "18790"))
NANOBOT_BASE_PATH = os.getenv("NANOBOT_BASE_PATH", "/nanobot")
NANOBOT_ENABLED = os.getenv("NANOBOT_ENABLED", "1") == "1"
NANOBOT_PROXY_LOCAL_URL = os.getenv(
    "NANOBOT_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NANOBOT_BASE_PATH}/"
)

# PicoClaw
PICOCLAW_LOG_PATH = Path(os.getenv("PICOCLAW_LOG_PATH", "/tmp/picoclaw.log"))
PICOCLAW_ERR_LOG_PATH = Path(os.getenv("PICOCLAW_ERR_LOG_PATH", "/tmp/picoclaw.err.log"))
PICOCLAW_PORT = int(os.getenv("PICOCLAW_PORT", "18792"))
PICOCLAW_BASE_PATH = os.getenv("PICOCLAW_BASE_PATH", "/picoclaw")
PICOCLAW_ENABLED = os.getenv("PICOCLAW_ENABLED", "1") == "1"
PICOCLAW_PROXY_LOCAL_URL = os.getenv(
    "PICOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{PICOCLAW_BASE_PATH}/"
)

# ZeroClaw
ZEROCLAW_LOG_PATH = Path(os.getenv("ZEROCLAW_LOG_PATH", "/tmp/zeroclaw.log"))
ZEROCLAW_ERR_LOG_PATH = Path(os.getenv("ZEROCLAW_ERR_LOG_PATH", "/tmp/zeroclaw.err.log"))
ZEROCLAW_PORT = int(os.getenv("ZEROCLAW_PORT", "42617"))
ZEROCLAW_BASE_PATH = os.getenv("ZEROCLAW_BASE_PATH", "/zeroclaw")
ZEROCLAW_ENABLED = os.getenv("ZEROCLAW_ENABLED", "1") == "1"
ZEROCLAW_PROXY_LOCAL_URL = os.getenv(
    "ZEROCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{ZEROCLAW_BASE_PATH}/"
)

# NullClaw
NULLCLAW_LOG_PATH = Path(os.getenv("NULLCLAW_LOG_PATH", "/tmp/nullclaw.log"))
NULLCLAW_ERR_LOG_PATH = Path(os.getenv("NULLCLAW_ERR_LOG_PATH", "/tmp/nullclaw.err.log"))
NULLCLAW_PORT = int(os.getenv("NULLCLAW_PORT", "3000"))
NULLCLAW_BASE_PATH = os.getenv("NULLCLAW_BASE_PATH", "/nullclaw")
NULLCLAW_ENABLED = os.getenv("NULLCLAW_ENABLED", "1") == "1"
NULLCLAW_PROXY_LOCAL_URL = os.getenv(
    "NULLCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NULLCLAW_BASE_PATH}/"
)

# NemoClaw
NEMOCLAW_LOG_PATH = Path(os.getenv("NEMOCLAW_LOG_PATH", "/tmp/nemoclaw.log"))
NEMOCLAW_ERR_LOG_PATH = Path(os.getenv("NEMOCLAW_ERR_LOG_PATH", "/tmp/nemoclaw.err.log"))
NEMOCLAW_PORT = int(os.getenv("NEMOCLAW_PORT", "18793"))
NEMOCLAW_BASE_PATH = os.getenv("NEMOCLAW_BASE_PATH", "/nemoclaw")
NEMOCLAW_ENABLED = os.getenv("NEMOCLAW_ENABLED", "1") == "1"
NEMOCLAW_PROXY_LOCAL_URL = os.getenv(
    "NEMOCLAW_PROXY_LOCAL_URL", f"http://127.0.0.1:7860{NEMOCLAW_BASE_PATH}/"
)

# IronClaw: only logs, an enable flag and a config file are defined here
# (no port, base path or proxy URL).
IRONCLAW_LOG_PATH = Path(os.getenv("IRONCLAW_LOG_PATH", "/tmp/ironclaw.log"))
IRONCLAW_ERR_LOG_PATH = Path(os.getenv("IRONCLAW_ERR_LOG_PATH", "/tmp/ironclaw.err.log"))
IRONCLAW_ENABLED = os.getenv("IRONCLAW_ENABLED", "1") == "1"
IRONCLAW_CONFIG_PATH = Path(os.getenv("IRONCLAW_CONFIG_PATH", "ironclaw.json"))
# --- UI login settings --------------------------------------------------------
# Auth is on unless STREAMLIT_AUTH_ENABLED is set to something other than "1".
STREAMLIT_AUTH_ENABLED = os.getenv("STREAMLIT_AUTH_ENABLED", "1") == "1"
# Credentials are whitespace-stripped; an empty value counts as "not configured".
STREAMLIT_AUTH_USERNAME = os.getenv("STREAMLIT_AUTH_USERNAME", "").strip()
STREAMLIT_AUTH_PASSWORD = os.getenv("STREAMLIT_AUTH_PASSWORD", "").strip()
def resolve_openclaw_bin() -> str | None:
    """Return the first gateway executable name that is found on PATH.

    Candidates are tried in order: the configured ``OPENCLAW_BIN_ENV`` value,
    then the literal names ``openclaw`` and ``clawdbot``.

    Returns:
        The matching command name, or ``None`` when none of them resolves.
    """
    for candidate in (OPENCLAW_BIN_ENV, "openclaw", "clawdbot"):
        if shutil.which(candidate):
            return candidate
    return None
def init_state() -> None:
    """Seed ``st.session_state`` with every key the UI expects.

    Existing keys are never overwritten. The config-editor texts are loaded
    lazily: the corresponding file is only read when its key is still missing,
    so a normal rerun does not touch the disk for already-initialised keys.
    """
    state = st.session_state

    state.setdefault("gateway_process", None)
    state.setdefault("gateway_logs", [])

    # Key -> loader; the loader runs only if the key has not been set yet.
    config_loaders = (
        ("config_editor_text", load_config_text),
        ("nanoclaw_config_text", load_nanoclaw_config_text),
        ("nanobot_config_text", load_nanobot_config_text),
        ("picoclaw_config_text", load_picoclaw_config_text),
        ("zeroclaw_config_text", load_zeroclaw_config_text),
        ("nullclaw_config_text", load_nullclaw_config_text),
        ("nemoclaw_config_text", load_nemoclaw_config_text),
        ("ironclaw_config_text", load_ironclaw_config_text),
    )
    for key, loader in config_loaders:
        if key not in state:
            state[key] = loader()

    plain_defaults = (
        ("auto_started", False),
        ("auto_start_attempted", False),
        ("backtest_result", None),
        ("backtest_data", None),
        ("backtest_params", None),
        ("backtest_error", ""),
        ("gateway_boot_log_offset_out", 0),
        ("gateway_boot_log_offset_err", 0),
    )
    for key, default in plain_defaults:
        state.setdefault(key, default)
def _read_config_text(path: Path) -> str:
    """Return the UTF-8 text of *path*, or ``"{}"`` when the file does not exist.

    Reading directly (instead of checking ``exists()`` first) avoids a race
    where the file disappears between the check and the read. Other I/O
    errors (permissions, directory in place of a file) still propagate.
    """
    try:
        return path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return "{}"


def _parse_config_object(text: str) -> dict:
    """Parse *text* as JSON and return it only if it is a JSON object.

    Invalid JSON and valid-but-non-object JSON (list, string, number, ...)
    both yield ``{}`` so callers can always use ``dict`` methods on the result.
    """
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def load_config_text() -> str:
    """Return the raw OpenClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(CONFIG_PATH)


def load_config_json() -> dict:
    """Return the OpenClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_config_text())


def load_nanoclaw_config_text() -> str:
    """Return the raw NanoClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(NANOCLAW_CONFIG_PATH)


def load_nanoclaw_config_json() -> dict:
    """Return the NanoClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_nanoclaw_config_text())


def load_nanobot_config_text() -> str:
    """Return the raw NanoBot config text (``"{}"`` if the file is missing)."""
    return _read_config_text(NANOBOT_CONFIG_PATH)


def load_nanobot_config_json() -> dict:
    """Return the NanoBot config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_nanobot_config_text())


def load_picoclaw_config_text() -> str:
    """Return the raw PicoClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(PICOCLAW_CONFIG_PATH)


def load_picoclaw_config_json() -> dict:
    """Return the PicoClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_picoclaw_config_text())


def load_zeroclaw_config_text() -> str:
    """Return the raw ZeroClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(ZEROCLAW_CONFIG_PATH)


def load_zeroclaw_config_json() -> dict:
    """Return the ZeroClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_zeroclaw_config_text())


def load_nullclaw_config_text() -> str:
    """Return the raw NullClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(NULLCLAW_CONFIG_PATH)


def load_nullclaw_config_json() -> dict:
    """Return the NullClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_nullclaw_config_text())


def load_nemoclaw_config_text() -> str:
    """Return the raw NemoClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(NEMOCLAW_CONFIG_PATH)


def load_nemoclaw_config_json() -> dict:
    """Return the NemoClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_nemoclaw_config_text())


def load_ironclaw_config_text() -> str:
    """Return the raw IronClaw config text (``"{}"`` if the file is missing)."""
    return _read_config_text(IRONCLAW_CONFIG_PATH)


def load_ironclaw_config_json() -> dict:
    """Return the IronClaw config as a dict (``{}`` if missing or invalid)."""
    return _parse_config_object(load_ironclaw_config_text())
def gateway_process() -> subprocess.Popen | None:
    """Return the live gateway child process tracked in session state.

    If the tracked process has already exited, the session-state entry is
    cleared and ``None`` is returned.
    """
    handle = st.session_state.get("gateway_process")
    if handle is not None and handle.poll() is not None:
        # The child has terminated; forget it so callers see "not running".
        st.session_state["gateway_process"] = None
        handle = None
    return handle
def append_logs(lines: list[str]) -> None:
    """Append *lines* to the session log buffer, keeping the newest entries.

    The buffer is trimmed to the last ``LOG_MAX_LINES`` lines. An empty
    *lines* list is a no-op.
    """
    if lines:
        buffer = st.session_state["gateway_logs"]
        buffer.extend(lines)
        st.session_state["gateway_logs"] = buffer[-LOG_MAX_LINES:]
def pull_logs() -> None:
    """Collect new gateway output into the session log buffer.

    Two sources are drained: the stdout pipe of the child process started by
    this UI (if any), and the on-disk boot logs via ``pull_boot_logs()``.

    The boot logs are read even when there is no child process; previously an
    early return skipped them, so a gateway started outside this UI (no child
    handle in session state) never had its log files ingested.
    """
    proc = gateway_process()
    if proc is not None and proc.stdout is not None:
        lines = []
        while True:
            try:
                line = proc.stdout.readline()
            except BlockingIOError:
                # Non-blocking pipe with nothing buffered right now.
                break
            if not line:
                break
            lines.append(line.rstrip())
        append_logs(lines)
    pull_boot_logs()
def pull_boot_logs() -> None:
    """Append content added to the gateway boot log files since the last call.

    A byte offset per file is kept in ``st.session_state``. Files are read in
    binary mode so the offset is a plain byte position that can be compared
    with the file size: if the file is now shorter than the stored offset
    (truncated or rotated), reading restarts from the beginning instead of
    silently returning nothing forever. Unreadable or missing files are
    skipped. Blank lines are dropped.
    """
    sources = (
        (GATEWAY_BOOT_LOG_PATH, "gateway_boot_log_offset_out"),
        (GATEWAY_ERR_LOG_PATH, "gateway_boot_log_offset_err"),
    )
    fresh_lines: list[str] = []
    for path, offset_key in sources:
        try:
            with path.open("rb") as fh:
                size = fh.seek(0, os.SEEK_END)
                offset = st.session_state.get(offset_key, 0)
                if offset > size:
                    # File shrank since the last read: start over.
                    offset = 0
                fh.seek(offset)
                chunk = fh.read()
                st.session_state[offset_key] = fh.tell()
        except OSError:
            continue
        text = chunk.decode("utf-8", errors="ignore")
        fresh_lines.extend(ln for ln in text.splitlines() if ln.strip())
    if fresh_lines:
        append_logs(fresh_lines)
def tail_text_file(path: Path, max_lines: int = 40) -> str:
    """Return the last *max_lines* lines of a text file as one string.

    Args:
        path: File to read (decoded as UTF-8, undecodable bytes ignored).
        max_lines: Maximum number of trailing lines to return. Values of zero
            or less yield an empty string (a plain ``lines[-0:]`` slice would
            return the whole file).

    Returns:
        The joined tail, ``"(empty)"`` for a file without lines, or a
        human-readable message when the file is missing or unreadable.
    """
    if not path.exists():
        return f"{path} not found."
    try:
        lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
    except OSError as exc:
        return f"Failed to read {path}: {exc}"
    if not lines:
        return "(empty)"
    if max_lines <= 0:
        return ""
    return "\n".join(lines[-max_lines:])
def start_gateway() -> tuple[bool, str]:
    """Launch the gateway as a child process and wait briefly for its port.

    Several command-line variants are tried in order. For each one the child
    is given up to 12 seconds to open ``OPENCLAW_PORT``.

    Returns:
        ``(ok, message)``. ``ok`` is True when the gateway is externally
        managed, already running, started successfully, or still alive but
        not yet listening. It is False when no binary is found or every
        candidate command failed; the message then carries the last error.
    """
    if EXTERNAL_GATEWAY_MANAGED:
        return True, "Gateway is managed by supervisor from Docker startup."
    if gateway_process() is not None:
        return True, "Gateway is already running."
    binary = resolve_openclaw_bin()
    if binary is None:
        return False, "OpenClaw binary not found (expected `openclaw` or `clawdbot`)."

    port = str(OPENCLAW_PORT)
    if binary == "clawdbot":
        candidate_cmds = [
            [binary, "gateway", "--port", port, "--vault-path", VAULT_PATH],
            [binary, "gateway", "run", "--port", port, "--vault-path", VAULT_PATH],
        ]
    else:
        candidate_cmds = [
            [binary, "gateway", "run", "--port", port, "--allow-unconfigured"],
            [binary, "gateway", "--port", port, "--allow-unconfigured"],
        ]

    last_error = "unknown startup failure"
    for cmd in candidate_cmds:
        append_logs([f"$ {' '.join(cmd)}"])
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
            )
        except OSError as exc:
            # The binary vanished or is not executable; try the next variant.
            last_error = str(exc)
            append_logs([f"Failed to launch: {exc}"])
            continue
        if proc.stdout is not None:
            # Non-blocking so later log polling never stalls the UI.
            os.set_blocking(proc.stdout.fileno(), False)

        # Verify the process stays alive and opens the gateway port.
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + 12
        opened = False
        while time.monotonic() < deadline:
            if proc.poll() is not None:
                break
            if is_gateway_port_open(timeout=0.3):
                opened = True
                break
            time.sleep(0.2)

        if opened:
            st.session_state["gateway_process"] = proc
            append_logs(["Gateway process started."])
            return True, "Gateway started."
        if proc.poll() is None:
            # Process is still alive but port not open yet; keep it and let
            # logs/refresh catch readiness.
            st.session_state["gateway_process"] = proc
            append_logs(["Gateway is still initializing (port not open yet)."])
            return True, "Gateway is initializing. Refresh in a few seconds."

        # The child exited: collect some output for diagnosis before trying
        # the next command.
        fail_lines = []
        if proc.stdout is not None:
            try:
                while True:
                    line = proc.stdout.readline()
                    if not line:
                        break
                    fail_lines.append(line.rstrip())
            except BlockingIOError:
                pass
            finally:
                # Release the pipe; the child is gone and will not be tracked.
                proc.stdout.close()
        if fail_lines:
            append_logs(fail_lines[-10:])
            last_error = fail_lines[-1]
        else:
            exit_code = proc.poll()
            last_error = f"command exited with code {exit_code if exit_code is not None else 'unknown'}"
    return False, f"Failed to start gateway ({last_error})."
def stop_gateway() -> tuple[bool, str]:
    """Terminate the gateway child process started by this UI.

    Returns:
        ``(ok, message)``. Refuses (``ok`` False) when the gateway is
        externally managed; otherwise reports either that nothing was running
        or that the process was stopped.
    """
    if EXTERNAL_GATEWAY_MANAGED:
        return False, "Gateway is managed by supervisor; stop it via container config."

    running = gateway_process()
    if running is None:
        return True, "Gateway is not running."

    # Ask politely first; escalate to kill if it does not exit in time.
    running.terminate()
    try:
        running.wait(timeout=10)
    except subprocess.TimeoutExpired:
        running.kill()
        running.wait(timeout=5)

    st.session_state["gateway_process"] = None
    append_logs(["Gateway process stopped."])
    return True, "Gateway stopped."
# Matches ``${NAME}`` placeholders and captures NAME.
_ENV_PLACEHOLDER_RE = re.compile(r"\$\{([A-Z0-9_]+)\}")


def parse_expected_env_vars(config_data: dict) -> list[str]:
    """Collect the environment variable names a config is expected to need.

    Names come from three places:
      * ``${NAME}`` placeholders in each provider's ``base_url`` and header
        values,
      * the ``env`` list of each tool,
      * ``KEY=value`` lines of the example env file, if that file exists.

    The config is user-edited JSON, so sections with an unexpected shape
    (``null``, wrong container type, non-string entries) are skipped instead
    of raising.

    Returns:
        The sorted, de-duplicated variable names.
    """
    vars_found: set[str] = set()

    providers = config_data.get("providers")
    for provider in providers if isinstance(providers, list) else []:
        if not isinstance(provider, dict):
            continue
        base_url = provider.get("base_url")
        if isinstance(base_url, str):
            vars_found.update(_ENV_PLACEHOLDER_RE.findall(base_url))
        headers = provider.get("headers")
        if isinstance(headers, dict):
            for val in headers.values():
                if isinstance(val, str):
                    vars_found.update(_ENV_PLACEHOLDER_RE.findall(val))

    tools = config_data.get("tools")
    for tool in tools.values() if isinstance(tools, dict) else []:
        if not isinstance(tool, dict):
            continue
        env_names = tool.get("env")
        if isinstance(env_names, list):
            vars_found.update(name for name in env_names if isinstance(name, str))

    if ENV_EXAMPLE_PATH.exists():
        for line in ENV_EXAMPLE_PATH.read_text(encoding="utf-8").splitlines():
            stripped = line.strip()
            # Skip blanks, comments and lines that are not assignments.
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                continue
            vars_found.add(stripped.split("=", 1)[0].strip())
    return sorted(vars_found)
def test_gateway(query: str) -> str:
    """POST *query* to the local gateway's ``/ask`` endpoint.

    Returns:
        The JSON response pretty-printed with two-space indentation, or a
        ``"Gateway request failed: ..."`` message if anything goes wrong.
    """
    endpoint = f"http://127.0.0.1:{OPENCLAW_PORT}/ask"
    try:
        reply = requests.post(endpoint, json={"query": query}, timeout=30)
        reply.raise_for_status()
        payload = reply.json()
        return json.dumps(payload, indent=2)
    except Exception as exc:
        return f"Gateway request failed: {exc}"
def _is_local_port_open(port: int, timeout: float) -> bool:
    """Return True if a TCP connection to ``127.0.0.1:port`` succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex(("127.0.0.1", port)) == 0


def is_gateway_port_open(timeout: float = 0.5) -> bool:
    """Return True if the OpenClaw gateway port accepts local connections."""
    return _is_local_port_open(OPENCLAW_PORT, timeout)


def is_nanoclaw_port_open(timeout: float = 0.5) -> bool:
    """Return True if the NanoClaw port accepts local connections."""
    return _is_local_port_open(NANOCLAW_PORT, timeout)


def is_nanobot_port_open(timeout: float = 0.5) -> bool:
    """Return True if the NanoBot port accepts local connections."""
    return _is_local_port_open(NANOBOT_PORT, timeout)


def is_picoclaw_port_open(timeout: float = 0.5) -> bool:
    """Return True if the PicoClaw port accepts local connections."""
    return _is_local_port_open(PICOCLAW_PORT, timeout)


def is_zeroclaw_port_open(timeout: float = 0.5) -> bool:
    """Return True if the ZeroClaw port accepts local connections."""
    return _is_local_port_open(ZEROCLAW_PORT, timeout)


def is_nullclaw_port_open(timeout: float = 0.5) -> bool:
    """Return True if the NullClaw port accepts local connections."""
    return _is_local_port_open(NULLCLAW_PORT, timeout)


def is_nemoclaw_port_open(timeout: float = 0.5) -> bool:
    """Return True if the NemoClaw port accepts local connections."""
    return _is_local_port_open(NEMOCLAW_PORT, timeout)
def supervisorctl(action: str, program: str) -> tuple[bool, str]:
    """Run ``supervisorctl <action> <program>`` and report the outcome.

    Returns:
        ``(True, output)`` on a zero exit status (stdout, falling back to
        stderr), or ``(False, message)`` when the executable is missing or the
        command exits non-zero.
    """
    executable = shutil.which("supervisorctl")
    if not executable:
        return False, "supervisorctl not available in container."

    try:
        completed = subprocess.run(
            [executable, action, program],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        # Prefer stderr, then stdout, then the exception text itself.
        failure = exc.stderr.strip() or exc.stdout.strip() or str(exc)
        return False, failure
    return True, completed.stdout.strip() or completed.stderr.strip()
def infer_llm_api_base() -> str:
    """Derive the LLM API base URL from the environment.

    ``LLM_SPACE_OPENAI_URL`` is consulted first, then ``LLM_SPACE_WEBCHAT_URL``.
    The first non-empty value wins: a known completions path (and everything
    after it) is cut off, otherwise trailing slashes are removed. A fixed
    default is returned when neither variable is set.
    """
    known_suffixes = ("/v1/chat/completions", "/v1/web-chat/completions")
    for env_name in ("LLM_SPACE_OPENAI_URL", "LLM_SPACE_WEBCHAT_URL"):
        url = os.getenv(env_name, "").strip()
        if not url:
            continue
        for suffix in known_suffixes:
            if suffix in url:
                return url.partition(suffix)[0]
        return url.rstrip("/")
    return "https://researchengineering-agi.hf.space"
def llm_list_models(api_base: str) -> dict:
    """GET ``{api_base}/models`` and return the decoded JSON body.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    endpoint = f"{api_base}/models"
    response = requests.get(endpoint, timeout=15)
    response.raise_for_status()
    return response.json()
def llm_switch_model(api_base: str, model_name: str) -> dict:
    """POST a model switch request to ``{api_base}/switch-model``.

    Returns:
        The decoded JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    endpoint = f"{api_base}/switch-model"
    payload = {"model_name": model_name}
    response = requests.post(endpoint, json=payload, timeout=120)
    response.raise_for_status()
    return response.json()
def llm_chat_completion(api_base: str, prompt: str, max_tokens: int, temperature: float) -> dict:
    """Send *prompt* as a single user message to ``{api_base}/v1/chat/completions``.

    Returns:
        The decoded JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    endpoint = f"{api_base}/v1/chat/completions"
    payload = {
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    response = requests.post(endpoint, json=payload, timeout=300)
    response.raise_for_status()
    return response.json()
def with_token(url: str, token: str, query_key: str = "token") -> str:
    """Return *url* with ``query_key=token`` added to its query string.

    An empty *token* leaves the URL untouched, and a value already present
    under *query_key* is kept as is. The query string is re-encoded, so
    repeated parameter names collapse to their last value.
    """
    if not token:
        return url
    scheme, netloc, path, raw_query, fragment = urlsplit(url)
    params = dict(parse_qsl(raw_query, keep_blank_values=True))
    if query_key not in params:
        params[query_key] = token
    return urlunsplit((scheme, netloc, path, urlencode(params), fragment))
def with_token_all_keys(url: str, token: str) -> str:
    """Return *url* with *token* added under every configured query key.

    Keys come from ``OPENCLAW_UI_QUERY_KEYS`` (or ``token``, ``auth`` and
    ``access_token`` when that list is empty). Parameters already present in
    the URL are not overwritten; an empty *token* returns the URL unchanged.
    """
    if not token:
        return url
    scheme, netloc, path, raw_query, fragment = urlsplit(url)
    params = dict(parse_qsl(raw_query, keep_blank_values=True))
    for name in OPENCLAW_UI_QUERY_KEYS or ["token", "auth", "access_token"]:
        if name not in params:
            params[name] = token
    return urlunsplit((scheme, netloc, path, urlencode(params), fragment))
def mask_token_in_url(url: str) -> str:
    """Return *url* with token-bearing query values masked for display.

    Masked keys are ``token``, ``auth`` and ``access_token`` plus every key in
    ``OPENCLAW_UI_QUERY_KEYS``; tokens are injected under those configurable
    keys, so masking only the three defaults would leak a token placed under
    a custom key. Values longer than eight characters keep their first and
    last four characters; shorter ones become ``***``.
    """
    parts = urlsplit(url)
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    sensitive_keys = {"token", "auth", "access_token", *OPENCLAW_UI_QUERY_KEYS}
    for key in sensitive_keys:
        token = query.get(key, "")
        if token:
            query[key] = f"{token[:4]}...{token[-4:]}" if len(token) > 8 else "***"
    # safe="*" keeps the short mask readable instead of rendering "%2A%2A%2A".
    masked_query = urlencode(query, safe="*")
    return urlunsplit((parts.scheme, parts.netloc, parts.path, masked_query, parts.fragment))
def resolve_gateway_token() -> str:
    """Return the gateway auth token.

    The ``OPENCLAW_GATEWAY_TOKEN`` setting wins when non-empty. Otherwise the
    value at ``gateway.auth.token`` in the state config file is returned;
    any failure while reading or navigating that file yields ``""``.
    """
    if OPENCLAW_GATEWAY_TOKEN:
        return OPENCLAW_GATEWAY_TOKEN
    try:
        raw_state = OPENCLAW_STATE_CONFIG_PATH.read_text(encoding="utf-8")
        auth_section = json.loads(raw_state).get("gateway", {}).get("auth", {})
        return auth_section.get("token", "")
    except Exception:
        return ""
def _collect_auth_tokens(obj: object) -> list[str]:
    """Recursively gather every ``{"auth": {"token": "..."}}`` value in *obj*.

    Dicts and lists are walked depth-first; tokens are whitespace-stripped and
    blank ones are skipped. Duplicates are kept, in discovery order. Any other
    type contributes nothing.
    """
    if isinstance(obj, dict):
        tokens: list[str] = []
        auth_section = obj.get("auth")
        if isinstance(auth_section, dict):
            candidate = auth_section.get("token")
            if isinstance(candidate, str) and candidate.strip():
                tokens.append(candidate.strip())
        children = obj.values()
    elif isinstance(obj, list):
        tokens = []
        children = obj
    else:
        return []

    for child in children:
        tokens += _collect_auth_tokens(child)
    return tokens
def resolve_ui_token() -> str:
    """Pick the token to use when linking to the OpenClaw browser UI.

    Resolution order:
      1. ``OPENCLAW_GATEWAY_TOKEN`` when set.
      2. Tokens found in the state config file, each probed against
         ``OPENCLAW_STANDARD_UI_URL`` under every configured query key; the
         first one answered with a status below 400 is returned.
      3. ``resolve_gateway_token()`` as the fallback.
    """
    # 1) Explicit env override always wins.
    if OPENCLAW_GATEWAY_TOKEN:
        return OPENCLAW_GATEWAY_TOKEN

    # 2) Collect candidate tokens from runtime state (best effort).
    found: list[str] = []
    try:
        state = json.loads(OPENCLAW_STATE_CONFIG_PATH.read_text(encoding="utf-8"))
        found += _collect_auth_tokens(state)
        gateway_token = state.get("gateway", {}).get("auth", {}).get("token", "")
        if isinstance(gateway_token, str) and gateway_token.strip():
            found.append(gateway_token.strip())
    except Exception:
        pass

    # 3) Validate the de-duplicated candidates (order preserved) against the
    #    browser UI endpoint. An error while probing abandons that candidate.
    for candidate in dict.fromkeys(found):
        try:
            for query_key in OPENCLAW_UI_QUERY_KEYS:
                probe_url = with_token(OPENCLAW_STANDARD_UI_URL, candidate, query_key)
                response = requests.get(probe_url, timeout=2, allow_redirects=True)
                if response.status_code < 400:
                    return candidate
        except Exception:
            continue

    # 4) Fallback to legacy gateway token resolution.
    return resolve_gateway_token()
def resolve_ui_query_key(token: str) -> str:
    """Return the query-parameter name under which the UI accepts *token*.

    Each configured key is probed against ``OPENCLAW_STANDARD_UI_URL``; the
    first that yields a status below 400 is returned. With no token, or when
    no probe succeeds, the first configured key (or ``"token"``) is returned.
    """
    fallback = OPENCLAW_UI_QUERY_KEYS[0] if OPENCLAW_UI_QUERY_KEYS else "token"
    if not token:
        return fallback
    for query_key in OPENCLAW_UI_QUERY_KEYS:
        try:
            probe_url = with_token(OPENCLAW_STANDARD_UI_URL, token, query_key)
            response = requests.get(probe_url, timeout=2, allow_redirects=True)
            accepted = response.status_code < 400
        except Exception:
            continue
        if accepted:
            return query_key
    return fallback
def build_strategy_frame(data: pd.DataFrame, fast_period: int, slow_period: int) -> pd.DataFrame:
    """Build a moving-average crossover frame from the ``Close`` column of *data*.

    Returns:
        A frame sharing *data*'s index with columns ``Close``, ``FastMA``,
        ``SlowMA``, ``Signal`` (1 where FastMA > SlowMA, else 0) and
        ``SignalChange`` (row-to-row difference of ``Signal``, first row 0).
    """
    close = data["Close"]
    out = pd.DataFrame(index=data.index)
    out["Close"] = close
    out["FastMA"] = close.rolling(window=fast_period).mean()
    out["SlowMA"] = close.rolling(window=slow_period).mean()

    # Start flat everywhere, then flag the rows where the fast average leads.
    out["Signal"] = 0
    fast_above_slow = out["FastMA"] > out["SlowMA"]
    out.loc[fast_above_slow, "Signal"] = 1

    out["SignalChange"] = out["Signal"].diff().fillna(0)
    return out
def render_backtest_visuals(result, data: pd.DataFrame, params: dict) -> None:
    """Render the backtest summary, charts and tables into the Streamlit page.

    Sections, in order: success banner, metric tiles, price with moving
    averages, crossover events, equity curve with drawdown and return
    distribution, and the trades table. Sections whose data is missing or
    empty are skipped.

    Args:
        result: Backtest result; ``engine``, ``input_rows``, ``metrics``,
            ``equity_curve`` and ``trades`` are read from it.
        data: Price data with a ``Close`` column.
        params: Must contain ``fast_period`` and ``slow_period``.
    """
    st.success(f"Backtest completed with {result.engine} ({result.input_rows} bars).")

    # Metric tiles, wrapped over at most four columns.
    metrics = result.metrics or {}
    if metrics:
        tiles = st.columns(max(1, min(4, len(metrics))))
        for position, (label, raw_value) in enumerate(metrics.items()):
            shown = f"{raw_value:.3f}" if isinstance(raw_value, float) else str(raw_value)
            tiles[position % len(tiles)].metric(label, shown)

    viz = build_strategy_frame(data, params["fast_period"], params["slow_period"])
    st.markdown("**Price + Moving Averages**")
    st.line_chart(viz[["Close", "FastMA", "SlowMA"]], use_container_width=True)

    # Rows where the signal flipped.
    crossovers = viz[viz["SignalChange"] != 0][["Close", "SignalChange"]].copy()
    if not crossovers.empty:
        event_names = {1: "BUY", -1: "SELL"}
        crossovers["Event"] = crossovers["SignalChange"].map(event_names).fillna("HOLD")
        st.markdown("**Crossover Events**")
        st.dataframe(crossovers[["Event", "Close"]], use_container_width=True)

    if isinstance(result.equity_curve, pd.DataFrame) and not result.equity_curve.empty:
        equity = result.equity_curve.copy()
        if "Equity" not in equity.columns:
            # Fall back to the first numeric column as the equity series.
            numeric_cols = equity.select_dtypes(include=["number"]).columns
            if len(numeric_cols) > 0:
                equity = equity.rename(columns={numeric_cols[0]: "Equity"})
        if "Equity" in equity.columns:
            equity_series = equity["Equity"]
            st.markdown("**Equity Curve**")
            st.line_chart(equity_series, use_container_width=True)

            # Percentage below the running peak.
            drawdown = (equity_series / equity_series.cummax() - 1.0) * 100.0
            st.markdown("**Drawdown (%)**")
            st.area_chart(drawdown, use_container_width=True)

            returns = equity_series.pct_change().dropna()
            if not returns.empty:
                st.markdown("**Return Distribution**")
                hist = pd.DataFrame({"return": returns})
                hist["bin"] = pd.cut(hist["return"], bins=30)
                hist = hist.groupby("bin", observed=False).size().rename("count").reset_index()
                hist["bin"] = hist["bin"].astype(str)
                st.bar_chart(hist.set_index("bin")["count"], use_container_width=True)

    if isinstance(result.trades, pd.DataFrame) and not result.trades.empty:
        st.markdown("**Trades**")
        st.dataframe(result.trades, use_container_width=True)
def login_enabled() -> bool:
    """Report whether the UI login gate is switched on."""
    return bool(STREAMLIT_AUTH_ENABLED)
def has_login_credentials() -> bool:
    """Report whether both a username and a password are configured."""
    return STREAMLIT_AUTH_USERNAME != "" and STREAMLIT_AUTH_PASSWORD != ""
def validate_login(username: str, password: str) -> bool:
    """Check submitted credentials against the configured ones.

    Both values are compared as UTF-8 bytes with ``hmac.compare_digest``.
    Comparing ``str`` objects directly raises ``TypeError`` as soon as either
    side contains a non-ASCII character, which would crash the login page on
    such input. Both comparisons always run, so a wrong username does not
    skip the password comparison.

    Returns:
        True only when username and password both match.
    """
    user_ok = hmac.compare_digest(
        username.encode("utf-8"), STREAMLIT_AUTH_USERNAME.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), STREAMLIT_AUTH_PASSWORD.encode("utf-8")
    )
    return user_ok and password_ok
def render_login_form() -> None:
    """Render the login card and form, and handle a submitted sign-in.

    On valid credentials the session is marked authenticated and the script
    is rerun; otherwise an error message is shown.
    """
    # Static header card; the markup below is injected verbatim as HTML.
    st.markdown(
        """
<style>
.auth-wrap {display:flex; justify-content:center; margin-top:4vh;}
.auth-card {
width: 420px; max-width: 92vw; padding: 1.1rem 1.2rem 1rem 1.2rem;
border-radius: 14px; border: 1px solid #d8dee9; background: #ffffff;
box-shadow: 0 8px 24px rgba(16, 24, 40, 0.08);
}
.auth-title {font-size:1.2rem; font-weight:700; color:#0f172a; margin-bottom:.25rem;}
.auth-sub {font-size:.92rem; color:#475467; margin-bottom:.8rem;}
</style>
<div class="auth-wrap">
<div class="auth-card">
<div class="auth-title">OpenClaw Control Login</div>
<div class="auth-sub">Sign in to access the control center.</div>
</div>
</div>
""",
        unsafe_allow_html=True,
    )
    with st.form("streamlit_auth_form", clear_on_submit=False):
        username = st.text_input("Username")
        password = st.text_input("Password", type="password")
        submit = st.form_submit_button("Sign in", use_container_width=True)
    if submit:
        # The username is whitespace-stripped before comparison; the password is not.
        if validate_login(username.strip(), password):
            st.session_state["authenticated"] = True
            st.rerun()
        # Shown only when validation failed (a successful login reruns above).
        st.error("Invalid username or password.")
# --- Page setup and login gate ------------------------------------------------
st.set_page_config(page_title="OpenClaw Control Center", layout="wide")
st.session_state.setdefault("authenticated", False)
if login_enabled():
    # Auth is on but no credentials are configured: show the error and halt.
    if not has_login_credentials():
        st.error(
            "UI auth is enabled, but credentials are missing. "
            "Set `STREAMLIT_AUTH_USERNAME` and `STREAMLIT_AUTH_PASSWORD` in Hugging Face Secrets."
        )
        st.stop()
    # Not signed in yet: render the login form and stop before the main UI.
    if not st.session_state.get("authenticated", False):
        render_login_form()
        st.stop()
st.title("OpenClaw Control Center")
st.caption("Manage gateway runtime, config, environment, and test calls from one UI.")
if login_enabled():
    with st.sidebar:
        st.success("Authenticated")
        # Signing out clears the flag and reruns, which lands on the login form.
        if st.button("Sign out", use_container_width=True):
            st.session_state["authenticated"] = False
            st.rerun()
# --- Session bootstrap and one-time gateway auto-start -------------------------
init_state()
pull_logs()
# Auto-start runs at most once per session (tracked by "auto_start_attempted")
# and only when AUTO_START_GATEWAY is "1" (the default).
if (
    os.getenv("AUTO_START_GATEWAY", "1") == "1"
    and not st.session_state["auto_start_attempted"]
):
    ok, msg = start_gateway()
    st.session_state["auto_start_attempted"] = True
    st.session_state["auto_started"] = ok
    if not ok:
        st.warning(f"Auto-start failed: {msg}")
# --- Gateway status and start/stop controls ------------------------------------
status_col, action_col = st.columns([2, 3])
with status_col:
    proc = gateway_process()
    # "Running" if the port answers or this session tracks a live child process.
    running = is_gateway_port_open() or (proc is not None)
    st.metric("Gateway Status", "Running" if running else "Stopped")
    st.write(f"UI port: `{HF_PORT}`")
    st.write(f"Gateway port: `{OPENCLAW_PORT}`")
    st.write(f"Vault path: `{VAULT_PATH}`")
    st.write(f"Binary: `{resolve_openclaw_bin() or 'not found'}`")
    if proc is not None:
        st.write(f"PID: `{proc.pid}`")
with action_col:
    c1, c2, c3, c4 = st.columns(4)
    if c1.button("Start", use_container_width=True):
        ok, msg = start_gateway()
        (st.success if ok else st.error)(msg)
    if c2.button("Stop", use_container_width=True):
        ok, msg = stop_gateway()
        (st.success if ok else st.error)(msg)
    if c3.button("Restart", use_container_width=True):
        # The stop result is ignored; only the start outcome is reported.
        stop_gateway()
        ok, msg = start_gateway()
        (st.success if ok else st.error)(msg)
    if c4.button("Refresh", use_container_width=True):
        st.rerun()
    if EXTERNAL_GATEWAY_MANAGED:
        st.caption("Gateway controls are informational only (managed by Docker supervisor).")
# --- Services overview: port status and supervisor restart buttons --------------
st.divider()
st.subheader("Services")
svc_col_1, svc_col_2 = st.columns([2, 3])
with svc_col_1:
    st.markdown("**Status**")
    # Disabled services are not probed and are reported as down.
    openclaw_up = is_gateway_port_open()
    nanoclaw_up = is_nanoclaw_port_open() if NANOCLAW_ENABLED else False
    nanobot_up = is_nanobot_port_open() if NANOBOT_ENABLED else False
    picoclaw_up = is_picoclaw_port_open() if PICOCLAW_ENABLED else False
    zeroclaw_up = is_zeroclaw_port_open() if ZEROCLAW_ENABLED else False
    nullclaw_up = is_nullclaw_port_open() if NULLCLAW_ENABLED else False
    nemoclaw_up = is_nemoclaw_port_open() if NEMOCLAW_ENABLED else False
    st.write(f"OpenClaw: {'🟢 Running' if openclaw_up else '🔴 Down'} (port {OPENCLAW_PORT})")
    st.write(f"NanoClaw: {'🟢 Running' if nanoclaw_up else '🔴 Down'} (port {NANOCLAW_PORT})")
    st.write(f"NanoBot: {'🟢 Running' if nanobot_up else '🔴 Down'} (port {NANOBOT_PORT})")
    st.write(f"PicoClaw: {'🟢 Running' if picoclaw_up else '🔴 Down'} (port {PICOCLAW_PORT})")
    st.write(f"ZeroClaw: {'🟢 Running' if zeroclaw_up else '🔴 Down'} (port {ZEROCLAW_PORT})")
    st.write(f"NullClaw: {'🟢 Running' if nullclaw_up else '🔴 Down'} (port {NULLCLAW_PORT})")
    st.write(f"NemoClaw: {'🟢 Running' if nemoclaw_up else '🔴 Down'} (port {NEMOCLAW_PORT})")
    # IronClaw has no port to probe, so only its enable flag is shown.
    st.write(f"IronClaw: {'🟢 Enabled' if IRONCLAW_ENABLED else '🔴 Disabled'} (no gateway port)")
with svc_col_2:
    st.markdown("**Controls**")
    # Each button runs `supervisorctl restart <program>` and shows its output.
    if st.button("Restart OpenClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "openclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart NanoClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "nanoclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart NanoBot", use_container_width=True):
        ok, msg = supervisorctl("restart", "nanobot")
        (st.success if ok else st.error)(msg)
    if st.button("Restart PicoClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "picoclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart ZeroClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "zeroclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart NullClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "nullclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart NemoClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "nemoclaw")
        (st.success if ok else st.error)(msg)
    if st.button("Restart IronClaw", use_container_width=True):
        ok, msg = supervisorctl("restart", "ironclaw")
        (st.success if ok else st.error)(msg)
    # Hints for services that are switched off via their *_ENABLED flag.
    if not NANOCLAW_ENABLED:
        st.info("NanoClaw is disabled. Set `NANOCLAW_ENABLED=1` to start it.")
    if not NANOBOT_ENABLED:
        st.info("NanoBot is disabled. Set `NANOBOT_ENABLED=1` to start it.")
    if not PICOCLAW_ENABLED:
        st.info("PicoClaw is disabled. Set `PICOCLAW_ENABLED=1` to start it.")
    if not ZEROCLAW_ENABLED:
        st.info("ZeroClaw is disabled. Set `ZEROCLAW_ENABLED=1` to start it.")
    if not NULLCLAW_ENABLED:
        st.info("NullClaw is disabled. Set `NULLCLAW_ENABLED=1` to start it.")
    if not NEMOCLAW_ENABLED:
        st.info("NemoClaw is disabled. Set `NEMOCLAW_ENABLED=1` to start it.")
    if not IRONCLAW_ENABLED:
        st.info("IronClaw is disabled. Set `IRONCLAW_ENABLED=1` to start it.")
# --- OpenClaw standard UI: reachability checks and embedded view ----------------
st.divider()
st.subheader("OpenClaw Standard UI")
std_ui_status_col, std_ui_embed_col = st.columns([1, 3])
with std_ui_status_col:
    local_ok = False
    proxy_ok = False
    local_status = "Unavailable"
    proxy_status = "Unavailable"
    gateway_token = resolve_ui_token()
    # The token is appended under every configured query key.
    std_ui_url = with_token_all_keys(OPENCLAW_STANDARD_UI_URL, gateway_token)
    proxy_ui_url = with_token_all_keys(OPENCLAW_PROXY_LOCAL_URL, gateway_token)
    # Any status below 500 counts as reachable, including 401/403.
    try:
        ui_resp = requests.get(std_ui_url, timeout=2, allow_redirects=True)
        local_ok = ui_resp.status_code < 500
        if ui_resp.status_code in (401, 403):
            local_status = f"Available (auth required, HTTP {ui_resp.status_code})"
        else:
            local_status = "Available" if local_ok else f"HTTP {ui_resp.status_code}"
    except Exception:
        local_ok = False
    try:
        proxy_resp = requests.get(proxy_ui_url, timeout=2, allow_redirects=True)
        proxy_ok = proxy_resp.status_code < 500
        proxy_status = "Available" if proxy_ok else f"HTTP {proxy_resp.status_code}"
    except Exception:
        proxy_ok = False
    # URLs are passed through mask_token_in_url before being displayed.
    st.markdown(
        f"Gateway Local ({mask_token_in_url(std_ui_url)}): "
        f"{'🟢' if local_ok else '🔴'} {local_status}"
    )
    st.markdown(
        f"Gateway Via Proxy ({mask_token_in_url(proxy_ui_url)}): "
        f"{'🟢' if proxy_ok else '🔴'} {proxy_status}"
    )
    st.caption(f"URL: {mask_token_in_url(std_ui_url)}")
    if OPENCLAW_STANDARD_UI_PUBLIC_URL:
        st.caption(f"Public URL: {OPENCLAW_STANDARD_UI_PUBLIC_URL}")
with std_ui_embed_col:
    public_ui_url = with_token_all_keys(OPENCLAW_STANDARD_UI_PUBLIC_URL, gateway_token)
    # Embed only once the proxy check above succeeded.
    if public_ui_url and proxy_ok:
        st.caption(f"Embedded URL: {mask_token_in_url(public_ui_url)}")
        components.iframe(public_ui_url, height=560, scrolling=True)
        # The link and iframe carry the unmasked token-bearing URL.
        st.markdown(
            f'<a href="{public_ui_url}" target="_blank" rel="noopener noreferrer">Open Standard UI in new tab</a>',
            unsafe_allow_html=True,
        )
        st.caption("If iframe shows a blank/refused page, open in a new tab. Some OpenClaw responses block iframe rendering.")
    else:
        st.info("Standard UI proxy is not ready yet. Wait 10-30s and refresh.")
st.divider()
# NanoClaw UI: reachability of the NanoClaw proxy on the left, link on the right.
st.subheader("NanoClaw UI")
nano_status_col, nano_link_col = st.columns([1, 3])

with nano_status_col:
    nano_ok, nano_status = False, "Unavailable"
    if NANOCLAW_ENABLED:
        # Only probe when the service is enabled; any status below 500 is "up".
        try:
            nano_code = requests.get(
                NANOCLAW_PROXY_LOCAL_URL, timeout=2, allow_redirects=True
            ).status_code
            nano_ok = nano_code < 500
            nano_status = "Available" if nano_ok else f"HTTP {nano_code}"
        except Exception:
            nano_ok, nano_status = False, "Unavailable"

    nano_icon = "🟢" if nano_ok else "🔴"
    # The status line is rendered whether or not NanoClaw is enabled.
    st.markdown(f"NanoClaw ({NANOCLAW_PROXY_LOCAL_URL}): {nano_icon} {nano_status}")
    if not NANOCLAW_ENABLED:
        st.caption("NanoClaw is disabled.")

with nano_link_col:
    if not NANOCLAW_ENABLED:
        st.info("Enable NanoClaw to access its UI.")
    else:
        nano_link = (
            f'<a href="{NANOCLAW_PROXY_LOCAL_URL}" target="_blank" rel="noopener noreferrer">'
            "Open NanoClaw in new tab</a>"
        )
        st.markdown(nano_link, unsafe_allow_html=True)
        st.caption("If embedded UI fails, use the new tab link.")

st.divider()
# Claw UIs: one probe + link stanza per auxiliary service, split over two columns.
st.subheader("Claw UIs")
claw_ui_col_1, claw_ui_col_2 = st.columns(2)


def _render_claw_ui(title, enabled, proxy_url):
    """Render one service stanza and return whether its proxy answered.

    Shows the bold title, then either a new-tab link plus an ok/down caption
    (service enabled) or a "Disabled" caption. The proxy is probed only when
    the service is enabled; any HTTP status below 500 counts as reachable and
    any exception counts as down.
    """
    st.markdown(f"**{title}**")
    reachable = False
    if not enabled:
        st.caption("Disabled")
        return reachable
    try:
        reachable = requests.get(proxy_url, timeout=2, allow_redirects=True).status_code < 500
    except Exception:
        reachable = False
    st.markdown(
        f'<a href="{proxy_url}" target="_blank" rel="noopener noreferrer">Open {title}</a>',
        unsafe_allow_html=True,
    )
    st.caption(f"Proxy: {proxy_url} ({'ok' if reachable else 'down'})")
    return reachable


with claw_ui_col_1:
    nanobot_ok = _render_claw_ui("NanoBot", NANOBOT_ENABLED, NANOBOT_PROXY_LOCAL_URL)
    picoclaw_ok = _render_claw_ui("PicoClaw", PICOCLAW_ENABLED, PICOCLAW_PROXY_LOCAL_URL)
    zeroclaw_ok = _render_claw_ui("ZeroClaw", ZEROCLAW_ENABLED, ZEROCLAW_PROXY_LOCAL_URL)

with claw_ui_col_2:
    nullclaw_ok = _render_claw_ui("NullClaw", NULLCLAW_ENABLED, NULLCLAW_PROXY_LOCAL_URL)
    nemoclaw_ok = _render_claw_ui("NemoClaw", NEMOCLAW_ENABLED, NEMOCLAW_PROXY_LOCAL_URL)
    # IronClaw has no probe here; only a static hint is shown.
    st.markdown("**IronClaw**")
    st.caption("No gateway port detected. Run `ironclaw` CLI after onboarding.")

st.divider()
# Two-column row; this block fills the left column with the openclaw.json editor.
cfg_col, env_col = st.columns(2)
with cfg_col:
    st.subheader("Config Editor")
    config_text = st.text_area(
        "openclaw.json",
        value=st.session_state["config_editor_text"],
        height=360,
    )
    # Mirror the current text back into the session key that seeds the editor.
    st.session_state["config_editor_text"] = config_text
    if st.button("Save Config", use_container_width=True):
        try:
            # Round-trip through json so only valid, normalized JSON is written.
            parsed = json.loads(config_text)
            CONFIG_PATH.write_text(json.dumps(parsed, indent=2) + "\n", encoding="utf-8")
            st.success(f"Saved {CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            # A failed write (permissions, missing directory, read-only FS)
            # is reported inline instead of aborting the page render.
            st.error(f"Could not write {CONFIG_PATH}: {exc}")
# Right column: nanoclaw.json editor followed by the environment-variable checks.
# NOTE(review): nesting of "Environment Checks" under env_col is reconstructed
# from the column name — confirm against the original layout.
with env_col:
    st.subheader("NanoClaw Config")
    nanoclaw_text = st.text_area(
        "nanoclaw.json",
        value=st.session_state.get("nanoclaw_config_text", load_nanoclaw_config_text()),
        height=260,
    )
    st.session_state["nanoclaw_config_text"] = nanoclaw_text
    if st.button("Save NanoClaw Config", use_container_width=True):
        try:
            # Round-trip through json so only valid, normalized JSON is written.
            parsed = json.loads(nanoclaw_text)
            NANOCLAW_CONFIG_PATH.write_text(
                json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
            )
            st.success(f"Saved {NANOCLAW_CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            # Report a failed write inline instead of aborting the page render.
            st.error(f"Could not write {NANOCLAW_CONFIG_PATH}: {exc}")

    st.subheader("Environment Checks")
    config_data = load_config_json()
    expected_vars = parse_expected_env_vars(config_data)
    if not expected_vars:
        st.info("No expected env vars detected.")
    else:
        for key in expected_vars:
            # A variable counts as available only when it is set and non-empty.
            is_set = bool(os.getenv(key, ""))
            color = "green" if is_set else "red"
            status = "available" if is_set else "missing"
            st.markdown(
                f"<span style='color:{color}'>&#9679;</span> "
                f"<b>{key}</b> - {status}",
                unsafe_allow_html=True,
            )
# Claw Configs: JSON editors for the auxiliary services. This block creates the
# two columns and fills the first one (nanobot.json, picoclaw.json).
st.subheader("Claw Configs")
cfg_a, cfg_b = st.columns(2)
with cfg_a:
    nanobot_text = st.text_area(
        "nanobot.json",
        value=st.session_state.get("nanobot_config_text", load_nanobot_config_text()),
        height=220,
    )
    st.session_state["nanobot_config_text"] = nanobot_text
    if st.button("Save NanoBot Config", use_container_width=True):
        try:
            # Round-trip through json so only valid, normalized JSON is written.
            parsed = json.loads(nanobot_text)
            NANOBOT_CONFIG_PATH.write_text(
                json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
            )
            st.success(f"Saved {NANOBOT_CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            # Report a failed write inline instead of aborting the page render.
            st.error(f"Could not write {NANOBOT_CONFIG_PATH}: {exc}")

    picoclaw_text = st.text_area(
        "picoclaw.json",
        value=st.session_state.get("picoclaw_config_text", load_picoclaw_config_text()),
        height=220,
    )
    st.session_state["picoclaw_config_text"] = picoclaw_text
    if st.button("Save PicoClaw Config", use_container_width=True):
        try:
            parsed = json.loads(picoclaw_text)
            PICOCLAW_CONFIG_PATH.write_text(
                json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
            )
            st.success(f"Saved {PICOCLAW_CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            st.error(f"Could not write {PICOCLAW_CONFIG_PATH}: {exc}")
# Second Claw Configs column: zeroclaw.json and nullclaw.json editors.
with cfg_b:
    zeroclaw_text = st.text_area(
        "zeroclaw.json",
        value=st.session_state.get("zeroclaw_config_text", load_zeroclaw_config_text()),
        height=220,
    )
    st.session_state["zeroclaw_config_text"] = zeroclaw_text
    if st.button("Save ZeroClaw Config", use_container_width=True):
        try:
            # Round-trip through json so only valid, normalized JSON is written.
            parsed = json.loads(zeroclaw_text)
            ZEROCLAW_CONFIG_PATH.write_text(
                json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
            )
            st.success(f"Saved {ZEROCLAW_CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            # Report a failed write inline instead of aborting the page render.
            st.error(f"Could not write {ZEROCLAW_CONFIG_PATH}: {exc}")

    nullclaw_text = st.text_area(
        "nullclaw.json",
        value=st.session_state.get("nullclaw_config_text", load_nullclaw_config_text()),
        height=220,
    )
    st.session_state["nullclaw_config_text"] = nullclaw_text
    if st.button("Save NullClaw Config", use_container_width=True):
        try:
            parsed = json.loads(nullclaw_text)
            NULLCLAW_CONFIG_PATH.write_text(
                json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
            )
            st.success(f"Saved {NULLCLAW_CONFIG_PATH}.")
        except json.JSONDecodeError as exc:
            st.error(f"Invalid JSON: {exc}")
        except OSError as exc:
            st.error(f"Could not write {NULLCLAW_CONFIG_PATH}: {exc}")
# nemoclaw.json editor.
# NOTE(review): the original nesting (inside a column or not) is not
# recoverable from this view — confirm placement.
nemoclaw_text = st.text_area(
    "nemoclaw.json",
    value=st.session_state.get("nemoclaw_config_text", load_nemoclaw_config_text()),
    height=220,
)
st.session_state["nemoclaw_config_text"] = nemoclaw_text
if st.button("Save NemoClaw Config", use_container_width=True):
    try:
        # Round-trip through json so only valid, normalized JSON is written.
        parsed = json.loads(nemoclaw_text)
        NEMOCLAW_CONFIG_PATH.write_text(
            json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
        )
        st.success(f"Saved {NEMOCLAW_CONFIG_PATH}.")
    except json.JSONDecodeError as exc:
        st.error(f"Invalid JSON: {exc}")
    except OSError as exc:
        # Report a failed write inline instead of aborting the page render.
        st.error(f"Could not write {NEMOCLAW_CONFIG_PATH}: {exc}")
# ironclaw.json editor.
# NOTE(review): the original nesting (inside a column or not) is not
# recoverable from this view — confirm placement.
ironclaw_text = st.text_area(
    "ironclaw.json",
    value=st.session_state.get("ironclaw_config_text", load_ironclaw_config_text()),
    height=220,
)
st.session_state["ironclaw_config_text"] = ironclaw_text
if st.button("Save IronClaw Config", use_container_width=True):
    try:
        # Round-trip through json so only valid, normalized JSON is written.
        parsed = json.loads(ironclaw_text)
        IRONCLAW_CONFIG_PATH.write_text(
            json.dumps(parsed, indent=2) + "\n", encoding="utf-8"
        )
        st.success(f"Saved {IRONCLAW_CONFIG_PATH}.")
    except json.JSONDecodeError as exc:
        st.error(f"Invalid JSON: {exc}")
    except OSError as exc:
        # Report a failed write inline instead of aborting the page render.
        st.error(f"Could not write {IRONCLAW_CONFIG_PATH}: {exc}")
st.divider()
# Gateway row: a manual /ask tester on the left, recent gateway log lines on the right.
test_col, logs_col = st.columns([2, 3])

with test_col:
    st.subheader("Gateway Test")
    test_query = st.text_input("Test prompt", value="Ping from Streamlit")
    send_clicked = st.button("Send /ask", use_container_width=True)
    if send_clicked:
        gateway_reply = test_gateway(test_query)
        st.code(gateway_reply, language="json")

with logs_col:
    st.subheader("Gateway Logs")
    # Show at most the last LOG_MAX_LINES entries; fall back to a placeholder
    # when the joined text is empty.
    recent_lines = st.session_state["gateway_logs"][-LOG_MAX_LINES:]
    log_text = "\n".join(recent_lines)
    st.code(log_text if log_text else "No logs yet.")
# Service diagnostics: one expander holding the tail of every service log,
# each preceded by a "=== <name> ===" header, in the order listed here.
diagnostics_panel = st.expander("Service Diagnostics (Supervisor/Caddy/Streamlit/OpenClaw)")
diagnostic_sources = (
    ("supervisord.log", SUPERVISOR_LOG_PATH),
    ("openclaw-gateway.err.log", GATEWAY_ERR_LOG_PATH),
    ("openclaw-gateway.log", GATEWAY_BOOT_LOG_PATH),
    ("nanoclaw.err.log", NANOCLAW_ERR_LOG_PATH),
    ("nanoclaw.log", NANOCLAW_LOG_PATH),
    ("nanobot.err.log", NANOBOT_ERR_LOG_PATH),
    ("nanobot.log", NANOBOT_LOG_PATH),
    ("picoclaw.err.log", PICOCLAW_ERR_LOG_PATH),
    ("picoclaw.log", PICOCLAW_LOG_PATH),
    ("zeroclaw.err.log", ZEROCLAW_ERR_LOG_PATH),
    ("zeroclaw.log", ZEROCLAW_LOG_PATH),
    ("nullclaw.err.log", NULLCLAW_ERR_LOG_PATH),
    ("nullclaw.log", NULLCLAW_LOG_PATH),
    ("nemoclaw.err.log", NEMOCLAW_ERR_LOG_PATH),
    ("nemoclaw.log", NEMOCLAW_LOG_PATH),
    ("ironclaw.err.log", IRONCLAW_ERR_LOG_PATH),
    ("ironclaw.log", IRONCLAW_LOG_PATH),
    ("caddy.err.log", CADDY_ERR_LOG_PATH),
    ("caddy.log", CADDY_LOG_PATH),
    ("streamlit.err.log", STREAMLIT_ERR_LOG_PATH),
    ("streamlit.log", STREAMLIT_LOG_PATH),
)
diagnostic_chunks = []
for log_label, log_path in diagnostic_sources:
    diagnostic_chunks.append(f"=== {log_label} ===")
    diagnostic_chunks.append(tail_text_file(log_path))
# Headers and tails are separated by a blank line in the rendered block.
diagnostics_panel.code("\n\n".join(diagnostic_chunks))
st.divider()
# External LLM server: pick a base URL, list its models, switch the active one.
st.subheader("External LLM Server Test")
llm_base_default = infer_llm_api_base()
llm_base_url = st.text_input("LLM API Base URL", value=llm_base_default)
llm_models_col, llm_switch_col = st.columns([2, 3])

with llm_models_col:
    fetch_clicked = st.button("Fetch Models", use_container_width=True)
    if fetch_clicked:
        try:
            # The fetched payload is stored in session state and read back below.
            st.session_state["llm_models_info"] = llm_list_models(llm_base_url.rstrip("/"))
        except Exception as exc:
            st.error(f"Models request failed: {exc}")
    models_info = st.session_state.get("llm_models_info", {})
    if models_info:
        st.json(models_info)

with llm_switch_col:
    # Used only when the stored payload has no "available_models" key.
    fallback_models = ["deepseek-chat", "mistral-7b", "openhermes-7b", "deepseek-coder", "llama-7b"]
    available_models = models_info.get("available_models", fallback_models)
    selected_model = st.selectbox("Switch Model", options=available_models, index=0)
    switch_clicked = st.button("Switch Active Model", use_container_width=True)
    if switch_clicked:
        try:
            switch_resp = llm_switch_model(llm_base_url.rstrip("/"), selected_model)
            st.success(f"Switched: {switch_resp}")
        except Exception as exc:
            st.error(f"Model switch failed: {exc}")
# Chat-completion smoke test: prompt and sampling controls on the left,
# latency, extracted reply and raw JSON on the right.
llm_prompt_col, llm_result_col = st.columns([2, 3])
with llm_prompt_col:
    llm_test_prompt = st.text_area(
        "Prompt",
        value="Summarize the key financial metrics from this report: Revenue increased 15% YoY to $10M, with EBITDA margins improving from 20% to 25%. Cash flow remained strong at $3M.",
        height=140,
    )
    llm_max_tokens = st.number_input("Max Tokens", min_value=1, max_value=4096, value=256)
    llm_temp = st.number_input("Temperature", min_value=0.0, max_value=2.0, value=0.7, step=0.1)
    run_llm_test = st.button("Run Chat Completion", use_container_width=True)
with llm_result_col:
    if run_llm_test:
        try:
            # Time only the request itself.
            t0 = time.perf_counter()
            chat_resp = llm_chat_completion(
                llm_base_url.rstrip("/"),
                llm_test_prompt,
                int(llm_max_tokens),
                float(llm_temp),
            )
            dt = time.perf_counter() - t0
            st.caption(f"Latency: {dt:.2f}s")
            # `or` fallbacks cover a present-but-empty "choices" list and a
            # null "message"; previously both raised and the raw JSON below
            # was never shown.
            choices = chat_resp.get("choices") or [{}]
            message = choices[0].get("message") or {}
            content = message.get("content", "")
            if content:
                st.markdown("**Response**")
                st.write(content)
            st.markdown("**Raw JSON**")
            st.json(chat_resp)
        except Exception as exc:
            st.error(f"Chat request failed: {exc}")
st.divider()
# Backtesting Lab inputs: data source on the left, strategy parameters on the right.
st.subheader("Backtesting Lab")
st.caption(
    "Run SMA crossover tests with popular engines: backtesting.py and backtrader."
)
source_col, params_col = st.columns([3, 2])

with source_col:
    data_source = st.radio(
        "Data Source",
        options=["yfinance", "csv_upload"],
        horizontal=True,
    )
    if data_source != "yfinance":
        # CSV mode: only the upload widget is shown; the download-related
        # fields get inert placeholder values.
        uploaded_file = st.file_uploader("Upload CSV (Date/Open/High/Low/Close/Volume)", type=["csv"])
        symbol = ""
        start_date, end_date = None, None
        interval = "1d"
    else:
        symbol = st.text_input("Symbol", value="AAPL")
        date_col1, date_col2, interval_col = st.columns([1, 1, 1])
        start_date = date_col1.date_input("Start", value=date(2022, 1, 1))
        end_date = date_col2.date_input("End", value=date.today())
        interval = interval_col.selectbox("Interval", options=["1d", "1h", "30m", "15m"], index=0)
        uploaded_file = None

with params_col:
    engine = st.selectbox("Engine", options=["backtesting.py", "backtrader"], index=0)
    fast_period = st.number_input("Fast MA", min_value=2, max_value=200, value=10)
    slow_period = st.number_input("Slow MA", min_value=3, max_value=400, value=30)
    initial_cash = st.number_input("Initial Cash", min_value=1000.0, value=10000.0, step=1000.0)
    commission = st.number_input(
        "Commission (fraction)",
        min_value=0.0,
        max_value=0.1,
        value=0.001,
        format="%.5f",
    )
# Run the backtest on demand, keep the outcome in session state, and render
# the last successful result.
run_bt = st.button("Run Backtest", use_container_width=True)
if run_bt:
    try:
        if data_source == "yfinance":
            data = load_price_data_from_yfinance(
                symbol=symbol.strip().upper(),
                start=str(start_date),
                end=str(end_date),
                interval=interval,
            )
        else:
            if uploaded_file is None:
                raise ValueError("Upload a CSV file first.")
            # utf-8-sig drops a leading byte-order mark if present (as some
            # spreadsheet exports add) and otherwise decodes exactly like
            # utf-8, so the first header name is not prefixed with "\ufeff".
            data = load_price_data_from_csv_text(uploaded_file.getvalue().decode("utf-8-sig"))
        result = run_backtest(
            engine=engine,
            data=data,
            fast_period=int(fast_period),
            slow_period=int(slow_period),
            initial_cash=float(initial_cash),
            commission=float(commission),
        )
        st.session_state["backtest_result"] = result
        st.session_state["backtest_data"] = data
        st.session_state["backtest_params"] = {
            "engine": engine,
            "fast_period": int(fast_period),
            "slow_period": int(slow_period),
        }
        # A successful run clears any error left by an earlier attempt.
        st.session_state["backtest_error"] = ""
    except Exception as exc:
        # Any failure is stored as text; previously stored results are left
        # untouched.
        st.session_state["backtest_error"] = str(exc)

# The session keys read below are accessed by subscript, so they are expected
# to be initialized elsewhere in the file.
if st.session_state["backtest_error"]:
    st.error(f"Backtest failed: {st.session_state['backtest_error']}")
if (
    st.session_state["backtest_result"] is not None
    and isinstance(st.session_state["backtest_data"], pd.DataFrame)
    and st.session_state["backtest_params"] is not None
):
    render_backtest_visuals(
        st.session_state["backtest_result"],
        st.session_state["backtest_data"],
        st.session_state["backtest_params"],
    )