Elysiadev11 commited on
Commit
e427ab6
·
verified ·
1 Parent(s): 3d7e312

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +197 -88
app.py CHANGED
@@ -1,132 +1,241 @@
1
  import os
2
  import httpx
3
  from fastapi import FastAPI, Request
4
- from fastapi.responses import JSONResponse, Response
 
5
  import time
 
 
6
 
7
  app = FastAPI()
8
 
 
 
 
9
  BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
10
- MASTER_API_KEY = os.getenv("MASTER_API_KEY", "ollama-proxy-free")
11
 
12
  OLLAMA_KEYS = []
13
- for i in range(1, 20):
14
  key = os.getenv(f"OLLAMA_KEY_{i}")
15
  if key:
16
  OLLAMA_KEYS.append(key)
17
 
 
 
 
 
18
  key_status = {}
19
  for idx, k in enumerate(OLLAMA_KEYS, 1):
20
- key_status[k] = {"index": idx, "prefix": k[:8]+"...", "failures": 0, "success": 0, "last_error": None, "healthy": True}
 
 
 
 
 
 
 
21
 
22
  def log(msg):
23
  print(f"[{time.strftime('%H:%M:%S')}] {msg}")
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  @app.get("/")
26
  def root():
27
  return {
28
  "status": "ok",
29
- "base_url": BASE_URL,
30
- "master_key": MASTER_API_KEY[:8] + "...",
31
  "keys_loaded": len(OLLAMA_KEYS),
32
- "healthy_keys": sum(1 for v in key_status.values() if v["healthy"]),
33
- "keys_status": {v["prefix"]: {"failures": v["failures"], "success": v["success"], "healthy": v["healthy"]} for v in key_status.values()}
 
 
 
 
 
34
  }
35
 
36
- def get_healthy_keys(max_failures=2):
37
- """Get keys that are healthy (low failures)"""
38
- healthy = [k for k, v in key_status.items() if v["failures"] < max_failures and v["healthy"]]
39
- if not healthy:
40
- for v in key_status.values():
41
- v["failures"] = 0
42
- v["healthy"] = True
43
- return OLLAMA_KEYS[:2]
44
- return healthy[:2]
45
-
46
  @app.post("/v1/chat/completions")
47
  async def chat(req: Request):
48
  auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
49
  if auth_key != MASTER_API_KEY:
50
- log(f"AUTH FAIL: received '{auth_key[:8]}...' expected '{MASTER_API_KEY[:8]}...'")
51
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
52
 
53
- body = await req.json()
 
 
 
 
 
 
 
 
54
  is_stream = body.get("stream", False)
55
- model = body.get("model", "?")
56
- log(f"REQUEST: model='{model}'")
57
-
58
- candidate_keys = get_healthy_keys(max_failures=2)
59
- log(f"Using top {len(candidate_keys)} keys")
60
-
61
- for attempt, key in enumerate(candidate_keys):
62
- ki = key_status[key]
63
- log(f"TRY #{attempt+1}: using key#{ki['index']} ({ki['prefix']})")
64
-
65
- try:
66
- start_time = time.time()
67
- async with httpx.AsyncClient(timeout=30.0) as client:
68
- resp = await client.post(f"{BASE_URL}/v1/chat/completions", json=body, headers={"Authorization": f"Bearer {key}"})
69
- elapsed = time.time() - start_time
70
 
71
- if resp.status_code == 200:
72
- ki["success"] += 1
73
- ki["failures"] = 0
74
- ki["healthy"] = True
75
- log(f"SUCCESS: key#{ki['index']} responded in {elapsed:.2f}s")
76
- return Response(resp.content, status_code=200)
 
 
 
 
 
 
 
 
77
 
78
- elif resp.status_code == 429:
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  ki["failures"] += 1
80
- ki["healthy"] = False
81
- log(f"RATE LIMIT: key#{ki['index']} - skip to next")
82
  continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
- elif resp.status_code >= 500:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  ki["failures"] += 1
86
- ki["last_error"] = f"http {resp.status_code}"
87
- log(f"SERVER ERROR: key#{ki['index']} got {resp.status_code}, trying next")
88
  continue
89
 
90
- else:
91
- ki["last_error"] = f"http {resp.status_code}"
92
- log(f"ERROR: key#{ki['index']} got {resp.status_code}")
93
- return Response(resp.content, status_code=resp.status_code)
94
 
95
- except httpx.TimeoutException:
96
- ki["failures"] += 1
97
- ki["healthy"] = False
98
- ki["last_error"] = "timeout after 30s"
99
- log(f"TIMEOUT: key#{ki['index']} - already healthy=False, try next")
100
- continue
101
-
102
- except Exception as e:
103
- ki["failures"] += 1
104
- ki["last_error"] = str(e)[:50]
105
- log(f"EXCEPTION: key#{ki['index']} error: {e}")
106
- continue
107
-
108
- log(f"ALL KEYS FAILED for model='{model}'")
109
- return JSONResponse({"error": "all keys failed after 2 attempts", "model": model,
110
- "keys_status": {v["prefix"]: {"failures": v["failures"], "last_error": v["last_error"]} for v in key_status.values()}}, status_code=500)
111
 
