Elysiadev11 commited on
Commit
6d91b7c
·
verified ·
1 Parent(s): 4302bc0

Delete proxy_cf.py

Browse files
Files changed (1) hide show
  1. proxy_cf.py +0 -619
proxy_cf.py DELETED
@@ -1,619 +0,0 @@
1
- import os
2
- import json
3
- import time
4
- import uuid
5
- import asyncio
6
- import httpx
7
-
8
- from fastapi import FastAPI, Request
9
- from fastapi.responses import JSONResponse, Response, StreamingResponse
10
- from starlette.requests import ClientDisconnect
11
-
12
- app = FastAPI()
13
-
14
# =====================================================
# CONFIG
# =====================================================
# Shared secret that clients must present as a Bearer token.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")

# Default Cloudflare Workers AI model (request body may override it).
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/meta/llama-3.3-70b-instruct-fp8-fast")
21
-
22
# =====================================================
# LOAD CF CREDENTIALS
# Format env: CF_1=account_id,api_key
# =====================================================
CF_ACCOUNTS = []  # list of {"account_id": ..., "api_key": ...}

for _slot in range(1, 101):
    _raw = os.getenv(f"CF_{_slot}")
    if not _raw:
        continue
    # Split on the first comma only: the api_key itself may contain commas.
    _acct, _sep, _key = _raw.partition(",")
    if not _sep:
        print(f"[WARN] CF_{_slot} format invalid, expected 'account_id,api_key' — skipped")
        continue
    _acct, _key = _acct.strip(), _key.strip()
    if _acct and _key:
        CF_ACCOUNTS.append({"account_id": _acct, "api_key": _key})

# Never leave the pool empty — downstream code assumes at least one entry.
if not CF_ACCOUNTS:
    print("[WARN] No CF credentials found, inserting dummy")
    CF_ACCOUNTS.append({"account_id": "dummy", "api_key": "dummy"})
43
-
44
# =====================================================
# KEY STATUS
# =====================================================
# Per-account bookkeeping, keyed by account_id. Duplicate account ids
# collapse to the last occurrence, matching dict-assignment semantics.
key_status = {
    acc["account_id"]: {
        "index": pos,
        "healthy": True,
        "busy": False,
        "success": 0,
        "fail": 0,
    }
    for pos, acc in enumerate(CF_ACCOUNTS, 1)
}

rr_index = 0                 # round-robin cursor into CF_ACCOUNTS
_key_lock = asyncio.Lock()   # guards key_status and rr_index
60
-
61
-
62
- # =====================================================
63
- # HELPERS
64
- # =====================================================
65
def log(x):
    """Print *x* prefixed with a HH:MM:SS timestamp, flushing immediately."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {x}", flush=True)
67
-
68
-
69
def sse(obj):
    """Serialize *obj* as a single SSE ``data:`` frame (non-ASCII preserved)."""
    payload = json.dumps(obj, ensure_ascii=False)
    return f"data: {payload}\n\n"
71
-
72
-
73
def auth_ok(req: Request) -> bool:
    """Return True iff the request carries the master key as a Bearer token.

    Fix: the previous ``.replace("Bearer ", "")`` removed the substring
    *anywhere* in the header value, so a key containing "Bearer " could
    never authenticate; ``removeprefix`` only strips a leading scheme tag.
    """
    header = req.headers.get("Authorization", "")
    token = header.removeprefix("Bearer ")
    return token == MASTER_API_KEY
76
-
77
-
78
def cf_url(account_id: str, model: str) -> str:
    """Build the Cloudflare Workers AI inference URL for an account/model."""
    base = "https://api.cloudflare.com/client/v4/accounts"
    return f"{base}/{account_id}/ai/run/{model}"
80
-
81
-
82
async def get_key(exclude=None):
    """Reserve the next free, healthy account in round-robin order.

    The selected account's status is flagged busy before the lock is
    released. Returns the account dict ({"account_id", "api_key"}), or
    None when every account is busy, unhealthy, or listed in *exclude*.
    """
    global rr_index
    skip = set() if exclude is None else exclude

    async with _key_lock:
        # At most one full lap over the pool; cursor advances before the
        # read so consecutive calls rotate through accounts.
        for _ in range(len(CF_ACCOUNTS)):
            rr_index = (rr_index + 1) % len(CF_ACCOUNTS)
            candidate = CF_ACCOUNTS[rr_index]
            cid = candidate["account_id"]
            state = key_status[cid]

            if cid in skip:
                continue
            if not state["healthy"] or state["busy"]:
                continue

            state["busy"] = True
            return candidate

        return None
99
-
100
-
101
async def release_key(acc):
    """Clear the busy flag for *acc*; silently ignores unknown accounts."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["busy"] = False
