Elysiadev11 committed on
Commit
65979a9
·
verified ·
1 Parent(s): 4ba202a

Upload proxy_cf.py

Browse files
Files changed (1) hide show
  1. proxy_cf.py +623 -0
proxy_cf.py ADDED
@@ -0,0 +1,623 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import uuid
5
+ import asyncio
6
+ import httpx
7
+
8
+ from fastapi import FastAPI, Request
9
+ from fastapi.responses import JSONResponse, Response, StreamingResponse
10
+ from starlette.requests import ClientDisconnect
11
+
12
+ app = FastAPI()
13
+
14
+ # =====================================================
15
+ # CONFIG
16
+ # =====================================================
17
# Shared secret every client must present (see auth_ok).
# NOTE(review): the fallback value "olla" is a hard-coded, publicly visible
# default. Set MASTER_API_KEY in the environment for any real deployment.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default CF Workers AI model (can override via request body)
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")
21
+
22
+ # =====================================================
23
+ # LOAD CF CREDENTIALS
24
+ # Format env: CF_1=account_id,api_key
25
+ # =====================================================
26
def load_cf_accounts(env=None, max_slots=100):
    """Collect Cloudflare credentials from ``CF_1`` .. ``CF_<max_slots>``.

    Each variable must look like ``account_id,api_key``; only the first comma
    separates the two fields, so the key itself may contain commas.

    Args:
        env: Mapping to read from. Defaults to ``os.environ``.
        max_slots: Highest ``CF_<n>`` index that is inspected.

    Returns:
        A list of ``{"account_id": ..., "api_key": ...}`` dicts, in slot order.
        Unset slots are skipped silently; malformed ones are skipped with a
        warning printed to stdout.
    """
    if env is None:
        env = os.environ

    accounts = []
    for slot in range(1, max_slots + 1):
        raw = env.get(f"CF_{slot}")
        if not raw:
            continue

        account_id, sep, api_key = raw.partition(",")
        if not sep:
            print(f"[WARN] CF_{slot} format invalid, expected 'account_id,api_key' — skipped")
            continue

        account_id, api_key = account_id.strip(), api_key.strip()
        if account_id and api_key:
            accounts.append({"account_id": account_id, "api_key": api_key})
        else:
            # Previously dropped without any trace, which made typos hard to spot.
            print(f"[WARN] CF_{slot} has an empty account_id or api_key — skipped")

    return accounts


CF_ACCOUNTS = load_cf_accounts()  # list of {"account_id": ..., "api_key": ...}

if not CF_ACCOUNTS:
    # Keep the list non-empty so the rotation code never has to handle a
    # zero-length list; requests made with the placeholder are expected to fail upstream.
    print("[WARN] No CF credentials found, inserting dummy")
    CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})
43
+
44
+ # =====================================================
45
+ # KEY STATUS
46
+ # =====================================================
47
# Per-account bookkeeping, keyed by account_id:
#   index   - 1-based position in CF_ACCOUNTS
#   healthy - eligibility flag read by get_key (nothing in this file clears it)
#   busy    - True while a request currently holds the account
#   success - count of successful requests
#   fail    - failure counter; mark_ok resets it to 0
key_status = {}
for idx, acc in enumerate(CF_ACCOUNTS, 1):
    kid = acc["account_id"]
    key_status[kid] = {
        "index": idx,
        "healthy": True,
        "busy": False,
        "success": 0,
        "fail": 0,
    }

# Round-robin cursor into CF_ACCOUNTS; advanced by get_key.
rr_index = 0
# Serialises every read/write of key_status and rr_index.
_key_lock = asyncio.Lock()
60
+
61
+
62
+ # =====================================================
63
+ # HELPERS
64
+ # =====================================================
65
def log(x):
    """Print ``x`` to stdout prefixed with the local time as ``[HH:MM:SS]``.

    Output is flushed immediately so lines show up promptly in captured logs.
    """
    print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)
67
+
68
+
69
def sse(obj):
    """Serialise ``obj`` as a single server-sent-event ``data:`` frame.

    Non-ASCII characters are emitted verbatim (``ensure_ascii=False``) and the
    frame is terminated by the blank line that SSE uses as a separator.
    """
    return "data: " + json.dumps(obj, ensure_ascii=False) + "\n\n"
