"""
On-Chain Executor — Construct unsigned transactions for Mantle L2
=================================================================
Follows the mantle-agent-scaffold pattern: construct-only safety.
Never handles private keys. Returns unsigned transaction payloads
that must be signed externally (e.g., via ERC-4337 wallet).

Supported DEXes: Fluxion (primary), Agni, Merchant Moe
Supported Lending: Aave V3 (Lendle)
"""

import json
import logging
import operator
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from eth_abi import encode as abi_encode
from eth_utils import function_signature_to_4byte_selector

from config.constants import (
    MANTLE_CHAIN_ID, MANTLE_RPC_URL,
    Tokens, DEXRouters, LendingProtocols,
    ERC20_ABI, SWAP_ROUTER_ABI, AAVE_POOL_ABI,
    PortfolioConfig,
)

logger = logging.getLogger("executor")


@dataclass
class UnsignedTx:
    """
    Unsigned transaction payload — the executor's output.

    Must be signed by an external wallet (e.g., ERC-4337 bundler).
    Follows mantle-agent-scaffold's UnsignedTxResult pattern.
    """

    to: str
    data: str  # hex-encoded calldata
    value: str = "0x0"  # msg.value in hex
    chain_id: int = MANTLE_CHAIN_ID
    gas_limit: Optional[int] = None
    human_summary: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the payload as a plain dict.

        ``gasLimit`` (hex string) is included only when ``gas_limit`` is
        truthy, i.e. neither ``None`` nor ``0``.
        """
        d = {
            "to": self.to,
            "data": self.data,
            "value": self.value,
            "chainId": self.chain_id,
            "human_summary": self.human_summary,
        }
        if self.gas_limit:
            d["gasLimit"] = hex(self.gas_limit)
        return d


@dataclass
class RebalancePlan:
    """Complete rebalancing execution plan."""

    timestamp: float
    current_weights: List[float]
    target_weights: List[float]
    trades: List[UnsignedTx]
    approvals: List[UnsignedTx]
    estimated_gas_usd: float
    estimated_slippage_bps: int
    human_summary: str

    def to_dict(self) -> Dict:
        """Return the plan as a plain dict with every transaction expanded."""
        return {
            "timestamp": self.timestamp,
            "current_weights": self.current_weights,
            "target_weights": self.target_weights,
            "n_trades": len(self.trades),
            "n_approvals": len(self.approvals),
            "estimated_gas_usd": self.estimated_gas_usd,
            "estimated_slippage_bps": self.estimated_slippage_bps,
            "human_summary": self.human_summary,
            "trades": [t.to_dict() for t in self.trades],
            "approvals": [a.to_dict() for a in self.approvals],
        }


class SwapRoute(Enum):
    """Available swap routing options."""

    FLUXION = "fluxion"
    AGNI = "agni"
    MERCHANT_MOE = "merchant_moe"


# ─────────────────────── ABI Encoding Helpers ───────────────────────

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def _encode_address(address: str) -> str:
    """Return ``address`` as one left-zero-padded 32-byte ABI word (hex, no ``0x``).

    Raises:
        ValueError: if ``address`` is not a ``0x``-prefixed string of exactly
            40 hex digits. Slicing such a value blindly would yield calldata
            of the wrong length or with non-hex characters.
    """
    if (
        not isinstance(address, str)
        or address[:2].lower() != "0x"
        or len(address) != 42
        or not _HEX_DIGITS.issuperset(address[2:])
    ):
        raise ValueError(f"Invalid address: {address!r}")
    return address[2:].lower().zfill(64)


def _encode_uint(value: int, bits: int = 256) -> str:
    """Return ``value`` as one 32-byte big-endian ABI word (hex, no ``0x``).

    Raises:
        TypeError: if ``value`` is not an integer (e.g. a float).
        ValueError: if ``value`` is negative or does not fit in ``bits`` bits.
            ``hex(-5)[2:]`` is ``'x5'``, so unchecked negatives would corrupt
            the calldata instead of failing.
    """
    number = operator.index(value)
    if not 0 <= number < (1 << bits):
        raise ValueError(f"Value {number} does not fit in uint{bits}")
    return format(number, "064x")


def encode_function_call(sig: str, args: list) -> str:
    """Encode a Solidity function call to hex calldata.

    ``sig`` is a canonical signature such as ``"approve(address,uint256)"``.
    The parameter types are taken from the text between the first ``(`` and
    the first ``)`` and split on commas.
    """
    # NOTE(review): tuple-typed signatures (nested parentheses) are not
    # parsed correctly by this slicing — confirm no caller passes one.
    selector = function_signature_to_4byte_selector(sig).hex()
    if not args:
        return "0x" + selector
    # Map signature to types
    param_str = sig[sig.index("(") + 1 : sig.index(")")]
    param_types = [t.strip() for t in param_str.split(",")] if param_str else []
    encoded = abi_encode(param_types, args).hex()
    return "0x" + selector + encoded


def encode_erc20_approve(spender: str, amount: int) -> str:
    """Encode ERC20 approve(spender, amount)."""
    return encode_function_call(
        "approve(address,uint256)", [bytes.fromhex(spender[2:]), amount]
    )


# ─────────────────────── Token Registry ─────────────────────────────

TOKEN_DECIMALS = {
    "USDY": 18,
    "mETH": 18,
    "MI4": 18,
    "USDC": 6,
    "USDT": 6,
    "WMNT": 18,
    "WETH": 18,
}

# NOTE: "MI4" has decimals above but no address here, so any plan that needs
# to trade MI4 fails with KeyError until an address is configured.
TOKEN_ADDRESSES = {
    "USDY": Tokens.USDY,
    "mETH": Tokens.METH,
    "USDC": Tokens.USDC,
    "USDT": Tokens.USDT,
    "WMNT": Tokens.WMNT,
    "WETH": Tokens.WETH,
}

# Fluxion fee tiers (Uni V3 compatible)
FEE_TIERS = [100, 500, 3000, 10000]  # 0.01%, 0.05%, 0.3%, 1%


def _token_address(symbol: str) -> str:
    """Return the configured address for ``symbol`` or raise ``KeyError``."""
    try:
        return TOKEN_ADDRESSES[symbol]
    except KeyError:
        raise KeyError(f"No token address configured for {symbol}") from None


# ─────────────────────── Executor ───────────────────────────────────


class OnChainExecutor:
    """
    Constructs unsigned transactions for portfolio rebalancing.

    Safety guarantees:
    - NEVER handles private keys
    - ONLY interacts with whitelisted contracts
    - ALL outputs are unsigned transaction payloads
    - Human-readable summaries for every action
    """

    WHITELISTED_CONTRACTS = {
        DEXRouters.FLUXION_SWAP_ROUTER,
        DEXRouters.AGNI_SWAP_ROUTER,
        DEXRouters.MERCHANT_MOE_ROUTER,
        LendingProtocols.AAVE_V3_POOL,
        Tokens.USDY,
        Tokens.METH,
        Tokens.USDC,
        Tokens.WMNT,
    }

    def __init__(
        self,
        wallet_address: str,
        portfolio_config: Optional[PortfolioConfig] = None,
        preferred_dex: SwapRoute = SwapRoute.FLUXION,
    ):
        """Store the wallet, config and preferred DEX; start an empty log.

        Args:
            wallet_address: Address used as swap recipient and as the Aave
                ``onBehalfOf`` / ``to`` argument.
            portfolio_config: Defaults to ``PortfolioConfig()`` when omitted.
            preferred_dex: Router every swap and approval is built against.
        """
        self.wallet = wallet_address
        self.config = portfolio_config or PortfolioConfig()
        self.preferred_dex = preferred_dex

        # Execution history
        self.execution_log: List[Dict] = []

    def is_whitelisted(self, address: str) -> bool:
        """Check if contract address is in the whitelist (case-insensitive)."""
        return address.lower() in {a.lower() for a in self.WHITELISTED_CONTRACTS}

    def build_rebalance_plan(
        self,
        current_weights: List[float],
        target_weights: List[float],
        portfolio_value_usd: float,
        asset_prices: Dict[str, float],
        max_slippage_bps: Optional[int] = None,
    ) -> RebalancePlan:
        """
        Build a complete rebalancing plan from current to target weights.

        1. Compute required trades (sell overweight, buy underweight)
        2. Route every trade through USDC on the preferred DEX router
        3. Generate approval + swap transactions
        4. Estimate gas costs

        ``approvals[i]`` always belongs to ``trades[i]``. USDC approvals for
        buys are *running totals*: ERC-20 ``approve`` overwrites the previous
        allowance, so approving each buy's own amount would leave only the
        last amount in place if all approvals are submitted before the trades.

        Args:
            current_weights: [USDY_w, mETH_w, MI4_w]
            target_weights: [USDY_w, mETH_w, MI4_w]
            portfolio_value_usd: Total portfolio value
            asset_prices: {"USDY": 1.0, "mETH": 3200.0, "MI4": 100.0}
            max_slippage_bps: Override max slippage; ``None`` uses the
                configured value, ``0`` is honoured as "no slippage allowed".

        Returns:
            RebalancePlan with all unsigned transactions. The plan's dict form
            is also appended to ``execution_log``.

        Raises:
            ValueError: if slippage is outside 0..10000 bps, or a contract is
                not whitelisted, or an address/amount cannot be ABI-encoded.
            KeyError: if an asset that must be traded has no configured
                token address.
        """
        if max_slippage_bps is None:
            slippage = self.config.max_slippage_bps
        else:
            slippage = max_slippage_bps
        # Outside this range the min-out would be negative or above the quote.
        if not 0 <= slippage <= 10_000:
            raise ValueError(f"Slippage must be within 0..10000 bps, got {slippage}")

        assets = ["USDY", "mETH", "MI4"]

        # Calculate weight deltas
        deltas = [target_weights[i] - current_weights[i] for i in range(len(assets))]

        # Determine sells (negative delta) and buys (positive delta)
        sells = []
        buys = []
        for asset, delta in zip(assets, deltas):
            if abs(delta) < 0.005:  # skip if delta < 0.5%
                continue
            if asset not in asset_prices:
                logger.warning("No price supplied for %s; assuming $1.00", asset)
            usd_amount = abs(delta) * portfolio_value_usd
            token_amount = usd_amount / max(asset_prices.get(asset, 1.0), 0.01)
            leg = (asset, token_amount, usd_amount)
            if delta < 0:
                sells.append(leg)
            else:
                buys.append(leg)

        # Build transactions
        approvals = []
        trades = []
        summary_parts = []

        router = self._get_router_address()
        usdc_decimals = TOKEN_DECIMALS["USDC"]

        # Process sells first (to get intermediate tokens for buys)
        for sell_asset, sell_amount, sell_usd in sells:
            # Route: sell_asset → USDC (intermediate)
            token_in = _token_address(sell_asset)
            decimals = TOKEN_DECIMALS.get(sell_asset, 18)
            amount_raw = int(sell_amount * (10 ** decimals))

            # Build approval
            approvals.append(
                self._build_approval(
                    token=token_in,
                    spender=router,
                    amount=amount_raw,
                    summary=f"Approve {sell_amount:.4f} {sell_asset} for swap",
                )
            )

            # Build swap
            trades.append(
                self._build_swap(
                    token_in=token_in,
                    token_out=Tokens.USDC,
                    amount_in=amount_raw,
                    min_amount_out=int(
                        sell_usd * (10 ** usdc_decimals) * (1 - slippage / 10000)
                    ),
                    fee=3000,
                    summary=f"Sell {sell_amount:.4f} {sell_asset} (~${sell_usd:.2f}) → USDC",
                )
            )
            summary_parts.append(f"SELL {sell_amount:.4f} {sell_asset} (${sell_usd:.0f})")

        # Process buys
        usdc_allowance_raw = 0  # running total approved to the router
        for buy_asset, buy_amount, buy_usd in buys:
            # Fail loudly instead of silently swapping into a different token.
            token_out = _token_address(buy_asset)
            amount_in_raw = int(buy_usd * (10 ** usdc_decimals))
            decimals_out = TOKEN_DECIMALS.get(buy_asset, 18)
            min_out = int(buy_amount * (10 ** decimals_out) * (1 - slippage / 10000))

            # Approve USDC (cumulative, see docstring)
            first_buy = usdc_allowance_raw == 0
            usdc_allowance_raw += amount_in_raw
            if first_buy:
                approval_summary = f"Approve {buy_usd:.2f} USDC for {buy_asset} purchase"
            else:
                approved_usd = usdc_allowance_raw / (10 ** usdc_decimals)
                approval_summary = (
                    f"Approve {approved_usd:.2f} USDC (cumulative) "
                    f"for {buy_asset} purchase"
                )
            approvals.append(
                self._build_approval(
                    token=Tokens.USDC,
                    spender=router,
                    amount=usdc_allowance_raw,
                    summary=approval_summary,
                )
            )

            # Swap USDC → buy_asset
            trades.append(
                self._build_swap(
                    token_in=Tokens.USDC,
                    token_out=token_out,
                    amount_in=amount_in_raw,
                    min_amount_out=min_out,
                    fee=3000,
                    summary=f"Buy {buy_amount:.4f} {buy_asset} (~${buy_usd:.2f}) with USDC",
                )
            )
            summary_parts.append(f"BUY {buy_amount:.4f} {buy_asset} (${buy_usd:.0f})")

        # Estimate gas
        gas_per_swap = 250_000  # typical Uni V3 swap
        gas_per_approval = 50_000
        total_gas = len(trades) * gas_per_swap + len(approvals) * gas_per_approval
        gas_price_gwei = 0.02  # Mantle is cheap
        gas_cost_mnt = total_gas * gas_price_gwei / 1e9
        mnt_price = 0.70  # approximate
        gas_cost_usd = gas_cost_mnt * mnt_price

        plan = RebalancePlan(
            timestamp=time.time(),
            current_weights=current_weights,
            target_weights=target_weights,
            trades=trades,
            approvals=approvals,
            estimated_gas_usd=gas_cost_usd,
            estimated_slippage_bps=slippage,
            human_summary=(
                f"Rebalance: {' | '.join(summary_parts) or 'No trades needed'}\n"
                f"Trades: {len(trades)}, Approvals: {len(approvals)}, "
                f"Est. gas: ${gas_cost_usd:.4f}"
            ),
        )

        logger.info("Rebalance plan: %s", plan.human_summary)
        self.execution_log.append(plan.to_dict())
        return plan

    def _get_router_address(self) -> str:
        """Get the router address for the preferred DEX (Fluxion as fallback)."""
        if self.preferred_dex == SwapRoute.FLUXION:
            return DEXRouters.FLUXION_SWAP_ROUTER
        elif self.preferred_dex == SwapRoute.AGNI:
            return DEXRouters.AGNI_SWAP_ROUTER
        elif self.preferred_dex == SwapRoute.MERCHANT_MOE:
            return DEXRouters.MERCHANT_MOE_ROUTER
        return DEXRouters.FLUXION_SWAP_ROUTER

    def _build_approval(
        self, token: str, spender: str, amount: int, summary: str
    ) -> UnsignedTx:
        """Build ERC20 approve transaction.

        Raises:
            ValueError: if ``token`` or ``spender`` is not whitelisted, or if
                ``spender`` / ``amount`` cannot be ABI-encoded.
        """
        if not self.is_whitelisted(token) or not self.is_whitelisted(spender):
            raise ValueError(f"Contract not whitelisted: token={token}, spender={spender}")

        # approve(address,uint256)
        selector = "095ea7b3"
        calldata = "0x" + selector + _encode_address(spender) + _encode_uint(amount)

        return UnsignedTx(
            to=token,
            data=calldata,
            human_summary=summary,
            gas_limit=60_000,
        )

    def _build_swap(
        self,
        token_in: str,
        token_out: str,
        amount_in: int,
        min_amount_out: int,
        fee: int = 3000,
        summary: str = "",
    ) -> UnsignedTx:
        """Build Uniswap V3-style exactInputSingle swap transaction.

        The swap is sent to the preferred DEX router with ``self.wallet`` as
        recipient and a deadline 30 minutes after construction time.

        Raises:
            ValueError: if the router is not whitelisted, or if an address or
                integer argument cannot be ABI-encoded.
        """
        # NOTE(review): assumes every selectable router exposes this exact
        # Uniswap V3 SwapRouter entry point — confirm for Merchant Moe.
        router = self._get_router_address()
        if not self.is_whitelisted(router):
            raise ValueError(f"Router not whitelisted: {router}")

        # exactInputSingle((address,address,uint24,address,uint256,uint256,uint256,uint160))
        selector = "414bf389"
        deadline = int(time.time()) + 1800  # 30 min deadline

        # ABI encode the tuple; every member is static, so the struct is
        # simply its fields laid out as consecutive 32-byte words.
        params = (
            _encode_address(token_in)
            + _encode_address(token_out)
            + _encode_uint(fee, bits=24)
            + _encode_address(self.wallet)
            + _encode_uint(deadline)
            + _encode_uint(amount_in)
            + _encode_uint(min_amount_out)
            + "0" * 64  # sqrtPriceLimitX96 = 0 (no limit)
        )
        calldata = "0x" + selector + params

        return UnsignedTx(
            to=router,
            data=calldata,
            human_summary=summary,
            gas_limit=300_000,
        )

    def build_aave_supply(
        self, asset: str, amount: int, asset_symbol: str = "USDY"
    ) -> List[UnsignedTx]:
        """Build Aave V3 supply transactions (approval + supply).

        Returns:
            ``[approve_tx, supply_tx]`` in the order they must be executed.

        Raises:
            ValueError: if ``asset`` or the pool is not whitelisted, or if an
                address/amount cannot be ABI-encoded.
        """
        pool = LendingProtocols.AAVE_V3_POOL

        # Approval (also enforces the whitelist for asset and pool)
        approve_tx = self._build_approval(
            token=asset,
            spender=pool,
            amount=amount,
            summary=f"Approve {asset_symbol} for Aave V3 supply",
        )

        # supply(address,uint256,address,uint16)
        selector = "617ba037"
        calldata = "0x" + selector + (
            _encode_address(asset)
            + _encode_uint(amount)
            + _encode_address(self.wallet)
            + "0" * 64  # referralCode = 0
        )

        supply_tx = UnsignedTx(
            to=pool,
            data=calldata,
            human_summary=f"Supply {asset_symbol} to Aave V3 for additional yield",
            gas_limit=300_000,
        )

        return [approve_tx, supply_tx]

    def build_aave_withdraw(
        self, asset: str, amount: int, asset_symbol: str = "USDY"
    ) -> UnsignedTx:
        """Build Aave V3 withdraw transaction sending funds to ``self.wallet``.

        Raises:
            ValueError: if the pool is not whitelisted, or if an
                address/amount cannot be ABI-encoded.
        """
        pool = LendingProtocols.AAVE_V3_POOL
        if not self.is_whitelisted(pool):
            raise ValueError(f"Contract not whitelisted: {pool}")

        # withdraw(address,uint256,address)
        selector = "69328dec"
        calldata = "0x" + selector + (
            _encode_address(asset)
            + _encode_uint(amount)
            + _encode_address(self.wallet)
        )

        return UnsignedTx(
            to=pool,
            data=calldata,
            human_summary=f"Withdraw {asset_symbol} from Aave V3",
            gas_limit=250_000,
        )

    def get_execution_history(self) -> List[Dict]:
        """Return execution log."""
        return self.execution_log