| """ |
| GeminiCli API Client - Handles all communication with GeminiCli API. |
| This module is used by both OpenAI compatibility layer and native Gemini endpoints. |
| GeminiCli API 客户端 - 处理与 GeminiCli API 的所有通信 |
| """ |
|
|
| import sys |
| from pathlib import Path |
|
|
| |
if __name__ == "__main__":
    # When this file is executed directly (python src/api/geminicli.py) the
    # directory three levels above it must be importable so that the
    # `config`, `log` and `src.*` imports below resolve.
    project_root = Path(__file__).resolve().parents[2]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
|
|
| import asyncio |
| import json |
| from typing import Any, Dict, Optional |
|
|
| from fastapi import Response |
| from config import get_code_assist_endpoint, get_auto_ban_error_codes |
| from src.api.utils import get_model_group |
| from log import log |
|
|
| from src.credential_manager import CredentialManager |
| from src.httpx_client import stream_post_async, post_async |
|
|
| |
| from src.api.utils import ( |
| handle_error_with_retry, |
| get_retry_config, |
| record_api_call_success, |
| record_api_call_error, |
| parse_and_log_cooldown, |
| ) |
| from src.utils import GEMINICLI_USER_AGENT |
|
|
| |
|
|
| |
# Shared CredentialManager instance; stays None until it has been fully
# initialized by _get_credential_manager().
_credential_manager: Optional[CredentialManager] = None

# Serializes the first initialization. Created lazily inside the coroutine so
# that it is constructed while an event loop is running.
_credential_manager_lock: Optional[asyncio.Lock] = None


async def _get_credential_manager() -> CredentialManager:
    """Return the shared CredentialManager, creating it on first use.

    The instance is stored in the module global only after ``initialize()``
    has completed, so callers never receive a manager whose initialization
    failed or is still in progress. Concurrent first callers wait on a lock
    and share the single instance.

    Returns:
        The initialized CredentialManager instance.

    Raises:
        Whatever ``CredentialManager.initialize()`` raises; in that case
        nothing is cached and the next call tries again.
    """
    global _credential_manager, _credential_manager_lock

    # Fast path: already initialized.
    if _credential_manager is not None:
        return _credential_manager

    # No await between the check and the assignment, so only one lock is
    # ever created per process.
    if _credential_manager_lock is None:
        _credential_manager_lock = asyncio.Lock()

    async with _credential_manager_lock:
        # Re-check: another coroutine may have finished while we waited.
        if _credential_manager is None:
            manager = CredentialManager()
            await manager.initialize()
            _credential_manager = manager

    return _credential_manager
|
|
|
|
| |
|
|
async def prepare_request_headers_and_payload(
    payload: dict, credential_data: dict, target_url: str
) -> tuple:
    """Build the request headers and the final upstream payload.

    Args:
        payload: Original request payload. Its "model" value and its
            "request" value (an empty dict when absent) are forwarded.
        credential_data: Credential dictionary. Must contain a non-empty
            "token" (or, failing that, "access_token") and a non-empty
            "project_id".
        target_url: Target URL; returned unchanged.

    Returns:
        Tuple ``(headers, final_payload, target_url)``.

    Raises:
        ValueError: If the access token or the project ID is missing from
            the credential data. ``ValueError`` is a subclass of
            ``Exception``, so callers catching ``Exception`` are unaffected.
    """
    # Validate everything first so nothing is built for a bad credential.
    token = credential_data.get("token") or credential_data.get("access_token", "")
    if not token:
        raise ValueError("凭证中没有找到有效的访问令牌(token或access_token字段)")

    project_id = credential_data.get("project_id", "")
    if not project_id:
        raise ValueError("项目ID不存在于凭证数据中")

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "User-Agent": GEMINICLI_USER_AGENT,
    }
    final_payload = {
        "model": payload.get("model"),
        "project": project_id,
        "request": payload.get("request", {}),
    }

    return headers, final_payload, target_url
