Elysiadev11 committed on
Commit
298e408
·
verified ·
1 Parent(s): 537d9ed

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +65 -38
app.py CHANGED
@@ -10,21 +10,22 @@ import asyncio
10
  app = FastAPI()
11
 
12
  # ==========================================
13
- # KONFIGURASI
14
  # ==========================================
15
- BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
16
- MASTER_API_KEY = os.getenv("MASTER_API_KEY", "ollama")
17
 
18
  OLLAMA_KEYS = []
19
- for i in range(1, 25):
 
20
  key = os.getenv(f"OLLAMA_KEY_{i}")
21
  if key:
22
  OLLAMA_KEYS.append(key)
23
 
24
  if not OLLAMA_KEYS:
25
- OLLAMA_KEYS.append("ollama")
26
 
27
- # Status Key
28
  key_status = {}
29
  for idx, k in enumerate(OLLAMA_KEYS, 1):
30
  key_status[k] = {
@@ -39,19 +40,29 @@ for idx, k in enumerate(OLLAMA_KEYS, 1):
39
  def log(msg):
40
  print(f"[{time.strftime('%H:%M:%S')}] {msg}")
41
 
42
- def get_available_key(exclude_keys=None):
43
- """Mencari key yang sehat dan sedang tidak digunakan (idle)"""
 
 
 
44
  if exclude_keys is None:
45
  exclude_keys = set()
46
 
47
- available = [
48
- k for k, v in key_status.items()
49
- if v["healthy"] and not v["in_use"] and k not in exclude_keys
50
- ]
51
-
52
- if available:
53
- return available[0]
54
- return None
 
 
 
 
 
 
 
55
 
56
  # ==========================================
57
  # ENDPOINTS
@@ -60,12 +71,13 @@ def get_available_key(exclude_keys=None):
60
  def root():
61
  return {
62
  "status": "ok",
63
- "keys_loaded": len(OLLAMA_KEYS),
64
  "keys_status": {
65
  v["prefix"]: {
66
  "status": "BUSY" if v["in_use"] else "IDLE",
67
  "healthy": v["healthy"],
68
- "success": v["success"]
 
69
  } for v in key_status.values()
70
  }
71
  }
@@ -76,7 +88,7 @@ async def chat(req: Request):
76
  if auth_key != MASTER_API_KEY:
77
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
78
 
79
- # Tangkap error jika client kabur sebelum request terkirim penuh
80
  try:
81
  body = await req.json()
82
  except ClientDisconnect:
@@ -92,7 +104,7 @@ async def chat(req: Request):
92
  # ==========================================
93
  if not is_stream:
94
  tried_keys = set()
95
- for attempt in range(len(OLLAMA_KEYS)): # Loop sebanyak jumlah key
96
  if len(tried_keys) >= len(OLLAMA_KEYS):
97
  tried_keys.clear()
98
 
@@ -105,14 +117,16 @@ async def chat(req: Request):
105
  log("Client membatalkan request saat mengantre (Non-Stream).")
106
  return Response(status_code=499)
107
 
108
- key = get_available_key(exclude_keys=tried_keys)
109
- if key: break
110
- await asyncio.sleep(1)
 
 
 
111
 
112
  ki = key_status[key]
113
- ki["in_use"] = True
114
  tried_keys.add(key)
115
- log(f"LOCK: key#{ki['index']} (Non-Stream)")
116
 
117
  try:
118
  async with httpx.AsyncClient(timeout=120.0) as client:
@@ -123,7 +137,13 @@ async def chat(req: Request):
123
  )
124
  if resp.status_code == 200:
125
  ki["success"] += 1
 
126
  return Response(content=resp.content, media_type=resp.headers.get("content-type"))
 
 
 
 
 
127
  else:
128
  ki["failures"] += 1
129
  continue
@@ -132,7 +152,7 @@ async def chat(req: Request):
132
  log(f"Error Non-Stream: {e}")
133
  continue
134
  finally:
135
- ki["in_use"] = False
136
  log(f"RELEASE: key#{ki['index']} (Non-Stream)")
137
 
138
  return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
@@ -142,13 +162,12 @@ async def chat(req: Request):
142
  # ==========================================
143
  async def stream_generator():
144
  current_body = body.copy()
145
- # Deep copy messages agar tidak mengubah aslinya saat di-inject
146
  current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
147
 
148
  generated_text_buffer = ""
149
  tried_keys = set()
150
 
151
- for attempt in range(len(OLLAMA_KEYS)): # Loop sebanyak jumlah key
152
  if len(tried_keys) >= len(OLLAMA_KEYS):
153
  tried_keys.clear()
154
 
@@ -163,15 +182,17 @@ async def chat(req: Request):
163
  if await req.is_disconnected():
