Elysiadev11 commited on
Commit
c51d9d6
·
verified ·
1 Parent(s): 0efd016

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +84 -47
app.py CHANGED
@@ -79,15 +79,17 @@ if not API_KEYS:
79
  # =====================================================
80
  # KEY STATUS & ROUND ROBIN
81
  # =====================================================
 
 
82
  key_status = {}
83
  for idx, k in enumerate(API_KEYS, 1):
84
  key_status[k] = {
85
  "index": idx,
86
  "prefix": k[:8] + "..." if len(k) > 8 else k,
87
- "healthy": True,
88
  "busy": False,
89
  "success": 0,
90
  "fail": 0,
 
91
  }
92
 
93
  rr_index = 0
@@ -222,50 +224,66 @@ def is_rate_limited_error_body(text: str) -> bool:
222
  # =====================================================
223
  # KEY MANAGEMENT
224
  # =====================================================
225
- async def get_key(exclude=None):
 
 
 
 
 
 
 
226
  global rr_index
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
227
  if exclude is None:
228
  exclude = set()
229
-
230
  async with _key_lock:
231
- # Reset semua key kalau semuanya unhealthy
232
- if not any(v["healthy"] for v in key_status.values()):
233
- log("⚠️ All API Keys unhealthy. Resetting all...")
234
- for v in key_status.values():
235
- v["fail"] = 0
236
- v["healthy"] = True
237
-
238
- for _ in range(len(API_KEYS)):
239
- rr_index = (rr_index + 1) % len(API_KEYS)
240
- key = API_KEYS[rr_index]
241
- st = key_status[key]
242
-
243
- if st["healthy"] and not st["busy"] and key not in exclude:
244
- st["busy"] = True
245
- return key
246
-
247
- return None
248
 
249
  async def release_key(key):
250
  async with _key_lock:
251
  if key in key_status:
252
  key_status[key]["busy"] = False
253
 
 
 
 
 
 
 
 
 
 
 
254
  async def mark_fail(key):
255
  async with _key_lock:
256
  if key in key_status:
257
  key_status[key]["fail"] += 1
258
- if key_status[key]["fail"] >= 3:
259
- key_status[key]["healthy"] = False
260
 
261
  async def mark_ok(key):
262
  async with _key_lock:
263
  if key in key_status:
264
  key_status[key]["success"] += 1
265
  key_status[key]["fail"] = 0
266
- key_status[key]["healthy"] = True
267
 
268
- async def wait_for_free_key(exclude=None, max_wait=60.0, interval=0.3):
 
269
  elapsed = 0.0
270
  while elapsed < max_wait:
271
  key = await get_key(exclude)
@@ -277,21 +295,39 @@ async def wait_for_free_key(exclude=None, max_wait=60.0, interval=0.3):
277
 
278
  async def get_key_infinite(exclude=None):
279
  """
280
- Tunggu key tanpa batas waktu (infinite). Reset exclude setiap full cycle.
281
- Dipakai supaya tidak pernah berhenti mencoba.
 
 
 
282
  """
283
  local_exclude = set(exclude) if exclude else set()
284
  cycle = 0
 
285
  while True:
286
- key = await get_key(exclude=local_exclude)
287
- if key:
288
- return key, local_exclude
 
 
 
 
 
 
 
 
 
 
 
289
 
290
- # Semua key dicoba, mulai cycle baru
291
- await asyncio.sleep(1.0) # jeda 1 detik antar cycle
292
- cycle += 1
293
- log(f"🔄 Key cycle #{cycle}, all keys tried/busy, retrying...")
294
- local_exclude.clear() # reset exclude supaya semua key dicoba lagi
 
 
 
295
 
296
 
297
  # =====================================================
@@ -469,11 +505,14 @@ def openai_response_to_anthropic(data: dict, original_model: str) -> dict:
469
  @app.get("/")
470
  async def root():
471
  async with _key_lock:
 
