| import asyncio |
| import json |
| import logging |
| import random |
| import string |
| import time |
| import uuid |
| from typing import AsyncGenerator, Dict, List, Any |
|
|
| import aiohttp |
| import brotli |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import StreamingResponse |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError |
|
|
| |
# Configure the root logger once at import time: INFO and above, rendered as
# "HH:MM:SS LEVEL message".
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Named logger shared by the handlers and helpers in this module.
logger = logging.getLogger("proxy")
|
|
| |
# Upstream chat endpoint that every completion request is forwarded to.
BLACKBOX_URL = "https://www.blackbox.ai/api/chat"

# Total time budget, in seconds, for one upstream request.
REQUEST_TIMEOUT = 300

# Headers sent with every upstream request.
# NOTE(review): "authority", "method", "path" and "scheme" look like HTTP/2
# pseudo-headers copied from a browser capture; here they are sent as ordinary
# header fields — confirm the upstream needs them.
HEADERS = {
    "authority": "www.blackbox.ai",
    "method": "POST",
    "path": "/api/chat",
    "scheme": "https",
    # Content negotiation.
    "accept": "*/*",
    "accept-encoding": "gzip, deflate, br, zstd",
    "accept-language": "en-US,en;q=0.9",
    # Same-origin request context.
    "origin": "https://www.blackbox.ai",
    "priority": "u=1, i",
    "referer": "https://www.blackbox.ai/",
    # Client hints and fetch metadata.
    "sec-ch-ua": (
        '"Chromium";v="136", '
        '"Google Chrome";v="136", '
        '"Not.A/Brand";v="99"'
    ),
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/136.0.0.0 Safari/537.36"
    ),
    "content-type": "application/json",
}
|
|
| |
# FastAPI application with fully permissive CORS (any origin, method, header).
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm this is intended.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared aiohttp session. Starts as None and is created lazily on the first
# upstream request.
HTTP_SESSION: "aiohttp.ClientSession | None" = None

# Upstream HTTP statuses that are treated as transient and worth retrying.
RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}

# Alphanumeric alphabet (a-z, A-Z, 0-9) used for random identifiers.
_ascii = string.ascii_letters + string.digits
|
|
def _rand(n: int, pool: str = string.ascii_letters + string.digits) -> str:
    """Return ``n`` characters drawn at random from ``pool``.

    The default pool is ASCII letters plus digits. These values only fill in
    placeholder identity fields, so the non-cryptographic ``random`` module
    is used.
    """
    return "".join(random.choice(pool) for _ in range(n))


def random_email() -> str:
    """Return a random 12-character local part at ``gmail.com``."""
    return _rand(12) + "@gmail.com"


def random_id() -> str:
    """Return a random 21-digit numeric string."""
    return _rand(21, string.digits)


def random_customer_id() -> str:
    """Return ``"cus_"`` followed by 12 random alphanumeric characters."""
    return "cus_" + _rand(12)


def generate_7char_id() -> str:
    """Return a random 7-character alphanumeric id."""
    return _rand(7)


# Lifetimes, in seconds, used for the generated session and subscription.
_SESSION_TTL_SECONDS = 30 * 86400
_SUBSCRIPTION_TTL_SECONDS = 60 * 86400


def _iso_utc(timestamp: float) -> str:
    """Format a Unix timestamp as ``YYYY-MM-DDTHH:MM:SS.000Z`` in UTC."""
    return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(timestamp)) + ".000Z"


