"""Anthropic- and OpenAI-compatible proxy in front of an Ollama-style backend.

Load-balances requests across up to 100 upstream API keys
(``OLLAMA_KEY_1`` .. ``OLLAMA_KEY_100``) using round-robin selection with a
per-key in-use lock (one key serves at most one request at a time), automatic
skipping of rate-limited keys, and a mass reset when every key is unhealthy.
Streaming requests fall back to another key when the upstream stream drops.
"""
import os
import time
import json
import asyncio
import uuid

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect

app = FastAPI()

# ==========================================
# CONFIGURATION & KEY LOADING
# ==========================================
BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default model mapping (Claude -> MiniMax)
DEFAULT_MODEL_MAPPING = {
    # Opus models
    "claude-opus-4-7": "minimax-m2.7:cloud",
    "claude-opus-4-6": "minimax-m2.7:cloud",
    "claude-opus-4-5": "minimax-m2.7:cloud",
    "claude-opus-4-1": "minimax-m2.7:cloud",
    "claude-opus-4-20250514": "minimax-m2.7:cloud",
    # Sonnet models
    "claude-sonnet-4-6": "minimax-m2.7:cloud",
    "claude-sonnet-4-5": "minimax-m2.7:cloud",
    "claude-sonnet-4-20250514": "minimax-m2.7:cloud",
    # Haiku models
    "claude-haiku-4-5": "minimax-m2.7:cloud",
    "claude-haiku-4-5-20251001": "minimax-m2.7:cloud",
}


def load_model_mapping() -> dict:
    """Build the Claude->Ollama model map.

    Starts from ``DEFAULT_MODEL_MAPPING`` and overlays pairs from the
    ``CLAUDE_MODEL_MAP`` env var (format: ``claude-a:ollama-a,claude-b:ollama-b``).
    Re-read on every call so env changes take effect without a restart.
    """
    mapping = DEFAULT_MODEL_MAPPING.copy()
    env_map = os.getenv("CLAUDE_MODEL_MAP")
    if env_map:
        for pair in env_map.split(","):
            if ":" in pair:
                # maxsplit=1 always yields exactly two parts when ":" is present
                claude_model, ollama_model = pair.split(":", 1)
                mapping[claude_model.strip()] = ollama_model.strip()
    return mapping


def map_model(claude_model: str) -> str:
    """Map a Claude model name to an Ollama model.

    Exact matches win; otherwise fall back by model family (opus/haiku/sonnet)
    via the DEFAULT_*_MODEL env vars.
    """
    model_mapping = load_model_mapping()
    if claude_model in model_mapping:
        return model_mapping[claude_model]
    lowered = claude_model.lower()
    if "opus" in lowered:
        return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
    if "haiku" in lowered:
        return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")
    # Default to the Sonnet-tier model
    return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")


# Supports up to 100 API keys (OLLAMA_KEY_1 through OLLAMA_KEY_100)
OLLAMA_KEYS = []
for i in range(1, 101):
    key = os.getenv(f"OLLAMA_KEY_{i}")
    if key:
        OLLAMA_KEYS.append(key)
if not OLLAMA_KEYS:
    OLLAMA_KEYS.append("ollam")  # Dummy key when no env keys are configured

# Round-robin index for load balancing
last_used_index = 0

# Per-key bookkeeping. NOTE(review): keyed by the key string itself, so
# duplicate env values collapse into one entry while OLLAMA_KEYS keeps both —
# confirm duplicate keys are not expected.
key_status = {}
for idx, k in enumerate(OLLAMA_KEYS, 1):
    key_status[k] = {
        "index": idx,
        "prefix": k[:8] + "...",
        "failures": 0,
        "success": 0,
        "healthy": True,
        "in_use": False,  # Lock: one key serves one request at a time
    }


def log(msg):
    """Timestamped stdout logger."""
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")


