Elysiadev11 committed on
Commit
bff9611
Β·
verified Β·
1 Parent(s): 3f83b4e

Update proxy_cerebras.py

Browse files
Files changed (1) hide show
  1. proxy_cerebras.py +443 -240
proxy_cerebras.py CHANGED
@@ -1,3 +1,13 @@
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import json
3
  import time
@@ -16,6 +26,30 @@ app = FastAPI()
16
  # =====================================================
17
  BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
18
  MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
  # =====================================================
21
  # LOAD KEYS
@@ -33,16 +67,15 @@ key_status = {}
33
  for idx, k in enumerate(OLLAMA_KEYS, 1):
34
  key_status[k] = {
35
  "index": idx,
36
- "healthy": True,
37
  "busy": False,
38
  "success": 0,
39
  "fail": 0,
 
40
  }
41
 
42
  rr_index = 0
43
-
44
- # Global async lock to prevent race condition on rr_index & busy flag
45
- _key_lock = asyncio.Lock()
46
 
47
 
48
  # =====================================================
@@ -51,71 +84,241 @@ _key_lock = asyncio.Lock()
51
  def log(x):
52
  print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)
53
 
54
-
55
  def sse(obj):
56
  return "data: " + json.dumps(obj, ensure_ascii=False) + "\n\n"
57
 
58
-
59
  def auth_ok(req: Request):
60
- token = req.headers.get("Authorization", "").replace("Bearer ", "")
61
- return token == MASTER_API_KEY
62
-
63
-
64
- async def get_key(exclude=None):
65
- """
66
- Thread-safe round-robin key picker.
67
- Returns the key string, or None if all are busy/excluded.
68
- """
69
- global rr_index
70
-
71
- if exclude is None:
72
- exclude = set()
73
 
74
- async with _key_lock:
75
- for _ in range(len(OLLAMA_KEYS)):
76
- rr_index = (rr_index + 1) % len(OLLAMA_KEYS)
77
- k = OLLAMA_KEYS[rr_index]
78
- st = key_status[k]
79
 
80
- if st["healthy"] and not st["busy"] and k not in exclude:
81
- st["busy"] = True
82
- return k
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  return None
85
 
 
 
 
 
 
 
 
 
 
86
 
87
  async def release_key(k):
88
- async with _key_lock:
89
  if k in key_status:
90
  key_status[k]["busy"] = False
91
 
 
 
 
 
 
 
 
 
92
 
93
  async def mark_fail(k):
94
- async with _key_lock:
95
  if k in key_status:
96
  key_status[k]["fail"] += 1
97
 
98
-
99
  async def mark_ok(k):
100
- async with _key_lock:
101
  if k in key_status:
102
  key_status[k]["success"] += 1
103
  key_status[k]["fail"] = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
 
105
 
106
- async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
 
 
 
107
  """
108
- Polls until a free key is available or max_wait seconds pass.
109
- Returns the key or None on timeout.
110
  """
111
- elapsed = 0.0
112
- while elapsed < max_wait:
113
- key = await get_key(exclude)
114
- if key:
115
- return key
116
- await asyncio.sleep(interval)
117
- elapsed += interval
118
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
 
121
  # =====================================================
@@ -123,23 +326,19 @@ async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.3):
123
  # =====================================================
124
  @app.get("/")
125
  async def root():
126
- async with _key_lock:
 
127
  safe = {}
128
  for k, v in key_status.items():
129
- masked = k[:4] + "****" + k[-4:]
130
- safe[masked] = {
131
  "index": v["index"],
132
- "healthy": v["healthy"],
133
- "busy": v["busy"],
134
  "success": v["success"],
135
  "fail": v["fail"],
136
  }
137
-
138
- return {
139
- "status": "ok",
140
- "keys": len(OLLAMA_KEYS),
141
- "detail": safe
142
- }
143
 
144
 
145
  # =====================================================
@@ -151,33 +350,22 @@ async def models(req: Request):
151
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
152
 
153
  key = OLLAMA_KEYS[0]
154
-
155
- async with httpx.AsyncClient(timeout=60) as client:
156
- r = await client.get(
157
- f"{BASE_URL}/api/tags",
158
- headers={"Authorization": f"Bearer {key}"}
159
- )
160
-
161
- if r.status_code != 200:
162
- return JSONResponse({"error": r.text}, status_code=r.status_code)
163
-
164
- data = r.json()
165
- now = int(time.time())
166
- out = []
167
-
168
- for m in data.get("models", []):
169
- out.append({
170
- "id": m.get("name"),
171
- "object": "model",
172
- "created": now,
173
- "owned_by": "ollama"
174
- })
175
-
176
- return {"object": "list", "data": out}
177
 
178
 
179
  # =====================================================
180
- # OPENAI CHAT /v1/chat/completions
181
  # =====================================================
182
  @app.post("/v1/chat/completions")
183
  async def chat(req: Request):
@@ -196,15 +384,12 @@ async def chat(req: Request):
196
  # -----------------------------------------
197
  if not is_stream:
198
  tried = set()
199
-
200
  for _ in range(len(OLLAMA_KEYS)):
201
- key = await wait_for_free_key(exclude=tried)
202
-
203
  if not key:
