Spaces:
Running
Running
import asyncio
import hmac
import json
import os
import random
import re
import threading
import time
import uuid

import httpx
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, HTMLResponse
| # ==================================================================== | |
| # Configuration | |
| # ==================================================================== | |
# Runtime settings; each one can be overridden through an environment variable.
API_SECRET_KEY = os.environ.get("API_SECRET_KEY", "change-me-secret")  # key clients must present
POOL_SIZE = int(os.environ.get("POOL_SIZE", "5"))  # number of browser workers
MAX_REQUESTS = int(os.environ.get("MAX_REQUESTS", "30"))  # requests per context before rotation
QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "300"))  # seconds to wait for a free worker

# API model id -> label searched for in the duck.ai model picker.
DUCK_MODELS = {
    "gpt-5-mini": "GPT-5 mini",
    "gpt-5": "GPT-5",
    "gpt-4o-mini": "GPT-4o mini",
    "o3-mini": "o3 mini",
    "gpt-oss-120b": "gpt-oss 120B",
    "claude-haiku-4-5": "Claude Haiku 4.5",
    "llama-4-scout": "Llama 4 Scout",
    "mistral-small-4": "Mistral Small 4",
}
ALL_MODELS = list(DUCK_MODELS)
DEFAULT_MODEL = "gpt-5-mini"