def get_and_lock_key(exclude_keys=None):
    """Round-robin pick: return the next healthy, idle key and mark it in-use.

    Starting from ``last_used_index`` distributes bursts evenly across keys.
    If every key is unhealthy, all keys are reset to healthy first. Returns
    ``None`` when no eligible key is currently idle. The check-then-set is
    safe without a real lock only because there is no ``await`` between the
    check and the ``in_use`` assignment on the single event loop.
    """
    global last_used_index
    if exclude_keys is None:
        exclude_keys = set()

    # If every key is marked dead, mass-reset them all
    if not any(v["healthy"] for v in key_status.values()):
        log("⚠️ Semua API Key berstatus mati/unhealthy. Melakukan RESET MASSAL...")
        for v in key_status.values():
            v["failures"] = 0
            v["healthy"] = True
        last_used_index = 0

    # Round-robin scan beginning at last_used_index
    for i in range(len(OLLAMA_KEYS)):
        idx = (last_used_index + i) % len(OLLAMA_KEYS)
        candidate = OLLAMA_KEYS[idx]
        status = key_status[candidate]
        if status["healthy"] and not status["in_use"] and candidate not in exclude_keys:
            last_used_index = idx
            status["in_use"] = True
            return candidate
    return None


def anthropic_error(error_type: str, message: str, status_code: int = 400):
    """Build an Anthropic-style error JSON response."""
    return JSONResponse(
        {"type": "error", "error": {"type": error_type, "message": message}},
        status_code=status_code,
    )


def _extract_client_key(request: Request) -> str:
    """Read the client credential.

    Accepts an ``Authorization: Bearer`` token and, when that header is
    absent, falls back to the Anthropic-style ``x-api-key`` header (official
    Anthropic SDKs send the latter).
    """
    auth = request.headers.get("Authorization", "").replace("Bearer ", "")
    return auth or request.headers.get("x-api-key", "")


def _content_to_text(content) -> str:
    """Flatten an Anthropic content value (string or list of blocks) to text."""
    if isinstance(content, list):
        return "".join(
            block.get("text", "") for block in content if block.get("type") == "text"
        )
    return content


def anthropic_to_ollama(body: dict) -> dict:
    """Convert an Anthropic Messages request body to Ollama chat format.

    Flattens content-block arrays to plain text (non-text blocks such as
    images/tool_use are dropped), maps the model name, and copies sampling
    parameters into Ollama's ``options``.
    """
    messages = []

    # System prompt may be a string or (per the Anthropic API) a list of blocks
    if body.get("system"):
        messages.append({"role": "system", "content": _content_to_text(body["system"])})

    for msg in body.get("messages", []):
        messages.append({"role": msg["role"], "content": _content_to_text(msg["content"])})

    ollama_model = map_model(body.get("model", "claude-sonnet-4-6"))

    ollama_body = {
        "model": ollama_model,
        "messages": messages,
        "stream": body.get("stream", False),
        "options": {},
    }
    # Optional sampling parameters
    if "max_tokens" in body:
        ollama_body["options"]["num_predict"] = body["max_tokens"]
    if "temperature" in body:
        ollama_body["options"]["temperature"] = body["temperature"]
    if "top_p" in body:
        ollama_body["options"]["top_p"] = body["top_p"]
    if "top_k" in body:
        ollama_body["options"]["top_k"] = body["top_k"]
    return ollama_body


def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
    """Convert an Ollama chat response to the Anthropic Messages format.

    NOTE(review): this reads Ollama-native fields (``message``,
    ``done_reason``, ``prompt_eval_count``) although the request is posted to
    the OpenAI-compatible ``/v1/chat/completions`` path — confirm the upstream
    actually answers in Ollama-native format at that path.
    """
    message = ollama_response.get("message", {})
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
        "load": "end_turn",
        "unload": "end_turn",
    }
    done_reason = ollama_response.get("done_reason", "stop")
    return {
        "id": f"msg_{uuid.uuid4().hex[:10]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": message.get("content", "")}],
        "model": original_model,
        "stop_reason": stop_reason_map.get(done_reason, "end_turn"),
        "stop_sequence": None,
        "usage": {
            "input_tokens": ollama_response.get("prompt_eval_count", 0),
            "output_tokens": ollama_response.get("eval_count", 0),
        },
    }


