| import copy |
| import json |
| import re |
| import tiktoken |
| import uuid |
|
|
| from curl_cffi import requests |
| from tclogger import logger |
|
|
| from constants.envs import PROXIES |
| from constants.headers import OPENAI_GET_HEADERS, OPENAI_POST_DATA |
| from constants.models import TOKEN_LIMIT_MAP, TOKEN_RESERVED |
|
|
| from messagers.message_outputer import OpenaiStreamOutputer |
| from networks.proof_worker import ProofWorker |
|
|
|
|
| class OpenaiRequester: |
| def __init__(self): |
| self.init_requests_params() |
|
|
| def init_requests_params(self): |
| self.api_base = "https://chat.openai.com/backend-anon" |
| self.api_me = f"{self.api_base}/me" |
| self.api_models = f"{self.api_base}/models" |
| self.api_chat_requirements = f"{self.api_base}/sentinel/chat-requirements" |
| self.api_conversation = f"{self.api_base}/conversation" |
| self.uuid = str(uuid.uuid4()) |
| self.requests_headers = copy.deepcopy(OPENAI_GET_HEADERS) |
| extra_headers = { |
| "Oai-Device-Id": self.uuid, |
| } |
| self.requests_headers.update(extra_headers) |
|
|
| def log_request(self, url, method="GET"): |
| logger.note(f"> {method}:", end=" ") |
| logger.mesg(f"{url}", end=" ") |
|
|
| def log_response( |
| self, res: requests.Response, stream=False, iter_lines=False, verbose=False |
| ): |
| status_code = res.status_code |
| status_code_str = f"[{status_code}]" |
|
|
| if status_code == 200: |
| logger_func = logger.success |
| else: |
| logger_func = logger.warn |
|
|
| logger_func(status_code_str) |
|
|
| logger.enter_quiet(not verbose) |
|
|
| if stream: |
| if not iter_lines: |
| return |
|
|
| if not hasattr(self, "content_offset"): |
| self.content_offset = 0 |
|
|
| for line in res.iter_lines(): |
| line = line.decode("utf-8") |
| line = re.sub(r"^data:\s*", "", line) |
| if re.match(r"^\[DONE\]", line): |
| logger.success("\n[Finished]") |
| break |
| line = line.strip() |
| print(line) |
| if line: |
| try: |
| data = json.loads(line, strict=False) |
| message_role = data["message"]["author"]["role"] |
| message_status = data["message"]["status"] |
| if ( |
| message_role == "assistant" |
| and message_status == "in_progress" |
| ): |
| content = data["message"]["content"]["parts"][0] |
| delta_content = content[self.content_offset :] |
| self.content_offset = len(content) |
| logger_func(delta_content, end="") |
| except Exception as e: |
| logger.warn(e) |
| else: |
| logger_func(res.json()) |
|
|
| logger.exit_quiet(not verbose) |
|
|
| def get_models(self): |
| self.log_request(self.api_models) |
| res = requests.get( |
| self.api_models, |
| headers=self.requests_headers, |
|
|
| timeout=10, |
|
|
| ) |
| self.log_response(res) |
|
|
| def auth(self): |
| self.log_request(self.api_chat_requirements, method="POST") |
| res = requests.post( |
| self.api_chat_requirements, |
| headers=self.requests_headers, |
|
|
| timeout=10, |
|
|
| ) |
| data = res.json() |
| self.chat_requirements_token = data["token"] |
| self.chat_requirements_seed = data["proofofwork"]["seed"] |
| self.chat_requirements_difficulty = data["proofofwork"]["difficulty"] |
| self.log_response(res) |
|
|
| def transform_messages(self, messages: list[dict]): |
| def get_role(role): |
| if role in ["system", "user", "assistant"]: |
| return role |
| else: |
| return "system" |
|
|
| new_messages = [ |
| { |
| "author": {"role": get_role(message["role"])}, |
| "content": {"content_type": "text", "parts": [message["content"]]}, |
| "metadata": {}, |
| } |
| for message in messages |
| ] |
| return new_messages |
|
|
| def chat_completions(self, messages: list[dict], iter_lines=False, verbose=False): |
| proof_token = ProofWorker().calc_proof_token( |
| self.chat_requirements_seed, self.chat_requirements_difficulty |
| ) |
| extra_headers = { |
| "Accept": "text/event-stream", |
| "Openai-Sentinel-Chat-Requirements-Token": self.chat_requirements_token, |
| "Openai-Sentinel-Proof-Token": proof_token, |
| } |
| requests_headers = copy.deepcopy(self.requests_headers) |
| requests_headers.update(extra_headers) |
|
|
| post_data = copy.deepcopy(OPENAI_POST_DATA) |
| extra_data = { |
| "messages": self.transform_messages(messages), |
| "websocket_request_id": str(uuid.uuid4()), |
| } |
| post_data.update(extra_data) |
|
|
| self.log_request(self.api_conversation, method="POST") |
| s = requests.Session() |
| res = s.post( |
| self.api_conversation, |
| headers=requests_headers, |
| json=post_data, |
|
|
| timeout=10, |
|
|
| stream=True, |
| ) |
| self.log_response(res, stream=True, iter_lines=iter_lines, verbose=verbose) |
| return res |
|
|
|
|
| class OpenaiStreamer: |
| def __init__(self): |
| self.model = "gpt-3.5-turbo" |
| self.message_outputer = OpenaiStreamOutputer( |
| owned_by="openai", model="gpt-3.5-turbo" |
| ) |
| self.tokenizer = tiktoken.get_encoding("cl100k_base") |
|
|
| def count_tokens(self, messages: list[dict]): |
| token_count = sum( |
| len(self.tokenizer.encode(message["content"])) for message in messages |
| ) |
| logger.note(f"Prompt Token Count: {token_count}") |
| return token_count |
|
|
| def check_token_limit(self, messages: list[dict]): |
| token_limit = TOKEN_LIMIT_MAP[self.model] |
| token_count = self.count_tokens(messages) |
| token_redundancy = int(token_limit - TOKEN_RESERVED - token_count) |
| if token_redundancy <= 0: |
| raise ValueError( |
| f"Prompt exceeded token limit: {token_count} > {token_limit}" |
| ) |
| return True |
|
|
| def chat_response(self, messages: list[dict], iter_lines=False, verbose=False): |
| self.check_token_limit(messages) |
| logger.enter_quiet(not verbose) |
| requester = OpenaiRequester() |
| requester.auth() |
| logger.exit_quiet(not verbose) |
| return requester.chat_completions( |
| messages=messages, iter_lines=iter_lines, verbose=verbose |
| ) |
|
|
| def chat_return_generator(self, stream_response: requests.Response, verbose=False): |
| content_offset = 0 |
| is_finished = False |
|
|
| for line in stream_response.iter_lines(): |
| line = line.decode("utf-8") |
| line = re.sub(r"^data:\s*", "", line) |
| line = line.strip() |
| print(line) |
|
|
| if not line: |
| continue |
|
|
| if re.match(r"^\[DONE\]", line): |
| content_type = "Finished" |
| delta_content = "" |
| logger.success("\n[Finished]") |
| is_finished = True |
| else: |
| print(line) |
| content_type = "Completions" |
| delta_content = "" |
| try: |
| data = json.loads(line, strict=False) |
| message_role = data["message"]["author"]["role"] |
| message_status = data["message"]["status"] |
| if message_role == "assistant" and message_status == "in_progress": |
| content = data["message"]["content"]["parts"][0] |
| if not len(content): |
| continue |
| delta_content = content[content_offset:] |
| content_offset = len(content) |
| if verbose: |
| logger.success(delta_content, end="") |
| else: |
| continue |
| except Exception as e: |
| logger.warn(e) |
|
|
| output = self.message_outputer.output( |
| content=delta_content, content_type=content_type |
| ) |
| yield output |
|
|
| if not is_finished: |
| yield self.message_outputer.output(content="", content_type="Finished") |
|
|
| def chat_return_dict(self, stream_response: requests.Response): |
| final_output = self.message_outputer.default_data.copy() |
| final_output["choices"] = [ |
| { |
| "index": 0, |
| "finish_reason": "stop", |
| "message": {"role": "assistant", "content": ""}, |
| } |
| ] |
| final_content = "" |
| for item in self.chat_return_generator(stream_response): |
| print(item) |
| try: |
| data = json.loads(item) |
| delta = data["choices"][0]["delta"] |
| delta_content = delta.get("content", "") |
| if delta_content: |
| final_content += delta_content |
| except Exception as e: |
| logger.warn(e) |
| final_output["choices"][0]["message"]["content"] = final_content.strip() |
| return final_output |
|
|
|
|
| if __name__ == "__main__": |
| streamer = OpenaiStreamer() |
| messages = [ |
| { |
| "role": "system", |
| "content": "You are an LLM developed by NiansuhAI.\nYour name is Niansuh-Copilot.", |
| }, |
| {"role": "user", "content": "Hello, what is your role?"}, |
| {"role": "assistant", "content": "I am an LLM."}, |
| {"role": "user", "content": "What is your name?"}, |
| ] |
|
|
| streamer.chat_response(messages=messages, iter_lines=True, verbose=True) |
| |
|
|