# Desktop Chrome user-agent strings; a browser context picks one at random.
USER_AGENTS = [
    f"Mozilla/5.0 ({platform}) AppleWebKit/537.36 (KHTML, like Gecko) "
    f"Chrome/{chrome_major}.0.0.0 Safari/537.36"
    for platform, chrome_major in (
        ("Windows NT 10.0; Win64; x64", 124),
        ("Windows NT 10.0; Win64; x64", 123),
        ("Macintosh; Intel Mac OS X 10_15_7", 124),
        ("Macintosh; Intel Mac OS X 10_15_7", 123),
        ("X11; Linux x86_64", 124),
        ("X11; Linux x86_64", 122),
    )
]
| # ==================================================================== | |
| # Browser Worker | |
| # ==================================================================== | |
class BrowserWorker:
    """One headless Chrome instance that relays prompts to duck.ai.

    Each worker owns a browser and a single browser context. A chat request
    opens a fresh page in that context, selects the model, submits the prompt
    and scrapes the reply. After ``MAX_REQUESTS`` requests the context is
    closed and replaced by a new one with a randomly chosen user agent.
    """

    def __init__(self, worker_id: int, loop: asyncio.AbstractEventLoop):
        self.id = worker_id
        self.loop = loop
        self.context = None
        self._browser = None
        self._playwright = None  # set by init(); declared here so it always exists
        self.busy = False
        self._lock = asyncio.Lock()  # serialises chat() calls on this worker
        self._request_count = 0  # requests since the last context rotation
        self._total_count = 0  # requests over the worker's lifetime

    def tag(self, msg: str) -> str:
        """Prefix *msg* with this worker's id for log output."""
        return f"[W{self.id}] {msg}"

    @staticmethod
    def _label_matches(button_text: str, model_label: str, known_labels=()) -> bool:
        """Return True when *button_text* shows *model_label* as the current model.

        Matching is case-insensitive and ignores whitespace differences. A
        plain substring test is not enough because one label can be a prefix
        of another ("GPT-5" is contained in "GPT-5 mini"), so the match is
        rejected when a longer label from *known_labels* that contains
        *model_label* is itself present in the text.
        """
        def normalise(value: str) -> str:
            return " ".join(value.split()).casefold()

        shown = normalise(button_text)
        wanted = normalise(model_label)
        if not wanted or wanted not in shown:
            return False
        for other in known_labels:
            candidate = normalise(other)
            if candidate != wanted and wanted in candidate and candidate in shown:
                return False
        return True

    async def init(self, playwright):
        """Launch Chrome, create the first context and accept the duck.ai terms."""
        self._playwright = playwright
        self._browser = await playwright.chromium.launch(
            headless=True,
            channel="chrome",
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox", "--disable-gpu",
                "--disable-dev-shm-usage", "--disable-setuid-sandbox",
                "--single-process", "--no-zygote",
            ],
        )
        await self._create_context()
        await self._accept_terms()
        print(self.tag("Ready ✓"))

    async def _create_context(self, ua: "str | None" = None):
        """Create a new browser context, using *ua* or a random user agent."""
        user_agent = ua or random.choice(USER_AGENTS)
        self.context = await self._browser.new_context(
            user_agent=user_agent,
            viewport={"width": 1920, "height": 1080},
        )
        # Hide the automation flag from page scripts.
        await self.context.add_init_script(
            "Object.defineProperty(navigator,'webdriver',{get:()=>undefined})"
        )
        print(self.tag(f"Context created (UA: ...{user_agent[-30:]}) ✓"))

    async def _accept_terms(self):
        """Open the chat page once and click through the terms dialog.

        Any failure is logged and ignored; ``_do_chat`` probes for the same
        dialog again on every request.
        """
        page = await self.context.new_page()
        try:
            print(self.tag("Accepting terms..."))
            await page.goto("https://duckduckgo.com/aichat", wait_until="domcontentloaded")
            await asyncio.sleep(5)
            agree = page.locator('button:has-text("Agree and Continue")')
            await agree.wait_for(state="visible", timeout=15000)
            await agree.click()
            print(self.tag("Terms accepted ✓"))
            await asyncio.sleep(3)
            await page.wait_for_selector('textarea[name="user-prompt"]', timeout=20000)
        except Exception as e:
            print(self.tag(f"Terms note: {e}"))
        finally:
            await page.close()

    async def _rotate_context(self):
        """Replace the current context with a fresh one and reset the counter."""
        print(self.tag(f"Rotating context after {self._request_count} requests..."))
        try:
            await self.context.close()
        except Exception as e:
            # Best effort: a context that cannot be closed is still replaced.
            print(self.tag(f"Context close failed (ignored): {e}"))
        await self._create_context(ua=random.choice(USER_AGENTS))
        await self._accept_terms()
        self._request_count = 0
        print(self.tag("Context rotated ✓"))

    async def chat(self, model_label: str, prompt: str) -> str:
        """Send *prompt* to the model labelled *model_label* and return the reply.

        Calls are serialised per worker; ``busy`` is True while one is running.

        Raises:
            RuntimeError: if the page interaction fails.
        """
        async with self._lock:
            self.busy = True
            try:
                return await self._do_chat(model_label, prompt)
            finally:
                self.busy = False

    async def _do_chat(self, model_label: str, prompt: str) -> str:
        """Run one request on a fresh page: open, select model, send, extract."""
        await asyncio.sleep(random.uniform(0.5, 2.0))  # random delay between requests
        self._request_count += 1
        self._total_count += 1
        if self._request_count >= MAX_REQUESTS:
            await self._rotate_context()
        page = await self.context.new_page()
        try:
            page.set_default_timeout(180000)
            await page.goto("https://duckduckgo.com/aichat", wait_until="domcontentloaded")
            await asyncio.sleep(3)
            try:
                agree = page.locator('button:has-text("Agree and Continue")')
                if await agree.count() > 0 and await agree.first.is_visible():
                    await agree.first.click()
                    await asyncio.sleep(2)
            except Exception:
                # The dialog is optional; the textarea wait below is the real check.
                pass
            await page.wait_for_selector('textarea[name="user-prompt"]')
            print(self.tag(f"Input ready (req #{self._total_count}) ✓"))
            await self._select_model(page, model_label)
            await self._send(page, prompt)
            return await self._extract(page)
        except Exception as e:
            print(self.tag(f"Error: {e}"))
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"duck.ai error: {e}") from e
        finally:
            await page.close()

    async def _select_model(self, page, model_label: str):
        """Switch the page to *model_label* if it is not already selected.

        Failures are logged and ignored, in which case the page keeps whatever
        model it currently has.
        """
        try:
            btn = page.locator('button[data-testid="model-select-button"]')
            text = (await btn.inner_text()).strip()
            print(self.tag(f"Current model: {text}"))
            if self._label_matches(text, model_label, DUCK_MODELS.values()):
                return
            await btn.click()
            await asyncio.sleep(2)
            # Prefer an entry whose whole text is the label, so "GPT-5" does
            # not pick the "GPT-5 mini" entry.
            option = page.locator('li, [role="option"], button').filter(
                has_text=re.compile(rf"^\s*{re.escape(model_label)}\s*$")
            )
            if await option.count() == 0:
                # Entries may carry extra text; fall back to substring matching.
                option = page.locator(
                    f'li:has-text("{model_label}"), '
                    f'[role="option"]:has-text("{model_label}"), '
                    f'button:has-text("{model_label}")'
                )
            if await option.count() > 0:
                await option.first.click()
                print(self.tag(f"Model → {model_label} ✓"))
                await asyncio.sleep(1)
                start = page.locator(
                    'button:has-text("Start chat"), button:has-text("Start new chat")'
                )
                if await start.count() > 0:
                    await start.first.click()
                    print(self.tag("Start chat ✓"))
                    await asyncio.sleep(2)
                await page.wait_for_selector(
                    'textarea[name="user-prompt"]', state="visible", timeout=15000
                )
                print(self.tag("Textarea ready ✓"))
            else:
                await page.keyboard.press("Escape")
                print(self.tag("Model not found, using default"))
        except Exception as e:
            print(self.tag(f"Model select (non-fatal): {e}"))

    async def _send(self, page, prompt: str):
        """Put *prompt* into the textarea and submit it.

        The value is written through the native ``value`` setter and followed
        by ``input``/``change`` events rather than typed key by key. The send
        button is tried first; pressing Enter is the fallback.
        """
        await page.evaluate(
            """
            (text) => {
                const ta = document.querySelector('textarea[name="user-prompt"]');
                if (!ta) return;
                const setter = Object.getOwnPropertyDescriptor(
                    window.HTMLTextAreaElement.prototype, 'value'
                ).set;
                setter.call(ta, text);
                ta.dispatchEvent(new InputEvent('input', { bubbles: true }));
                ta.dispatchEvent(new Event('change', { bubbles: true }));
                ta.focus();
            }
            """,
            prompt
        )
        await asyncio.sleep(2)
        sent = False
        try:
            btn = page.locator('button[type="submit"][aria-label="Send"]')
            if await btn.get_attribute("disabled") is None:
                await btn.click()
                sent = True
                print(self.tag(f"Sent via button ✓ ({len(prompt)} chars)"))
        except Exception:
            # Button missing or not clickable: use the keyboard path below.
            pass
        if not sent:
            ta = page.locator('textarea[name="user-prompt"]')
            await ta.click()
            await asyncio.sleep(0.3)
            await page.keyboard.press("Enter")
            print(self.tag("Sent via Enter ✓"))

    async def _extract(self, page) -> str:
        """Wait for the reply to finish and return its text.

        Completion is detected by the copy-response button becoming visible;
        if that wait fails, the "Stop generating" button is polled until it
        disappears. The text is then read from a cascade of DOM selectors,
        with a second, broader attempt when the first yields almost nothing.
        """
        await asyncio.sleep(3)
        try:
            copy = page.locator('button[data-copyairesponse="true"]')
            await copy.last.wait_for(state="visible", timeout=90000)
            print(self.tag("Response complete ✓"))
        except Exception:
            for _ in range(75):
                await asyncio.sleep(2)
                if await page.locator(
                    'button[aria-label="Stop generating"]:not([disabled])'
                ).count() == 0:
                    print(self.tag("Response complete (fallback) ✓"))
                    break
        await asyncio.sleep(1)
        text = await page.evaluate("""
            () => {
                const active = document.querySelector('[data-activeresponse="true"]');
                if (active) {
                    const sp = active.querySelector('.space-y-4');
                    if (sp && sp.innerText.trim().length > 0) return sp.innerText.trim();
                    const prose = active.querySelector('[class*="whitespace-normal"],[class*="prose"]');
                    if (prose && prose.innerText.trim().length > 0) return prose.innerText.trim();
                }
                const all = document.querySelectorAll('[data-activeresponse]');
                if (all.length > 0) {
                    const last = all[all.length - 1];
                    const sp = last.querySelector('.space-y-4');
                    if (sp) return sp.innerText.trim();
                    return last.innerText.trim();
                }
                const arts = document.querySelectorAll('article');
                if (arts.length > 0) return arts[arts.length - 1].innerText.trim();
                const divs = document.querySelectorAll('.space-y-4');
                for (let i = divs.length - 1; i >= 0; i--) {
                    if (divs[i].innerText.trim().length > 10) return divs[i].innerText.trim();
                }
                return '';
            }
        """)
        if not text or len(text.strip()) < 5:
            await asyncio.sleep(5)
            text = await page.evaluate("""
                () => {
                    const sp = document.querySelectorAll('.space-y-4');
                    if (sp.length > 0) return sp[sp.length-1].innerText.trim();
                    return document.body.innerText.slice(0, 5000);
                }
            """)
        text = text or ""  # guard against a null result from the page
        print(self.tag(f"Extracted {len(text)} chars ✓"))
        return text.strip()