472
  keys_info = {}
473
  for k, v in key_status.items():
 
 
474
  keys_info[v["prefix"]] = {
475
- "status": "BUSY" if v["busy"] else "IDLE",
476
- "healthy": v["healthy"],
477
  "success": v["success"],
478
  "fail": v["fail"],
479
  }
@@ -484,6 +523,7 @@ async def root():
484
  "base_url": CEREBRAS_BASE_URL,
485
  "default_model": DEFAULT_MODEL,
486
  "max_request_tokens": MAX_REQUEST_TOKENS,
 
487
  "total_keys": len(API_KEYS),
488
  "keys": keys_info,
489
  }
@@ -587,8 +627,7 @@ async def chat(req: Request):
587
 
588
  if is_rate_limited_status(r.status_code) or (r.status_code != 200 and is_rate_limited_error_body(r.text)):
589
  log(f"RATE LIMITED: key#{ki['index']}")
590
- await mark_fail(key)
591
- await asyncio.sleep(2)
592
  continue
593
 
594
  if r.status_code != 200:
@@ -610,6 +649,7 @@ async def chat(req: Request):
610
 
611
  # -----------------------------------------
612
  # STREAM — infinite loop, tidak pernah stop
 
613
  # -----------------------------------------
614
  async def stream_gen():
615
  exclude = set()
@@ -630,8 +670,7 @@ async def chat(req: Request):
630
 
631
  if is_rate_limited_status(r.status_code):
632
  log(f"STREAM RATE LIMITED: key#{ki['index']}")
633
- await mark_fail(key)
634
- await asyncio.sleep(2)
635
  continue
636
 
637
  if r.status_code != 200:
@@ -649,7 +688,6 @@ async def chat(req: Request):
649
 
650
  raw = line[6:] if line.startswith("data: ") else line
651
 
652
- # Cek error CF hanya pada JSON error object, bukan model output
653
  try:
654
  j = json.loads(raw)
655
  if "error" in j and "choices" not in j:
@@ -663,12 +701,12 @@ async def chat(req: Request):
663
  yield line + "\n\n"
664
 
665
  if hit_limit:
666
- await mark_fail(key)
667
  continue
668
 
669
  yield "data: [DONE]\n\n"
670
  await mark_ok(key)
671
- return # sukses, keluar dari loop
672
 
673
  except Exception as e:
674
  log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
@@ -768,7 +806,7 @@ async def anthropic_messages(req: Request):
768
 
769
  if is_rate_limited_status(r.status_code) or (r.status_code != 200 and is_rate_limited_error_body(r.text)):
770
  log(f"RATE LIMITED: key#{ki['index']}")
771
- await mark_fail(key)
772
  continue
773
 
774
  if r.status_code != 200:
@@ -821,8 +859,7 @@ async def anthropic_messages(req: Request):
821
 
822
  if is_rate_limited_status(r.status_code):
823
  log(f"STREAM RATE LIMITED: key#{ki['index']}")
824
- await mark_fail(key)
825
- await asyncio.sleep(2)
826
  continue
827
 
828
  if r.status_code != 200:
@@ -953,7 +990,7 @@ async def anthropic_messages(req: Request):
953
  })
954
 
955
  if hit_limit:
956
- await mark_fail(key)
957
  continue
958
 
959
  # Tutup text block
 
79
  # =====================================================
80
  # KEY STATUS & ROUND ROBIN
81
  # =====================================================
82
+ RATE_LIMIT_COOLDOWN = int(os.getenv("RATE_LIMIT_COOLDOWN", "62")) # detik cooldown setelah rate limit
83
+
84
  key_status = {}
85
  for idx, k in enumerate(API_KEYS, 1):
86
  key_status[k] = {
87
  "index": idx,
88
  "prefix": k[:8] + "..." if len(k) > 8 else k,
 
89
  "busy": False,
90
  "success": 0,
91
  "fail": 0,
92
+ "rate_limited_until": 0.0, # timestamp epoch; 0 = tidak sedang cooldown
93
  }