106
-
107
-
108
async def mark_fail(acc):
    """Record one failed call against *acc*; unknown accounts are ignored."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["fail"] += 1
113
-
114
-
115
async def mark_ok(acc):
    """Record a success for *acc* and reset its failure streak."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["success"] += 1
            state["fail"] = 0
121
-
122
-
123
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
    """Poll get_key() until an account frees up or *max_wait* seconds pass.

    Returns the reserved account dict, or None on timeout.

    Fix: the old implementation summed *interval* into an ``elapsed``
    counter, which under-counts real time whenever asyncio.sleep()
    overshoots; a monotonic-clock deadline keeps the timeout accurate.
    """
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        acc = await get_key(exclude)
        if acc is not None:
            return acc
        await asyncio.sleep(interval)
    return None
132
-
133
-
134
def is_rate_limited(status_code: int, text: str) -> bool:
    """Heuristic: HTTP 429 or a limit-related phrase in *text* means rate-limited."""
    if status_code == 429:
        return True
    lowered = text.lower()
    return any(
        phrase in lowered
        for phrase in ("rate limit", "too many requests", "usage limit")
    )
137
-
138
-
139
- # =====================================================
140
- # ROOT
141
- # =====================================================
142
@app.get("/")
async def root():
    """Health/status endpoint: account count plus masked per-account stats."""
    async with _key_lock:
        safe = {}
        for kid, v in key_status.items():
            # Fix: only reveal a prefix/suffix when the id is long enough
            # that the two slices cannot overlap — the old unconditional
            # kid[:6] + "****" + kid[-4:] leaked short ids (e.g. "dummy"
            # became "dummy****ummy", exposing the whole id).
            if len(kid) > 10:
                masked = kid[:6] + "****" + kid[-4:]
            else:
                masked = kid[:2] + "****"
            safe[masked] = {
                "index": v["index"],
                "healthy": v["healthy"],
                "busy": v["busy"],
                "success": v["success"],
                "fail": v["fail"],
            }

    return {
        "status": "ok",
        "accounts": len(CF_ACCOUNTS),
        "default_model": DEFAULT_CF_MODEL,
        "detail": safe
    }
162
-
163
-
164
- # =====================================================
165
- # /v1/models — static list of popular CF models
166
- # =====================================================
167
@app.get("/v1/models")
async def models(req: Request):
    """OpenAI-compatible /v1/models: returns a fixed catalog of CF model ids."""
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    created = int(time.time())
    catalog = (
        "@cf/moonshotai/kimi-k2.6",
        "@cf/zai-org/glm-4.7-flash",
        "@cf/moonshotai/kimi-k2.5",
    )

    return {
        "object": "list",
        "data": [
            {"id": mid, "object": "model", "created": created, "owned_by": "cloudflare"}
            for mid in catalog
        ],
    }