204
- break
205
-
206
  tried.add(key)
207
-
208
  try:
209
  async with httpx.AsyncClient(timeout=180) as client:
210
  r = await client.post(
@@ -212,94 +397,71 @@ async def chat(req: Request):
212
  json=body,
213
  headers={"Authorization": f"Bearer {key}"}
214
  )
215
-
216
- txt = r.text.lower()
217
-
218
- if "weekly usage limit" in txt or r.status_code == 429:
219
- log(f"Key {key[:8]}... rate limited (non-stream chat), trying next")
220
- await mark_fail(key)
221
  continue
222
-
223
  await mark_ok(key)
224
-
225
- return Response(
226
- content=r.content,
227
- media_type=r.headers.get("content-type", "application/json")
228
- )
229
-
230
  except Exception as e:
231
- log(f"Key {key[:8]}... exception: {e}")
232
  await mark_fail(key)
233
-
234
  finally:
235
  await release_key(key)
236
-
237
  return JSONResponse({"error": "All keys failed"}, status_code=500)
238
 
239
  # -----------------------------------------
240
- # STREAM
241
  # -----------------------------------------
242
  async def gen():
243
- tried = set()
244
-
245
- for _ in range(len(OLLAMA_KEYS)):
246
- key = await wait_for_free_key(exclude=tried)
247
-
248
- if not key:
249
- break
250
-
251
- tried.add(key)
252
-
253
  try:
254
  async with httpx.AsyncClient(timeout=None) as client:
255
  async with client.stream(
256
- "POST",
257
- f"{BASE_URL}/v1/chat/completions",
258
- json=body,
259
- headers={"Authorization": f"Bearer {key}"}
260
  ) as r:
261
-
262
- if r.status_code == 429:
263
- log(f"Key {key[:8]}... rate limited (stream chat), trying next")
264
- await mark_fail(key)
265
  continue
266
 
267
- hit_limit_mid_stream = False
268
-
269
  async for line in r.aiter_lines():
270
  if not line:
271
  continue
272
-
273
- # Detect mid-stream rate limit signal in data payload
274
- if "429" in line or "usage limit" in line.lower():
275
- log(f"Key {key[:8]}... mid-stream limit detected, aborting chunk")
276
- hit_limit_mid_stream = True
277
  break
278
-
 
 
 
 
 
 
 
279
  yield line + "\n\n"
280
 
281
- if hit_limit_mid_stream:
282
- await mark_fail(key)
283
  continue
284
 
 
285
  await mark_ok(key)
286
  return
287
 
288
  except Exception as e:
289
- log(f"Key {key[:8]}... stream exception: {e}")
290
  await mark_fail(key)
291
-
292
  finally:
293
  await release_key(key)
294
 
295
- yield sse({"error": "All keys failed"})
296
- yield "data: [DONE]\n\n"
297
-
298
  return StreamingResponse(gen(), media_type="text/event-stream")
299
 
300
 
301
  # =====================================================
302
- # ANTHROPIC /v1/messages
 
303
  # =====================================================
304
  @app.post("/v1/messages")
305
  async def anthropic(req: Request):
@@ -314,45 +476,50 @@ async def anthropic(req: Request):
314
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
315
 
316
  stream = body.get("stream", False)
 
 
317
 
318
- # Build messages list for proxy
319
  messages = []
320
-
321
  if body.get("system"):
322
- messages.append({"role": "system", "content": body["system"]})
323
-
324
- for m in body.get("messages", []):
325
- content = m.get("content", "")
326
-
327
- if isinstance(content, list):
328
- txt = ""
329
- for x in content:
330
- if x.get("type") == "text":
331
- txt += x.get("text", "")
332
- content = txt
333
 
334
- messages.append({"role": m["role"], "content": content})
 
335
 
336
  proxy_body = {
337
- "model": "minimax-m2.7:cloud",
338
  "messages": messages,
339
- "stream": stream
340
  }
341
 
 
 
 
 
 
 
 
 
 
 
 
 
 
342
  # -----------------------------------------
343
  # NON STREAM
344
  # -----------------------------------------
345
  if not stream:
346
  tried = set()
347
-
348
  for _ in range(len(OLLAMA_KEYS)):
349
- key = await wait_for_free_key(exclude=tried)
350
-
351
  if not key:
352
- break
353
-
354
  tried.add(key)
355
-
356
  try:
357
  async with httpx.AsyncClient(timeout=180) as client:
358
  r = await client.post(
@@ -360,152 +527,188 @@ async def anthropic(req: Request):
360
  json=proxy_body,
361
  headers={"Authorization": f"Bearer {key}"}
362
  )
363
-
364
- txt = r.text.lower()
365
-
366
- if "weekly usage limit" in txt or r.status_code == 429:
367
- log(f"Key {key[:8]}... rate limited (non-stream anthropic), trying next")
368
  await mark_fail(key)
369
  continue
370
 
371
  data = r.json()
372
- ans = data["choices"][0]["message"]["content"]
373
-
374
- out = {
375
- "id": "msg_" + uuid.uuid4().hex[:10],
376
- "type": "message",
377
- "role": "assistant",
378
- "model": body.get("model", "claude-opus-4-7"),
379
- "content": [{"type": "text", "text": ans}],
380
- "stop_reason": "end_turn",
381
- "stop_sequence": None,
382
- "usage": {"input_tokens": 0, "output_tokens": 0}
383
- }
384
-
385
  await mark_ok(key)
