Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import random | |
| import threading | |
| import time | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Set | |
| from urllib.parse import urlencode, urlparse | |
| import requests | |
| from huggingface_hub import InferenceClient | |
| from chat.handler import ChatHandler | |
| from ui.controller import UIController | |
| from ui.layout import create_demo | |
class ConfigError(Exception):
    """Raised when environment-provided configuration is missing or invalid."""
@dataclass
class Config:
    """Runtime settings for the agent.

    Declared as a dataclass so it can be built with keyword arguments (as
    ``load_config`` does); ``allowed_hosts`` is a property because callers read
    it as an attribute (``host not in config.allowed_hosts``).
    """

    # Moltbook endpoint root and Hugging Face inference settings.
    moltbook_base_url: str
    hf_token: str
    hf_model: str
    hf_temperature: float
    hf_max_tokens: int
    # Heartbeat pacing and HTTP timeout, in seconds.
    heartbeat_interval_s: int
    heartbeat_jitter_s: int
    request_timeout_s: int
    # When true, decisions are logged but no write requests are sent.
    dry_run: bool
    # Per-cycle caps.
    max_feed_items: int
    max_replies_per_cycle: int
    max_upvotes_per_cycle: int
    memory_notes_limit: int
    # Moltbook credentials and API paths ("{postId}" is substituted in templates).
    moltbook_token: str
    moltbook_feed_path: str
    moltbook_post_path: str
    moltbook_reply_path_template: str
    moltbook_upvote_path_template: str
    moltbook_me_activity_path: str
    # Outbound network policy switches.
    enforce_https: bool
    block_unknown_network: bool

    @property
    def allowed_hosts(self) -> Set[str]:
        """Return the lower-cased network locations outbound requests may target."""
        return {
            urlparse(self.moltbook_base_url).netloc.lower(),
            "api-inference.huggingface.co",
        }
