import asyncio
import concurrent.futures
import hmac
import json
import os
import random
import re
import threading
import time
import uuid

import httpx
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse

# ====================================================================
# Configuration
# ====================================================================
API_SECRET_KEY = os.getenv("API_SECRET_KEY", "change-me-secret")
POOL_SIZE = int(os.getenv("POOL_SIZE", "5"))
MAX_REQUESTS = int(os.getenv("MAX_REQUESTS", "30"))
QUEUE_TIMEOUT = int(os.getenv("QUEUE_TIMEOUT", "300"))

# Port uvicorn binds to; the keep-alive pinger targets the same port.
SERVER_PORT = 7860
# Seconds a request may wait for the browser pool to finish starting up.
POOL_READY_TIMEOUT = 180
# End-to-end cap for one request: queue wait plus time for the browser chat.
REQUEST_TIMEOUT = QUEUE_TIMEOUT + 240

DUCK_MODELS = {
    "gpt-5-mini": "GPT-5 mini",
    "gpt-5": "GPT-5",
    "gpt-4o-mini": "GPT-4o mini",
    "o3-mini": "o3 mini",
    "gpt-oss-120b": "gpt-oss 120B",
    "claude-haiku-4-5": "Claude Haiku 4.5",
    "llama-4-scout": "Llama 4 Scout",
    "mistral-small-4": "Mistral Small 4",
}
ALL_MODELS = list(DUCK_MODELS.keys())
DEFAULT_MODEL = "gpt-5-mini"

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
]


# ====================================================================
# Errors
# ====================================================================
class PoolBusyError(RuntimeError):
    """Raised when no browser worker became free within QUEUE_TIMEOUT seconds."""


class PoolTimeoutError(RuntimeError):
    """Raised when a request exceeded REQUEST_TIMEOUT seconds end-to-end."""