112
- @app.get("/v1/models")
113
- def models(req: Request):
114
- auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
115
- if auth_key != MASTER_API_KEY:
116
- return JSONResponse({"error": "Unauthorized"}, status_code=401)
117
-
118
- log("REQUEST: GET /v1/models")
119
-
120
- for key in get_healthy_keys(max_failures=2):
121
- ki = key_status[key]
122
- try:
123
- resp = httpx.get(f"{BASE_URL}/v1/models", headers={"Authorization": f"Bearer {key}"}, timeout=10)
124
- if resp.status_code == 200:
125
- ki["success"] += 1
126
- log(f"MODELS OK: key#{ki['index']}")
127
- return Response(resp.content, status_code=200)
128
- except Exception as e:
129
- ki["last_error"] = str(e)[:50]
130
- log(f"MODELS FAIL: key#{ki['index']} - {e}")
131
-
132
- return JSONResponse({"error": "no keys available"}, status_code=500)
 
1
  import os
2
  import httpx
3
  from fastapi import FastAPI, Request
4
+ from fastapi.responses import JSONResponse, Response, StreamingResponse
5
+ from starlette.requests import ClientDisconnect
6
  import time
7
+ import json
8
+ import asyncio
9
 
10
  app = FastAPI()
11
 
12
+ # ==========================================
13
+ # KONFIGURASI
14
+ # ==========================================
15
  BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
16
+ MASTER_API_KEY = os.getenv("MASTER_API_KEY", "ollama")
17
 
18
  OLLAMA_KEYS = []
19
+ for i in range(1, 15):
20
  key = os.getenv(f"OLLAMA_KEY_{i}")
21
  if key:
22
  OLLAMA_KEYS.append(key)
23
 
24
+ if not OLLAMA_KEYS:
25
+ OLLAMA_KEYS.append("ollama")
26
+
27
+ # Status Key
28
  key_status = {}
29
  for idx, k in enumerate(OLLAMA_KEYS, 1):
30
+ key_status[k] = {
31
+ "index": idx,
32
+ "prefix": k[:8] + "...",
33
+ "failures": 0,
34
+ "success": 0,
35
+ "healthy": True,
36
+ "in_use": False # Fitur Lock: 1 Key = 1 Request
37
+ }
38
 
39
  def log(msg):
40
  print(f"[{time.strftime('%H:%M:%S')}] {msg}")
41
 
42
+ def get_available_key(exclude_keys=None):
43
+ """Mencari key yang sehat dan sedang tidak digunakan (idle)"""
44
+ if exclude_keys is None:
45
+ exclude_keys = set()
46
+
47
+ available = [
48
+ k for k, v in key_status.items()
49
+ if v["healthy"] and not v["in_use"] and k not in exclude_keys
50
+ ]
51
+
52
+ if available:
53
+ return available[0]
54
+ return None
55
+
56
+ # ==========================================
57
+ # ENDPOINTS
58
+ # ==========================================
59
  @app.get("/")
60
  def root():
61
  return {
62
  "status": "ok",
 
 
63
  "keys_loaded": len(OLLAMA_KEYS),
64
+ "keys_status": {
65
+ v["prefix"]: {
66
+ "status": "BUSY" if v["in_use"] else "IDLE",
67
+ "healthy": v["healthy"],
68
+ "success": v["success"]
69
+ } for v in key_status.values()
70
+ }
71
  }
72
 
 
 
 
 
 
 
 
 
 
 
73
  @app.post("/v1/chat/completions")
74
  async def chat(req: Request):
75
  auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
76
  if auth_key != MASTER_API_KEY:
 
77
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
78
 
79
+ # Tangkap error jika client kabur sebelum request terkirim penuh
80
+ try:
81
+ body = await req.json()
82
+ except ClientDisconnect:
83
+ log("Client kabur sebelum proxy selesai membaca request body.")
84
+ return Response(status_code=499)
85
+ except json.JSONDecodeError:
86
+ return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
87
+
88
  is_stream = body.get("stream", False)
89
+
90
+ # ==========================================
91
+ # LOGIKA NON-STREAM
92
+ # ==========================================
93
+ if not is_stream:
94
+ tried_keys = set()
95
+ for attempt in range(len(OLLAMA_KEYS)): # Loop sebanyak jumlah key
96
+ if len(tried_keys) >= len(OLLAMA_KEYS):
97
+ tried_keys.clear()
98
+
99
+ key = None
100
+ log("Menunggu API Key idle (Antrean Non-Stream)...")
 
 
 
101
 