71
+
72
+
73
def auth_ok(req: Request):
    """Return True when the request carries the master API key.

    The key is accepted either as ``Authorization: Bearer <key>`` (a bare
    ``Authorization: <key>`` is still honoured for backward compatibility) or,
    when no Authorization token is present, via the ``x-api-key`` header that
    Anthropic-style clients send.
    """
    import hmac  # local import keeps this fix independent of the file header

    header = req.headers.get("Authorization", "")
    prefix = "Bearer "
    # Strip the scheme only when it is a real prefix; str.replace() would also
    # remove "Bearer " from the middle of the supplied value.
    token = header[len(prefix):] if header.startswith(prefix) else header
    if not token:
        token = req.headers.get("x-api-key", "")

    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(token.encode("utf-8"), MASTER_API_KEY.encode("utf-8"))
76
+
77
+
78
def cf_url(account_id: str, model: str) -> str:
    """Build the Cloudflare API ``ai/run`` URL for ``model`` under ``account_id``."""
    return f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model}"
80
+
81
+
82
async def get_key(exclude=None):
    """Reserve the next available account in round-robin order.

    Args:
        exclude: Optional set of account ids that must not be returned.

    Returns:
        The account dict ``{"account_id": ..., "api_key": ...}`` with its
        ``busy`` flag set, or ``None`` when no account is healthy, idle and
        not excluded. The caller must hand the account back via release_key.
    """
    global rr_index
    if exclude is None:
        exclude = set()

    async with _key_lock:
        # One full lap over the list at most; the cursor advances before each
        # probe so consecutive callers start from different accounts.
        for _ in range(len(CF_ACCOUNTS)):
            rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
            acc = CF_ACCOUNTS[rr_index]
            kid = acc["account_id"]
            st = key_status[kid]

            if st["healthy"] and not st["busy"] and kid not in exclude:
                st["busy"] = True
                return acc

    return None
99
+
100
+
101
async def release_key(acc):
    """Clear the ``busy`` flag of ``acc`` so get_key can hand it out again.

    Accounts whose id is not present in key_status are ignored.
    """
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["busy"] = False
106
+
107
+
108
async def mark_fail(acc):
    """Increment the failure counter of ``acc`` (no-op for unknown ids)."""
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["fail"] += 1
113
+
114
+
115
async def mark_ok(acc):
    """Record a success for ``acc``: bump ``success`` and reset ``fail`` to 0.

    Unknown account ids are ignored.
    """
    async with _key_lock:
        kid = acc["account_id"]
        if kid in key_status:
            key_status[kid]["success"] += 1
            key_status[kid]["fail"] = 0
121
+
122
+
123
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
    """Poll get_key until an account is reserved or ``max_wait`` seconds pass.

    Args:
        exclude: Set of account ids to skip, forwarded to get_key.
        max_wait: Upper bound on the total wait, in seconds of wall-clock time.
        interval: Pause between polls, in seconds.

    Returns:
        The reserved account dict, or ``None`` on timeout. At least one
        attempt is always made, even when ``max_wait`` is zero or negative.
    """
    # A monotonic deadline also accounts for time spent inside get_key, and
    # guarantees termination when interval is 0 (the old additive counter
    # would never advance in that case).
    deadline = time.monotonic() + max_wait
    while True:
        acc = await get_key(exclude)
        if acc:
            return acc

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        await asyncio.sleep(min(interval, remaining))
132
+
133
+
134
def is_rate_limited(status_code: int, text: str) -> bool:
    """Tell whether an upstream reply looks like a rate/usage-limit rejection.

    True when ``status_code`` is 429 or when ``text`` contains, case-insensitively,
    "rate limit", "too many requests" or "usage limit". ``None`` is treated as
    empty text.
    """
    if status_code == 429:
        return True
    t = (text or "").lower()
    return any(marker in t for marker in ("rate limit", "too many requests", "usage limit"))