# ====================================================================
# Browser Worker
# ====================================================================
class BrowserWorker:
    """One headless Chrome instance that drives the duck.ai chat page.

    A worker runs one chat at a time (serialized by ``_lock``). After
    ``MAX_REQUESTS`` chats its browser context is replaced by a fresh one with
    a randomly chosen user agent.
    """

    # Extracts the assistant's answer from the chat page, trying progressively
    # less specific selectors.
    _RESPONSE_JS = """
        () => {
            const active = document.querySelector('[data-activeresponse="true"]');
            if (active) {
                const sp = active.querySelector('.space-y-4');
                if (sp && sp.innerText.trim().length > 0) return sp.innerText.trim();
                const prose = active.querySelector('[class*="whitespace-normal"],[class*="prose"]');
                if (prose && prose.innerText.trim().length > 0) return prose.innerText.trim();
            }
            const all = document.querySelectorAll('[data-activeresponse]');
            if (all.length > 0) {
                const last = all[all.length - 1];
                const sp = last.querySelector('.space-y-4');
                if (sp) return sp.innerText.trim();
                return last.innerText.trim();
            }
            const arts = document.querySelectorAll('article');
            if (arts.length > 0) return arts[arts.length - 1].innerText.trim();
            const divs = document.querySelectorAll('.space-y-4');
            for (let i = divs.length - 1; i >= 0; i--) {
                if (divs[i].innerText.trim().length > 10) return divs[i].innerText.trim();
            }
            return '';
        }
    """

    # Last-resort extraction: the last .space-y-4 block, else the start of the body text.
    _FALLBACK_RESPONSE_JS = """
        () => {
            const sp = document.querySelectorAll('.space-y-4');
            if (sp.length > 0) return sp[sp.length-1].innerText.trim();
            return document.body.innerText.slice(0, 5000);
        }
    """

    # Sets the textarea value through the native setter and fires input/change
    # events; presumably so a framework-controlled textarea (e.g. React) notices
    # the change — TODO confirm against the current duck.ai frontend.
    _FILL_PROMPT_JS = """
        (text) => {
            const ta = document.querySelector('textarea[name="user-prompt"]');
            if (!ta) return;
            const setter = Object.getOwnPropertyDescriptor(
                window.HTMLTextAreaElement.prototype, 'value'
            ).set;
            setter.call(ta, text);
            ta.dispatchEvent(new InputEvent('input', { bubbles: true }));
            ta.dispatchEvent(new Event('change', { bubbles: true }));
            ta.focus();
        }
    """

    def __init__(self, worker_id: int, loop: asyncio.AbstractEventLoop):
        self.id = worker_id
        self.loop = loop
        self.context = None
        self._browser = None
        self._playwright = None
        self.busy = False
        self._lock = asyncio.Lock()
        # Chats served by the current browser context (reset on rotation).
        self._request_count = 0
        # Chats served over the worker's whole lifetime.
        self._total_count = 0

    def tag(self, msg: str) -> str:
        """Prefix *msg* with this worker's id for log output."""
        return f"[W{self.id}] {msg}"

    async def init(self, playwright):
        """Launch the browser, create the first context and accept duck.ai's terms."""
        self._playwright = playwright
        self._browser = await playwright.chromium.launch(
            headless=True,
            channel="chrome",
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-gpu",
                "--disable-dev-shm-usage",
                "--disable-setuid-sandbox",
                "--single-process",
                "--no-zygote",
            ],
        )
        await self._create_context()
        await self._accept_terms()
        print(self.tag("Ready ✓"))

    async def close(self):
        """Close this worker's browser, logging (not raising) any failure."""
        if self._browser is None:
            return
        try:
            await self._browser.close()
        except Exception as e:
            print(self.tag(f"Browser close failed (ignored): {e}"))

    async def _create_context(self, ua: str = None):
        """Open a new browser context with *ua* (or a random user agent)."""
        user_agent = ua or random.choice(USER_AGENTS)
        self.context = await self._browser.new_context(
            user_agent=user_agent,
            viewport={"width": 1920, "height": 1080},
        )
        # Hide the navigator.webdriver automation flag from page scripts.
        await self.context.add_init_script(
            "Object.defineProperty(navigator,'webdriver',{get:()=>undefined})"
        )
        print(self.tag(f"Context created (UA: ...{user_agent[-30:]}) ✓"))

    async def _accept_terms(self):
        """Click duck.ai's "Agree and Continue" button once (best effort; errors are logged)."""
        page = await self.context.new_page()
        try:
            print(self.tag("Accepting terms..."))
            await page.goto("https://duckduckgo.com/aichat", wait_until="domcontentloaded")
            await asyncio.sleep(5)
            agree = page.locator('button:has-text("Agree and Continue")')
            await agree.wait_for(state="visible", timeout=15000)
            await agree.click()
            print(self.tag("Terms accepted ✓"))
            await asyncio.sleep(3)
            await page.wait_for_selector('textarea[name="user-prompt"]', timeout=20000)
        except Exception as e:
            print(self.tag(f"Terms note: {e}"))
        finally:
            await page.close()

    async def _rotate_context(self):
        """Replace the browser context with a fresh one and reset the per-context counter.

        The counter is reset only after the new context is ready, so a failed
        rotation is retried on the next request.
        """
        print(self.tag(f"Rotating context after {self._request_count} requests..."))
        try:
            await self.context.close()
        except Exception as e:
            print(self.tag(f"Old context close failed (ignored): {e}"))
        await self._create_context(ua=random.choice(USER_AGENTS))
        await self._accept_terms()
        self._request_count = 0
        print(self.tag("Context rotated ✓"))

    async def chat(self, model_label: str, prompt: str) -> str:
        """Run one chat on this worker and return the assistant's reply text.

        Raises:
            RuntimeError: wrapping any browser/page failure ("duck.ai error: ...").
        """
        async with self._lock:
            self.busy = True
            try:
                return await self._do_chat(model_label, prompt)
            finally:
                self.busy = False

    async def _do_chat(self, model_label: str, prompt: str) -> str:
        # Small random delay so workers don't hit duck.ai in lockstep.
        await asyncio.sleep(random.uniform(0.5, 2.0))
        page = None
        try:
            # Rotate *before* counting so each context serves exactly
            # MAX_REQUESTS chats.
            if self._request_count >= MAX_REQUESTS:
                await self._rotate_context()
            self._request_count += 1
            self._total_count += 1

            page = await self.context.new_page()
            page.set_default_timeout(180000)
            await page.goto("https://duckduckgo.com/aichat", wait_until="domcontentloaded")
            await asyncio.sleep(3)
            await self._dismiss_terms_banner(page)
            await page.wait_for_selector('textarea[name="user-prompt"]')
            print(self.tag(f"Input ready (req #{self._total_count}) ✓"))

            await self._select_model(page, model_label)
            await self._send(page, prompt)
            return await self._extract(page)
        except Exception as e:
            print(self.tag(f"Error: {e}"))
            raise RuntimeError(f"duck.ai error: {e}") from e
        finally:
            if page is not None:
                try:
                    await page.close()
                except Exception:
                    # A failed close must not mask the real result or error.
                    pass

    async def _dismiss_terms_banner(self, page):
        """Click "Agree and Continue" if the terms banner shows up again (best effort)."""
        try:
            agree = page.locator('button:has-text("Agree and Continue")')
            if await agree.count() > 0 and await agree.first.is_visible():
                await agree.first.click()
                await asyncio.sleep(2)
        except Exception:
            pass

    async def _select_model(self, page, model_label: str):
        """Switch the chat to *model_label*; failures are logged and the page default is kept."""
        try:
            btn = page.locator('button[data-testid="model-select-button"]')
            # Explicit timeout: the page default (180 s) would stall the request
            # for minutes if the selector button is missing.
            text = (await btn.inner_text(timeout=10000)).strip()
            print(self.tag(f"Current model: {text}"))
            if model_label.lower() in text.lower():
                return
            await btn.click()
            await asyncio.sleep(2)
            option = page.locator(
                f'li:has-text("{model_label}"), '
                f'[role="option"]:has-text("{model_label}"), '
                f'button:has-text("{model_label}")'
            )
            if await option.count() > 0:
                await option.first.click()
                print(self.tag(f"Model → {model_label} ✓"))
                await asyncio.sleep(1)
                start = page.locator(
                    'button:has-text("Start chat"), button:has-text("Start new chat")'
                )
                if await start.count() > 0:
                    await start.first.click()
                    print(self.tag("Start chat ✓"))
                    await asyncio.sleep(2)
                await page.wait_for_selector(
                    'textarea[name="user-prompt"]', state="visible", timeout=15000
                )
                print(self.tag("Textarea ready ✓"))
            else:
                await page.keyboard.press("Escape")
                print(self.tag("Model not found, using default"))
        except Exception as e:
            print(self.tag(f"Model select (non-fatal): {e}"))

    async def _send(self, page, prompt: str):
        """Fill the prompt textarea and submit it via the Send button, else via Enter."""
        await page.evaluate(self._FILL_PROMPT_JS, prompt)
        await asyncio.sleep(2)
        sent = False
        try:
            btn = page.locator('button[type="submit"][aria-label="Send"]')
            # count() returns immediately; the short timeout keeps a missing or
            # detached button from blocking for the 180 s page default.
            if (
                await btn.count() > 0
                and await btn.first.get_attribute("disabled", timeout=5000) is None
            ):
                await btn.first.click()
                sent = True
                print(self.tag(f"Sent via button ✓ ({len(prompt)} chars)"))
        except Exception as e:
            print(self.tag(f"Send button unavailable ({e}); falling back to Enter"))
        if not sent:
            ta = page.locator('textarea[name="user-prompt"]')
            await ta.click()
            await asyncio.sleep(0.3)
            await page.keyboard.press("Enter")
            print(self.tag("Sent via Enter ✓"))

    async def _extract(self, page) -> str:
        """Wait for the reply to finish rendering and return its text (stripped)."""
        await asyncio.sleep(3)
        try:
            copy = page.locator('button[data-copyairesponse="true"]')
            await copy.last.wait_for(state="visible", timeout=90000)
            print(self.tag("Response complete ✓"))
        except Exception:
            # Fallback: poll up to ~150 s for the "Stop generating" button to go away.
            for _ in range(75):
                await asyncio.sleep(2)
                if await page.locator(
                    'button[aria-label="Stop generating"]:not([disabled])'
                ).count() == 0:
                    print(self.tag("Response complete (fallback) ✓"))
                    break
        await asyncio.sleep(1)
        text = await page.evaluate(self._RESPONSE_JS) or ""
        if len(text.strip()) < 5:
            await asyncio.sleep(5)
            text = await page.evaluate(self._FALLBACK_RESPONSE_JS) or ""
        print(self.tag(f"Extracted {len(text)} chars ✓"))
        return text.strip()