| # ==================================================================== | |
| # Browser Pool Manager | |
| # ==================================================================== | |
class BrowserPool:
    """Pool of ``BrowserWorker`` objects running on a dedicated event loop.

    The pool owns its own asyncio loop, driven by a daemon thread. Idle
    workers sit in an ``asyncio.Queue``; a request takes one, runs the chat on
    it and puts it back. ``process`` is the blocking, thread-safe entry point
    for callers outside that loop.
    """

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.workers: list[BrowserWorker] = []
        self.ready_event = threading.Event()  # set once initialisation has finished
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._queue: asyncio.Queue | None = None  # idle workers; created on the pool loop
        self._total_requests = 0
        self._rejected = 0
        self._init_error: BaseException | None = None  # set when start-up failed

    def start(self):
        """Start the background thread that initialises and runs the pool."""
        self._thread.start()
        print(f"[POOL] Starting {POOL_SIZE} browsers "
              f"(rotation every {MAX_REQUESTS} req, queue timeout {QUEUE_TIMEOUT}s)...")

    def _run(self):
        """Thread target: initialise every worker, then run the loop forever."""
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self._init_pool())
        except Exception as exc:
            # Record the failure and wake waiters so process() fails at once
            # instead of blocking for its full readiness timeout.
            self._init_error = exc
            self.ready_event.set()
            raise
        self.ready_event.set()
        print(f"[POOL] All {POOL_SIZE} browsers ready! ✓")
        self.loop.run_forever()

    async def _init_pool(self):
        """Start Playwright, initialise all workers concurrently and queue them."""
        from playwright.async_api import async_playwright
        self._queue = asyncio.Queue()
        pw = await async_playwright().start()
        workers = [BrowserWorker(i + 1, self.loop) for i in range(POOL_SIZE)]
        await asyncio.gather(*[w.init(pw) for w in workers])
        self.workers = workers
        for w in workers:
            self._queue.put_nowait(w)

    async def _process(self, model_label: str, prompt: str) -> str:
        """Take an idle worker, run the chat on it and return it to the queue.

        Raises:
            RuntimeError: if no worker becomes free within ``QUEUE_TIMEOUT``
                seconds, or if the worker's chat fails.
        """
        self._total_requests += 1
        try:
            worker: BrowserWorker = await asyncio.wait_for(
                self._queue.get(), timeout=QUEUE_TIMEOUT
            )
        except asyncio.TimeoutError:
            self._rejected += 1
            print(f"[POOL] ⚠️ Queue timeout! All {POOL_SIZE} workers busy. "
                  f"Rejected: {self._rejected}")
            raise RuntimeError(
                f"All {POOL_SIZE} workers are busy. "
                f"Please retry in a moment. (rejected total: {self._rejected})"
            )
        print(f"[POOL] Assigned W{worker.id} (total req: {self._total_requests}) ✓")
        try:
            return await worker.chat(model_label, prompt)
        finally:
            # The queue is unbounded, so this never blocks; no await is needed
            # here, which matters when the task is being cancelled.
            self._queue.put_nowait(worker)
            print(f"[POOL] W{worker.id} returned to pool ✓")

    def process(self, model_label: str, prompt: str) -> str:
        """Run one chat request from any thread and block until it finishes.

        Raises:
            RuntimeError: if the pool failed to start or is not ready within
                180 seconds, or if the request itself fails.
            TimeoutError: if no result arrives within ``QUEUE_TIMEOUT + 240``
                seconds.
        """
        if not self.ready_event.wait(timeout=180) or self._init_error is not None:
            raise RuntimeError("Pool not ready") from self._init_error
        future = asyncio.run_coroutine_threadsafe(
            self._process(model_label, prompt), self.loop
        )
        try:
            return future.result(timeout=QUEUE_TIMEOUT + 240)
        finally:
            # If the wait timed out the coroutine is still running: cancel it
            # so the worker is released. This is a no-op once the future is done.
            future.cancel()