def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the upstream request body for a list of chat messages.

    A fresh 7-character id is generated and used both as the payload ``id``
    and as the ``id`` of the last message. The caller's list and message
    dicts are left untouched: the list is shallow-copied and only the last
    message is replaced by a copy carrying the id.

    The session ``expires`` field is computed relative to the current time
    rather than being a fixed date, so the generated session does not go
    stale as time passes.

    Args:
        messages: Chat messages as dicts. May be empty.

    Returns:
        The JSON-serialisable payload dict.

    Raises:
        TypeError: If the last element of ``messages`` is not a mapping.
    """
    msg_id = generate_7char_id()

    # Work on a copy so the request body owned by the caller is not mutated.
    messages = list(messages)
    if messages:
        messages[-1] = {**messages[-1], "id": msg_id}

    # Take one clock reading so every derived timestamp is consistent.
    now = time.time()
    now_seconds = int(now)

    return {
        "messages": messages,
        "agentMode": {},
        "id": msg_id,
        "previewToken": None,
        "userId": None,
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": "00f37b34-a166-4efb-bce5-1312d87f2f94",
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": _rand(10),
                "email": random_email(),
                "image": "https://lh3.googleusercontent.com/a/default",
                "id": random_id(),
            },
            "expires": _iso_utc(now_seconds + _SESSION_TTL_SECONDS),
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": random_customer_id(),
            "expiryTimestamp": now_seconds + _SUBSCRIPTION_TTL_SECONDS,
            "lastChecked": int(now * 1000),
            "isTrialSubscription": False,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }
|
|
class RetryableStatusError(Exception):
    """Raised when the upstream answers with a status that should be retried.

    Attributes:
        status: The HTTP status code returned by the upstream.
        text: The full response body that came with it.
    """

    # Number of body characters included in the exception message.
    _PREVIEW_CHARS = 100

    def __init__(self, status: int, text: str):
        preview = text[: self._PREVIEW_CHARS]
        # Add the ellipsis only when the body really was cut short.
        if len(text) > self._PREVIEW_CHARS:
            preview += "..."
        super().__init__(f"status={status} body={preview}")
        # Keep the raw values so handlers can branch on them.
        self.status = status
        self.text = text
|
|
def log_retry(retry_state):
    """Log a warning describing the failed attempt before tenacity sleeps.

    The request id is read from the retried call's keyword arguments and
    falls back to ``"unknown"`` when it was not passed by keyword.
    """
    request_id = retry_state.kwargs.get("request_id", "unknown")
    attempt = retry_state.attempt_number
    failure = retry_state.outcome.exception()
    logger.warning("[%s] retry %s/3 due to %s", request_id, attempt, failure)
|
|
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=10),
    retry=retry_if_exception_type(
        (
            aiohttp.ClientConnectionError,
            aiohttp.ClientResponseError,
            asyncio.TimeoutError,
            RetryableStatusError,
        )
    ),
    before_sleep=log_retry,
)
async def _open_blackbox_response(*, data, request_id: str) -> "aiohttp.ClientResponse":
    """POST ``data`` upstream and return the open response once it is a 200.

    This is a coroutine rather than an async generator so that the tenacity
    decorator can actually retry it: connection errors, timeouts and
    retryable statuses trigger up to three attempts with exponential
    back-off. ``request_id`` is passed by keyword so ``log_retry`` can read it.

    Returns:
        The open response; the caller is responsible for releasing it.

    Raises:
        tenacity.RetryError: When every attempt failed with a retryable error.
        HTTPException: 502 when the upstream returns a non-retryable status.
    """
    global HTTP_SESSION
    # Create the shared session on first use, or again if it has been closed.
    if HTTP_SESSION is None or HTTP_SESSION.closed:
        HTTP_SESSION = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
        )

    # The session already carries the total timeout, so none is passed here.
    resp = await HTTP_SESSION.post(BLACKBOX_URL, json=data, headers=HEADERS)
    if resp.status == 200:
        return resp

    # Error path: read the body for diagnostics, then release the connection.
    async with resp:
        body = await resp.text()
    logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, resp.status)
    if resp.status in RETRYABLE_STATUSES:
        raise RetryableStatusError(resp.status, body)
    raise HTTPException(status_code=502, detail=f"Upstream error {resp.status}")


async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
    """Yield the upstream reply as text, either whole or in chunks.

    Opening the request and checking its status is delegated to a retried
    coroutine, so failures surface on the first iteration of this generator.
    Once data has started flowing nothing is retried.

    Args:
        data: JSON-serialisable payload to send upstream.
        stream: When true, yield decoded chunks as they arrive; otherwise
            yield the complete body once.
        request_id: Identifier used in log lines.

    Yields:
        Decoded text from the upstream response.

    Raises:
        tenacity.RetryError: When all attempts to obtain a 200 failed.
        HTTPException: 502 when the upstream returns a non-retryable status.
    """
    import codecs  # local import: only needed for incremental decoding

    resp = await _open_blackbox_response(data=data, request_id=request_id)
    # Release the connection even if the consumer stops iterating early.
    async with resp:
        if not stream:
            yield await resp.text()
            return

        # An incremental decoder keeps multi-byte UTF-8 characters intact
        # when they are split across two network chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        async for chunk in resp.content.iter_any():
            text = decoder.decode(chunk)
            if text:
                yield text
|
|
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Tag each request with a fresh UUID and log its start and duration.

    The id is stored on ``request.state.request_id`` for downstream handlers.
    """
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    logger.info("[%s] %s %s", request_id, request.method, request.url.path)

    began = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - began

    logger.info("[%s] finished in %.2fs", request_id, elapsed)
    return response