# ====================================================================
# Browser Pool Manager
# ====================================================================
class BrowserPool:
    """Runs POOL_SIZE BrowserWorkers on a private event loop in a daemon thread.

    Idle workers wait in an asyncio.Queue. Each request takes one, runs a chat
    and returns it. ``process`` (blocking) and ``aprocess`` (awaitable) are
    meant to be called from threads / event loops other than the pool's own.
    """

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.workers: list[BrowserWorker] = []
        self.ready_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._queue: asyncio.Queue | None = None
        self._total_requests = 0
        self._rejected = 0
        # Set when pool start-up failed entirely; reported to callers.
        self._init_error = None

    def start(self):
        """Start the background thread that launches the browsers."""
        self._thread.start()
        print(f"[POOL] Starting {POOL_SIZE} browsers "
              f"(rotation every {MAX_REQUESTS} req, queue timeout {QUEUE_TIMEOUT}s)...")

    def _run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self._init_pool())
            print(f"[POOL] {len(self.workers)}/{POOL_SIZE} browsers ready! ✓")
        except Exception as e:
            self._init_error = e
            print(f"[POOL] ❌ Pool failed to start: {e}")
        finally:
            # Always signal readiness so callers fail fast instead of waiting
            # POOL_READY_TIMEOUT for an event that would never be set.
            self.ready_event.set()
        self.loop.run_forever()

    async def _init_pool(self):
        """Launch all workers in parallel, keeping those that start successfully."""
        from playwright.async_api import async_playwright
        self._queue = asyncio.Queue()
        pw = await async_playwright().start()
        candidates = [BrowserWorker(i + 1, self.loop) for i in range(POOL_SIZE)]
        results = await asyncio.gather(
            *(w.init(pw) for w in candidates), return_exceptions=True
        )
        for worker, result in zip(candidates, results):
            if isinstance(result, BaseException):
                print(worker.tag(f"Init failed: {result}"))
                await worker.close()
                continue
            self.workers.append(worker)
            await self._queue.put(worker)
        if not self.workers:
            raise RuntimeError("no browser worker could be started")

    async def _process(self, model_label: str, prompt: str) -> str:
        """Borrow an idle worker (waiting up to QUEUE_TIMEOUT) and run one chat on it."""
        self._total_requests += 1
        try:
            worker: BrowserWorker = await asyncio.wait_for(
                self._queue.get(), timeout=QUEUE_TIMEOUT
            )
        except asyncio.TimeoutError:
            self._rejected += 1
            print(f"[POOL] ⚠️ Queue timeout! All {len(self.workers)} workers busy. "
                  f"Rejected: {self._rejected}")
            raise PoolBusyError(
                f"All {len(self.workers)} workers are busy. "
                f"Please retry in a moment. (rejected total: {self._rejected})"
            ) from None
        print(f"[POOL] Assigned W{worker.id} (total req: {self._total_requests}) ✓")
        try:
            return await worker.chat(model_label, prompt)
        finally:
            await self._queue.put(worker)
            print(f"[POOL] W{worker.id} returned to pool ✓")

    def _ensure_started(self):
        """Raise RuntimeError if start-up left the pool with no usable worker."""
        if not self.workers:
            raise RuntimeError(f"Pool failed to start: {self._init_error}")

    def process(self, model_label: str, prompt: str) -> str:
        """Blocking entry point: run one chat and return the reply text.

        Raises:
            RuntimeError: pool not ready / failed to start, or a duck.ai error.
            PoolBusyError: no worker became free within QUEUE_TIMEOUT.
            PoolTimeoutError: the request exceeded REQUEST_TIMEOUT (the chat is cancelled).
        """
        if not self.ready_event.wait(timeout=POOL_READY_TIMEOUT):
            raise RuntimeError("Pool not ready")
        self._ensure_started()
        future = asyncio.run_coroutine_threadsafe(
            self._process(model_label, prompt), self.loop
        )
        try:
            return future.result(timeout=REQUEST_TIMEOUT)
        except concurrent.futures.TimeoutError:
            future.cancel()
            raise PoolTimeoutError(f"Request timed out after {REQUEST_TIMEOUT}s") from None

    async def aprocess(self, model_label: str, prompt: str) -> str:
        """Awaitable counterpart of :meth:`process` for use from another event loop.

        Unlike running ``process`` in an executor, this does not tie up a
        thread while the chat runs. Raises the same exceptions as ``process``.
        """
        if not self.ready_event.is_set():
            ready = await asyncio.to_thread(self.ready_event.wait, POOL_READY_TIMEOUT)
            if not ready:
                raise RuntimeError("Pool not ready")
        self._ensure_started()
        future = asyncio.run_coroutine_threadsafe(
            self._process(model_label, prompt), self.loop
        )
        try:
            return await asyncio.wait_for(asyncio.wrap_future(future), timeout=REQUEST_TIMEOUT)
        except asyncio.TimeoutError:
            future.cancel()
            raise PoolTimeoutError(f"Request timed out after {REQUEST_TIMEOUT}s") from None


