Elysiadev11 committed on
Commit 2df7925 · verified · 1 Parent(s): 933ca20

Force rebuild after delete

Files changed (1)
  1. proxy_cerebras.py +763 -0
proxy_cerebras.py ADDED
@@ -0,0 +1,763 @@
+ import os
+ import httpx
+ from fastapi import FastAPI, Request
+ from fastapi.responses import JSONResponse, Response, StreamingResponse
+ from starlette.requests import ClientDisconnect
+ import time
+ import json
+ import asyncio
+ import uuid
+
+ app = FastAPI()
+
+ # ==========================================
+ # CONFIGURATION & KEY LOADING
+ # ==========================================
+ BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
+ MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
+
+ # Default model mapping (Claude → MiniMax)
+ DEFAULT_MODEL_MAPPING = {
+     # Opus models
+     "claude-opus-4-7": "minimax-m2.7:cloud",
+     "claude-opus-4-6": "minimax-m2.7:cloud",
+     "claude-opus-4-5": "minimax-m2.7:cloud",
+     "claude-opus-4-1": "minimax-m2.7:cloud",
+     "claude-opus-4-20250514": "minimax-m2.7:cloud",
+
+     # Sonnet models
+     "claude-sonnet-4-6": "minimax-m2.7:cloud",
+     "claude-sonnet-4-5": "minimax-m2.7:cloud",
+     "claude-sonnet-4-20250514": "minimax-m2.7:cloud",
+
+     # Haiku models
+     "claude-haiku-4-5": "minimax-m2.7:cloud",
+     "claude-haiku-4-5-20251001": "minimax-m2.7:cloud",
+ }
+
+ # Load model-mapping overrides from the environment
+ def load_model_mapping():
+     mapping = DEFAULT_MODEL_MAPPING.copy()
+
+     env_map = os.getenv("CLAUDE_MODEL_MAP")
+     if env_map:
+         for pair in env_map.split(","):
+             if ":" in pair:
+                 parts = pair.split(":", 1)
+                 if len(parts) == 2:
+                     claude_model = parts[0].strip()
+                     ollama_model = parts[1].strip()
+                     mapping[claude_model] = ollama_model
+
+     return mapping
+
+ def map_model(claude_model: str) -> str:
+     """Map a Claude model name to an Ollama model."""
+     model_mapping = load_model_mapping()
+
+     # Try an exact match first
+     if claude_model in model_mapping:
+         return model_mapping[claude_model]
+
+     # Fall back based on model family
+     if "opus" in claude_model.lower():
+         return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
+
+     if "haiku" in claude_model.lower():
+         return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")
+
+     # Default to the Sonnet model
+     return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")
+
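+ # Illustrative only (not part of the original commit): the override format is
+ # comma-separated "claude-name:ollama-name" pairs, and split(":", 1) keeps any
+ # extra colon in the Ollama tag intact. For example,
+ #
+ #   CLAUDE_MODEL_MAP="claude-sonnet-4-6:minimax-m2.7:cloud,claude-haiku-4-5:qwen3:cloud"
+ #
+ # would make map_model("claude-sonnet-4-6") return "minimax-m2.7:cloud" and
+ # map_model("claude-haiku-4-5") return "qwen3:cloud". The names on the right
+ # are placeholder examples, not models this proxy is known to serve.
+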
+ OLLAMA_KEYS = []
+ # Supports up to 100 API keys (OLLAMA_KEY_1 through OLLAMA_KEY_100)
+ for i in range(1, 101):
+     key = os.getenv(f"OLLAMA_KEY_{i}")
+     if key:
+         OLLAMA_KEYS.append(key)
+
+ if not OLLAMA_KEYS:
+     OLLAMA_KEYS.append("ollam")  # Dummy key when no env vars are set
+
+ # Initialize per-key status
+ # Round-robin index for load balancing
+ last_used_index = 0
+
+ key_status = {}
+ for idx, k in enumerate(OLLAMA_KEYS, 1):
+     key_status[k] = {
+         "index": idx,
+         "prefix": k[:8] + "...",
+         "failures": 0,
+         "success": 0,
+         "healthy": True,
+         "in_use": False  # Lock flag: one key serves one request at a time
+     }
+
+ def log(msg):
+     print(f"[{time.strftime('%H:%M:%S')}] {msg}")
+
+ def get_and_lock_key(exclude_keys=None):
+     """
+     Round-robin + atomic lock: pick keys sequentially starting from
+     last_used_index so that bursts of requests are spread evenly across all
+     keys. The function is synchronous (no await), so it runs atomically on
+     the single asyncio event loop.
+     """
+     global last_used_index
+
+     if exclude_keys is None:
+         exclude_keys = set()
+
+     # If every key is marked dead/unhealthy, reset them all
+     if not any(v["healthy"] for v in key_status.values()):
+         log("⚠️ All API keys are dead/unhealthy. Performing a MASS RESET...")
+         for v in key_status.values():
+             v["failures"] = 0
+             v["healthy"] = True
+         last_used_index = 0
+
+     # Round-robin: scan keys sequentially from last_used_index
+     for i in range(len(OLLAMA_KEYS)):
+         idx = (last_used_index + i) % len(OLLAMA_KEYS)
+         key = OLLAMA_KEYS[idx]
+
+         if key_status[key]["healthy"] and not key_status[key]["in_use"] and key not in exclude_keys:
+             # Advance past the chosen key so the next request starts on the
+             # following one instead of re-picking the same key
+             last_used_index = (idx + 1) % len(OLLAMA_KEYS)
+             key_status[key]["in_use"] = True
+             return key
+
+     return None
+
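+ # A minimal usage sketch (illustration only, not part of the original commit):
+ # whoever acquires a key must release its lock in a `finally` block, mirroring
+ # what the endpoint handlers below do.
+ def _example_key_usage():
+     key = get_and_lock_key()
+     if key is None:
+         return  # every healthy key is currently busy or excluded
+     try:
+         pass  # ... call the upstream API with `key` here ...
+     finally:
+         key_status[key]["in_use"] = False  # always release the lock
+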
+ def anthropic_error(error_type: str, message: str, status_code: int = 400):
+     """Format an error in Anthropic style"""
+     return JSONResponse(
+         {
+             "type": "error",
+             "error": {
+                 "type": error_type,
+                 "message": message
+             }
+         },
+         status_code=status_code
+     )
+
+ def anthropic_to_ollama(body: dict) -> dict:
+     """Convert an Anthropic request to Ollama's native chat format"""
+
+     # Build the messages array
+     messages = []
+
+     # Add the system message if present (Anthropic allows a plain string or
+     # a list of content blocks here)
+     system = body.get("system")
+     if system:
+         if isinstance(system, list):
+             system = "".join(
+                 block.get("text", "") for block in system
+                 if block.get("type") == "text"
+             )
+         messages.append({
+             "role": "system",
+             "content": system
+         })
+
+     # Add the conversation messages
+     for msg in body.get("messages", []):
+         # Handle content blocks (Anthropic supports an array of blocks or a string)
+         content = msg["content"]
+         if isinstance(content, list):
+             # Extract text from the content blocks
+             text_content = ""
+             for block in content:
+                 if block.get("type") == "text":
+                     text_content += block.get("text", "")
+             content = text_content
+
+         messages.append({
+             "role": msg["role"],
+             "content": content
+         })
+
+     # Map the model name
+     ollama_model = map_model(body.get("model", "claude-sonnet-4-6"))
+
+     # Build the Ollama request
+     ollama_body = {
+         "model": ollama_model,
+         "messages": messages,
+         "stream": body.get("stream", False),
+         "options": {}
+     }
+
+     # Add optional sampling parameters
+     if "max_tokens" in body:
+         ollama_body["options"]["num_predict"] = body["max_tokens"]
+
+     if "temperature" in body:
+         ollama_body["options"]["temperature"] = body["temperature"]
+
+     if "top_p" in body:
+         ollama_body["options"]["top_p"] = body["top_p"]
+
+     if "top_k" in body:
+         ollama_body["options"]["top_k"] = body["top_k"]
+
+     return ollama_body
+
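+ # Worked example (illustration only, not part of the original commit):
+ # an Anthropic body like
+ #   {"model": "claude-sonnet-4-6", "max_tokens": 256, "system": "Be brief.",
+ #    "messages": [{"role": "user", "content": [{"type": "text", "text": "Hi"}]}]}
+ # converts to an Ollama-native body like
+ #   {"model": "minimax-m2.7:cloud", "stream": False,
+ #    "messages": [{"role": "system", "content": "Be brief."},
+ #                 {"role": "user", "content": "Hi"}],
+ #    "options": {"num_predict": 256}}
+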
+ def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
+     """Convert an Ollama response to Anthropic format"""
+
+     message = ollama_response.get("message", {})
+
+     # Map stop reasons
+     stop_reason_map = {
+         "stop": "end_turn",
+         "length": "max_tokens",
+         "eos": "end_turn",
+         "load": "end_turn",
+         "unload": "end_turn",
+     }
+
+     done_reason = ollama_response.get("done_reason", "stop")
+
+     # Handle a MiniMax quirk: content might arrive in the 'reasoning' field
+     text_content = message.get("content", "")
+     if not text_content and message.get("reasoning"):
+         text_content = message.get("reasoning", "")
+
+     return {
+         "id": f"msg_{uuid.uuid4().hex[:10]}",
+         "type": "message",
+         "role": "assistant",
+         "content": [
+             {
+                 "type": "text",
+                 "text": text_content
+             }
+         ],
+         "model": original_model,
+         "stop_reason": stop_reason_map.get(done_reason, "end_turn"),
+         "stop_sequence": None,
+         "usage": {
+             "input_tokens": ollama_response.get("prompt_eval_count", 0),
+             "output_tokens": ollama_response.get("eval_count", 0)
+         }
+     }
+
+ async def stream_anthropic(ollama_stream, original_model: str):
+     """Convert an Ollama stream to Anthropic SSE format"""
+
+     message_id = f"msg_{uuid.uuid4().hex[:10]}"
+
+     def sse(event_type: str, data: dict) -> str:
+         # Anthropic SSE frames carry an `event:` line followed by a `data:` line
+         return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
+
+     # Send message_start
+     yield sse('message_start', {
+         'type': 'message_start',
+         'message': {
+             'id': message_id,
+             'type': 'message',
+             'role': 'assistant',
+             'model': original_model,
+             'content': [],
+             'stop_reason': None,
+             'stop_sequence': None,
+             'usage': {'input_tokens': 0, 'output_tokens': 0}
+         }
+     })
+
+     # Send content_block_start
+     yield sse('content_block_start', {
+         'type': 'content_block_start',
+         'index': 0,
+         'content_block': {'type': 'text', 'text': ''}
+     })
+
+     input_tokens = 0
+     output_tokens = 0
+     stop_reason = "end_turn"
+
+     # Stream the content. Ollama's native /api/chat endpoint streams NDJSON
+     # (one JSON object per line); tolerate an SSE "data: " prefix as well.
+     async for line in ollama_stream:
+         line = line.strip()
+         if not line:
+             continue
+         data_str = line[6:] if line.startswith("data: ") else line
+         try:
+             data = json.loads(data_str)
+
+             if data.get("done", False):
+                 input_tokens = data.get("prompt_eval_count", 0)
+                 output_tokens = data.get("eval_count", 0)
+                 stop_reason = data.get("done_reason", "stop")
+                 continue
+
+             message = data.get("message", {})
+             content = message.get("content", "")
+
+             # Handle a MiniMax quirk: content might arrive in the 'reasoning' field
+             if not content and message.get("reasoning"):
+                 content = message.get("reasoning", "")
+
+             if content:
+                 # Send a text_delta
+                 yield sse('content_block_delta', {
+                     'type': 'content_block_delta',
+                     'index': 0,
+                     'delta': {
+                         'type': 'text_delta',
+                         'text': content
+                     }
+                 })
+         except json.JSONDecodeError:
+             continue
+
+     # Send content_block_stop
+     yield sse('content_block_stop', {
+         'type': 'content_block_stop',
+         'index': 0
+     })
+
+     # Map the stop reason
+     stop_reason_map = {
+         "stop": "end_turn",
+         "length": "max_tokens",
+         "eos": "end_turn",
+     }
+
+     # Send message_delta
+     yield sse('message_delta', {
+         'type': 'message_delta',
+         'delta': {
+             'stop_reason': stop_reason_map.get(stop_reason, "end_turn"),
+             'stop_sequence': None
+         },
+         'usage': {'output_tokens': output_tokens}
+     })
+
+     # Send message_stop
+     yield sse('message_stop', {'type': 'message_stop'})
+
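+ # For reference (illustration only): a successful stream yields Anthropic SSE
+ # events in this order:
+ #   event: message_start        (empty assistant message shell)
+ #   event: content_block_start  (text block at index 0)
+ #   event: content_block_delta  (one per generated text chunk)
+ #   event: content_block_stop
+ #   event: message_delta        (final stop_reason + output token count)
+ #   event: message_stop
+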
+ # ==========================================
+ # ENDPOINTS
+ # ==========================================
+ @app.get("/")
+ def root():
+     return {
+         "status": "ok",
+         "total_keys_loaded": len(OLLAMA_KEYS),
+         "keys_status": {
+             v["prefix"]: {
+                 "status": "BUSY" if v["in_use"] else "IDLE",
+                 "healthy": v["healthy"],
+                 "success": v["success"],
+                 "failures": v["failures"]
+             } for v in key_status.values()
+         }
+     }
+
+ @app.get("/v1/models")
+ async def list_models(request: Request):
+     # Validate auth
+     auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
+     if auth_key != MASTER_API_KEY:
+         return JSONResponse(
+             {"error": {"type": "authentication_error", "message": "Unauthorized"}},
+             status_code=401
+         )
+
+     # Proxy to Ollama's /api/tags
+     async with httpx.AsyncClient(timeout=30.0) as client:
+         try:
+             resp = await client.get(
+                 f"{BASE_URL}/api/tags",
+                 headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"}
+             )
+
+             if resp.status_code != 200:
+                 return JSONResponse(
+                     {"error": {"type": "api_error", "message": "Failed to fetch models"}},
+                     status_code=resp.status_code
+                 )
+
+             ollama_data = resp.json()
+
+             # Convert to OpenAI format
+             models = []
+             created_time = int(time.time())
+
+             for model in ollama_data.get("models", []):
+                 models.append({
+                     "id": model.get("name", model.get("model", "")),
+                     "object": "model",
+                     "created": created_time,
+                     "owned_by": "ollama"
+                 })
+
+             return {"object": "list", "data": models}
+
+         except Exception as e:
+             log(f"Error fetching models: {e}")
+             return JSONResponse(
+                 {"error": {"type": "api_error", "message": str(e)}},
+                 status_code=500
+             )
+
+ @app.post("/v1/messages")
+ async def anthropic_chat(request: Request):
+     # Validate auth. Anthropic clients conventionally send the key in the
+     # x-api-key header, so accept that as well as a Bearer token.
+     auth_key = request.headers.get("x-api-key") or \
+         request.headers.get("Authorization", "").replace("Bearer ", "")
+     if auth_key != MASTER_API_KEY:
+         return anthropic_error("authentication_error", "Unauthorized", 401)
+
+     try:
+         body = await request.json()
+     except ClientDisconnect:
+         log("Client disconnected before the proxy finished reading the request body.")
+         return Response(status_code=499)
+     except json.JSONDecodeError:
+         return anthropic_error("invalid_request_error", "Invalid JSON", 400)
+
+     is_stream = body.get("stream", False)
+     original_model = body.get("model", "claude-sonnet-4-6")
+
+     # Convert to Ollama format
+     ollama_body = anthropic_to_ollama(body)
+
+     # ==========================================
+     # NON-STREAM LOGIC
+     # ==========================================
+     if not is_stream:
+         tried_keys = set()
+         for attempt in range(len(OLLAMA_KEYS)):
+             if len(tried_keys) >= len(OLLAMA_KEYS):
+                 tried_keys.clear()
+
+             key = None
+             log("Waiting for an idle API key (non-stream queue)...")
+
+             # Queue with no time limit
+             while True:
+                 if await request.is_disconnected():
+                     log("Client cancelled the request while queued (non-stream).")
+                     return Response(status_code=499)
+
+                 # Use the atomic-lock helper
+                 key = get_and_lock_key(exclude_keys=tried_keys)
+                 if key:
+                     break  # Exit the loop; the key is ALREADY LOCKED
+
+                 await asyncio.sleep(0.5)  # Poll every half second
+
+             ki = key_status[key]
+             tried_keys.add(key)
+             log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
+
+             try:
+                 async with httpx.AsyncClient(timeout=120.0) as client:
+                     # Ollama's native /api/chat endpoint matches the request
+                     # body built by anthropic_to_ollama and the response
+                     # fields read by ollama_to_anthropic
+                     resp = await client.post(
+                         f"{BASE_URL}/api/chat",
+                         json=ollama_body,
+                         headers={"Authorization": f"Bearer {key}"}
+                     )
+                     if resp.status_code == 200:
+                         ki["success"] += 1
+                         ki["failures"] = 0
+
+                         # Convert the response to Anthropic format; the lock
+                         # is released by the finally block below
+                         ollama_response = resp.json()
+                         anthropic_response = ollama_to_anthropic(ollama_response, original_model)
+
+                         return JSONResponse(anthropic_response)
+                     elif resp.status_code == 429:
+                         ki["failures"] += 1
+                         ki["healthy"] = False
+                         log(f"RATE LIMIT: key#{ki['index']} - skipping to the next key.")
+                         continue
+                     else:
+                         ki["failures"] += 1
+                         continue
+             except Exception as e:
+                 ki["failures"] += 1
+                 log(f"Non-stream error: {e}")
+                 continue
+             finally:
+                 ki["in_use"] = False  # ALWAYS release the lock
+                 log(f"RELEASE: key#{ki['index']} (Non-Stream)")
+
+         return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
+
+     # ==========================================
+     # STREAMING LOGIC
+     # ==========================================
+     async def stream_generator():
+         current_body = ollama_body.copy()
+         generated_text_buffer = ""
+         tried_keys = set()
+
+         for attempt in range(len(OLLAMA_KEYS)):
+             if len(tried_keys) >= len(OLLAMA_KEYS):
+                 tried_keys.clear()
+
+             key = None
+             if attempt == 0:
+                 log("Waiting for an idle API key (new stream queue)...")
+             else:
+                 log(f"Waiting for an idle API key (fallback queue, attempt {attempt})...")
+
+             # Queue with no time limit
+             while True:
+                 if await request.is_disconnected():
+                     log("Client cancelled the request while queued for a stream.")
+                     return
+
+                 # Use the atomic-lock helper
+                 key = get_and_lock_key(exclude_keys=tried_keys)
+                 if key:
+                     break  # Exit the loop; the key is ALREADY LOCKED
+
+                 await asyncio.sleep(0.5)
+
+             ki = key_status[key]
+             tried_keys.add(key)
+             log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
+
+             # Note: stream_anthropic does not report partial text back, so in
+             # this handler the buffer stays empty and a retry restarts the
+             # stream rather than resuming mid-message.
+             if generated_text_buffer:
+                 log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
+                 messages = current_body.get("messages", [])
+                 if messages and messages[-1].get("role") == "assistant":
+                     messages[-1]["content"] = generated_text_buffer
+                 else:
+                     messages.append({"role": "assistant", "content": generated_text_buffer})
+                 current_body["messages"] = messages
+
+             try:
+                 custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
+                 async with httpx.AsyncClient(timeout=custom_timeout) as client:
+                     # Stream from Ollama's native /api/chat (NDJSON lines)
+                     async with client.stream(
+                         "POST", f"{BASE_URL}/api/chat",
+                         json=current_body, headers={"Authorization": f"Bearer {key}"}
+                     ) as response:
+
+                         if response.status_code == 429:
+                             ki["failures"] += 1
+                             ki["healthy"] = False
+                             log(f"STREAM 429: key#{ki['index']} - Switching key...")
+                             continue
+
+                         if response.status_code != 200:
+                             ki["failures"] += 1
+                             log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
+                             continue
+
+                         stream_interrupted = False
+                         try:
+                             # Convert the Ollama stream to Anthropic SSE
+                             async for chunk in stream_anthropic(response.aiter_lines(), original_model):
+                                 yield chunk
+
+                             ki["success"] += 1
+                             ki["failures"] = 0
+                             return
+
+                         except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
+                             log(f"STREAM INTERRUPTED: key#{ki['index']}. Buffering...")
+                             ki["failures"] += 1
+                             stream_interrupted = True
+
+                         if not stream_interrupted:
+                             return
+
+             except Exception as e:
+                 ki["failures"] += 1
+                 log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
+                 continue
+
+             finally:
+                 # ALWAYS release the lock
+                 ki["in_use"] = False
+                 log(f"STREAM RELEASE: key#{ki['index']}")
+
+         yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
+
+     return StreamingResponse(stream_generator(), media_type="text/event-stream")
+
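+ # Illustrative request (not part of the original commit): this endpoint
+ # accepts Anthropic Messages API bodies, e.g.
+ #   POST /v1/messages
+ #   x-api-key: <MASTER_API_KEY>
+ #   {"model": "claude-sonnet-4-6", "max_tokens": 128,
+ #    "messages": [{"role": "user", "content": "Hello"}]}
+ # and answers in Anthropic format while the mapped Ollama model does the work.
+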
+ @app.post("/v1/chat/completions")
+ async def chat(req: Request):
+     auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
+     if auth_key != MASTER_API_KEY:
+         return JSONResponse({"error": "Unauthorized"}, status_code=401)
+
+     # Catch the error raised when the client bails out (ClientDisconnect)
+     try:
+         body = await req.json()
+     except ClientDisconnect:
+         log("Client disconnected before the proxy finished reading the request body.")
+         return Response(status_code=499)
+     except json.JSONDecodeError:
+         return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
+
+     is_stream = body.get("stream", False)
+
+     # ==========================================
+     # NON-STREAM LOGIC
+     # ==========================================
+     if not is_stream:
+         tried_keys = set()
+         for attempt in range(len(OLLAMA_KEYS)):
+             if len(tried_keys) >= len(OLLAMA_KEYS):
+                 tried_keys.clear()
+
+             key = None
+             log("Waiting for an idle API key (non-stream queue)...")
+
+             # Queue with no time limit
+             while True:
+                 if await req.is_disconnected():
+                     log("Client cancelled the request while queued (non-stream).")
+                     return Response(status_code=499)
+
+                 # Use the atomic-lock helper
+                 key = get_and_lock_key(exclude_keys=tried_keys)
+                 if key:
+                     break  # Exit the loop; the key is ALREADY LOCKED
+
+                 await asyncio.sleep(0.5)  # Poll every half second
+
+             ki = key_status[key]
+             tried_keys.add(key)
+             log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
+
+             try:
+                 async with httpx.AsyncClient(timeout=120.0) as client:
+                     resp = await client.post(
+                         f"{BASE_URL}/v1/chat/completions",
+                         json=body,
+                         headers={"Authorization": f"Bearer {key}"}
+                     )
+                     if resp.status_code == 200:
+                         ki["success"] += 1
+                         ki["failures"] = 0
+                         return Response(content=resp.content, media_type=resp.headers.get("content-type"))
+                     elif resp.status_code == 429:
+                         ki["failures"] += 1
+                         ki["healthy"] = False
+                         log(f"RATE LIMIT: key#{ki['index']} - skipping to the next key.")
+                         continue
+                     else:
+                         ki["failures"] += 1
+                         continue
+             except Exception as e:
+                 ki["failures"] += 1
+                 log(f"Non-stream error: {e}")
+                 continue
+             finally:
+                 ki["in_use"] = False  # ALWAYS release the lock
+                 log(f"RELEASE: key#{ki['index']} (Non-Stream)")
+
+         return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
+
+     # ==========================================
+     # STREAMING LOGIC (Seamless Fallback + Queue)
+     # ==========================================
+     async def stream_generator():
+         current_body = body.copy()
+         current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
+
+         generated_text_buffer = ""
+         tried_keys = set()
+
+         for attempt in range(len(OLLAMA_KEYS)):
+             if len(tried_keys) >= len(OLLAMA_KEYS):
+                 tried_keys.clear()
+
+             key = None
+             if attempt == 0:
+                 log("Waiting for an idle API key (new stream queue)...")
+             else:
+                 log(f"Waiting for an idle API key (fallback queue, attempt {attempt})...")
+
+             # Queue with no time limit
+             while True:
+                 if await req.is_disconnected():
+                     log("Client cancelled the request while queued for a stream.")
+                     return
+
+                 # Use the atomic-lock helper
+                 key = get_and_lock_key(exclude_keys=tried_keys)
+                 if key:
+                     break  # Exit the loop; the key is ALREADY LOCKED
+
+                 await asyncio.sleep(0.5)
+
+             ki = key_status[key]
+             tried_keys.add(key)
+             log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
+
+             if generated_text_buffer:
+                 log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
+                 messages = current_body.get("messages", [])
+                 if messages and messages[-1].get("role") == "assistant":
+                     messages[-1]["content"] = generated_text_buffer
+                 else:
+                     messages.append({"role": "assistant", "content": generated_text_buffer})
+                 current_body["messages"] = messages
+
+             try:
+                 custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
+                 async with httpx.AsyncClient(timeout=custom_timeout) as client:
+                     async with client.stream(
+                         "POST", f"{BASE_URL}/v1/chat/completions",
+                         json=current_body, headers={"Authorization": f"Bearer {key}"}
+                     ) as response:
+
+                         if response.status_code == 429:
+                             ki["failures"] += 1
+                             ki["healthy"] = False
+                             log(f"STREAM 429: key#{ki['index']} - Switching key...")
+                             continue
+
+                         if response.status_code != 200:
+                             ki["failures"] += 1
+                             log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
+                             continue
+
+                         stream_interrupted = False
+                         try:
+                             async for chunk in response.aiter_lines():
+                                 if chunk:
+                                     if chunk.startswith("data: "):
+                                         data_str = chunk[6:]
+                                         if data_str.strip() == "[DONE]":
+                                             ki["success"] += 1
+                                             ki["failures"] = 0
+                                             yield chunk + "\n\n"
+                                             return
+                                         try:
+                                             # Track generated text so a dropped stream can
+                                             # resume with the partial output injected back in
+                                             data_json = json.loads(data_str)
+                                             if "choices" in data_json and len(data_json["choices"]) > 0:
+                                                 delta = data_json["choices"][0].get("delta", {})
+                                                 content = delta.get("content", "")
+                                                 if content:
+                                                     generated_text_buffer += content
+                                         except json.JSONDecodeError:
+                                             pass
+                                     yield chunk + "\n\n"
+
+                         except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
+                             log(f"STREAM INTERRUPTED: key#{ki['index']}. Buffering...")
+                             ki["failures"] += 1
+                             stream_interrupted = True
+
+                         if not stream_interrupted:
+                             return
+
+             except Exception as e:
+                 ki["failures"] += 1
+                 log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
+                 continue
+
+             finally:
+                 # ALWAYS release the lock
+                 ki["in_use"] = False
+                 log(f"STREAM RELEASE: key#{ki['index']}")
+
+         yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
+
+     return StreamingResponse(stream_generator(), media_type="text/event-stream")
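+
+ # How to run (illustration only, not part of the original commit): serve the
+ # app with any ASGI server, e.g. `uvicorn proxy_cerebras:app --port 7860`
+ # (the module name is an assumption based on the file name). Clients then
+ # authenticate with MASTER_API_KEY, and the proxy fans requests out across
+ # the OLLAMA_KEY_* pool with round-robin locking and automatic failover.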