# Module-level singleton. It is created and started at import time, so the
# background thread begins launching the browsers as soon as this module loads.
pool = BrowserPool()
pool.start()
| # ==================================================================== | |
| # Keep-Alive (prevents Hugging Face from stopping the Space due to inactivity) | |
| # ==================================================================== | |
# Number of keep-alive pings that got a response since start-up.
_keep_alive_count = 0


async def _keep_alive():
    """Ping this server's own root URL every four minutes, forever.

    A failed ping is logged and the loop carries on; only pings that receive
    a response increment ``_keep_alive_count``.
    """
    global _keep_alive_count
    startup_delay = 30  # wait until the server is ready
    ping_interval = 240  # every 4 minutes
    await asyncio.sleep(startup_delay)
    print("[KEEP-ALIVE] Started — ping every 4 minutes ✓")
    while True:
        await asyncio.sleep(ping_interval)
        try:
            async with httpx.AsyncClient(timeout=10) as http:
                reply = await http.get("http://localhost:7860/")
            _keep_alive_count = _keep_alive_count + 1
            print(f"[KEEP-ALIVE] Ping #{_keep_alive_count} → {reply.status_code} ✓")
        except Exception as err:
            print(f"[KEEP-ALIVE] Ping failed (non-fatal): {err}")
| # ==================================================================== | |
| # Helpers | |
| # ==================================================================== | |
| def _extract_content(msg: dict) -> str: | |
| content = msg.get("content", "") | |
| if isinstance(content, list): | |
| return "\n".join( | |
| item.get("text", item.get("content", str(item))) | |
| if isinstance(item, dict) else str(item) | |
| for item in content | |
| ) | |
| return str(content) if content else "" | |
def _build_prompt(messages: list) -> str:
    """Flatten a list of chat messages into a single prompt string.

    Messages with blank content are dropped. System messages are wrapped in
    an instructions banner, assistant messages get an ``[Assistant]:`` prefix,
    and every other role is included verbatim. Sections are separated by a
    blank line.
    """
    def render(role: str, body: str) -> str:
        if role == "system":
            return f"=== SYSTEM INSTRUCTIONS ===\n{body}\n=== END INSTRUCTIONS ==="
        if role == "assistant":
            return f"[Assistant]: {body}"
        return body

    sections = []
    for message in messages:
        role = message.get("role", "user")
        body = _extract_content(message)
        if body.strip():
            sections.append(render(role, body))
    return "\n\n".join(sections)
