import asyncio import base64 import inspect import json import random import re from collections.abc import AsyncGenerator from typing import Any import httpx from openai import AsyncAzureOpenAI, AsyncOpenAI from openai._exceptions import NotFoundError from openai.lib.streaming.chat._completions import ChatCompletionStreamState from openai.types.chat.chat_completion import ChatCompletion from openai.types.chat.chat_completion_chunk import ChatCompletionChunk from openai.types.completion_usage import CompletionUsage import astrbot.core.message.components as Comp from astrbot import logger from astrbot.api.provider import Provider from astrbot.core.agent.message import ContentPart, ImageURLPart, Message, TextPart from astrbot.core.agent.tool import ToolSet from astrbot.core.message.message_event_result import MessageChain from astrbot.core.provider.entities import LLMResponse, TokenUsage, ToolCallsResult from astrbot.core.utils.io import download_image_by_url from astrbot.core.utils.network_utils import ( create_proxy_client, is_connection_error, log_connection_failure, ) from astrbot.core.utils.string_utils import normalize_and_dedupe_strings from ..register import register_provider_adapter @register_provider_adapter( "openai_chat_completion", "OpenAI API Chat Completion 提供商适配器", ) class ProviderOpenAIOfficial(Provider): _ERROR_TEXT_CANDIDATE_MAX_CHARS = 4096 @classmethod def _truncate_error_text_candidate(cls, text: str) -> str: if len(text) <= cls._ERROR_TEXT_CANDIDATE_MAX_CHARS: return text return text[: cls._ERROR_TEXT_CANDIDATE_MAX_CHARS] @staticmethod def _safe_json_dump(value: Any) -> str | None: try: return json.dumps(value, ensure_ascii=False, default=str) except Exception: return None def _get_image_moderation_error_patterns(self) -> list[str]: """Return configured moderation patterns (case-insensitive substring match, not regex).""" configured = self.provider_config.get("image_moderation_error_patterns", []) patterns: list[str] = [] if isinstance(configured, str): configured = [configured] if isinstance(configured, list): for pattern in configured: if not isinstance(pattern, str): continue pattern = pattern.strip() if pattern: patterns.append(pattern) return patterns @staticmethod def _extract_error_text_candidates(error: Exception) -> list[str]: candidates: list[str] = [] def _append_candidate(candidate: Any): if candidate is None: return text = str(candidate).strip() if not text: return candidates.append( ProviderOpenAIOfficial._truncate_error_text_candidate(text) ) _append_candidate(str(error)) body = getattr(error, "body", None) if isinstance(body, dict): err_obj = body.get("error") body_text = ProviderOpenAIOfficial._safe_json_dump( {"error": err_obj} if isinstance(err_obj, dict) else body ) _append_candidate(body_text) if isinstance(err_obj, dict): for field in ("message", "type", "code", "param"): value = err_obj.get(field) if value is not None: _append_candidate(value) elif isinstance(body, str): _append_candidate(body) response = getattr(error, "response", None) if response is not None: response_text = getattr(response, "text", None) if isinstance(response_text, str): _append_candidate(response_text) return normalize_and_dedupe_strings(candidates) def _is_content_moderated_upload_error(self, error: Exception) -> bool: patterns = [ pattern.lower() for pattern in self._get_image_moderation_error_patterns() ] if not patterns: return False candidates = [ candidate.lower() for candidate in self._extract_error_text_candidates(error) ] for pattern in patterns: if any(pattern in candidate for candidate in candidates): return True return False @staticmethod def _context_contains_image(contexts: list[dict]) -> bool: for context in contexts: content = context.get("content") if not isinstance(content, list): continue for item in content: if isinstance(item, dict) and item.get("type") == "image_url": return True return False async def _fallback_to_text_only_and_retry( self, payloads: dict, context_query: list, chosen_key: str, available_api_keys: list[str], func_tool: ToolSet | None, reason: str, *, image_fallback_used: bool = False, ) -> tuple: logger.warning( "检测到图片请求失败(%s),已移除图片并重试(保留文本内容)。", reason, ) new_contexts = await self._remove_image_from_context(context_query) payloads["messages"] = new_contexts return ( False, chosen_key, available_api_keys, payloads, new_contexts, func_tool, image_fallback_used, ) def _create_http_client(self, provider_config: dict) -> httpx.AsyncClient | None: """创建带代理的 HTTP 客户端""" proxy = provider_config.get("proxy", "") return create_proxy_client("OpenAI", proxy) def __init__(self, provider_config, provider_settings) -> None: super().__init__(provider_config, provider_settings) self.chosen_api_key = None self.api_keys: list = super().get_keys() self.chosen_api_key = self.api_keys[0] if len(self.api_keys) > 0 else None self.timeout = provider_config.get("timeout", 120) self.custom_headers = provider_config.get("custom_headers", {}) if isinstance(self.timeout, str): self.timeout = int(self.timeout) if not isinstance(self.custom_headers, dict) or not self.custom_headers: self.custom_headers = None else: for key in self.custom_headers: self.custom_headers[key] = str(self.custom_headers[key]) if "api_version" in provider_config: # Using Azure OpenAI API self.client = AsyncAzureOpenAI( api_key=self.chosen_api_key, api_version=provider_config.get("api_version", None), default_headers=self.custom_headers, base_url=provider_config.get("api_base", ""), timeout=self.timeout, http_client=self._create_http_client(provider_config), ) else: # Using OpenAI Official API self.client = AsyncOpenAI( api_key=self.chosen_api_key, base_url=provider_config.get("api_base", None), default_headers=self.custom_headers, timeout=self.timeout, http_client=self._create_http_client(provider_config), ) self.default_params = inspect.signature( self.client.chat.completions.create, ).parameters.keys() model = provider_config.get("model", "unknown") self.set_model(model) self.reasoning_key = "reasoning_content" async def get_models(self): try: models_str = [] models = await self.client.models.list() models = sorted(models.data, key=lambda x: x.id) for model in models: models_str.append(model.id) return models_str except NotFoundError as e: raise Exception(f"获取模型列表失败:{e}") async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: if tools: model = payloads.get("model", "").lower() omit_empty_param_field = "gemini" in model tool_list = tools.get_func_desc_openai_style( omit_empty_parameter_field=omit_empty_param_field, ) if tool_list: payloads["tools"] = tool_list # 不在默认参数中的参数放在 extra_body 中 extra_body = {} to_del = [] for key in payloads: if key not in self.default_params: extra_body[key] = payloads[key] to_del.append(key) for key in to_del: del payloads[key] # 读取并合并 custom_extra_body 配置 custom_extra_body = self.provider_config.get("custom_extra_body", {}) if isinstance(custom_extra_body, dict): extra_body.update(custom_extra_body) model = payloads.get("model", "").lower() completion = await self.client.chat.completions.create( **payloads, stream=False, extra_body=extra_body, ) if not isinstance(completion, ChatCompletion): raise Exception( f"API 返回的 completion 类型错误:{type(completion)}: {completion}。", ) logger.debug(f"completion: {completion}") llm_response = await self._parse_openai_completion(completion, tools) return llm_response async def _query_stream( self, payloads: dict, tools: ToolSet | None, ) -> AsyncGenerator[LLMResponse, None]: """流式查询API,逐步返回结果""" if tools: model = payloads.get("model", "").lower() omit_empty_param_field = "gemini" in model tool_list = tools.get_func_desc_openai_style( omit_empty_parameter_field=omit_empty_param_field, ) if tool_list: payloads["tools"] = tool_list # 不在默认参数中的参数放在 extra_body 中 extra_body = {} # 读取并合并 custom_extra_body 配置 custom_extra_body = self.provider_config.get("custom_extra_body", {}) if isinstance(custom_extra_body, dict): extra_body.update(custom_extra_body) to_del = [] for key in payloads: if key not in self.default_params: extra_body[key] = payloads[key] to_del.append(key) for key in to_del: del payloads[key] stream = await self.client.chat.completions.create( **payloads, stream=True, extra_body=extra_body, ) llm_response = LLMResponse("assistant", is_chunk=True) state = ChatCompletionStreamState() async for chunk in stream: try: state.handle_chunk(chunk) except Exception as e: logger.warning("Saving chunk state error: " + str(e)) if len(chunk.choices) == 0: continue delta = chunk.choices[0].delta # logger.debug(f"chunk delta: {delta}") # handle the content delta reasoning = self._extract_reasoning_content(chunk) _y = False llm_response.id = chunk.id if reasoning: llm_response.reasoning_content = reasoning _y = True if delta.content: # Don't strip streaming chunks to preserve spaces between words completion_text = self._normalize_content(delta.content, strip=False) llm_response.result_chain = MessageChain( chain=[Comp.Plain(completion_text)], ) _y = True if chunk.usage: llm_response.usage = self._extract_usage(chunk.usage) if _y: yield llm_response final_completion = state.get_final_completion() llm_response = await self._parse_openai_completion(final_completion, tools) yield llm_response def _extract_reasoning_content( self, completion: ChatCompletion | ChatCompletionChunk, ) -> str: """Extract reasoning content from OpenAI ChatCompletion if available.""" reasoning_text = "" if len(completion.choices) == 0: return reasoning_text if isinstance(completion, ChatCompletion): choice = completion.choices[0] reasoning_attr = getattr(choice.message, self.reasoning_key, None) if reasoning_attr: reasoning_text = str(reasoning_attr) elif isinstance(completion, ChatCompletionChunk): delta = completion.choices[0].delta reasoning_attr = getattr(delta, self.reasoning_key, None) if reasoning_attr: reasoning_text = str(reasoning_attr) return reasoning_text def _extract_usage(self, usage: CompletionUsage) -> TokenUsage: ptd = usage.prompt_tokens_details cached = ptd.cached_tokens if ptd and ptd.cached_tokens else 0 prompt_tokens = 0 if usage.prompt_tokens is None else usage.prompt_tokens completion_tokens = ( 0 if usage.completion_tokens is None else usage.completion_tokens ) return TokenUsage( input_other=prompt_tokens - cached, input_cached=cached, output=completion_tokens, ) @staticmethod def _normalize_content(raw_content: Any, strip: bool = True) -> str: """Normalize content from various formats to plain string. Some LLM providers return content as list[dict] format like [{'type': 'text', 'text': '...'}] instead of plain string. This method handles both formats. Args: raw_content: The raw content from LLM response, can be str, list, dict, or other. strip: Whether to strip whitespace from the result. Set to False for streaming chunks to preserve spaces between words. Returns: Normalized plain text string. """ # Handle dict format (e.g., {"type": "text", "text": "..."}) if isinstance(raw_content, dict): if "text" in raw_content: text_val = raw_content.get("text", "") return str(text_val) if text_val is not None else "" # For other dict formats, return empty string and log logger.warning(f"Unexpected dict format content: {raw_content}") return "" if isinstance(raw_content, list): # Check if this looks like OpenAI content-part format # Only process if at least one item has {'type': 'text', 'text': ...} structure has_content_part = any( isinstance(part, dict) and part.get("type") == "text" for part in raw_content ) if has_content_part: text_parts = [] for part in raw_content: if isinstance(part, dict) and part.get("type") == "text": text_val = part.get("text", "") # Coerce to str in case text is null or non-string text_parts.append(str(text_val) if text_val is not None else "") return "".join(text_parts) # Not content-part format, return string representation return str(raw_content) if isinstance(raw_content, str): content = raw_content.strip() if strip else raw_content # Check if the string is a JSON-encoded list (e.g., "[{'type': 'text', ...}]") # This can happen when streaming concatenates content that was originally list format # Only check if it looks like a complete JSON array (requires strip for check) check_content = raw_content.strip() if ( check_content.startswith("[") and check_content.endswith("]") and len(check_content) < 8192 ): try: # First try standard JSON parsing parsed = json.loads(check_content) except json.JSONDecodeError: # If that fails, try parsing as Python literal (handles single quotes) # This is safer than blind replace("'", '"') which corrupts apostrophes try: import ast parsed = ast.literal_eval(check_content) except (ValueError, SyntaxError): parsed = None if isinstance(parsed, list): # Only convert if it matches OpenAI content-part schema # i.e., at least one item has {'type': 'text', 'text': ...} has_content_part = any( isinstance(part, dict) and part.get("type") == "text" for part in parsed ) if has_content_part: text_parts = [] for part in parsed: if isinstance(part, dict) and part.get("type") == "text": text_val = part.get("text", "") # Coerce to str in case text is null or non-string text_parts.append( str(text_val) if text_val is not None else "" ) if text_parts: return "".join(text_parts) return content # Fallback for other types (int, float, etc.) return str(raw_content) if raw_content is not None else "" async def _parse_openai_completion( self, completion: ChatCompletion, tools: ToolSet | None ) -> LLMResponse: """Parse OpenAI ChatCompletion into LLMResponse""" llm_response = LLMResponse("assistant") if len(completion.choices) == 0: raise Exception("API 返回的 completion 为空。") choice = completion.choices[0] # parse the text completion if choice.message.content is not None: completion_text = self._normalize_content(choice.message.content) # specially, some providers may set tags around reasoning content in the completion text, # we use regex to remove them, and store then in reasoning_content field reasoning_pattern = re.compile(r"(.*?)", re.DOTALL) matches = reasoning_pattern.findall(completion_text) if matches: llm_response.reasoning_content = "\n".join( [match.strip() for match in matches], ) completion_text = reasoning_pattern.sub("", completion_text).strip() # Also clean up orphan tags that may leak from some models completion_text = re.sub(r"\s*$", "", completion_text).strip() llm_response.result_chain = MessageChain().message(completion_text) # parse the reasoning content if any # the priority is higher than the tag extraction llm_response.reasoning_content = self._extract_reasoning_content(completion) # parse tool calls if any if choice.message.tool_calls and tools is not None: args_ls = [] func_name_ls = [] tool_call_ids = [] tool_call_extra_content_dict = {} for tool_call in choice.message.tool_calls: if isinstance(tool_call, str): # workaround for #1359 tool_call = json.loads(tool_call) if tools is None: # 工具集未提供 # Should be unreachable raise Exception("工具集未提供") for tool in tools.func_list: if ( tool_call.type == "function" and tool.name == tool_call.function.name ): # workaround for #1454 if isinstance(tool_call.function.arguments, str): args = json.loads(tool_call.function.arguments) else: args = tool_call.function.arguments args_ls.append(args) func_name_ls.append(tool_call.function.name) tool_call_ids.append(tool_call.id) # gemini-2.5 / gemini-3 series extra_content handling extra_content = getattr(tool_call, "extra_content", None) if extra_content is not None: tool_call_extra_content_dict[tool_call.id] = extra_content llm_response.role = "tool" llm_response.tools_call_args = args_ls llm_response.tools_call_name = func_name_ls llm_response.tools_call_ids = tool_call_ids llm_response.tools_call_extra_content = tool_call_extra_content_dict # specially handle finish reason if choice.finish_reason == "content_filter": raise Exception( "API 返回的 completion 由于内容安全过滤被拒绝(非 AstrBot)。", ) if llm_response.completion_text is None and not llm_response.tools_call_args: logger.error(f"API 返回的 completion 无法解析:{completion}。") raise Exception(f"API 返回的 completion 无法解析:{completion}。") llm_response.raw_completion = completion llm_response.id = completion.id if completion.usage: llm_response.usage = self._extract_usage(completion.usage) return llm_response async def _prepare_chat_payload( self, prompt: str | None, image_urls: list[str] | None = None, contexts: list[dict] | list[Message] | None = None, system_prompt: str | None = None, tool_calls_result: ToolCallsResult | list[ToolCallsResult] | None = None, model: str | None = None, extra_user_content_parts: list[ContentPart] | None = None, **kwargs, ) -> tuple: """准备聊天所需的有效载荷和上下文""" if contexts is None: contexts = [] new_record = None if prompt is not None: new_record = await self.assemble_context( prompt, image_urls, extra_user_content_parts ) context_query = self._ensure_message_to_dicts(contexts) if new_record: context_query.append(new_record) if system_prompt: context_query.insert(0, {"role": "system", "content": system_prompt}) for part in context_query: if "_no_save" in part: del part["_no_save"] # tool calls result if tool_calls_result: if isinstance(tool_calls_result, ToolCallsResult): context_query.extend(tool_calls_result.to_openai_messages()) else: for tcr in tool_calls_result: context_query.extend(tcr.to_openai_messages()) model = model or self.get_model() payloads = {"messages": context_query, "model": model} self._finally_convert_payload(payloads) return payloads, context_query def _finally_convert_payload(self, payloads: dict) -> None: """Finally convert the payload. Such as think part conversion, tool inject.""" for message in payloads.get("messages", []): if message.get("role") == "assistant" and isinstance( message.get("content"), list ): reasoning_content = "" new_content = [] # not including think part for part in message["content"]: if part.get("type") == "think": reasoning_content += str(part.get("think")) else: new_content.append(part) message["content"] = new_content # reasoning key is "reasoning_content" if reasoning_content: message["reasoning_content"] = reasoning_content async def _handle_api_error( self, e: Exception, payloads: dict, context_query: list, func_tool: ToolSet | None, chosen_key: str, available_api_keys: list[str], retry_cnt: int, max_retries: int, image_fallback_used: bool = False, ) -> tuple: """处理API错误并尝试恢复""" if "429" in str(e): logger.warning( f"API 调用过于频繁,尝试使用其他 Key 重试。当前 Key: {chosen_key[:12]}", ) # 最后一次不等待 if retry_cnt < max_retries - 1: await asyncio.sleep(1) if chosen_key in available_api_keys: available_api_keys.remove(chosen_key) if len(available_api_keys) > 0: chosen_key = random.choice(available_api_keys) return ( False, chosen_key, available_api_keys, payloads, context_query, func_tool, image_fallback_used, ) raise e if "maximum context length" in str(e): logger.warning( f"上下文长度超过限制。尝试弹出最早的记录然后重试。当前记录条数: {len(context_query)}", ) await self.pop_record(context_query) payloads["messages"] = context_query return ( False, chosen_key, available_api_keys, payloads, context_query, func_tool, image_fallback_used, ) if "The model is not a VLM" in str(e): # siliconcloud if image_fallback_used or not self._context_contains_image(context_query): raise e # 尝试删除所有 image return await self._fallback_to_text_only_and_retry( payloads, context_query, chosen_key, available_api_keys, func_tool, "model_not_vlm", image_fallback_used=True, ) if self._is_content_moderated_upload_error(e): if image_fallback_used or not self._context_contains_image(context_query): raise e return await self._fallback_to_text_only_and_retry( payloads, context_query, chosen_key, available_api_keys, func_tool, "image_content_moderated", image_fallback_used=True, ) if ( "Function calling is not enabled" in str(e) or ("tool" in str(e).lower() and "support" in str(e).lower()) or ("function" in str(e).lower() and "support" in str(e).lower()) ): # openai, ollama, gemini openai, siliconcloud 的错误提示与 code 不统一,只能通过字符串匹配 logger.info( f"{self.get_model()} 不支持函数工具调用,已自动去除,不影响使用。", ) payloads.pop("tools", None) return ( False, chosen_key, available_api_keys, payloads, context_query, None, image_fallback_used, ) # logger.error(f"发生了错误。Provider 配置如下: {self.provider_config}") if "tool" in str(e).lower() and "support" in str(e).lower(): logger.error("疑似该模型不支持函数调用工具调用。请输入 /tool off_all") if is_connection_error(e): proxy = self.provider_config.get("proxy", "") log_connection_failure("OpenAI", e, proxy) raise e async def text_chat( self, prompt=None, session_id=None, image_urls=None, func_tool=None, contexts=None, system_prompt=None, tool_calls_result=None, model=None, extra_user_content_parts=None, **kwargs, ) -> LLMResponse: payloads, context_query = await self._prepare_chat_payload( prompt, image_urls, contexts, system_prompt, tool_calls_result, model=model, extra_user_content_parts=extra_user_content_parts, **kwargs, ) llm_response = None max_retries = 10 available_api_keys = self.api_keys.copy() chosen_key = random.choice(available_api_keys) image_fallback_used = False last_exception = None retry_cnt = 0 for retry_cnt in range(max_retries): try: self.client.api_key = chosen_key llm_response = await self._query(payloads, func_tool) break except Exception as e: last_exception = e ( success, chosen_key, available_api_keys, payloads, context_query, func_tool, image_fallback_used, ) = await self._handle_api_error( e, payloads, context_query, func_tool, chosen_key, available_api_keys, retry_cnt, max_retries, image_fallback_used=image_fallback_used, ) if success: break if retry_cnt == max_retries - 1 or llm_response is None: logger.error(f"API 调用失败,重试 {max_retries} 次仍然失败。") if last_exception is None: raise Exception("未知错误") raise last_exception return llm_response async def text_chat_stream( self, prompt=None, session_id=None, image_urls=None, func_tool=None, contexts=None, system_prompt=None, tool_calls_result=None, model=None, **kwargs, ) -> AsyncGenerator[LLMResponse, None]: """流式对话,与服务商交互并逐步返回结果""" payloads, context_query = await self._prepare_chat_payload( prompt, image_urls, contexts, system_prompt, tool_calls_result, model=model, **kwargs, ) max_retries = 10 available_api_keys = self.api_keys.copy() chosen_key = random.choice(available_api_keys) image_fallback_used = False last_exception = None retry_cnt = 0 for retry_cnt in range(max_retries): try: self.client.api_key = chosen_key async for response in self._query_stream(payloads, func_tool): yield response break except Exception as e: last_exception = e ( success, chosen_key, available_api_keys, payloads, context_query, func_tool, image_fallback_used, ) = await self._handle_api_error( e, payloads, context_query, func_tool, chosen_key, available_api_keys, retry_cnt, max_retries, image_fallback_used=image_fallback_used, ) if success: break if retry_cnt == max_retries - 1: logger.error(f"API 调用失败,重试 {max_retries} 次仍然失败。") if last_exception is None: raise Exception("未知错误") raise last_exception async def _remove_image_from_context(self, contexts: list): """从上下文中删除所有带有 image 的记录""" new_contexts = [] for context in contexts: if "content" in context and isinstance(context["content"], list): # continue new_content = [] for item in context["content"]: if isinstance(item, dict) and "image_url" in item: continue new_content.append(item) if not new_content: # 用户只发了图片 new_content = [{"type": "text", "text": "[图片]"}] context["content"] = new_content new_contexts.append(context) return new_contexts def get_current_key(self) -> str: return self.client.api_key def get_keys(self) -> list[str]: return self.api_keys def set_key(self, key) -> None: self.client.api_key = key async def assemble_context( self, text: str, image_urls: list[str] | None = None, extra_user_content_parts: list[ContentPart] | None = None, ) -> dict: """组装成符合 OpenAI 格式的 role 为 user 的消息段""" async def resolve_image_part(image_url: str) -> dict | None: if image_url.startswith("http"): image_path = await download_image_by_url(image_url) image_data = await self.encode_image_bs64(image_path) elif image_url.startswith("file:///"): image_path = image_url.replace("file:///", "") image_data = await self.encode_image_bs64(image_path) else: image_data = await self.encode_image_bs64(image_url) if not image_data: logger.warning(f"图片 {image_url} 得到的结果为空,将忽略。") return None return { "type": "image_url", "image_url": {"url": image_data}, } # 构建内容块列表 content_blocks = [] # 1. 用户原始发言(OpenAI 建议:用户发言在前) if text: content_blocks.append({"type": "text", "text": text}) elif image_urls: # 如果没有文本但有图片,添加占位文本 content_blocks.append({"type": "text", "text": "[图片]"}) elif extra_user_content_parts: # 如果只有额外内容块,也需要添加占位文本 content_blocks.append({"type": "text", "text": " "}) # 2. 额外的内容块(系统提醒、指令等) if extra_user_content_parts: for part in extra_user_content_parts: if isinstance(part, TextPart): content_blocks.append({"type": "text", "text": part.text}) elif isinstance(part, ImageURLPart): image_part = await resolve_image_part(part.image_url.url) if image_part: content_blocks.append(image_part) else: raise ValueError(f"不支持的额外内容块类型: {type(part)}") # 3. 图片内容 if image_urls: for image_url in image_urls: image_part = await resolve_image_part(image_url) if image_part: content_blocks.append(image_part) # 如果只有主文本且没有额外内容块和图片,返回简单格式以保持向后兼容 if ( text and not extra_user_content_parts and not image_urls and len(content_blocks) == 1 and content_blocks[0]["type"] == "text" ): return {"role": "user", "content": content_blocks[0]["text"]} # 否则返回多模态格式 return {"role": "user", "content": content_blocks} async def encode_image_bs64(self, image_url: str) -> str: """将图片转换为 base64""" if image_url.startswith("base64://"): return image_url.replace("base64://", "data:image/jpeg;base64,") with open(image_url, "rb") as f: image_bs64 = base64.b64encode(f.read()).decode("utf-8") return "data:image/jpeg;base64," + image_bs64 async def terminate(self): if self.client: await self.client.close()