|
|
|
|
| |
|
|
def _decode_error_body(response: Any) -> Optional[str]:
    """Best-effort: return the body of an error Response as text, else None."""
    try:
        raw = response.body
        return raw.decode('utf-8') if isinstance(raw, bytes) else str(raw)
    except Exception:
        # Logging/cooldown parsing must never break the request flow.
        return None


async def stream_request(
    body: Dict[str, Any],
    native: bool = False,
    headers: Optional[Dict[str, str]] = None,
):
    """Send a streaming request upstream and relay the chunks, with retries.

    A credential is selected for the model group of ``body["model"]``, the
    request is posted to ``/v1internal:streamGenerateContent?alt=sse`` and
    every chunk produced by ``stream_post_async`` is forwarded. When the
    upstream produces an error ``Response`` the error is recorded and, if
    ``handle_error_with_retry`` allows it, the request is re-sent with a
    freshly selected credential.

    Args:
        body: Request body; "model" and "request" are forwarded upstream.
        native: Passed through to ``stream_post_async``. True yields the
            native bytes stream, False yields a str stream.
        headers: Extra request headers, merged over the generated ones.

    Yields:
        A ``Response`` object on failure, otherwise the stream chunks.
    """
    credential_manager = await _get_credential_manager()

    model_name = body.get("model", "")
    model_group = get_model_group(model_name)

    cred_result = await credential_manager.get_valid_credential(
        mode="geminicli", model_key=model_group
    )
    if not cred_result:
        yield Response(
            content=json.dumps({"error": "当前无可用凭证"}),
            status_code=500,
            media_type="application/json"
        )
        return

    current_file, credential_data = cred_result

    try:
        auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
            body, credential_data,
            f"{await get_code_assist_endpoint()}/v1internal:streamGenerateContent?alt=sse"
        )
        # Caller-supplied headers take precedence over the generated ones.
        if headers:
            auth_headers.update(headers)
    except Exception as e:
        log.error(f"准备请求失败: {e}")
        yield Response(
            content=json.dumps({"error": f"准备请求失败: {str(e)}"}),
            status_code=500,
            media_type="application/json"
        )
        return

    retry_config = await get_retry_config()
    max_retries = retry_config["max_retries"]
    retry_interval = retry_config["retry_interval"]

    auto_ban_codes = await get_auto_ban_error_codes()
    last_error_response = None

    for attempt in range(max_retries + 1):
        success_recorded = False
        # Set when the inner loop was left in order to re-send the request.
        # Without this flag the `break` below would fall through to the
        # "stream finished" return and the retry would never happen.
        need_retry = False

        try:
            async for chunk in stream_post_async(
                url=target_url,
                body=final_payload,
                native=native,
                headers=auth_headers
            ):
                if not isinstance(chunk, Response):
                    # Real stream data: record success once, then relay.
                    if not success_recorded:
                        await record_api_call_success(
                            credential_manager, current_file, mode="geminicli", model_key=model_group
                        )
                        success_recorded = True
                    yield chunk
                    continue

                # A Response object signals an upstream error.
                status_code = chunk.status_code
                last_error_response = chunk

                error_body = _decode_error_body(chunk)
                detail = f", 响应: {error_body[:500]}" if error_body is not None else ""

                if status_code != 429 and status_code in auto_ban_codes:
                    # Auto-ban error code: record it and give up immediately.
                    log.error(f"流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}{detail}")
                    await record_api_call_error(
                        credential_manager, current_file, status_code,
                        None, mode="geminicli", model_key=model_group
                    )
                    yield chunk
                    return

                # 429, or any code that is not in the auto-ban list.
                log.warning(f"流式请求失败 (status={status_code}), 凭证: {current_file}{detail}")

                cooldown_until = None
                if status_code == 429 and error_body is not None:
                    try:
                        cooldown_until = await parse_and_log_cooldown(error_body, mode="geminicli")
                    except Exception as e:
                        # Best effort: a missing cooldown must not abort handling.
                        log.warning(f"[STREAM] 解析冷却时间失败: {e}")

                await record_api_call_error(
                    credential_manager, current_file, status_code,
                    cooldown_until, mode="geminicli", model_key=model_group
                )

                should_retry = await handle_error_with_retry(
                    credential_manager, status_code, current_file,
                    retry_config["retry_enabled"], attempt, max_retries, retry_interval,
                    mode="geminicli"
                )

                if not (should_retry and attempt < max_retries):
                    log.error("[STREAM] 达到最大重试次数或不应重试,返回原始错误")
                    yield chunk
                    return

                log.info(f"[STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...")
                await asyncio.sleep(retry_interval)

                # Select a (possibly different) credential for the next attempt.
                cred_result = await credential_manager.get_valid_credential(
                    mode="geminicli", model_key=model_group
                )
                if not cred_result:
                    log.error("[STREAM] 重试时无可用凭证")
                    yield Response(
                        content=json.dumps({"error": "当前无可用凭证"}),
                        status_code=500,
                        media_type="application/json"
                    )
                    return

                current_file, credential_data = cred_result
                auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
                    body, credential_data,
                    f"{await get_code_assist_endpoint()}/v1internal:streamGenerateContent?alt=sse"
                )
                if headers:
                    auth_headers.update(headers)

                need_retry = True
                break

            if need_retry:
                continue

            # The stream ended normally.
            return

        except Exception as e:
            # NOTE(review): if chunks were already yielded before the failure,
            # retrying re-sends the request from the start and the consumer
            # may see duplicated data - confirm this is acceptable.
            log.error(f"流式请求异常: {e}, 凭证: {current_file}")
            if attempt < max_retries:
                log.info(f"[STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...")
                await asyncio.sleep(retry_interval)
                continue

            log.error(f"[STREAM] 所有重试均失败,最后异常: {e}")
            if last_error_response is not None:
                yield last_error_response
            else:
                # Every attempt raised before any upstream error Response was
                # seen; yield a real Response rather than None.
                yield Response(
                    content=json.dumps({"error": f"请求异常: {str(e)}"}),
                    status_code=500,
                    media_type="application/json"
                )
            return

    # Only reachable when the retry configuration allows no attempt at all.
    log.error("[STREAM] 所有重试均失败")
    yield Response(
        content=json.dumps({"error": "所有重试均失败"}),
        status_code=500,
        media_type="application/json"
    )
