Elysiadev11 committed on
Commit
56972cd
·
verified ·
1 Parent(s): 93a977b

Update proxy_cf.py

Browse files
Files changed (1) hide show
  1. proxy_cf.py +105 -39
proxy_cf.py CHANGED
@@ -80,10 +80,6 @@ CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"
80
  def cf_base(account_id: str) -> str:
81
  return CF_AI_BASE.format(account_id=account_id)
82
 
83
- def cf_url(account_id: str, model: str) -> str:
84
- # Legacy /run endpoint (kept for fallback)
85
- return f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model}"
86
-
87
 
88
  async def get_key(exclude=None):
89
  global rr_index
@@ -99,7 +95,7 @@ async def get_key(exclude=None):
99
 
100
  if st["healthy"] and not st["busy"] and kid not in exclude:
101
  st["busy"] = True
102
- return acc # returns dict {"account_id": ..., "api_key": ...}
103
 
104
  return None
105
 
@@ -137,9 +133,55 @@ async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
137
  return None
138
 
139
 
140
- def is_rate_limited(status_code: int, text: str) -> bool:
 
 
 
 
 
 
 
 
 
 
141
  t = text.lower()
142
- return status_code == 429 or "rate limit" in t or "too many requests" in t or "usage limit" in t
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
 
145
  # =====================================================
@@ -175,7 +217,6 @@ async def models(req: Request):
175
  if not auth_ok(req):
176
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
177
 
178
- # Pakai account pertama yang healthy, tidak perlu mark busy
179
  acc = None
180
  async with _key_lock:
181
  for a in CF_ACCOUNTS:
@@ -196,7 +237,6 @@ async def models(req: Request):
196
  if r.status_code != 200:
197
  return JSONResponse({"error": f"CF returned {r.status_code}: {r.text}"}, status_code=r.status_code)
198
 
199
- # CF sudah return OpenAI-compatible format, langsung forward
200
  return Response(content=r.content, media_type="application/json")
201
 
202
  except Exception as e:
@@ -219,12 +259,10 @@ async def chat(req: Request):
219
 
220
  is_stream = body.get("stream", False)
221
  model = body.get("model", DEFAULT_CF_MODEL)
222
-
223
- # Pass body as-is ke CF — CF OpenAI-compatible endpoint terima format sama persis
224
  cf_body = {**body, "model": model}
225
 
226
  # -----------------------------------------
227
- # NON STREAM — forward response langsung
228
  # -----------------------------------------
229
  if not is_stream:
230
  tried = set()
@@ -247,7 +285,10 @@ async def chat(req: Request):
247
  }
248
  )
249
 
250
- if is_rate_limited(r.status_code, r.text):
 
 
 
251
  log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
252
  await mark_fail(acc)
253
  continue
@@ -258,7 +299,6 @@ async def chat(req: Request):
258
  continue
259
 
260
  await mark_ok(acc)
261
- # CF OpenAI-compatible → langsung forward, tidak perlu konversi
262
  return Response(content=r.content, media_type="application/json")
263
 
264
  except Exception as e:
@@ -271,7 +311,7 @@ async def chat(req: Request):
271
  return JSONResponse({"error": "All accounts failed"}, status_code=500)
272
 
273
  # -----------------------------------------
274
- # STREAM — CF kirim SSE OpenAI-format, langsung pipe ke client
275
  # -----------------------------------------
276
  async def gen():
277
  tried = set()
@@ -295,7 +335,8 @@ async def chat(req: Request):
295
  }
296
  ) as r:
297
 
298
- if is_rate_limited(r.status_code, ""):
 
299
  log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
300
  await mark_fail(acc)
301
  continue
@@ -314,14 +355,16 @@ async def chat(req: Request):
314
  if line.strip() == "data: [DONE]":
315
  break
316
 
317
- # Detect mid-stream rate limit dalam payload
318
  raw = line[6:] if line.startswith("data: ") else line
319
- if is_rate_limited(0, raw):
320
- log(f"Account {acc['account_id'][:8]}... mid-stream limit, switching key")
 
 
 
 
321
  hit_limit = True
322
  break
323
 