| def _to_bool(value: Optional[str], default: bool = False) -> bool: | |
| if value is None or value == "": | |
| return default | |
| return str(value).strip().lower() in {"1", "true", "yes", "on"} | |
| def _to_int(value: Optional[str], default: int) -> int: | |
| if value is None or value == "": | |
| return default | |
| try: | |
| parsed = int(value) | |
| return parsed if parsed > 0 else default | |
| except ValueError: | |
| return default | |
| def _to_float(value: Optional[str], default: float) -> float: | |
| if value is None or value == "": | |
| return default | |
| try: | |
| parsed = float(value) | |
| return parsed if 0 <= parsed <= 2 else default | |
| except ValueError: | |
| return default | |
| def _require_env(name: str) -> str: | |
| value = os.getenv(name, "").strip() | |
| if not value: | |
| raise ConfigError(f"Missing required env var: {name}") | |
| return value | |
| def _validate_url(name: str, value: str, enforce_https: bool) -> str: | |
| parsed = urlparse(value) | |
| if parsed.scheme not in {"http", "https"}: | |
| raise ConfigError(f"{name} must be http/https URL") | |
| if enforce_https and parsed.scheme != "https": | |
| raise ConfigError(f"{name} must be https when ENFORCE_HTTPS=true") | |
| if not parsed.netloc: | |
| raise ConfigError(f"{name} missing host") | |
| return value.rstrip("/") | |
| def _normalize_path(value: str, fallback: str) -> str: | |
| path = (value or fallback).strip() | |
| if not path: | |
| path = fallback | |
| if not path.startswith("/"): | |
| path = "/" + path | |
| return path | |
def load_config() -> Config:
    """Build a ``Config`` from environment variables.

    Unset, empty, or whitespace-only variables fall back to their defaults,
    including ``MOLTBOOK_BASE_URL`` and ``HF_MODEL``. ``HF_TOKEN`` is required.

    Raises:
        ConfigError: If the base URL is invalid, uses the bare ``moltbook.com``
            host, or ``HF_TOKEN`` is missing.
    """

    def _text(name: str, default: str) -> str:
        # Set-but-blank variables are treated the same as unset ones.
        return (os.getenv(name) or "").strip() or default

    def _path(name: str, fallback: str) -> str:
        return _normalize_path(os.getenv(name, fallback), fallback)

    enforce_https = _to_bool(os.getenv("ENFORCE_HTTPS"), True)
    moltbook_base_url = _validate_url(
        "MOLTBOOK_BASE_URL",
        _text("MOLTBOOK_BASE_URL", "https://www.moltbook.com"),
        enforce_https,
    )
    if urlparse(moltbook_base_url).netloc.lower() == "moltbook.com":
        raise ConfigError("Use https://www.moltbook.com (www required)")

    return Config(
        moltbook_base_url=moltbook_base_url,
        hf_token=_require_env("HF_TOKEN"),
        hf_model=_text("HF_MODEL", "meta-llama/Llama-3.1-8B-Instruct"),
        hf_temperature=_to_float(os.getenv("HF_TEMPERATURE"), 0.4),
        hf_max_tokens=_to_int(os.getenv("HF_MAX_TOKENS"), 450),
        heartbeat_interval_s=_to_int(os.getenv("HEARTBEAT_INTERVAL_S"), 90),
        heartbeat_jitter_s=_to_int(os.getenv("HEARTBEAT_JITTER_S"), 3),
        request_timeout_s=_to_int(os.getenv("REQUEST_TIMEOUT_S"), 15),
        dry_run=_to_bool(os.getenv("DRY_RUN"), True),
        max_feed_items=_to_int(os.getenv("MAX_FEED_ITEMS"), 20),
        max_replies_per_cycle=_to_int(os.getenv("MAX_REPLIES_PER_CYCLE"), 1),
        max_upvotes_per_cycle=_to_int(os.getenv("MAX_UPVOTES_PER_CYCLE"), 2),
        memory_notes_limit=_to_int(os.getenv("MEMORY_NOTES_LIMIT"), 20),
        # Strip stray whitespace/newlines so the bearer header stays well-formed.
        moltbook_token=os.getenv("MOLTBOOK_API_TOKEN", "").strip(),
        moltbook_feed_path=_path("MOLTBOOK_FEED_PATH", "/api/v1/feed"),
        moltbook_post_path=_path("MOLTBOOK_POST_PATH", "/api/v1/posts"),
        moltbook_reply_path_template=_path(
            "MOLTBOOK_REPLY_PATH_TEMPLATE", "/api/v1/posts/{postId}/comments"
        ),
        moltbook_upvote_path_template=_path(
            "MOLTBOOK_UPVOTE_PATH_TEMPLATE", "/api/v1/posts/{postId}/upvote"
        ),
        moltbook_me_activity_path=_path("MOLTBOOK_ME_ACTIVITY_PATH", "/api/v1/agents/me"),
        enforce_https=enforce_https,
        block_unknown_network=_to_bool(os.getenv("BLOCK_UNKNOWN_NETWORK"), True),
    )
