Elysiadev11 committed on
Commit
4ba202a
·
verified ·
1 Parent(s): 3f83b4e

Update proxy_cerebras.py

Browse files
Files changed (1) hide show
  1. proxy_cerebras.py +276 -206
proxy_cerebras.py CHANGED
@@ -14,24 +14,40 @@ app = FastAPI()
14
  # =====================================================
15
  # CONFIG
16
  # =====================================================
17
- BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
18
  MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
19
 
 
 
 
20
  # =====================================================
21
- # LOAD KEYS
 
22
  # =====================================================
23
- OLLAMA_KEYS = []
24
- for i in range(1, 101):
25
- k = os.getenv(f"OLLAMA_KEY_{i}")
26
- if k:
27
- OLLAMA_KEYS.append(k)
28
 
29
- if not OLLAMA_KEYS:
30
- OLLAMA_KEYS.append("dummy")
 
 
 
 
 
 
 
 
 
 
 
 
 
31
 
 
 
 
32
  key_status = {}
33
- for idx, k in enumerate(OLLAMA_KEYS, 1):
34
- key_status[k] = {
 
35
  "index": idx,
36
  "healthy": True,
37
  "busy": False,
@@ -40,8 +56,6 @@ for idx, k in enumerate(OLLAMA_KEYS, 1):
40
  }
41
 
42
  rr_index = 0
43
-
44
- # Global async lock to prevent race condition on rr_index & busy flag
45
  _key_lock = asyncio.Lock()
46
 
47
 
@@ -61,63 +75,67 @@ def auth_ok(req: Request):
61
  return token == MASTER_API_KEY
62
 
63
 
 
 
 
 
64
  async def get_key(exclude=None):
65
- """
66
- Thread-safe round-robin key picker.
67
- Returns the key string, or None if all are busy/excluded.
68
- """
69
  global rr_index
70
-
71
  if exclude is None:
72
  exclude = set()
73
 
74
  async with _key_lock:
75
- for _ in range(len(OLLAMA_KEYS)):
76
- rr_index = (rr_index + 1) % len(OLLAMA_KEYS)
77
- k = OLLAMA_KEYS[rr_index]
78
- st = key_status[k]
 
79
 
80
- if st["healthy"] and not st["busy"] and k not in exclude:
81
  st["busy"] = True
82
- return k
83
 
84
  return None
85
 
86
 
87
- async def release_key(k):
88
  async with _key_lock:
89
- if k in key_status:
90
- key_status[k]["busy"] = False
 
91
 
92
 
93
- async def mark_fail(k):
94
  async with _key_lock:
95
- if k in key_status:
96
- key_status[k]["fail"] += 1
 
97
 
98
 
99
- async def mark_ok(k):
100
  async with _key_lock:
101
- if k in key_status:
102
- key_status[k]["success"] += 1
103
- key_status[k]["fail"] = 0
 
104
 
105
 
106
  async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
107
- """
108
- Polls until a free key is available or max_wait seconds pass.
109
- Returns the key or None on timeout.
110
- """
111
  elapsed = 0.0
112
  while elapsed < max_wait:
113
- key = await get_key(exclude)
114
- if key:
115
- return key
116
  await asyncio.sleep(interval)
117
  elapsed += interval
118
  return None
119
 
120
 
 
 
 
 
 
121
  # =====================================================
122
  # ROOT
123
  # =====================================================
@@ -125,8 +143,8 @@ async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
125
  async def root():
126
  async with _key_lock:
127
  safe = {}
