import json
import time
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from flask import request, Response, stream_with_context, jsonify, render_template, redirect, url_for

from utils import logger, generate_request_id, count_tokens, count_message_tokens
import config
from auth import RateLimiter
from client import OnDemandAPIClient


config_instance = config.config_instance
rate_limiter = RateLimiter(config_instance.get('rate_limit_per_minute', 60))

def format_datetime(timestamp):
    """Format an ISO-8601 timestamp into a more readable form."""
    if not timestamp or timestamp == "Never saved":
        return timestamp

    try:
        if 'T' in timestamp:
            dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            return dt.strftime('%Y-%m-%d %H:%M:%S')
        return timestamp
    except Exception:
        return timestamp
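
# Illustrative doctest-style examples for the filter above (inputs assumed):
# >>> format_datetime("2024-05-01T12:30:00Z")
# '2024-05-01 12:30:00'
# >>> format_datetime("Never saved")  # the sentinel passes through unchanged
# 'Never saved'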


def format_number(value):
    """Scale a number into K/M/G/T units based on its magnitude."""
    if value is None or value == '-':
        return '-'

    try:
        value = float(value)
        if value >= 1000000000000:
            return f"{value/1000000000000:.2f}T"
        elif value >= 1000000000:
            return f"{value/1000000000:.2f}G"
        elif value >= 1000000:
            return f"{value/1000000:.2f}M"
        elif value >= 1000:
            return f"{value/1000:.2f}K"
        elif value == 0:
            return "0"
        elif abs(value) < 0.01:
            return f"{value:.2e}"
        else:
            return f"{value:.0f}" if value == int(value) else f"{value:.2f}"
    except (ValueError, TypeError):
        return str(value)
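
# Illustrative examples of the scaling thresholds above:
# >>> format_number(1234)
# '1.23K'
# >>> format_number(2_500_000)
# '2.50M'
# >>> format_number(0.005)  # below 0.01 falls back to scientific notation
# '5.00e-03'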


def format_duration(ms):
    """Format a millisecond duration into a more readable unit."""
    if ms is None or ms == '-':
        return '-'

    try:
        ms = float(ms)
        if ms >= 86400000:
            return f"{ms/86400000:.2f} days"
        elif ms >= 3600000:
            return f"{ms/3600000:.2f} hours"
        elif ms >= 60000:
            return f"{ms/60000:.2f} minutes"
        elif ms >= 1000:
            return f"{ms/1000:.2f} seconds"
        else:
            return f"{ms:.0f} ms" if ms == int(ms) else f"{ms:.2f} ms"
    except (ValueError, TypeError):
        return str(ms)
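
# Illustrative examples of the duration scaling above:
# >>> format_duration(1500)
# '1.50 seconds'
# >>> format_duration(90000)
# '1.50 minutes'
# >>> format_duration(250)
# '250 ms'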


def _update_usage_statistics(
    config_inst,
    request_id: str,
    requested_model_name: str,
    account_email: Optional[str],
    is_success: bool,
    duration_ms: int,
    is_stream: bool,
    prompt_tokens_val: int,
    completion_tokens_val: int,
    total_tokens_val: int,
    prompt_length: Optional[int] = None,
    completion_length: Optional[int] = None,
    error_message: Optional[str] = None,
    used_actual_tokens_for_history: bool = False
):
    """Helper that updates the usage statistics and the request history."""
    with config_inst.usage_stats_lock:
        config_inst.usage_stats["total_requests"] += 1
        current_email_for_stats = account_email if account_email else "unknown_account"

        if is_success:
            config_inst.usage_stats["successful_requests"] += 1
            config_inst.usage_stats["model_usage"].setdefault(requested_model_name, 0)
            config_inst.usage_stats["model_usage"][requested_model_name] += 1

            config_inst.usage_stats["account_usage"].setdefault(current_email_for_stats, 0)
            config_inst.usage_stats["account_usage"][current_email_for_stats] += 1

            config_inst.usage_stats["total_prompt_tokens"] += prompt_tokens_val
            config_inst.usage_stats["total_completion_tokens"] += completion_tokens_val
            config_inst.usage_stats["total_tokens"] += total_tokens_val
            config_inst.usage_stats["model_tokens"].setdefault(requested_model_name, 0)
            config_inst.usage_stats["model_tokens"][requested_model_name] += total_tokens_val

            today = datetime.now().strftime("%Y-%m-%d")
            hour = datetime.now().strftime("%Y-%m-%d %H:00")

            config_inst.usage_stats["daily_usage"].setdefault(today, 0)
            config_inst.usage_stats["daily_usage"][today] += 1

            config_inst.usage_stats["hourly_usage"].setdefault(hour, 0)
            config_inst.usage_stats["hourly_usage"][hour] += 1

            config_inst.usage_stats["daily_tokens"].setdefault(today, 0)
            config_inst.usage_stats["daily_tokens"][today] += total_tokens_val

            config_inst.usage_stats["hourly_tokens"].setdefault(hour, 0)
            config_inst.usage_stats["hourly_tokens"][hour] += total_tokens_val
        else:
            config_inst.usage_stats["failed_requests"] += 1

        history_entry = {
            "id": request_id,
            "timestamp": datetime.now().isoformat(),
            "model": requested_model_name,
            "account": current_email_for_stats,
            "success": is_success,
            "duration_ms": duration_ms,
            "stream": is_stream,
        }

        if is_success:
            if prompt_length is not None:
                history_entry["prompt_length"] = prompt_length
            if completion_length is not None:
                history_entry["completion_length"] = completion_length

            if is_stream:
                if used_actual_tokens_for_history:
                    history_entry["prompt_tokens"] = prompt_tokens_val
                    history_entry["completion_tokens"] = completion_tokens_val
                    history_entry["total_tokens"] = total_tokens_val
                else:
                    # Streamed counts are estimates when the upstream metrics
                    # event was not received; label them accordingly.
                    history_entry["prompt_tokens"] = prompt_tokens_val
                    history_entry["estimated_completion_tokens"] = completion_tokens_val
                    history_entry["estimated_total_tokens"] = total_tokens_val
            else:
                history_entry["prompt_tokens"] = prompt_tokens_val
                history_entry["completion_tokens"] = completion_tokens_val
                history_entry["total_tokens"] = total_tokens_val
        else:
            if error_message:
                history_entry["error"] = error_message
            if prompt_tokens_val > 0:
                history_entry["prompt_tokens_attempted"] = prompt_tokens_val

        config_inst.usage_stats["request_history"].append(history_entry)
        max_history_items = config_inst.get('max_history_items', 1000)
        if len(config_inst.usage_stats["request_history"]) > max_history_items:
            config_inst.usage_stats["request_history"] = \
                config_inst.usage_stats["request_history"][-max_history_items:]
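
# Illustrative call for a successful non-streaming request (argument values
# are made up for the example):
# _update_usage_statistics(
#     config_inst=config_instance, request_id="req-abc123",
#     requested_model_name="gpt-4o", account_email="user@example.com",
#     is_success=True, duration_ms=1840, is_stream=False,
#     prompt_tokens_val=120, completion_tokens_val=80, total_tokens_val=200,
#     prompt_length=512, completion_length=340,
#     used_actual_tokens_for_history=True)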


def _generate_hash_for_full_history(full_messages_list: List[Dict[str, str]], req_id: str) -> Optional[str]:
    """
    Generates a SHA256 hash from a list of messages, considering all messages.
    """
    if not full_messages_list:
        logger.debug(f"[{req_id}] (_generate_hash_for_full_history) No messages to hash.")
        return None
    try:
        simplified_history = [{"role": msg.get("role"), "content": msg.get("content")} for msg in full_messages_list]
        serialized_history = json.dumps(simplified_history, sort_keys=True)
        return hashlib.sha256(serialized_history.encode('utf-8')).hexdigest()
    except (TypeError, ValueError) as e:
        logger.error(f"[{req_id}] (_generate_hash_for_full_history) Failed to serialize full history messages for hashing: {e}")
        return None
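
# Illustrative property of the hashing above: json.dumps(..., sort_keys=True)
# normalizes key order inside each message, so logically identical histories
# produce the same context key.
# >>> a = _generate_hash_for_full_history([{"role": "user", "content": "hi"}], "req-0")
# >>> b = _generate_hash_for_full_history([{"content": "hi", "role": "user"}], "req-0")
# >>> a == b
# True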


def _update_client_context_hash_after_reply(
    original_request_messages: List[Dict[str, str]],
    assistant_reply_content: str,
    request_id: str,
    user_identifier: str,
    email_for_stats: Optional[str],
    current_ondemand_client_instance: Optional[OnDemandAPIClient],
    config_inst: config.Config,
    logger_instance
):
    """
    Helper to update the client's active_context_hash after a successful reply,
    using the full conversation history up to and including the assistant's reply.
    """
    if not assistant_reply_content or not email_for_stats or not current_ondemand_client_instance:
        logger_instance.debug(f"[{request_id}] Preconditions for updating the client context hash not met (reply content '{bool(assistant_reply_content)}', email '{email_for_stats}', client instance '{bool(current_ondemand_client_instance)}'); skipping.")
        return

    assistant_message = {"role": "assistant", "content": assistant_reply_content}
    full_history_up_to_assistant_reply = original_request_messages + [assistant_message]

    next_active_context_hash = _generate_hash_for_full_history(full_history_up_to_assistant_reply, request_id)

    if next_active_context_hash:
        with config_inst.client_sessions_lock:
            if user_identifier in config_inst.client_sessions and \
                    email_for_stats in config_inst.client_sessions[user_identifier]:
                session_data_to_update = config_inst.client_sessions[user_identifier][email_for_stats]
                client_in_session = session_data_to_update.get("client")

                logger_instance.debug(f"[{request_id}] HASH_UPDATE_DEBUG: client_in_session id={id(client_in_session)}, email={getattr(client_in_session, 'email', 'N/A')}, session_id={getattr(client_in_session, 'session_id', 'N/A')}")
                logger_instance.debug(f"[{request_id}] HASH_UPDATE_DEBUG: current_ondemand_client_instance id={id(current_ondemand_client_instance)}, email={getattr(current_ondemand_client_instance, 'email', 'N/A')}, session_id={getattr(current_ondemand_client_instance, 'session_id', 'N/A')}")
                logger_instance.debug(f"[{request_id}] HASH_UPDATE_DEBUG: Comparison result (client_in_session == current_ondemand_client_instance): {client_in_session == current_ondemand_client_instance}")
                logger_instance.debug(f"[{request_id}] HASH_UPDATE_DEBUG: Comparison result (client_in_session is current_ondemand_client_instance): {client_in_session is current_ondemand_client_instance}")

                if client_in_session == current_ondemand_client_instance:
                    old_hash = session_data_to_update.get("active_context_hash")
                    session_data_to_update["active_context_hash"] = next_active_context_hash
                    session_data_to_update["last_time"] = datetime.now()
                    logger_instance.info(f"[{request_id}] active_context_hash for client (account: {email_for_stats}) updated from '{old_hash}' to '{next_active_context_hash}' to reflect conversation progress.")
                else:
                    logger_instance.warning(f"[{request_id}] While updating the hash, the stored client for email_for_stats '{email_for_stats}' did not match the ondemand_client currently in use. Skipping update.")
            else:
                logger_instance.warning(f"[{request_id}] While updating the hash, user '{user_identifier}' or account '{email_for_stats}' was not found in client_sessions. Skipping update.")
    else:
        logger_instance.warning(f"[{request_id}] Could not generate a new active_context_hash for the next interaction (reply present: '{bool(assistant_reply_content)}'). The client's hash was not updated.")


def _get_context_key_from_messages(messages: List[Dict[str, str]], req_id: str) -> Optional[str]:
    """
    Generate a context hash key from the messages preceding the last user message.
    """
    if not messages:
        logger.debug(f"[{req_id}] No messages available for generating a context key.")
        return None

    last_user_msg_idx = -1
    for i in range(len(messages) - 1, -1, -1):
        if messages[i].get('role') == 'user':
            last_user_msg_idx = i
            break

    if last_user_msg_idx <= 0:
        logger.debug(f"[{req_id}] No prior history to generate a context key from (last_user_msg_idx: {last_user_msg_idx}).")
        return None

    historical_messages = messages[:last_user_msg_idx]
    if not historical_messages:
        logger.debug(f"[{req_id}] The historical message list for the context key is empty.")
        return None

    try:
        # Hash only role/content, mirroring _generate_hash_for_full_history.
        simplified_history = [{"role": msg.get("role"), "content": msg.get("content")} for msg in historical_messages]
        serialized_history = json.dumps(simplified_history, sort_keys=True)
        return hashlib.sha256(serialized_history.encode('utf-8')).hexdigest()
    except (TypeError, ValueError) as e:
        logger.error(f"[{req_id}] Failed to serialize historical messages for the context key: {e}")
        return None
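
# Illustrative behavior of the context key above: only the messages *before*
# the last user message are hashed, so a first-turn request has no key.
# >>> _get_context_key_from_messages([{"role": "user", "content": "hi"}], "req-0")
# (returns None: no history precedes the only user message)
# >>> _get_context_key_from_messages(
# ...     [{"role": "system", "content": "s"},
# ...      {"role": "user", "content": "q1"},
# ...      {"role": "assistant", "content": "a1"},
# ...      {"role": "user", "content": "q2"}], "req-0")
# (returns the SHA256 over [system, q1, a1])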


def register_routes(app):
    """Register all routes on the Flask app."""

    # Register the Jinja filters used by the stats page.
    app.jinja_env.filters['format_datetime'] = format_datetime
    app.jinja_env.filters['format_number'] = format_number
    app.jinja_env.filters['format_duration'] = format_duration

    @app.route('/health', methods=['GET'])
    def health_check():
        """Health-check endpoint returning the service status."""
        return {"status": "ok", "message": "2API service is running normally"}, 200

    @app.route('/v1/models', methods=['GET'])
    def list_models():
        """Return the list of available models in OpenAI format."""
        data = []
        created_time = int(time.time())
        model_mapping = config_instance._model_mapping
        for openai_name in model_mapping.keys():
            data.append({
                "id": openai_name,
                "object": "model",
                "created": created_time,
                "owned_by": "on-demand.io"
            })
        return {"object": "list", "data": data}

    @app.route('/v1/chat/completions', methods=['POST'])
    def chat_completions():
        """Handle chat completion requests, compatible with the OpenAI format."""
        request_id = generate_request_id()
        logger.info(f"[{request_id}] CHAT_COMPLETIONS_ENTRY_POINT")
        client_ip = request.remote_addr
        logger.info(f"[{request_id}] Received /v1/chat/completions request from IP: {client_ip}")

        # Validate the bearer token.
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            logger.warning(f"[{request_id}] Missing or malformed authentication token")
            return {"error": {"message": "Missing valid authentication token", "type": "auth_error", "code": "missing_token"}}, 401

        api_access_token = config_instance.get('api_access_token')
        token = auth_header[7:]
        if token != api_access_token:
            logger.warning(f"[{request_id}] Invalid authentication token provided")
            return {"error": {"message": "Invalid authentication token", "type": "auth_error", "code": "invalid_token"}}, 401

        # Per-token rate limiting.
        if not rate_limiter.is_allowed(token):
            logger.warning(f"[{request_id}] User {token[:8]}... exceeded the rate limit")
            return {"error": {"message": "Too many requests; please try again later", "type": "rate_limit_error", "code": "rate_limit_exceeded"}}, 429

        openai_data = request.get_json()
        if not openai_data:
            logger.error(f"[{request_id}] Request body is not valid JSON")
            return {"error": {"message": "The request body must be JSON.", "type": "invalid_request_error", "code": None}}, 400

        if app.config.get('DEBUG_MODE', False):
            logger.debug(f"[{request_id}] OpenAI request data: {json.dumps(openai_data, indent=2, ensure_ascii=False)}")

        # Extract the OpenAI-style request parameters.
        initial_messages_from_request: List[Dict[str, str]] = openai_data.get('messages', [])
        messages: List[Dict[str, str]] = initial_messages_from_request
        stream_requested: bool = openai_data.get('stream', False)

        model_mapping = config_instance._model_mapping
        default_endpoint_id = config_instance.get('default_endpoint_id')
        requested_model_name: str = openai_data.get('model', list(model_mapping.keys())[0] if model_mapping else default_endpoint_id)

        temperature: Optional[float] = openai_data.get('temperature')
        max_tokens: Optional[int] = openai_data.get('max_tokens')
        top_p: Optional[float] = openai_data.get('top_p')
        frequency_penalty: Optional[float] = openai_data.get('frequency_penalty')
        presence_penalty: Optional[float] = openai_data.get('presence_penalty')

        if not messages:
            logger.error(f"[{request_id}] Missing 'messages' field")
            return {"error": {"message": "Missing 'messages' field.", "type": "invalid_request_error", "code": "missing_messages"}}, 400

        # Compute the context hash over the history preceding the last user message.
        request_context_hash = _get_context_key_from_messages(messages, request_id)
        logger.info(f"[{request_id}] Request context hash: {repr(request_context_hash)}")

        # Extract the historical messages (everything before the last user
        # message) that back the context hash.
        historical_messages = []
        if request_context_hash:
            last_user_idx = -1
            for i in range(len(messages) - 1, -1, -1):
                if messages[i].get('role') == 'user':
                    last_user_idx = i
                    break

            if last_user_idx > 0:
                historical_messages = messages[:last_user_idx]

            if historical_messages:
                logger.debug(f"[{request_id}] Extracted 'historical_messages': {json.dumps(historical_messages, ensure_ascii=False, indent=2)}")
            else:
                logger.debug(f"[{request_id}] 'historical_messages' is empty after extraction. last_user_idx={last_user_idx}, request_context_hash='{request_context_hash}'")
        else:
            logger.debug(f"[{request_id}] 'request_context_hash' is falsy; 'historical_messages' stays empty.")

        # Extract the combined system prompt and the current user query.
        current_system_prompts_contents = [msg['content'] for msg in messages if msg.get('role') == 'system' and msg.get('content')]
        system_prompt_combined = "\n".join(current_system_prompts_contents)

        current_user_messages_contents = [msg['content'] for msg in messages if msg.get('role') == 'user' and msg.get('content')]
        current_user_query = current_user_messages_contents[-1] if current_user_messages_contents else ""

        if not current_user_query:
            logger.error(f"[{request_id}] No valid 'user' message content found in 'messages'.")
            logger.debug(f"[{request_id}] Received messages: {json.dumps(messages, ensure_ascii=False)}")
            return {"error": {"message": "No valid 'user' message content found in 'messages'.", "type": "invalid_request_error", "code": "no_user_message"}}, 400

        user_identifier = token
        request_start_time = time.time()
        ondemand_client = None
        email_for_stats = None
        # True whenever the chosen client session is newly created or
        # re-assigned, i.e. the upstream session holds no matching context yet.
        is_newly_assigned_context = True

        ondemand_session_timeout_minutes = config_instance.get('ondemand_session_timeout_minutes', 30)
        logger.info(f"[{request_id}] OnDemand session timeout set to: {ondemand_session_timeout_minutes} minutes.")
        session_timeout_delta = timedelta(minutes=ondemand_session_timeout_minutes)

        with config_instance.client_sessions_lock:
            current_time_dt = datetime.now()
            if user_identifier not in config_instance.client_sessions:
                config_instance.client_sessions[user_identifier] = {}
            user_sessions_for_id = config_instance.client_sessions[user_identifier]

            # Phase 0: prefer the most recently used, still-active session whose
            # stored context hash matches this request.
            sorted_sessions = sorted(
                user_sessions_for_id.items(),
                key=lambda item: item[1].get("last_time", datetime.min),
                reverse=True
            )

            for acc_email_p0, session_data_p0 in sorted_sessions:
                client_p0 = session_data_p0.get("client")
                last_time_p0 = session_data_p0.get("last_time")

                if client_p0 and client_p0.token and client_p0.session_id and last_time_p0:
                    if (current_time_dt - last_time_p0) < session_timeout_delta:
                        stored_active_hash = session_data_p0.get("active_context_hash")
                        hash_match_status = "match" if stored_active_hash == request_context_hash else "mismatch"
                        logger.info(f"[{request_id}] Phase 0: found active session for account {acc_email_p0}. Request context hash ({request_context_hash or 'None'}) vs stored hash ({stored_active_hash or 'None'}): {hash_match_status}.")

                        if stored_active_hash == request_context_hash:
                            ondemand_client = client_p0
                            email_for_stats = acc_email_p0
                            ondemand_client._associated_user_identifier = user_identifier
                            ondemand_client._associated_request_ip = client_ip
                            session_data_p0["last_time"] = current_time_dt
                            session_data_p0["ip"] = client_ip
                            is_newly_assigned_context = False
                            logger.info(f"[{request_id}] Phase 0: context hash matched; reusing the active session of account {email_for_stats}.")
                            break
                        else:
                            logger.info(f"[{request_id}] Phase 0: context hash mismatch; not reusing this active session.")

            # Phase 1: fall back to any session with an exact context hash match,
            # as long as it has not timed out.
            if not ondemand_client and request_context_hash:
                for acc_email_p1, session_data_p1 in user_sessions_for_id.items():
                    client_p1 = session_data_p1.get("client")
                    if client_p1 and client_p1.token and client_p1.session_id and \
                            session_data_p1.get("active_context_hash") == request_context_hash:

                        last_time_p1 = session_data_p1.get("last_time")
                        if last_time_p1 and (current_time_dt - last_time_p1) >= session_timeout_delta:
                            logger.info(f"[{request_id}] Phase 1: found account {acc_email_p1} with an exact hash match, but its session has timed out. Skipping it and trying to create a new session.")
                            continue

                        ondemand_client = client_p1
                        email_for_stats = acc_email_p1
                        ondemand_client._associated_user_identifier = user_identifier
                        ondemand_client._associated_request_ip = client_ip
                        session_data_p1["last_time"] = current_time_dt
                        session_data_p1["ip"] = client_ip
                        is_newly_assigned_context = False
                        logger.info(f"[{request_id}] Phase 1: exact context match. Reusing the client of account {email_for_stats} (context hash: {request_context_hash}).")
                        break

            # Phase 2: no reusable session; create a new client and session,
            # rotating through the configured OnDemand accounts.
            if not ondemand_client:
                logger.info(f"[{request_id}] No reusable session found in phase 0 or 1 (request context hash: {request_context_hash or 'None'}). Trying to obtain/create a new client session.")
                MAX_ACCOUNT_ATTEMPTS = config_instance.get('max_account_attempts', 3)
                temp_ondemand_client = None
                for attempt in range(MAX_ACCOUNT_ATTEMPTS):
                    new_ondemand_email, new_ondemand_password = config.get_next_ondemand_account_details()
                    if not new_ondemand_email:
                        logger.error(f"[{request_id}] No OnDemand account available in the configuration after {attempt+1} attempts.")
                        break

                    email_for_stats = new_ondemand_email

                    logger.info(f"[{request_id}] Phase 2: creating a new client instance and session for account {new_ondemand_email} (attempt {attempt+1}).")
                    client_id_for_log = f"{user_identifier[:8]}-{new_ondemand_email.split('@')[0]}-{request_id[:4]}"
                    temp_ondemand_client = OnDemandAPIClient(new_ondemand_email, new_ondemand_password, client_id=client_id_for_log)

                    if not temp_ondemand_client.sign_in() or not temp_ondemand_client.create_session():
                        logger.error(f"[{request_id}] Failed to initialize a new client session for {new_ondemand_email}: {temp_ondemand_client.last_error}")
                        continue

                    ondemand_client = temp_ondemand_client
                    ondemand_client._associated_user_identifier = user_identifier
                    ondemand_client._associated_request_ip = client_ip

                    user_sessions_for_id[new_ondemand_email] = {
                        "client": ondemand_client,
                        "last_time": current_time_dt,
                        "ip": client_ip,
                        "active_context_hash": request_context_hash
                    }
                    is_newly_assigned_context = True
                    logger.info(f"[{request_id}] Phase 2: successfully created/assigned a new client session for account {email_for_stats} (is_newly_assigned_context=True, associated context hash: {request_context_hash or 'None'}).")
                    break
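
            # Shape of the per-user session store at this point (illustrative
            # values; the keys mirror the dict written in phase 2 above):
            # config_instance.client_sessions = {
            #     "<api token>": {
            #         "user@example.com": {
            #             "client": <OnDemandAPIClient>,
            #             "last_time": datetime(...),
            #             "ip": "1.2.3.4",
            #             "active_context_hash": "<sha256 hex> or None",
            #         },
            #     },
            # }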

        if not ondemand_client:
            # temp_ondemand_client stays None only if no account was available.
            last_attempt_error = temp_ondemand_client.last_error if temp_ondemand_client else 'unknown error'
            logger.error(f"[{request_id}] Failed to obtain/create a client after {MAX_ACCOUNT_ATTEMPTS} attempts (is_newly_assigned_context stays {is_newly_assigned_context}). Last failure reason: {last_attempt_error}")

            prompt_tok_val_err, _, _ = count_message_tokens(messages, requested_model_name)
            _update_usage_statistics(
                config_inst=config_instance, request_id=request_id, requested_model_name=requested_model_name,
                account_email=email_for_stats,
                is_success=False, duration_ms=int((time.time() - request_start_time) * 1000),
                is_stream=stream_requested, prompt_tokens_val=prompt_tok_val_err or 0,
                completion_tokens_val=0, total_tokens_val=prompt_tok_val_err or 0,
                error_message=f"Failed to obtain/create a client session after multiple attempts. Last failure reason: {last_attempt_error}"
            )
            return {"error": {"message": f"Unable to establish a session with the OnDemand service right now. Last failure reason: {last_attempt_error}", "type": "api_error", "code": "ondemand_session_unavailable"}}, 503

        # Build the final query to send to OnDemand. For a brand-new (or
        # re-assigned) upstream session the system prompt and prior history are
        # replayed inline; for a reused session only the new user turn is sent.
        final_query_to_ondemand = ""
        query_parts = []

        logger.debug(f"[{request_id}] State before query construction: is_newly_assigned_context={is_newly_assigned_context}, request_context_hash='{request_context_hash}', historical_messages_empty={not bool(historical_messages)}")
        if historical_messages:
            logger.debug(f"[{request_id}] State before query construction: historical_messages content: {json.dumps(historical_messages, ensure_ascii=False, indent=2)}")

        if is_newly_assigned_context:
            logger.info(f"[{request_id}] Query construction: session is new/re-assigned (is_newly_assigned_context=True, account: {email_for_stats}).")

            if system_prompt_combined:
                query_parts.append(f"System: {system_prompt_combined}")
                logger.debug(f"[{request_id}] Query construction: new session; added the combined system prompt.")

            if request_context_hash and historical_messages:
                logger.info(f"[{request_id}] Query construction: historical context exists ({request_context_hash}); sending the historical messages.")
                formatted_historical_parts = []
                for msg in historical_messages:
                    role = msg.get('role', 'unknown').capitalize()
                    content = msg.get('content', '')
                    if content:
                        formatted_historical_parts.append(f"{role}: {content}")
                if formatted_historical_parts:
                    query_parts.append("\n".join(formatted_historical_parts))
            else:
                logger.info(f"[{request_id}] Query construction: no historical context. Sending only the current user query.")
        else:
            # Reused session: the upstream conversation already holds the context.
            stored_active_hash = "N/A"
            if ondemand_client:
                client_session_data = config_instance.client_sessions.get(user_identifier, {}).get(email_for_stats, {})
                stored_active_hash = client_session_data.get('active_context_hash', 'N/A')

            hash_match_status = "match" if stored_active_hash == request_context_hash else "mismatch"
            logger.info(f"[{request_id}] Query construction: reusing an existing session (is_newly_assigned_context=False, account: {email_for_stats}). Not sending history or system prompt. Request context hash ({request_context_hash or 'None'}) vs stored hash ({stored_active_hash or 'None'}): {hash_match_status}.")

        if current_user_query:
            query_parts.append(f"User: {current_user_query}")
            logger.debug(f"[{request_id}] Query construction: added the current user query.")
        else:
            logger.error(f"[{request_id}] Critical error: current_user_query is empty at final query construction!")
            if not query_parts:
                query_parts.append(" ")

        final_query_to_ondemand = "\n\n".join(filter(None, query_parts))
        if not final_query_to_ondemand.strip():
            logger.warning(f"[{request_id}] The constructed query is empty or whitespace-only. Sending a placeholder query.")
            final_query_to_ondemand = " "

        logger.info(f"[{request_id}] Constructed OnDemand query (first 1000 chars): {final_query_to_ondemand[:1000]}...")
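
        # Illustrative flattened query for a new session that replays history:
        # historical turns are joined with single newlines, and the parts
        # (system prompt, history, current turn) with blank lines, e.g.:
        #
        #   System: You are a helpful assistant.
        #
        #   User: q1
        #   Assistant: a1
        #
        #   User: q2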

        # Map the requested model to an OnDemand endpoint.
        endpoint_id = model_mapping.get(requested_model_name, default_endpoint_id)
        if requested_model_name not in model_mapping:
            logger.warning(f"[{request_id}] Model '{requested_model_name}' is not in the mapping table; using default endpoint '{default_endpoint_id}'.")

        # Forward only the sampling parameters the caller actually supplied.
        model_configs = {}
        if temperature is not None:
            model_configs["temperature"] = temperature
        if max_tokens is not None:
            model_configs["maxTokens"] = max_tokens
        if top_p is not None:
            model_configs["topP"] = top_p
        if frequency_penalty is not None:
            model_configs["frequency_penalty"] = frequency_penalty
        if presence_penalty is not None:
            model_configs["presence_penalty"] = presence_penalty

        logger.info(f"[{request_id}] Constructed model configs: {json.dumps(model_configs, ensure_ascii=False)}")

        # Stash the request context hash on the client instance so send_query
        # can associate the upstream call with this context.
        if ondemand_client:
            ondemand_client._current_request_context_hash = request_context_hash
            logger.debug(f"[{request_id}] Stored request_context_hash ('{request_context_hash}') onto ondemand_client instance before send_query.")
        else:
            logger.error(f"[{request_id}] CRITICAL: ondemand_client is None before send_query. This should not happen.")

        ondemand_result = ondemand_client.send_query(final_query_to_ondemand, endpoint_id=endpoint_id,
                                                     stream=stream_requested, model_configs_input=model_configs)
        if stream_requested:
            def generate_openai_stream(captured_initial_request_messages: List[Dict[str, str]]):
                full_assistant_reply_parts = []
                stream_response_obj = ondemand_result.get("response_obj")
                if not stream_response_obj:
                    # No upstream response object: emit a single error chunk with
                    # an estimated usage block, then terminate the stream.
                    prompt_tokens, _, _ = count_message_tokens(messages, requested_model_name)
                    if prompt_tokens is None:
                        prompt_tokens = 0
                    estimated_completion_tokens = 0
                    estimated_total_tokens = prompt_tokens

                    error_json = {
                        "id": request_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": requested_model_name,
                        "choices": [{"delta": {"content": "[Stream error: no response object received]"}, "index": 0, "finish_reason": "error"}],
                        "usage": {
                            "prompt_tokens": prompt_tokens,
                            "completion_tokens": estimated_completion_tokens,
                            "total_tokens": estimated_total_tokens
                        }
                    }
                    yield f"data: {json.dumps(error_json, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return

                logger.info(f"[{request_id}] Starting to stream the OpenAI-format response.")
                actual_input_tokens = None
                actual_output_tokens = None
                actual_total_tokens = None

                try:
                    for line in stream_response_obj.iter_lines():
                        if line:
                            decoded_line = line.decode('utf-8')
                            if decoded_line.startswith("data:"):
                                json_str = decoded_line[len("data:"):].strip()
                                if json_str == "[DONE]":
                                    break
                                try:
                                    event_data = json.loads(json_str)
                                    event_type = event_data.get("eventType", "")

                                    if event_type == "fulfillment":
                                        content_chunk = event_data.get("answer", "")
                                        if content_chunk is not None:
                                            full_assistant_reply_parts.append(content_chunk)
                                            openai_chunk = {
                                                "id": request_id,
                                                "object": "chat.completion.chunk",
                                                "created": int(time.time()),
                                                "model": requested_model_name,
                                                "choices": [
                                                    {
                                                        "delta": {"content": content_chunk},
                                                        "index": 0,
                                                        "finish_reason": None
                                                    }
                                                ]
                                            }
                                            yield f"data: {json.dumps(openai_chunk, ensure_ascii=False)}\n\n"

                                    elif event_type == "metricsLog":
                                        public_metrics = event_data.get("publicMetrics", {})
                                        if public_metrics:
                                            # Prefer the exact token counts reported upstream;
                                            # normalize missing values to 0.
                                            actual_input_tokens = public_metrics.get("inputTokens", 0) or 0
                                            actual_output_tokens = public_metrics.get("outputTokens", 0) or 0
                                            actual_total_tokens = public_metrics.get("totalTokens", 0) or 0
                                            logger.info(f"[{request_id}] Got exact token counts from metricsLog: input={actual_input_tokens}, output={actual_output_tokens}, total={actual_total_tokens}")

                                except json.JSONDecodeError:
                                    logger.warning(f"[{request_id}] JSONDecodeError while streaming: {json_str}")
                                    continue

                    # Fall back to estimates when the metricsLog event never
                    # arrived (values still None) or reported zeros. The original
                    # `== 0` comparison missed the None case.
                    if not actual_input_tokens or not actual_output_tokens or not actual_total_tokens:
                        logger.warning(f"[{request_id}] No valid token counts from metricsLog; falling back to estimation")
                        prompt_tokens, _, _ = count_message_tokens(messages, requested_model_name)
                        if prompt_tokens is None:
                            prompt_tokens = 0
                        estimated_completion_tokens = max(1, prompt_tokens // 2)
                        estimated_total_tokens = prompt_tokens + estimated_completion_tokens
                    else:
                        prompt_tokens = actual_input_tokens
                        estimated_completion_tokens = actual_output_tokens
                        estimated_total_tokens = actual_total_tokens

                    final_chunk = {
                        "id": request_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": requested_model_name,
                        "choices": [{"delta": {}, "index": 0, "finish_reason": "stop"}],
                        "usage": {
                            "prompt_tokens": prompt_tokens,
                            "completion_tokens": estimated_completion_tokens,
                            "total_tokens": estimated_total_tokens
                        }
                    }
                    yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    logger.info(f"[{request_id}] Finished streaming the OpenAI-format response.")

                    full_streamed_reply = "".join(full_assistant_reply_parts)

                    request_duration_val = int((time.time() - request_start_time) * 1000)
                    final_prompt_tokens_for_stats = actual_input_tokens if actual_input_tokens is not None and actual_input_tokens > 0 else prompt_tokens
                    final_completion_tokens_for_stats = actual_output_tokens if actual_output_tokens is not None and actual_output_tokens > 0 else estimated_completion_tokens
                    final_total_tokens_for_stats = actual_total_tokens if actual_total_tokens is not None and actual_total_tokens > 0 else estimated_total_tokens
                    used_actual_for_history = actual_input_tokens is not None and actual_input_tokens > 0

                    _update_usage_statistics(
                        config_inst=config_instance,
                        request_id=request_id,
                        requested_model_name=requested_model_name,
                        account_email=ondemand_client.email,
                        is_success=True,
                        duration_ms=request_duration_val,
                        is_stream=True,
                        prompt_tokens_val=final_prompt_tokens_for_stats,
                        completion_tokens_val=final_completion_tokens_for_stats,
                        total_tokens_val=final_total_tokens_for_stats,
                        prompt_length=len(final_query_to_ondemand),
                        used_actual_tokens_for_history=used_actual_for_history
                    )

                    # Advance the stored context hash so the next turn of this
                    # conversation can be matched back to this client session.
                    _update_client_context_hash_after_reply(
                        original_request_messages=captured_initial_request_messages,
                        assistant_reply_content=full_streamed_reply,
                        request_id=request_id,
                        user_identifier=token,
                        email_for_stats=ondemand_client.email,
                        current_ondemand_client_instance=ondemand_client,
                        config_inst=config_instance,
                        logger_instance=logger
                    )
                except Exception as e:
                    logger.error(f"[{request_id}] Error during streaming: {e}")

                    prompt_tokens, _, _ = count_message_tokens(messages, requested_model_name)
                    if prompt_tokens is None:
                        prompt_tokens = 0
                    estimated_completion_tokens = 0
                    estimated_total_tokens = prompt_tokens

                    error_json = {
                        "id": request_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": requested_model_name,
                        "choices": [{"delta": {"content": f"[Stream processing exception: {str(e)}]"}, "index": 0, "finish_reason": "error"}],
                        "usage": {
                            "prompt_tokens": prompt_tokens,
                            "completion_tokens": estimated_completion_tokens,
                            "total_tokens": estimated_total_tokens
                        }
                    }
                    yield f"data: {json.dumps(error_json, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"

                    request_duration_val = int((time.time() - request_start_time) * 1000)
                    _update_usage_statistics(
                        config_inst=config_instance,
                        request_id=request_id,
                        requested_model_name=requested_model_name,
                        account_email=ondemand_client.email if ondemand_client else email_for_stats,
                        is_success=False,
                        duration_ms=request_duration_val,
                        is_stream=True,
                        prompt_tokens_val=prompt_tokens if prompt_tokens is not None else 0,
                        completion_tokens_val=0,
                        total_tokens_val=prompt_tokens if prompt_tokens is not None else 0,
                        error_message=str(e)
                    )
                finally:
                    if stream_response_obj:
                        stream_response_obj.close()

            return Response(stream_with_context(generate_openai_stream(initial_messages_from_request)), content_type='text/event-stream; charset=utf-8')
        else:
            # Non-streaming response.
            final_content = ondemand_result.get("content", "")

            # Token accounting: count the prompt from the request messages and
            # the completion from the actual returned content.
            prompt_tokens, _, _ = count_message_tokens(messages, requested_model_name)
            if prompt_tokens is None:
                prompt_tokens = 0
            completion_tokens_actual = count_tokens(final_content, requested_model_name)
            total_tokens_actual = prompt_tokens + completion_tokens_actual

            openai_response = {
                "id": request_id,
                "object": "chat.completion",
                "created": int(time.time()),
                "model": requested_model_name,
                "choices": [
                    {
                        "message": {
                            "role": "assistant",
                            "content": final_content
                        },
                        "finish_reason": "stop",
                        "index": 0
                    }
                ],
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens_actual,
                    "total_tokens": total_tokens_actual
                }
            }
            logger.info(f"[{request_id}] Sending the non-streaming OpenAI-format response.")

            request_duration_val = int((time.time() - request_start_time) * 1000)
            _update_usage_statistics(
                config_inst=config_instance,
                request_id=request_id,
                requested_model_name=requested_model_name,
                account_email=ondemand_client.email,
                is_success=True,
                duration_ms=request_duration_val,
                is_stream=False,
                prompt_tokens_val=prompt_tokens,
                completion_tokens_val=completion_tokens_actual,
                total_tokens_val=total_tokens_actual,
                prompt_length=len(final_query_to_ondemand),
                completion_length=len(final_content) if final_content else 0,
                used_actual_tokens_for_history=True
            )

            # Advance the stored context hash for the next turn.
            _update_client_context_hash_after_reply(
                original_request_messages=initial_messages_from_request,
                assistant_reply_content=final_content,
                request_id=request_id,
                user_identifier=token,
                email_for_stats=ondemand_client.email,
                current_ondemand_client_instance=ondemand_client,
                config_inst=config_instance,
                logger_instance=logger
            )

            return openai_response

    @app.route('/', methods=['GET'])
    def show_stats():
        """Render the HTML page with usage statistics."""
        current_time = datetime.now()
        current_time_str = current_time.strftime('%Y-%m-%d %H:%M:%S')
        current_date = current_time.strftime('%Y-%m-%d')

        with config_instance.usage_stats_lock:
            total_requests = config_instance.usage_stats["total_requests"]
            successful_requests = config_instance.usage_stats["successful_requests"]
            failed_requests = config_instance.usage_stats["failed_requests"]
            total_prompt_tokens = config_instance.usage_stats["total_prompt_tokens"]
            total_completion_tokens = config_instance.usage_stats["total_completion_tokens"]
            total_tokens = config_instance.usage_stats["total_tokens"]

            success_rate = int((successful_requests / total_requests * 100) if total_requests > 0 else 0)

            # Duration statistics over successful requests only.
            successful_history = [req for req in config_instance.usage_stats["request_history"] if req.get('success', False)]
            total_duration = sum(req.get('duration_ms', 0) for req in successful_history)
            avg_duration = (total_duration / successful_requests) if successful_requests > 0 else 0

            min_duration = min([req.get('duration_ms', float('inf')) for req in successful_history]) if successful_history else 0

            # Growth rate: today's requests relative to all earlier requests.
            today_requests = config_instance.usage_stats["daily_usage"].get(current_date, 0)
            if total_requests is None or today_requests is None:
                growth_rate = 0
            elif total_requests == today_requests or (total_requests - today_requests) <= 0:
                growth_rate = 100
            else:
                growth_rate = (today_requests / (total_requests - today_requests) * 100)

            # Cost estimation based on the configured per-token prices.
            total_cost = 0.0
            model_costs = {}

            for req in successful_history:
                model_name = req.get('model', '')

                all_model_prices = config_instance.get('model_prices', {})
                default_model_price = config_instance.get('default_model_price', {'input': 0.50 / 1000000, 'output': 2.00 / 1000000})
                model_price = all_model_prices.get(model_name, default_model_price)

                input_tokens = req.get('prompt_tokens', 0)

                # Prefer exact completion tokens; fall back to the estimate
                # recorded for streamed requests.
                if 'completion_tokens' in req:
                    output_tokens = req.get('completion_tokens', 0)
                else:
                    output_tokens = req.get('estimated_completion_tokens', 0)

                request_cost = (input_tokens * model_price['input']) + (output_tokens * model_price['output'])
                total_cost += request_cost

                if model_name not in model_costs:
                    model_costs[model_name] = 0
                model_costs[model_name] += request_cost

            avg_cost = (total_cost / successful_requests) if successful_requests > 0 else 0
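
            # Worked cost example with the default prices above (input
            # $0.50 per 1M tokens, output $2.00 per 1M tokens): a request with
            # 1,000 prompt tokens and 500 completion tokens costs
            # 1000 * 0.5e-6 + 500 * 2.0e-6 = $0.0005 + $0.0010 = $0.0015.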

            # Assemble everything the template needs.
            model_usage = dict(config_instance.usage_stats["model_usage"])
            top_models = sorted(model_usage.items(), key=lambda x: x[1], reverse=True)
            top_model = top_models[0] if top_models else None

            stats = {
                "total_requests": total_requests,
                "successful_requests": successful_requests,
                "failed_requests": failed_requests,
                "success_rate": success_rate,
                "avg_duration": avg_duration,
                "min_duration": min_duration,
                "today_requests": today_requests,
                "growth_rate": growth_rate,
                "total_prompt_tokens": total_prompt_tokens,
                "total_completion_tokens": total_completion_tokens,
                "total_tokens": total_tokens,
                "total_cost": total_cost,
                "avg_cost": avg_cost,
                "model_usage": model_usage,
                "model_costs": model_costs,
                "top_model": top_model,
                "model_tokens": dict(config_instance.usage_stats["model_tokens"]),
                "account_usage": dict(config_instance.usage_stats["account_usage"]),
                "daily_usage": dict(sorted(config_instance.usage_stats["daily_usage"].items(), reverse=True)[:30]),
                "hourly_usage": dict(sorted(config_instance.usage_stats["hourly_usage"].items(), reverse=True)[:48]),
                # History is append-only, so slice from the end to show the
                # 50 most recent requests (the original [:50] took the oldest).
                "request_history": list(config_instance.usage_stats["request_history"][-50:]),
                "daily_tokens": dict(sorted(config_instance.usage_stats["daily_tokens"].items(), reverse=True)[:30]),
                "hourly_tokens": dict(sorted(config_instance.usage_stats["hourly_tokens"].items(), reverse=True)[:48]),
                "last_saved": config_instance.usage_stats.get("last_saved", "Never saved")
            }

        return render_template('stats.html', stats=stats, current_time=current_time_str)

    @app.route('/save_stats', methods=['POST'])
    def save_stats():
        """Manually persist the statistics to disk."""
        try:
            config_instance.save_stats_to_file()
            logger.info("Statistics saved manually")
            return redirect(url_for('show_stats'))
        except Exception as e:
            logger.error(f"Error while manually saving statistics: {e}")
            return jsonify({"status": "error", "message": str(e)}), 500
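
# Minimal wiring sketch (illustrative; the actual entry point lives elsewhere
# in this project, and the module/file names here are assumptions):
#
#   from flask import Flask
#   import routes
#
#   app = Flask(__name__)
#   routes.register_routes(app)
#   app.run(host="0.0.0.0", port=8000)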