| import requests |
| import json |
| import uuid |
| import time |
| from flask import Flask, request, jsonify, Response |
|
|
| |
# WSGI application object; the route decorators in this module register on it.
app = Flask(import_name=__name__)
|
|
| |
# Upstream endpoint that every proxied chat request is POSTed to.
GPT_OSS_API_URL = "https://api.gpt-oss.com/chatkit"

# Header template for upstream requests. The request handler copies this dict
# and adds the per-request "cookie" and "x-show-reasoning" entries to the copy,
# so this module-level dict itself is never mutated.
BASE_GPT_OSS_HEADERS = {
    "authority": "api.gpt-oss.com",
    "accept": "text/event-stream",
    "content-type": "application/json",
    "origin": "https://gpt-oss.com",
    "referer": "https://gpt-oss.com/",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "x-selected-model": "gpt-oss-120b",
}
|
|
| |
@app.route('/', methods=['GET'])
def root():
    """Return a small JSON banner describing the proxy and its API-key format."""
    banner = {
        "message": "欢迎使用 GPT-OSS to OpenAI 格式代理API",
        "status": "ok",
        "authentication_method": "使用动态API Key作为Session",
        "api_key_format": "在'Authorization: Bearer'后填入 '你的user_id|你的session令牌'",
    }
    return jsonify(banner)
|
|
| |
def create_openai_chunk(content, model="gpt-oss-120b", *, chunk_id=None, finish_reason=None):
    """Build one OpenAI-style ``chat.completion.chunk`` payload.

    Args:
        content: Value placed in the chunk's ``delta.content``.
        model: Model name reported in the chunk.
        chunk_id: Identifier reported as ``id``. When omitted, a fresh
            ``chatcmpl-<uuid4>`` id is generated (the previous behaviour).
            Pass the same value for every chunk of one stream to keep the id
            stable across that stream.
        finish_reason: Value for the single choice's ``finish_reason``.
            ``None`` (the default, and the previous hard-coded value) marks a
            chunk that does not end the stream.

    Returns:
        A JSON-serialisable dict with ``id``, ``object``, ``created``,
        ``model`` and a one-element ``choices`` list.
    """
    if chunk_id is None:
        chunk_id = f"chatcmpl-{uuid.uuid4()}"
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {"index": 0, "delta": {"content": content}, "finish_reason": finish_reason},
        ],
    }