386
  return JSONResponse(out)
387
 
388
  except Exception as e:
389
- log(f"Key {key[:8]}... exception: {e}")
390
  await mark_fail(key)
391
-
392
  finally:
393
  await release_key(key)
394
 
395
  return JSONResponse({"error": "All keys failed"}, status_code=500)
396
 
397
  # -----------------------------------------
398
- # STREAM (Anthropic SSE format)
 
399
  # -----------------------------------------
400
  async def agen():
401
- tried = set()
402
  msg_id = "msg_" + uuid.uuid4().hex[:10]
403
- sent_any_delta = False
404
-
405
- # Send Anthropic envelope headers ONCE before first key attempt
406
- # We defer these until we have a successful connection to avoid
407
- # sending headers before knowing if any key works.
408
- # Instead we buffer and yield only on confirmed success.
409
-
410
- for _ in range(len(OLLAMA_KEYS)):
411
- key = await wait_for_free_key(exclude=tried)
412
-
413
- if not key:
414
- break
415
-
416
- tried.add(key)
417
 
 
 
418
  try:
419
  async with httpx.AsyncClient(timeout=None) as client:
420
  async with client.stream(
421
- "POST",
422
- f"{BASE_URL}/v1/chat/completions",
423
- json=proxy_body,
424
- headers={"Authorization": f"Bearer {key}"}
425
  ) as r:
426
 
427
- if r.status_code == 429:
428
- log(f"Key {key[:8]}... rate limited (stream anthropic), trying next")
 
 
 
 
429
  await mark_fail(key)
430
  continue
431
 
432
- # Only emit Anthropic envelope on first successful key
433
- if not sent_any_delta:
 
434
  yield sse({
435
  "type": "message_start",
436
  "message": {
437
- "id": msg_id,
438
- "type": "message",
439
- "role": "assistant",
440
- "model": body.get("model", "claude-opus-4-7"),
441
- "content": [],
442
- "stop_reason": None,
443
- "stop_sequence": None,
444
  "usage": {"input_tokens": 0, "output_tokens": 0}
445
  }
446
  })
447
  yield sse({
448
- "type": "content_block_start",
449
- "index": 0,
450
- "content_block": {"type": "text"}
451
  })
452
 
453
- hit_limit_mid_stream = False
 
 
 
 
454
 
455
  async for line in r.aiter_lines():
456
- if not line.startswith("data: "):
457
  continue
458
-
459
- raw = line[6:].strip()
460
-
461
- if raw == "[DONE]":
462
  break
463
 
464
- # Detect mid-stream 429 / limit payload
465
- if "429" in raw or "usage limit" in raw.lower():
466
- log(f"Key {key[:8]}... mid-stream limit in anthropic, aborting chunk")
467
- hit_limit_mid_stream = True
468
- break
469
 
470
  try:
471
  j = json.loads(raw)
472
  except Exception:
473
  continue
474
 
475
- delta = j["choices"][0]["delta"]
476
- txt = delta.get("content", "")
 
 
 
 
 
 
 
 
 
 
 
477
 
 
 
478
  if txt:
479
- sent_any_delta = True
480
  yield sse({
481
- "type": "content_block_delta",
482
- "index": 0,
483
  "delta": {"type": "text_delta", "text": txt}
484
  })
485
 
486
- if hit_limit_mid_stream:
487
- await mark_fail(key)
488
- # Continue to next key β€” stream resumes from where it broke
489
- # Note: client will receive continued deltas seamlessly
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
490
  continue
491
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
492
  await mark_ok(key)
493
- break # Success β€” exit key retry loop
494
 
495
  except Exception as e:
496
- log(f"Key {key[:8]}... agen exception: {e}")
497
  await mark_fail(key)
498
-
499
  finally:
500
  await release_key(key)
501
 
502
- # Close Anthropic SSE envelope
 
 
 
 
 
 
 
 
 
 
503
  yield sse({"type": "content_block_stop", "index": 0})
504
- yield sse({
505
- "type": "message_delta",
506
- "delta": {"stop_reason": "end_turn", "stop_sequence": None},
507
- "usage": {"output_tokens": 0}
508
- })
509
  yield sse({"type": "message_stop"})
510
 
511
- return StreamingResponse(agen(), media_type="text/event-stream")
 
1
+ """
2
+ proxy_cerebras.py β€” Proxy ke Ollama backend dengan Anthropic + OpenAI compatible API
3
+ FIXED:
4
+ - Tool calling support penuh (Anthropic <-> OpenAI)
5
+ - Non-stream tidak crash saat finish_reason=tool_calls
6
+ - Stream handle delta.tool_calls
7
+ - Model tidak hardcoded, diteruskan dari request
8
+ - Infinite loop dengan smart cooldown (tidak hammering)
9
+ """
10
+
11
  import os
12
  import json
13
  import time
 
26
  # =====================================================
27
  BASE_URL = os.getenv("BASE_URL", "https://ollama.com")
28
  MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