94
 
95
  rr_index = 0
 
224
  # =====================================================
225
  # KEY MANAGEMENT
226
  # =====================================================
227
+ def _get_available_key(exclude: set) -> str | None:
228
+ """
229
+ Internal (sync, dipanggil dalam _key_lock): cari key yang:
230
+ 1. Tidak sedang busy
231
+ 2. Tidak sedang cooldown rate limit
232
+ 3. Tidak ada di exclude set
233
+ Round-robin.
234
+ """
235
  global rr_index
236
+ now = time.time()
237
+ for _ in range(len(API_KEYS)):
238
+ rr_index = (rr_index + 1) % len(API_KEYS)
239
+ key = API_KEYS[rr_index]
240
+ st = key_status[key]
241
+ if not st["busy"] and now >= st["rate_limited_until"] and key not in exclude:
242
+ st["busy"] = True
243
+ return key
244
+ return None
245
+
246
+ def _next_available_time() -> float:
247
+ """Kapan key pertama keluar dari cooldown (epoch seconds)."""
248
+ now = time.time()
249
+ times = [st["rate_limited_until"] for st in key_status.values() if st["rate_limited_until"] > now]
250
+ return min(times) if times else now
251
+
252
+ async def get_key(exclude=None):
253
  if exclude is None:
254
  exclude = set()
 
255
  async with _key_lock:
256
+ return _get_available_key(exclude)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
 
258
  async def release_key(key):
259
  async with _key_lock:
260
  if key in key_status:
261
  key_status[key]["busy"] = False
262
 
263
+ async def mark_rate_limited(key):
264
+ """Tandai key kena rate limit: set cooldown RATE_LIMIT_COOLDOWN detik."""
265
+ async with _key_lock:
266
+ if key in key_status:
267
+ until = time.time() + RATE_LIMIT_COOLDOWN
268
+ key_status[key]["rate_limited_until"] = until
269
+ key_status[key]["fail"] += 1
270
+ idx = key_status[key]["index"]
271
+ log(f"⏳ key#{idx} cooldown {RATE_LIMIT_COOLDOWN}s (sampai {time.strftime('%H:%M:%S', time.localtime(until))})")
272
+
273
  async def mark_fail(key):
274
  async with _key_lock:
275
  if key in key_status:
276
  key_status[key]["fail"] += 1
 
 
277
 
278
  async def mark_ok(key):
279
  async with _key_lock:
280
  if key in key_status:
281
  key_status[key]["success"] += 1
282
  key_status[key]["fail"] = 0
283
+ key_status[key]["rate_limited_until"] = 0.0
284
 
285
+ async def wait_for_free_key(exclude=None, max_wait=120.0, interval=0.5):
286
+ """Tunggu key tersedia, max max_wait detik."""
287
  elapsed = 0.0
288
  while elapsed < max_wait:
289
  key = await get_key(exclude)
 
295
 
296
  async def get_key_infinite(exclude=None):
297
  """
298
+ Tunggu key tanpa batas waktu (infinite).
299
+ - Kalau ada key tersedia: return langsung.
300
+ - Kalau semua key busy/cooldown: sleep TEPAT sampai key paling cepat ready,
301
+ lalu retry — tidak perlu hammering setiap 2 detik.
302
+ - exclude di-reset setiap full cycle supaya key bisa dipakai lagi.
303
  """
304
  local_exclude = set(exclude) if exclude else set()
305
  cycle = 0
306
+
307
  while True:
308
+ async with _key_lock:
309
+ key = _get_available_key(local_exclude)
310
+ if key:
311
+ return key, local_exclude
312
+
313
+ # Hitung berapa lama sampai key berikutnya ready
314
+ now = time.time()
315
+ next_ready = _next_available_time()
316
+ wait_sec = max(0.5, next_ready - now)
317
+
318
+ all_in_cooldown = all(
319
+ st["rate_limited_until"] > now or st["busy"]
320
+ for st in key_status.values()
321
+ )
322
 