|
|
|
|
def _strip_encoding_headers(raw_headers: Any) -> Dict[str, str]:
    """Copy upstream headers without content-encoding and content-length.

    NOTE(review): presumably needed because the body is re-sent through a new
    Response and may no longer match the upstream encoding/length - confirm.
    """
    cleaned = dict(raw_headers)
    cleaned.pop('content-encoding', None)
    cleaned.pop('content-length', None)
    return cleaned


def _safe_response_text(response: Any) -> Optional[str]:
    """Best-effort: return ``response.text``, or None if it cannot be read."""
    try:
        return response.text
    except Exception:
        # Logging/cooldown parsing must never break the request flow.
        return None


async def non_stream_request(
    body: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
) -> Response:
    """Send a non-streaming request upstream, with retries.

    A credential is selected for the model group of ``body["model"]`` and the
    request is posted to ``/v1internal:generateContent``. On a non-200 status
    the error is recorded and, if ``handle_error_with_retry`` allows it, the
    request is re-sent with a freshly selected credential.

    Args:
        body: Request body; "model" and "request" are forwarded upstream.
        headers: Extra request headers, merged over the generated ones.

    Returns:
        A ``Response``: the upstream body on success, otherwise the last
        upstream error or a 500 JSON error.
    """
    credential_manager = await _get_credential_manager()

    model_name = body.get("model", "")
    model_group = get_model_group(model_name)

    cred_result = await credential_manager.get_valid_credential(
        mode="geminicli", model_key=model_group
    )
    if not cred_result:
        return Response(
            content=json.dumps({"error": "当前无可用凭证"}),
            status_code=500,
            media_type="application/json"
        )

    current_file, credential_data = cred_result

    try:
        auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
            body, credential_data,
            f"{await get_code_assist_endpoint()}/v1internal:generateContent"
        )
        # Caller-supplied headers take precedence over the generated ones.
        if headers:
            auth_headers.update(headers)
    except Exception as e:
        log.error(f"准备请求失败: {e}")
        return Response(
            content=json.dumps({"error": f"准备请求失败: {str(e)}"}),
            status_code=500,
            media_type="application/json"
        )

    retry_config = await get_retry_config()
    max_retries = retry_config["max_retries"]
    retry_interval = retry_config["retry_interval"]

    auto_ban_codes = await get_auto_ban_error_codes()
    last_error_response = None

    for attempt in range(max_retries + 1):
        try:
            response = await post_async(
                url=target_url,
                json=final_payload,
                headers=auth_headers,
                timeout=300.0
            )
            status_code = response.status_code

            if status_code == 200:
                await record_api_call_success(
                    credential_manager, current_file, mode="geminicli", model_key=model_group
                )
                return Response(
                    content=response.content,
                    status_code=200,
                    headers=_strip_encoding_headers(response.headers)
                )

            # Keep the upstream error so it can be returned if retries run out.
            last_error_response = Response(
                content=response.content,
                status_code=status_code,
                headers=_strip_encoding_headers(response.headers)
            )

            error_text = _safe_response_text(response)
            detail = f", 响应: {error_text[:500]}" if error_text is not None else ""

            if status_code != 429 and status_code in auto_ban_codes:
                # Auto-ban error code: record it and give up immediately.
                log.error(f"非流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}{detail}")
                await record_api_call_error(
                    credential_manager, current_file, status_code,
                    None, mode="geminicli", model_key=model_group
                )
                return last_error_response

            # 429, or any code that is not in the auto-ban list.
            log.warning(f"非流式请求失败 (status={status_code}), 凭证: {current_file}{detail}")

            cooldown_until = None
            if status_code == 429 and error_text is not None:
                try:
                    cooldown_until = await parse_and_log_cooldown(error_text, mode="geminicli")
                except Exception as e:
                    # Best effort: a missing cooldown must not abort handling.
                    log.warning(f"[NON-STREAM] 解析冷却时间失败: {e}")

            await record_api_call_error(
                credential_manager, current_file, status_code,
                cooldown_until, mode="geminicli", model_key=model_group
            )

            should_retry = await handle_error_with_retry(
                credential_manager, status_code, current_file,
                retry_config["retry_enabled"], attempt, max_retries, retry_interval,
                mode="geminicli"
            )

            if not (should_retry and attempt < max_retries):
                log.error("[NON-STREAM] 达到最大重试次数或不应重试,返回原始错误")
                return last_error_response

            log.info(f"[NON-STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...")
            await asyncio.sleep(retry_interval)

            # Select a (possibly different) credential for the next attempt.
            cred_result = await credential_manager.get_valid_credential(
                mode="geminicli", model_key=model_group
            )
            if not cred_result:
                log.error("[NON-STREAM] 重试时无可用凭证")
                return Response(
                    content=json.dumps({"error": "当前无可用凭证"}),
                    status_code=500,
                    media_type="application/json"
                )

            current_file, credential_data = cred_result
            auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
                body, credential_data,
                f"{await get_code_assist_endpoint()}/v1internal:generateContent"
            )
            if headers:
                auth_headers.update(headers)
            continue

        except Exception as e:
            log.error(f"非流式请求异常: {e}, 凭证: {current_file}")
            if attempt < max_retries:
                log.info(f"[NON-STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...")
                await asyncio.sleep(retry_interval)
                continue

            log.error(f"[NON-STREAM] 所有重试均失败,最后异常: {e}")
            if last_error_response is not None:
                return last_error_response
            return Response(
                content=json.dumps({"error": f"请求异常: {str(e)}"}),
                status_code=500,
                media_type="application/json"
            )

    # Only reachable when the retry configuration allows no attempt at all;
    # never hand None back to a caller that expects a Response.
    log.error("[NON-STREAM] 所有重试均失败")
    if last_error_response is not None:
        return last_error_response
    return Response(
        content=json.dumps({"error": "所有重试均失败"}),
        status_code=500,
        media_type="application/json"
    )
