| import asyncio |
| import json |
| import pprint |
| import requests |
| from web3 import Web3 |
| from phi.tools import Toolkit |
| from phi.utils.log import logger |
| from src.libs.web3 import get_web3_instance |
| from src.libs.helper_functions import get_headers, get_private_key |
| from src.libs.token_approval_helper import TokenApprovalHelper |
| from src.libs.rpc_client import rpc_call |
|
|
class CryptoSwapTools(Toolkit):
    """Toolkit that exposes token-swap operations as agent tools.

    Every public method builds a parameter payload, forwards it to the
    project's ``rpc_call`` coroutine under a fixed RPC method name and returns
    the response rendered as a string.

    NOTE: each call is driven with ``asyncio.run``, which cannot be used from
    a thread that already has a running event loop.
    """

    def __init__(self, web3: "Web3 | None" = None):
        """
        Sets up the toolkit and registers its swap tools.

        Args:
            web3 (Web3, optional): The Web3 instance to use. When omitted, one is
                obtained from ``get_web3_instance()`` at construction time.
        """
        super().__init__(name="swap_tools")

        # Resolve the default lazily: a call in the signature would run once at
        # import time and the resulting instance would be shared by every
        # toolkit created afterwards.
        if web3 is None:
            web3 = get_web3_instance()
        self.web3 = web3

        self.token_approval_helper = TokenApprovalHelper(web3, get_private_key())

        self.register(self.get_swap_quote)
        self.register(self.get_swap_price)
        self.register(self.get_swap_sources)
        self.register(self.execute_swap)

    @staticmethod
    def _wallet_params(chain: str, user_email: str = "none") -> dict:
        """Builds the ``wallet`` section shared by all swap RPC payloads."""
        return {
            "userEmail": user_email,
            "chain": chain,
            "testnet": True,
            "gasless": True,
            "connected": True,
        }

    def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str, chain: str = "base") -> str:
        """
        Fetches a swap quote from the 0x Swap API (requested through the ``getSwapQuote`` RPC method).

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            chain (str): The EVM chain to request the quote for. Defaults to 'base'.

        Returns:
            str: The RPC response rendered as a string, or 'Error: <details>' if the request failed.

        Example:
            >>> get_swap_quote('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
            'wallet': self._wallet_params(chain),
        }
        # NOTE(review): only request-level failures are converted into an error
        # string; anything else raised by rpc_call propagates to the caller.
        try:
            response = asyncio.run(rpc_call(method_name="getSwapQuote", params=params))
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap quote: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str, chain: str = "base") -> str:
        """
        Fetches the price for a swap from the 0x Swap API (requested through the ``getSwapPrice`` RPC method).

        Args:
            buy_token (str): The token to buy.
            sell_token (str): The token to sell.
            buy_amount (str): The amount of the buy token, in the smallest unit.
            chain (str): The EVM chain to request the price for. Defaults to 'base'.

        Returns:
            str: The RPC response rendered as a string, or 'Error: <details>' if the request failed.

        Example:
            >>> get_swap_price('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'buyAmount': buy_amount,
            'wallet': self._wallet_params(chain),
        }
        try:
            response = asyncio.run(rpc_call(method_name="getSwapPrice", params=params))
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap price: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_sources(self) -> str:
        """
        Fetches the list of liquidity sources from the 0x Swap API (requested through the ``getSwapSources`` RPC method).

        Returns:
            str: The RPC response rendered as a string, or 'Error: <details>' if the request failed.

        Example:
            >>> get_swap_sources()
        """
        logger.info("Fetching swap sources")
        # Handle request failures the same way as the other swap tools so the
        # agent receives an error string instead of an unhandled exception.
        try:
            response = asyncio.run(rpc_call(method_name="getSwapSources"))
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap sources: {e}")
            return f"Error: {e}"
        return f"{response}"

    def execute_swap(
        self,
        buy_token: str,
        sell_token: str,
        sell_amount: str,
        user_email: str,
        chain: str = "base",
    ) -> str:
        """
        Executes a swap using the 0x Swap API. This is a sensitive function: show a preview and allow the user to give final permission before executing it.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            user_email (str): The email of the user for whom the wallet is being fetched.
            chain (str): The EVM chain for which the wallet is being fetched and used to execute this transaction. Use chain 'base' as default.

        Returns:
            str: The response of the ``swapTokens`` RPC call rendered as a string, or 'Error: <details>' if the request failed.

        Example:
            >>> execute_swap('DAI', 'ETH', '1000000000000000000', 'userEmail', 'base')
        """
        logger.info("executing swap")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
            'wallet': self._wallet_params(chain, user_email),
        }
        try:
            response = asyncio.run(rpc_call(method_name="swapTokens", params=params))
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to execute swap: {e}")
            return f"Error: {e}"
        logger.info(f"Swap transaction receipt: {response}")
        return f"{response}"
|
|