Elysiadev11 committed on
Commit
82d935a
·
verified ·
1 Parent(s): 99ef059

Replace proxy_cerebras.py with updated app.py code with Claude endpoint support

Browse files
Files changed (1) hide show
  1. proxy_cerebras.py +754 -0
proxy_cerebras.py ADDED
@@ -0,0 +1,754 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import os
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect
import time
import json
import asyncio
import uuid

app = FastAPI()

# ==========================================
# CONFIGURATION & KEY LOADING
# ==========================================
# Upstream Ollama-compatible endpoint that all traffic is forwarded to.
BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
# Single key that clients of THIS proxy must present as a Bearer token.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default model mapping (Claude → MiniMax).
# Every Claude alias currently points at the same upstream model; entries are
# grouped by family so per-family overrides are easy to add later.
DEFAULT_MODEL_MAPPING = {
    # Opus models
    "claude-opus-4-7": "minimax-m2.7:cloud",
    "claude-opus-4-6": "minimax-m2.7:cloud",
    "claude-opus-4-5": "minimax-m2.7:cloud",
    "claude-opus-4-1": "minimax-m2.7:cloud",
    "claude-opus-4-20250514": "minimax-m2.7:cloud",

    # Sonnet models
    "claude-sonnet-4-6": "minimax-m2.7:cloud",
    "claude-sonnet-4-5": "minimax-m2.7:cloud",
    "claude-sonnet-4-20250514": "minimax-m2.7:cloud",

    # Haiku models
    "claude-haiku-4-5": "minimax-m2.7:cloud",
    "claude-haiku-4-5-20251001": "minimax-m2.7:cloud",
}
38
# Model-mapping overrides are read from the environment on every call.
def load_model_mapping():
    """Return the Claude→Ollama model table with env overrides applied.

    CLAUDE_MODEL_MAP is a comma-separated list of "claude-name:ollama-name"
    pairs; only the first ":" splits, so Ollama names containing ":" (e.g.
    "model:cloud") survive intact. Entries without a ":" are ignored.
    """
    table = dict(DEFAULT_MODEL_MAPPING)

    overrides = os.getenv("CLAUDE_MODEL_MAP")
    if overrides:
        for entry in overrides.split(","):
            if ":" not in entry:
                continue
            claude_name, _, ollama_name = entry.partition(":")
            table[claude_name.strip()] = ollama_name.strip()

    return table
54
def map_model(claude_model: str) -> str:
    """Map Claude model name to Ollama model.

    Exact table matches win; otherwise fall back by model family
    (opus/haiku/anything-else→sonnet), each overridable via env vars.
    """
    table = load_model_mapping()
    if claude_model in table:
        return table[claude_model]

    lowered = claude_model.lower()
    if "opus" in lowered:
        return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
    if "haiku" in lowered:
        return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")

    # Unknown family: treat as Sonnet.
    return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")
72
OLLAMA_KEYS = []
# Supports up to 100 upstream API keys (OLLAMA_KEY_1 through OLLAMA_KEY_100).
for i in range(1, 101):
    key = os.getenv(f"OLLAMA_KEY_{i}")
    if key:
        OLLAMA_KEYS.append(key)

if not OLLAMA_KEYS:
    OLLAMA_KEYS.append("ollam")  # Dummy key when no env vars are set

# Key status bookkeeping.
# Round-robin index for load balancing; advanced by get_and_lock_key().
last_used_index = 0

key_status = {}
for idx, k in enumerate(OLLAMA_KEYS, 1):
    key_status[k] = {
        "index": idx,              # 1-based position, used in log lines
        "prefix": k[:8] + "...",   # redacted identifier for the status endpoint
        "failures": 0,
        "success": 0,
        "healthy": True,
        "in_use": False            # lock flag: one key serves one request at a time
    }
