Elysiadev11 commited on
Commit
503d0e3
·
verified ·
1 Parent(s): 6d91b7c

Upload proxy_cf.py

Browse files
Files changed (1) hide show
  1. proxy_cf.py +563 -0
proxy_cf.py ADDED
@@ -0,0 +1,563 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import os
import json
import time
import uuid
import asyncio
import httpx

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect

app = FastAPI()

# =====================================================
# CONFIG
# =====================================================
# Shared bearer token every client must present (see auth_ok); defaults to "olla".
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default CF Workers AI model (can override via request body)
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")

# =====================================================
# LOAD CF CREDENTIALS
# Format env: CF_1=account_id,api_key
# =====================================================
CF_ACCOUNTS = []  # list of {"account_id": ..., "api_key": ...}

# Scan CF_1 .. CF_100; missing slots are skipped, so numbering need not be contiguous.
for i in range(1, 101):
    raw = os.getenv(f"CF_{i}")
    if not raw:
        continue
    # Split on the first comma only, so API keys containing commas survive intact.
    parts = raw.split(",", 1)
    if len(parts) != 2:
        print(f"[WARN] CF_{i} format invalid, expected 'account_id,api_key' — skipped")
        continue
    account_id, api_key = parts[0].strip(), parts[1].strip()
    if account_id and api_key:
        CF_ACCOUNTS.append({"account_id": account_id, "api_key": api_key})

if not CF_ACCOUNTS:
    # Keep the app bootable with a placeholder; upstream calls will fail until real creds exist.
    print("[WARN] No CF credentials found, inserting dummy")
    CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})

# =====================================================
# KEY STATUS
# =====================================================
# Per-account health/usage bookkeeping, keyed by account_id.
# NOTE(review): duplicate account_ids across CF_* entries would collide in this
# dict and confuse the busy/fail accounting — confirm ids are unique.
key_status = {}
for idx, acc in enumerate(CF_ACCOUNTS, 1):
    kid = acc["account_id"]
    key_status[kid] = {
        "index": idx,      # 1-based position in CF_ACCOUNTS
        "healthy": True,   # never set to False anywhere in this file
        "busy": False,     # held while an in-flight request uses this account
        "success": 0,
        "fail": 0,
    }

rr_index = 0                 # round-robin cursor into CF_ACCOUNTS
_key_lock = asyncio.Lock()   # guards rr_index and all key_status mutations
60
+
61
+
62
+ # =====================================================
63
+ # HELPERS
64
+ # =====================================================
65
def log(x):
    """Print *x* prefixed with an HH:MM:SS timestamp, flushing immediately."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {x}", flush=True)
67
+
68
+
69
def sse(obj):
    """Serialize *obj* as a single server-sent-events data frame."""
    payload = json.dumps(obj, ensure_ascii=False)
    return f"data: {payload}\n\n"
71
+
72
+
73
def auth_ok(req: Request):
    """Return True when the request's bearer token matches MASTER_API_KEY."""
    header = req.headers.get("Authorization", "")
    token = header.replace("Bearer ", "")
    return token == MASTER_API_KEY
76
+
77
+
78
# Template for Cloudflare's OpenAI-compatible Workers AI base URL, per account.
CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    """Return the OpenAI-compatible Workers AI base URL for *account_id*."""
    return CF_AI_BASE.format(account_id=account_id)
82
+
83
def cf_url(account_id: str, model: str) -> str:
    """Build the legacy /ai/run URL for *model* (kept for fallback)."""
    prefix = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/"
    return prefix + model
86
+
87
+
88
async def get_key(exclude=None):
    """Pick the next free, healthy account round-robin and mark it busy.

    Accounts whose ids appear in *exclude* are skipped. Returns the account
    dict ({"account_id": ..., "api_key": ...}) or None when nothing is free.
    """
    global rr_index
    skip = exclude if exclude is not None else set()

    async with _key_lock:
        # At most one full sweep over the account list per call.
        for _ in range(len(CF_ACCOUNTS)):
            rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
            candidate = CF_ACCOUNTS[rr_index]
            state = key_status[candidate["account_id"]]
            if not state["healthy"] or state["busy"]:
                continue
            if candidate["account_id"] in skip:
                continue
            state["busy"] = True
            return candidate

    return None