|
|
| |
def _extract_message_text(content):
    """Return the plain text of an OpenAI message ``content`` value.

    A string is returned unchanged. A list is treated as OpenAI content parts
    and the ``text`` of every ``{"type": "text", ...}`` part is joined with
    newlines. Anything else yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "\n".join(
            part["text"]
            for part in content
            if isinstance(part, dict)
            and part.get("type") == "text"
            and isinstance(part.get("text"), str)
        )
    return ""


def _item_update(event):
    """Return ``(update_type, update)`` for a ``thread.item_updated`` event.

    Any other event, or one whose ``update`` is not a dict, yields
    ``(None, {})`` so callers can branch on the update type without having to
    guard every key access.
    """
    if event.get('type') != 'thread.item_updated':
        return None, {}
    update = event.get('update')
    if not isinstance(update, dict):
        return None, {}
    return update.get('type'), update


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions_proxy():
    """Proxy an OpenAI-style chat completion request to the GPT-OSS backend.

    The caller authenticates with ``Authorization: Bearer <user_id>|<session>``;
    the two parts are forwarded upstream as the ``user_id`` and ``session``
    cookies. Only the text of the last message with role ``user`` is sent, as
    a new ``threads.create`` operation.

    Returns:
        * 401 with a JSON ``error`` when the Authorization header or key is
          missing or malformed.
        * 400 with a JSON ``error`` when the body is not a JSON object or has
          no usable user message.
        * When ``stream`` is truthy: a ``text/event-stream`` response of
          OpenAI-style chunks. Buffered reasoning entries are emitted once, as
          a fenced markdown block before the first text delta. The stream ends
          with a ``finish_reason: "stop"`` chunk and ``data: [DONE]``. A
          backend failure is reported as a single ``{"error": ...}`` event.
        * Otherwise: one ``chat.completion`` JSON object, or 500 with a JSON
          ``error`` when the backend call fails.
    """
    bearer_prefix = 'Bearer '
    auth_header = request.headers.get('Authorization')
    # The auth scheme is matched case-insensitively.
    if not auth_header or auth_header[:len(bearer_prefix)].lower() != bearer_prefix.lower():
        return jsonify({"error": "缺少或格式错误的Authorization Header。请提供 'Bearer 你的组合密钥'。"}), 401

    # Slice the scheme off rather than splitting on it, so a key that happens
    # to contain "Bearer " is not truncated.
    combined_key = auth_header[len(bearer_prefix):].strip()
    user_id, separator, session_token = combined_key.partition('|')
    if not separator or not user_id or not session_token:
        return jsonify({"error": "API Key格式错误。正确格式为 '你的user_id|你的session令牌'。"}), 401

    # Copy the template so per-request values never leak into the shared dict.
    request_headers = BASE_GPT_OSS_HEADERS.copy()
    request_headers['cookie'] = f"user_id={user_id}; session={session_token}"

    # silent=True turns a wrong content type or unparsable body into None
    # instead of an exception, so it is reported as a plain 400 below.
    openai_request_data = request.get_json(silent=True)
    if not isinstance(openai_request_data, dict):
        return jsonify({"error": "请求格式无效: 请求体必须是合法的JSON对象。"}), 400

    messages = openai_request_data.get("messages") or []
    if not isinstance(messages, list):
        return jsonify({"error": "请求格式无效: 'messages' 必须是列表。"}), 400

    last_user_message = next(
        (m for m in reversed(messages) if isinstance(m, dict) and m.get('role') == 'user'),
        None,
    )
    user_prompt = _extract_message_text(last_user_message.get('content')) if last_user_message else ""
    if not user_prompt:
        return jsonify({"error": "未找到用户消息。"}), 400
    stream_requested = bool(openai_request_data.get("stream", False))

    # Reasoning entries are only requested when the client streams.
    request_headers['x-show-reasoning'] = 'true' if stream_requested else 'false'

    gpt_oss_payload = {
        "op": "threads.create",
        "params": {"input": {"text": user_prompt, "content": [{"type": "input_text", "text": user_prompt}]}},
    }

    def _internal_proxy_stream():
        """Yield each backend SSE ``data:`` payload that parses to a JSON object.

        Raises:
            IOError: when the HTTP request to the backend fails.
        """
        try:
            with requests.post(
                GPT_OSS_API_URL, headers=request_headers,
                json=gpt_oss_payload, stream=True, timeout=120,
            ) as response:
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    # Decode once; a stray invalid byte must not abort the stream.
                    line = raw_line.decode('utf-8', errors='replace')
                    if not line.startswith('data: '):
                        continue
                    try:
                        event = json.loads(line[len('data: '):])
                    except json.JSONDecodeError:
                        continue
                    # Scalars and lists carry no event fields; skip them.
                    if isinstance(event, dict):
                        yield event
        except requests.exceptions.RequestException as e:
            raise IOError(f"与后端服务通信失败: {e}") from e

    def _delta_text(update):
        """Return the update's ``delta`` if it is a string, else ``''``."""
        delta = update.get('delta')
        return delta if isinstance(delta, str) else ''

    if stream_requested:
        def stream_formatter():
            # One id for the whole stream; it overrides the per-chunk id that
            # create_openai_chunk generates.
            stream_id = f"chatcmpl-{uuid.uuid4()}"

            def sse(chunk):
                chunk["id"] = stream_id
                return f"data: {json.dumps(chunk)}\n\n"

            thinking_buffer = []
            thinking_block_sent = False
            try:
                for gpt_oss_data in _internal_proxy_stream():
                    update_type, update = _item_update(gpt_oss_data)

                    if update_type == 'cot.entry_added':
                        entry = update.get('entry')
                        thought = entry.get('content') if isinstance(entry, dict) else None
                        if thought:
                            thinking_buffer.append(f"- {thought}")
                        continue

                    if update_type != 'assistant_message.content_part.text_delta':
                        continue

                    # Flush the buffered reasoning once, before the first text delta.
                    if not thinking_block_sent and thinking_buffer:
                        all_thoughts = "\n".join(thinking_buffer)
                        formatted_block = f"```markdown\n[思考过程]\n{all_thoughts}\n```\n\n"
                        yield sse(create_openai_chunk(formatted_block))
                        thinking_block_sent = True

                    yield sse(create_openai_chunk(_delta_text(update)))

                # Terminating chunk: empty delta plus a finish_reason.
                final_chunk = create_openai_chunk("")
                final_chunk["choices"][0]["delta"] = {}
                final_chunk["choices"][0]["finish_reason"] = "stop"
                yield sse(final_chunk)
                yield "data: [DONE]\n\n"
            except IOError as e:
                yield f"data: {json.dumps({'error': str(e)})}\n\n"

        return Response(stream_formatter(), mimetype='text/event-stream')

    try:
        text_parts = [
            _delta_text(update)
            for update_type, update in map(_item_update, _internal_proxy_stream())
            if update_type == 'assistant_message.content_part.text_delta'
        ]
    except IOError as e:
        return jsonify({"error": str(e)}), 500

    final_response = {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": "gpt-oss-120b",
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "".join(text_parts).strip()},
            "finish_reason": "stop",
        }],
        "usage": {},
    }
    return jsonify(final_response)
|
|
| |
if __name__ == "__main__":
    # Script entry point: serve on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")