| |
| |
| |
|
|
| import json |
| import os |
| import time |
| import uuid |
| import threading |
| from typing import Any, Dict, List, Optional, TypedDict, Union |
|
|
| import requests |
| from fastapi import FastAPI, HTTPException, Depends, Query |
| from fastapi.responses import StreamingResponse |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from pydantic import BaseModel, Field |
|
|
| |
# Bookkeeping record for one upstream CodeGeeX token in the rotation pool.
#   token       - the raw credential sent in the "code-token" header
#   is_valid    - cleared when upstream answers 401/403
#   last_used   - epoch seconds of last selection (LRU rotation key)
#   error_count - consecutive transient failures; reset after cooldown
CodeGeeXToken = TypedDict(
    "CodeGeeXToken",
    {"token": str, "is_valid": bool, "last_used": float, "error_count": int},
)
|
|
# --- Module-level state and tuning knobs ---
VALID_CLIENT_KEYS: set = set()  # client Bearer keys accepted by authenticate_client
CODEGEEX_TOKENS: List[CodeGeeXToken] = []  # upstream token pool, rotated by get_best_codegeex_token
CODEGEEX_MODELS: List[str] = ["claude-3-7-sonnet", "claude-sonnet-4"]  # models exposed via /v1/models
token_rotation_lock = threading.Lock()  # guards mutation of token bookkeeping fields
MAX_ERROR_COUNT = 3  # transient errors before a token is benched
ERROR_COOLDOWN = 300  # seconds a benched token rests before its error count resets
DEBUG_MODE = os.environ.get("DEBUG_MODE", "false").lower() == "true"  # enables log_debug output
|
|
| |
class ChatMessage(BaseModel):
    """One chat turn in the OpenAI chat-completions format."""
    role: str  # e.g. "user", "assistant", "system"
    content: Union[str, List[Dict[str, Any]]]  # plain text or OpenAI content-part list
    reasoning_content: Optional[str] = None  # optional reasoning channel
class ChatCompletionRequest(BaseModel):
    """Incoming OpenAI-style /v1/chat/completions request body."""
    model: str
    messages: List[ChatMessage]
    stream: bool = True  # defaults to streaming
    # Accepted for client compatibility; not included in the upstream payload.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    top_p: Optional[float] = None
class ModelInfo(BaseModel):
    """One entry in the OpenAI-style model listing."""
    id: str
    object: str = "model"
    created: int  # epoch seconds
    owned_by: str
class ModelList(BaseModel):
    """OpenAI-style container for the /models responses."""
    object: str = "list"
    data: List[ModelInfo]
class ChatCompletionChoice(BaseModel):
    """Single choice of a non-streaming chat completion."""
    message: ChatMessage
    index: int = 0
    finish_reason: str = "stop"