|
|
|
|
| |
|
|
if __name__ == "__main__":
    # Manual test driver: prints the data returned by the streaming and the
    # non-streaming API calls.
    # Run with: python src/api/geminicli.py
    banner = "=" * 80
    rule = "-" * 80

    print(banner)
    print("GeminiCli API 测试")
    print(banner)

    # Request body shared by both tests.
    test_body = {
        "model": "gemini-2.5-flash",
        "request": {
            "contents": [
                {
                    "role": "user",
                    "parts": [{"text": "Hello, tell me a joke in one sentence."}],
                }
            ]
        },
    }

    async def test_stream_request():
        """Run stream_request once and print every item it yields."""
        print(f"\n{banner}")
        print("【测试1】流式请求 (stream_request with native=False)")
        print(banner)
        print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n")

        print("流式响应数据 (每个chunk):")
        print(rule)

        chunk_count = 0
        async for chunk in stream_request(body=test_body, native=False):
            chunk_count += 1

            if isinstance(chunk, Response):
                # Error path: the generator yielded a Response object.
                print("\n❌ 错误响应:")
                print(f" 状态码: {chunk.status_code}")
                print(f" Content-Type: {chunk.headers.get('content-type', 'N/A')}")
                try:
                    if isinstance(chunk.body, bytes):
                        content = chunk.body.decode('utf-8')
                    else:
                        content = str(chunk.body)
                    print(f" 内容: {content}")
                except Exception as e:
                    print(f" 内容解析失败: {e}")
                continue

            # Normal path: a stream chunk.
            print(f"\nChunk #{chunk_count}:")
            print(f" 类型: {type(chunk).__name__}")
            print(f" 长度: {len(chunk) if hasattr(chunk, '__len__') else 'N/A'}")
            preview = chunk[:200] if len(chunk) > 200 else chunk
            print(f" 内容预览: {preview!r}")

            # For SSE text lines, also show the decoded JSON payload.
            if isinstance(chunk, str) and chunk.startswith("data: "):
                try:
                    # Drop the 6-character "data: " prefix before parsing.
                    json_data = json.loads(chunk.strip()[6:])
                    print(f" 解析后的JSON: {json.dumps(json_data, indent=4, ensure_ascii=False)}")
                except Exception as e:
                    print(f" SSE解析尝试失败: {e}")

        print(f"\n总共收到 {chunk_count} 个chunk")

    async def test_non_stream_request():
        """Run non_stream_request once and print the returned Response."""
        print(f"\n{banner}")
        print("【测试2】非流式请求 (non_stream_request)")
        print(banner)
        print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n")

        response = await non_stream_request(body=test_body)

        print("非流式响应数据:")
        print(rule)
        print(f"状态码: {response.status_code}")
        print(f"Content-Type: {response.headers.get('content-type', 'N/A')}")
        print(f"\n响应头: {dict(response.headers)}\n")

        try:
            if isinstance(response.body, bytes):
                content = response.body.decode('utf-8')
            else:
                content = str(response.body)
            print(f"响应内容 (原始):\n{content}\n")

            # Pretty-print the body when it is valid JSON.
            try:
                json_data = json.loads(content)
                print("响应内容 (格式化JSON):")
                print(json.dumps(json_data, indent=2, ensure_ascii=False))
            except json.JSONDecodeError:
                print("(非JSON格式)")
        except Exception as e:
            print(f"内容解析失败: {e}")

    async def main():
        """Run both tests in sequence and report any exception."""
        try:
            await test_stream_request()
            await test_non_stream_request()

            print(f"\n{banner}")
            print("测试完成")
            print(banner)
        except Exception as e:
            print(f"\n❌ 测试过程中出现异常: {e}")
            import traceback
            traceback.print_exc()

    asyncio.run(main())
|
|