pool = BrowserPool()
pool.start()


# ====================================================================
# Keep-Alive (stops HuggingFace from pausing the Space for inactivity)
# ====================================================================
_keep_alive_count = 0
# Strong references to fire-and-forget tasks: the event loop keeps only weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: set = set()


async def _keep_alive():
    """Ping this server's root URL every 4 minutes, forever (failures are logged)."""
    global _keep_alive_count
    # Wait until the server is up before the first ping.
    await asyncio.sleep(30)
    print("[KEEP-ALIVE] Started — ping every 4 minutes ✓")
    while True:
        await asyncio.sleep(240)  # every 4 minutes
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                r = await client.get(f"http://localhost:{SERVER_PORT}/")
                _keep_alive_count += 1
                print(f"[KEEP-ALIVE] Ping #{_keep_alive_count} → {r.status_code} ✓")
        except Exception as e:
            print(f"[KEEP-ALIVE] Ping failed (non-fatal): {e}")


# ====================================================================
# Helpers
# ====================================================================
_FENCED_BLOCK_RE = re.compile(r'```(?:json)?\s*\n?(.*?)\n?\s*```', re.DOTALL)
_TOOL_CALLS_OBJECT_RE = re.compile(r'\{[\s\S]*"tool_calls"[\s\S]*\}')