| def _parse_tool_calls(text: str): | |
| cleaned = text.strip() | |
| if "```" in cleaned: | |
| m = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', cleaned, re.DOTALL) | |
| if m: | |
| cleaned = m.group(1).strip() | |
| candidates = [cleaned] | |
| m2 = re.search(r'\{[\s\S]*"tool_calls"[\s\S]*\}', cleaned) | |
| if m2: | |
| candidates.append(m2.group(0)) | |
| for c in candidates: | |
| try: | |
| parsed = json.loads(c) | |
| if isinstance(parsed, dict) and "tool_calls" in parsed: | |
| raw = parsed["tool_calls"] | |
| if isinstance(raw, list) and raw: | |
| return [{ | |
| "id": f"call_{uuid.uuid4().hex[:24]}", | |
| "type": "function", | |
| "function": { | |
| "name": call.get("name", ""), | |
| "arguments": ( | |
| json.dumps(call.get("arguments", {}), ensure_ascii=False) | |
| if isinstance(call.get("arguments"), dict) | |
| else str(call.get("arguments", "{}")) | |
| ), | |
| }, | |
| } for call in raw] | |
| except (json.JSONDecodeError, TypeError, KeyError): | |
| continue | |
| return None | |
def _presented_key(request: Request) -> str:
    """Return the API key carried by the ``Authorization`` header.

    Both ``Bearer <key>`` and a bare ``<key>`` are accepted. Only a leading
    scheme is removed, so a key that happens to contain the text "Bearer "
    is left intact.
    """
    header = request.headers.get("authorization", "").strip()
    scheme, _, rest = header.partition(" ")
    if scheme.lower() == "bearer":
        return rest.strip()
    return header