|
|
@app.get("/")
async def root():
    """Return a static message showing that the API is up."""
    banner = {"message": "API is running"}
    return banner
|
|
@app.get("/health")
async def health():
    """Return a static ``{"status": "ok"}`` body for health checks."""
    report = {"status": "ok"}
    return report
|
|
def _deepsearch_test_payload() -> Dict[str, Any]:
    """Return the fixed payload sent when ``test_payload == "deepsearch"``.

    NOTE(review): the session block appears to hold a real account's name,
    e-mail address, avatar URL and ids, and the expiry values are fixed
    points in time (mid-2025). Confirm this data may live in source control
    and consider replacing it with generated values.
    """
    return {
        "messages": [{"id": "s2eB86t", "content": "google", "role": "user"}],
        "agentMode": {},
        "id": "s2eB86t",
        "previewToken": None,
        "userId": None,
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": "00f37b34-a166-4efb-bce5-1312d87f2f94",
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": "S.C gaming",
                "email": "simarmanbir@gmail.com",
                "image": "https://lh3.googleusercontent.com/a/ACg8ocI-ze5Qe42S-j8xaCL6X7KSVwfiOae4fONqpTxzt0d2_a2FIld1=s96-c",
                "id": "100846841133312010974",
            },
            "expires": "2025-06-09T19:36:08.220Z",
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": "cus_Rtiok4sPQNoo1c",
            "expiryTimestamp": 1749108685,
            "lastChecked": 1746822333827,
            "isTrialSubscription": True,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }


def _sse_chunk(completion_id: str, content: str) -> str:
    """Serialise one streamed delta as a server-sent-event ``data:`` line."""
    msg = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "DeepResearch",
        "choices": [{"index": 0, "delta": {"content": content}}],
    }
    return f"data: {json.dumps(msg)}\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Proxy a chat-completion request to the upstream service.

    The JSON body must be an object with a non-empty ``messages`` list. When
    ``stream`` is truthy the reply is sent as server-sent events, otherwise
    as a single ``chat.completion`` object. ``test_payload == "deepsearch"``
    swaps in a fixed captured payload instead of building one.

    Raises:
        HTTPException: 400 for a malformed body, 502 when the upstream
            fails, 500 for any other unexpected error.
    """
    rid = request.state.request_id
    try:
        body = await request.json()
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")
        messages = body.get("messages", [])
        if not messages or not isinstance(messages, list):
            raise HTTPException(status_code=400, detail="Missing 'messages'")
        stream = body.get("stream", False)

        if body.get("test_payload") == "deepsearch":
            payload = _deepsearch_test_payload()
        else:
            payload = build_payload(messages)

        if not stream:
            chunks = []
            async for part in get_blackbox_response(data=payload, stream=False, request_id=rid):
                if part.startswith("Error:"):
                    raise HTTPException(status_code=502, detail=part)
                chunks.append(part)
            return {
                "id": str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": "DeepResearch",
                "choices": [{
                    "index": 0,
                    "message": {"role": "assistant", "content": "".join(chunks)},
                    "finish_reason": "stop",
                }],
            }

        # Pull the first chunk while still inside the try block, so a failure
        # to reach the upstream becomes a proper HTTP error instead of a
        # broken stream after a 200 has already been sent.
        upstream = get_blackbox_response(data=payload, stream=True, request_id=rid)
        try:
            first_chunk = await upstream.__anext__()
        except StopAsyncIteration:
            first_chunk = None

        # All chunks of one completion share a single id.
        completion_id = str(uuid.uuid4())

        async def event_stream():
            try:
                if first_chunk is not None:
                    yield _sse_chunk(completion_id, first_chunk)
                    async for chunk in upstream:
                        yield _sse_chunk(completion_id, chunk)
                yield "data: [DONE]\n\n"
            finally:
                # Close the upstream generator even if the client disconnects.
                await upstream.aclose()

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    except HTTPException:
        # Deliberate HTTP errors raised above keep their own status code.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON") from None
    except (RetryError, RetryableStatusError, aiohttp.ClientError, asyncio.TimeoutError) as exc:
        logger.error("[%s] upstream request failed: %s", rid, exc)
        raise HTTPException(status_code=502, detail="Blackbox upstream failed") from exc
    except Exception as exc:
        logger.exception("[%s] error", rid)
        raise HTTPException(status_code=500, detail="Internal proxy error") from exc
|
|