class Logger:
    """Thread-safe logger that prints JSON lines and keeps a bounded in-memory tail."""

    def __init__(self, max_lines: int = 300):
        """Create a logger retaining at most ``max_lines`` recent lines."""
        self._max_lines = max_lines
        self._lines: List[str] = []
        self._lock = threading.Lock()

    def _push(self, level: str, message: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Serialize one record, store it in the buffer, and print it to stdout."""
        payload: Dict[str, Any] = {
            "time": datetime.now(timezone.utc).isoformat(),
            "level": level,
            "message": message,
        }
        if meta:
            payload["meta"] = meta
        # default=str keeps logging from raising on values json cannot encode
        # (sets, exceptions, ...); they are logged as their str() form.
        line = json.dumps(payload, ensure_ascii=True, default=str)
        with self._lock:
            self._lines.append(line)
            # Trim explicitly: a "[-max_lines:]" slice keeps everything when
            # max_lines is 0.
            overflow = len(self._lines) - max(self._max_lines, 0)
            if overflow > 0:
                del self._lines[:overflow]
        print(line, flush=True)

    def info(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Log ``message`` at level "info"."""
        self._push("info", message, meta)

    def warn(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Log ``message`` at level "warn"."""
        self._push("warn", message, meta)

    def error(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Log ``message`` at level "error"."""
        self._push("error", message, meta)

    def snapshot(self) -> str:
        """Return up to the last 120 buffered lines joined by newlines."""
        with self._lock:
            return "\n".join(self._lines[-120:])
# Unpatched ``Session.request``; None until the guard has been installed.
_ORIGINAL_REQUEST = None
_REQUEST_GUARD_LOCK = threading.Lock()


def install_requests_network_guard(config: Config) -> None:
    """Patch ``requests.sessions.Session.request`` with an allowlist check.

    The patch is applied at most once; later calls return without changes.
    The wrapper raises ``RuntimeError`` for non-https URLs (when
    ``config.enforce_https``) and for hosts outside ``config.allowed_hosts``
    (when ``config.block_unknown_network``), otherwise delegates to the
    original method.
    """
    global _ORIGINAL_REQUEST
    with _REQUEST_GUARD_LOCK:
        already_installed = _ORIGINAL_REQUEST is not None
        if already_installed:
            return
        _ORIGINAL_REQUEST = requests.sessions.Session.request

        def guarded_request(session: requests.Session, method: str, url: str, *args: Any, **kwargs: Any) -> requests.Response:
            target = urlparse(url)
            if config.enforce_https and target.scheme != "https":
                raise RuntimeError(f"Blocked non-https request: {url}")
            if config.block_unknown_network:
                host = target.netloc.lower()
                if host not in config.allowed_hosts:
                    raise RuntimeError(f"Blocked host outside allowlist: {host}")
            return _ORIGINAL_REQUEST(session, method, url, *args, **kwargs)

        requests.sessions.Session.request = guarded_request
class GuardedHttp:
    """HTTP helper that checks scheme and host policy before sending a request."""

    def __init__(self, config: Config):
        self.config = config
        self.session = requests.Session()

    def _ensure_allowed(self, url: str) -> None:
        """Raise ``RuntimeError`` if ``url`` violates the configured network policy."""
        policy = self.config
        target = urlparse(url)
        if policy.enforce_https and target.scheme != "https":
            raise RuntimeError(f"Blocked non-https request: {url}")
        if policy.block_unknown_network:
            host = target.netloc.lower()
            if host not in policy.allowed_hosts:
                raise RuntimeError(f"Blocked host outside allowlist: {host}")

    def request(
        self,
        method: str,
        url: str,
        headers: Dict[str, str],
        json_body: Optional[Dict[str, Any]] = None,
    ) -> requests.Response:
        """Send a request with the configured timeout.

        Raises:
            RuntimeError: If the URL is blocked by policy, or if ``requests``
                raises a ``RequestException`` (chained as the cause).
        """
        self._ensure_allowed(url)
        try:
            return self.session.request(
                method=method,
                url=url,
                headers=headers,
                json=json_body,
                timeout=self.config.request_timeout_s,
            )
        except requests.RequestException as exc:
            raise RuntimeError(f"Network request failed for {method} {url}: {exc}") from exc
class MoltbookClient:
    """Small JSON client for the Moltbook API endpoints named in the config."""

    def __init__(self, config: "Config", http: "GuardedHttp"):
        self.cfg = config
        self.http = http

    def _url(self, path: str) -> str:
        """Join the configured base URL with ``path``."""
        return f"{self.cfg.moltbook_base_url}{path}"

    def _headers(self) -> Dict[str, str]:
        """Return JSON headers, adding a bearer token when one is configured."""
        headers = {"content-type": "application/json"}
        if self.cfg.moltbook_token:
            headers["authorization"] = f"Bearer {self.cfg.moltbook_token}"
        return headers

    def _request(self, method: str, path: str, body: Optional[Dict[str, Any]] = None, query: Optional[Dict[str, Any]] = None) -> Any:
        """Perform a request and return the decoded JSON payload.

        ``None``-valued query entries are dropped. A non-JSON body is returned
        as ``{"raw": text}`` and an empty body as ``{}``.

        Raises:
            RuntimeError: If the response status is not OK.
        """
        url = self._url(path)
        if query:
            filtered_query = {k: v for k, v in query.items() if v is not None}
            if filtered_query:
                sep = "&" if "?" in url else "?"
                url += sep + urlencode(filtered_query)
        resp = self.http.request(method=method, url=url, headers=self._headers(), json_body=body)
        try:
            data = resp.json() if resp.text else {}
        except ValueError:
            data = {"raw": resp.text}
        if not resp.ok:
            raise RuntimeError(f"Moltbook API error {resp.status_code}: {data}")
        return data

    @staticmethod
    def _author_handle(record: Dict[str, Any]) -> str:
        """Return the author handle of a post/reply record, or "unknown"."""
        author = record.get("author")
        # "author" may be missing, null, or a non-dict value in API payloads.
        nested = author.get("handle") if isinstance(author, dict) else None
        return str(record.get("author_handle") or nested or "unknown")

    @staticmethod
    def _normalize_post(raw: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Convert a raw feed item into the internal post shape.

        Returns ``None`` when the item has no id or no text. Replies are read
        from ``replies`` when it is a list, otherwise from ``comments``;
        non-dict entries are skipped.
        """
        post_id = str(raw.get("id") or raw.get("post_id") or "").strip()
        text = str(raw.get("text") or raw.get("content") or "").strip()
        if not post_id or not text:
            return None
        replies_raw = raw.get("replies") if isinstance(raw.get("replies"), list) else raw.get("comments")
        replies = []
        if isinstance(replies_raw, list):
            replies = [
                {
                    "id": str(item.get("id") or ""),
                    "authorHandle": MoltbookClient._author_handle(item),
                    "text": str(item.get("text") or item.get("content") or ""),
                }
                for item in replies_raw
                if isinstance(item, dict)
            ]
        return {
            "id": post_id,
            "authorHandle": MoltbookClient._author_handle(raw),
            "text": text,
            "viewerHasUpvoted": bool(raw.get("viewer_has_upvoted") or raw.get("viewerHasUpvoted") or False),
            "replies": replies,
        }

    def _post_path(self, template: str, post_id: str) -> str:
        """Fill ``{postId}`` in ``template`` with a percent-encoded post id."""
        from urllib.parse import quote  # local import: not among the file-level imports

        # Ids come from remote data; encode them so characters such as "/" or
        # "?" cannot alter the request path.
        return template.replace("{postId}", quote(str(post_id), safe=""))

    def read_feed(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the feed and return its normalized posts.

        Accepts a bare list, or a dict with an ``items`` or ``feed`` list
        (``feed`` takes precedence when both are present).
        """
        data = self._request("GET", self.cfg.moltbook_feed_path, query={"limit": limit})
        items = data.get("items") if isinstance(data, dict) else data
        if isinstance(data, dict) and isinstance(data.get("feed"), list):
            items = data["feed"]
        if not isinstance(items, list):
            items = []
        out: List[Dict[str, Any]] = []
        for raw in items:
            if isinstance(raw, dict):
                norm = self._normalize_post(raw)
                if norm:
                    out.append(norm)
        return out

    def get_activity(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the activity endpoint; returns ``items``, a bare list, or ``[]``."""
        data = self._request("GET", self.cfg.moltbook_me_activity_path, query={"limit": limit})
        if isinstance(data, dict) and isinstance(data.get("items"), list):
            return data["items"]
        return data if isinstance(data, list) else []

    def post(self, content: str) -> None:
        """Create a new post with ``content``."""
        self._request("POST", self.cfg.moltbook_post_path, body={"content": content})

    def reply(self, post_id: str, content: str) -> None:
        """Post ``content`` as a reply to ``post_id``."""
        path = self._post_path(self.cfg.moltbook_reply_path_template, post_id)
        self._request("POST", path, body={"content": content})

    def upvote(self, post_id: str) -> None:
        """Upvote ``post_id``."""
        path = self._post_path(self.cfg.moltbook_upvote_path_template, post_id)
        self._request("POST", path, body={})
class HfDecisionClient:
    """Wrapper around ``InferenceClient.chat_completion`` for decisions and chat."""

    def __init__(self, config: "Config"):
        self.cfg = config
        self.client = InferenceClient(token=self.cfg.hf_token, timeout=self.cfg.request_timeout_s)

    @staticmethod
    def _extract_content(response: Any) -> str:
        """Return the stripped text of the first choice's message, or "".

        Handles dict-style and attribute-style responses. A missing or empty
        ``choices`` list, or a missing message, yields "" instead of raising.
        List-valued content is joined from the ``text`` of its dict parts.
        """
        if isinstance(response, dict):
            choices = response.get("choices")
        else:
            choices = getattr(response, "choices", None)
        try:
            first = choices[0] if choices else None
        except (IndexError, KeyError, TypeError):
            first = None
        if isinstance(first, dict):
            message = first.get("message")
        else:
            message = getattr(first, "message", None)
        if isinstance(message, dict):
            content = message.get("content", "")
        else:
            content = getattr(message, "content", "")
        if isinstance(content, list):
            content = "".join(str(part.get("text", "")) for part in content if isinstance(part, dict))
        return str(content or "").strip()

    def decide(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
        """Request a JSON-object completion and return it parsed.

        Raises:
            RuntimeError: If the reply is empty, not valid JSON, or not a JSON object.
        """
        response = self.client.chat_completion(
            model=self.cfg.hf_model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=self.cfg.hf_temperature,
            max_tokens=self.cfg.hf_max_tokens,
            response_format={"type": "json_object"},
        )
        content = self._extract_content(response)
        if not content:
            raise RuntimeError("HF response missing message content")
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError as exc:
            raise RuntimeError(f"HF content is not valid JSON: {exc}") from exc
        if not isinstance(parsed, dict):
            raise RuntimeError("HF decision payload must be a JSON object")
        return parsed

    def chat(self, system_prompt: str, history: List[Any], user_message: str) -> str:
        """Return the model reply to ``user_message`` given prior ``history``.

        History items may be ``{"role", "content"}`` dicts (only user/assistant
        roles with non-empty content are kept) or ``(user, assistant)`` pairs.
        The completion is capped at ``min(hf_max_tokens, 350)`` tokens.

        Raises:
            RuntimeError: If the reply has no content.
        """
        messages: List[Dict[str, str]] = [{"role": "system", "content": system_prompt}]
        for item in history or []:
            if isinstance(item, dict):
                role = str(item.get("role") or "").strip().lower()
                content = str(item.get("content") or "").strip()
                if role in {"user", "assistant"} and content:
                    messages.append({"role": role, "content": content})
                continue
            if isinstance(item, (list, tuple)) and len(item) == 2:
                user_text = str(item[0] or "").strip()
                bot_text = str(item[1] or "").strip()
                if user_text:
                    messages.append({"role": "user", "content": user_text})
                if bot_text:
                    messages.append({"role": "assistant", "content": bot_text})
        messages.append({"role": "user", "content": user_message})
        response = self.client.chat_completion(
            model=self.cfg.hf_model,
            messages=messages,
            temperature=self.cfg.hf_temperature,
            max_tokens=min(self.cfg.hf_max_tokens, 350),
        )
        content = self._extract_content(response)
        if not content:
            raise RuntimeError("HF response missing message content")
        return content
class EphemeralMemory:
    """In-process memory of short notes and of posts already replied to or upvoted."""

    def __init__(self, agent_handle: str, max_notes: int):
        self.agent_handle = agent_handle
        self.max_notes = max_notes
        self.notes: List[str] = []
        self.replied_post_ids: Set[str] = set()
        self.upvoted_post_ids: Set[str] = set()

    def rebuild(self, feed: List[Dict[str, Any]], activity: List[Dict[str, Any]]) -> None:
        """Recompute the replied/upvoted id sets from ``feed`` and ``activity``.

        Feed posts flagged ``viewerHasUpvoted`` count as upvoted; posts with a
        reply whose ``authorHandle`` equals the agent handle count as replied.
        Activity events add their post id according to whether ``type``
        contains "reply"/"comment" or "upvote"/"like".
        """
        self.replied_post_ids = set()
        self.upvoted_post_ids = set()
        for post in feed:
            if post.get("viewerHasUpvoted"):
                self.upvoted_post_ids.add(post["id"])
            for reply in post.get("replies", []):
                if reply.get("authorHandle") == self.agent_handle:
                    self.replied_post_ids.add(post["id"])
        for event in activity:
            if not isinstance(event, dict):
                continue
            post_id = str(event.get("post_id") or event.get("postId") or "").strip()
            typ = str(event.get("type") or "").lower()
            if not post_id:
                continue
            if "reply" in typ or "comment" in typ:
                self.replied_post_ids.add(post_id)
            if "upvote" in typ or "like" in typ:
                self.upvoted_post_ids.add(post_id)

    def add_note(self, note: str) -> None:
        """Append a timestamped, whitespace-collapsed note (max 160 chars of text)."""
        clean = " ".join(str(note).split()).strip()
        if not clean:
            return
        self.notes.append(f"{datetime.now(timezone.utc).isoformat()}: {clean[:160]}")
        # "[-0:]" would keep the whole list, so a non-positive limit is
        # handled explicitly as "keep nothing".
        keep = max(self.max_notes, 0)
        self.notes = self.notes[-keep:] if keep else []

    def to_context(self) -> Dict[str, Any]:
        """Return a JSON-friendly summary: handle, last 6 notes, and sorted id lists."""
        return {
            "agentHandle": self.agent_handle,
            "notes": self.notes[-6:],
            # Sorted so the prompt built from this context is deterministic.
            "repliedPostIds": sorted(self.replied_post_ids, key=str),
            "upvotedPostIds": sorted(self.upvoted_post_ids, key=str),
        }
class AutonomousAgent:
    """Background agent: each heartbeat reads the feed, asks the LLM for
    actions, sanitizes them against configured limits, and performs them
    unless dry-run is enabled."""

    def __init__(self, config: "Config", logger: "Logger"):
        self.cfg = config
        self.logger = logger
        self.http = GuardedHttp(config)
        self.moltbook = MoltbookClient(config, self.http)
        self.llm = HfDecisionClient(config)
        self.memory = EphemeralMemory(agent_handle=os.getenv("AGENT_HANDLE", "clawdbot"), max_notes=config.memory_notes_limit)
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.last_status = "idle"
        self._state_lock = threading.Lock()
        self._stop_event = threading.Event()

    def _build_prompts(self, feed: List[Dict[str, Any]]) -> Dict[str, str]:
        """Return the ``system`` and ``user`` prompts for one heartbeat.

        The user prompt is a JSON document holding the memory context, a
        compacted feed (text collapsed to single spaces and cut to 280
        characters), and the per-cycle constraints.
        """
        reply_cap = self.cfg.max_replies_per_cycle
        upvote_cap = self.cfg.max_upvotes_per_cycle
        # State the configured limits so the prompt agrees with the caps that
        # _sanitize_decision enforces (identical text for the defaults 1 and 2).
        reply_word = "reply" if reply_cap == 1 else "replies"
        upvote_word = "upvote" if upvote_cap == 1 else "upvotes"
        system = (
            "You are an autonomous Moltbook-only social bot. "
            "Return strict JSON only with schema: "
            '{"post": null|{"content": string}, "replies": [{"postId": string, "content": string}], "upvotes": [string], "memory_note": string}. '
            f"Limits: max 1 post, max {reply_cap} {reply_word}, max {upvote_cap} {upvote_word}, each content <= 220 chars."
        )
        compact = [
            {
                "id": item["id"],
                "author": f"@{item.get('authorHandle', 'unknown')}",
                "text": " ".join(item.get("text", "").split())[:280],
                "viewerHasUpvoted": bool(item.get("viewerHasUpvoted", False)),
                "hasAgentReply": item["id"] in self.memory.replied_post_ids,
            }
            for item in feed[: self.cfg.max_feed_items]
        ]
        user = json.dumps(
            {
                "task": "Choose Moltbook actions for this heartbeat",
                "memory": self.memory.to_context(),
                "feed": compact,
                "constraints": {
                    "maxRepliesPerCycle": reply_cap,
                    "maxUpvotesPerCycle": upvote_cap,
                },
            },
            ensure_ascii=True,
        )
        return {"system": system, "user": user}

    @staticmethod
    def _clip(text: Any, limit: int = 220) -> str:
        """Collapse whitespace in ``text`` and cut it to ``limit`` characters.

        Longer text ends with "..." within the limit; ``None`` becomes "".
        """
        out = " ".join(str(text or "").split()).strip()
        if len(out) <= limit:
            return out
        if limit <= 3:
            # No room for an ellipsis; a negative slice would keep too much.
            return out[: max(limit, 0)]
        return out[: limit - 3] + "..."

    def _sanitize_decision(self, decision: Dict[str, Any], feed_ids: Set[str]) -> Dict[str, Any]:
        """Reduce a raw LLM decision to safe, bounded actions.

        Replies and upvotes are kept only for ids present in ``feed_ids`` and
        not already handled according to memory, up to the configured
        per-cycle caps. A self-post is dropped whenever a reply was kept.
        """
        replies = []
        for raw in decision.get("replies", []) if isinstance(decision.get("replies"), list) else []:
            if not isinstance(raw, dict):
                continue
            post_id = str(raw.get("postId") or "").strip()
            content = self._clip(raw.get("content"))
            if not post_id or not content or post_id not in feed_ids:
                continue
            if post_id in self.memory.replied_post_ids:
                continue
            replies.append({"postId": post_id, "content": content})
            if len(replies) >= self.cfg.max_replies_per_cycle:
                break
        upvotes = []
        seen = set()
        for raw in decision.get("upvotes", []) if isinstance(decision.get("upvotes"), list) else []:
            post_id = str(raw).strip()
            if not post_id or post_id in seen or post_id not in feed_ids:
                continue
            if post_id in self.memory.upvoted_post_ids:
                continue
            seen.add(post_id)
            upvotes.append(post_id)
            if len(upvotes) >= self.cfg.max_upvotes_per_cycle:
                break
        post = None
        raw_post = decision.get("post")
        if isinstance(raw_post, dict):
            content = self._clip(raw_post.get("content"))
            if content:
                post = {"content": content}
        # Prefer reply over self-post to reduce spam.
        if replies:
            post = None
        note = self._clip(decision.get("memory_note") or decision.get("memoryNote") or "heartbeat complete", 160)
        return {"post": post, "replies": replies, "upvotes": upvotes, "memoryNote": note}

    def run_cycle(self) -> None:
        """Run one heartbeat: read, decide, sanitize, act (or log in dry-run).

        A failure to read activity is logged and treated as empty activity;
        any other exception propagates to the caller.
        """
        started = time.time()
        self._set_last_status("running")
        feed = self.moltbook.read_feed(self.cfg.max_feed_items)
        try:
            activity = self.moltbook.get_activity(self.cfg.max_feed_items)
        except Exception as exc:
            self.logger.warn("Unable to read activity, fallback to feed-only memory", {"error": str(exc)})
            activity = []
        self.memory.rebuild(feed, activity)
        prompts = self._build_prompts(feed)
        decision_raw = self.llm.decide(prompts["system"], prompts["user"])
        decision = self._sanitize_decision(decision_raw, {p["id"] for p in feed})
        summary = {"dryRun": self.cfg.dry_run, "upvotes": 0, "replies": 0, "posts": 0}
        if self.cfg.dry_run:
            self.logger.info("DRY_RUN enabled; no writes sent", {"decision": decision})
        else:
            for post_id in decision["upvotes"]:
                self.moltbook.upvote(post_id)
                self.memory.upvoted_post_ids.add(post_id)
                summary["upvotes"] += 1
            for reply in decision["replies"]:
                self.moltbook.reply(reply["postId"], reply["content"])
                self.memory.replied_post_ids.add(reply["postId"])
                summary["replies"] += 1
            if decision["post"]:
                self.moltbook.post(decision["post"]["content"])
                summary["posts"] += 1
        self.memory.add_note(decision["memoryNote"])
        elapsed_ms = int((time.time() - started) * 1000)
        self.logger.info("Heartbeat cycle complete", {"elapsedMs": elapsed_ms, "feedItems": len(feed), "summary": summary})
        self._set_last_status("idle")

    def _set_last_status(self, value: str) -> None:
        """Set ``last_status`` under the state lock."""
        with self._state_lock:
            self.last_status = value

    def status_snapshot(self) -> Dict[str, Any]:
        """Return the running flag, last status and dry-run flag."""
        with self._state_lock:
            return {
                "running": self.running,
                "last_status": self.last_status,
                "dry_run": self.cfg.dry_run,
            }

    def _loop(self) -> None:
        """Thread body: run cycles until the stop event is set.

        A failing cycle is logged and marks the status "error"; the loop then
        waits for the interval plus random jitter before the next cycle.
        """
        self.logger.info("Agent loop started", {"dryRun": self.cfg.dry_run})
        while not self._stop_event.is_set():
            try:
                self.run_cycle()
            except Exception as exc:
                self._set_last_status("error")
                self.logger.error("Heartbeat cycle failed", {"error": str(exc)})
            sleep_for = self.cfg.heartbeat_interval_s + random.randint(0, self.cfg.heartbeat_jitter_s)
            # wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(timeout=sleep_for):
                break
        with self._state_lock:
            self.running = False
        self.logger.info("Agent loop stopped")

    def start(self) -> str:
        """Start the daemon loop thread; returns "started" or "already running"."""
        with self._state_lock:
            if self.running:
                return "already running"
            self.running = True
            self.last_status = "idle"
            self._stop_event.clear()
            self.thread = threading.Thread(target=self._loop, daemon=True)
            self.thread.start()
        return "started"

    def stop(self) -> str:
        """Signal the loop to stop; returns "stopping" or "already stopped"."""
        with self._state_lock:
            if not self.running:
                return "already stopped"
            self._stop_event.set()
        return "stopping"
# Module-level wiring: build the logger, try to configure the agent, then hand
# everything to the UI. A startup failure is captured rather than raised so the
# UI can still be created with the error attached.
logger = Logger()
startup_error = None
agent: Optional[AutonomousAgent] = None
chat_handler: Optional[ChatHandler] = None

try:
    config = load_config()
    install_requests_network_guard(config)
    agent = AutonomousAgent(config, logger)
    chat_handler = ChatHandler(agent.llm)
    logger.info(
        "Configuration loaded",
        {"allowedHosts": list(config.allowed_hosts), "dryRun": config.dry_run},
    )
except Exception as exc:  # noqa: BLE001
    startup_error = str(exc)
    # Fall back to a chat handler without an LLM that carries the error text.
    chat_handler = ChatHandler(llm=None, startup_error=startup_error)
    logger.error("Startup configuration failed", {"error": startup_error})

controller = UIController(
    logger=logger,
    chat_handler=chat_handler,
    agent=agent,
    startup_error=startup_error,
)
demo = create_demo(controller)

if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT (default 7860).
    listen_port = int(os.getenv("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)