29
+ DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "minimax-m2.7:cloud")
30
+ RATE_LIMIT_COOLDOWN = int(os.getenv("RATE_LIMIT_COOLDOWN", "62"))
31
+
32
+ # Model mapping Claude/GPT β†’ Ollama model
33
+ MODEL_MAP = {
34
+ "claude-opus-4-7": DEFAULT_MODEL,
35
+ "claude-opus-4-6": DEFAULT_MODEL,
36
+ "claude-opus-4-5": DEFAULT_MODEL,
37
+ "claude-opus-4-1": DEFAULT_MODEL,
38
+ "claude-opus-4-20250514": DEFAULT_MODEL,
39
+ "claude-sonnet-4-6": DEFAULT_MODEL,
40
+ "claude-sonnet-4-5": DEFAULT_MODEL,
41
+ "claude-sonnet-4-20250514": DEFAULT_MODEL,
42
+ "claude-haiku-4-5": DEFAULT_MODEL,
43
+ "claude-haiku-4-5-20251001": DEFAULT_MODEL,
44
+ "gpt-4": DEFAULT_MODEL,
45
+ "gpt-4o": DEFAULT_MODEL,
46
+ "gpt-4o-mini": DEFAULT_MODEL,
47
+ "gpt-4-turbo": DEFAULT_MODEL,
48
+ "gpt-3.5-turbo": DEFAULT_MODEL,
49
+ }
50
+
51
def map_model(name: str) -> str:
    """Resolve a client-supplied model name to the backend Ollama model.

    Names absent from MODEL_MAP pass through unchanged.
    """
    try:
        return MODEL_MAP[name]
    except KeyError:
        return name
53
 
54
  # =====================================================
55
  # LOAD KEYS
 
67
  for idx, k in enumerate(OLLAMA_KEYS, 1):
68
  key_status[k] = {
69
  "index": idx,
70
+ "prefix": k[:6] + "..." if len(k) > 6 else k,
71
  "busy": False,
72
  "success": 0,
73
  "fail": 0,
74
+ "rate_limited_until": 0.0,
75
  }
76
 
77
  rr_index = 0
78
+ _lock = asyncio.Lock()
 
 
79
 
80
 
81
  # =====================================================
 
84
def log(x):
    """Print *x* to stdout with an HH:MM:SS timestamp, flushing immediately."""
    print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)
86
 
 
87
def sse(obj):
    """Serialize *obj* as a single Server-Sent-Events data frame."""
    payload = json.dumps(obj, ensure_ascii=False)
    return "data: " + payload + "\n\n"
89
 
 
90
  def auth_ok(req: Request):
91
+ return req.headers.get("Authorization", "").replace("Bearer ", "") == MASTER_API_KEY
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
def is_rate_limited(status: int, body_text: str = "") -> bool:
    """Return True when an HTTP status or response body signals rate limiting.

    A 429 status is always rate limiting; otherwise the body text is scanned
    (case-insensitively) for known limit phrases.
    """
    if status == 429:
        return True
    lowered = body_text.lower()
    markers = ("weekly usage limit", "rate limit", "too many requests")
    return any(marker in lowered for marker in markers)
98
 
 
 
 
99
 
100
+ # =====================================================
101
+ # KEY MANAGEMENT
102
+ # =====================================================
103
def _pick_key(exclude: set):
    """Synchronous helper; must be called while holding ``_lock``.

    Advances the shared round-robin cursor over OLLAMA_KEYS and reserves the
    first key that is not busy, not in cooldown, and not in *exclude*.

    Returns the key string, or None when no key is currently ready.
    """
    global rr_index
    now = time.time()
    for _ in range(len(OLLAMA_KEYS)):
        # Move the cursor first so consecutive calls rotate through keys.
        rr_index = (rr_index + 1) % len(OLLAMA_KEYS)
        k = OLLAMA_KEYS[rr_index]
        st = key_status[k]
        if not st["busy"] and now >= st["rate_limited_until"] and k not in exclude:
            st["busy"] = True  # reserve while still under the caller's lock
            return k
    return None
115
 
116
def _next_ready_time() -> float:
    """Return the epoch timestamp when the earliest cooled-down key is ready.

    Falls back to the current time when no key is in cooldown.
    """
    now = time.time()
    pending = [
        st["rate_limited_until"]
        for st in key_status.values()
        if st["rate_limited_until"] > now
    ]
    if not pending:
        return now
    return min(pending)
121
+
122
async def get_key(exclude=None):
    """Atomically pick and reserve a ready key, or return None if none is free.

    exclude: optional set of keys to skip (e.g. keys already tried).
    """
    async with _lock:
        return _pick_key(exclude or set())
125
 
126
async def release_key(k):
    """Mark key *k* as no longer busy so other requests can pick it up."""
    async with _lock:
        if k in key_status:
            key_status[k]["busy"] = False
130
 
131
async def mark_rate_limited(k):
    """Put key *k* into cooldown for RATE_LIMIT_COOLDOWN seconds and count the failure."""
    async with _lock:
        if k in key_status:
            until = time.time() + RATE_LIMIT_COOLDOWN
            key_status[k]["rate_limited_until"] = until
            key_status[k]["fail"] += 1
            idx = key_status[k]["index"]
            log(f"⏳ key#{idx} cooldown {RATE_LIMIT_COOLDOWN}s (ready {time.strftime('%H:%M:%S', time.localtime(until))})")