def _key_matches(candidate: str) -> bool:
    """Compare *candidate* with the configured key in constant time."""
    # Encode first: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        candidate.encode("utf-8"), API_SECRET_KEY.encode("utf-8")
    )


def _auth(request: Request) -> bool:
    """Return True when the request's Authorization header carries the API key."""
    return _key_matches(_presented_key(request))


def _auth_token(request: Request, token: str = "") -> bool:
    """Return True when either the Authorization header or *token* is the API key."""
    return _key_matches(_presented_key(request)) or _key_matches(token)
def _get_model_label(model: str) -> str:
    """Map an API model id to its duck.ai label, using the default model's label for unknown ids."""
    fallback = DUCK_MODELS[DEFAULT_MODEL]
    try:
        return DUCK_MODELS[model]
    except KeyError:
        return fallback
def _make_completion(start_time, model, text, messages, tools=None):
    """Build an OpenAI-style ``chat.completion`` response body.

    When *tools* is truthy and *text* parses as tool calls, the message
    carries ``tool_calls`` with ``content`` set to None; otherwise it carries
    *text*. The usage figures are whitespace-separated word counts, not real
    token counts.
    """
    prompt_words = sum(len(_extract_content(m).split()) for m in messages)
    completion_words = len(text.split())
    tool_calls = _parse_tool_calls(text) if tools else None

    if tool_calls:
        message = {"role": "assistant", "content": None, "tool_calls": tool_calls}
        finish_reason = "tool_calls"
    else:
        message = {"role": "assistant", "content": text}
        finish_reason = "stop"

    return {
        "id": f"chatcmpl-{uuid.uuid4().hex[:29]}",
        "object": "chat.completion",
        "created": int(start_time),
        "model": model,
        "choices": [
            {"index": 0, "message": message, "finish_reason": finish_reason}
        ],
        "usage": {
            "prompt_tokens": prompt_words,
            "completion_tokens": completion_words,
            "total_tokens": prompt_words + completion_words,
        },
    }
| # ==================================================================== | |
| # FastAPI | |
| # ==================================================================== | |
app = FastAPI(title="Duck.ai API Server")

# Strong reference to the keep-alive task. The event loop keeps only weak
# references to tasks, so a task nobody holds can be garbage-collected while
# it is still running.
_keep_alive_task = None


async def startup_event():
    """Schedule the keep-alive loop as a background task.

    NOTE(review): no registration of this coroutine with ``app`` (for example
    a startup-hook decorator) is visible in this view of the file — confirm
    that it is actually wired to application start-up.
    """
    global _keep_alive_task
    _keep_alive_task = asyncio.create_task(_keep_alive())
    print("[APP] Keep-alive task registered ✓")