97
def log(msg):
    """Print *msg* to stdout prefixed with a HH:MM:SS timestamp."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}")
100
def get_and_lock_key(exclude_keys=None):
    """
    Round-robin + atomic lock: pick keys sequentially starting from
    last_used_index so burst traffic is spread evenly across all keys.

    Returns the locked key string, or None when no healthy, idle,
    non-excluded key is currently available. Callers MUST clear the
    key's "in_use" flag when done.
    """
    global last_used_index

    if exclude_keys is None:
        exclude_keys = set()

    # Are ALL keys marked unhealthy? If so, mass-reset them so the proxy
    # can recover instead of deadlocking with nothing selectable.
    if not any(v["healthy"] for v in key_status.values()):
        log("⚠️ Semua API Key berstatus mati/unhealthy. Melakukan RESET MASSAL...")
        for v in key_status.values():
            v["failures"] = 0
            v["healthy"] = True
        last_used_index = 0

    # Round-robin scan from last_used_index. There are no awaits in this
    # function, so under a single asyncio event loop the check-and-set of
    # "in_use" below is effectively atomic.
    for i in range(len(OLLAMA_KEYS)):
        idx = (last_used_index + i) % len(OLLAMA_KEYS)
        key = OLLAMA_KEYS[idx]

        if key_status[key]["healthy"] and not key_status[key]["in_use"] and key not in exclude_keys:
            last_used_index = idx
            key_status[key]["in_use"] = True
            return key

    return None
130
def anthropic_error(error_type: str, message: str, status_code: int = 400):
    """Build an error response in the Anthropic API envelope shape."""
    payload = {
        "type": "error",
        "error": {"type": error_type, "message": message},
    }
    return JSONResponse(payload, status_code=status_code)
143
def anthropic_to_ollama(body: dict) -> dict:
    """Translate an Anthropic Messages request body into an Ollama chat request.

    The optional top-level "system" prompt becomes a leading system message;
    list-form content blocks are flattened to their concatenated text parts
    (non-text blocks are dropped); sampling parameters are renamed to their
    Ollama "options" equivalents.
    """
    msgs = []

    system_prompt = body.get("system")
    if system_prompt:
        msgs.append({"role": "system", "content": system_prompt})

    for entry in body.get("messages", []):
        payload = entry["content"]
        if isinstance(payload, list):
            # Anthropic allows content as a list of typed blocks; keep only text.
            payload = "".join(
                block.get("text", "")
                for block in payload
                if block.get("type") == "text"
            )
        msgs.append({"role": entry["role"], "content": payload})

    request = {
        "model": map_model(body.get("model", "claude-sonnet-4-6")),
        "messages": msgs,
        "stream": body.get("stream", False),
        "options": {},
    }

    # Pass through supported sampling parameters under Ollama's names.
    renames = (
        ("max_tokens", "num_predict"),
        ("temperature", "temperature"),
        ("top_p", "top_p"),
        ("top_k", "top_k"),
    )
    for src, dst in renames:
        if src in body:
            request["options"][dst] = body[src]

    return request
199
def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
    """Translate a non-streaming Ollama chat response into the Anthropic
    Messages shape, echoing back the Claude model name the client asked for.
    """
    # Ollama done_reason → Anthropic stop_reason; anything unknown → end_turn.
    finish_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
        "load": "end_turn",
        "unload": "end_turn",
    }

    msg = ollama_response.get("message", {})
    reason = ollama_response.get("done_reason", "stop")

    usage = {
        "input_tokens": ollama_response.get("prompt_eval_count", 0),
        "output_tokens": ollama_response.get("eval_count", 0),
    }

    return {
        "id": f"msg_{uuid.uuid4().hex[:10]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": msg.get("content", "")}],
        "model": original_model,
        "stop_reason": finish_map.get(reason, "end_turn"),
        "stop_sequence": None,
        "usage": usage,
    }
234
async def stream_anthropic(ollama_stream, original_model: str):
    """Convert an upstream SSE line stream into Anthropic Messages SSE events.

    Args:
        ollama_stream: async iterator of raw SSE lines ("data: {...}").
        original_model: Claude model name to echo back in message_start.

    Yields:
        "data: <json>\\n\\n" strings in Anthropic event order:
        message_start → content_block_start → content_block_delta* →
        content_block_stop → message_delta → message_stop.

    Fixes vs. the previous revision:
      * content_block_start now carries the required empty "text" field of an
        Anthropic text content block ({"type": "text", "text": ""}).
      * input_tokens were parsed but never emitted; they are now reported in
        the final message_delta usage.
      * Accepts both Ollama-native chunks ({"message": {...}, "done": ...})
        and OpenAI-compatible chunks ({"choices": [{"delta": {...}}]}),
        since callers feed this from a /v1/chat/completions stream.
    """
    message_id = f"msg_{uuid.uuid4().hex[:10]}"

    # Open the message envelope.
    message_start_data = {
        'type': 'message_start',
        'message': {
            'id': message_id,
            'type': 'message',
            'role': 'assistant',
            'model': original_model,
            'content': [],
            'stop_reason': None,
            'stop_sequence': None,
            'usage': {'input_tokens': 0, 'output_tokens': 0}
        }
    }
    yield f"data: {json.dumps(message_start_data)}\n\n"

    # Open the single text content block (index 0).
    content_block_start_data = {
        'type': 'content_block_start',
        'index': 0,
        'content_block': {'type': 'text', 'text': ''}
    }
    yield f"data: {json.dumps(content_block_start_data)}\n\n"

    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"

    async for line in ollama_stream:
        if not line.startswith("data: "):
            continue
        data_str = line[6:]
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            # e.g. the OpenAI "[DONE]" sentinel — nothing to translate.
            continue

        # Ollama-native final chunk: capture usage and stop reason.
        if data.get("done", False):
            input_tokens = data.get("prompt_eval_count", 0)
            output_tokens = data.get("eval_count", 0)
            stop_reason = data.get("done_reason", "stop")
            continue

        # Ollama-native delta.
        content = data.get("message", {}).get("content", "")

        # OpenAI-compatible delta fallback.
        if not content:
            choices = data.get("choices") or []
            if choices:
                content = choices[0].get("delta", {}).get("content", "") or ""
                finish = choices[0].get("finish_reason")
                if finish:
                    stop_reason = finish

        if content:
            content_block_delta_data = {
                'type': 'content_block_delta',
                'index': 0,
                'delta': {
                    'type': 'text_delta',
                    'text': content
                }
            }
            yield f"data: {json.dumps(content_block_delta_data)}\n\n"

    # Close the content block.
    content_block_stop_data = {
        'type': 'content_block_stop',
        'index': 0
    }
    yield f"data: {json.dumps(content_block_stop_data)}\n\n"

    # Upstream finish reason → Anthropic stop_reason.
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
    }

    # Final usage + stop reason.
    message_delta_data = {
        'type': 'message_delta',
        'delta': {
            'stop_reason': stop_reason_map.get(stop_reason, "end_turn"),
            'stop_sequence': None
        },
        'usage': {'input_tokens': input_tokens, 'output_tokens': output_tokens}
    }
    yield f"data: {json.dumps(message_delta_data)}\n\n"

    # Close the message envelope.
    message_stop_data = {'type': 'message_stop'}
    yield f"data: {json.dumps(message_stop_data)}\n\n"
326
# ==========================================
# ENDPOINTS
# ==========================================
@app.get("/")
def root():
    """Health/status endpoint: per-key lock state, health and counters."""
    summary = {}
    for info in key_status.values():
        summary[info["prefix"]] = {
            "status": "BUSY" if info["in_use"] else "IDLE",
            "healthy": info["healthy"],
            "success": info["success"],
            "failures": info["failures"],
        }
    return {
        "status": "ok",
        "total_keys_loaded": len(OLLAMA_KEYS),
        "keys_status": summary,
    }
344
+ @app.get("/v1/models")
345
+ async def list_models(request: Request):
346
+ # Validate auth
347
+ auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
348
+ if auth_key != MASTER_API_KEY:
349
+ return JSONResponse(
350
+ {"error": {"type": "authentication_error", "message": "Unauthorized"}},
351
+ status_code=401
352
+ )
353
+
354
+ # Proxy to Ollama /api/tags
355
+ async with httpx.AsyncClient(timeout=30.0) as client:
356
+ try:
357
+ resp = await client.get(
358
+ f"{BASE_URL}/api/tags",
359
+ headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"}
360
+ )
361
+
362
+ if resp.status_code != 200:
363
+ return JSONResponse(
364
+ {"error": {"type": "api_error", "message": "Failed to fetch models"}},
365
+ status_code=resp.status_code
366
+ )
367
+
368
+ ollama_data = resp.json()
369
+
370
+ # Convert to OpenAI format
371
+ models = []
372
+ created_time = int(time.time())
373
+
374
+ for model in ollama_data.get("models", []):
375
+ models.append({
376
+ "id": model.get("name", model.get("model", "")),
377
+ "object": "model",
378
+ "created": created_time,
379
+ "owned_by": "ollama"
380
+ })
381
+
382
+ return {"object": "list", "data": models}
383
+
384
+ except Exception as e:
385
+ log(f"Error fetching models: {e}")
386
+ return JSONResponse(
387
+ {"error": {"type": "api_error", "message": str(e)}},
388
+ status_code=500
389
+ )
390
+
391
+ @app.post("/v1/messages")
392
+ async def anthropic_chat(request: Request):
393
+ # Validate auth
394
+ auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
395
+ if auth_key != MASTER_API_KEY:
396
+ return anthropic_error("authentication_error", "Unauthorized", 401)
397
+
398
+ try:
399
+ body = await request.json()
400
+ except ClientDisconnect:
401
+ log("Client kabur sebelum proxy selesai membaca request body.")
402
+ return Response(status_code=499)
403
+ except json.JSONDecodeError:
404
+ return anthropic_error("invalid_request_error", "Invalid JSON", 400)
405
+
406
+ is_stream = body.get("stream", False)
407
+ original_model = body.get("model", "claude-sonnet-4-6")
408
+
409
+ # Convert to Ollama format
410
+ ollama_body = anthropic_to_ollama(body)
411
+
412
+ # ==========================================
413
+ # LOGIKA NON-STREAM
414
+ # ==========================================
415
+ if not is_stream:
416
+ tried_keys = set()
417
+ for attempt in range(len(OLLAMA_KEYS)):
418
+ if len(tried_keys) >= len(OLLAMA_KEYS):
419
+ tried_keys.clear()
420
+
421
+ key = None
422
+ log("Menunggu API Key idle (Antrean Non-Stream)...")
423
+
424
+ # Antrean Tanpa Batas Waktu
425
+ while True:
426
+ if await request.is_disconnected():
427
+ log("Client membatalkan request saat mengantre (Non-Stream).")
428
+ return Response(status_code=499)
429
+
430
+ # Gunakan fungsi Atomic Lock
431
+ key = get_and_lock_key(exclude_keys=tried_keys)
432
+ if key:
433
+ break # Langsung keluar loop, key SUDAH DIKUNCI
434
+
435
+ await asyncio.sleep(0.5) # Cek tiap setengah detik
436
+
437
+ ki = key_status[key]
438
+ tried_keys.add(key)
439
+ log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
440
+
441
+ try:
442
+ async with httpx.AsyncClient(timeout=120.0) as client:
443
+ resp = await client.post(
444
+ f"{BASE_URL}/v1/chat/completions",
445
+ json=ollama_body,
446
+ headers={"Authorization": f"Bearer {key}"}
447
+ )
448
+ if resp.status_code == 200:
449
+ ki["success"] += 1
450
+ ki["failures"] = 0
451
+
452
+ # Convert response to Anthropic format
453
+ ollama_response = resp.json()
454
+ anthropic_response = ollama_to_anthropic(ollama_response, original_model)
455
+
456
+ ki["in_use"] = False
457
+ log(f"RELEASE: key#{ki['index']} (Non-Stream)")
458
+ return JSONResponse(anthropic_response)
459
+ elif resp.status_code == 429:
460
+ ki["failures"] += 1
461
+ ki["healthy"] = False
462
+ log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
463
+ continue
464
+ else:
465
+ ki["failures"] += 1
466
+ continue
467
+ except Exception as e:
468
+ ki["failures"] += 1
469
+ log(f"Error Non-Stream: {e}")
470
+ continue
471
+ finally:
472
+ ki["in_use"] = False # SELALU LEPAS KUNCI
473
+ log(f"RELEASE: key#{ki['index']} (Non-Stream)")
474
+
475
+ return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
476
+
477
+ # ==========================================
478
+ # LOGIKA STREAMING
479
+ # ==========================================
480
+ async def stream_generator():
481
+ current_body = ollama_body.copy()
482
+ generated_text_buffer = ""
483
+ tried_keys = set()
484
+
485
+ for attempt in range(len(OLLAMA_KEYS)):
486
+ if len(tried_keys) >= len(OLLAMA_KEYS):
487
+ tried_keys.clear()
488
+
489
+ key = None
490
+ if attempt == 0:
491
+ log("Menunggu API Key idle (Antrean Stream Baru)...")
492
+ else:
493
+ log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
494
+
495
+ # Antrean Tanpa Batas Waktu
496
+ while True:
497
+ if await request.is_disconnected():
498
+ log("Client membatalkan request saat mengantre stream.")
499
+ return
500
+
501
+ # Gunakan fungsi Atomic Lock
502
+ key = get_and_lock_key(exclude_keys=tried_keys)
503
+ if key:
504
+ break # Langsung keluar loop, key SUDAH DIKUNCI
505
+
506
+ await asyncio.sleep(0.5)
507
+
508
+ ki = key_status[key]
509
+ tried_keys.add(key)
510
+ log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
511
+
512
+ if generated_text_buffer:
513
+ log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
514
+ messages = current_body.get("messages", [])
515
+ if messages and messages[-1].get("role") == "assistant":
516
+ messages[-1]["content"] = generated_text_buffer
517
+ else:
518
+ messages.append({"role": "assistant", "content": generated_text_buffer})
519
+ current_body["messages"] = messages
520
+
521
+ try:
522
+ custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
523
+ async with httpx.AsyncClient(timeout=custom_timeout) as client:
524
+ async with client.stream(
525
+ "POST", f"{BASE_URL}/v1/chat/completions",
526
+ json=current_body, headers={"Authorization": f"Bearer {key}"}
527
+ ) as response:
528
+
529
+ if response.status_code == 429:
530
+ ki["failures"] += 1
531
+ ki["healthy"] = False
532
+ log(f"STREAM 429: key#{ki['index']} - Switching key...")
533
+ continue
534
+
535
+ if response.status_code != 200:
536
+ ki["failures"] += 1
537
+ log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
538
+ continue
539
+
540
+ stream_interrupted = False
541
+ try:
542
+ # Convert Ollama stream to Anthropic SSE
543
+ async for chunk in stream_anthropic(response.aiter_lines(), original_model):
544
+ yield chunk
545
+
546
+ ki["success"] += 1
547
+ ki["failures"] = 0
548
+ return
549
+
550
+ except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
551
+ log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
552
+ ki["failures"] += 1
553
+ stream_interrupted = True
554
+
555
+ if not stream_interrupted:
556
+ return
557
+
558
+ except Exception as e:
559
+ ki["failures"] += 1
560
+ log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
561
+ continue
562
+
563
+ finally:
564
+ # SELALU LEPAS KUNCI
565
+ ki["in_use"] = False
566
+ log(f"STREAM RELEASE: key#{ki['index']}")
567
+
568
+ yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
569
+
570
+ return StreamingResponse(stream_generator(), media_type="text/event-stream")
571
+
572
+ @app.post("/v1/chat/completions")
573
+ async def chat(req: Request):
574
+ auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
575
+ if auth_key != MASTER_API_KEY:
576
+ return JSONResponse({"error": "Unauthorized"}, status_code=401)
577
+
578
+ # Tangkap error jika client kabur (ClientDisconnect)
579
+ try:
580
+ body = await req.json()
581
+ except ClientDisconnect:
582
+ log("Client kabur sebelum proxy selesai membaca request body.")
583
+ return Response(status_code=499)
584
+ except json.JSONDecodeError:
585
+ return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
586
+
587
+ is_stream = body.get("stream", False)
588
+
589
+ # ==========================================
590
+ # LOGIKA NON-STREAM
591
+ # ==========================================
592
+ if not is_stream:
593
+ tried_keys = set()
594
+ for attempt in range(len(OLLAMA_KEYS)):
595
+ if len(tried_keys) >= len(OLLAMA_KEYS):
596
+ tried_keys.clear()
597
+
598
+ key = None
599
+ log("Menunggu API Key idle (Antrean Non-Stream)...")
600
+
601
+ # Antrean Tanpa Batas Waktu
602
+ while True:
603
+ if await req.is_disconnected():
604
+ log("Client membatalkan request saat mengantre (Non-Stream).")
605
+ return Response(status_code=499)
606
+
607
+ # Gunakan fungsi Atomic Lock
608
+ key = get_and_lock_key(exclude_keys=tried_keys)
609
+ if key:
610
+ break # Langsung keluar loop, key SUDAH DIKUNCI
611
+
612
+ await asyncio.sleep(0.5) # Cek tiap setengah detik
613
+
614
+ ki = key_status[key]
615
+ tried_keys.add(key)
616
+ log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
617
+
618
+ try:
619
+ async with httpx.AsyncClient(timeout=120.0) as client:
620
+ resp = await client.post(
621
+ f"{BASE_URL}/v1/chat/completions",
622
+ json=body,
623
+ headers={"Authorization": f"Bearer {key}"}
624
+ )
625
+ if resp.status_code == 200:
626
+ ki["success"] += 1
627
+ ki["failures"] = 0
628
+ return Response(content=resp.content, media_type=resp.headers.get("content-type"))
629
+ elif resp.status_code == 429:
630
+ ki["failures"] += 1
631
+ ki["healthy"] = False
632
+ log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
633
+ continue
634
+ else:
635
+ ki["failures"] += 1
636
+ continue
637
+ except Exception as e:
638
+ ki["failures"] += 1
639
+ log(f"Error Non-Stream: {e}")
640
+ continue
641
+ finally:
642
+ ki["in_use"] = False # SELALU LEPAS KUNCI
643
+ log(f"RELEASE: key#{ki['index']} (Non-Stream)")
644
+
645
+ return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
646
+
647
+ # ==========================================
648
+ # LOGIKA STREAMING (Seamless Fallback + Queue)
649
+ # ==========================================
650
+ async def stream_generator():
651
+ current_body = body.copy()
652
+ current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
653
+
654
+ generated_text_buffer = ""
655
+ tried_keys = set()
656
+
657
+ for attempt in range(len(OLLAMA_KEYS)):
658
+ if len(tried_keys) >= len(OLLAMA_KEYS):
659
+ tried_keys.clear()
660
+
661
+ key = None
662
+ if attempt == 0:
663
+ log("Menunggu API Key idle (Antrean Stream Baru)...")
664
+ else:
665
+ log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
666
+
667
+ # Antrean Tanpa Batas Waktu
668
+ while True:
669
+ if await req.is_disconnected():
670
+ log("Client membatalkan request saat mengantre stream.")
671
+ return
672
+
673
+ # Gunakan fungsi Atomic Lock
674
+ key = get_and_lock_key(exclude_keys=tried_keys)
675
+ if key:
676
+ break # Langsung keluar loop, key SUDAH DIKUNCI
677
+
678
+ await asyncio.sleep(0.5)
679
+
680
+ ki = key_status[key]
681
+ tried_keys.add(key)
682
+ log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
683
+
684
+ if generated_text_buffer:
685
+ log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
686
+ messages = current_body.get("messages", [])
687
+ if messages and messages[-1].get("role") == "assistant":
688
+ messages[-1]["content"] = generated_text_buffer
689
+ else:
690
+ messages.append({"role": "assistant", "content": generated_text_buffer})
691
+ current_body["messages"] = messages
692
+
693
+ try:
694
+ custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
695
+ async with httpx.AsyncClient(timeout=custom_timeout) as client:
696
+ async with client.stream(
697
+ "POST", f"{BASE_URL}/v1/chat/completions",
698
+ json=current_body, headers={"Authorization": f"Bearer {key}"}
699
+ ) as response:
700
+
701
+ if response.status_code == 429:
702
+ ki["failures"] += 1
703
+ ki["healthy"] = False
704
+ log(f"STREAM 429: key#{ki['index']} - Switching key...")
705
+ continue
706
+
707
+ if response.status_code != 200:
708
+ ki["failures"] += 1
709
+ log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
710
+ continue
711
+
712
+ stream_interrupted = False
713
+ try:
714
+ async for chunk in response.aiter_lines():
715
+ if chunk:
716
+ if chunk.startswith("data: "):
717
+ data_str = chunk[6:]
718
+ if data_str.strip() == "[DONE]":
719
+ ki["success"] += 1
720
+ ki["failures"] = 0
721
+ yield chunk + "\n\n"
722
+ return
723
+ try:
724
+ data_json = json.loads(data_str)
725
+ if "choices" in data_json and len(data_json["choices"]) > 0:
726
+ delta = data_json["choices"][0].get("delta", {})
727
+ content = delta.get("content", "")
728
+ if content:
729
+ generated_text_buffer += content
730
+ except json.JSONDecodeError:
731
+ pass
732
+ yield chunk + "\n\n"
733
+
734
+ except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
735
+ log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
736
+ ki["failures"] += 1
737
+ stream_interrupted = True
738
+
739
+ if not stream_interrupted:
740
+ return
741
+
742
+ except Exception as e:
743
+ ki["failures"] += 1
744
+ log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
745
+ continue
746
+
747
+ finally:
748
+ # SELALU LEPAS KUNCI
749
+ ki["in_use"] = False
750
+ log(f"STREAM RELEASE: key#{ki['index']}")
751
+
752
+ yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
753
+
754
+ return StreamingResponse(stream_generator(), media_type="text/event-stream")