def _content_part_text(part) -> str:
    """Return the text of one multi-part content item ("" for non-text parts).

    Dict parts contribute their "text" (or "content") string. Parts without
    one, such as image parts, are skipped instead of being dumped into the
    prompt as a Python repr.
    """
    if not isinstance(part, dict):
        return str(part)
    for key in ("text", "content"):
        value = part.get(key)
        if isinstance(value, str):
            return value
    return ""


def _extract_content(msg: dict) -> str:
    """Return a message's content as plain text (multi-part content joined by newlines)."""
    content = msg.get("content", "")
    if isinstance(content, list):
        return "\n".join(
            text for text in (_content_part_text(p) for p in content) if text
        )
    return str(content) if content else ""


def _build_prompt(messages: list) -> str:
    """Flatten chat messages into one prompt string for the duck.ai textarea.

    System messages are wrapped in instruction markers, assistant turns are
    prefixed with "[Assistant]:", other roles are included verbatim, and
    empty messages are skipped.
    """
    parts = []
    for msg in messages:
        role = msg.get("role", "user")
        content = _extract_content(msg)
        if not content.strip():
            continue
        if role == "system":
            parts.append(
                f"=== SYSTEM INSTRUCTIONS ===\n{content}\n=== END INSTRUCTIONS ==="
            )
        elif role == "assistant":
            parts.append(f"[Assistant]: {content}")
        else:
            parts.append(content)
    return "\n\n".join(parts)