async def chat_completions(request: Request):
    """Handle an OpenAI-style chat-completions request.

    The body must be a JSON object with a non-empty ``messages`` list of
    objects; ``model`` and ``tools`` are optional. The prompt is run on the
    browser pool in a worker thread so the server's event loop stays free.

    Returns a ``chat.completion`` dict on success. Errors are JSON bodies with
    status 401 (bad key), 400 (bad request), 503 (all workers busy) or 500.

    NOTE(review): no route decorator is visible in this view of the file; the
    log line below suggests ``/v1/chat/completions`` — confirm.
    """
    # Authenticate before reading the body so unauthenticated callers cannot
    # make the server parse arbitrary payloads.
    if not _auth(request):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})
    try:
        data = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON"}})
    if not isinstance(data, dict):
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "Request body must be a JSON object"}},
        )
    messages = data.get("messages")
    if not messages:
        return JSONResponse(status_code=400, content={"error": {"message": "messages required"}})
    if not isinstance(messages, list) or not all(isinstance(m, dict) for m in messages):
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "messages must be a list of objects"}},
        )
    model = data.get("model") or DEFAULT_MODEL
    if not isinstance(model, str):
        return JSONResponse(status_code=400, content={"error": {"message": "model must be a string"}})
    tools = data.get("tools", None)
    start_time = time.time()
    model_label = _get_model_label(model)
    prompt = _build_prompt(messages)
    print(f"[API] /v1/chat/completions → {model} ({model_label})")
    try:
        # pool.process blocks, so it runs in the default thread-pool executor.
        text = await asyncio.get_running_loop().run_in_executor(
            None, pool.process, model_label, prompt
        )
        return _make_completion(start_time, model, text, messages, tools)
    except Exception as e:
        print(f"[API] ERROR: {e}")
        # The pool's "workers are busy" error maps to 503 so clients can retry.
        status = 503 if "busy" in str(e).lower() else 500
        return JSONResponse(status_code=status, content={"error": {"message": str(e)}})
async def responses(request: Request):
    """Handle an OpenAI-style "responses" request.

    ``input`` may be a string (treated as one user message) or a list of
    message objects; when it is absent or empty, ``messages`` is used
    instead. ``instructions``, if given, is prepended as a system message.

    Returns a ``response`` dict whose ``output`` is either function calls or
    one assistant message. Errors are JSON bodies with status 401, 400, 503
    (all workers busy) or 500. The usage figures are whitespace-separated
    word counts.

    NOTE(review): no route decorator is visible in this view of the file; the
    log line below suggests ``/v1/responses`` — confirm.
    """
    # Authenticate before reading the body so unauthenticated callers cannot
    # make the server parse arbitrary payloads.
    if not _auth(request):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})
    try:
        data = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON"}})
    if not isinstance(data, dict):
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "Request body must be a JSON object"}},
        )
    # No default here: a missing "input" must fall through to "messages"
    # rather than turn into an empty user message.
    input_data = data.get("input")
    if isinstance(input_data, str) and input_data.strip():
        messages = [{"role": "user", "content": input_data}]
    elif isinstance(input_data, list) and input_data:
        messages = list(input_data)  # copy: the request payload is not mutated
    else:
        fallback = data.get("messages")
        messages = list(fallback) if isinstance(fallback, list) else []
    if not messages:
        return JSONResponse(status_code=400, content={"error": {"message": "input required"}})
    if not all(isinstance(m, dict) for m in messages):
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "input must be a string or a list of objects"}},
        )
    model = data.get("model") or DEFAULT_MODEL
    if not isinstance(model, str):
        return JSONResponse(status_code=400, content={"error": {"message": "model must be a string"}})
    tools = data.get("tools", None)
    instructions = data.get("instructions", "")
    if instructions:
        messages = [{"role": "system", "content": instructions}] + messages
    start_time = time.time()
    model_label = _get_model_label(model)
    prompt = _build_prompt(messages)
    print(f"[API] /v1/responses → {model} ({model_label})")
    try:
        # pool.process blocks, so it runs in the default thread-pool executor.
        text = await asyncio.get_running_loop().run_in_executor(
            None, pool.process, model_label, prompt
        )
        p = sum(len(_extract_content(m).split()) for m in messages)
        c = len(text.split())
        usage = {"input_tokens": p, "output_tokens": c, "total_tokens": p + c}
        tc = _parse_tool_calls(text) if tools else None
        if tc:
            output = [
                {
                    "type": "function_call",
                    "id": t["id"],
                    "call_id": t["id"],
                    "name": t["function"]["name"],
                    "arguments": t["function"]["arguments"],
                    "status": "completed",
                }
                for t in tc
            ]
        else:
            output = [
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": text}],
                }
            ]
        return {
            "id": f"resp-{uuid.uuid4().hex[:29]}",
            "object": "response",
            "created_at": int(start_time),
            "model": model,
            "status": "completed",
            "output": output,
            "usage": usage,
        }
    except Exception as e:
        print(f"[API] ERROR: {e}")
        # The pool's "workers are busy" error maps to 503 so clients can retry.
        status = 503 if "busy" in str(e).lower() else 500
        return JSONResponse(status_code=status, content={"error": {"message": str(e)}})
