| import httpx, uuid, os |
| from typing import Optional, Union |
| from dotenv import load_dotenv |
|
|
# Populate the process environment from a .env file (python-dotenv) before any
# os.getenv lookup below runs.
| load_dotenv() |
|
|
# Default RPC endpoint, read once at import time; os.getenv returns None when
# CHATXBT_RPC_SERVER_URL is not set. rpc_call uses this value as the default
# for its `url` parameter.
| rpc_server_url = os.getenv('CHATXBT_RPC_SERVER_URL') |
|
|
async def rpc_call(
    method_name: str,
    params: Optional[Union[dict, list]] = None,
    url: str = rpc_server_url,
    timeout: int = 900
) -> dict:
    """
    Send a JSON-RPC 2.0 request via HTTP POST and return the decoded JSON reply.

    The request is authenticated with HTTP Basic credentials read on every call
    from the CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME and
    CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD environment variables, and carries a
    freshly generated UUID4 string as its request id.

    Args:
        method_name (str): The name of the RPC method to be called.
        params (Optional[Union[dict, list]], optional): Named (dict) or positional
            (list) arguments for the method. When None, the ``params`` member is
            left out of the request. Defaults to None.
        url (str, optional): The URL of the RPC server. Defaults to rpc_server_url.
        timeout (int, optional): Timeout handed to ``httpx.AsyncClient``
            (seconds). Defaults to 900.

    Returns:
        dict: The JSON body of the server's response, returned as decoded. A
        JSON-RPC ``error`` member inside it is not inspected here; callers
        must check for it themselves.

    Raises:
        ValueError: If no URL is available, or if the request fails at the
            transport level (the underlying ``httpx.RequestError`` is attached
            as ``__cause__``).
        httpx.HTTPStatusError: If the server answers with a non-success HTTP
            status. This comes from ``raise_for_status()`` and is not wrapped,
            because it is not a ``httpx.RequestError``.

    """
    if not url:
        # The default comes from the environment at import time and is None
        # when the variable is unset; fail with a clear message instead of an
        # opaque error from inside httpx.
        raise ValueError(
            "Error making RPC call: no server URL configured "
            "(set CHATXBT_RPC_SERVER_URL or pass url=...)"
        )

    headers = {
        'Content-Type': 'application/json',
    }

    auth = httpx.BasicAuth(
        username=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME'),
        password=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD')
    )

    payload = {
        'method': method_name,
        'jsonrpc': '2.0',
        'id': str(uuid.uuid4()),
    }
    # JSON-RPC 2.0 only allows an array or object for "params"; when there are
    # no arguments the member must be omitted rather than sent as null.
    if params is not None:
        payload['params'] = params

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=payload, headers=headers, auth=auth)
            response.raise_for_status()
            return response.json()
    except httpx.RequestError as e:
        print(f"Error making RPC call: {e}")
        # Chain explicitly so the original transport error stays reachable.
        raise ValueError(f"Error making RPC call: {e}") from e
|
|