def _to_openai_tool_call(call: dict) -> dict:
    """Convert one model-emitted tool call into an OpenAI ``tool_calls`` entry.

    Accepts both the flat ``{"name", "arguments"}`` shape and the OpenAI
    ``{"function": {"name", "arguments"}}`` shape. ``arguments`` is always
    returned as a JSON string; a missing/None value becomes "{}".
    """
    fn = call.get("function")
    if not isinstance(fn, dict):
        fn = call
    args = fn.get("arguments", {})
    if isinstance(args, str):
        arguments = args
    elif args is None:
        arguments = "{}"
    else:
        arguments = json.dumps(args, ensure_ascii=False)
    return {
        "id": f"call_{uuid.uuid4().hex[:24]}",
        "type": "function",
        "function": {"name": fn.get("name", ""), "arguments": arguments},
    }


def _parse_tool_calls(text: str):
    """Return OpenAI-style tool calls found in *text*, or None if there are none.

    Looks for a JSON object with a non-empty "tool_calls" list, either as
    the whole reply, inside the first ``` fenced block, or embedded in
    surrounding prose. Non-object entries in the list are ignored.
    """
    cleaned = text.strip()
    if "```" in cleaned:
        fenced = _FENCED_BLOCK_RE.search(cleaned)
        if fenced:
            cleaned = fenced.group(1).strip()
    candidates = [cleaned]
    # Guard the greedy regex: without the marker it can only waste time.
    if '"tool_calls"' in cleaned:
        embedded = _TOOL_CALLS_OBJECT_RE.search(cleaned)
        if embedded:
            candidates.append(embedded.group(0))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if not isinstance(parsed, dict):
            continue
        raw = parsed.get("tool_calls")
        if not isinstance(raw, list):
            continue
        calls = [_to_openai_tool_call(c) for c in raw if isinstance(c, dict)]
        if calls:
            return calls
    return None


def _bearer_token(request: Request) -> str:
    """Return the credential from the Authorization header.

    Accepts "Bearer <key>" (scheme matched case-insensitively) or a bare key.
    """
    header = request.headers.get("authorization", "").strip()
    scheme, _, value = header.partition(" ")
    if scheme.lower() == "bearer":
        return value.strip()
    return header


def _secret_matches(candidate: str) -> bool:
    """Compare *candidate* with API_SECRET_KEY in constant time."""
    return hmac.compare_digest(candidate.encode("utf-8"), API_SECRET_KEY.encode("utf-8"))


def _auth(request: Request) -> bool:
    """True if the Authorization header carries the API key."""
    return _secret_matches(_bearer_token(request))


def _auth_token(request: Request, token: str = "") -> bool:
    """True if the Authorization header or the *token* query value is the API key.

    NOTE(review): query-string tokens can end up in proxy/access logs.
    """
    return _secret_matches(_bearer_token(request)) or _secret_matches(token or "")


def _get_model_label(model: str) -> str:
    """Map an API model id to its duck.ai UI label, defaulting to DEFAULT_MODEL."""
    if not isinstance(model, str):
        return DUCK_MODELS[DEFAULT_MODEL]
    return DUCK_MODELS.get(model, DUCK_MODELS[DEFAULT_MODEL])