139
 
140
async def mark_fail(k):
    """Increment key *k*'s failure counter (non-rate-limit failure)."""
    async with _lock:
        if k in key_status:
            key_status[k]["fail"] += 1
144
 
 
145
async def mark_ok(k):
    """Record a success for key *k*: bump the counter and clear fail/cooldown state."""
    async with _lock:
        if k in key_status:
            key_status[k]["success"] += 1
            key_status[k]["fail"] = 0
            key_status[k]["rate_limited_until"] = 0.0
151
+
152
async def get_key_infinite(exclude=None):
    """Wait indefinitely for a usable key.

    When every key is busy or cooling down, sleep until the earliest key
    leaves cooldown (instead of hammering the backend), then clear the
    exclude set so previously rate-limited keys get another chance.

    Returns: (key, exclude_set)
    """
    local_exclude = set(exclude) if exclude else set()
    cycle = 0

    while True:
        async with _lock:
            k = _pick_key(local_exclude)
            if k:
                return k, local_exclude

        now = time.time()
        next_ready = _next_ready_time()
        # Never busy-wait: sleep at least half a second.
        wait_sec = max(0.5, next_ready - now)
        all_cooldown = all(
            st["rate_limited_until"] > now or st["busy"]
            for st in key_status.values()
        )

        if all_cooldown:
            cycle += 1
            log(f"⏳ Semua key cooldown. Tunggu {wait_sec:.1f}s... (cycle #{cycle})")
            local_exclude.clear()  # allow retrying every key after the wait
            await asyncio.sleep(wait_sec)
        else:
            # Some key is merely busy; poll again shortly.
            await asyncio.sleep(0.3)
181
 
182
 
183
+ # =====================================================
184
+ # TOOL CONVERSION: Anthropic ↔ OpenAI
185
+ # =====================================================
186
def anthropic_tools_to_openai(tools: list) -> list:
    """Translate Anthropic tool definitions into OpenAI function-tool format.

    Anthropic shape: {"name", "description", "input_schema"}
    OpenAI shape:    {"type": "function", "function": {"name", "description", "parameters"}}
    """
    converted = []
    for tool in tools:
        schema = tool.get("input_schema", {"type": "object", "properties": {}})
        converted.append({
            "type": "function",
            "function": {
                "name": tool.get("name", ""),
                "description": tool.get("description", ""),
                "parameters": schema,
            },
        })
    return converted
202
+
203
def anthropic_tool_choice_to_openai(tc):
    """Map an Anthropic ``tool_choice`` value to its OpenAI equivalent.

    None passes through; strings and {"type": ...} dicts map
    auto->auto, any->required, none->none (default "auto");
    {"type": "tool", "name": N} becomes an OpenAI forced-function choice.
    """
    mapping = {"auto": "auto", "any": "required", "none": "none"}
    if tc is None:
        return None
    if isinstance(tc, str):
        return mapping.get(tc, "auto")
    if not isinstance(tc, dict):
        return "auto"
    choice_type = tc.get("type", "")
    if choice_type == "tool":
        return {"type": "function", "function": {"name": tc.get("name", "")}}
    return mapping.get(choice_type, "auto")
214
+
215
def convert_anthropic_messages(messages: list) -> list:
    """Convert an Anthropic-format message list into OpenAI chat format.

    Supported content blocks: plain text, assistant ``tool_use`` (becomes an
    OpenAI ``tool_calls`` entry), and user ``tool_result`` (becomes one
    ``role="tool"`` message per result block).
    """
    converted = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")

        # Plain string content maps straight through.
        if isinstance(content, str):
            converted.append({"role": role, "content": content})
        elif not isinstance(content, list):
            # Unknown shape: coerce to text rather than drop it.
            converted.append({"role": role, "content": str(content)})
        else:
            uses = [b for b in content if b.get("type") == "tool_use"]
            results = [b for b in content if b.get("type") == "tool_result"]
            texts = [b for b in content if b.get("type") == "text"]
            joined_text = "".join(b.get("text", "") for b in texts)

            if role == "assistant" and uses:
                # Assistant tool_use blocks become OpenAI tool_calls.
                calls = []
                for b in uses:
                    calls.append({
                        "id": b.get("id", "call_" + uuid.uuid4().hex[:8]),
                        "type": "function",
                        "function": {
                            "name": b.get("name", ""),
                            "arguments": json.dumps(b.get("input", {})),
                        },
                    })
                converted.append({
                    "role": "assistant",
                    "content": joined_text or None,
                    "tool_calls": calls,
                })
            elif role == "user" and results:
                # Each tool_result becomes its own role="tool" message.
                for b in results:
                    payload = b.get("content", "")
                    if isinstance(payload, list):
                        pieces = []
                        for item in payload:
                            if isinstance(item, dict):
                                pieces.append(item.get("text", ""))
                            else:
                                pieces.append(str(item))
                        payload = "".join(pieces)
                    converted.append({
                        "role": "tool",
                        "tool_call_id": b.get("tool_use_id", ""),
                        "content": str(payload),
                    })
                # Any accompanying text follows as a normal user message.
                if joined_text:
                    converted.append({"role": "user", "content": joined_text})
            else:
                # Fallback: merge all text blocks into one message.
                converted.append({"role": role, "content": joined_text})

    return converted
