| import os |
| import hashlib |
| import random |
| import string |
| import time |
| import requests |
| import json |
|
|
class ConanClient:
    """HTTP client that signs each request with an access/secret key pair.

    Every request payload carries ``app_id``, ``timestamp``, ``random`` and
    ``sign``; ``sign`` is the MD5 hex digest of the UTF-8 encoded string
    ``"<ak>:<timestamp>:<random>:<sk>"``.
    """

    def __init__(self, ak, sk, url, timeout=30):
        """
        Initialize the Client

        Args:
            ak: Access Key
            sk: Secret Key
            url: Server URL
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size=40, chars=None):
        """
        Build a random string.

        Args:
            size: Number of characters to generate (default: 40)
            chars: Alphabet to draw from; defaults to ASCII letters and digits

        Returns:
            A string of ``size`` characters picked from ``chars``
        """
        if chars is None:
            chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
        # SystemRandom reads from the OS entropy source instead of the
        # seedable module-level generator.
        pick = random.SystemRandom().choice
        return "".join(pick(chars) for _ in range(size))

    def __signature(self, random_str, time_stamp):
        """
        Compute the request signature.

        Args:
            random_str: Per-request random string
            time_stamp: Request timestamp (formatted as an integer)

        Returns:
            MD5 hex digest of ``"<ak>:<time_stamp>:<random_str>:<sk>"``
        """
        # %-formatting is kept on purpose: "%d" also accepts a float timestamp
        # and truncates it, which an f-string ":d" spec would reject.
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        return hashlib.md5(params_str.encode("utf-8")).hexdigest()

    def get_signature(self):
        """
        Create the signed authentication fields for one request.

        Returns:
            dict with keys ``timestamp`` (int, current Unix time), ``random``
            (20-character random string), ``app_id`` (the access key) and
            ``sign`` (signature over the other three plus the secret key)
        """
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        return {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": self.__signature(random_str, timestamp),
        }

    def _build_payload(self, text, content_id=None):
        """Return the signed request payload for ``text`` (no network I/O)."""
        params = self.get_signature()
        params["body"] = text
        if content_id is None:
            # Historical default: a throwaway id derived from the current time.
            content_id = f"test_{int(time.time())}"
        params["content_id"] = content_id
        return params

    def embed(self, text, content_id=None):
        """
        Embed text using the server

        Args:
            text: The input text to embed
            content_id: Identifier sent along with the text; when omitted,
                ``"test_<unix time>"`` is used

        Returns:
            The response body decoded from JSON (not the ``requests.Response``
            object itself)

        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
                Note that the HTTP status code is not checked, so an error
                response with a JSON body is returned like any other.
        """
        payload = self._build_payload(text, content_id)
        headers = {"Content-Type": "application/json"}
        rsp = requests.post(
            self.url,
            data=json.dumps(payload),
            timeout=self.timeout,
            headers=headers,
        )
        return json.loads(rsp.text)