muthuk1's picture
Add agent/executor.py
1955a7b verified
"""
On-Chain Executor β€” Construct unsigned transactions for Mantle L2
=================================================================
Follows the mantle-agent-scaffold pattern: construct-only safety.
Never handles private keys. Returns unsigned transaction payloads
that must be signed externally (e.g., via ERC-4337 wallet).
Supported DEXes: Fluxion (primary), Agni, Merchant Moe
Supported Lending: Aave V3 (Lendle)
"""
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from eth_abi import encode as abi_encode
from eth_utils import function_signature_to_4byte_selector
import json
from config.constants import (
MANTLE_CHAIN_ID,
MANTLE_RPC_URL,
Tokens,
DEXRouters,
LendingProtocols,
ERC20_ABI,
SWAP_ROUTER_ABI,
AAVE_POOL_ABI,
PortfolioConfig,
)
logger = logging.getLogger("executor")
@dataclass
class UnsignedTx:
"""
Unsigned transaction payload β€” the executor's output.
Must be signed by an external wallet (e.g., ERC-4337 bundler).
Follows mantle-agent-scaffold's UnsignedTxResult pattern.
"""
to: str
data: str # hex-encoded calldata
value: str = "0x0" # msg.value in hex
chain_id: int = MANTLE_CHAIN_ID
gas_limit: Optional[int] = None
human_summary: str = ""
def to_dict(self) -> Dict[str, Any]:
d = {
"to": self.to,
"data": self.data,
"value": self.value,
"chainId": self.chain_id,
"human_summary": self.human_summary,
}
if self.gas_limit:
d["gasLimit"] = hex(self.gas_limit)
return d
@dataclass
class RebalancePlan:
"""Complete rebalancing execution plan."""
timestamp: float
current_weights: List[float]
target_weights: List[float]
trades: List[UnsignedTx]
approvals: List[UnsignedTx]
estimated_gas_usd: float
estimated_slippage_bps: int
human_summary: str
def to_dict(self) -> Dict:
return {
"timestamp": self.timestamp,
"current_weights": self.current_weights,
"target_weights": self.target_weights,
"n_trades": len(self.trades),
"n_approvals": len(self.approvals),
"estimated_gas_usd": self.estimated_gas_usd,
"estimated_slippage_bps": self.estimated_slippage_bps,
"human_summary": self.human_summary,
"trades": [t.to_dict() for t in self.trades],
"approvals": [a.to_dict() for a in self.approvals],
}
class SwapRoute(Enum):
"""Available swap routing options."""
FLUXION = "fluxion"
AGNI = "agni"
MERCHANT_MOE = "merchant_moe"
# ─────────────────────── ABI Encoding Helpers ───────────────────────
def encode_function_call(sig: str, args: list) -> str:
"""Encode a Solidity function call to hex calldata."""
selector = function_signature_to_4byte_selector(sig).hex()
if not args:
return "0x" + selector
# Map signature to types
param_str = sig[sig.index("(") + 1 : sig.index(")")]
param_types = [t.strip() for t in param_str.split(",")] if param_str else []
encoded = abi_encode(param_types, args).hex()
return "0x" + selector + encoded
def encode_erc20_approve(spender: str, amount: int) -> str:
"""Encode ERC20 approve(spender, amount)."""
return encode_function_call(
"approve(address,uint256)",
[bytes.fromhex(spender[2:]), amount]
)
# ─────────────────────── Token Registry ─────────────────────────────
TOKEN_DECIMALS = {
"USDY": 18,
"mETH": 18,
"MI4": 18,
"USDC": 6,
"USDT": 6,
"WMNT": 18,
"WETH": 18,
}
TOKEN_ADDRESSES = {
"USDY": Tokens.USDY,
"mETH": Tokens.METH,
"USDC": Tokens.USDC,
"USDT": Tokens.USDT,
"WMNT": Tokens.WMNT,
"WETH": Tokens.WETH,
}
# Fluxion fee tiers (Uni V3 compatible)
FEE_TIERS = [100, 500, 3000, 10000] # 0.01%, 0.05%, 0.3%, 1%
# ─────────────────────── Executor ───────────────────────────────────
class OnChainExecutor:
"""
Constructs unsigned transactions for portfolio rebalancing.
Safety guarantees:
- NEVER handles private keys
- ONLY interacts with whitelisted contracts
- ALL outputs are unsigned transaction payloads
- Human-readable summaries for every action
"""
WHITELISTED_CONTRACTS = {
DEXRouters.FLUXION_SWAP_ROUTER,
DEXRouters.AGNI_SWAP_ROUTER,
DEXRouters.MERCHANT_MOE_ROUTER,
LendingProtocols.AAVE_V3_POOL,
Tokens.USDY,
Tokens.METH,
Tokens.USDC,
Tokens.WMNT,
}
def __init__(
self,
wallet_address: str,
portfolio_config: Optional[PortfolioConfig] = None,
preferred_dex: SwapRoute = SwapRoute.FLUXION,
):
self.wallet = wallet_address
self.config = portfolio_config or PortfolioConfig()
self.preferred_dex = preferred_dex
# Execution history
self.execution_log: List[Dict] = []
def is_whitelisted(self, address: str) -> bool:
"""Check if contract address is in the whitelist."""
return address.lower() in {a.lower() for a in self.WHITELISTED_CONTRACTS}
def build_rebalance_plan(
self,
current_weights: List[float],
target_weights: List[float],
portfolio_value_usd: float,
asset_prices: Dict[str, float],
max_slippage_bps: Optional[int] = None,
) -> RebalancePlan:
"""
Build a complete rebalancing plan from current to target weights.
1. Compute required trades (sell overweight, buy underweight)
2. Route through best DEX
3. Generate approval + swap transactions
4. Estimate gas costs
Args:
current_weights: [USDY_w, mETH_w, MI4_w]
target_weights: [USDY_w, mETH_w, MI4_w]
portfolio_value_usd: Total portfolio value
asset_prices: {"USDY": 1.0, "mETH": 3200.0, "MI4": 100.0}
max_slippage_bps: Override max slippage
Returns:
RebalancePlan with all unsigned transactions
"""
slippage = max_slippage_bps or self.config.max_slippage_bps
assets = ["USDY", "mETH", "MI4"]
# Calculate weight deltas
deltas = [target_weights[i] - current_weights[i] for i in range(3)]
# Determine sells (negative delta) and buys (positive delta)
sells = []
buys = []
for i, (asset, delta) in enumerate(zip(assets, deltas)):
if abs(delta) < 0.005: # skip if delta < 0.5%
continue
usd_amount = abs(delta) * portfolio_value_usd
token_amount = usd_amount / max(asset_prices.get(asset, 1.0), 0.01)
if delta < 0:
sells.append((asset, token_amount, usd_amount))
else:
buys.append((asset, token_amount, usd_amount))
# Build transactions
approvals = []
trades = []
summary_parts = []
# Process sells first (to get intermediate tokens for buys)
for sell_asset, sell_amount, sell_usd in sells:
# Route: sell_asset β†’ USDC (intermediate)
decimals = TOKEN_DECIMALS.get(sell_asset, 18)
amount_raw = int(sell_amount * (10 ** decimals))
# Build approval
router = self._get_router_address()
approval_tx = self._build_approval(
token=TOKEN_ADDRESSES[sell_asset],
spender=router,
amount=amount_raw,
summary=f"Approve {sell_amount:.4f} {sell_asset} for swap",
)
approvals.append(approval_tx)
# Build swap
swap_tx = self._build_swap(
token_in=TOKEN_ADDRESSES[sell_asset],
token_out=Tokens.USDC,
amount_in=amount_raw,
min_amount_out=int(sell_usd * (10 ** 6) * (1 - slippage / 10000)),
fee=3000,
summary=f"Sell {sell_amount:.4f} {sell_asset} (~${sell_usd:.2f}) β†’ USDC",
)
trades.append(swap_tx)
summary_parts.append(f"SELL {sell_amount:.4f} {sell_asset} (${sell_usd:.0f})")
# Process buys
for buy_asset, buy_amount, buy_usd in buys:
decimals_in = TOKEN_DECIMALS["USDC"]
amount_in_raw = int(buy_usd * (10 ** decimals_in))
decimals_out = TOKEN_DECIMALS.get(buy_asset, 18)
min_out = int(buy_amount * (10 ** decimals_out) * (1 - slippage / 10000))
# Approve USDC
router = self._get_router_address()
approval_tx = self._build_approval(
token=Tokens.USDC,
spender=router,
amount=amount_in_raw,
summary=f"Approve {buy_usd:.2f} USDC for {buy_asset} purchase",
)
approvals.append(approval_tx)
# Swap USDC β†’ buy_asset
swap_tx = self._build_swap(
token_in=Tokens.USDC,
token_out=TOKEN_ADDRESSES.get(buy_asset, Tokens.USDY),
amount_in=amount_in_raw,
min_amount_out=min_out,
fee=3000,
summary=f"Buy {buy_amount:.4f} {buy_asset} (~${buy_usd:.2f}) with USDC",
)
trades.append(swap_tx)
summary_parts.append(f"BUY {buy_amount:.4f} {buy_asset} (${buy_usd:.0f})")
# Estimate gas
gas_per_swap = 250_000 # typical Uni V3 swap
gas_per_approval = 50_000
total_gas = len(trades) * gas_per_swap + len(approvals) * gas_per_approval
gas_price_gwei = 0.02 # Mantle is cheap
gas_cost_mnt = total_gas * gas_price_gwei / 1e9
mnt_price = 0.70 # approximate
gas_cost_usd = gas_cost_mnt * mnt_price
plan = RebalancePlan(
timestamp=time.time(),
current_weights=current_weights,
target_weights=target_weights,
trades=trades,
approvals=approvals,
estimated_gas_usd=gas_cost_usd,
estimated_slippage_bps=slippage,
human_summary=(
f"Rebalance: {' | '.join(summary_parts) or 'No trades needed'}\n"
f"Trades: {len(trades)}, Approvals: {len(approvals)}, "
f"Est. gas: ${gas_cost_usd:.4f}"
),
)
logger.info(f"Rebalance plan: {plan.human_summary}")
self.execution_log.append(plan.to_dict())
return plan
def _get_router_address(self) -> str:
"""Get the router address for the preferred DEX."""
if self.preferred_dex == SwapRoute.FLUXION:
return DEXRouters.FLUXION_SWAP_ROUTER
elif self.preferred_dex == SwapRoute.AGNI:
return DEXRouters.AGNI_SWAP_ROUTER
elif self.preferred_dex == SwapRoute.MERCHANT_MOE:
return DEXRouters.MERCHANT_MOE_ROUTER
return DEXRouters.FLUXION_SWAP_ROUTER
def _build_approval(
self, token: str, spender: str, amount: int, summary: str
) -> UnsignedTx:
"""Build ERC20 approve transaction."""
if not self.is_whitelisted(token) or not self.is_whitelisted(spender):
raise ValueError(f"Contract not whitelisted: token={token}, spender={spender}")
# approve(address,uint256)
selector = "095ea7b3"
spender_padded = spender[2:].lower().zfill(64)
amount_hex = hex(amount)[2:].zfill(64)
calldata = "0x" + selector + spender_padded + amount_hex
return UnsignedTx(
to=token,
data=calldata,
human_summary=summary,
gas_limit=60_000,
)
def _build_swap(
self,
token_in: str,
token_out: str,
amount_in: int,
min_amount_out: int,
fee: int = 3000,
summary: str = "",
) -> UnsignedTx:
"""Build Uniswap V3-style exactInputSingle swap transaction."""
router = self._get_router_address()
if not self.is_whitelisted(router):
raise ValueError(f"Router not whitelisted: {router}")
# exactInputSingle((address,address,uint24,address,uint256,uint256,uint256,uint160))
selector = "414bf389"
deadline = int(time.time()) + 1800 # 30 min deadline
# ABI encode the tuple
params = (
token_in[2:].lower().zfill(64) +
token_out[2:].lower().zfill(64) +
hex(fee)[2:].zfill(64) +
self.wallet[2:].lower().zfill(64) +
hex(deadline)[2:].zfill(64) +
hex(amount_in)[2:].zfill(64) +
hex(min_amount_out)[2:].zfill(64) +
"0" * 64 # sqrtPriceLimitX96 = 0 (no limit)
)
calldata = "0x" + selector + params
return UnsignedTx(
to=router,
data=calldata,
human_summary=summary,
gas_limit=300_000,
)
def build_aave_supply(
self, asset: str, amount: int, asset_symbol: str = "USDY"
) -> List[UnsignedTx]:
"""Build Aave V3 supply transactions (approval + supply)."""
pool = LendingProtocols.AAVE_V3_POOL
# Approval
approve_tx = self._build_approval(
token=asset,
spender=pool,
amount=amount,
summary=f"Approve {asset_symbol} for Aave V3 supply",
)
# supply(address,uint256,address,uint16)
selector = "617ba037"
calldata = "0x" + selector + (
asset[2:].lower().zfill(64) +
hex(amount)[2:].zfill(64) +
self.wallet[2:].lower().zfill(64) +
"0" * 64 # referralCode = 0
)
supply_tx = UnsignedTx(
to=pool,
data=calldata,
human_summary=f"Supply {asset_symbol} to Aave V3 for additional yield",
gas_limit=300_000,
)
return [approve_tx, supply_tx]
def build_aave_withdraw(
self, asset: str, amount: int, asset_symbol: str = "USDY"
) -> UnsignedTx:
"""Build Aave V3 withdraw transaction."""
pool = LendingProtocols.AAVE_V3_POOL
# withdraw(address,uint256,address)
selector = "69328dec"
calldata = "0x" + selector + (
asset[2:].lower().zfill(64) +
hex(amount)[2:].zfill(64) +
self.wallet[2:].lower().zfill(64)
)
return UnsignedTx(
to=pool,
data=calldata,
human_summary=f"Withdraw {asset_symbol} from Aave V3",
gas_limit=250_000,
)
def get_execution_history(self) -> List[Dict]:
"""Return execution log."""
return self.execution_log