164
  log("Client membatalkan request saat mengantre stream.")
165
  return
166
- key = get_available_key(exclude_keys=tried_keys)
167
- if key: break
168
- await asyncio.sleep(1)
 
 
 
 
169
 
170
  ki = key_status[key]
171
- ki["in_use"] = True # Kunci key agar tidak dipakai yang lain
172
  tried_keys.add(key)
173
-
174
- log(f"STREAM LOCK: key#{ki['index']}")
175
 
176
  if generated_text_buffer:
177
  log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
@@ -183,7 +204,6 @@ async def chat(req: Request):
183
  current_body["messages"] = messages
184
 
185
  try:
186
- # Timeout read=None memastikan tidak putus otomatis di tengah stream panjang
187
  custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
188
  async with httpx.AsyncClient(timeout=custom_timeout) as client:
189
  async with client.stream(
@@ -191,10 +211,16 @@ async def chat(req: Request):
191
  json=current_body, headers={"Authorization": f"Bearer {key}"}
192
  ) as response:
193
 
194
- if response.status_code != 200:
195
  ki["failures"] += 1
196
- log(f"STREAM ERR {response.status_code}: key#{ki['index']}")
 
197
  continue
 
 
 
 
 
198
 
199
  stream_interrupted = False
200
  try:
@@ -204,6 +230,7 @@ async def chat(req: Request):
204
  data_str = chunk[6:]
205
  if data_str.strip() == "[DONE]":
206
  ki["success"] += 1
 
207
  yield chunk + "\n\n"
208
  return
209
  try:
@@ -231,7 +258,7 @@ async def chat(req: Request):
231
  continue
232
 
233
  finally:
234
- # SANGAT PENTING: Selalu lepas kuncian saat selesai, berhasil, atau gagal
235
  ki["in_use"] = False
236
  log(f"STREAM RELEASE: key#{ki['index']}")
237
 
 
10
  app = FastAPI()
11
 
12
  # ==========================================
13
+ # KONFIGURASI & LOAD KEYS
14
  # ==========================================
15
+ BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
16
+ MASTER_API_KEY = os.getenv("MASTER_API_KEY", "ollama-proxy-free")
17
 
18
  OLLAMA_KEYS = []
19
+ # Mendukung hingga 100 API Key (OLLAMA_KEY_1 sampai OLLAMA_KEY_100)
20
+ for i in range(1, 101):
21
  key = os.getenv(f"OLLAMA_KEY_{i}")
22
  if key:
23
  OLLAMA_KEYS.append(key)
24
 
25
  if not OLLAMA_KEYS:
26
+ OLLAMA_KEYS.append("ollama-proxy-free") # Dummy key jika ENV kosong
27
 
28
+ # Inisialisasi Status Key
29
  key_status = {}
30
  for idx, k in enumerate(OLLAMA_KEYS, 1):