128
- for k, v in key_status.items():
129
- masked = k[:4] + "****" + k[-4:]
130
  safe[masked] = {
131
  "index": v["index"],
132
  "healthy": v["healthy"],
@@ -137,47 +155,41 @@ async def root():
137
 
138
  return {
139
  "status": "ok",
140
- "keys": len(OLLAMA_KEYS),
 
141
  "detail": safe
142
  }
143
 
144
 
145
  # =====================================================
146
- # /v1/models
147
  # =====================================================
148
  @app.get("/v1/models")
149
  async def models(req: Request):
150
  if not auth_ok(req):
151
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
152
 
153
- key = OLLAMA_KEYS[0]
154
-
155
- async with httpx.AsyncClient(timeout=60) as client:
156
- r = await client.get(
157
- f"{BASE_URL}/api/tags",
158
- headers={"Authorization": f"Bearer {key}"}
159
- )
160
-
161
- if r.status_code != 200:
162
- return JSONResponse({"error": r.text}, status_code=r.status_code)
163
-
164
- data = r.json()
165
  now = int(time.time())
166
- out = []
 
 
 
 
 
 
 
 
167
 
168
- for m in data.get("models", []):
169
- out.append({
170
- "id": m.get("name"),
171
- "object": "model",
172
- "created": now,
173
- "owned_by": "ollama"
174
- })
175
 
176
- return {"object": "list", "data": out}
177
 
178
 
179
  # =====================================================
180
- # OPENAI CHAT /v1/chat/completions
181
  # =====================================================
182
  @app.post("/v1/chat/completions")
183
  async def chat(req: Request):
@@ -190,6 +202,15 @@ async def chat(req: Request):
190
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
191
 
192
  is_stream = body.get("stream", False)
 
 
 
 
 
 
 
 
 
193
 
194
  # -----------------------------------------
195
  # NON STREAM
@@ -197,109 +218,193 @@ async def chat(req: Request):
197
  if not is_stream:
198
  tried = set()
199
 
200
- for _ in range(len(OLLAMA_KEYS)):
201
- key = await wait_for_free_key(exclude=tried)
202
 
203
- if not key:
204
  break
205
 
206
- tried.add(key)
207
 
208
  try:
209
  async with httpx.AsyncClient(timeout=180) as client:
210
  r = await client.post(
211
- f"{BASE_URL}/v1/chat/completions",
212
- json=body,
213
- headers={"Authorization": f"Bearer {key}"}
 
 
 
214
  )
215
 
216
- txt = r.text.lower()
 
 
 
217
 
218
- if "weekly usage limit" in txt or r.status_code == 429:
219
- log(f"Key {key[:8]}... rate limited (non-stream chat), trying next")
220
- await mark_fail(key)
221
  continue
222
 
223
- await mark_ok(key)
 
 
 
 
 
 
224
 
225
- return Response(
226
- content=r.content,
227
- media_type=r.headers.get("content-type", "application/json")
228
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
229
 
230
  except Exception as e:
231
- log(f"Key {key[:8]}... exception: {e}")
232
- await mark_fail(key)
233
 
234
  finally:
235
- await release_key(key)
236
 
237
- return JSONResponse({"error": "All keys failed"}, status_code=500)
238
 
239
  # -----------------------------------------
240
  # STREAM
 
 
 
241
  # -----------------------------------------
242
  async def gen():
243
  tried = set()
 
 
244
 
245
- for _ in range(len(OLLAMA_KEYS)):
246
- key = await wait_for_free_key(exclude=tried)
247
 
248
- if not key:
249
  break
250
 
251
- tried.add(key)
252
 
253
  try:
254
  async with httpx.AsyncClient(timeout=None) as client:
255
  async with client.stream(
256
  "POST",
257
- f"{BASE_URL}/v1/chat/completions",
258
- json=body,
259
- headers={"Authorization": f"Bearer {key}"}
 
 
 
260
  ) as r:
261
 
262
- if r.status_code == 429:
263
- log(f"Key {key[:8]}... rate limited (stream chat), trying next")
264
- await mark_fail(key)
265
  continue
266
 
267
- hit_limit_mid_stream = False
 
 
 
 
 
268
 
269
  async for line in r.aiter_lines():
 
270
  if not line:
271
  continue
272
 
273
- # Detect mid-stream rate limit signal in data payload
274
- if "429" in line or "usage limit" in line.lower():
275
- log(f"Key {key[:8]}... mid-stream limit detected, aborting chunk")
276
- hit_limit_mid_stream = True
277
  break
278
 
279
- yield line + "\n\n"
 
 
 
 
 
 
 
280
 
281
- if hit_limit_mid_stream:
282
- await mark_fail(key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
283
  continue
284
 
285
- await mark_ok(key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
286
  return
287
 
288
  except Exception as e:
289
- log(f"Key {key[:8]}... stream exception: {e}")
290
- await mark_fail(key)
291
 
292
  finally:
293
- await release_key(key)
294
 
295
- yield sse({"error": "All keys failed"})
296
  yield "data: [DONE]\n\n"
297
 
298
  return StreamingResponse(gen(), media_type="text/event-stream")
299
 
300
 
301
  # =====================================================
302
- # ANTHROPIC /v1/messages
303
  # =====================================================
304
  @app.post("/v1/messages")
305
  async def anthropic(req: Request):
@@ -314,8 +419,10 @@ async def anthropic(req: Request):
314
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
315
 
316
  stream = body.get("stream", False)
 
 
317
 
318
- # Build messages list for proxy
319
  messages = []
320
 
321
  if body.get("system"):
@@ -323,20 +430,18 @@ async def anthropic(req: Request):
323
 
324
  for m in body.get("messages", []):
325
  content = m.get("content", "")
326
-
327
  if isinstance(content, list):
328
  txt = ""
329
  for x in content:
330
  if x.get("type") == "text":
331
  txt += x.get("text", "")
332
  content = txt
333
-
334
  messages.append({"role": m["role"], "content": content})
335
 
336
- proxy_body = {
337
- "model": "minimax-m2.7:cloud",
338
  "messages": messages,
339
- "stream": stream
 
340
  }
341
 
342
  # -----------------------------------------
@@ -345,91 +450,101 @@ async def anthropic(req: Request):
345
  if not stream:
346
  tried = set()
347
 
348
- for _ in range(len(OLLAMA_KEYS)):
349
- key = await wait_for_free_key(exclude=tried)
350
 
351
- if not key:
352
  break
353
 
354
- tried.add(key)
355
 
356
  try:
357
  async with httpx.AsyncClient(timeout=180) as client:
358
  r = await client.post(
359
- f"{BASE_URL}/v1/chat/completions",
360
- json=proxy_body,
361
- headers={"Authorization": f"Bearer {key}"}
 
 
 
362
  )
363
 
364
- txt = r.text.lower()
 
 
 
365
 
366
- if "weekly usage limit" in txt or r.status_code == 429:
367
- log(f"Key {key[:8]}... rate limited (non-stream anthropic), trying next")
368
- await mark_fail(key)
369
  continue
370
 
371
  data = r.json()
372
- ans = data["choices"][0]["message"]["content"]
 
373
 
374
  out = {
375
  "id": "msg_" + uuid.uuid4().hex[:10],
376
  "type": "message",
377
  "role": "assistant",
378
- "model": body.get("model", "claude-opus-4-7"),
379
- "content": [{"type": "text", "text": ans}],
380
  "stop_reason": "end_turn",
381
  "stop_sequence": None,
382
  "usage": {"input_tokens": 0, "output_tokens": 0}
383
  }
384
 
385
- await mark_ok(key)
386
  return JSONResponse(out)
387
 
388
  except Exception as e:
389
- log(f"Key {key[:8]}... exception: {e}")
390
- await mark_fail(key)
391
 
392
  finally:
393
- await release_key(key)
394
 
395
- return JSONResponse({"error": "All keys failed"}, status_code=500)
396
 
397
  # -----------------------------------------
398
- # STREAM (Anthropic SSE format)
399
  # -----------------------------------------
400
  async def agen():
401
  tried = set()
402
  msg_id = "msg_" + uuid.uuid4().hex[:10]
403
  sent_any_delta = False
404
 
405
- # Send Anthropic envelope headers ONCE before first key attempt
406
- # We defer these until we have a successful connection to avoid
407
- # sending headers before knowing if any key works.
408
- # Instead we buffer and yield only on confirmed success.
409
 
410
- for _ in range(len(OLLAMA_KEYS)):
411
- key = await wait_for_free_key(exclude=tried)
412
-
413
- if not key:
414
  break
415
 
416
- tried.add(key)
417
 
418
  try:
419
  async with httpx.AsyncClient(timeout=None) as client:
420
  async with client.stream(
421
  "POST",
422
- f"{BASE_URL}/v1/chat/completions",
423
- json=proxy_body,
424
- headers={"Authorization": f"Bearer {key}"}
 
 
 
425
  ) as r:
426
 
427
- if r.status_code == 429:
428
- log(f"Key {key[:8]}... rate limited (stream anthropic), trying next")
429
- await mark_fail(key)
430
  continue
431
 
432
- # Only emit Anthropic envelope on first successful key
 
 
 
 
 
433
  if not sent_any_delta:
434
  yield sse({
435
  "type": "message_start",
@@ -437,7 +552,7 @@ async def anthropic(req: Request):
437
  "id": msg_id,
438
  "type": "message",
439
  "role": "assistant",
440
- "model": body.get("model", "claude-opus-4-7"),
441
  "content": [],
442
  "stop_reason": None,
443
  "stop_sequence": None,
@@ -450,62 +565,17 @@ async def anthropic(req: Request):
450
  "content_block": {"type": "text"}
451
  })
452
 
453
- hit_limit_mid_stream = False
454
 
455
  async for line in r.aiter_lines():
456
- if not line.startswith("data: "):
 
457
  continue
458
 
459
- raw = line[6:].strip()
460
-
461
- if raw == "[DONE]":
462
  break
463
 
464
- # Detect mid-stream 429 / limit payload
465
- if "429" in raw or "usage limit" in raw.lower():
466
- log(f"Key {key[:8]}... mid-stream limit in anthropic, aborting chunk")
467
- hit_limit_mid_stream = True
468
- break
469
-
470
- try:
471
- j = json.loads(raw)
472
- except Exception:
473
- continue
474
 
475
- delta = j["choices"][0]["delta"]
476
- txt = delta.get("content", "")
477
-
478
- if txt:
479
- sent_any_delta = True
480
- yield sse({
481
- "type": "content_block_delta",
482
- "index": 0,
483
- "delta": {"type": "text_delta", "text": txt}
484
- })
485
-
486
- if hit_limit_mid_stream:
487
- await mark_fail(key)
488
- # Continue to next key — retry restarts generation on a fresh key;
489
- # Note: deltas already sent are kept, but output is not resumed mid-stream
490
- continue
491
-
492
- await mark_ok(key)
493
- break # Success — exit key retry loop
494
-
495
- except Exception as e:
496
- log(f"Key {key[:8]}... agen exception: {e}")
497
- await mark_fail(key)
498
-
499
- finally:
500
- await release_key(key)
501
-
502
- # Close Anthropic SSE envelope
503
- yield sse({"type": "content_block_stop", "index": 0})
504
- yield sse({
505
- "type": "message_delta",
506
- "delta": {"stop_reason": "end_turn", "stop_sequence": None},
507
- "usage": {"output_tokens": 0}
508
- })
509
- yield sse({"type": "message_stop"})
510
-
511
- return StreamingResponse(agen(), media_type="text/event-stream")
 
14
  # =====================================================
15
  # CONFIG
16
  # =====================================================
 
17
  MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
18
 
19
+ # Default CF Workers AI model (can override via request body)
20
+ DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")
21
+
22
  # =====================================================
23
+ # LOAD CF CREDENTIALS
24
+ # Format env: CF_1=account_id,api_key
25
  # =====================================================
26
+ CF_ACCOUNTS = [] # list of {"account_id": ..., "api_key": ...}
 
 
 
 
27
 
28
+ for i in range(1, 101):
29
+ raw = os.getenv(f"CF_{i}")
30
+ if not raw:
31
+ continue
32
+ parts = raw.split(",", 1)
33
+ if len(parts) != 2:
34
+ print(f"[WARN] CF_{i} format invalid, expected 'account_id,api_key' — skipped")
35
+ continue
36
+ account_id, api_key = parts[0].strip(), parts[1].strip()
37
+ if account_id and api_key:
38
+ CF_ACCOUNTS.append({"account_id": account_id, "api_key": api_key})
39
+
40
+ if not CF_ACCOUNTS:
41
+ print("[WARN] No CF credentials found, inserting dummy")
42
+ CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})
43
 
44
+ # =====================================================
45
+ # KEY STATUS
46
+ # =====================================================
47
  key_status = {}
48
+ for idx, acc in enumerate(CF_ACCOUNTS, 1):
49
+ kid = acc["account_id"]
50
+ key_status[kid] = {
51
  "index": idx,
52
  "healthy": True,
53
  "busy": False,
 
56
  }
57
 
58
  rr_index = 0
 
 
59
  _key_lock = asyncio.Lock()
60
 
61
 
 
75
  return token == MASTER_API_KEY
76
 
77
 
78
+ def cf_url(account_id: str, model: str) -> str:
79
+ return f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model}"
80
+
81
+
82
  async def get_key(exclude=None):
 
 
 
 
83
  global rr_index
 
84
  if exclude is None:
85
  exclude = set()
86
 
87
  async with _key_lock:
88
+ for _ in range(len(CF_ACCOUNTS)):
89
+ rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
90
+ acc = CF_ACCOUNTS[rr_index]
91
+ kid = acc["account_id"]
92
+ st = key_status[kid]
93
 
94
+ if st["healthy"] and not st["busy"] and kid not in exclude:
95
  st["busy"] = True
96
+ return acc # returns dict {"account_id": ..., "api_key": ...}
97
 
98
  return None
99
 
100
 
101
+ async def release_key(acc):
102
  async with _key_lock:
103
+ kid = acc["account_id"]
104
+ if kid in key_status:
105
+ key_status[kid]["busy"] = False
106
 
107
 
108
+ async def mark_fail(acc):
109
  async with _key_lock:
110
+ kid = acc["account_id"]
111
+ if kid in key_status:
112
+ key_status[kid]["fail"] += 1
113
 
114
 
115
+ async def mark_ok(acc):
116
  async with _key_lock:
117
+ kid = acc["account_id"]
118
+ if kid in key_status:
119
+ key_status[kid]["success"] += 1
120
+ key_status[kid]["fail"] = 0
121
 
122
 
123
  async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
 
 
 
 
124
  elapsed = 0.0
125
  while elapsed < max_wait:
126
+ acc = await get_key(exclude)
127
+ if acc:
128
+ return acc
129
  await asyncio.sleep(interval)
130
  elapsed += interval
131
  return None
132
 
133
 
134
+ def is_rate_limited(status_code: int, text: str) -> bool:
135
+ t = text.lower()
136
+ return status_code == 429 or "rate limit" in t or "too many requests" in t or "usage limit" in t
137
+
138
+
139
  # =====================================================
140
  # ROOT
141
  # =====================================================
 
143
  async def root():
144
  async with _key_lock:
145
  safe = {}
146
+ for kid, v in key_status.items():
147
+ masked = kid[:6] + "****" + kid[-4:]
148
  safe[masked] = {
149
  "index": v["index"],
150
  "healthy": v["healthy"],
 
155
 
156
  return {
157
  "status": "ok",
158
+ "accounts": len(CF_ACCOUNTS),
159
+ "default_model": DEFAULT_CF_MODEL,
160
  "detail": safe
161
  }
162
 
163
 
164
  # =====================================================
165
+ # /v1/models — static list of popular CF models
166
  # =====================================================
167
  @app.get("/v1/models")
168
  async def models(req: Request):
169
  if not auth_ok(req):
170
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
171
 
 
 
 
 
 
 
 
 
 
 
 
 
172
  now = int(time.time())
173
+ cf_models = [
174
+ "@cf/meta/llama-3.3-70b-instruct-fp8-fast",
175
+ "@cf/meta/llama-3.1-8b-instruct",
176
+ "@cf/meta/llama-3.1-70b-instruct",
177
+ "@cf/mistral/mistral-7b-instruct-v0.1",
178
+ "@cf/google/gemma-7b-it",
179
+ "@cf/qwen/qwen1.5-14b-chat-awq",
180
+ "@cf/deepseek-ai/deepseek-r1-distill-qwen-32b",
181
+ ]
182
 
183
+ data = [
184
+ {"id": m, "object": "model", "created": now, "owned_by": "cloudflare"}
185
+ for m in cf_models
186
+ ]
 
 
 
187
 
188
+ return {"object": "list", "data": data}
189
 
190
 
191
  # =====================================================
192
+ # /v1/chat/completions — OpenAI-compatible endpoint
193
  # =====================================================
194
  @app.post("/v1/chat/completions")
195
  async def chat(req: Request):
 
202
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
203
 
204
  is_stream = body.get("stream", False)
205
+ model = body.get("model", DEFAULT_CF_MODEL)
206
+ messages = body.get("messages", [])
207
+ max_tokens = body.get("max_tokens", 2048)
208
+
209
+ cf_body = {
210
+ "messages": messages,
211
+ "stream": is_stream,
212
+ "max_tokens": max_tokens,
213
+ }
214
 
215
  # -----------------------------------------
216
  # NON STREAM
 
218
  if not is_stream:
219
  tried = set()
220
 
221
+ for _ in range(len(CF_ACCOUNTS)):
222
+ acc = await wait_for_free_key(exclude=tried)
223
 
224
+ if not acc:
225
  break
226
 
227
+ tried.add(acc["account_id"])
228
 
229
  try:
230
  async with httpx.AsyncClient(timeout=180) as client:
231
  r = await client.post(
232
+ cf_url(acc["account_id"], model),
233
+ json=cf_body,
234
+ headers={
235
+ "Authorization": f"Bearer {acc['api_key']}",
236
+ "Content-Type": "application/json",
237
+ }
238
  )
239
 
240
+ if is_rate_limited(r.status_code, r.text):
241
+ log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
242
+ await mark_fail(acc)
243
+ continue
244
 
245
+ if r.status_code != 200:
246
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
247
+ await mark_fail(acc)
248
  continue
249
 
250
+ data = r.json()
251
+
252
+ # CF Workers AI response format:
253
+ # {"result": {"response": "..."}, "success": true, ...}
254
+ # Convert to OpenAI format
255
+ cf_result = data.get("result", {})
256
+ content = cf_result.get("response", "")
257
 
258
+ out = {
259
+ "id": "chatcmpl-" + uuid.uuid4().hex[:10],
260
+ "object": "chat.completion",
261
+ "created": int(time.time()),
262
+ "model": model,
263
+ "choices": [
264
+ {
265
+ "index": 0,
266
+ "message": {"role": "assistant", "content": content},
267
+ "finish_reason": "stop",
268
+ }
269
+ ],
270
+ "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
271
+ }
272
+
273
+ await mark_ok(acc)
274
+ return JSONResponse(out)
275
 
276
  except Exception as e:
277
+ log(f"Account {acc['account_id'][:8]}... exception: {e}")
278
+ await mark_fail(acc)
279
 
280
  finally:
281
+ await release_key(acc)
282
 
283
+ return JSONResponse({"error": "All accounts failed"}, status_code=500)
284
 
285
  # -----------------------------------------
286
  # STREAM
287
+ # CF Workers AI streams NDJSON lines:
288
+ # {"response":"token"} or {"p":"...","response":"token"} and ends with [DONE]
289
+ # We convert to OpenAI SSE format
290
  # -----------------------------------------
291
  async def gen():
292
  tried = set()
293
+ cid = "chatcmpl-" + uuid.uuid4().hex[:10]
294
+ sent_any = False
295
 
296
+ for _ in range(len(CF_ACCOUNTS)):
297
+ acc = await wait_for_free_key(exclude=tried)
298
 
299
+ if not acc:
300
  break
301
 
302
+ tried.add(acc["account_id"])
303
 
304
  try:
305
  async with httpx.AsyncClient(timeout=None) as client:
306
  async with client.stream(
307
  "POST",
308
+ cf_url(acc["account_id"], model),
309
+ json=cf_body,
310
+ headers={
311
+ "Authorization": f"Bearer {acc['api_key']}",
312
+ "Content-Type": "application/json",
313
+ }
314
  ) as r:
315
 
316
+ if is_rate_limited(r.status_code, ""):
317
+ log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
318
+ await mark_fail(acc)
319
  continue
320
 
321
+ if r.status_code != 200:
322
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
323
+ await mark_fail(acc)
324
+ continue
325
+
326
+ hit_limit = False
327
 
328
  async for line in r.aiter_lines():
329
+ line = line.strip()
330
  if not line:
331
  continue
332
 
333
+ if line == "data: [DONE]" or line == "[DONE]":
 
 
 
334
  break
335
 
336
+ # Strip "data: " prefix if present
337
+ raw = line[6:] if line.startswith("data: ") else line
338
+
339
+ # Detect mid-stream rate limit
340
+ if is_rate_limited(0, raw):
341
+ log(f"Account {acc['account_id'][:8]}... mid-stream limit, switching key")
342
+ hit_limit = True
343
+ break
344
 
345
+ try:
346
+ j = json.loads(raw)
347
+ except Exception:
348
+ continue
349
+
350
+ token = j.get("response", "")
351
+
352
+ if token:
353
+ sent_any = True
354
+ chunk = {
355
+ "id": cid,
356
+ "object": "chat.completion.chunk",
357
+ "created": int(time.time()),
358
+ "model": model,
359
+ "choices": [
360
+ {
361
+ "index": 0,
362
+ "delta": {"role": "assistant", "content": token},
363
+ "finish_reason": None,
364
+ }
365
+ ]
366
+ }
367
+ yield sse(chunk)
368
+
369
+ if hit_limit:
370
+ await mark_fail(acc)
371
  continue
372
 
373
+ # Send finish chunk
374
+ finish_chunk = {
375
+ "id": cid,
376
+ "object": "chat.completion.chunk",
377
+ "created": int(time.time()),
378
+ "model": model,
379
+ "choices": [
380
+ {
381
+ "index": 0,
382
+ "delta": {},
383
+ "finish_reason": "stop",
384
+ }
385
+ ]
386
+ }
387
+ yield sse(finish_chunk)
388
+ yield "data: [DONE]\n\n"
389
+
390
+ await mark_ok(acc)
391
  return
392
 
393
  except Exception as e:
394
+ log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
395
+ await mark_fail(acc)
396
 
397
  finally:
398
+ await release_key(acc)
399
 
400
+ yield sse({"error": "All accounts failed"})
401
  yield "data: [DONE]\n\n"
402
 
403
  return StreamingResponse(gen(), media_type="text/event-stream")
404
 
405
 
406
  # =====================================================
407
+ # /v1/messages — Anthropic-compatible endpoint
408
  # =====================================================
409
  @app.post("/v1/messages")
410
  async def anthropic(req: Request):
 
419
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
420
 
421
  stream = body.get("stream", False)
422
+ model = body.get("model", DEFAULT_CF_MODEL)
423
+ max_tokens = body.get("max_tokens", 2048)
424
 
425
+ # Convert Anthropic message format to CF/OpenAI format
426
  messages = []
427
 
428
  if body.get("system"):
 
430
 
431
  for m in body.get("messages", []):
432
  content = m.get("content", "")
 
433
  if isinstance(content, list):
434
  txt = ""
435
  for x in content:
436
  if x.get("type") == "text":
437
  txt += x.get("text", "")
438
  content = txt
 
439
  messages.append({"role": m["role"], "content": content})
440
 
441
+ cf_body = {
 
442
  "messages": messages,
443
+ "stream": stream,
444
+ "max_tokens": max_tokens,
445
  }
446
 
447
  # -----------------------------------------
 
450
  if not stream:
451
  tried = set()
452
 
453
+ for _ in range(len(CF_ACCOUNTS)):
454
+ acc = await wait_for_free_key(exclude=tried)
455
 
456
+ if not acc:
457
  break
458
 
459
+ tried.add(acc["account_id"])
460
 
461
  try:
462
  async with httpx.AsyncClient(timeout=180) as client:
463
  r = await client.post(
464
+ cf_url(acc["account_id"], model),
465
+ json=cf_body,
466
+ headers={
467
+ "Authorization": f"Bearer {acc['api_key']}",
468
+ "Content-Type": "application/json",
469
+ }
470
  )
471
 
472
+ if is_rate_limited(r.status_code, r.text):
473
+ log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
474
+ await mark_fail(acc)
475
+ continue
476
 
477
+ if r.status_code != 200:
478
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
479
+ await mark_fail(acc)
480
  continue
481
 
482
  data = r.json()
483
+ cf_result = data.get("result", {})
484
+ content = cf_result.get("response", "")
485
 
486
  out = {
487
  "id": "msg_" + uuid.uuid4().hex[:10],
488
  "type": "message",
489
  "role": "assistant",
490
+ "model": body.get("model", DEFAULT_CF_MODEL),
491
+ "content": [{"type": "text", "text": content}],
492
  "stop_reason": "end_turn",
493
  "stop_sequence": None,
494
  "usage": {"input_tokens": 0, "output_tokens": 0}
495
  }
496
 
497
+ await mark_ok(acc)
498
  return JSONResponse(out)
499
 
500
  except Exception as e:
501
+ log(f"Account {acc['account_id'][:8]}... exception: {e}")
502
+ await mark_fail(acc)
503
 
504
  finally:
505
+ await release_key(acc)
506
 
507
+ return JSONResponse({"error": "All accounts failed"}, status_code=500)
508
 
509
  # -----------------------------------------
510
+ # STREAM (Anthropic SSE envelope)
511
  # -----------------------------------------
512
  async def agen():
513
  tried = set()
514
  msg_id = "msg_" + uuid.uuid4().hex[:10]
515
  sent_any_delta = False
516
 
517
+ for _ in range(len(CF_ACCOUNTS)):
518
+ acc = await wait_for_free_key(exclude=tried)
 
 
519
 
520
+ if not acc:
 
 
 
521
  break
522
 
523
+ tried.add(acc["account_id"])
524
 
525
  try:
526
  async with httpx.AsyncClient(timeout=None) as client:
527
  async with client.stream(
528
  "POST",
529
+ cf_url(acc["account_id"], model),
530
+ json=cf_body,
531
+ headers={
532
+ "Authorization": f"Bearer {acc['api_key']}",
533
+ "Content-Type": "application/json",
534
+ }
535
  ) as r:
536
 
537
+ if is_rate_limited(r.status_code, ""):
538
+ log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
539
+ await mark_fail(acc)
540
  continue
541
 
542
+ if r.status_code != 200:
543
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
544
+ await mark_fail(acc)
545
+ continue
546
+
547
+ # Emit Anthropic envelope only once on first successful key
548
  if not sent_any_delta:
549
  yield sse({
550
  "type": "message_start",
 
552
  "id": msg_id,
553
  "type": "message",
554
  "role": "assistant",
555
+ "model": body.get("model", DEFAULT_CF_MODEL),
556
  "content": [],
557
  "stop_reason": None,
558
  "stop_sequence": None,
 
565
  "content_block": {"type": "text"}
566
  })
567
 
568
+ hit_limit = False
569
 
570
  async for line in r.aiter_lines():
571
+ line = line.strip()
572
+ if not line:
573
  continue
574
 
575
+ if line == "data: [DONE]" or line == "[DONE]":
 
 
576
  break
577
 
578
+ raw = line[6:] if line.startswith("data: ") else line
 
 
 
 
 
 
 
 
 
579
 
580
+ if is_rate_limited(0, raw):
581
+ log(f"Account {acc['account_id'][:8]}... mid-stream limit (anthr