102
+ # Antrean Tanpa Batas Waktu
103
+ while True:
104
+ if await req.is_disconnected():
105
+ log("Client membatalkan request saat mengantre (Non-Stream).")
106
+ return Response(status_code=499)
107
+
108
+ key = get_available_key(exclude_keys=tried_keys)
109
+ if key: break
110
+ await asyncio.sleep(1)
111
+
112
+ ki = key_status[key]
113
+ ki["in_use"] = True
114
+ tried_keys.add(key)
115
+ log(f"LOCK: key#{ki['index']} (Non-Stream)")
116
 
117
+ try:
118
+ async with httpx.AsyncClient(timeout=120.0) as client:
119
+ resp = await client.post(
120
+ f"{BASE_URL}/v1/chat/completions",
121
+ json=body,
122
+ headers={"Authorization": f"Bearer {key}"}
123
+ )
124
+ if resp.status_code == 200:
125
+ ki["success"] += 1
126
+ return Response(content=resp.content, media_type=resp.headers.get("content-type"))
127
+ else:
128
+ ki["failures"] += 1
129
+ continue
130
+ except Exception as e:
131
  ki["failures"] += 1
132
+ log(f"Error Non-Stream: {e}")
 
133
  continue
134
+ finally:
135
+ ki["in_use"] = False
136
+ log(f"RELEASE: key#{ki['index']} (Non-Stream)")
137
+
138
+ return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
139
+
140
+ # ==========================================
141
+ # LOGIKA STREAMING (Seamless Fallback + Queue)
142
+ # ==========================================
143
+ async def stream_generator():
144
+ current_body = body.copy()
145
+ # Deep copy messages agar tidak mengubah aslinya saat di-inject
146
+ current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
147
+
148
+ generated_text_buffer = ""
149
+ tried_keys = set()
150
+
151
+ for attempt in range(len(OLLAMA_KEYS)): # Loop sebanyak jumlah key
152
+ if len(tried_keys) >= len(OLLAMA_KEYS):
153
+ tried_keys.clear()
154
+
155
+ key = None
156
+ if attempt == 0:
157
+ log("Menunggu API Key idle (Antrean Stream Baru)...")
158
+ else:
159
+ log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
160
+
161
+ # Antrean Tanpa Batas Waktu
162
+ while True:
163
+ if await req.is_disconnected():
164
+ log("Client membatalkan request saat mengantre stream.")
165
+ return
166
+ key = get_available_key(exclude_keys=tried_keys)
167
+ if key: break
168
+ await asyncio.sleep(1)
169
+
170
+ ki = key_status[key]
171
+ ki["in_use"] = True # Kunci key agar tidak dipakai yang lain
172
+ tried_keys.add(key)
173
 
174
+ log(f"STREAM LOCK: key#{ki['index']}")
175
+
176
+ if generated_text_buffer:
177
+ log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
178
+ messages = current_body.get("messages", [])
179
+ if messages and messages[-1].get("role") == "assistant":
180
+ messages[-1]["content"] = generated_text_buffer
181
+ else:
182
+ messages.append({"role": "assistant", "content": generated_text_buffer})
183
+ current_body["messages"] = messages
184
+
185
+ try:
186
+ # Timeout read=None memastikan tidak putus otomatis di tengah stream panjang
187
+ custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
188
+ async with httpx.AsyncClient(timeout=custom_timeout) as client:
189
+ async with client.stream(
190
+ "POST", f"{BASE_URL}/v1/chat/completions",
191
+ json=current_body, headers={"Authorization": f"Bearer {key}"}
192
+ ) as response:
193
+
194
+ if response.status_code != 200:
195
+ ki["failures"] += 1
196
+ log(f"STREAM ERR {response.status_code}: key#{ki['index']}")
197
+ continue
198
+
199
+ stream_interrupted = False
200
+ try:
201
+ async for chunk in response.aiter_lines():
202
+ if chunk:
203
+ if chunk.startswith("data: "):
204
+ data_str = chunk[6:]
205
+ if data_str.strip() == "[DONE]":
206
+ ki["success"] += 1
207
+ yield chunk + "\n\n"
208
+ return
209
+ try:
210
+ data_json = json.loads(data_str)
211
+ if "choices" in data_json and len(data_json["choices"]) > 0:
212
+ delta = data_json["choices"][0].get("delta", {})
213
+ content = delta.get("content", "")
214
+ if content:
215
+ generated_text_buffer += content
216
+ except json.JSONDecodeError:
217
+ pass
218
+ yield chunk + "\n\n"
219
+
220
+ except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
221
+ log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
222
+ ki["failures"] += 1
223
+ stream_interrupted = True
224
+
225
+ if not stream_interrupted:
226
+ return
227
+
228
+ except Exception as e:
229
  ki["failures"] += 1
230
+ log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
 
231
  continue
232
 
233
+ finally:
234
+ # SANGAT PENTING: Selalu lepas kuncian saat selesai, berhasil, atau gagal
235
+ ki["in_use"] = False
236
+ log(f"STREAM RELEASE: key#{ki['index']}")
237
 
238
+ yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
239
 
240
+ return StreamingResponse(stream_generator(), media_type="text/event-stream")
241
+