31
  key_status[k] = {
 
40
def log(msg):
    """Print ``msg`` to stdout, prefixed with the current local time as ``[HH:MM:SS]``."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}")
42
 
43
def get_and_lock_key(exclude_keys=None):
    """Find an idle, healthy key and mark it as in use within the same call.

    The lookup and the ``in_use = True`` assignment happen inside one
    synchronous function with no ``await`` between them, so a caller never
    receives a key that has not already been flagged as busy.
    NOTE(review): this relies on all callers running on a single event loop
    thread -- confirm against the deployment.

    Args:
        exclude_keys: Keys that must not be returned (for example, keys the
            caller has already tried). ``None`` means nothing is excluded.

    Returns:
        The first key in ``key_status`` iteration order that is healthy, not
        in use and not excluded, or ``None`` when no such key exists.

    Side effects:
        * Sets ``in_use`` to ``True`` on the entry of the returned key; the
          caller is responsible for resetting it.
        * When no entry is healthy, logs a warning and resets ``failures`` to
          0 and ``healthy`` to ``True`` on every entry before searching, so
          the pool cannot stay permanently exhausted.
    """
    skipped = set() if exclude_keys is None else exclude_keys

    states = key_status.values()
    # No healthy entry at all: revive every key instead of stalling forever.
    if all(not state["healthy"] for state in states):
        log("⚠️ Semua API Key berstatus mati/unhealthy. Melakukan RESET MASSAL...")
        for state in states:
            state["failures"] = 0
            state["healthy"] = True

    # Search and lock in one pass; the flag is set before returning.
    for candidate, state in key_status.items():
        if not state["healthy"]:
            continue
        if state["in_use"] or candidate in skipped:
            continue
        state["in_use"] = True
        return candidate

    # Every eligible key is currently busy, unhealthy or excluded.
    return None
66
 
67
  # ==========================================
68
  # ENDPOINTS
 
71
def root():
    """Return a status summary of the proxy and its key pool.

    The result contains a fixed ``"status": "ok"`` marker, the number of
    entries in ``OLLAMA_KEYS``, and a ``"keys_status"`` mapping keyed by each
    entry's ``prefix`` field. Each value reports whether the key is busy or
    idle, its ``healthy`` flag, and its ``success`` / ``failures`` counters.
    """
    per_key = {}
    for state in key_status.values():
        label = state["prefix"]
        per_key[label] = {
            "status": "BUSY" if state["in_use"] else "IDLE",
            "healthy": state["healthy"],
            "success": state["success"],
            "failures": state["failures"],
        }

    summary = {
        "status": "ok",
        "total_keys_loaded": len(OLLAMA_KEYS),
        "keys_status": per_key,
    }
    return summary
 
88
  if auth_key != MASTER_API_KEY:
89
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
90
 
91
+ # Tangkap error jika client kabur (ClientDisconnect)
92
  try:
93
  body = await req.json()
94
  except ClientDisconnect:
 
104
  # ==========================================
105
  if not is_stream:
106
  tried_keys = set()
107
+ for attempt in range(len(OLLAMA_KEYS)):
108
  if len(tried_keys) >= len(OLLAMA_KEYS):
109
  tried_keys.clear()
110
 
 
117
  log("Client membatalkan request saat mengantre (Non-Stream).")
118
  return Response(status_code=499)
119
 
120
+ # Gunakan fungsi Atomic Lock
121
+ key = get_and_lock_key(exclude_keys=tried_keys)
122
+ if key:
123
+ break # Langsung keluar loop, key SUDAH DIKUNCI
124
+
125
+ await asyncio.sleep(0.5) # Cek tiap setengah detik
126
 
127
  ki = key_status[key]
 
128
  tried_keys.add(key)
129
+ log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
130
 
131
  try:
132
  async with httpx.AsyncClient(timeout=120.0) as client:
 
137
  )
138
  if resp.status_code == 200:
139
  ki["success"] += 1
140
+ ki["failures"] = 0
141
  return Response(content=resp.content, media_type=resp.headers.get("content-type"))
142
+ elif resp.status_code == 429:
143
+ ki["failures"] += 1
144
+ ki["healthy"] = False
145
+ log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
146
+ continue
147
  else:
148
  ki["failures"] += 1
149
  continue
 
152
  log(f"Error Non-Stream: {e}")
153
  continue
154
  finally:
155
+ ki["in_use"] = False # SELALU LEPAS KUNCI
156
  log(f"RELEASE: key#{ki['index']} (Non-Stream)")
157
 
158
  return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
 
162
  # ==========================================
163
  async def stream_generator():
164
  current_body = body.copy()
 
165
  current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
166
 
167
  generated_text_buffer = ""
168
  tried_keys = set()
169
 
170
+ for attempt in range(len(OLLAMA_KEYS)):
171
  if len(tried_keys) >= len(OLLAMA_KEYS):
172
  tried_keys.clear()
173
 
 
182
  if await req.is_disconnected():
183
  log("Client membatalkan request saat mengantre stream.")
184
  return
185
+
186
+ # Gunakan fungsi Atomic Lock
187
+ key = get_and_lock_key(exclude_keys=tried_keys)
188
+ if key:
189
+ break # Langsung keluar loop, key SUDAH DIKUNCI
190
+
191
+ await asyncio.sleep(0.5)
192
 
193
  ki = key_status[key]
 
194
  tried_keys.add(key)
195
+ log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
 
196
 
197
  if generated_text_buffer:
198
  log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
 
204
  current_body["messages"] = messages
205
 
206
  try:
 
207
  custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
208
  async with httpx.AsyncClient(timeout=custom_timeout) as client:
209
  async with client.stream(
 
211
  json=current_body, headers={"Authorization": f"Bearer {key}"}
212
  ) as response:
213
 
214
+ if response.status_code == 429:
215
  ki["failures"] += 1
216
+ ki["healthy"] = False
217
+ log(f"STREAM 429: key#{ki['index']} - Switching key...")
218
  continue
219
+
220
+ if response.status_code != 200:
221
+ ki["failures"] += 1
222
+ log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
223
+ continue
224
 
225
  stream_interrupted = False
226
  try:
 
230
  data_str = chunk[6:]
231
  if data_str.strip() == "[DONE]":
232
  ki["success"] += 1
233
+ ki["failures"] = 0
234
  yield chunk + "\n\n"
235
  return
236
  try:
 
258
  continue
259
 
260
  finally:
261
+ # SELALU LEPAS KUNCI
262
  ki["in_use"] = False
263
  log(f"STREAM RELEASE: key#{ki['index']}")
264