324
- # CF OpenAI-compatible SSE → pipe langsung ke client
325
  yield line + "\n\n"
326
 
327
  if hit_limit:
@@ -409,7 +452,10 @@ async def anthropic(req: Request):
409
  }
410
  )
411
 
412
- if is_rate_limited(r.status_code, r.text):
 
 
 
413
  log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
414
  await mark_fail(acc)
415
  continue
@@ -420,8 +466,7 @@ async def anthropic(req: Request):
420
  continue
421
 
422
  data = r.json()
423
- # CF OpenAI-compatible response → konversi ke Anthropic format
424
- content_text = data["choices"][0]["message"]["content"]
425
  usage = data.get("usage", {})
426
 
427
  out = {
@@ -456,7 +501,7 @@ async def anthropic(req: Request):
456
  async def agen():
457
  tried = set()
458
  msg_id = "msg_" + uuid.uuid4().hex[:10]
459
- sent_any_delta = False
460
 
461
  for _ in range(len(CF_ACCOUNTS)):
462
  acc = await wait_for_free_key(exclude=tried)
@@ -477,7 +522,8 @@ async def anthropic(req: Request):
477
  }
478
  ) as r:
479
 
480
- if is_rate_limited(r.status_code, ""):
 
481
  log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
482
  await mark_fail(acc)
483
  continue
@@ -487,8 +533,9 @@ async def anthropic(req: Request):
487
  await mark_fail(acc)
488
  continue
489
 
490
- # Emit Anthropic envelope sekali saja saat key pertama berhasil
491
- if not sent_any_delta:
 
492
  yield sse({
493
  "type": "message_start",
494
  "message": {
@@ -502,10 +549,11 @@ async def anthropic(req: Request):
502
  "usage": {"input_tokens": 0, "output_tokens": 0}
503
  }
504
  })
 
505
  yield sse({
506
  "type": "content_block_start",
507
  "index": 0,
508
- "content_block": {"type": "text"}
509
  })
510
 
511
  hit_limit = False
@@ -518,19 +566,25 @@ async def anthropic(req: Request):
518
 
519
  raw = line[6:] if line.startswith("data: ") else line
520
 
521
- if is_rate_limited(0, raw):
522
- log(f"Account {acc['account_id'][:8]}... mid-stream limit (anthropic), switching key")
523
- hit_limit = True
 
 
 
 
 
 
 
 
 
 
 
 
524
  break
525
 
526
- try:
527
- j = json.loads(raw)
528
- token = j["choices"][0]["delta"].get("content", "")
529
- except Exception:
530
- continue
531
-
532
  if token:
533
- sent_any_delta = True
534
  yield sse({
535
  "type": "content_block_delta",
536
  "index": 0,
@@ -552,6 +606,18 @@ async def anthropic(req: Request):
552
  await release_key(acc)
553
 
554
  # Tutup Anthropic SSE envelope
 
 
 
 
 
 
 
 
 
 
 
 
555
  yield sse({"type": "content_block_stop", "index": 0})
556
  yield sse({
557
  "type": "message_delta",
@@ -560,4 +626,4 @@ async def anthropic(req: Request):
560
  })
561
  yield sse({"type": "message_stop"})
562
 
563
- return StreamingResponse(agen(), media_type="text/event-stream")
 
80
def cf_base(account_id: str) -> str:
    """Build the Cloudflare OpenAI-compatible API base URL for *account_id*."""
    base_url = CF_AI_BASE.format(account_id=account_id)
    return base_url
82
 
 
 
 
 
83
 
84
  async def get_key(exclude=None):
85
  global rr_index
 
95
 
96
  if st["healthy"] and not st["busy"] and kid not in exclude:
97
  st["busy"] = True
98
+ return acc
99
 
100
  return None
101
 
 
133
  return None
134
 
135
 
136
def is_rate_limited_status(status_code: int) -> bool:
    """Report whether *status_code* signals rate limiting (HTTP 429 Too Many Requests)."""
    # Status-code-only check; body inspection lives in is_rate_limited_error_body.
    return status_code == 429
139
+
140
+
141
def is_rate_limited_error_body(text: str) -> bool:
    """Detect a rate-limit condition from an HTTP *error* response body.

    Only intended for non-200 HTTP response bodies or CF JSON error objects —
    NEVER for model output tokens (to avoid false positives when the model
    itself happens to mention rate limits).
    """
    lowered = text.lower()
    markers = ("rate limit", "too many requests", "usage limit")
    return any(marker in lowered for marker in markers)
149
+
150
+
151
def parse_sse_chunk(raw: str):
    """Parse one SSE data chunk from CF's OpenAI-compatible stream.

    Parameters
    ----------
    raw : str
        The chunk payload with any leading ``"data: "`` prefix already stripped
        by the caller.

    Returns
    -------
    tuple ``(token, is_cf_error, error_text)``:
      - token       : text to forward to the client ("" for empty/heartbeat
                      chunks, None when the chunk is an error)
      - is_cf_error : True when the chunk is a CF API error object rather than
                      model output
      - error_text  : the error payload when ``is_cf_error`` is True, else ""
    """
    # FIX: SSE heartbeats (blank lines) and comment lines (": ...") are part of
    # the SSE protocol, not errors. Previously they failed json.loads and were
    # misclassified as CF errors, which aborted the Anthropic stream mid-reply.
    if not raw.strip() or raw.lstrip().startswith(":"):
        return "", False, ""

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        # Non-JSON data → most likely a plain-text error message from CF.
        return None, True, raw

    # A JSON object with an "error" key and no "choices" is a CF API error,
    # not model output.
    if "error" in payload and "choices" not in payload:
        return None, True, json.dumps(payload)

    # Normal OpenAI-style delta chunk.
    choices = payload.get("choices", [])
    if not choices:
        return "", False, ""

    delta = choices[0].get("delta", {})

    # "content" may be None during the thinking phase on some models.
    content = delta.get("content") or ""

    # Thinking models (Kimi K2, DeepSeek R1, ...) emit reasoning tokens under
    # reasoning_content / reasoning — include them so thinking streams too.
    reasoning = delta.get("reasoning_content") or delta.get("reasoning") or ""

    return reasoning + content, False, ""
185
 
186
 
187
  # =====================================================
 
217
  if not auth_ok(req):
218
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
219
 
 
220
  acc = None
221
  async with _key_lock:
222
  for a in CF_ACCOUNTS:
 
237
  if r.status_code != 200:
238
  return JSONResponse({"error": f"CF returned {r.status_code}: {r.text}"}, status_code=r.status_code)
239
 
 
240
  return Response(content=r.content, media_type="application/json")
241
 
242
  except Exception as e:
 
259
 
260
  is_stream = body.get("stream", False)
261
  model = body.get("model", DEFAULT_CF_MODEL)
 
 
262
  cf_body = {**body, "model": model}
263
 
264
  # -----------------------------------------
265
+ # NON STREAM
266
  # -----------------------------------------
267
  if not is_stream:
268
  tried = set()
 
285
  }
286
  )
287
 
288
+ # FIX: cek rate limit hanya dari HTTP status/error body, bukan dari model output
289
+ if is_rate_limited_status(r.status_code) or (
290
+ r.status_code != 200 and is_rate_limited_error_body(r.text)
291
+ ):
292
  log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
293
  await mark_fail(acc)
294
  continue
 
299
  continue
300
 
301
  await mark_ok(acc)
 
302
  return Response(content=r.content, media_type="application/json")
303
 
304
  except Exception as e:
 
311
  return JSONResponse({"error": "All accounts failed"}, status_code=500)
312
 
313
  # -----------------------------------------
314
+ # STREAM — pipe OpenAI SSE langsung ke client
315
  # -----------------------------------------
316
  async def gen():
317
  tried = set()
 
335
  }
336
  ) as r:
337
 
338
+ # FIX: hanya cek status code untuk rate limit di sini
339
+ if is_rate_limited_status(r.status_code):
340
  log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
341
  await mark_fail(acc)
342
  continue
 
355
  if line.strip() == "data: [DONE]":
356
  break
357
 
 
358
  raw = line[6:] if line.startswith("data: ") else line