323
+ if all_in_cooldown:
324
+ cycle += 1
325
+ log(f"⏳ Semua key cooldown. Tunggu {wait_sec:.1f}s sampai key berikutnya ready... (cycle #{cycle})")
326
+ local_exclude.clear() # reset exclude agar key dicoba lagi setelah cooldown
327
+ await asyncio.sleep(wait_sec)
328
+ else:
329
+ # Ada key yang sudah lewat cooldown tapi mungkin busy — tunggu sebentar
330
+ await asyncio.sleep(0.3)
331
 
332
 
333
  # =====================================================
 
505
  @app.get("/")
506
  async def root():
507
  async with _key_lock:
508
+ now = time.time()
509
  keys_info = {}
510
  for k, v in key_status.items():
511
+ rl_until = v["rate_limited_until"]
512
+ cooldown_remaining = max(0, rl_until - now)
513
  keys_info[v["prefix"]] = {
514
+ "status": "BUSY" if v["busy"] else ("COOLDOWN" if cooldown_remaining > 0 else "IDLE"),
515
+ "cooldown_remaining_sec": round(cooldown_remaining, 1) if cooldown_remaining > 0 else 0,
516
  "success": v["success"],
517
  "fail": v["fail"],
518
  }
 
523
  "base_url": CEREBRAS_BASE_URL,
524
  "default_model": DEFAULT_MODEL,
525
  "max_request_tokens": MAX_REQUEST_TOKENS,
526
+ "rate_limit_cooldown_sec": RATE_LIMIT_COOLDOWN,
527
  "total_keys": len(API_KEYS),
528
  "keys": keys_info,
529
  }
 
627
 
628
  if is_rate_limited_status(r.status_code) or (r.status_code != 200 and is_rate_limited_error_body(r.text)):
629
  log(f"RATE LIMITED: key#{ki['index']}")
630
+ await mark_rate_limited(key)
 
631
  continue
632
 
633
  if r.status_code != 200:
 
649
 
650
  # -----------------------------------------
651
  # STREAM — infinite loop, tidak pernah stop
652
+ # Ketika semua key cooldown, sleep TEPAT sampai key siap
653
  # -----------------------------------------
654
  async def stream_gen():
655
  exclude = set()
 
670
 
671
  if is_rate_limited_status(r.status_code):
672
  log(f"STREAM RATE LIMITED: key#{ki['index']}")
673
+ await mark_rate_limited(key)
 
674
  continue
675
 
676
  if r.status_code != 200:
 
688
 
689
  raw = line[6:] if line.startswith("data: ") else line
690
 
 
691
  try:
692
  j = json.loads(raw)
693
  if "error" in j and "choices" not in j:
 
701
  yield line + "\n\n"
702
 
703
  if hit_limit:
704
+ await mark_rate_limited(key)
705
  continue
706
 
707
  yield "data: [DONE]\n\n"
708
  await mark_ok(key)
709
+ return # sukses
710
 
711
  except Exception as e:
712
  log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
 
806
 
807
  if is_rate_limited_status(r.status_code) or (r.status_code != 200 and is_rate_limited_error_body(r.text)):
808
  log(f"RATE LIMITED: key#{ki['index']}")
809
+ await mark_rate_limited(key)
810
  continue
811
 
812
  if r.status_code != 200:
 
859
 
860
  if is_rate_limited_status(r.status_code):
861
  log(f"STREAM RATE LIMITED: key#{ki['index']}")
862
+ await mark_rate_limited(key)
 
863
  continue
864
 
865
  if r.status_code != 200:
 
990
  })
991
 
992
  if hit_limit:
993
+ await mark_rate_limited(key)
994
  continue
995
 
996
  # Tutup text block