185
-
186
-
187
- # =====================================================
188
- # /v1/chat/completions — OpenAI-compatible endpoint
189
- # =====================================================
190
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat completions, proxied to CF Workers AI.

    Tries each configured account at most once, failing over on rate
    limits, non-200 responses, and exceptions; the reserved key is always
    released in a ``finally``. CF's {"result": {"response": ...}} payload
    (or its NDJSON stream) is converted to the OpenAI response/SSE shape.

    Fix: removed the dead ``sent_any`` local from the streaming generator
    (it was assigned but never read).
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    messages = body.get("messages", [])
    max_tokens = body.get("max_tokens", 2048)

    # Payload forwarded verbatim to Cloudflare Workers AI.
    cf_body = {
        "messages": messages,
        "stream": is_stream,
        "max_tokens": max_tokens,
    }

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not is_stream:
        tried = set()

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    )

                    if is_rate_limited(r.status_code, r.text):
                        log(f"Account {acc['account_id'][:8]}... rate limited (non-stream), trying next")
                        await mark_fail(acc)
                        continue

                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue

                    data = r.json()

                    # CF Workers AI response format:
                    # {"result": {"response": "..."}, "success": true, ...}
                    # Convert to OpenAI format
                    cf_result = data.get("result", {})
                    content = cf_result.get("response", "")

                    out = {
                        "id": "chatcmpl-" + uuid.uuid4().hex[:10],
                        "object": "chat.completion",
                        "created": int(time.time()),
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "message": {"role": "assistant", "content": content},
                                "finish_reason": "stop",
                            }
                        ],
                        # Token accounting is not available from this proxy.
                        "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
                    }

                    await mark_ok(acc)
                    return JSONResponse(out)

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM
    # CF Workers AI streams NDJSON lines:
    # {"response":"token"} or {"p":"...","response":"token"} and ends with [DONE]
    # We convert to OpenAI SSE format
    # -----------------------------------------
    async def gen():
        tried = set()
        cid = "chatcmpl-" + uuid.uuid4().hex[:10]

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    ) as r:

                        if is_rate_limited(r.status_code, ""):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream), trying next")
                            await mark_fail(acc)
                            continue

                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (stream), trying next")
                            await mark_fail(acc)
                            continue

                        hit_limit = False

                        async for line in r.aiter_lines():
                            line = line.strip()
                            if not line:
                                continue

                            if line == "data: [DONE]" or line == "[DONE]":
                                break

                            # Strip "data: " prefix if present
                            raw = line[6:] if line.startswith("data: ") else line

                            # Detect mid-stream rate limit
                            if is_rate_limited(0, raw):
                                log(f"Account {acc['account_id'][:8]}... mid-stream limit, switching key")
                                hit_limit = True
                                break

                            try:
                                j = json.loads(raw)
                            except Exception:
                                # Non-JSON keep-alive noise: skip.
                                continue

                            token = j.get("response", "")

                            if token:
                                chunk = {
                                    "id": cid,
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": model,
                                    "choices": [
                                        {
                                            "index": 0,
                                            "delta": {"role": "assistant", "content": token},
                                            "finish_reason": None,
                                        }
                                    ]
                                }
                                yield sse(chunk)

                        if hit_limit:
                            await mark_fail(acc)
                            continue

                        # Send finish chunk
                        finish_chunk = {
                            "id": cid,
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {},
                                    "finish_reason": "stop",
                                }
                            ]
                        }
                        yield sse(finish_chunk)
                        yield "data: [DONE]\n\n"

                        await mark_ok(acc)
                        return

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        # Every account was tried (or none were available).
        yield sse({"error": "All accounts failed"})
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
400
-
401
-
402
- # =====================================================
403
- # /v1/messages — Anthropic-compatible endpoint
404
- # =====================================================
405
@app.post("/v1/messages")
async def anthropic(req: Request):
    """Anthropic-compatible /v1/messages endpoint, proxied to CF Workers AI.

    Converts Anthropic system/content-block input into the flat
    role/content list CF expects, then wraps the reply in the Anthropic
    response object (non-stream) or SSE event envelope (stream).

    Streaming fixes:
      * the message_start/content_block_start envelope is emitted at most
        once — the old code keyed re-emission on "any delta sent", so an
        account that died after envelope emission but before its first
        token caused a duplicate envelope on failover;
      * when every account fails before the envelope was sent, an error
        event is emitted instead of a dangling content_block_stop /
        message_stop with no matching message_start.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)

    try:
        body = await req.json()
    except ClientDisconnect:
        # Client went away mid-upload; 499 mirrors nginx's convention.
        return Response(status_code=499)
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    max_tokens = body.get("max_tokens", 2048)

    # Convert Anthropic message format to CF/OpenAI format
    messages = []

    if body.get("system"):
        messages.append({"role": "system", "content": body["system"]})

    for m in body.get("messages", []):
        content = m.get("content", "")
        if isinstance(content, list):
            # Anthropic content blocks: keep only the text parts.
            txt = ""
            for x in content:
                if x.get("type") == "text":
                    txt += x.get("text", "")
            content = txt
        messages.append({"role": m["role"], "content": content})

    cf_body = {
        "messages": messages,
        "stream": stream,
        "max_tokens": max_tokens,
    }

    # -----------------------------------------
    # NON STREAM
    # -----------------------------------------
    if not stream:
        tried = set()

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    )

                    if is_rate_limited(r.status_code, r.text):
                        log(f"Account {acc['account_id'][:8]}... rate limited (anthropic non-stream), trying next")
                        await mark_fail(acc)
                        continue

                    if r.status_code != 200:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code}, trying next")
                        await mark_fail(acc)
                        continue

                    data = r.json()
                    cf_result = data.get("result", {})
                    content = cf_result.get("response", "")

                    out = {
                        "id": "msg_" + uuid.uuid4().hex[:10],
                        "type": "message",
                        "role": "assistant",
                        "model": body.get("model", DEFAULT_CF_MODEL),
                        "content": [{"type": "text", "text": content}],
                        "stop_reason": "end_turn",
                        "stop_sequence": None,
                        # Token accounting is not available from this proxy.
                        "usage": {"input_tokens": 0, "output_tokens": 0}
                    }

                    await mark_ok(acc)
                    return JSONResponse(out)

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        return JSONResponse({"error": "All accounts failed"}, status_code=500)

    # -----------------------------------------
    # STREAM (Anthropic SSE envelope)
    # -----------------------------------------
    async def agen():
        tried = set()
        msg_id = "msg_" + uuid.uuid4().hex[:10]
        envelope_sent = False  # message_start/content_block_start emitted?

        for _ in range(len(CF_ACCOUNTS)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break

            tried.add(acc["account_id"])

            try:
                async with httpx.AsyncClient(timeout=None) as client:
                    async with client.stream(
                        "POST",
                        cf_url(acc["account_id"], model),
                        json=cf_body,
                        headers={
                            "Authorization": f"Bearer {acc['api_key']}",
                            "Content-Type": "application/json",
                        }
                    ) as r:

                        if is_rate_limited(r.status_code, ""):
                            log(f"Account {acc['account_id'][:8]}... rate limited (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue

                        if r.status_code != 200:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} (anthropic stream), trying next")
                            await mark_fail(acc)
                            continue

                        # Emit the Anthropic envelope exactly once, on the
                        # first key whose stream opens with HTTP 200.
                        if not envelope_sent:
                            envelope_sent = True
                            yield sse({
                                "type": "message_start",
                                "message": {
                                    "id": msg_id,
                                    "type": "message",
                                    "role": "assistant",
                                    "model": body.get("model", DEFAULT_CF_MODEL),
                                    "content": [],
                                    "stop_reason": None,
                                    "stop_sequence": None,
                                    "usage": {"input_tokens": 0, "output_tokens": 0}
                                }
                            })
                            yield sse({
                                "type": "content_block_start",
                                "index": 0,
                                "content_block": {"type": "text"}
                            })

                        hit_limit = False

                        async for line in r.aiter_lines():
                            line = line.strip()
                            if not line:
                                continue

                            if line == "data: [DONE]" or line == "[DONE]":
                                break

                            raw = line[6:] if line.startswith("data: ") else line

                            if is_rate_limited(0, raw):
                                log(f"Account {acc['account_id'][:8]}... mid-stream limit (anthropic), switching key")
                                hit_limit = True
                                break

                            try:
                                j = json.loads(raw)
                            except Exception:
                                # Non-JSON keep-alive noise: skip.
                                continue

                            token = j.get("response", "")

                            if token:
                                yield sse({
                                    "type": "content_block_delta",
                                    "index": 0,
                                    "delta": {"type": "text_delta", "text": token}
                                })

                        if hit_limit:
                            await mark_fail(acc)
                            continue

                        await mark_ok(acc)
                        break

            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... agen exception: {e}")
                await mark_fail(acc)

            finally:
                await release_key(acc)

        if envelope_sent:
            # Close Anthropic SSE envelope
            yield sse({"type": "content_block_stop", "index": 0})
            yield sse({
                "type": "message_delta",
                "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                "usage": {"output_tokens": 0}
            })
            yield sse({"type": "message_stop"})
        else:
            # Nothing was ever started: surface the failure as an error event.
            yield sse({"type": "error", "error": {"type": "api_error", "message": "All accounts failed"}})

    return StreamingResponse(agen(), media_type="text/event-stream")