async def stream_anthropic(ollama_stream, original_model: str):
    """Convert an Ollama line stream into Anthropic SSE events.

    Emits the standard Anthropic event sequence: message_start,
    content_block_start, text deltas, content_block_stop, message_delta
    (with stop reason and usage), message_stop.
    """
    message_id = f"msg_{uuid.uuid4().hex[:10]}"

    message_start_data = {
        "type": "message_start",
        "message": {
            "id": message_id,
            "type": "message",
            "role": "assistant",
            "model": original_model,
            "content": [],
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    }
    yield f"data: {json.dumps(message_start_data)}\n\n"

    # Fix: the Anthropic SSE format requires an empty "text" field here
    content_block_start_data = {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    }
    yield f"data: {json.dumps(content_block_start_data)}\n\n"

    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"

    # NOTE(review): parses Ollama-native chunk fields ("done", "message") even
    # though callers stream from /v1/chat/completions (OpenAI-format chunks
    # carry "choices"/"delta") — verify the upstream chunk shape.
    async for line in ollama_stream:
        if not line.startswith("data: "):
            continue
        try:
            data = json.loads(line[6:])
        except json.JSONDecodeError:
            continue
        if data.get("done", False):
            input_tokens = data.get("prompt_eval_count", 0)
            output_tokens = data.get("eval_count", 0)
            stop_reason = data.get("done_reason", "stop")
            continue
        content = data.get("message", {}).get("content", "")
        if content:
            content_block_delta_data = {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": content},
            }
            yield f"data: {json.dumps(content_block_delta_data)}\n\n"

    yield f"data: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n"

    stop_reason_map = {"stop": "end_turn", "length": "max_tokens", "eos": "end_turn"}
    message_delta_data = {
        "type": "message_delta",
        "delta": {
            "stop_reason": stop_reason_map.get(stop_reason, "end_turn"),
            "stop_sequence": None,
        },
        "usage": {"output_tokens": output_tokens},
    }
    yield f"data: {json.dumps(message_delta_data)}\n\n"
    yield f"data: {json.dumps({'type': 'message_stop'})}\n\n"


# ==========================================
# ENDPOINTS
# ==========================================
@app.get("/")
def root():
    """Health/status page: key count plus per-key lock and health state."""
    return {
        "status": "ok",
        "total_keys_loaded": len(OLLAMA_KEYS),
        "keys_status": {
            v["prefix"]: {
                "status": "BUSY" if v["in_use"] else "IDLE",
                "healthy": v["healthy"],
                "success": v["success"],
                "failures": v["failures"],
            }
            for v in key_status.values()
        },
    }


@app.get("/v1/models")
async def list_models(request: Request):
    """List upstream models in OpenAI format (proxied from Ollama /api/tags)."""
    if _extract_client_key(request) != MASTER_API_KEY:
        return JSONResponse(
            {"error": {"type": "authentication_error", "message": "Unauthorized"}},
            status_code=401,
        )

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            resp = await client.get(
                f"{BASE_URL}/api/tags",
                headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"},
            )
            if resp.status_code != 200:
                return JSONResponse(
                    {"error": {"type": "api_error", "message": "Failed to fetch models"}},
                    status_code=resp.status_code,
                )
            ollama_data = resp.json()
            created_time = int(time.time())
            models = [
                {
                    "id": model.get("name", model.get("model", "")),
                    "object": "model",
                    "created": created_time,
                    "owned_by": "ollama",
                }
                for model in ollama_data.get("models", [])
            ]
            return {"object": "list", "data": models}
        except Exception as e:
            log(f"Error fetching models: {e}")
            return JSONResponse(
                {"error": {"type": "api_error", "message": str(e)}}, status_code=500
            )


@app.post("/v1/messages")
async def anthropic_chat(request: Request):
    """Anthropic Messages endpoint, proxied to the Ollama backend.

    Translates the request/response formats and retries across keys: each
    attempt waits (unbounded) for an idle healthy key, locks it, and releases
    it in a ``finally``. 429 marks a key unhealthy and moves on.
    """
    if _extract_client_key(request) != MASTER_API_KEY:
        return anthropic_error("authentication_error", "Unauthorized", 401)

    try:
        body = await request.json()
    except ClientDisconnect:
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)
    except json.JSONDecodeError:
        return anthropic_error("invalid_request_error", "Invalid JSON", 400)

    is_stream = body.get("stream", False)
    original_model = body.get("model", "claude-sonnet-4-6")
    ollama_body = anthropic_to_ollama(body)

    # ==========================================
    # NON-STREAM PATH
    # ==========================================
    if not is_stream:
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")
            # Unbounded queue: poll until a key frees up or the client leaves
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Key is already locked for us
                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=ollama_body,
                        headers={"Authorization": f"Bearer {key}"},
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0
                        # Release happens in ``finally`` (fix: the original
                        # also released here, double-logging RELEASE)
                        anthropic_response = ollama_to_anthropic(
                            resp.json(), original_model
                        )
                        return JSONResponse(anthropic_response)
                    elif resp.status_code == 429:
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")

        # Fix: use the Anthropic error envelope like the rest of this endpoint
        return anthropic_error("api_error", "All keys failed after multiple attempts", 500)

    # ==========================================
    # STREAMING PATH
    # ==========================================
    async def stream_generator():
        current_body = ollama_body.copy()
        # NOTE(review): this buffer is never appended to on this path (deltas
        # are produced inside stream_anthropic), so resume-injection below is
        # currently inert; a retry also re-emits the SSE preamble — confirm
        # intended behavior.
        generated_text_buffer = ""
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
            # Unbounded queue
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Key is already locked for us
                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")

            if generated_text_buffer:
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages

            try:
                # No read timeout: upstream may pause between tokens
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST",
                        f"{BASE_URL}/v1/chat/completions",
                        json=current_body,
                        headers={"Authorization": f"Bearer {key}"},
                    ) as response:
                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue
                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue
                        try:
                            # Convert the Ollama stream to Anthropic SSE
                            async for chunk in stream_anthropic(
                                response.aiter_lines(), original_model
                            ):
                                yield chunk
                            ki["success"] += 1
                            ki["failures"] = 0
                            return
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError):
                            # Dropped mid-stream: fall through to retry with
                            # the next key (fix: removed the unreachable
                            # ``if not stream_interrupted: return`` guard)
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"STREAM RELEASE: key#{ki['index']}")

        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")