137
+
138
+
139
+ # =====================================================
140
+ # ROOT
141
+ # =====================================================
142
@app.get("/")
async def root():
    """Status endpoint: per-account counters with the account ids masked.

    Returns a dict with ``status``, the number of configured ``accounts``, the
    ``default_model`` and a ``detail`` mapping of masked account id to its
    index/healthy/busy/success/fail values. This route performs no auth check.
    """
    async with _key_lock:
        safe = {}
        for kid, v in key_status.items():
            if len(kid) > 10:
                masked = kid[:6] + "****" + kid[-4:]
            else:
                # The 6+4 visible characters would reveal a short id in full.
                masked = "****"
            if masked in safe:
                # Keep both entries when two ids mask to the same string.
                masked = f"{masked}#{v['index']}"

            safe[masked] = {
                "index": v["index"],
                "healthy": v["healthy"],
                "busy": v["busy"],
                "success": v["success"],
                "fail": v["fail"],
            }

    return {
        "status": "ok",
        "accounts": len(CF_ACCOUNTS),
        "default_model": DEFAULT_CF_MODEL,
        "detail": safe
    }
162
+
163
+
164
+ # =====================================================
165
+ # /v1/models — static list of popular CF models
166
+ # =====================================================
167
@app.get("/v1/models")
async def models(req: Request):
    """Return a static, OpenAI-style model list of popular CF Workers AI models.

    Responds with 401 when the request fails auth_ok. Every entry's
    ``created`` field is the current time; the list is hard-coded and is not
    fetched from Cloudflare.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    now = int(time.time())
    cf_models = [
        "@cf/meta/llama-3.3-70b-instruct-fp8-fast",
        "@cf/meta/llama-3.1-8b-instruct",
        "@cf/meta/llama-3.1-70b-instruct",
        "@cf/mistral/mistral-7b-instruct-v0.1",
        "@cf/google/gemma-7b-it",
        "@cf/qwen/qwen1.5-14b-chat-awq",
        "@cf/deepseek-ai/deepseek-r1-distill-qwen-32b",
    ]

    data = [
        {"id": m, "object": "model", "created": now, "owned_by": "cloudflare"}
        for m in cf_models
    ]

    return {"object": "list", "data": data}
189
+
190
+
191
+ # =====================================================
192
+ # /v1/chat/completions — OpenAI-compatible endpoint
193
+ # =====================================================
194
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat completions endpoint backed by CF Workers AI.

    Request body fields used: ``messages``, ``model`` (defaults to
    DEFAULT_CF_MODEL), ``max_tokens`` (default 2048) and ``stream``.

    Each configured account is tried at most once per request, moving on to
    the next one on rate limits, non-200 replies or exceptions.

    Returns:
        * 401 / 400 JSON errors for bad auth or an unusable body, 499 when the
          client disconnects while the body is read;
        * non-stream: an OpenAI ``chat.completion`` object, or a 500 JSON
          error once every account has failed;
        * stream: a ``text/event-stream`` of ``chat.completion.chunk`` frames
          terminated by ``data: [DONE]``.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    try:
        body = await req.json()
    except ClientDisconnect:
        # Same handling as the /v1/messages route.
        return Response(status_code=499)
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get().
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model") or DEFAULT_CF_MODEL  # also covers "model": null
    messages = body.get("messages", [])
    max_tokens = body.get("max_tokens", 2048)

    cf_body = {
        "messages": messages,
        "stream": is_stream,
        "max_tokens": max_tokens,
    }

    def cf_headers(acc):
        # Upstream auth uses the per-account API key, never the master key.
        return {
            "Authorization": f"Bearer {acc['api_key']}",
            "Content-Type": "application/json",
        }

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not is_stream:
        tried = set()

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)

            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers=cf_headers(acc),
                    )

                if is_rate_limited(r.status_code, r.text):
                    log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
                    await mark_fail(acc)
                    continue

                if r.status_code != 200:
                    log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                    await mark_fail(acc)
                    continue

                data = r.json()

                # CF Workers AI response format:
                # {"result": {"response": "..."}, "success": true, ...}
                # Convert to OpenAI format
                cf_result = data.get("result") or {}
                content = cf_result.get("response", "")

                out = {
                    "id": "chatcmpl-" + uuid.uuid4().hex[:10],
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [
                        {
                            "index": 0,
                            "message": {"role": "assistant", "content": content},
                            "finish_reason": "stop",
                        }
                    ],
                    # Token usage is not tracked; zeros keep the schema intact.
                    "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
                }

                await mark_ok(acc)
                return JSONResponse(out)

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)

            finally:
                # Runs on return/continue as well, so the account is always freed.
                await release_key(acc)

        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM
    # CF Workers AI streams NDJSON lines:
    # {"response":"token"} or {"p":"...","response":"token"} and ends with [DONE]
    # We convert to OpenAI SSE format
    # -----------------------------------------
    async def gen():
        tried = set()
        cid = "chatcmpl-" + uuid.uuid4().hex[:10]
        sent_any = False

        for _ in range(len(CF_ACCOUNTS)):
            if sent_any:
                # Part of the answer already reached the client. Another
                # account would start the generation over and duplicate it.
                break

            acc = await wait_for_free_key(exclude=tried)

            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers=cf_headers(acc),
                    ) as r:

                        if is_rate_limited(r.status_code, ""):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
                            await mark_fail(acc)
                            continue

                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
                            await mark_fail(acc)
                            continue

                        hit_limit = False

                        async for line in r.aiter_lines():
                            line = line.strip()
                            if not line:
                                continue

                            if line in ("data: [DONE]", "[DONE]"):
                                break

                            # Strip "data: " prefix if present
                            raw = line[6:] if line.startswith("data: ") else line

                            try:
                                j = json.loads(raw)
                            except ValueError:
                                j = None

                            if not (isinstance(j, dict) and "response" in j):
                                # Only lines that are NOT token payloads are
                                # scanned for limit messages; otherwise generated
                                # text that mentions "rate limit" would be
                                # mistaken for an upstream error.
                                if is_rate_limited(0, raw):
                                    log(f"Account {acc['account_id'][:8]}... mid-stream limit, switching key")
                                    hit_limit = True
                                    break
                                continue

                            token = j.get("response", "")

                            if token:
                                sent_any = True
                                chunk = {
                                    "id": cid,
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": model,
                                    "choices": [
                                        {
                                            "index": 0,
                                            "delta": {"role": "assistant", "content": token},
                                            "finish_reason": None,
                                        }
                                    ]
                                }
                                yield sse(chunk)

                        if hit_limit:
                            await mark_fail(acc)
                            continue

                        # Send finish chunk
                        finish_chunk = {
                            "id": cid,
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {},
                                    "finish_reason": "stop",
                                }
                            ]
                        }
                        yield sse(finish_chunk)
                        yield "data: [DONE]\n\n"

                        await mark_ok(acc)
                        return

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        failure = "Upstream stream interrupted" if sent_any else "All accounts failed"
        yield sse({"error": failure})
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
404
+
405
+
406
+ # =====================================================
407
+ # /v1/messages — Anthropic-compatible endpoint
408
+ # =====================================================
409
@app.post("/v1/messages")
async def anthropic(req: Request):
    """Anthropic-compatible Messages endpoint backed by CF Workers AI.

    The Anthropic request is flattened to the ``messages`` list sent upstream:
    ``system`` becomes a leading system message and list-style content is
    reduced to the concatenation of its ``text`` parts (other part types are
    dropped).

    Each configured account is tried at most once per request.

    Returns:
        * 401 / 400 JSON errors for bad auth or an unusable body, 499 when the
          client disconnects while the body is read;
        * non-stream: an Anthropic ``message`` object, or a 500 JSON error once
          every account has failed;
        * stream: a ``text/event-stream`` carrying the Anthropic event
          sequence (message_start ... message_stop), or a single ``error``
          event when no complete answer could be produced.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    try:
        body = await req.json()
    except ClientDisconnect:
        return Response(status_code=499)
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get().
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    stream = body.get("stream", False)
    model = body.get("model") or DEFAULT_CF_MODEL  # also covers "model": null
    max_tokens = body.get("max_tokens", 2048)

    def as_text(content):
        # Collapse a list of content parts to the joined text of its "text" parts.
        if isinstance(content, list):
            return "".join(
                (part.get("text") or "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return content

    def event(payload):
        # Anthropic's streaming format labels every SSE frame with an
        # "event:" line equal to the payload's "type".
        return f"event: {payload['type']}\n" + sse(payload)

    def cf_headers(acc):
        # Upstream auth uses the per-account API key, never the master key.
        return {
            "Authorization": f"Bearer {acc['api_key']}",
            "Content-Type": "application/json",
        }

    # Convert Anthropic message format to CF/OpenAI format
    messages = []

    if body.get("system"):
        # "system" may be a plain string or a list of text blocks.
        messages.append({"role": "system", "content": as_text(body["system"])})

    for m in body.get("messages") or []:
        if not isinstance(m, dict) or "role" not in m:
            # Previously surfaced as an unhandled exception (HTTP 500).
            return JSONResponse({"error": "Invalid message entry"}, status_code=400)
        messages.append({"role": m["role"], "content": as_text(m.get("content", ""))})

    cf_body = {
        "messages": messages,
        "stream": stream,
        "max_tokens": max_tokens,
    }

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not stream:
        tried = set()

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)

            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers=cf_headers(acc),
                    )

                if is_rate_limited(r.status_code, r.text):
                    log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
                    await mark_fail(acc)
                    continue

                if r.status_code != 200:
                    log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                    await mark_fail(acc)
                    continue

                data = r.json()
                cf_result = data.get("result") or {}
                content = cf_result.get("response", "")

                out = {
                    "id": "msg_" + uuid.uuid4().hex[:10],
                    "type": "message",
                    "role": "assistant",
                    "model": model,
                    "content": [{"type": "text", "text": content}],
                    "stop_reason": "end_turn",
                    "stop_sequence": None,
                    # Token usage is not tracked; zeros keep the schema intact.
                    "usage": {"input_tokens": 0, "output_tokens": 0}
                }

                await mark_ok(acc)
                return JSONResponse(out)

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)

            finally:
                # Runs on return/continue as well, so the account is always freed.
                await release_key(acc)

        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM (Anthropic SSE envelope)
    # -----------------------------------------
    async def agen():
        tried = set()
        msg_id = "msg_" + uuid.uuid4().hex[:10]
        envelope_open = False   # message_start / content_block_start already sent
        sent_any_delta = False  # at least one text delta already sent
        completed = False       # an upstream stream ran to its end

        for _ in range(len(CF_ACCOUNTS)):
            if sent_any_delta:
                # Part of the answer already reached the client. Another
                # account would start the generation over and duplicate it.
                break

            acc = await wait_for_free_key(exclude=tried)

            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers=cf_headers(acc),
                    ) as r:

                        if is_rate_limited(r.status_code, ""):
                            log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue

                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue

                        # Emit the envelope exactly once. A dedicated flag is
                        # needed: gating on "any delta sent" re-emitted it when
                        # an earlier account failed before its first token.
                        if not envelope_open:
                            envelope_open = True
                            yield event({
                                "type": "message_start",
                                "message": {
                                    "id": msg_id,
                                    "type": "message",
                                    "role": "assistant",
                                    "model": model,
                                    "content": [],
                                    "stop_reason": None,
                                    "stop_sequence": None,
                                    "usage": {"input_tokens": 0, "output_tokens": 0}
                                }
                            })
                            yield event({
                                "type": "content_block_start",
                                "index": 0,
                                "content_block": {"type": "text", "text": ""}
                            })

                        hit_limit = False

                        async for line in r.aiter_lines():
                            line = line.strip()
                            if not line:
                                continue

                            if line in ("data: [DONE]", "[DONE]"):
                                break

                            raw = line[6:] if line.startswith("data: ") else line

                            try:
                                j = json.loads(raw)
                            except ValueError:
                                j = None

                            if not (isinstance(j, dict) and "response" in j):
                                # Only lines that are NOT token payloads are
                                # scanned for limit messages; otherwise generated
                                # text that mentions "rate limit" would be
                                # mistaken for an upstream error.
                                if is_rate_limited(0, raw):
                                    log(f"Account {acc['account_id'][:8]}... mid-stream limit (anthropic), switching key")
                                    hit_limit = True
                                    break
                                continue

                            token = j.get("response", "")

                            if token:
                                sent_any_delta = True
                                yield event({
                                    "type": "content_block_delta",
                                    "index": 0,
                                    "delta": {"type": "text_delta", "text": token}
                                })

                        if hit_limit:
                            await mark_fail(acc)
                            continue

                        await mark_ok(acc)
                        completed = True
                        break

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        if not completed:
            # Do not send closing events for an envelope that was never opened,
            # and do not present a truncated answer as a normal end_turn.
            failure = "Upstream stream interrupted" if sent_any_delta else "All accounts failed"
            yield event({
                "type": "error",
                "error": {"type": "api_error", "message": failure}
            })
            return

        # Close Anthropic SSE envelope
        yield event({"type": "content_block_stop", "index": 0})
        yield event({
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": {"output_tokens": 0}
        })
        yield event({"type": "message_stop"})

    return StreamingResponse(agen(), media_type="text/event-stream")