276
+
277
def openai_to_anthropic_response(data: dict, original_model: str) -> dict:
    """Convert an OpenAI-style non-streaming chat response to Anthropic format.

    Emits a text block for any assistant text plus one ``tool_use`` block per
    tool call, and maps ``finish_reason`` onto Anthropic ``stop_reason``.
    """
    first_choice = data["choices"][0]
    message = first_choice.get("message", {})
    usage = data.get("usage", {})

    reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
        "tool_calls": "tool_use",
    }
    stop_reason = reason_map.get(first_choice.get("finish_reason", "stop"), "end_turn")

    blocks = []
    text = message.get("content") or ""
    if text:
        blocks.append({"type": "text", "text": text})

    for call in (message.get("tool_calls") or []):
        fn = call.get("function", {})
        raw_args = fn.get("arguments", "{}")
        try:
            parsed = json.loads(raw_args)
        except Exception:
            # Arguments were not valid JSON; preserve them for debugging.
            parsed = {"_raw": fn.get("arguments", "")}
        blocks.append({
            "type": "tool_use",
            "id": call.get("id", "toolu_" + uuid.uuid4().hex[:10]),
            "name": fn.get("name", ""),
            "input": parsed,
        })

    return {
        "id": "msg_" + uuid.uuid4().hex[:10],
        "type": "message",
        "role": "assistant",
        "model": original_model,
        "content": blocks,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
    }
322
 
323
 
324
  # =====================================================
 
326
  # =====================================================
327
  @app.get("/")
328
  async def root():
329
+ now = time.time()
330
+ async with _lock:
331
  safe = {}
332
  for k, v in key_status.items():
333
+ cd = max(0, v["rate_limited_until"] - now)
334
+ safe[v["prefix"]] = {
335
  "index": v["index"],
336
+ "status": "BUSY" if v["busy"] else ("COOLDOWN" if cd > 0 else "IDLE"),
337
+ "cooldown_sec": round(cd, 1) if cd > 0 else 0,
338
  "success": v["success"],
339
  "fail": v["fail"],
340
  }
341
+ return {"status": "ok", "base_url": BASE_URL, "default_model": DEFAULT_MODEL, "keys": safe}
 
 
 
 
 
342
 
343
 
344
  # =====================================================
 
350
  return JSONResponse({"error": "Unauthorized"}, status_code=401)
351
 
352
  key = OLLAMA_KEYS[0]
353
+ try:
354
+ async with httpx.AsyncClient(timeout=60) as client:
355
+ r = await client.get(f"{BASE_URL}/api/tags", headers={"Authorization": f"Bearer {key}"})
356
+ if r.status_code == 200:
357
+ data = r.json()
358
+ now = int(time.time())
359
+ out = [{"id": m.get("name"), "object": "model", "created": now, "owned_by": "ollama"}
360
+ for m in data.get("models", [])]
361
+ return {"object": "list", "data": out}
362
+ except Exception as e:
363
+ log(f"[/v1/models] {e}")
364
+ return JSONResponse({"error": "Failed to fetch models"}, status_code=500)
 
 
 
 
 
 
 
 
 
 
 
365
 
366
 
367
  # =====================================================
368
+ # /v1/chat/completions (OpenAI-compatible, pipe through)
369
  # =====================================================
370
  @app.post("/v1/chat/completions")
371
  async def chat(req: Request):
 
384
  # -----------------------------------------
385
  if not is_stream:
386
  tried = set()
 
387
  for _ in range(len(OLLAMA_KEYS)):
388
+ key = await get_key(tried)
 
389
  if not key:
390
+ await asyncio.sleep(0.5)
391
+ continue
392
  tried.add(key)
 
393
  try:
394
  async with httpx.AsyncClient(timeout=180) as client:
395
  r = await client.post(
 
397
  json=body,
398
  headers={"Authorization": f"Bearer {key}"}
399
  )
400
+ if is_rate_limited(r.status_code, r.text if r.status_code != 200 else ""):
401
+ await mark_rate_limited(key)
 
 
 
 
402
  continue
 
403
  await mark_ok(key)
404
+ return Response(content=r.content, media_type=r.headers.get("content-type", "application/json"))
 
 
 
 
 
405
  except Exception as e:
406
+ log(e)
407
  await mark_fail(key)
 
408
  finally:
409
  await release_key(key)
 
410
  return JSONResponse({"error": "All keys failed"}, status_code=500)
411
 
412
  # -----------------------------------------
413
+ # STREAM β€” infinite loop
414
  # -----------------------------------------
415
  async def gen():
416
+ exclude = set()
417
+ while True:
418
+ key, exclude = await get_key_infinite(exclude)
 
 
 
 
 
 
 
419
  try:
420
  async with httpx.AsyncClient(timeout=None) as client:
421
  async with client.stream(
422
+ "POST", f"{BASE_URL}/v1/chat/completions",
423
+ json=body, headers={"Authorization": f"Bearer {key}"}
 
 
424
  ) as r:
425
+ if is_rate_limited(r.status_code):
426
+ await mark_rate_limited(key)
 
 
427
  continue
