Elysiadev11 commited on
Commit
933ca20
·
verified ·
1 Parent(s): 871b08f

Delete proxy_cerebras.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. proxy_cerebras.py +0 -763
proxy_cerebras.py DELETED
@@ -1,763 +0,0 @@
1
- import os
2
- import httpx
3
- from fastapi import FastAPI, Request
4
- from fastapi.responses import JSONResponse, Response, StreamingResponse
5
- from starlette.requests import ClientDisconnect
6
- import time
7
- import json
8
- import asyncio
9
- import uuid
10
-
11
# FastAPI application object serving the proxy endpoints defined below.
app = FastAPI()

# ==========================================
# CONFIGURATION & KEY LOADING
# ==========================================
# Upstream Ollama-compatible endpoint this proxy forwards requests to.
BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
# Single client-facing API key; every endpoint compares Authorization against it.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
18
-
19
# Default model mapping (Claude → MiniMax): every known Claude alias routes to
# the same MiniMax cloud model unless overridden via CLAUDE_MODEL_MAP.
DEFAULT_MODEL_MAPPING = {
    claude_name: "minimax-m2.7:cloud"
    for claude_name in (
        # Opus models
        "claude-opus-4-7",
        "claude-opus-4-6",
        "claude-opus-4-5",
        "claude-opus-4-1",
        "claude-opus-4-20250514",
        # Sonnet models
        "claude-sonnet-4-6",
        "claude-sonnet-4-5",
        "claude-sonnet-4-20250514",
        # Haiku models
        "claude-haiku-4-5",
        "claude-haiku-4-5-20251001",
    )
}

def load_model_mapping():
    """Return the Claude→Ollama model map.

    Starts from DEFAULT_MODEL_MAPPING and applies overrides from the
    CLAUDE_MODEL_MAP env var, a comma-separated list of
    ``claude-name:ollama-name`` pairs (the Ollama side may itself
    contain ``:`` tags, hence the single split).
    """
    merged = dict(DEFAULT_MODEL_MAPPING)

    raw = os.getenv("CLAUDE_MODEL_MAP")
    if raw:
        for entry in raw.split(","):
            claude_name, sep, ollama_name = entry.partition(":")
            if sep:
                merged[claude_name.strip()] = ollama_name.strip()

    return merged
53
-
54
def map_model(claude_model: str) -> str:
    """Map a Claude model name to the configured Ollama model.

    Exact matches in the (env-overridable) mapping win; otherwise fall
    back by model family (opus/haiku/sonnet) to the DEFAULT_*_MODEL
    env vars, defaulting to the MiniMax cloud model.
    """
    mapping = load_model_mapping()

    # Exact match takes priority over family fallbacks.
    mapped = mapping.get(claude_model)
    if mapped is not None:
        return mapped

    lowered = claude_model.lower()
    if "opus" in lowered:
        return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
    if "haiku" in lowered:
        return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")

    # Anything else is treated as a Sonnet-class request.
    return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")
71
-
72
OLLAMA_KEYS = []
# Supports up to 100 upstream API keys (OLLAMA_KEY_1 .. OLLAMA_KEY_100).
for _slot in range(1, 101):
    _candidate = os.getenv(f"OLLAMA_KEY_{_slot}")
    if _candidate:
        OLLAMA_KEYS.append(_candidate)

# Fall back to a dummy key when no env keys are configured.
if not OLLAMA_KEYS:
    OLLAMA_KEYS.append("ollam")

# Key status bookkeeping.
# Round-robin index used for load balancing across keys.
last_used_index = 0

