import os import uuid import time import asyncio import threading import json import re from fastapi import FastAPI, Request from fastapi.responses import JSONResponse # ==================================================================== # Configuration # ==================================================================== API_SECRET_KEY = os.getenv("API_SECRET_KEY", "change-me-secret") DUCK_MODELS = { "gpt-5-mini": "GPT-5 mini", "gpt-5": "GPT-5", "gpt-4o-mini": "GPT-4o mini", "o3-mini": "o3 mini", "gpt-oss-120b": "gpt-oss 120B", "claude-haiku-4-5": "Claude Haiku 4.5", "llama-4-scout": "Llama 4 Scout", "mistral-small-4": "Mistral Small 4", } ALL_MODELS = list(DUCK_MODELS.keys()) DEFAULT_MODEL = "gpt-5-mini" # ==================================================================== # Browser Engine # ==================================================================== class AsyncBrowserThread(threading.Thread): def __init__(self): super().__init__(daemon=True) self.loop = asyncio.new_event_loop() self.ready_event = threading.Event() self.browser = None self.playwright = None self.persistent_context = None def run(self): asyncio.set_event_loop(self.loop) self.loop.run_until_complete(self._start_browser()) self.ready_event.set() print("[SERVER] Browser + Duck.ai ready!") self.loop.run_forever() async def _start_browser(self): from playwright.async_api import async_playwright print("[SERVER] Launching Chrome...") self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch( headless=True, channel="chrome", args=[ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-gpu", "--disable-dev-shm-usage", "--disable-setuid-sandbox", "--single-process", "--no-zygote", ], ) # ── Context دائم: cookies تُحفظ بين الـ requests ───────── self.persistent_context = await self.browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), viewport={"width": 1920, "height": 1080}, ) await self.persistent_context.add_init_script( "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" ) # ── قبول شروط duck.ai مرة واحدة عند الإقلاع ───────────── setup_page = await self.persistent_context.new_page() try: print("[SERVER] Opening duck.ai for first-time setup...") await setup_page.goto( "https://duckduckgo.com/aichat", wait_until="domcontentloaded" ) await asyncio.sleep(5) # زر "Agree and Continue" من الصورة الحقيقية agree_btn = setup_page.locator('button:has-text("Agree and Continue")') await agree_btn.wait_for(state="visible", timeout=12000) await agree_btn.click() print("[SERVER] Duck.ai terms accepted ✓") await asyncio.sleep(3) # انتظر ظهور صندوق الكتابة = الموقع جاهز await setup_page.wait_for_selector( 'textarea[name="user-prompt"]', timeout=20000 ) print("[SERVER] Duck.ai chat interface ready ✓") except Exception as e: print(f"[SERVER] Setup note (non-fatal): {e}") finally: await setup_page.close() # ────────────────────────────────────────────────────────────── # الدالة الرئيسية: إرسال prompt واستقبال الرد # ────────────────────────────────────────────────────────────── async def _chat(self, model_label: str, prompt: str) -> str: # كل request يأخذ page منفصلة من نفس الـ context المحفوظ page = await self.persistent_context.new_page() try: page.set_default_timeout(120000) # ── 1. فتح صفحة جديدة نظيفة ─────────────────────────── await page.goto( "https://duckduckgo.com/aichat", wait_until="domcontentloaded" ) await asyncio.sleep(3) # ── 2. إغلاق أي popup/banner إضافي ──────────────────── # Agree and Continue إن ظهر مجدداً (احتياط) try: agree_btn = page.locator('button:has-text("Agree and Continue")') if await agree_btn.count() > 0: is_visible = await agree_btn.first.is_visible() if is_visible: await agree_btn.first.click() print("[DUCK] Terms re-accepted ✓") await asyncio.sleep(2) except Exception: pass # إغلاق إشعار PDF try: close_btn = page.locator( 'button.XkSxBJ8ofSQsZmGZs6qx, ' 'li.HmVD0odzmaobZhTx3jzd button[aria-label="Close"], ' 'button[aria-label="Close"]' ) if await close_btn.count() > 0: await close_btn.first.click() await asyncio.sleep(1) except Exception: pass # ── 3. انتظار صندوق الكتابة ──────────────────────────── await page.wait_for_selector( 'textarea[name="user-prompt"]', timeout=30000 ) print("[DUCK] Input ready ✓") # ── 4. تغيير النموذج إذا لزم ─────────────────────────── try: model_btn = page.locator('button[data-testid="model-select-button"]') current_text = await model_btn.inner_text() print(f"[DUCK] Current model: {current_text.strip()}") if model_label.lower() not in current_text.lower(): await model_btn.click() await asyncio.sleep(1.5) option = page.locator( f"li:has-text('{model_label}'), " f"button:has-text('{model_label}'), " f"[role='option']:has-text('{model_label}')" ) if await option.count() > 0: await option.first.click() await asyncio.sleep(1) print(f"[DUCK] Model → {model_label} ✓") else: await page.keyboard.press("Escape") print(f"[DUCK] Model '{model_label}' not found, using default") except Exception as e: print(f"[DUCK] Model select (non-fatal): {e}") # ── 5. كتابة وإرسال الرسالة ──────────────────────────── textarea = page.locator('textarea[name="user-prompt"]') await textarea.click() await textarea.fill(prompt) await asyncio.sleep(0.5) send_btn = page.locator('button[type="submit"][aria-label="Send"]') await send_btn.wait_for(state="enabled", timeout=10000) await send_btn.click() print(f"[DUCK] Sent ({len(prompt)} chars) ✓") # ── 6. انتظار بدء الرد ──────────────────────────────── await asyncio.sleep(3) try: stop_btn = page.locator('button[aria-label="Stop generating"]') await stop_btn.wait_for(state="visible", timeout=20000) print("[DUCK] Response started ✓") except Exception: print("[DUCK] Stop button not detected, continuing...") # ── 7. انتظار اكتمال الرد ───────────────────────────── # الرد اكتمل = زر "Stop generating" اختفى أو أصبح disabled max_wait = 120 elapsed = 0 while elapsed < max_wait: await asyncio.sleep(2) elapsed += 2 stop_active = await page.locator( 'button[aria-label="Stop generating"]:not([disabled])' ).count() if stop_active == 0: print(f"[DUCK] Response complete (~{elapsed}s) ✓") break await asyncio.sleep(1.5) # ── 8. استخراج النص ──────────────────────────────────── response_text = await page.evaluate(""" () => { // طريقة 1: article elements const articles = document.querySelectorAll('article'); if (articles.length > 0) { return articles[articles.length - 1].innerText.trim(); } // طريقة 2: divs تحتوي على "message" في class const msgDivs = document.querySelectorAll( '[class*="message"]:not([class*="user"])' + ':not([class*="User"]):not([class*="input"])' ); if (msgDivs.length > 0) { const textDivs = [...msgDivs].filter(el => el.innerText && el.innerText.trim().length > 20 && !el.querySelector('textarea') ); if (textDivs.length > 0) { return textDivs[textDivs.length - 1].innerText.trim(); } } // طريقة 3: أطول div بدون textarea const allDivs = [...document.querySelectorAll('div')].filter(el => el.children.length < 10 && el.innerText && el.innerText.trim().length > 50 && !el.querySelector('textarea') && !el.querySelector('button[type="submit"]') ); if (allDivs.length > 0) { return allDivs.sort( (a, b) => b.innerText.length - a.innerText.length )[0].innerText.trim(); } return ''; } """) # ── 9. fallback إذا كان النص فارغاً ─────────────────── if not response_text or len(response_text.strip()) < 10: await asyncio.sleep(5) response_text = await page.evaluate(""" () => { const articles = document.querySelectorAll('article'); if (articles.length > 0) { return articles[articles.length - 1].innerText.trim(); } return document.body.innerText.slice(0, 5000); } """) print(f"[DUCK] Extracted {len(response_text)} chars ✓") return response_text.strip() except Exception as e: print(f"[DUCK] Error: {e}") raise RuntimeError(f"duck.ai error: {e}") finally: await page.close() # ────────────────────────────────────────────────────────────── def process(self, model_label: str, prompt: str) -> str: if not self.ready_event.wait(timeout=120): raise RuntimeError("Browser not ready after 120s") future = asyncio.run_coroutine_threadsafe( self._chat(model_label, prompt), self.loop ) return future.result(timeout=180) browser_engine = AsyncBrowserThread() browser_engine.start() # ==================================================================== # Helpers # ==================================================================== def _extract_content(msg: dict) -> str: content = msg.get("content", "") if isinstance(content, list): parts = [] for item in content: if isinstance(item, dict): parts.append(item.get("text", item.get("content", str(item)))) else: parts.append(str(item)) return "\n".join(parts) return str(content) if content else "" def _build_prompt(messages: list) -> str: parts = [] for msg in messages: role = msg.get("role", "user") content = _extract_content(msg) if not content.strip(): continue if role == "system": parts.append(f"=== SYSTEM INSTRUCTIONS ===\n{content}\n=== END INSTRUCTIONS ===") elif role == "assistant": parts.append(f"[Assistant]: {content}") else: parts.append(content) return "\n\n".join(parts) def _parse_tool_calls(text: str): cleaned = text.strip() if "```" in cleaned: m = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', cleaned, re.DOTALL) if m: cleaned = m.group(1).strip() candidates = [cleaned] m2 = re.search(r'\{[\s\S]*"tool_calls"[\s\S]*\}', cleaned) if m2: candidates.append(m2.group(0)) for c in candidates: try: parsed = json.loads(c) if isinstance(parsed, dict) and "tool_calls" in parsed: raw = parsed["tool_calls"] if isinstance(raw, list) and raw: return [{ "id": f"call_{uuid.uuid4().hex[:24]}", "type": "function", "function": { "name": call.get("name", ""), "arguments": ( json.dumps(call.get("arguments", {}), ensure_ascii=False) if isinstance(call.get("arguments"), dict) else str(call.get("arguments", "{}")) ), }, } for call in raw] except (json.JSONDecodeError, TypeError, KeyError): continue return None def _auth(request: Request) -> bool: token = request.headers.get("authorization", "").replace("Bearer ", "").strip() return token == API_SECRET_KEY def _get_model_label(model: str) -> str: return DUCK_MODELS.get(model, DUCK_MODELS[DEFAULT_MODEL]) def _make_completion(start_time, model, text, messages, tools=None): p = sum(len(_extract_content(m).split()) for m in messages) c = len(text.split()) tc = _parse_tool_calls(text) if tools else None if tc: return { "id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "object": "chat.completion", "created": int(start_time), "model": model, "choices": [{"index": 0, "message": { "role": "assistant", "content": None, "tool_calls": tc }, "finish_reason": "tool_calls"}], "usage": {"prompt_tokens": p, "completion_tokens": c, "total_tokens": p + c}, } return { "id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "object": "chat.completion", "created": int(start_time), "model": model, "choices": [{"index": 0, "message": { "role": "assistant", "content": text }, "finish_reason": "stop"}], "usage": {"prompt_tokens": p, "completion_tokens": c, "total_tokens": p + c}, } # ==================================================================== # FastAPI # ==================================================================== app = FastAPI(title="Duck.ai API Server") @app.post("/v1/chat/completions") async def chat_completions(request: Request): try: data = await request.json() except Exception: return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON"}}) if not _auth(request): return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}}) messages = data.get("messages", []) if not messages: return JSONResponse(status_code=400, content={"error": {"message": "messages required"}}) model = data.get("model", DEFAULT_MODEL) tools = data.get("tools", None) start_time = time.time() model_label = _get_model_label(model) prompt = _build_prompt(messages) print(f"[API] /v1/chat/completions → {model} ({model_label})") try: text = await asyncio.get_event_loop().run_in_executor( None, browser_engine.process, model_label, prompt ) return _make_completion(start_time, model, text, messages, tools) except Exception as e: print(f"[API] ERROR: {e}") return JSONResponse(status_code=500, content={"error": {"message": str(e)}}) @app.post("/v1/responses") async def responses(request: Request): try: data = await request.json() except Exception: return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON"}}) if not _auth(request): return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}}) input_data = data.get("input", "") if isinstance(input_data, str): messages = [{"role": "user", "content": input_data}] elif isinstance(input_data, list): messages = input_data else: messages = data.get("messages", []) if not messages: return JSONResponse(status_code=400, content={"error": {"message": "input required"}}) model = data.get("model", DEFAULT_MODEL) tools = data.get("tools", None) instructions = data.get("instructions", "") if instructions: messages.insert(0, {"role": "system", "content": instructions}) start_time = time.time() model_label = _get_model_label(model) prompt = _build_prompt(messages) print(f"[API] /v1/responses → {model} ({model_label})") try: text = await asyncio.get_event_loop().run_in_executor( None, browser_engine.process, model_label, prompt ) p = sum(len(_extract_content(m).split()) for m in messages) c = len(text.split()) tc = _parse_tool_calls(text) if tools else None if tc: return { "id": f"resp-{uuid.uuid4().hex[:29]}", "object": "response", "created_at": int(start_time), "model": model, "status": "completed", "output": [{ "type": "function_call", "id": t["id"], "call_id": t["id"], "name": t["function"]["name"], "arguments": t["function"]["arguments"], "status": "completed", } for t in tc], "usage": {"input_tokens": p, "output_tokens": c, "total_tokens": p + c}, } return { "id": f"resp-{uuid.uuid4().hex[:29]}", "object": "response", "created_at": int(start_time), "model": model, "status": "completed", "output": [{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": text}]}], "usage": {"input_tokens": p, "output_tokens": c, "total_tokens": p + c}, } except Exception as e: print(f"[API] ERROR: {e}") return JSONResponse(status_code=500, content={"error": {"message": str(e)}}) @app.get("/v1/models") async def list_models(request: Request): if not _auth(request): return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}}) return { "object": "list", "data": [ {"id": m, "object": "model", "owned_by": "duck.ai"} for m in ALL_MODELS ], } @app.get("/health") @app.get("/") async def health(): return { "status": "running", "message": "Duck.ai API Server is active!", "models": ALL_MODELS, } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)