428
 
429
+ hit_limit = False
 
430
  async for line in r.aiter_lines():
431
  if not line:
432
  continue
433
+ if line.strip() == "data: [DONE]":
 
 
 
 
434
  break
435
+ raw = line[6:] if line.startswith("data: ") else line
436
+ try:
437
+ j = json.loads(raw)
438
+ if "error" in j and "choices" not in j and is_rate_limited(0, json.dumps(j)):
439
+ hit_limit = True
440
+ break
441
+ except Exception:
442
+ pass
443
  yield line + "\n\n"
444
 
445
+ if hit_limit:
446
+ await mark_rate_limited(key)
447
  continue
448
 
449
+ yield "data: [DONE]\n\n"
450
  await mark_ok(key)
451
  return
452
 
453
  except Exception as e:
454
+ log(e)
455
  await mark_fail(key)
 
456
  finally:
457
  await release_key(key)
458
 
 
 
 
459
  return StreamingResponse(gen(), media_type="text/event-stream")
460
 
461
 
462
  # =====================================================
463
+ # /v1/messages (Anthropic-compatible)
464
+ # FIXED: Full tool calling support
465
  # =====================================================
466
  @app.post("/v1/messages")
467
  async def anthropic(req: Request):
 
476
  return JSONResponse({"error": "Bad JSON"}, status_code=400)
477
 
478
  stream = body.get("stream", False)
479
+ original_model = body.get("model", "claude-opus-4-7")
480
+ ollama_model = map_model(original_model)
481
 
482
+ # Build OpenAI-format messages
483
  messages = []
 
484
  if body.get("system"):
485
+ sys = body["system"]
486
+ if isinstance(sys, list):
487
+ sys = "".join(x.get("text", "") for x in sys if x.get("type") == "text")
488
+ messages.append({"role": "system", "content": sys})
 
 
 
 
 
 
 
489
 
490
+ # FIXED: Konversi penuh termasuk tool_use dan tool_result
491
+ messages.extend(convert_anthropic_messages(body.get("messages", [])))
492
 
493
  proxy_body = {
494
+ "model": ollama_model,
495
  "messages": messages,
496
+ "stream": stream,
497
  }
498
 
499
+ if "max_tokens" in body:
500
+ proxy_body["max_tokens"] = body["max_tokens"]
501
+ if "temperature" in body:
502
+ proxy_body["temperature"] = body["temperature"]
503
+ if "top_p" in body:
504
+ proxy_body["top_p"] = body["top_p"]
505
+
506
+ # FIXED: Forward tools β†’ OpenAI format
507
+ if body.get("tools"):
508
+ proxy_body["tools"] = anthropic_tools_to_openai(body["tools"])
509
+ if body.get("tool_choice"):
510
+ proxy_body["tool_choice"] = anthropic_tool_choice_to_openai(body["tool_choice"])
511
+
512
  # -----------------------------------------
513
  # NON STREAM
514
  # -----------------------------------------
515
  if not stream:
516
  tried = set()
 
517
  for _ in range(len(OLLAMA_KEYS)):
518
+ key = await get_key(tried)
 
519
  if not key:
520
+ await asyncio.sleep(0.5)
521
+ continue
522
  tried.add(key)
 
523
  try:
524
  async with httpx.AsyncClient(timeout=180) as client:
525
  r = await client.post(
 
527
  json=proxy_body,
528
  headers={"Authorization": f"Bearer {key}"}
529
  )
530
+ if is_rate_limited(r.status_code, r.text if r.status_code != 200 else ""):
531
+ await mark_rate_limited(key)
532
+ continue
533
+ if r.status_code != 200:
534
+ log(f"HTTP {r.status_code}: {r.text[:200]}")
535
  await mark_fail(key)
536
  continue
537
 
538
  data = r.json()
539
+ # FIXED: convert OpenAI response β†’ Anthropic (handles tool_calls too)
540
+ out = openai_to_anthropic_response(data, original_model)
 
 
 
 
 
 
 
 
 
 
 
541
  await mark_ok(key)
542
  return JSONResponse(out)
543
 
544
  except Exception as e:
545
+ log(e)
546
  await mark_fail(key)
 
547
  finally:
548
  await release_key(key)
549
 
550
  return JSONResponse({"error": "All keys failed"}, status_code=500)
551
 
552
  # -----------------------------------------
553
+ # STREAM β€” infinite loop, Anthropic SSE format
554
+ # FIXED: Handle tool_calls delta dari streaming
555
  # -----------------------------------------
556
  async def agen():
557
+ exclude = set()
558
  msg_id = "msg_" + uuid.uuid4().hex[:10]
559
+ sent_header = False
 
 
 
 
 
 
 
 
 
 
 
 
 
560
 
561
+ while True:
562
+ key, exclude = await get_key_infinite(exclude)
563
  try:
564
  async with httpx.AsyncClient(timeout=None) as client:
565
  async with client.stream(
566
+ "POST", f"{BASE_URL}/v1/chat/completions",
567
+ json=proxy_body, headers={"Authorization": f"Bearer {key}"}
 
 
568
  ) as r:
569
 