class ChatCompletionResponse(BaseModel):
    """OpenAI-style non-streaming chat.completion response."""
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[ChatCompletionChoice]
    # Token usage is not tracked by this adapter; zeros are reported.
    usage: Dict[str, int] = Field(default_factory=lambda: {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
class StreamChoice(BaseModel):
    """Single choice of a streaming chunk; delta carries role/content fragments."""
    delta: Dict[str, Any] = Field(default_factory=dict)
    index: int = 0
    finish_reason: Optional[str] = None  # "stop" on the terminal chunk
class StreamResponse(BaseModel):
    """OpenAI-style chat.completion.chunk streaming event."""
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[StreamChoice]
|
|
| |
app = FastAPI(title="CodeGeeX OpenAI API Adapter")
# auto_error=False so authenticate_client can return a 401 with its own detail/headers.
security = HTTPBearer(auto_error=False)
|
|
def log_debug(message: str) -> None:
    """Print *message* with a [DEBUG] prefix; no-op unless DEBUG_MODE is set."""
    if not DEBUG_MODE:
        return
    print(f"[DEBUG] {message}")
|
|
| |
def load_client_api_keys_from_secrets():
    """Populate VALID_CLIENT_KEYS from the CLIENT_API_KEYS env secret.

    The secret must be a JSON list of strings; any failure leaves the
    key set empty and logs a FATAL message instead of raising.
    """
    global VALID_CLIENT_KEYS
    try:
        raw = os.environ.get("CLIENT_API_KEYS")
        if not raw:
            raise ValueError("Secret 'CLIENT_API_KEYS' not found.")
        parsed = json.loads(raw)
        # Anything other than a JSON list silently becomes an empty key set.
        VALID_CLIENT_KEYS = set(parsed) if isinstance(parsed, list) else set()
        print(f"Successfully loaded {len(VALID_CLIENT_KEYS)} client API keys.")
    except Exception as e:
        print(f"FATAL: Error loading client API keys: {e}")
        VALID_CLIENT_KEYS = set()
|
|
def load_codegeex_tokens_from_secrets():
    """Rebuild CODEGEEX_TOKENS from the CODEGEEX_TOKENS env secret.

    The secret must be a JSON list of non-empty strings; each becomes a
    fresh bookkeeping record. Failures log FATAL and leave the pool empty.
    """
    global CODEGEEX_TOKENS
    CODEGEEX_TOKENS = []
    try:
        raw = os.environ.get("CODEGEEX_TOKENS")
        if not raw:
            raise ValueError("Secret 'CODEGEEX_TOKENS' not found.")
        parsed = json.loads(raw)
        if not isinstance(parsed, list):
            raise TypeError("Secret 'CODEGEEX_TOKENS' must be a JSON list.")
        for entry in parsed:
            # Skip empty strings and non-string entries.
            if isinstance(entry, str) and entry:
                CODEGEEX_TOKENS.append(
                    {"token": entry, "is_valid": True, "last_used": 0, "error_count": 0}
                )
        print(f"Successfully loaded {len(CODEGEEX_TOKENS)} CodeGeeX tokens.")
    except Exception as e:
        print(f"FATAL: Error loading CodeGeeX tokens: {e}")
|
|
| |
def get_best_codegeex_token() -> Optional[CodeGeeXToken]:
    """Select the least-recently-used usable token, or None if none qualify.

    A token is usable when it is still valid and either under the error
    threshold or past its cooldown window. Cooled-down tokens get their
    error count reset before ranking. Selection updates last_used so that
    repeated calls rotate across the pool.
    """
    with token_rotation_lock:
        now = time.time()
        candidates = [
            t
            for t in CODEGEEX_TOKENS
            if t["is_valid"]
            and (t["error_count"] < MAX_ERROR_COUNT or now - t["last_used"] > ERROR_COOLDOWN)
        ]
        if not candidates:
            return None
        # Give benched-but-cooled tokens a clean slate before ranking.
        for t in candidates:
            if t["error_count"] >= MAX_ERROR_COUNT and now - t["last_used"] > ERROR_COOLDOWN:
                t["error_count"] = 0
        candidates.sort(key=lambda t: (t["last_used"], t["error_count"]))
        chosen = candidates[0]
        chosen["last_used"] = now
        return chosen
|
|
def _convert_messages_to_codegeex_format(messages: List[ChatMessage]):
    """Split an OpenAI message list into (prompt, history) for the CodeGeeX API.

    The last user message becomes the prompt; earlier user/assistant turns
    are paired into history entries of {"query", "answer", "id"}.
    NOTE(review): system messages are ignored entirely, and a user message
    followed by another user message (before the last turn) is overwritten —
    confirm both behaviors are intended.
    """
    if not messages: return "", []
    last_user_msg = next((msg for msg in reversed(messages) if msg.role == "user"), None)
    if not last_user_msg: raise HTTPException(status_code=400, detail="No user message found.")
    # Multi-part (list) content is dropped rather than flattened to text.
    prompt = last_user_msg.content if isinstance(last_user_msg.content, str) else ""
    history, user_content, assistant_content = [], "", ""
    for msg in messages:
        if msg == last_user_msg: break  # only turns before the final user message become history
        if msg.role == "user":
            # Flush a completed query/answer pair before starting a new one.
            if user_content and assistant_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", ""
            user_content = msg.content if isinstance(msg.content, str) else ""
        elif msg.role == "assistant":
            assistant_content = msg.content if isinstance(msg.content, str) else ""
            # An assistant reply closes the pending user turn into a history pair.
            if user_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", ""
    # A dangling user turn with no assistant reply is prepended to the prompt.
    if user_content and not assistant_content: prompt = user_content + "\n" + prompt
    return prompt, history
|
|
async def authenticate_client(auth: Optional[HTTPAuthorizationCredentials] = Depends(security)):
    """FastAPI dependency: reject requests lacking a valid client Bearer key.

    Raises 503 when no keys are configured, 401 when no credential is
    presented, and 403 when the credential is unknown.
    """
    if not VALID_CLIENT_KEYS:
        raise HTTPException(status_code=503, detail="Service unavailable: Client API keys not configured.")
    if not auth or not auth.credentials:
        raise HTTPException(status_code=401, detail="API key required.", headers={"WWW-Authenticate": "Bearer"})
    if auth.credentials not in VALID_CLIENT_KEYS:
        raise HTTPException(status_code=403, detail="Invalid client API key.")
|
|
| |
@app.on_event("startup")  # NOTE(review): on_event is deprecated in newer FastAPI; consider a lifespan handler
async def startup():
    """Load client API keys and CodeGeeX tokens from env secrets at boot."""
    print("Starting CodeGeeX OpenAI API Adapter server...")
    load_client_api_keys_from_secrets()
    load_codegeex_tokens_from_secrets()
    print("Server initialization completed.")
|
|
@app.get("/")
def health_check():
    """Liveness probe: confirms the adapter process is up."""
    return {
        "status": "ok",
        "message": "CodeGeeX API Adapter is running.",
    }
|
|
def get_models_list_response() -> ModelList:
    """Build the OpenAI-style model list from the configured CodeGeeX models."""
    entries = [
        ModelInfo(id=name, created=int(time.time()), owned_by="anthropic")
        for name in CODEGEEX_MODELS
    ]
    return ModelList(data=entries)
|
|
@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
    """Authenticated OpenAI-style model listing."""
    return get_models_list_response()
|
|
@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
    """Model listing without authentication (deliberately open, per the route name)."""
    return get_models_list_response()
|
|
def _codegeex_stream_generator(response, model: str):
    """Translate a CodeGeeX SSE response into OpenAI chat.completion.chunk events.

    Yields SSE `data:` lines: an initial role delta, one content delta per
    upstream "add" event, then a final stop chunk and the [DONE] sentinel —
    either on the upstream "finish" event or when the stream ends/errors.

    Fix: the inner JSON parse previously used a bare ``except:``, which would
    swallow KeyboardInterrupt/SystemExit too; narrowed to json.JSONDecodeError.
    """
    stream_id = f"chatcmpl-{uuid.uuid4().hex}"
    created_time = int(time.time())
    # Announce the assistant role first, as OpenAI streaming clients expect.
    yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'role': 'assistant'})]).json()}\n\n"
    buffer = ""
    try:
        for chunk in response.iter_content(chunk_size=1024):
            if not chunk:
                continue
            buffer += chunk.decode("utf-8", errors="ignore")
            # SSE events are separated by a blank line.
            while "\n\n" in buffer:
                event_data, buffer = buffer.split("\n\n", 1)
                event_data = event_data.strip()
                if not event_data:
                    continue
                event_type, data_json = None, None
                for line in event_data.split("\n"):
                    if line.startswith("event:"):
                        event_type = line[6:].strip()
                    elif line.startswith("data:"):
                        try:
                            data_json = json.loads(line[5:].strip())
                        except json.JSONDecodeError:
                            # Malformed data line: skip it, keep scanning the event.
                            continue
                if not event_type or not data_json:
                    continue
                if event_type == "add":
                    delta = data_json.get("text", "")
                    if delta:
                        yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'content': delta})]).json()}\n\n"
                elif event_type == "finish":
                    yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n"
                    yield "data: [DONE]\n\n"
                    return
    except Exception as e:
        log_debug(f"Stream processing error: {e}")
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
    # Upstream ended without a "finish" event: still terminate the stream cleanly.
    yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n"
    yield "data: [DONE]\n\n"
