| """ |
| On-Chain Executor β Construct unsigned transactions for Mantle L2 |
| ================================================================= |
| Follows the mantle-agent-scaffold pattern: construct-only safety. |
| Never handles private keys. Returns unsigned transaction payloads |
| that must be signed externally (e.g., via ERC-4337 wallet). |
| |
| Supported DEXes: Fluxion (primary), Agni, Merchant Moe |
| Supported Lending: Aave V3 (Lendle) |
| """ |
|
|
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from eth_abi import encode as abi_encode |
| from eth_utils import function_signature_to_4byte_selector |
| import json |
|
|
| from config.constants import ( |
| MANTLE_CHAIN_ID, |
| MANTLE_RPC_URL, |
| Tokens, |
| DEXRouters, |
| LendingProtocols, |
| ERC20_ABI, |
| SWAP_ROUTER_ABI, |
| AAVE_POOL_ABI, |
| PortfolioConfig, |
| ) |
|
|
| logger = logging.getLogger("executor") |
|
|
|
|
| @dataclass |
| class UnsignedTx: |
| """ |
| Unsigned transaction payload β the executor's output. |
| Must be signed by an external wallet (e.g., ERC-4337 bundler). |
| |
| Follows mantle-agent-scaffold's UnsignedTxResult pattern. |
| """ |
| to: str |
| data: str |
| value: str = "0x0" |
| chain_id: int = MANTLE_CHAIN_ID |
| gas_limit: Optional[int] = None |
| human_summary: str = "" |
| |
| def to_dict(self) -> Dict[str, Any]: |
| d = { |
| "to": self.to, |
| "data": self.data, |
| "value": self.value, |
| "chainId": self.chain_id, |
| "human_summary": self.human_summary, |
| } |
| if self.gas_limit: |
| d["gasLimit"] = hex(self.gas_limit) |
| return d |
|
|
|
|
| @dataclass |
| class RebalancePlan: |
| """Complete rebalancing execution plan.""" |
| timestamp: float |
| current_weights: List[float] |
| target_weights: List[float] |
| trades: List[UnsignedTx] |
| approvals: List[UnsignedTx] |
| estimated_gas_usd: float |
| estimated_slippage_bps: int |
| human_summary: str |
| |
| def to_dict(self) -> Dict: |
| return { |
| "timestamp": self.timestamp, |
| "current_weights": self.current_weights, |
| "target_weights": self.target_weights, |
| "n_trades": len(self.trades), |
| "n_approvals": len(self.approvals), |
| "estimated_gas_usd": self.estimated_gas_usd, |
| "estimated_slippage_bps": self.estimated_slippage_bps, |
| "human_summary": self.human_summary, |
| "trades": [t.to_dict() for t in self.trades], |
| "approvals": [a.to_dict() for a in self.approvals], |
| } |
|
|
|
|
| class SwapRoute(Enum): |
| """Available swap routing options.""" |
| FLUXION = "fluxion" |
| AGNI = "agni" |
| MERCHANT_MOE = "merchant_moe" |
|
|
|
|
| |
|
|
| def encode_function_call(sig: str, args: list) -> str: |
| """Encode a Solidity function call to hex calldata.""" |
| selector = function_signature_to_4byte_selector(sig).hex() |
| if not args: |
| return "0x" + selector |
| |
| |
| param_str = sig[sig.index("(") + 1 : sig.index(")")] |
| param_types = [t.strip() for t in param_str.split(",")] if param_str else [] |
| |
| encoded = abi_encode(param_types, args).hex() |
| return "0x" + selector + encoded |
|
|
|
|
| def encode_erc20_approve(spender: str, amount: int) -> str: |
| """Encode ERC20 approve(spender, amount).""" |
| return encode_function_call( |
| "approve(address,uint256)", |
| [bytes.fromhex(spender[2:]), amount] |
| ) |
|
|
|
|
| |
|
|
| TOKEN_DECIMALS = { |
| "USDY": 18, |
| "mETH": 18, |
| "MI4": 18, |
| "USDC": 6, |
| "USDT": 6, |
| "WMNT": 18, |
| "WETH": 18, |
| } |
|
|
| TOKEN_ADDRESSES = { |
| "USDY": Tokens.USDY, |
| "mETH": Tokens.METH, |
| "USDC": Tokens.USDC, |
| "USDT": Tokens.USDT, |
| "WMNT": Tokens.WMNT, |
| "WETH": Tokens.WETH, |
| } |
|
|
| |
| FEE_TIERS = [100, 500, 3000, 10000] |
|
|
|
|
| |
|
|
| class OnChainExecutor: |
| """ |
| Constructs unsigned transactions for portfolio rebalancing. |
| |
| Safety guarantees: |
| - NEVER handles private keys |
| - ONLY interacts with whitelisted contracts |
| - ALL outputs are unsigned transaction payloads |
| - Human-readable summaries for every action |
| """ |
| |
| WHITELISTED_CONTRACTS = { |
| DEXRouters.FLUXION_SWAP_ROUTER, |
| DEXRouters.AGNI_SWAP_ROUTER, |
| DEXRouters.MERCHANT_MOE_ROUTER, |
| LendingProtocols.AAVE_V3_POOL, |
| Tokens.USDY, |
| Tokens.METH, |
| Tokens.USDC, |
| Tokens.WMNT, |
| } |
| |
| def __init__( |
| self, |
| wallet_address: str, |
| portfolio_config: Optional[PortfolioConfig] = None, |
| preferred_dex: SwapRoute = SwapRoute.FLUXION, |
| ): |
| self.wallet = wallet_address |
| self.config = portfolio_config or PortfolioConfig() |
| self.preferred_dex = preferred_dex |
| |
| |
| self.execution_log: List[Dict] = [] |
| |
| def is_whitelisted(self, address: str) -> bool: |
| """Check if contract address is in the whitelist.""" |
| return address.lower() in {a.lower() for a in self.WHITELISTED_CONTRACTS} |
| |
| def build_rebalance_plan( |
| self, |
| current_weights: List[float], |
| target_weights: List[float], |
| portfolio_value_usd: float, |
| asset_prices: Dict[str, float], |
| max_slippage_bps: Optional[int] = None, |
| ) -> RebalancePlan: |
| """ |
| Build a complete rebalancing plan from current to target weights. |
| |
| 1. Compute required trades (sell overweight, buy underweight) |
| 2. Route through best DEX |
| 3. Generate approval + swap transactions |
| 4. Estimate gas costs |
| |
| Args: |
| current_weights: [USDY_w, mETH_w, MI4_w] |
| target_weights: [USDY_w, mETH_w, MI4_w] |
| portfolio_value_usd: Total portfolio value |
| asset_prices: {"USDY": 1.0, "mETH": 3200.0, "MI4": 100.0} |
| max_slippage_bps: Override max slippage |
| |
| Returns: |
| RebalancePlan with all unsigned transactions |
| """ |
| slippage = max_slippage_bps or self.config.max_slippage_bps |
| assets = ["USDY", "mETH", "MI4"] |
| |
| |
| deltas = [target_weights[i] - current_weights[i] for i in range(3)] |
| |
| |
| sells = [] |
| buys = [] |
| for i, (asset, delta) in enumerate(zip(assets, deltas)): |
| if abs(delta) < 0.005: |
| continue |
| usd_amount = abs(delta) * portfolio_value_usd |
| token_amount = usd_amount / max(asset_prices.get(asset, 1.0), 0.01) |
| |
| if delta < 0: |
| sells.append((asset, token_amount, usd_amount)) |
| else: |
| buys.append((asset, token_amount, usd_amount)) |
| |
| |
| approvals = [] |
| trades = [] |
| summary_parts = [] |
| |
| |
| for sell_asset, sell_amount, sell_usd in sells: |
| |
| decimals = TOKEN_DECIMALS.get(sell_asset, 18) |
| amount_raw = int(sell_amount * (10 ** decimals)) |
| |
| |
| router = self._get_router_address() |
| approval_tx = self._build_approval( |
| token=TOKEN_ADDRESSES[sell_asset], |
| spender=router, |
| amount=amount_raw, |
| summary=f"Approve {sell_amount:.4f} {sell_asset} for swap", |
| ) |
| approvals.append(approval_tx) |
| |
| |
| swap_tx = self._build_swap( |
| token_in=TOKEN_ADDRESSES[sell_asset], |
| token_out=Tokens.USDC, |
| amount_in=amount_raw, |
| min_amount_out=int(sell_usd * (10 ** 6) * (1 - slippage / 10000)), |
| fee=3000, |
| summary=f"Sell {sell_amount:.4f} {sell_asset} (~${sell_usd:.2f}) β USDC", |
| ) |
| trades.append(swap_tx) |
| summary_parts.append(f"SELL {sell_amount:.4f} {sell_asset} (${sell_usd:.0f})") |
| |
| |
| for buy_asset, buy_amount, buy_usd in buys: |
| decimals_in = TOKEN_DECIMALS["USDC"] |
| amount_in_raw = int(buy_usd * (10 ** decimals_in)) |
| |
| decimals_out = TOKEN_DECIMALS.get(buy_asset, 18) |
| min_out = int(buy_amount * (10 ** decimals_out) * (1 - slippage / 10000)) |
| |
| |
| router = self._get_router_address() |
| approval_tx = self._build_approval( |
| token=Tokens.USDC, |
| spender=router, |
| amount=amount_in_raw, |
| summary=f"Approve {buy_usd:.2f} USDC for {buy_asset} purchase", |
| ) |
| approvals.append(approval_tx) |
| |
| |
| swap_tx = self._build_swap( |
| token_in=Tokens.USDC, |
| token_out=TOKEN_ADDRESSES.get(buy_asset, Tokens.USDY), |
| amount_in=amount_in_raw, |
| min_amount_out=min_out, |
| fee=3000, |
| summary=f"Buy {buy_amount:.4f} {buy_asset} (~${buy_usd:.2f}) with USDC", |
| ) |
| trades.append(swap_tx) |
| summary_parts.append(f"BUY {buy_amount:.4f} {buy_asset} (${buy_usd:.0f})") |
| |
| |
| gas_per_swap = 250_000 |
| gas_per_approval = 50_000 |
| total_gas = len(trades) * gas_per_swap + len(approvals) * gas_per_approval |
| gas_price_gwei = 0.02 |
| gas_cost_mnt = total_gas * gas_price_gwei / 1e9 |
| mnt_price = 0.70 |
| gas_cost_usd = gas_cost_mnt * mnt_price |
| |
| plan = RebalancePlan( |
| timestamp=time.time(), |
| current_weights=current_weights, |
| target_weights=target_weights, |
| trades=trades, |
| approvals=approvals, |
| estimated_gas_usd=gas_cost_usd, |
| estimated_slippage_bps=slippage, |
| human_summary=( |
| f"Rebalance: {' | '.join(summary_parts) or 'No trades needed'}\n" |
| f"Trades: {len(trades)}, Approvals: {len(approvals)}, " |
| f"Est. gas: ${gas_cost_usd:.4f}" |
| ), |
| ) |
| |
| logger.info(f"Rebalance plan: {plan.human_summary}") |
| self.execution_log.append(plan.to_dict()) |
| return plan |
| |
| def _get_router_address(self) -> str: |
| """Get the router address for the preferred DEX.""" |
| if self.preferred_dex == SwapRoute.FLUXION: |
| return DEXRouters.FLUXION_SWAP_ROUTER |
| elif self.preferred_dex == SwapRoute.AGNI: |
| return DEXRouters.AGNI_SWAP_ROUTER |
| elif self.preferred_dex == SwapRoute.MERCHANT_MOE: |
| return DEXRouters.MERCHANT_MOE_ROUTER |
| return DEXRouters.FLUXION_SWAP_ROUTER |
| |
| def _build_approval( |
| self, token: str, spender: str, amount: int, summary: str |
| ) -> UnsignedTx: |
| """Build ERC20 approve transaction.""" |
| if not self.is_whitelisted(token) or not self.is_whitelisted(spender): |
| raise ValueError(f"Contract not whitelisted: token={token}, spender={spender}") |
| |
| |
| selector = "095ea7b3" |
| spender_padded = spender[2:].lower().zfill(64) |
| amount_hex = hex(amount)[2:].zfill(64) |
| calldata = "0x" + selector + spender_padded + amount_hex |
| |
| return UnsignedTx( |
| to=token, |
| data=calldata, |
| human_summary=summary, |
| gas_limit=60_000, |
| ) |
| |
| def _build_swap( |
| self, |
| token_in: str, |
| token_out: str, |
| amount_in: int, |
| min_amount_out: int, |
| fee: int = 3000, |
| summary: str = "", |
| ) -> UnsignedTx: |
| """Build Uniswap V3-style exactInputSingle swap transaction.""" |
| router = self._get_router_address() |
| |
| if not self.is_whitelisted(router): |
| raise ValueError(f"Router not whitelisted: {router}") |
| |
| |
| selector = "414bf389" |
| deadline = int(time.time()) + 1800 |
| |
| |
| params = ( |
| token_in[2:].lower().zfill(64) + |
| token_out[2:].lower().zfill(64) + |
| hex(fee)[2:].zfill(64) + |
| self.wallet[2:].lower().zfill(64) + |
| hex(deadline)[2:].zfill(64) + |
| hex(amount_in)[2:].zfill(64) + |
| hex(min_amount_out)[2:].zfill(64) + |
| "0" * 64 |
| ) |
| |
| calldata = "0x" + selector + params |
| |
| return UnsignedTx( |
| to=router, |
| data=calldata, |
| human_summary=summary, |
| gas_limit=300_000, |
| ) |
| |
| def build_aave_supply( |
| self, asset: str, amount: int, asset_symbol: str = "USDY" |
| ) -> List[UnsignedTx]: |
| """Build Aave V3 supply transactions (approval + supply).""" |
| pool = LendingProtocols.AAVE_V3_POOL |
| |
| |
| approve_tx = self._build_approval( |
| token=asset, |
| spender=pool, |
| amount=amount, |
| summary=f"Approve {asset_symbol} for Aave V3 supply", |
| ) |
| |
| |
| selector = "617ba037" |
| calldata = "0x" + selector + ( |
| asset[2:].lower().zfill(64) + |
| hex(amount)[2:].zfill(64) + |
| self.wallet[2:].lower().zfill(64) + |
| "0" * 64 |
| ) |
| |
| supply_tx = UnsignedTx( |
| to=pool, |
| data=calldata, |
| human_summary=f"Supply {asset_symbol} to Aave V3 for additional yield", |
| gas_limit=300_000, |
| ) |
| |
| return [approve_tx, supply_tx] |
| |
| def build_aave_withdraw( |
| self, asset: str, amount: int, asset_symbol: str = "USDY" |
| ) -> UnsignedTx: |
| """Build Aave V3 withdraw transaction.""" |
| pool = LendingProtocols.AAVE_V3_POOL |
| |
| |
| selector = "69328dec" |
| calldata = "0x" + selector + ( |
| asset[2:].lower().zfill(64) + |
| hex(amount)[2:].zfill(64) + |
| self.wallet[2:].lower().zfill(64) |
| ) |
| |
| return UnsignedTx( |
| to=pool, |
| data=calldata, |
| human_summary=f"Withdraw {asset_symbol} from Aave V3", |
| gas_limit=250_000, |
| ) |
| |
| def get_execution_history(self) -> List[Dict]: |
| """Return execution log.""" |
| return self.execution_log |
|
|