570
+ if is_rate_limited(r.status_code):
571
+ await mark_rate_limited(key)
572
+ continue
573
+
574
+ if r.status_code != 200:
575
+ log(f"STREAM HTTP {r.status_code}")
576
  await mark_fail(key)
577
  continue
578
 
579
+ # Kirim Anthropic header sekali
580
+ if not sent_header:
581
+ sent_header = True
582
  yield sse({
583
  "type": "message_start",
584
  "message": {
585
+ "id": msg_id, "type": "message", "role": "assistant",
586
+ "model": original_model, "content": [],
587
+ "stop_reason": None, "stop_sequence": None,
 
 
 
 
588
  "usage": {"input_tokens": 0, "output_tokens": 0}
589
  }
590
  })
591
  yield sse({
592
+ "type": "content_block_start", "index": 0,
593
+ "content_block": {"type": "text", "text": ""}
 
594
  })
595
 
596
+ hit_limit = False
597
+ finish_reason = None
598
+ # track tool call blocks: openai index β†’ {block_index, id, name}
599
+ tool_blocks = {} # tc_idx β†’ anthropic block_index
600
+ next_block = 1 # 0 = text block
601
 
602
  async for line in r.aiter_lines():
603
+ if not line:
604
  continue
605
+ if line.strip() == "data: [DONE]":
 
 
 
606
  break
607
 
608
+ raw = line[6:] if line.startswith("data: ") else line
 
 
 
 
609
 
610
  try:
611
  j = json.loads(raw)
612
  except Exception:
613
  continue
614
 
615
+ # Cek error API (bukan model output)
616
+ if "error" in j and "choices" not in j:
617
+ if is_rate_limited(0, json.dumps(j)):
618
+ hit_limit = True
619
+ break
620
+
621
+ choices = j.get("choices", [])
622
+ if not choices:
623
+ continue
624
+
625
+ choice = choices[0]
626
+ delta = choice.get("delta", {})
627
+ finish_reason = choice.get("finish_reason") or finish_reason
628
 
629
+ # ── TEXT content ──
630
+ txt = delta.get("content") or ""
631
  if txt:
 
632
  yield sse({
633
+ "type": "content_block_delta", "index": 0,
 
634
  "delta": {"type": "text_delta", "text": txt}
635
  })
636
 
637
+ # ── TOOL CALLS ── FIXED
638
+ for tc in (delta.get("tool_calls") or []):
639
+ tc_idx = tc.get("index", 0)
640
+
641
+ # Tool call baru β†’ buka block
642
+ if tc.get("id") or tc.get("function", {}).get("name"):
643
+ if tc_idx not in tool_blocks:
644
+ block_idx = next_block
645
+ next_block += 1
646
+ tool_blocks[tc_idx] = block_idx
647
+ yield sse({
648
+ "type": "content_block_start",
649
+ "index": block_idx,
650
+ "content_block": {
651
+ "type": "tool_use",
652
+ "id": tc.get("id", "toolu_" + uuid.uuid4().hex[:10]),
653
+ "name": tc.get("function", {}).get("name", ""),
654
+ "input": {}
655
+ }
656
+ })
657
+
658
+ # Stream argument chunks
659
+ args_chunk = tc.get("function", {}).get("arguments", "")
660
+ if args_chunk and tc_idx in tool_blocks:
661
+ yield sse({
662
+ "type": "content_block_delta",
663
+ "index": tool_blocks[tc_idx],
664
+ "delta": {"type": "input_json_delta", "partial_json": args_chunk}
665
+ })
666
+
667
+ if hit_limit:
668
+ await mark_rate_limited(key)
669
  continue
670
 
671
+ # Tutup semua blocks
672
+ yield sse({"type": "content_block_stop", "index": 0})
673
+ for block_idx in tool_blocks.values():
674
+ yield sse({"type": "content_block_stop", "index": block_idx})
675
+
676
+ # Stop reason
677
+ if finish_reason == "tool_calls" or tool_blocks:
678
+ stop_reason = "tool_use"
679
+ elif finish_reason == "length":
680
+ stop_reason = "max_tokens"
681
+ else:
682
+ stop_reason = "end_turn"
683
+
684
+ yield sse({
685
+ "type": "message_delta",
686
+ "delta": {"stop_reason": stop_reason, "stop_sequence": None},
687
+ "usage": {"output_tokens": 0}
688
+ })
689
+ yield sse({"type": "message_stop"})
690
  await mark_ok(key)
691
+ return # sukses
692
 
693
  except Exception as e:
694
+ log(e)
695
  await mark_fail(key)
 
696
  finally:
697
  await release_key(key)
698
 
699
+ # Fallback kalau belum kirim header
700
+ if not sent_header:
701
+ yield sse({
702
+ "type": "message_start",
703
+ "message": {
704
+ "id": msg_id, "type": "message", "role": "assistant",
705
+ "model": original_model, "content": [], "stop_reason": None,
706
+ "stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0}
707
+ }
708
+ })
709
+ yield sse({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
710
  yield sse({"type": "content_block_stop", "index": 0})
711
+ yield sse({"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": None}, "usage": {"output_tokens": 0}})
 
 
 
 
712
  yield sse({"type": "message_stop"})
713
 
714
+ return StreamingResponse(agen(), media_type="text/event-stream")