def _usage_counts(messages: list, text: str) -> tuple[int, int]:
    """Approximate (prompt, completion) token counts as whitespace-separated words."""
    prompt_words = sum(len(_extract_content(m).split()) for m in messages)
    return prompt_words, len(text.split())


def _make_completion(start_time, model, text, messages, tools=None):
    """Build an OpenAI chat.completion payload.

    When *tools* were supplied and the reply parses as tool calls, the
    message carries ``tool_calls`` and finish_reason "tool_calls".
    """
    p, c = _usage_counts(messages, text)
    tc = _parse_tool_calls(text) if tools else None
    if tc:
        message = {"role": "assistant", "content": None, "tool_calls": tc}
        finish_reason = "tool_calls"
    else:
        message = {"role": "assistant", "content": text}
        finish_reason = "stop"
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex[:29]}",
        "object": "chat.completion",
        "created": int(start_time),
        "model": model,
        "choices": [{"index": 0, "message": message, "finish_reason": finish_reason}],
        "usage": {"prompt_tokens": p, "completion_tokens": c, "total_tokens": p + c},
    }


def _make_response(start_time, model, text, messages, tools=None):
    """Build an OpenAI Responses-API payload (function_call items or one message)."""
    p, c = _usage_counts(messages, text)
    tc = _parse_tool_calls(text) if tools else None
    if tc:
        output = [{"type": "function_call", "id": t["id"], "call_id": t["id"],
                   "name": t["function"]["name"],
                   "arguments": t["function"]["arguments"],
                   "status": "completed"} for t in tc]
    else:
        output = [{"type": "message", "role": "assistant",
                   "content": [{"type": "output_text", "text": text}]}]
    return {
        "id": f"resp-{uuid.uuid4().hex[:29]}",
        "object": "response",
        "created_at": int(start_time),
        "model": model,
        "status": "completed",
        "output": output,
        "usage": {"input_tokens": p, "output_tokens": c, "total_tokens": p + c},
    }


def _json_error(status_code: int, message: str) -> JSONResponse:
    """OpenAI-style error body: {"error": {"message": ...}}."""
    return JSONResponse(status_code=status_code, content={"error": {"message": message}})


def _error_response(exc: Exception) -> JSONResponse:
    """Map a pool/chat failure to an HTTP error (504 timeout, 503 busy, else 500)."""
    if isinstance(exc, PoolTimeoutError):
        status = 504
    elif isinstance(exc, PoolBusyError) or "busy" in str(exc).lower():
        status = 503
    else:
        status = 500
    return _json_error(status, str(exc))


def _is_message_list(messages) -> bool:
    """True if *messages* is a list whose items are all JSON objects."""
    return isinstance(messages, list) and all(isinstance(m, dict) for m in messages)


# ====================================================================
# FastAPI
# ====================================================================
app = FastAPI(title="Duck.ai API Server")


@app.on_event("startup")
async def startup_event():
    task = asyncio.create_task(_keep_alive())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    print("[APP] Keep-alive task registered ✓")
    if API_SECRET_KEY == "change-me-secret":
        print("[APP] ⚠️ API_SECRET_KEY is still the default value — set it in the environment!")


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    # Authenticate before touching the request body.
    if not _auth(request):
        return _json_error(401, "Invalid API Key")
    try:
        data = await request.json()
    except Exception:
        return _json_error(400, "Invalid JSON")
    if not isinstance(data, dict):
        return _json_error(400, "Request body must be a JSON object")

    messages = data.get("messages") or []
    if not messages:
        return _json_error(400, "messages required")
    if not _is_message_list(messages):
        return _json_error(400, "messages must be a list of objects")

    model = data.get("model") or DEFAULT_MODEL
    tools = data.get("tools")
    start_time = time.time()
    model_label = _get_model_label(model)
    prompt = _build_prompt(messages)
    if not prompt.strip():
        return _json_error(400, "messages contain no text content")
    print(f"[API] /v1/chat/completions → {model} ({model_label})")
    try:
        text = await pool.aprocess(model_label, prompt)
        return _make_completion(start_time, model, text, messages, tools)
    except Exception as e:
        print(f"[API] ERROR: {e}")
        return _error_response(e)