359
+
360
+ # FIX: gunakan parse_sse_chunk, cek error hanya pada CF error object
361
+ # — jangan cek kata "rate limit" pada konten model
362
+ _, is_cf_err, err_text = parse_sse_chunk(raw)
363
+ if is_cf_err and is_rate_limited_error_body(err_text):
364
+ log(f"Account {acc['account_id'][:8]}... mid-stream CF error, switching key")
365
  hit_limit = True
366
  break
367
 
 
368
  yield line + "\n\n"
369
 
370
  if hit_limit:
 
452
  }
453
  )
454
 
455
+ # FIX: cek rate limit hanya dari HTTP status/error body
456
+ if is_rate_limited_status(r.status_code) or (
457
+ r.status_code != 200 and is_rate_limited_error_body(r.text)
458
+ ):
459
  log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
460
  await mark_fail(acc)
461
  continue
 
466
  continue
467
 
468
  data = r.json()
469
+ content_text = data["choices"][0]["message"]["content"] or ""
 
470
  usage = data.get("usage", {})
471
 
472
  out = {
 
501
  async def agen():
502
  tried = set()
503
  msg_id = "msg_" + uuid.uuid4().hex[:10]
504
+ envelope_sent = False
505
 
506
  for _ in range(len(CF_ACCOUNTS)):
507
  acc = await wait_for_free_key(exclude=tried)
 
522
  }
523
  ) as r:
524
 
525
+ # FIX: hanya cek status code untuk rate limit
526
+ if is_rate_limited_status(r.status_code):
527
  log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
528
  await mark_fail(acc)
529
  continue
 
533
  await mark_fail(acc)
534
  continue
535
 
536
+ # Kirim Anthropic envelope hanya sekali
537
+ if not envelope_sent:
538
+ envelope_sent = True
539
  yield sse({
540
  "type": "message_start",
541
  "message": {
 
549
  "usage": {"input_tokens": 0, "output_tokens": 0}
550
  }
551
  })
552
+ # FIX: tambah "text": "" sesuai spec Anthropic
553
  yield sse({
554
  "type": "content_block_start",
555
  "index": 0,
556
+ "content_block": {"type": "text", "text": ""}
557
  })
558
 
559
  hit_limit = False
 
566
 
567
  raw = line[6:] if line.startswith("data: ") else line
568
 
569
+ # =============================================
570
+ # FIX UTAMA: parse chunk dulu, baru cek error
571
+ # JANGAN cek is_rate_limited pada teks model!
572
+ # Ini penyebab response berhenti di tengah karena
573
+ # model nulis kata "rate limit" / "too many requests"
574
+ # dalam output / thinking-nya.
575
+ # =============================================
576
+ token, is_cf_err, err_text = parse_sse_chunk(raw)
577
+
578
+ if is_cf_err:
579
+ if is_rate_limited_error_body(err_text):
580
+ log(f"Account {acc['account_id'][:8]}... mid-stream CF rate limit, switching key")
581
+ hit_limit = True
582
+ else:
583
+ log(f"Account {acc['account_id'][:8]}... mid-stream CF error: {err_text[:120]}")
584
  break
585
 
586
+ # token "" → thinking phase tanpa content, skip saja
 
 
 
 
 
587
  if token:
 
588
  yield sse({
589
  "type": "content_block_delta",
590
  "index": 0,
 
606
  await release_key(acc)
607
 
608
  # Tutup Anthropic SSE envelope
609
+ # Edge case: semua account gagal sebelum sempat kirim envelope
610
+ if not envelope_sent:
611
+ yield sse({
612
+ "type": "message_start",
613
+ "message": {
614
+ "id": msg_id, "type": "message", "role": "assistant",
615
+ "model": model, "content": [], "stop_reason": None,
616
+ "stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0}
617
+ }
618
+ })
619
+ yield sse({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
620
+
621
  yield sse({"type": "content_block_stop", "index": 0})
622
  yield sse({
623
  "type": "message_delta",
 
626
  })
627
  yield sse({"type": "message_stop"})
628
 
629
+ return StreamingResponse(agen(), media_type="text/event-stream")