|
|
def _build_codegeex_non_stream_response(response, model: str) -> ChatCompletionResponse:
    """Drain a CodeGeeX SSE response and assemble one chat.completion reply.

    Accumulates text from "add" events; a "finish" event with non-empty text
    replaces the accumulated content, and parsing stops at "finish". If the
    stream ends without a "finish" event, the accumulated text is returned.

    Fix: the inner JSON parse previously used a bare ``except:``, which would
    swallow KeyboardInterrupt/SystemExit too; narrowed to json.JSONDecodeError.
    """
    full_content = ""
    buffer = ""
    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        buffer += chunk.decode("utf-8", errors="ignore")
        # SSE events are separated by a blank line.
        while "\n\n" in buffer:
            event_data, buffer = buffer.split("\n\n", 1)
            event_data = event_data.strip()
            if not event_data:
                continue
            event_type, data_json = None, None
            for line in event_data.split("\n"):
                if line.startswith("event:"):
                    event_type = line[6:].strip()
                elif line.startswith("data:"):
                    try:
                        data_json = json.loads(line[5:].strip())
                    except json.JSONDecodeError:
                        # Malformed data line: skip it, keep scanning the event.
                        continue
            if not event_type or not data_json:
                continue
            if event_type == "add":
                full_content += data_json.get("text", "")
            elif event_type == "finish":
                finish_text = data_json.get("text", "")
                if finish_text:
                    full_content = finish_text
                # Stop reading once upstream signals completion.
                return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))])
    return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))])