async def list_models(request: Request):
    """Return the available model ids in OpenAI list format (requires the API key)."""
    if not _auth(request):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})
    entries = []
    for model_id in ALL_MODELS:
        entries.append({"id": model_id, "object": "model", "owned_by": "duck.ai"})
    return {"object": "list", "data": entries}
async def health(request: Request, token: str = ""):
    """Report pool, per-worker, memory and CPU statistics.

    The API key may be supplied in the Authorization header or as *token*.
    """
    if not _auth_token(request, token):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})

    busy_workers = len([worker for worker in pool.workers if worker.busy])
    per_worker = []
    for worker in pool.workers:
        per_worker.append({
            "id": worker.id,
            "busy": worker.busy,
            "total_requests": worker._total_count,
            "requests_until_rotation": MAX_REQUESTS - worker._request_count,
        })

    memory = psutil.virtual_memory()
    gib = 1024**3
    ram = {
        "used_gb": round(memory.used / gib, 2),
        "total_gb": round(memory.total / gib, 2),
        "percent": memory.percent,
    }

    report = {
        "status": "running",
        "message": "Duck.ai API Pool Server is active!",
        "models": ALL_MODELS,
        "pool_size": POOL_SIZE,
        "workers_busy": busy_workers,
        "workers_free": POOL_SIZE - busy_workers,
        "total_requests": pool._total_requests,
        "rejected_requests": pool._rejected,
        "queue_timeout_sec": QUEUE_TIMEOUT,
        "keep_alive_pings": _keep_alive_count,
        "workers": per_worker,
        "ram": ram,
    }
    report["cpu"] = psutil.cpu_percent(interval=None)
    return report
async def root():
    """Return a small unauthenticated status document, including the keep-alive ping count."""
    payload = {
        "status": "running",
        "message": "Duck.ai API Pool Server is active!",
    }
    payload["docs"] = "/docs"
    payload["health"] = "/health"
    payload["keep_alive_pings"] = _keep_alive_count
    return payload
async def dashboard(request: Request, token: str = ""):
    """Serve ``dashboard.html`` to callers that present the API key.

    The key may be supplied in the Authorization header or as *token*. The
    file is read from the current working directory. Responds with 401 when
    the key is wrong and 404 when the file does not exist.
    """
    if not _auth_token(request, token):
        return HTMLResponse(
            "<h1 style='font-family:sans-serif;text-align:center;margin-top:20%;"
            "color:#ef5350'>401 — Unauthorized</h1>",
            status_code=401
        )
    try:
        with open("dashboard.html", "r", encoding="utf-8") as f:
            html = f.read()
    except FileNotFoundError:
        # A missing file is a deployment problem; answer cleanly instead of
        # letting the exception surface as an unhandled server error.
        return HTMLResponse(
            "<h1 style='font-family:sans-serif;text-align:center;margin-top:20%;"
            "color:#ef5350'>404 — Dashboard not found</h1>",
            status_code=404
        )
    return HTMLResponse(html)
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
# uvicorn is imported here, so it is only needed when the file is run directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)