@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible pass-through endpoint with key rotation.

    The body is forwarded untouched. Streaming responses buffer generated
    text so that a dropped stream can be resumed on another key by injecting
    the partial assistant message (seamless fallback).
    """
    if _extract_client_key(req) != MASTER_API_KEY:
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    # Handle clients that disconnect while we read the body
    try:
        body = await req.json()
    except ClientDisconnect:
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)
    except json.JSONDecodeError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    is_stream = body.get("stream", False)

    # ==========================================
    # NON-STREAM PATH
    # ==========================================
    if not is_stream:
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")
            # Unbounded queue
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Key is already locked for us
                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=body,
                        headers={"Authorization": f"Bearer {key}"},
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0
                        return Response(
                            content=resp.content,
                            media_type=resp.headers.get("content-type"),
                        )
                    elif resp.status_code == 429:
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")

        return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)

    # ==========================================
    # STREAMING PATH (seamless fallback + queue)
    # ==========================================
    async def stream_generator():
        current_body = body.copy()
        current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
        generated_text_buffer = ""
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
            # Unbounded queue
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Key is already locked for us
                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")

            # Resume: replay what was already generated as a partial
            # assistant turn so the upstream continues from there
            if generated_text_buffer:
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages

            try:
                # No read timeout: upstream may pause between tokens
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST",
                        f"{BASE_URL}/v1/chat/completions",
                        json=current_body,
                        headers={"Authorization": f"Bearer {key}"},
                    ) as response:
                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue
                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue
                        try:
                            async for chunk in response.aiter_lines():
                                if not chunk:
                                    continue
                                if chunk.startswith("data: "):
                                    data_str = chunk[6:]
                                    if data_str.strip() == "[DONE]":
                                        ki["success"] += 1
                                        ki["failures"] = 0
                                        yield chunk + "\n\n"
                                        return
                                    try:
                                        data_json = json.loads(data_str)
                                        choices = data_json.get("choices")
                                        if choices:
                                            delta = choices[0].get("delta", {})
                                            content = delta.get("content", "")
                                            if content:
                                                # Track progress for resume
                                                generated_text_buffer += content
                                    except json.JSONDecodeError:
                                        pass
                                yield chunk + "\n\n"
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError):
                            # Dropped mid-stream: retry with another key,
                            # injecting generated_text_buffer (fix: removed
                            # the unreachable stream_interrupted guard)
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"STREAM RELEASE: key#{ki['index']}")

        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")