|
|
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest, _: None = Depends(authenticate_client)):
    """OpenAI-compatible chat endpoint: proxies to the CodeGeeX SSE API.

    Validates model and messages, converts them to CodeGeeX prompt/history,
    then tries pooled tokens in rotation: 401/403 disables a token, transient
    HTTP errors (429/5xx) bump its error count, and the next token is tried.
    Exhausting the pool raises 503.
    """
    if request.model not in CODEGEEX_MODELS: raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found.")
    if not request.messages: raise HTTPException(status_code=400, detail="No messages provided.")
    try: prompt, history = _convert_messages_to_codegeex_format(request.messages)
    except Exception as e: raise HTTPException(status_code=400, detail=f"Failed to process messages: {e}")
    # One attempt per pooled token; the extra iteration turns exhaustion into a 503.
    for attempt in range(len(CODEGEEX_TOKENS) + 1):
        if attempt == len(CODEGEEX_TOKENS): raise HTTPException(status_code=503, detail="All attempts to contact CodeGeeX API failed.")
        token = get_best_codegeex_token()
        if not token: raise HTTPException(status_code=503, detail="No valid CodeGeeX tokens available.")
        try:
            # NOTE(review): temperature/max_tokens/top_p from the request are not forwarded upstream.
            payload = {"user_role": 0, "ide": "VSCode", "prompt": prompt, "model": request.model, "history": history, "talkId": f"{uuid.uuid4()}", "plugin_version": "", "locale": "", "agent": None, "candidates": {"candidate_msg_id": "", "candidate_type": "", "selected_candidate": ""}, "ide_version": "", "machineId": ""}
            headers = {"User-Agent": "Mozilla/5.0", "Accept": "text/event-stream", "Content-Type": "application/json", "code-token": token["token"]}
            response = requests.post("https://codegeex.cn/prod/code/chatCodeSseV3/chat", data=json.dumps(payload), headers=headers, stream=True, timeout=300.0)
            response.raise_for_status()
            if request.stream: return StreamingResponse(_codegeex_stream_generator(response, request.model), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"})
            else: return _build_codegeex_non_stream_response(response, request.model)
        except requests.HTTPError as e:
            status_code = getattr(e.response, "status_code", 500)
            with token_rotation_lock:
                # Auth failures permanently disable the token; transient codes count toward cooldown.
                if status_code in [401, 403]: token["is_valid"] = False
                elif status_code in [429, 500, 502, 503, 504]: token["error_count"] += 1
        except Exception as e:
            # Network errors, timeouts, etc.: treat as transient and rotate.
            with token_rotation_lock: token["error_count"] += 1
|
|
|
|