@app.post("/v1/responses")
async def responses(request: Request):
    if not _auth(request):
        return _json_error(401, "Invalid API Key")
    try:
        data = await request.json()
    except Exception:
        return _json_error(400, "Invalid JSON")
    if not isinstance(data, dict):
        return _json_error(400, "Request body must be a JSON object")

    # "input" may be a string or a list of message items; fall back to
    # "messages" when it is missing or empty.
    input_data = data.get("input")
    if isinstance(input_data, str) and input_data.strip():
        messages = [{"role": "user", "content": input_data}]
    elif isinstance(input_data, list) and input_data:
        messages = list(input_data)
    else:
        messages = data.get("messages") or []
    if not messages:
        return _json_error(400, "input required")
    if not _is_message_list(messages):
        return _json_error(400, "input must be a string or a list of objects")

    model = data.get("model") or DEFAULT_MODEL
    tools = data.get("tools")
    instructions = data.get("instructions", "")
    if instructions:
        messages = [{"role": "system", "content": instructions}, *messages]
    start_time = time.time()
    model_label = _get_model_label(model)
    prompt = _build_prompt(messages)
    if not prompt.strip():
        return _json_error(400, "input contains no text content")
    print(f"[API] /v1/responses → {model} ({model_label})")
    try:
        text = await pool.aprocess(model_label, prompt)
        return _make_response(start_time, model, text, messages, tools)
    except Exception as e:
        print(f"[API] ERROR: {e}")
        return _error_response(e)


@app.get("/v1/models")
async def list_models(request: Request):
    if not _auth(request):
        return _json_error(401, "Invalid API Key")
    return {
        "object": "list",
        "data": [{"id": m, "object": "model", "owned_by": "duck.ai"} for m in ALL_MODELS],
    }


@app.get("/health")
async def health(request: Request, token: str = ""):
    if not _auth_token(request, token):
        return _json_error(401, "Invalid API Key")
    workers = list(pool.workers)
    busy = sum(1 for w in workers if w.busy)
    stats = [
        {
            "id": w.id,
            "busy": w.busy,
            "total_requests": w._total_count,
            "requests_until_rotation": MAX_REQUESTS - w._request_count,
        }
        for w in workers
    ]
    mem = psutil.virtual_memory()
    return {
        "status": "running",
        "message": "Duck.ai API Pool Server is active!",
        "models": ALL_MODELS,
        "pool_size": POOL_SIZE,
        "workers_started": len(workers),
        "workers_busy": busy,
        "workers_free": len(workers) - busy,
        "total_requests": pool._total_requests,
        "rejected_requests": pool._rejected,
        "queue_timeout_sec": QUEUE_TIMEOUT,
        "keep_alive_pings": _keep_alive_count,
        "workers": stats,
        "ram": {
            "used_gb": round(mem.used / 1024**3, 2),
            "total_gb": round(mem.total / 1024**3, 2),
            "percent": mem.percent,
        },
        "cpu": psutil.cpu_percent(interval=None),
    }


@app.get("/")
async def root():
    return {
        "status": "running",
        "message": "Duck.ai API Pool Server is active!",
        "docs": "/docs",
        "health": "/health",
        "keep_alive_pings": _keep_alive_count,
    }


@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request, token: str = ""):
    if not _auth_token(request, token):
        return HTMLResponse("<h2>401 — Unauthorized</h2>", status_code=401)
    try:
        # Resolved relative to the process working directory.
        with open("dashboard.html", "r", encoding="utf-8") as f:
            html = f.read()
    except FileNotFoundError:
        return HTMLResponse("<h2>404 — dashboard.html not found</h2>", status_code=404)
    return HTMLResponse(html)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=SERVER_PORT)