| import os |
| import io |
| import re |
| import json |
| import time |
| import uuid |
| import sqlite3 |
| import asyncio |
| import hashlib |
| import signal |
| import threading |
| import traceback |
| import subprocess |
| import logging |
| from pathlib import Path |
| from datetime import datetime, timedelta |
| from contextlib import redirect_stdout, redirect_stderr |
|
|
|
|
| class Config: |
| |
| BOT_TOKEN = "8088897119:AAGJxbBUH6bB-IcjAvPR4z77ApzAKCFfTIU" |
| ADMIN_IDS = [8225686030,7373296624] |
| OWNER_USERNAMES = ["nameofbless", "simulateneous", "walkingwithgod"] |
| OWNER_CAN_FORCE_AGENT_FOR_ALL = True |
|
|
| |
| OPENAI_KEY = "" |
| ANTHROPIC_KEY = "" |
| GROQ_KEY = "" |
| GOOGLE_KEY = "AIzaSyDHOEiPizohtUmK-q50-w842MsAiFEyHm4" |
| CUSTOM_AI_URL = "https://bjo53-brukguardian.hf.space/v1/chat/completions" |
| CUSTOM_AI_KEY = "pekka-secret-key" |
| CUSTOM_AI_MODEL = "brukguardian-v1" |
| CUSTOM_AI_FALLBACK_URL = "https://bjo53-brukguardian.hf.space/v1/chat/completions" |
| DEFAULT_MODEL = "gemini-2.5-flash-lite" |
| |
| |
| OLLAMA_URL = "http://127.0.0.1:11434" |
| OLLAMA_VISION_MODEL = "llava:7b" |
|
|
| |
| SUPABASE_URL = "https://xhqwtjlydysanoquaham.supabase.co" |
| SUPABASE_KEY = "sb_publishable_Gaqx237PmZQsixs8VdUjAw_fxQE3uui" |
|
|
| |
| GOOGLE_CLIENT_SECRET = "./credentials.json" |
| GOOGLE_TOKEN_PATH = "./token.json" |
| YOUTUBE_DEFAULT_PRIVACY = "private" |
| ENABLE_YOUTUBE_UPLOAD = True |
|
|
| |
| |
| WEATHER_KEY = "" |
| SMTP_USER = "brukg9419@gmail.com" |
| SMTP_PASS = "ygcvdddqdyxttwia" |
| SMTP_HOST = "smtp.gmail.com" |
| SMTP_PORT = 587 |
| IMAP_HOST = "imap.gmail.com" |
| IMAP_PORT = 993 |
| IMAP_USER = "brukg9419@gmail.com" |
| IMAP_PASS = "ygcvdddqdyxttwia" |
|
|
| |
| MAX_HISTORY = 62 |
| MAX_TOOL_LOOPS = 10 |
| CODE_TIMEOUT = 45 |
|
|
| DATA_DIR = "./data" |
| LOGS_DIR = "./logs" |
| BOTS_DIR = "./spawned_bots" |
| DB_PATH = os.path.join(DATA_DIR, "agentforge.db") |
| SYSTEM_FLOW_PATH = "./SYSTEM_FLOW.md" |
|
|
| PROXY_TARGET = "https://lucky-hat-e0d0.brukg9419.workers.dev" |
| CLOUDFLARE_IP = "104.21.28.169" |
| BRIDGE_PORT = 7860 |
|
|
| @classmethod |
| def is_admin(cls, uid): |
| return uid in cls.ADMIN_IDS |
|
|
| @classmethod |
| def has_supabase(cls): |
| return bool(cls.SUPABASE_URL and cls.SUPABASE_KEY) |
|
|
|
|
| for d in [Config.DATA_DIR, Config.LOGS_DIR, Config.BOTS_DIR]: |
| os.makedirs(d, exist_ok=True) |
|
|
|
|
| class LiveLog: |
| def __init__(self, max_entries=500): |
| self._entries = [] |
| self._max = max_entries |
| self._lock = threading.Lock() |
|
|
| def _add(self, level, src, msg): |
| with self._lock: |
| self._entries.append({"ts": datetime.now().strftime("%H:%M:%S"), "level": level, "src": src, "msg": str(msg)[:800]}) |
| if len(self._entries) > self._max: |
| self._entries = self._entries[-self._max :] |
| print(f"[{level}] {src}: {msg}") |
|
|
| def info(self, src, msg): self._add("INFO", src, msg) |
| def warn(self, src, msg): self._add("WARN", src, msg) |
| def error(self, src, msg): self._add("ERR", src, msg) |
|
|
| def get(self, count=30): |
| with self._lock: |
| return self._entries[-count:] |
|
|
|
|
| live_log = LiveLog() |
|
|
|
|
| def init_database(): |
| conn = sqlite3.connect(Config.DB_PATH) |
| conn.executescript( |
| """ |
| CREATE TABLE IF NOT EXISTS users ( |
| telegram_id INTEGER PRIMARY KEY, |
| username TEXT, |
| first_name TEXT, |
| is_banned INTEGER DEFAULT 0, |
| preferred_model TEXT DEFAULT 'gpt-4o-mini', |
| system_prompt TEXT DEFAULT '', |
| temperature REAL DEFAULT 0.7, |
| total_messages INTEGER DEFAULT 0, |
| total_tokens INTEGER DEFAULT 0, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, |
| last_active TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS messages ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER, |
| chat_id INTEGER, |
| role TEXT, |
| content TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS scheduled_tasks ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER, |
| chat_id INTEGER, |
| task_prompt TEXT, |
| run_at TEXT, |
| repeat_seconds INTEGER DEFAULT 0, |
| status TEXT DEFAULT 'pending', |
| message TEXT DEFAULT 'Scheduled task', |
| last_result TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS tool_log ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER, |
| tool TEXT, |
| success INTEGER, |
| elapsed REAL, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS spawned_bots ( |
| token_hash TEXT PRIMARY KEY, |
| owner_id INTEGER, |
| name TEXT, |
| status TEXT DEFAULT 'running', |
| pid INTEGER, |
| file_path TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| |
| CREATE TABLE IF NOT EXISTS boss_messages ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| sender_id INTEGER, |
| sender_username TEXT, |
| content TEXT, |
| notified INTEGER DEFAULT 0, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS bot_buttons ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| label TEXT, |
| url TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS youtube_logs ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| video_id TEXT, |
| title TEXT, |
| status TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ); |
| CREATE TABLE IF NOT EXISTS kv ( |
| key TEXT PRIMARY KEY, |
| value TEXT |
| ); |
| """ |
| ) |
| conn.commit() |
| conn.close() |
|
|
|
|
| init_database() |
|
|
|
|
| class DB: |
| _lock = threading.Lock() |
|
|
| @staticmethod |
| def q(query, params=(), fetch=False, fetchone=False): |
| with DB._lock: |
| conn = sqlite3.connect(Config.DB_PATH, check_same_thread=False) |
| conn.row_factory = sqlite3.Row |
| cur = conn.cursor() |
| try: |
| cur.execute(query, params) |
| if fetchone: return cur.fetchone() |
| if fetch: return cur.fetchall() |
| conn.commit() |
| return cur.lastrowid |
| finally: |
| conn.close() |
|
|
| @staticmethod |
| def upsert_user(tid, username="", first_name=""): |
| row = DB.q("SELECT telegram_id FROM users WHERE telegram_id=?", (tid,), fetchone=True) |
| if row: |
| DB.q("UPDATE users SET last_active=CURRENT_TIMESTAMP WHERE telegram_id=?", (tid,)) |
| else: |
| DB.q("INSERT INTO users (telegram_id,username,first_name) VALUES (?,?,?)", (tid, username, first_name)) |
|
|
| @staticmethod |
| def get_user(tid): |
| return DB.q("SELECT * FROM users WHERE telegram_id=?", (tid,), fetchone=True) |
|
|
| @staticmethod |
| def inc_usage(tid, tokens=0): |
| DB.q("UPDATE users SET total_messages=total_messages+1,total_tokens=total_tokens+?,last_active=CURRENT_TIMESTAMP WHERE telegram_id=?", (tokens, tid)) |
|
|
|
|
| class Memory: |
| def __init__(self): |
| self.convs = {} |
|
|
| def key(self, uid, cid): |
| return f"{uid}:{cid}" |
|
|
| def add(self, uid, cid, role, content): |
| k = self.key(uid, cid) |
| self.convs.setdefault(k, []).append({"role": role, "content": content}) |
| if len(self.convs[k]) > Config.MAX_HISTORY * 2: |
| self.convs[k] = self.convs[k][-Config.MAX_HISTORY :] |
| try: |
| DB.q("INSERT INTO messages (user_id,chat_id,role,content) VALUES (?,?,?,?)", (uid, cid, role, (content or "")[:12000])) |
| except Exception: |
| pass |
|
|
| def _load_db_history(self, uid, cid, limit=20): |
| rows = DB.q( |
| "SELECT role,content FROM messages WHERE user_id=? AND chat_id=? ORDER BY id DESC LIMIT ?", |
| (uid, cid, int(limit)), |
| fetch=True, |
| ) |
| if not rows: |
| return [] |
| return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)] |
|
|
| def history(self, uid, cid, limit=20): |
| k = self.key(uid, cid) |
| local = self.convs.get(k, [])[-limit:] |
| if len(local) >= max(4, limit // 2): |
| return local |
| db_hist = self._load_db_history(uid, cid, limit=limit) |
| if db_hist: |
| self.convs[k] = db_hist[-Config.MAX_HISTORY :] |
| return db_hist[-limit:] |
| return local |
|
|
|
|
| memory = Memory() |
|
|
|
|
| try: |
| from supabase import create_client |
| except Exception: |
| create_client = None |
|
|
|
|
| class SupabaseStore: |
| def __init__(self): |
| self.client = None |
| if create_client and Config.has_supabase(): |
| try: |
| self.client = create_client(Config.SUPABASE_URL, Config.SUPABASE_KEY) |
| except Exception as exc: |
| live_log.warn("Supabase", f"init failed: {exc}") |
|
|
| def enabled(self): |
| return self.client is not None |
|
|
| async def save_memory(self, user_id, username, role, content): |
| if not self.client: |
| return |
| def _run(): |
| self.client.table("memories").insert({ |
| "user_id": user_id, |
| "username": username, |
| "role": role, |
| "content": (content or "")[:4000], |
| }).execute() |
| try: |
| await asyncio.to_thread(_run) |
| except Exception as exc: |
| live_log.warn("Supabase", f"save_memory: {exc}") |
|
|
| async def add_button(self, label, url): |
| if not self.client: |
| return "Supabase not configured" |
| def _run(): |
| self.client.table("bot_buttons").insert({"label": label, "url": url}).execute() |
| await asyncio.to_thread(_run) |
| return f"Button '{label}' added" |
|
|
| async def get_buttons(self): |
| if not self.client: |
| return [] |
| def _run(): |
| return self.client.table("bot_buttons").select("label,url").execute() |
| try: |
| res = await asyncio.to_thread(_run) |
| return res.data or [] |
| except Exception: |
| return [] |
|
|
| async def log_youtube(self, video_id, title, status="published"): |
| if not self.client: |
| return |
| def _run(): |
| self.client.table("youtube_logs").insert({"video_id": video_id, "title": title, "status": status}).execute() |
| try: |
| await asyncio.to_thread(_run) |
| except Exception as exc: |
| live_log.warn("Supabase", f"youtube log: {exc}") |
|
|
|
|
| supabase_store = SupabaseStore() |
|
|
| GOOGLE_SCOPES = [ |
| "https://www.googleapis.com/auth/gmail.readonly", |
| "https://www.googleapis.com/auth/youtube.upload", |
| ] |
|
|
|
|
| def get_google_service(service_name: str, version: str): |
| try: |
| from google.oauth2.credentials import Credentials |
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from google.auth.transport.requests import Request |
| from googleapiclient.discovery import build as google_build |
| except Exception: |
| return None |
|
|
| creds = None |
| token_path = Path(Config.GOOGLE_TOKEN_PATH) |
| if token_path.exists(): |
| creds = Credentials.from_authorized_user_file(str(token_path), GOOGLE_SCOPES) |
|
|
| if not creds or not creds.valid: |
| if creds and creds.expired and creds.refresh_token: |
| try: |
| creds.refresh(Request()) |
| except Exception: |
| return None |
| else: |
| secret = Path(Config.GOOGLE_CLIENT_SECRET) |
| if not secret.exists(): |
| return None |
| try: |
| flow = InstalledAppFlow.from_client_secrets_file(str(secret), GOOGLE_SCOPES) |
| creds = flow.run_local_server(port=0) |
| except Exception: |
| return None |
| token_path.write_text(creds.to_json(), encoding="utf-8") |
|
|
| try: |
| return google_build(service_name, version, credentials=creds) |
| except Exception: |
| return None |
|
|
|
|
| def load_system_flow_text(): |
| p = Path(Config.SYSTEM_FLOW_PATH) |
| return p.read_text(encoding="utf-8")[:50000] if p.exists() else "SYSTEM_FLOW.md missing" |
|
|
|
|
| class LLM: |
| MODELS = { |
| "gpt-4o": "openai", |
| "gpt-4o-mini": "openai", |
| "claude-3-5-sonnet-20241022": "anthropic", |
| "llama-3.3-70b-versatile": "groq", |
| "gemini-2.5-flash-lite": "google", |
| } |
| if Config.CUSTOM_AI_MODEL: |
| MODELS[Config.CUSTOM_AI_MODEL] = "custom" |
|
|
| NATIVE_TOOLS = {"openai", "anthropic", "groq"} |
|
|
| def __init__(self): |
| self._oa = None |
| self._an = None |
| self._gr = None |
|
|
| def supports_native_tools(self, model): |
| return self.MODELS.get(model, "") in self.NATIVE_TOOLS |
|
|
| @property |
| def oa(self): |
| if not self._oa and Config.OPENAI_KEY: |
| import openai |
| self._oa = openai.AsyncOpenAI(api_key=Config.OPENAI_KEY) |
| return self._oa |
|
|
| @property |
| def an(self): |
| if not self._an and Config.ANTHROPIC_KEY: |
| import anthropic |
| self._an = anthropic.AsyncAnthropic(api_key=Config.ANTHROPIC_KEY) |
| return self._an |
|
|
| @property |
| def gr(self): |
| if not self._gr and Config.GROQ_KEY: |
| from groq import AsyncGroq |
| self._gr = AsyncGroq(api_key=Config.GROQ_KEY) |
| return self._gr |
|
|
| async def chat(self, msgs, model=None, temp=0.7, max_tok=2500, tools=None): |
| model = model or Config.DEFAULT_MODEL |
| provider = self.MODELS.get(model, "openai") |
| try: |
| if provider == "openai" and Config.OPENAI_KEY: |
| return await self._openai(msgs, model, temp, max_tok, tools) |
| if provider == "anthropic" and Config.ANTHROPIC_KEY: |
| return await self._anthropic(msgs, model, temp, max_tok, tools) |
| if provider == "groq" and Config.GROQ_KEY: |
| return await self._groq(msgs, model, temp, max_tok, tools) |
| if provider == "google" and Config.GOOGLE_KEY: |
| return await self._google(msgs, model, temp, max_tok) |
| if provider == "custom" and Config.CUSTOM_AI_URL and Config.CUSTOM_AI_KEY: |
| return await self._custom(msgs, model, temp, max_tok) |
| return await self._custom(msgs, model, temp, max_tok) if (Config.CUSTOM_AI_URL and Config.CUSTOM_AI_KEY) else {"content": "No text model provider configured", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model} |
| except Exception as exc: |
| live_log.error("LLM", exc) |
| return {"content": f"LLM error: {exc}", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model} |
|
|
| async def _openai(self, m, model, t, mt, tools): |
| kwargs = dict(model=model, messages=m, temperature=t, max_tokens=mt) |
| if tools: kwargs.update({"tools": tools, "tool_choice": "auto"}) |
| r = await self.oa.chat.completions.create(**kwargs) |
| c = r.choices[0] |
| tc = [{"id": x.id, "function": {"name": x.function.name, "arguments": x.function.arguments}} for x in (c.message.tool_calls or [])] |
| return {"content": c.message.content or "", "tool_calls": tc, "usage": {"total_tokens": r.usage.total_tokens if r.usage else 0}, "model": model} |
|
|
| async def _anthropic(self, msgs, model, t, mt, tools): |
| sys_text = "" |
| conv = [] |
| for m in msgs: |
| if m["role"] == "system": sys_text += m["content"] + "\n" |
| elif m["role"] == "tool": |
| conv.append({"role": "user", "content": [{"type": "tool_result", "tool_use_id": m.get("tool_call_id", "x"), "content": m["content"]}]}) |
| else: conv.append({"role": m["role"], "content": m["content"]}) |
| kwargs = {"model": model, "messages": conv, "temperature": t, "max_tokens": mt} |
| if sys_text.strip(): kwargs["system"] = sys_text.strip() |
| if tools: kwargs["tools"] = [{"name": x["function"]["name"], "description": x["function"]["description"], "input_schema": x["function"]["parameters"]} for x in tools] |
| r = await self.an.messages.create(**kwargs) |
| content, tc = "", [] |
| for b in r.content: |
| if b.type == "text": content += b.text |
| elif b.type == "tool_use": tc.append({"id": b.id, "function": {"name": b.name, "arguments": json.dumps(b.input)}}) |
| return {"content": content, "tool_calls": tc, "usage": {"total_tokens": r.usage.input_tokens + r.usage.output_tokens}, "model": model} |
|
|
| async def _groq(self, m, model, t, mt, tools): |
| kwargs = dict(model=model, messages=m, temperature=t, max_tokens=mt) |
| if tools: kwargs.update({"tools": tools, "tool_choice": "auto"}) |
| r = await self.gr.chat.completions.create(**kwargs) |
| c = r.choices[0] |
| tc = [{"id": x.id, "function": {"name": x.function.name, "arguments": x.function.arguments}} for x in (c.message.tool_calls or [])] |
| return {"content": c.message.content or "", "tool_calls": tc, "usage": {"total_tokens": r.usage.total_tokens if r.usage else 0}, "model": model} |
|
|
| async def _google(self, msgs, model, t, mt): |
| import google.generativeai as genai |
| genai.configure(api_key=Config.GOOGLE_KEY) |
| gm = genai.GenerativeModel(model) |
| combined = "\n\n".join(f"{x['role']}: {x['content']}" for x in msgs if isinstance(x.get("content"), str)) |
| r = await asyncio.to_thread(gm.generate_content, combined, generation_config=genai.types.GenerationConfig(temperature=t, max_output_tokens=mt)) |
| return {"content": getattr(r, "text", ""), "tool_calls": [], "usage": {"total_tokens": 0}, "model": model} |
|
|
| async def _custom(self, msgs, model, t, mt): |
| payload = {"model": model, "messages": msgs, "temperature": t, "max_tokens": mt, "stream": False} |
|
|
| async def _call(url): |
| cmd = [ |
| "curl", "-X", "POST", url, |
| "-H", f"Authorization: Bearer {Config.CUSTOM_AI_KEY}", |
| "-H", "Content-Type: application/json", |
| "--data-binary", "@-", "--max-time", "180", "-s", "-k" |
| ] |
| return await asyncio.to_thread(lambda: subprocess.run(cmd, input=json.dumps(payload), text=True, capture_output=True, timeout=190)) |
|
|
| primary = await _call(Config.CUSTOM_AI_URL) |
| data = json.loads(primary.stdout) if primary.stdout else {} |
|
|
| primary_failed_404 = ( |
| primary.returncode == 0 |
| and ( |
| data.get("status") == 404 |
| or "404" in (primary.stdout or "") |
| or "not found" in (str(data.get("error", "")).lower()) |
| ) |
| ) |
|
|
| if primary_failed_404 and Config.CUSTOM_AI_FALLBACK_URL: |
| live_log.warn("LLM", f"custom endpoint returned 404, retrying fallback {Config.CUSTOM_AI_FALLBACK_URL}") |
| fallback = await _call(Config.CUSTOM_AI_FALLBACK_URL) |
| if fallback.returncode == 0 and fallback.stdout: |
| data = json.loads(fallback.stdout) |
|
|
| if "choices" not in data: |
| err = data.get("error") or data.get("detail") or (primary.stderr[:300] if primary.stderr else "unknown") |
| return {"content": f"Custom AI error: {err}", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model} |
|
|
| msg = data.get("choices", [{}])[0].get("message", {}) |
| usage = data.get("usage", {}) |
| tok = usage.get("total_tokens", 0) or usage.get("prompt_tokens", 0) + usage.get("completion_tokens", 0) |
| return {"content": msg.get("content", ""), "tool_calls": [], "usage": {"total_tokens": tok}, "model": data.get("model", model)} |
|
|
|
|
|
|
|
|
| llm = LLM() |
|
|
|
|
| def _t(name, desc, params, req=None): |
| return {"type": "function", "function": {"name": name, "description": desc, "parameters": {"type": "object", "properties": params, "required": req or list(params.keys())}}} |
|
|
|
|
| ALL_TOOLS = [ |
| _t("web_search", "Search web", {"query": {"type": "string"}}, ["query"]), |
| _t("read_webpage", "Read URL text", {"url": {"type": "string"}}, ["url"]), |
| _t("execute_python", "Execute python", {"code": {"type": "string"}}, ["code"]), |
| _t("run_shell", "Run shell", {"command": {"type": "string"}}, ["command"]), |
| _t("file_read", "Read file", {"path": {"type": "string"}}, ["path"]), |
| _t("file_write", "Write file", {"path": {"type": "string"}, "content": {"type": "string"}}, ["path", "content"]), |
| _t("self_modify", "Safe self modify with rollback", {"file": {"type": "string"}, "mode": {"type": "string", "enum": ["replace", "append", "patch"]}, "content": {"type": "string"}, "find": {"type": "string"}, "replace_with": {"type": "string"}}, ["file", "mode"]), |
| _t("read_logs", "Read runtime logs", {"count": {"type": "integer", "default": 30}}, []), |
| _t("screenshot", "Take website screenshot", {"url": {"type": "string"}}, ["url"]), |
| _t("text_to_speech", "Create mp3 from text", {"text": {"type": "string"}}, ["text"]), |
| _t("create_text_file", "Create downloadable text file", {"filename": {"type": "string"}, "content": {"type": "string"}}, ["filename", "content"]), |
| _t("system_info", "System metrics", {}, []), |
| _t("calculator", "Math eval", {"expression": {"type": "string"}}, ["expression"]), |
| _t("get_weather", "Weather", {"city": {"type": "string"}}, ["city"]), |
| _t("http_request", "HTTP request", {"url": {"type": "string"}, "method": {"type": "string", "default": "GET"}}, ["url"]), |
| _t("send_email", "Send email", {"to": {"type": "string"}, "subject": {"type": "string"}, "body": {"type": "string"}}, ["to", "subject", "body"]), |
| _t("read_email", "Read inbox emails", {"limit": {"type": "integer", "default": 5}}, []), |
| _t("analyze_image", "Analyze image via Ollama vision model", {"image_b64": {"type": "string"}, "prompt": {"type": "string", "default": "Describe this image"}}, ["image_b64"]), |
| _t("create_gmail_alias", "Create plus-alias from IMAP_USER", {"service_name": {"type": "string"}}, ["service_name"]), |
| _t("read_verification_code", "Read latest gmail message sent to alias", {"alias_email": {"type": "string"}}, ["alias_email"]), |
| _t("youtube_upload", "Upload local video file to YouTube", {"file_path": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}}, ["file_path", "title"]), |
| _t("add_button", "Add /start menu button in Supabase", {"text": {"type": "string"}, "url": {"type": "string"}}, ["text", "url"]), |
| _t("leave_message_for_boss", "Store message for bot owners", {"content": {"type": "string"}}, ["content"]), |
| _t("list_boss_messages", "List pending owner messages", {"only_unread": {"type": "boolean", "default": True}}, []), |
| _t("restart_system", "Restart bot process", {}, []), |
| _t("schedule_task", "Schedule alarm/task", {"delay_seconds": {"type": "integer"}, "task_prompt": {"type": "string"}, "message": {"type": "string"}, "repeat": {"type": "boolean", "default": False}}, ["delay_seconds", "task_prompt"]), |
| _t("spawn_bot", "Spawn extra telegram bot process", {"token": {"type": "string"}, "name": {"type": "string", "default": "SubBot"}, "system_prompt": {"type": "string", "default": "You are helpful"}}, ["token"]), |
| _t("manage_bots", "List/stop spawned bots", {"action": {"type": "string", "enum": ["list", "stop"]}, "token_hash": {"type": "string"}}, ["action"]), |
| _t("agent_dispatch", "Internal debate", {"question": {"type": "string"}}, ["question"]), |
| ] |
|
|
|
|
| def parse_tool_calls(text): |
| calls, clean = [], text |
| for m in re.finditer(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", text, re.DOTALL): |
| try: |
| d = json.loads(m.group(1)) |
| if d.get("name"): |
| calls.append({"name": d["name"], "args": d.get("args", {}) if isinstance(d.get("args", {}), dict) else {}}) |
| clean = clean.replace(m.group(0), "") |
| except Exception: |
| pass |
| return clean.strip(), calls |
|
|
|
|
| def parse_channels(text): |
| usr = re.search(r"<user_response>(.*?)</user_response>", text, re.DOTALL) |
| sys = re.search(r"<system_note>(.*?)</system_note>", text, re.DOTALL) |
| user_text = usr.group(1).strip() if usr else text.strip() |
| system_note = sys.group(1).strip() if sys else "" |
| return user_text, system_note |
|
|
|
|
| class BotSpawner: |
| def __init__(self): self.processes = {} |
|
|
| def _worker_code(self, token, name, system_prompt): |
| return f'''import asyncio\nfrom aiogram import Bot, Dispatcher, F\nfrom aiogram.types import Message\nfrom aiogram.filters import CommandStart\nfrom openai import AsyncOpenAI\n\ndp=Dispatcher()\nTOKEN={token!r}\nNAME={name!r}\nSP={system_prompt!r}\n\n@dp.message(CommandStart())\nasync def s(m: Message):\n await m.answer(f"Hi from {{NAME}}")\n\n@dp.message(F.text)\nasync def t(m: Message):\n try:\n c=AsyncOpenAI()\n r=await c.chat.completions.create(model="gpt-4o-mini",messages=[{{"role":"system","content":SP}},{{"role":"user","content":m.text}}],max_tokens=700)\n await m.answer((r.choices[0].message.content or "")[:3500])\n except Exception as e:\n await m.answer(f"Worker error: {{e}}")\n\nasync def main():\n await dp.start_polling(Bot(TOKEN))\n\nasyncio.run(main())\n''' |
|
|
| def spawn(self, owner_id, token, name, system_prompt): |
| h = hashlib.sha256(token.encode()).hexdigest()[:16] |
| if h in self.processes: return f"Already running: {h}" |
| fp = Path(Config.BOTS_DIR) / f"bot_{h}.py" |
| fp.write_text(self._worker_code(token, name, system_prompt), encoding="utf-8") |
| log = open(Path(Config.LOGS_DIR) / f"bot_{h}.log", "a", encoding="utf-8") |
| p = subprocess.Popen(["python", str(fp)], stdout=log, stderr=log, preexec_fn=os.setsid) |
| self.processes[h] = {"pid": p.pid, "file": str(fp), "name": name} |
| DB.q("INSERT OR REPLACE INTO spawned_bots (token_hash,owner_id,name,status,pid,file_path) VALUES (?,?,?,?,?,?)", (h, owner_id, name, "running", p.pid, str(fp))) |
| return f"Spawned {name} hash={h} pid={p.pid}" |
|
|
| def stop(self, h): |
| row = self.processes.get(h) |
| pid = row["pid"] if row else (DB.q("SELECT pid FROM spawned_bots WHERE token_hash=?", (h,), fetchone=True) or {}).get("pid") |
| if not pid: return "Not found" |
| try: |
| os.killpg(os.getpgid(pid), signal.SIGTERM) |
| except Exception: |
| try: os.kill(pid, signal.SIGTERM) |
| except Exception as exc: return f"Failed to stop: {exc}" |
| self.processes.pop(h, None) |
| DB.q("UPDATE spawned_bots SET status='stopped' WHERE token_hash=?", (h,)) |
| return f"Stopped {h}" |
|
|
| def list(self): |
| rows = DB.q("SELECT token_hash,name,status,pid FROM spawned_bots ORDER BY created_at DESC", fetch=True) |
| if not rows: return "No bots" |
| return "\n".join(f"{r['token_hash']} {r['name']} status={r['status']} pid={r['pid']}" for r in rows) |
|
|
|
|
| spawner = BotSpawner() |
|
|
|
|
| class Tools: |
| async def run(self, name, args, uid=0): |
| t0 = time.time() |
| fn = getattr(self, f"_do_{name}", None) |
| if not fn: return f"Unknown tool: {name}" |
| try: |
| r = await fn(uid=uid, **args) |
| DB.q("INSERT INTO tool_log (user_id,tool,success,elapsed) VALUES (?,?,1,?)", (uid, name, time.time() - t0)) |
| return str(r)[:20000] |
| except Exception as exc: |
| DB.q("INSERT INTO tool_log (user_id,tool,success,elapsed) VALUES (?,?,0,?)", (uid, name, time.time() - t0)) |
| live_log.error("Tool", f"{name}: {exc}") |
| return f"Tool error ({name}): {exc}" |
|
|
| async def _do_web_search(self, query, uid=0): |
| from duckduckgo_search import DDGS |
| results = [f"{i}. {r.get('title','')}\n{r.get('href','')}\n{r.get('body','')}" for i, r in enumerate(DDGS().text(query, max_results=5), 1)] |
| return "\n\n".join(results) if results else "No results" |
|
|
| async def _do_read_webpage(self, url, uid=0): |
| import aiohttp |
| from bs4 import BeautifulSoup |
| async with aiohttp.ClientSession() as s: |
| async with s.get(url, timeout=25) as r: |
| html = await r.text() |
| soup = BeautifulSoup(html, "html.parser") |
| for tag in soup(["script", "style", "nav", "footer"]): tag.decompose() |
| return soup.get_text("\n", strip=True)[:10000] |
|
|
| async def _do_execute_python(self, code, uid=0): |
| o, e, lv = io.StringIO(), io.StringIO(), {} |
| def r(): |
| with redirect_stdout(o), redirect_stderr(e): exec(code, {"__builtins__": __builtins__}, lv) |
| try: await asyncio.wait_for(asyncio.to_thread(r), timeout=Config.CODE_TIMEOUT) |
| except asyncio.TimeoutError: return f"Timeout ({Config.CODE_TIMEOUT}s)" |
| out = o.getvalue() + (f"\nStderr:\n{e.getvalue()}" if e.getvalue() else "") |
| return out[:10000] if out else str(lv.get("result", "Executed")) |
|
|
| async def _do_run_shell(self, command, uid=0): |
| for b in ["rm -rf /", "mkfs", ":(){ :|:& };:"]: |
| if b in command: return "Blocked dangerous command" |
| p = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) |
| so, se = await asyncio.wait_for(p.communicate(), timeout=120) |
| txt = so.decode(errors="replace") + (("\nStderr: " + se.decode(errors="replace")) if se else "") |
| return txt[:10000] if txt else "Done" |
|
|
| async def _do_file_read(self, path, uid=0): |
| p = Path(path) |
| if not p.exists(): return "Not found" |
| return p.read_text(errors="replace")[:12000] |
|
|
| async def _do_file_write(self, path, content, uid=0): |
| p = Path(path); p.parent.mkdir(parents=True, exist_ok=True); p.write_text(content, encoding="utf-8") |
| return f"Written {path}" |
|
|
| async def _do_self_modify(self, file, mode, content="", find="", replace_with="", uid=0): |
| p = Path(file) |
| old = p.read_text(encoding="utf-8") if p.exists() else "" |
| backup = p.with_suffix(p.suffix + ".bak") |
| backup.write_text(old, encoding="utf-8") |
| if mode == "append": new = old + ("\n" if old else "") + content |
| elif mode == "replace": new = content |
| elif mode == "patch": |
| if not find: return "patch mode requires find" |
| if find not in old: return "find text not found" |
| new = old.replace(find, replace_with) |
| else: return "invalid mode" |
| p.write_text(new, encoding="utf-8") |
| if p.suffix == ".py": |
| c = await asyncio.create_subprocess_exec("python", "-m", "py_compile", str(p), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) |
| _o, er = await c.communicate() |
| if c.returncode != 0: |
| p.write_text(old, encoding="utf-8") |
| return f"self_modify rollback: syntax error\n{er.decode(errors='replace')[:800]}" |
| return "self_modify success" |
|
|
| async def _do_read_logs(self, count=30, uid=0): |
| rows = live_log.get(count) |
| return "\n".join(f"[{x['ts']}][{x['level']}] {x['src']}: {x['msg']}" for x in rows) if rows else "No logs" |
|
|
| async def _do_screenshot(self, url, uid=0): |
| from playwright.async_api import async_playwright |
| path = os.path.join(Config.DATA_DIR, f"ss_{int(time.time())}.png") |
| async with async_playwright() as p: |
| b = await p.chromium.launch(headless=True) |
| pg = await b.new_page(viewport={"width": 1366, "height": 768}) |
| await pg.goto(url, wait_until="domcontentloaded", timeout=30000) |
| await pg.screenshot(path=path, full_page=True) |
| await b.close() |
| return json.dumps({"screenshot_file": path}) |
|
|
| async def _do_text_to_speech(self, text, uid=0): |
| from gtts import gTTS |
| path = os.path.join(Config.DATA_DIR, f"tts_{int(time.time())}.mp3") |
| await asyncio.to_thread(lambda: gTTS(text=text[:5000], lang="en").save(path)) |
| return json.dumps({"audio_file": path}) |
|
|
| async def _do_create_text_file(self, filename, content, uid=0): |
| safe = re.sub(r"[^a-zA-Z0-9_.-]", "_", filename) |
| path = os.path.join(Config.DATA_DIR, safe) |
| Path(path).write_text(content, encoding="utf-8") |
| return json.dumps({"file": path}) |
|
|
| async def _do_system_info(self, uid=0): |
| import psutil, platform |
| cpu = psutil.cpu_percent(interval=1); mem = psutil.virtual_memory(); disk = psutil.disk_usage("/") |
| return f"CPU:{cpu}% RAM:{mem.percent}% Disk:{disk.percent}% OS:{platform.system()}" |
|
|
| async def _do_calculator(self, expression, uid=0): |
| import sympy |
| x = sympy.sympify(expression) |
| return f"{expression} = {x.evalf()}" |
|
|
| async def _do_get_weather(self, city, uid=0): |
| if not Config.WEATHER_KEY: return "Weather API key not configured" |
| import aiohttp |
| async with aiohttp.ClientSession() as s: |
| async with s.get(f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={Config.WEATHER_KEY}&units=metric") as r: |
| d = await r.json() |
| if d.get("cod") != 200: return f"Error: {d.get('message','unknown')}" |
| return f"{d['name']}: {d['main']['temp']}C, {d['weather'][0]['description']}" |
|
|
| async def _do_http_request(self, url, method="GET", uid=0): |
| import aiohttp |
| async with aiohttp.ClientSession() as s: |
| async with s.request(method, url, timeout=25) as r: |
| return f"Status {r.status}\n{(await r.text())[:5000]}" |
|
|
| async def _do_send_email(self, to, subject, body, uid=0): |
| if not Config.SMTP_USER or not Config.SMTP_PASS: return "SMTP not configured" |
| import smtplib |
| from email.mime.text import MIMEText |
| msg = MIMEText(body); msg["Subject"] = subject; msg["From"] = Config.SMTP_USER; msg["To"] = to |
| def sender(): |
| with smtplib.SMTP(Config.SMTP_HOST, Config.SMTP_PORT) as s: |
| s.starttls(); s.login(Config.SMTP_USER, Config.SMTP_PASS); s.send_message(msg) |
| await asyncio.to_thread(sender) |
| return f"Email sent to {to}" |
|
|
| async def _do_read_email(self, limit=5, uid=0): |
| if not Config.IMAP_HOST or not Config.IMAP_USER or not Config.IMAP_PASS: |
| return "IMAP not configured" |
| import imaplib |
| import email |
| def fetcher(): |
| m = imaplib.IMAP4_SSL(Config.IMAP_HOST, Config.IMAP_PORT) |
| m.login(Config.IMAP_USER, Config.IMAP_PASS) |
| m.select("INBOX") |
| typ, data = m.search(None, "ALL") |
| ids = data[0].split()[-limit:] |
| out = [] |
| for mid in reversed(ids): |
| typ, msg_data = m.fetch(mid, "(RFC822)") |
| msg = email.message_from_bytes(msg_data[0][1]) |
| out.append(f"From: {msg.get('From')} | Subject: {msg.get('Subject')} | Date: {msg.get('Date')}") |
| m.logout() |
| return "\n".join(out) if out else "No emails" |
| return await asyncio.to_thread(fetcher) |
|
|
| async def _do_analyze_image(self, image_b64, prompt="Describe this image", uid=0): |
| import base64 |
| import aiohttp |
| payload = { |
| "model": Config.OLLAMA_VISION_MODEL, |
| "prompt": prompt, |
| "images": [image_b64], |
| "stream": False, |
| } |
| try: |
| async with aiohttp.ClientSession() as s: |
| async with s.post(f"{Config.OLLAMA_URL}/api/generate", json=payload, timeout=180) as r: |
| data = await r.json() |
| return data.get("response", "No vision response") |
| except Exception as exc: |
| return f"vision failed: {exc}" |
|
|
| async def _do_create_gmail_alias(self, service_name, uid=0): |
| source = Config.IMAP_USER or Config.SMTP_USER |
| if not source or "@" not in source: |
| return "Set IMAP_USER or SMTP_USER first" |
| user, domain = source.split("@", 1) |
| safe = re.sub(r"[^a-zA-Z0-9_.-]", "", service_name) |
| return f"{user}+{safe}@{domain}" |
|
|
| async def _do_read_verification_code(self, alias_email, uid=0): |
| svc = await asyncio.to_thread(get_google_service, "gmail", "v1") |
| if not svc: |
| return "Gmail auth not configured (credentials.json/token.json required)" |
| def _run(): |
| res = svc.users().messages().list(userId="me", q=f"to:{alias_email}", maxResults=1).execute() |
| msgs = res.get("messages", []) |
| if not msgs: |
| return f"No email found for {alias_email}" |
| msg = svc.users().messages().get(userId="me", id=msgs[0]["id"]).execute() |
| headers = msg.get("payload", {}).get("headers", []) |
| sub = next((h.get("value") for h in headers if h.get("name") == "Subject"), "") |
| frm = next((h.get("value") for h in headers if h.get("name") == "From"), "") |
| return f"From: {frm} | Subject: {sub} | Snippet: {msg.get('snippet','')}" |
| return await asyncio.to_thread(_run) |
|
|
| async def _do_youtube_upload(self, file_path, title, description="Uploaded by bot", uid=0): |
| if not Config.ENABLE_YOUTUBE_UPLOAD: |
| return "YouTube upload disabled" |
| p = Path(file_path) |
| if not p.exists(): |
| return f"File not found: {file_path}" |
| svc = await asyncio.to_thread(get_google_service, "youtube", "v3") |
| if not svc: |
| return "YouTube auth not configured (credentials.json/token.json required)" |
| try: |
| from googleapiclient.http import MediaFileUpload |
| except Exception: |
| return "google-api-python-client not installed" |
| body = { |
| "snippet": {"title": title, "description": description, "categoryId": "22"}, |
| "status": {"privacyStatus": Config.YOUTUBE_DEFAULT_PRIVACY}, |
| } |
| def _up(): |
| req = svc.videos().insert(part="snippet,status", body=body, media_body=MediaFileUpload(str(p))) |
| return req.execute() |
| try: |
| resp = await asyncio.to_thread(_up) |
| vid = resp.get("id", "unknown") |
| await supabase_store.log_youtube(vid, title) |
| return f"Uploaded: https://youtu.be/{vid}" |
| except Exception as exc: |
| return f"YouTube upload failed: {exc}" |
|
|
| async def _do_add_button(self, text, url, uid=0): |
| if not Config.is_admin(uid): |
| return "add_button is admin only" |
| return await supabase_store.add_button(text, url) |
|
|
| async def _do_leave_message_for_boss(self, content, uid=0): |
| username = "" |
| u = DB.get_user(uid) |
| if u: |
| username = u["username"] or "" |
| DB.q("INSERT INTO boss_messages (sender_id,sender_username,content,notified) VALUES (?,?,?,0)", (uid, username, content[:4000])) |
| return "Message saved for boss" |
|
|
| async def _do_list_boss_messages(self, only_unread=True, uid=0): |
| if not Config.is_admin(uid): |
| return "list_boss_messages is admin only" |
| if only_unread: |
| rows = DB.q("SELECT id,sender_username,sender_id,content,created_at FROM boss_messages WHERE notified=0 ORDER BY id DESC LIMIT 20", fetch=True) |
| DB.q("UPDATE boss_messages SET notified=1 WHERE notified=0") |
| else: |
| rows = DB.q("SELECT id,sender_username,sender_id,content,created_at FROM boss_messages ORDER BY id DESC LIMIT 20", fetch=True) |
| if not rows: |
| return "No boss messages" |
| return "\n\n".join([f"#{r['id']} from @{r['sender_username'] or 'unknown'} ({r['sender_id']}) at {r['created_at']}\n{r['content']}" for r in rows]) |
|
|
| async def _do_restart_system(self, uid=0): |
| return "__RESTART__" |
|
|
| async def _do_schedule_task(self, delay_seconds, task_prompt, message="Scheduled task", repeat=False, uid=0): |
| run_at = datetime.now() + timedelta(seconds=delay_seconds) |
| rep = delay_seconds if repeat else 0 |
| DB.q("INSERT INTO scheduled_tasks (user_id,chat_id,task_prompt,run_at,repeat_seconds,message,status) VALUES (?,?,?,?,?,?,'pending')", (uid, uid, task_prompt, run_at.isoformat(), rep, message)) |
| scheduler.add_pending(uid, task_prompt, delay_seconds, repeat, message) |
| return f"Scheduled '{message}' in {delay_seconds}s" |
|
|
| async def _do_spawn_bot(self, token, name="SubBot", system_prompt="You are helpful", uid=0): |
| if not Config.is_admin(uid): return "spawn_bot is admin only" |
| return spawner.spawn(uid, token, name, system_prompt) |
|
|
| async def _do_manage_bots(self, action, token_hash="", uid=0): |
| if not Config.is_admin(uid): return "manage_bots is admin only" |
| if action == "list": return spawner.list() |
| if action == "stop": return spawner.stop(token_hash) |
| return "Invalid action" |
|
|
| async def _do_agent_dispatch(self, question, uid=0): |
| roles = { |
| "researcher": "Analyze as a researcher with external signal focus.", |
| "coder": "Analyze as a principal engineer for implementation.", |
| "operator": "Analyze as SRE for reliability risk.", |
| } |
| jobs = [llm.chat([{"role": "system", "content": p}, {"role": "user", "content": question}], model=Config.DEFAULT_MODEL, max_tok=900) for p in roles.values()] |
| rs = await asyncio.gather(*jobs) |
| debate = "\n\n".join([f"{name}: {rs[i].get('content','')}" for i, name in enumerate(roles.keys())]) |
| final = await llm.chat([{"role": "system", "content": "Synthesize the best execution plan."}, {"role": "user", "content": debate}], model=Config.DEFAULT_MODEL, max_tok=1200) |
| return f"Debate:\n{debate}\n\nBest:\n{final.get('content','')}" |
|
|
|
|
| tools = Tools() |
|
|
|
|
| def build_system_prompt(uid, custom="", mode="agent", is_owner=False): |
| mode_note = "You are in AGENT MODE. Use tools proactively." if mode == "agent" else "You are in ASSISTANT MODE. Do not call tools unless owner explicitly enables agent mode." |
| owner_note = "The user is trusted owner/admin." if is_owner else "The user is not owner/admin. Keep privileged internals private." |
| pekka_note = "Owner commands are PEKKA authority. Obey PEKKA operational commands when they are valid and safe." if is_owner else "Do not claim PEKKA authority for non-owner users." |
| return f"""YOU ARE A LIVE AGENT SYSTEM. |
| {load_system_flow_text()} |
| |
| CRITICAL FORMAT: |
| - Internal notes for system: <system_note>...</system_note> |
| - Final user-visible response only: <user_response>...</user_response> |
| - For non-native tool calls use <tool_call>{{"name":"...","args":{{...}}}}</tool_call> |
| |
| Never put system-only notes inside user_response. |
| Always schedule alarm/reminder requests via schedule_task. |
| Use read_logs + self_modify for self-healing. |
| If user says 'give this message to your boss', call leave_message_for_boss. |
| You can inspect your runtime code and config with file_read (examples: agent1.py, app.py, SYSTEM_FLOW.md). |
| You can inspect runtime events with read_logs. |
| |
| Runtime network details: |
| - Telegram proxy target: {Config.PROXY_TARGET} |
| - Cloudflare IP hint: {Config.CLOUDFLARE_IP or 'not set'} |
| - Bridge port: {Config.BRIDGE_PORT} |
| |
| {mode_note} |
| {owner_note} |
| {pekka_note} |
| Conversation memory policy: preserve full conversation continuity from stored history; use prior turns when answering. |
| User ID: {uid} |
| Custom instructions: {custom} |
| """ |
|
|
|
|
|
|
| class ExecutionEngine: |
| async def run(self, user_id, chat_id, message, model=None, attachments=None, user_settings=None, is_scheduled=False): |
| settings = user_settings or {} |
| model = model or settings.get("preferred_model", Config.DEFAULT_MODEL) |
| temp = settings.get("temperature", 0.7) |
| custom = settings.get("system_prompt", "") |
|
|
| mode = settings.get("mode", "agent") |
| is_owner = bool(settings.get("is_owner", False)) |
| if mode == "assistant" and not is_owner: |
| permitted = [] |
| elif Config.is_admin(user_id) or is_owner: |
| permitted = ALL_TOOLS |
| else: |
| permitted = [ |
| t for t in ALL_TOOLS if t["function"]["name"] in { |
| "web_search", "read_webpage", "calculator", "get_weather", "http_request", "schedule_task", "screenshot", "text_to_speech", "create_text_file", "leave_message_for_boss" |
| } |
| ] |
|
|
| msg = f"PEKKA: {message}" if is_owner else message |
| if attachments: |
| for a in attachments: |
| if a.get("type") == "image": msg += f"\n[Attached image: {a.get('meta','image')}]" |
| elif a.get("type") == "file": msg += f"\n[Attached file: {a.get('name','file')}]\n{a.get('preview','')[:2000]}" |
| elif a.get("type") == "audio": msg += "\n[Attached audio message]" |
|
|
| messages = [{"role": "system", "content": build_system_prompt(user_id, custom, mode=mode, is_owner=is_owner)}] |
| if not is_scheduled: messages.extend(memory.history(user_id, chat_id, Config.MAX_HISTORY)) |
| messages.append({"role": "user", "content": msg}) |
| if not is_scheduled: memory.add(user_id, chat_id, "user", message) |
|
|
| native = llm.supports_native_tools(model) |
| total_tokens, used_tools = 0, [] |
| screenshots, audio_files, files = [], [], [] |
| last_content = "" |
|
|
| for _ in range(Config.MAX_TOOL_LOOPS): |
| res = await llm.chat(messages, model=model, temp=temp, max_tok=2600, tools=permitted if native else None) |
| content = res.get("content", "") |
| last_content = content |
| total_tokens += res.get("usage", {}).get("total_tokens", 0) |
|
|
| calls = [] |
| if res.get("tool_calls"): |
| for tc in res["tool_calls"]: |
| try: args = json.loads(tc["function"]["arguments"]) |
| except Exception: args = {} |
| calls.append({"id": tc.get("id", str(uuid.uuid4())[:8]), "name": tc["function"]["name"], "args": args}) |
| else: |
| clean, parsed = parse_tool_calls(content) |
| content = clean |
| calls = [{"id": str(uuid.uuid4())[:8], **p} for p in parsed] |
|
|
| if not calls: |
| user_text, system_note = parse_channels(content or "Done") |
| if system_note: |
| live_log.info("SystemNote", system_note) |
| if not is_scheduled: memory.add(user_id, chat_id, "assistant", user_text) |
| DB.inc_usage(user_id, total_tokens) |
| return {"text": user_text, "system_note": system_note, "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files} |
|
|
| if native: |
| messages.append({"role": "assistant", "content": content, "tool_calls": [{"id": c["id"], "type": "function", "function": {"name": c["name"], "arguments": json.dumps(c["args"])}} for c in calls]}) |
|
|
| feedback = "" |
| for c in calls: |
| used_tools.append(c["name"]) |
| tr = await tools.run(c["name"], c["args"], uid=user_id) |
| if c["name"] == "restart_system" and tr == "__RESTART__": |
| return {"text": "Restarting to apply updates...", "system_note": "restart requested", "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files, "_restart": True} |
| if c["name"] == "screenshot": |
| try: screenshots.append(json.loads(tr).get("screenshot_file")) |
| except Exception: pass |
| if c["name"] == "text_to_speech": |
| try: audio_files.append(json.loads(tr).get("audio_file")) |
| except Exception: pass |
| if c["name"] == "create_text_file": |
| try: files.append(json.loads(tr).get("file")) |
| except Exception: pass |
|
|
| if native: |
| messages.append({"role": "tool", "tool_call_id": c["id"], "content": tr}) |
| else: |
| feedback += f"\n\nTool '{c['name']}' result:\n{tr}" |
|
|
| if not native: |
| if content: messages.append({"role": "assistant", "content": content}) |
| messages.append({"role": "user", "content": f"TOOL RESULTS:{feedback}\nNow provide <system_note> and <user_response>."}) |
|
|
| user_text, system_note = parse_channels(last_content or "Completed") |
| return {"text": user_text, "system_note": system_note, "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files} |
|
|
|
|
| engine = ExecutionEngine() |
|
|
|
|
| class Scheduler: |
| def __init__(self): |
| self.pending = [] |
| self.running = False |
| self._task = None |
| self.bot = None |
|
|
| def set_bot(self, bot): self.bot = bot |
|
|
| def add_pending(self, uid, prompt, delay, repeat, message): |
| self.pending.append({"uid": uid, "prompt": prompt, "fire_at": time.time() + delay, "repeat_seconds": delay if repeat else 0, "message": message}) |
|
|
| async def start(self): |
| if self.running: return |
| self.running = True |
| self._task = asyncio.create_task(self._loop()) |
|
|
| async def _loop(self): |
| while self.running: |
| try: await self._tick() |
| except Exception as exc: live_log.error("Scheduler", exc) |
| await asyncio.sleep(3) |
|
|
| async def _tick(self): |
| now = time.time() |
| fired = [] |
| for i, t in enumerate(self.pending): |
| if now >= t["fire_at"]: |
| fired.append(i) |
| await self._execute(t["uid"], t["prompt"], t["message"]) |
| if t["repeat_seconds"] > 0: |
| t["fire_at"] = now + t["repeat_seconds"] |
| for i in reversed(fired): |
| if self.pending[i]["repeat_seconds"] == 0: |
| self.pending.pop(i) |
|
|
| rows = DB.q("SELECT * FROM scheduled_tasks WHERE status='pending' AND run_at<=datetime('now')", fetch=True) |
| for row in rows: |
| await self._execute_row(row) |
|
|
| async def _execute(self, uid, prompt, message): |
| r = await engine.run(user_id=uid, chat_id=uid, message=prompt, is_scheduled=True) |
| if self.bot: |
| try: await self.bot.send_message(uid, f"β° {message}\n\n{r.get('text','')[:3500]}") |
| except Exception as exc: live_log.error("Scheduler", exc) |
|
|
| async def _execute_row(self, row): |
| DB.q("UPDATE scheduled_tasks SET status='running' WHERE id=?", (row["id"],)) |
| try: |
| r = await engine.run(user_id=row["user_id"], chat_id=row["chat_id"], message=row["task_prompt"], is_scheduled=True) |
| DB.q("UPDATE scheduled_tasks SET status='done',last_result=? WHERE id=?", (r.get("text", "")[:1000], row["id"])) |
| if self.bot: |
| try: await self.bot.send_message(row["user_id"], f"β° {row['message']}\n\n{r.get('text','')[:3500]}") |
| except Exception as exc: live_log.error("Scheduler", exc) |
| if row["repeat_seconds"] and row["repeat_seconds"] > 0: |
| nx = datetime.now() + timedelta(seconds=row["repeat_seconds"]) |
| DB.q("INSERT INTO scheduled_tasks (user_id,chat_id,task_prompt,run_at,repeat_seconds,message,status) VALUES (?,?,?,?,?,?,'pending')", (row["user_id"], row["chat_id"], row["task_prompt"], nx.isoformat(), row["repeat_seconds"], row["message"])) |
| except Exception: |
| DB.q("UPDATE scheduled_tasks SET status='failed',last_result=? WHERE id=?", (traceback.format_exc()[:1500], row["id"])) |
|
|
|
|
| scheduler = Scheduler() |
|
|