105
+
106
+
107
async def release_key(acc):
    """Clear the busy flag on *acc* so it can be handed out again."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["busy"] = False
112
+
113
+
114
async def mark_fail(acc):
    """Record one more consecutive failure for *acc*."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["fail"] += 1
119
+
120
+
121
async def mark_ok(acc):
    """Record a success for *acc* and reset its consecutive-failure count."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["success"] += 1
            state["fail"] = 0
127
+
128
+
129
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
    """Poll get_key() until an account frees up or *max_wait* seconds elapse.

    Fix: the previous version accumulated only the sleep intervals, so time
    spent awaiting the key lock inside get_key() was never counted and the
    real wall-clock wait could overshoot max_wait. A monotonic deadline now
    bounds the total time.

    Returns the acquired account dict, or None on timeout.
    """
    deadline = time.monotonic() + max_wait
    while True:
        acc = await get_key(exclude)
        if acc:
            return acc
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(interval)
138
+
139
+
140
def is_rate_limited(status_code: int, text: str) -> bool:
    """Heuristic: does this status code / body indicate a CF rate limit?"""
    if status_code == 429:
        return True
    lowered = text.lower()
    markers = ("rate limit", "too many requests", "usage limit")
    return any(marker in lowered for marker in markers)
143
+
144
+
145
+ # =====================================================
146
+ # ROOT
147
+ # =====================================================
148
+ @app.get("/")
149
+ async def root():
150
+ async with _key_lock:
151
+ safe = {}
152
+ for kid, v in key_status.items():
153
+ masked = kid[:6] + "****" + kid[-4:]
154
+ safe[masked] = {
155
+ "index": v["index"],
156
+ "healthy": v["healthy"],
157
+ "busy": v["busy"],
158
+ "success": v["success"],
159
+ "fail": v["fail"],
160
+ }
161
+
162
+ return {
163
+ "status": "ok",
164
+ "accounts": len(CF_ACCOUNTS),
165
+ "default_model": DEFAULT_CF_MODEL,
166
+ "detail": safe
167
+ }
168
+
169
+
170
+ # =====================================================
171
+ # /v1/models — live proxy langsung ke CF
172
+ # =====================================================
173
+ @app.get("/v1/models")
174
+ async def models(req: Request):
175
+ if not auth_ok(req):
176
+ return JSONResponse({"error": "Unauthorized"}, status_code=401)
177
+
178
+ # Pakai account pertama yang healthy, tidak perlu mark busy
179
+ acc = None
180
+ async with _key_lock:
181
+ for a in CF_ACCOUNTS:
182
+ if key_status[a["account_id"]]["healthy"]:
183
+ acc = a
184
+ break
185
+
186
+ if not acc:
187
+ return JSONResponse({"error": "No healthy accounts"}, status_code=503)
188
+
189
+ try:
190
+ async with httpx.AsyncClient(timeout=30) as client:
191
+ r = await client.get(
192
+ f"{cf_base(acc['account_id'])}/models",
193
+ headers={"Authorization": f"Bearer {acc['api_key']}"}
194
+ )
195
+
196
+ if r.status_code != 200:
197
+ return JSONResponse({"error": f"CF returned {r.status_code}: {r.text}"}, status_code=r.status_code)
198
+
199
+ # CF sudah return OpenAI-compatible format, langsung forward
200
+ return Response(content=r.content, media_type="application/json")
201
+
202
+ except Exception as e:
203
+ log(f"[/v1/models] exception: {e}")
204
+ return JSONResponse({"error": str(e)}, status_code=500)
205
+
206
+
207
+ # =====================================================
208
+ # /v1/chat/completions — OpenAI-compatible endpoint
209
+ # =====================================================
210
+ @app.post("/v1/chat/completions")
211
+ async def chat(req: Request):
212
+ if not auth_ok(req):
213
+ return JSONResponse({"error": "Unauthorized"}, status_code=401)
214
+
215
+ try:
216
+ body = await req.json()
217
+ except Exception:
218
+ return JSONResponse({"error": "Bad JSON"}, status_code=400)
219
+
220
+ is_stream = body.get("stream", False)
221
+ model = body.get("model", DEFAULT_CF_MODEL)
222
+
223
+ # Pass body as-is ke CF — CF OpenAI-compatible endpoint terima format sama persis
224
+ cf_body = {**body, "model": model}
225
+
226
+ # -----------------------------------------
227
+ # NON STREAM — forward response langsung
228
+ # -----------------------------------------
229
+ if not is_stream:
230
+ tried = set()
231
+
232
+ for _ in range(len(CF_ACCOUNTS)):
233
+ acc = await wait_for_free_key(exclude=tried)
234
+ if not acc:
235
+ break
236
+
237
+ tried.add(acc["account_id"])
238
+
239
+ try:
240
+ async with httpx.AsyncClient(timeout=180) as client:
241
+ r = await client.post(
242
+ f"{cf_base(acc['account_id'])}/chat/completions",
243
+ json=cf_body,
244
+ headers={
245
+ "Authorization": f"Bearer {acc['api_key']}",
246
+ "Content-Type": "application/json",
247
+ }
248
+ )
249
+
250
+ if is_rate_limited(r.status_code, r.text):
251
+ log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
252
+ await mark_fail(acc)
253
+ continue
254
+
255
+ if r.status_code != 200:
256
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
257
+ await mark_fail(acc)
258
+ continue
259
+
260
+ await mark_ok(acc)
261
+ # CF OpenAI-compatible → langsung forward, tidak perlu konversi
262
+ return Response(content=r.content, media_type="application/json")
263
+
264
+ except Exception as e:
265
+ log(f"Account {acc['account_id'][:8]}... exception: {e}")
266
+ await mark_fail(acc)
267
+
268
+ finally:
269
+ await release_key(acc)
270
+
271
+ return JSONResponse({"error": "All accounts failed"}, status_code=500)
272
+
273
+ # -----------------------------------------
274
+ # STREAM — CF kirim SSE OpenAI-format, langsung pipe ke client
275
+ # -----------------------------------------
276
+ async def gen():
277
+ tried = set()
278
+
279
+ for _ in range(len(CF_ACCOUNTS)):
280
+ acc = await wait_for_free_key(exclude=tried)
281
+ if not acc:
282
+ break
283
+
284
+ tried.add(acc["account_id"])
285
+
286
+ try:
287
+ async with httpx.AsyncClient(timeout=None) as client:
288
+ async with client.stream(
289
+ "POST",
290
+ f"{cf_base(acc['account_id'])}/chat/completions",
291
+ json=cf_body,
292
+ headers={
293
+ "Authorization": f"Bearer {acc['api_key']}",
294
+ "Content-Type": "application/json",
295
+ }
296
+ ) as r:
297
+
298
+ if is_rate_limited(r.status_code, ""):
299
+ log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
300
+ await mark_fail(acc)
301
+ continue
302
+
303
+ if r.status_code != 200:
304
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
305
+ await mark_fail(acc)
306
+ continue
307
+
308
+ hit_limit = False
309
+
310
+ async for line in r.aiter_lines():
311
+ if not line:
312
+ continue
313
+
314
+ if line.strip() == "data: [DONE]":
315
+ break
316
+
317
+ # Detect mid-stream rate limit dalam payload
318
+ raw = line[6:] if line.startswith("data: ") else line
319
+ if is_rate_limited(0, raw):
320
+ log(f"Account {acc['account_id'][:8]}... mid-stream limit, switching key")
321
+ hit_limit = True
322
+ break
323
+
324
+ # CF OpenAI-compatible SSE → pipe langsung ke client
325
+ yield line + "\n\n"
326
+
327
+ if hit_limit:
328
+ await mark_fail(acc)
329
+ continue
330
+
331
+ yield "data: [DONE]\n\n"
332
+ await mark_ok(acc)
333
+ return
334
+
335
+ except Exception as e:
336
+ log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
337
+ await mark_fail(acc)
338
+
339
+ finally:
340
+ await release_key(acc)
341
+
342
+ yield sse({"error": "All accounts failed"})
343
+ yield "data: [DONE]\n\n"
344
+
345
+ return StreamingResponse(gen(), media_type="text/event-stream")
346
+
347
+
348
+ # =====================================================
349
+ # /v1/messages — Anthropic-compatible endpoint
350
+ # Konversi Anthropic format → CF OpenAI-compatible
351
+ # =====================================================
352
+ @app.post("/v1/messages")
353
+ async def anthropic(req: Request):
354
+ if not auth_ok(req):
355
+ return JSONResponse({"error": "Unauthorized"}, status_code=401)
356
+
357
+ try:
358
+ body = await req.json()
359
+ except ClientDisconnect:
360
+ return Response(status_code=499)
361
+ except Exception:
362
+ return JSONResponse({"error": "Bad JSON"}, status_code=400)
363
+
364
+ stream = body.get("stream", False)
365
+ model = body.get("model", DEFAULT_CF_MODEL)
366
+ max_tokens = body.get("max_tokens", 2048)
367
+
368
+ # Konversi Anthropic messages → OpenAI format
369
+ messages = []
370
+
371
+ if body.get("system"):
372
+ messages.append({"role": "system", "content": body["system"]})
373
+
374
+ for m in body.get("messages", []):
375
+ content = m.get("content", "")
376
+ if isinstance(content, list):
377
+ txt = "".join(x.get("text", "") for x in content if x.get("type") == "text")
378
+ content = txt
379
+ messages.append({"role": m["role"], "content": content})
380
+
381
+ cf_body = {
382
+ "model": model,
383
+ "messages": messages,
384
+ "max_tokens": max_tokens,
385
+ "stream": stream,
386
+ }
387
+
388
+ # -----------------------------------------
389
+ # NON STREAM
390
+ # -----------------------------------------
391
+ if not stream:
392
+ tried = set()
393
+
394
+ for _ in range(len(CF_ACCOUNTS)):
395
+ acc = await wait_for_free_key(exclude=tried)
396
+ if not acc:
397
+ break
398
+
399
+ tried.add(acc["account_id"])
400
+
401
+ try:
402
+ async with httpx.AsyncClient(timeout=180) as client:
403
+ r = await client.post(
404
+ f"{cf_base(acc['account_id'])}/chat/completions",
405
+ json=cf_body,
406
+ headers={
407
+ "Authorization": f"Bearer {acc['api_key']}",
408
+ "Content-Type": "application/json",
409
+ }
410
+ )
411
+
412
+ if is_rate_limited(r.status_code, r.text):
413
+ log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
414
+ await mark_fail(acc)
415
+ continue
416
+
417
+ if r.status_code != 200:
418
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
419
+ await mark_fail(acc)
420
+ continue
421
+
422
+ data = r.json()
423
+ # CF OpenAI-compatible response → konversi ke Anthropic format
424
+ content_text = data["choices"][0]["message"]["content"]
425
+ usage = data.get("usage", {})
426
+
427
+ out = {
428
+ "id": "msg_" + uuid.uuid4().hex[:10],
429
+ "type": "message",
430
+ "role": "assistant",
431
+ "model": model,
432
+ "content": [{"type": "text", "text": content_text}],
433
+ "stop_reason": "end_turn",
434
+ "stop_sequence": None,
435
+ "usage": {
436
+ "input_tokens": usage.get("prompt_tokens", 0),
437
+ "output_tokens": usage.get("completion_tokens", 0),
438
+ }
439
+ }
440
+
441
+ await mark_ok(acc)
442
+ return JSONResponse(out)
443
+
444
+ except Exception as e:
445
+ log(f"Account {acc['account_id'][:8]}... exception: {e}")
446
+ await mark_fail(acc)
447
+
448
+ finally:
449
+ await release_key(acc)
450
+
451
+ return JSONResponse({"error": "All accounts failed"}, status_code=500)
452
+
453
+ # -----------------------------------------
454
+ # STREAM — CF kirim OpenAI SSE, kita konversi ke Anthropic SSE
455
+ # -----------------------------------------
456
+ async def agen():
457
+ tried = set()
458
+ msg_id = "msg_" + uuid.uuid4().hex[:10]
459
+ sent_any_delta = False
460
+
461
+ for _ in range(len(CF_ACCOUNTS)):
462
+ acc = await wait_for_free_key(exclude=tried)
463
+ if not acc:
464
+ break
465
+
466
+ tried.add(acc["account_id"])
467
+
468
+ try:
469
+ async with httpx.AsyncClient(timeout=None) as client:
470
+ async with client.stream(
471
+ "POST",
472
+ f"{cf_base(acc['account_id'])}/chat/completions",
473
+ json=cf_body,
474
+ headers={
475
+ "Authorization": f"Bearer {acc['api_key']}",
476
+ "Content-Type": "application/json",
477
+ }
478
+ ) as r:
479
+
480
+ if is_rate_limited(r.status_code, ""):
481
+ log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
482
+ await mark_fail(acc)
483
+ continue
484
+
485
+ if r.status_code != 200:
486
+ log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
487
+ await mark_fail(acc)
488
+ continue
489
+
490
+ # Emit Anthropic envelope sekali saja saat key pertama berhasil
491
+ if not sent_any_delta:
492
+ yield sse({
493
+ "type": "message_start",
494
+ "message": {
495
+ "id": msg_id,
496
+ "type": "message",
497
+ "role": "assistant",
498
+ "model": model,
499
+ "content": [],
500
+ "stop_reason": None,
501
+ "stop_sequence": None,
502
+ "usage": {"input_tokens": 0, "output_tokens": 0}
503
+ }
504
+ })
505
+ yield sse({
506
+ "type": "content_block_start",
507
+ "index": 0,
508
+ "content_block": {"type": "text"}
509
+ })
510
+
511
+ hit_limit = False
512
+
513
+ async for line in r.aiter_lines():
514
+ if not line:
515
+ continue
516
+ if line.strip() == "data: [DONE]":
517
+ break
518
+
519
+ raw = line[6:] if line.startswith("data: ") else line
520
+
521
+ if is_rate_limited(0, raw):
522
+ log(f"Account {acc['account_id'][:8]}... mid-stream limit (anthropic), switching key")
523
+ hit_limit = True
524
+ break
525
+
526
+ try:
527
+ j = json.loads(raw)
528
+ token = j["choices"][0]["delta"].get("content", "")
529
+ except Exception:
530
+ continue
531
+
532
+ if token:
533
+ sent_any_delta = True
534
+ yield sse({
535
+ "type": "content_block_delta",
536
+ "index": 0,
537
+ "delta": {"type": "text_delta", "text": token}
538
+ })
539
+
540
+ if hit_limit:
541
+ await mark_fail(acc)
542
+ continue
543
+
544
+ await mark_ok(acc)
545
+ break
546
+
547
+ except Exception as e:
548
+ log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
549
+ await mark_fail(acc)
550
+
551
+ finally:
552
+ await release_key(acc)
553
+
554
+ # Tutup Anthropic SSE envelope
555
+ yield sse({"type": "content_block_stop", "index": 0})
556
+ yield sse({
557
+ "type": "message_delta",
558
+ "delta": {"stop_reason": "end_turn", "stop_sequence": None},
559
+ "usage": {"output_tokens": 0}
560
+ })
561
+ yield sse({"type": "message_stop"})
562
+
563
+ return StreamingResponse(agen(), media_type="text/event-stream")