# Per-key status table: health flag, success/failure counters, and the
# lock flag enforcing 1 key = 1 in-flight request.
key_status = {
    k: {
        "index": idx,
        "prefix": k[:8] + "...",
        "failures": 0,
        "success": 0,
        "healthy": True,
        "in_use": False,  # lock flag: one key serves one request at a time
    }
    for idx, k in enumerate(OLLAMA_KEYS, 1)
}
96
-
97
def log(msg):
    """Print *msg* to stdout prefixed with an HH:MM:SS timestamp."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}")
99
-
100
def get_and_lock_key(exclude_keys=None):
    """
    Round-robin + atomic lock: pick the next healthy, idle key starting at
    last_used_index, mark it in_use, and return it (None when none is free).

    The caller MUST set key_status[key]["in_use"] = False when finished.
    If every key is unhealthy, all keys are mass-reset so the proxy can
    keep serving instead of dead-locking.
    """
    global last_used_index

    if exclude_keys is None:
        exclude_keys = set()

    # All keys dead? Reset everything so we can try again from scratch.
    if not any(v["healthy"] for v in key_status.values()):
        log("⚠️ Semua API Key berstatus mati/unhealthy. Melakukan RESET MASSAL...")
        for v in key_status.values():
            v["failures"] = 0
            v["healthy"] = True
        last_used_index = 0

    # Scan at most one full cycle, starting from the round-robin pointer.
    for i in range(len(OLLAMA_KEYS)):
        idx = (last_used_index + i) % len(OLLAMA_KEYS)
        key = OLLAMA_KEYS[idx]

        if key_status[key]["healthy"] and not key_status[key]["in_use"] and key not in exclude_keys:
            # BUGFIX: advance the pointer PAST the selected slot. Previously it
            # was left at idx, so once the key was released the same key was
            # re-preferred on every call, defeating the intended even
            # distribution of burst traffic across all keys.
            last_used_index = (idx + 1) % len(OLLAMA_KEYS)
            key_status[key]["in_use"] = True
            return key

    return None
129
-
130
def anthropic_error(error_type: str, message: str, status_code: int = 400):
    """Build a JSONResponse shaped like an Anthropic API error envelope."""
    payload = {
        "type": "error",
        "error": {"type": error_type, "message": message},
    }
    return JSONResponse(payload, status_code=status_code)
142
-
143
def anthropic_to_ollama(body: dict) -> dict:
    """
    Convert an Anthropic Messages API request into an Ollama chat request.

    - The out-of-band "system" prompt becomes the leading system message.
    - String contents pass through; content-block lists are flattened by
      concatenating their "text" blocks (tool_use/image blocks are dropped).
    - Sampling parameters are renamed into Ollama "options".

    Robustness fix vs. the original: messages missing "content" or "role"
    no longer raise KeyError — they default to "" and "user" respectively.
    """
    messages = []

    # Anthropic carries the system prompt out-of-band; Ollama wants it first.
    system_prompt = body.get("system")
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    for msg in body.get("messages", []):
        content = msg.get("content", "")  # tolerate malformed messages
        if isinstance(content, list):
            # Flatten content blocks, keeping only text parts.
            content = "".join(
                block.get("text", "")
                for block in content
                if block.get("type") == "text"
            )
        messages.append({"role": msg.get("role", "user"), "content": content})

    ollama_body = {
        "model": map_model(body.get("model", "claude-sonnet-4-6")),
        "messages": messages,
        "stream": body.get("stream", False),
        "options": {},
    }

    # Anthropic parameter name → Ollama option name.
    param_map = {
        "max_tokens": "num_predict",
        "temperature": "temperature",
        "top_p": "top_p",
        "top_k": "top_k",
    }
    for anthropic_name, ollama_name in param_map.items():
        if anthropic_name in body:
            ollama_body["options"][ollama_name] = body[anthropic_name]

    return ollama_body
198
-
199
def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
    """Convert a non-streaming Ollama chat response into an Anthropic message.

    Echoes back *original_model* (the Claude name the client asked for) and
    maps Ollama done_reason values onto Anthropic stop_reason values.
    """
    message = ollama_response.get("message", {})

    # Ollama done_reason → Anthropic stop_reason.
    finish_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
        "load": "end_turn",
        "unload": "end_turn",
    }
    finish = ollama_response.get("done_reason", "stop")

    # MiniMax quirk: some responses carry text in "reasoning", not "content".
    text = message.get("content", "")
    if not text and message.get("reasoning"):
        text = message.get("reasoning", "")

    usage = {
        "input_tokens": ollama_response.get("prompt_eval_count", 0),
        "output_tokens": ollama_response.get("eval_count", 0),
    }

    return {
        "id": f"msg_{uuid.uuid4().hex[:10]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": text}],
        "model": original_model,
        "stop_reason": finish_map.get(finish, "end_turn"),
        "stop_sequence": None,
        "usage": usage,
    }
238
-
239
async def stream_anthropic(ollama_stream, original_model: str):
    """
    Convert an Ollama SSE line stream into Anthropic Messages streaming events.

    Event order: message_start, content_block_start, one content_block_delta
    per text chunk, content_block_stop, message_delta (stop_reason + usage),
    message_stop. Malformed SSE lines are skipped silently.
    """
    message_id = f"msg_{uuid.uuid4().hex[:10]}"

    # message_start carries an empty assistant message shell.
    yield "data: " + json.dumps({
        "type": "message_start",
        "message": {
            "id": message_id,
            "type": "message",
            "role": "assistant",
            "model": original_model,
            "content": [],
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    }) + "\n\n"

    # FIX: per the Anthropic streaming spec, content_block_start's text block
    # includes an empty "text" field; it was previously omitted.
    yield "data: " + json.dumps({
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    }) + "\n\n"

    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"

    # Relay content chunks until the upstream signals completion.
    async for line in ollama_stream:
        if not line.startswith("data: "):
            continue
        try:
            data = json.loads(line[6:])
        except json.JSONDecodeError:
            continue  # skip malformed frames

        if data.get("done", False):
            # Final frame carries usage counters and the finish reason.
            input_tokens = data.get("prompt_eval_count", 0)
            output_tokens = data.get("eval_count", 0)
            stop_reason = data.get("done_reason", "stop")
            continue

        message = data.get("message", {})
        content = message.get("content", "")
        # MiniMax quirk: text may arrive in "reasoning" instead of "content".
        if not content and message.get("reasoning"):
            content = message.get("reasoning", "")

        if content:
            yield "data: " + json.dumps({
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": content},
            }) + "\n\n"

    yield "data: " + json.dumps({"type": "content_block_stop", "index": 0}) + "\n\n"

    # Ollama done_reason → Anthropic stop_reason.
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
    }

    yield "data: " + json.dumps({
        "type": "message_delta",
        "delta": {
            "stop_reason": stop_reason_map.get(stop_reason, "end_turn"),
            "stop_sequence": None,
        },
        "usage": {"output_tokens": output_tokens},
    }) + "\n\n"

    yield "data: " + json.dumps({"type": "message_stop"}) + "\n\n"
334
-
335
- # ==========================================
336
- # ENDPOINTS
337
- # ==========================================
338
@app.get("/")
def root():
    """Health/status endpoint: per-key lock state and success/failure counters."""
    keys_report = {}
    for info in key_status.values():
        keys_report[info["prefix"]] = {
            "status": "BUSY" if info["in_use"] else "IDLE",
            "healthy": info["healthy"],
            "success": info["success"],
            "failures": info["failures"],
        }
    return {
        "status": "ok",
        "total_keys_loaded": len(OLLAMA_KEYS),
        "keys_status": keys_report,
    }
352
-
353
@app.get("/v1/models")
async def list_models(request: Request):
    """OpenAI-compatible model listing, proxied from the upstream /api/tags."""
    # Validate the client-facing master key.
    auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if auth_key != MASTER_API_KEY:
        return JSONResponse(
            {"error": {"type": "authentication_error", "message": "Unauthorized"}},
            status_code=401
        )

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # Always uses the first key for the lightweight tags query.
            resp = await client.get(
                f"{BASE_URL}/api/tags",
                headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"}
            )

            if resp.status_code != 200:
                return JSONResponse(
                    {"error": {"type": "api_error", "message": "Failed to fetch models"}},
                    status_code=resp.status_code
                )

            created_time = int(time.time())
            # Reshape Ollama's tag entries into OpenAI's model-list format.
            models = [
                {
                    "id": entry.get("name", entry.get("model", "")),
                    "object": "model",
                    "created": created_time,
                    "owned_by": "ollama",
                }
                for entry in resp.json().get("models", [])
            ]

            return {"object": "list", "data": models}

        except Exception as e:
            log(f"Error fetching models: {e}")
            return JSONResponse(
                {"error": {"type": "api_error", "message": str(e)}},
                status_code=500
            )
399
-
400
@app.post("/v1/messages")
async def anthropic_chat(request: Request):
    """Anthropic Messages API endpoint, proxied onto the upstream backend.

    Flow: validate the master key, translate the Anthropic request via
    anthropic_to_ollama(), then forward it with key rotation/failover.
    Non-stream requests return a translated Anthropic JSON message;
    stream requests return Anthropic-style SSE built by stream_anthropic().
    """
    # Validate the client-facing master key.
    auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if auth_key != MASTER_API_KEY:
        return anthropic_error("authentication_error", "Unauthorized", 401)

    try:
        body = await request.json()
    except ClientDisconnect:
        # Client went away before the body was fully read; 499 = client closed request.
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)
    except json.JSONDecodeError:
        return anthropic_error("invalid_request_error", "Invalid JSON", 400)

    is_stream = body.get("stream", False)
    # Remember the requested Claude model name so responses echo it back.
    original_model = body.get("model", "claude-sonnet-4-6")

    # Convert the Anthropic request to Ollama chat format.
    # NOTE(review): ollama_body is shaped like Ollama's native /api/chat request
    # (messages + "options"), yet it is POSTed to the OpenAI-style
    # /v1/chat/completions path and the reply is parsed as Ollama-native JSON
    # (message / prompt_eval_count) — confirm the upstream accepts this hybrid.
    ollama_body = anthropic_to_ollama(body)

    # ==========================================
    # NON-STREAM LOGIC
    # ==========================================
    if not is_stream:
        tried_keys = set()
        # At most one attempt per configured key; a fully-exhausted set is cleared.
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()

            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")

            # Unbounded queue: poll until an idle, healthy key can be locked.
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)

                # Atomic acquire: get_and_lock_key marks the key in_use.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is ALREADY LOCKED at this point

                await asyncio.sleep(0.5)  # poll every half second

            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")

            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=ollama_body,
                        headers={"Authorization": f"Bearer {key}"}
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0

                        # Translate the upstream reply to Anthropic format.
                        ollama_response = resp.json()
                        anthropic_response = ollama_to_anthropic(ollama_response, original_model)

                        # Early release; the finally block re-releases (harmless).
                        ki["in_use"] = False
                        log(f"RELEASE: key#{ki['index']} (Non-Stream)")
                        return JSONResponse(anthropic_response)
                    elif resp.status_code == 429:
                        # Rate-limited: mark unhealthy and rotate to the next key.
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")

        return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)

    # ==========================================
    # STREAMING LOGIC
    # ==========================================
    async def stream_generator():
        # Work on a copy so retry injection does not mutate the translated body.
        current_body = ollama_body.copy()
        generated_text_buffer = ""
        tried_keys = set()

        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()

            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")

            # Unbounded queue: poll until an idle, healthy key can be locked.
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return

                # Atomic acquire: get_and_lock_key marks the key in_use.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is ALREADY LOCKED at this point

                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")

            # On retry after an interrupted stream, seed the conversation with
            # the text generated so far so the model continues from there.
            # NOTE(review): generated_text_buffer is never appended to in this
            # endpoint's stream loop (unlike /v1/chat/completions below), so a
            # resume effectively restarts from scratch — confirm intended.
            if generated_text_buffer:
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages

            try:
                # No read timeout: streams may idle between tokens.
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST", f"{BASE_URL}/v1/chat/completions",
                        json=current_body, headers={"Authorization": f"Bearer {key}"}
                    ) as response:

                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue

                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue

                        stream_interrupted = False
                        try:
                            # Convert the upstream stream to Anthropic SSE events.
                            async for chunk in stream_anthropic(response.aiter_lines(), original_model):
                                yield chunk

                            ki["success"] += 1
                            ki["failures"] = 0
                            return

                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
                            # Mid-stream network failure: fall through and retry
                            # with the next key.
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
                            stream_interrupted = True

                        if not stream_interrupted:
                            return

            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue

            finally:
                # ALWAYS release the lock
                ki["in_use"] = False
                log(f"STREAM RELEASE: key#{ki['index']}")

        # Every key failed: emit a terminal SSE error frame.
        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")
580
-
581
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat endpoint: pass-through proxy with key rotation.

    Non-stream requests are forwarded verbatim and the upstream body is
    returned as-is. Stream requests are relayed chunk-by-chunk while the
    generated text is buffered so an interrupted stream can be resumed on
    another key (seamless fallback).
    """
    # Validate the client-facing master key.
    auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
    if auth_key != MASTER_API_KEY:
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    # Catch the case where the client disconnects mid-read (ClientDisconnect).
    try:
        body = await req.json()
    except ClientDisconnect:
        # 499 = client closed request before it could be read.
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)
    except json.JSONDecodeError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    is_stream = body.get("stream", False)

    # ==========================================
    # NON-STREAM LOGIC
    # ==========================================
    if not is_stream:
        tried_keys = set()
        # At most one attempt per configured key; a fully-exhausted set is cleared.
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()

            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")

            # Unbounded queue: poll until an idle, healthy key can be locked.
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)

                # Atomic acquire: get_and_lock_key marks the key in_use.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is ALREADY LOCKED at this point

                await asyncio.sleep(0.5)  # poll every half second

            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")

            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=body,
                        headers={"Authorization": f"Bearer {key}"}
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0
                        # Pass the upstream body through untouched.
                        return Response(content=resp.content, media_type=resp.headers.get("content-type"))
                    elif resp.status_code == 429:
                        # Rate-limited: mark unhealthy and rotate to the next key.
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")

        return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)

    # ==========================================
    # STREAMING LOGIC (seamless fallback + queue)
    # ==========================================
    async def stream_generator():
        # Copy the body (and each message dict) so the resume-injection below
        # never mutates the caller's original request.
        current_body = body.copy()
        current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]

        generated_text_buffer = ""
        tried_keys = set()

        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()

            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")

            # Unbounded queue: poll until an idle, healthy key can be locked.
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return

                # Atomic acquire: get_and_lock_key marks the key in_use.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is ALREADY LOCKED at this point

                await asyncio.sleep(0.5)

            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")

            # On retry after a dropped stream, inject the text already
            # generated as a trailing assistant message so the model resumes.
            if generated_text_buffer:
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages

            try:
                # No read timeout: streams may idle between tokens.
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST", f"{BASE_URL}/v1/chat/completions",
                        json=current_body, headers={"Authorization": f"Bearer {key}"}
                    ) as response:

                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue

                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue

                        stream_interrupted = False
                        try:
                            async for chunk in response.aiter_lines():
                                if chunk:
                                    if chunk.startswith("data: "):
                                        data_str = chunk[6:]
                                        if data_str.strip() == "[DONE]":
                                            # Clean完: count success and forward the terminator.
                                            ki["success"] += 1
                                            ki["failures"] = 0
                                            yield chunk + "\n\n"
                                            return
                                        try:
                                            # Accumulate OpenAI-style delta text so an
                                            # interrupted stream can resume on another key.
                                            data_json = json.loads(data_str)
                                            if "choices" in data_json and len(data_json["choices"]) > 0:
                                                delta = data_json["choices"][0].get("delta", {})
                                                content = delta.get("content", "")
                                                if content:
                                                    generated_text_buffer += content
                                        except json.JSONDecodeError:
                                            pass
                                    # Forward every non-empty upstream line as SSE.
                                    yield chunk + "\n\n"

                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
                            # Mid-stream network failure: fall through and retry
                            # with the next key, resuming from the buffer.
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
                            stream_interrupted = True

                        if not stream_interrupted:
                            return

            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue

            finally:
                # ALWAYS release the lock
                ki["in_use"] = False
                log(f"STREAM RELEASE: key#{ki['index']}")

        # Every key failed: emit a terminal SSE error frame.
        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")