|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
|
|
| from datetime import datetime |
| from decimal import Decimal |
| import json |
| import logging |
| import os |
| import time |
| from typing import Dict, Optional, Tuple, List |
|
|
| from pandas import DataFrame |
| import requests |
| from web3 import Web3 |
|
|
# API key interpolated into the Optimism Etherscan URL built by
# get_block_at_timestamp; empty in source.
ETHERSCAN_API_KEY = ""

# Addresses whose rows fix_apr_and_roi drops before recalculating anything.
EXCLUDED_ADDRESSES = {
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8",
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5",
    "0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4",
}

# CoinGecko historical-price URL template. Fill it in with
# COINGECKO_PRICE_API_URL.format(coin_id=..., date=...).
# The previous value ended in "{date}}"; the stray "}" made str.format raise
# ValueError ("Single '}' encountered in format string").
COINGECKO_PRICE_API_URL = "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}"

# Token contract address -> (symbol, decimals) for the tokens that
# calculate_final_value_from_address_and_timestamp values.
# NOTE: the OLAS key is checksum-cased while the others are lowercase; it is
# kept as-is because the raw key is also used to build balance cache keys.
WHITELISTED_TOKENS = {
    "0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
    "0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
    "0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
    "0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
    "0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
    "0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
    "0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
    "0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
    "0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
    "0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
    "0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
    "0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
    "0xFC2E6e6BCbd49ccf3A5f029c79984372DcBFE527": ("OLAS", 18)
}

# Lower-cased token symbol -> CoinGecko coin id.
COIN_ID_MAPPING = {
    "usdc": "usd-coin",
    "alusd": "alchemix-usd",
    "usdt0": "usdt0",
    "usdt": "bridged-usdt",
    "usdc.e": "bridged-usd-coin-optimism",
    "usx": "token-dforce-usd",
    "dola": "dola-usd",
    "lusd": "liquity-usd",
    "dai": "makerdao-optimism-bridged-dai-optimism",
    "bold": "liquity-bold",
    "frax": "frax",
    "sdai": "savings-dai",
    "usd+": "overnight-fi-usd-optimism",
    "ousdt": "openusdt",
    "usdglo": "glo-dollar",
    "olas": "autonolas"
}

# Placeholder token address stored on native-ETH outgoing transfer records.
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
|
|
| |
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Shared web3 client used by the balance helpers; it talks to the Optimism
# RPC endpoint given in the URL below.
w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/"))
|
|
|
|
def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
    """Return the CoinGecko coin id for a token symbol on the given chain.

    Only ``chain == "optimism"`` is supported. The symbol is matched
    case-insensitively. ``None`` is returned for any other chain, for unknown
    symbols, and for ``MSUSD`` (which is explicitly mapped to ``None``).
    """
    if chain != "optimism":
        return None

    optimism_coin_ids = {
        # Stablecoins and dollar-pegged tokens.
        "USDC": "usd-coin",
        "ALUSD": "alchemix-usd",
        "USDT0": "usdt0",
        "USDT": "bridged-usdt",
        "MSUSD": None,
        "USDC.E": "bridged-usd-coin-optimism",
        "USX": "token-dforce-usd",
        "DOLA": "dola-usd",
        "LUSD": "liquity-usd",
        "DAI": "makerdao-optimism-bridged-dai-optimism",
        "BOLD": "liquity-bold",
        "FRAX": "frax",
        "SDAI": "savings-dai",
        "USD+": "overnight-fi-usd-optimism",
        "OUSDT": "openusdt",
        "USDGLO": "glo-dollar",
        # Native and wrapped assets.
        "ETH": "ethereum",
        "WETH": "ethereum",
        "WBTC": "wrapped-bitcoin",
    }
    normalized_symbol = symbol.upper()
    return optimism_coin_ids.get(normalized_symbol)
|
|
def load_cache(name: str) -> Dict:
    """Load the JSON cache stored in ``{name}_cache.json`` (working directory).

    Args:
        name: Cache name; the file read is ``f"{name}_cache.json"``.

    Returns:
        The cached dict. An empty dict is returned when the file does not
        exist, cannot be decoded as JSON, or holds something other than a
        JSON object, so callers can always use dict methods on the result.
    """
    cache_file = f"{name}_cache.json"
    try:
        # EAFP: open directly instead of checking os.path.exists first, which
        # leaves a window for the file to disappear in between.
        with open(cache_file, "r", encoding="utf-8") as f:
            cache = json.load(f)
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError):
        logger.warning("Cache file corrupted, creating new cache")
        return {}

    # A file containing e.g. a list or null would break every ``.get`` caller.
    if not isinstance(cache, dict):
        logger.warning("Cache file corrupted, creating new cache")
        return {}
    return cache
|
|
def save_cache(name: str, cache: Dict):
    """Persist ``cache`` as indented JSON to ``{name}_cache.json``.

    The data is first written to ``{name}_cache.json.tmp`` and then moved into
    place with ``os.replace``. An interrupted or failed write (for example a
    non-serializable value) therefore leaves the previous cache file intact
    instead of truncating it.

    Args:
        name: Cache name; the file written is ``f"{name}_cache.json"``.
        cache: JSON-serializable mapping to store.

    Raises:
        TypeError: If ``cache`` contains values ``json.dump`` cannot serialize.
        OSError: If the file cannot be written or replaced.
    """
    cache_file = f"{name}_cache.json"
    tmp_file = f"{cache_file}.tmp"
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2)
        os.replace(tmp_file, cache_file)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure, remove the partial file so it does not accumulate.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
|
|
def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
    """Return the cached price of ``token_symbol`` on ``date_str``.

    Reads the "price" cache, which is keyed by date and then by token symbol.
    Returns ``None`` when no entry exists.
    """
    prices_for_date = load_cache(name="price").get(date_str, {})
    return prices_for_date.get(token_symbol)
|
|
def update_price_cache(date_str: str, token_symbol: str, price: float):
    """Store ``price`` for ``token_symbol`` on ``date_str`` in the "price" cache.

    The whole cache is loaded, updated and written back on every call.
    """
    price_cache = load_cache(name="price")
    # Create the per-date bucket on first use, then record the price in it.
    price_cache.setdefault(date_str, {})[token_symbol] = price
    save_cache(name="price", cache=price_cache)
|
|
def get_cached_request(cache_key: str) -> Optional[Dict]:
    """Return the response stored under ``cache_key`` in the "request" cache.

    Returns ``None`` when the key has never been cached.
    """
    return load_cache(name="request").get(cache_key)
|
|
def update_request_cache(cache_key: str, response: Dict):
    """Record ``response`` under ``cache_key`` in the "request" cache.

    The whole cache is loaded, updated and written back on every call.
    """
    stored_responses = load_cache(name="request")
    stored_responses.update({cache_key: response})
    save_cache(name="request", cache=stored_responses)
|
|
def fetch_historical_eth_price(date_str: str) -> float:
    """Fetch the historical ETH price in USD from CoinGecko, with caching.

    Args:
        date_str: Value sent as CoinGecko's ``date`` query parameter; callers
            in this file format it as ``dd-mm-YYYY``.

    Returns:
        The USD price, served from the "price" cache when available. ``0.0``
        is returned when the response carries no USD price or the request
        fails for any reason; failures are logged, never raised.
    """
    cached_price = get_cached_price(date_str, "ETH")
    if cached_price is not None:
        return cached_price

    try:
        url = "https://api.coingecko.com/api/v3/coins/ethereum/history"
        params = {"date": date_str, "localization": "false"}

        # Pause before every uncached request (presumably to stay under
        # CoinGecko's rate limit).
        time.sleep(1.2)

        # Without a timeout a stalled connection would block the run forever.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        price = ((data.get("market_data") or {}).get("current_price") or {}).get("usd")
        if price is None:
            return 0.0

        update_price_cache(date_str, "ETH", price)
        return price

    except Exception as e:
        # Best-effort lookup: callers treat 0.0 as "price unavailable".
        logger.error(f"Error fetching ETH price for {date_str}: {str(e)}")
        return 0.0
| |
def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
    """Fetch a token's historical USD price from CoinGecko, with caching.

    Args:
        coin_id: CoinGecko coin id used in the request URL.
        date_str: Value sent as CoinGecko's ``date`` query parameter; callers
            in this file format it as ``dd-mm-YYYY``.
        token_symbol: Symbol under which the price is stored in the "price"
            cache.

    Returns:
        The USD price, served from the "price" cache when available. ``0.0``
        is returned when the request fails or the response carries no USD
        price; failures are logged, never raised.
    """
    cached_price = get_cached_price(date_str, token_symbol)
    if cached_price is not None:
        return cached_price

    try:
        success, data = request_with_retries(
            endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
            params={"date": date_str, "localization": "false"},
        )
        if not success:
            logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
            return 0.0

        # Pause after each successful lookup (presumably to stay under
        # CoinGecko's rate limit).
        time.sleep(1.2)

        price = ((data.get("market_data") or {}).get("current_price") or {}).get("usd")
        if price is None:
            return 0.0

        update_price_cache(date_str, token_symbol, price)
        return price

    except Exception as e:
        # Best-effort lookup: callers treat 0.0 as "price unavailable".
        logger.error(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
        return 0.0
|
|
def get_block_at_timestamp(
    timestamp: int,
    chain: str = "optimism"
) -> Optional[int]:
    """Resolve the closest block before ``timestamp`` via Optimism Etherscan.

    Args:
        timestamp: Unix timestamp sent as the ``timestamp`` query parameter.
        chain: Only used in the error log message; the endpoint queried is
            always the Optimism Etherscan API.

    Returns:
        The block number, or ``None`` when the request fails, the API reports
        a non-"1" status, or the result is not an integer.
    """
    # The previous URL contained "×tamp=" where the timestamp parameter
    # belongs, so the parameter was never sent. ``timestamp`` is now the first
    # query parameter, so the URL never contains an ampersand directly
    # followed by "times", which HTML-unescaping tools rewrite as "×".
    endpoint = (
        "https://api-optimistic.etherscan.io/api"
        f"?timestamp={timestamp}"
        "&module=block&action=getblocknobytime"
        f"&closest=before&apikey={ETHERSCAN_API_KEY}"
    )
    success, res = request_with_retries(endpoint=endpoint)

    if success and res.get("status") == "1" and "result" in res:
        try:
            return int(res.get("result"))
        except (TypeError, ValueError):
            logger.error(
                f"Failed to fetch block at timestamp {timestamp} for {chain}: "
                f"unexpected result {res.get('result')!r}"
            )
            return None

    logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}")
    return None
|
|
def fetch_eth_balance(address: str, timestamp: float) -> float:
    """Return the ETH balance of ``address`` at ``timestamp``, in ether.

    Raw wei balances are cached in the "eth_balance" cache under the key
    ``f"{address}_{timestamp}"``.

    Args:
        address: Account address; converted to checksum form before the call.
        timestamp: Unix timestamp used to resolve the block to query.

    Returns:
        The balance divided by 10**18.
    """
    key = "eth_balance"
    cache_key = f"{address}_{timestamp}"
    cache = load_cache(name=key)
    if cache_key in cache:
        return cache[cache_key] / (10 ** 18)

    block_number = get_block_at_timestamp(int(timestamp))
    if block_number is None:
        # With block_identifier=None web3 falls back to its default block
        # (normally "latest"), so the value is not the historical balance.
        logger.warning(
            f"Could not resolve block for timestamp {timestamp}; "
            f"ETH balance for {address} is read at the default block and not cached"
        )

    balance = w3.eth.get_balance(
        account=Web3.to_checksum_address(address),
        block_identifier=block_number
    )

    # Only cache balances tied to a resolved block; caching a fallback value
    # would permanently store a wrong balance under the historical key.
    if block_number is not None:
        cache[cache_key] = balance
        save_cache(name=key, cache=cache)
    return balance / (10 ** 18)
|
|
def fetch_token_balance(
    address: str,
    token_address: str,
    timestamp: int,
    decimals: int = 18
) -> Optional[float]:
    """Return the ERC-20 balance of ``address`` at ``timestamp`` in token units.

    Raw balances are cached in the "token_balance" cache under the key
    ``f"token_balance_{address}_{token_address}_{timestamp}"``.

    Args:
        address: Holder address; converted to checksum form before the call,
            as ``fetch_eth_balance`` does.
        token_address: ERC-20 contract address.
        timestamp: Unix timestamp used to resolve the block to query.
        decimals: Token decimals used to scale the raw balance.

    Returns:
        The scaled balance (``0.0`` for a zero balance), or ``None`` when the
        lookup fails; failures are logged, never raised.
    """
    try:
        cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
        cache = load_cache(name="token_balance")
        if cache_key in cache:
            return cache[cache_key] / (10 ** decimals)

        # Built only on a cache miss; a minimal ABI exposing balanceOf.
        contract = w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=[
                {
                    "constant": True,
                    "inputs": [{"name": "_owner", "type": "address"}],
                    "name": "balanceOf",
                    "outputs": [{"name": "", "type": "uint256"}],
                    "payable": False,
                    "stateMutability": "view",
                    "type": "function",
                }
            ]
        )

        block_number = get_block_at_timestamp(int(timestamp))
        if block_number is None:
            # With block_identifier=None web3 falls back to its default block
            # (normally "latest"), so the value is not the historical balance.
            logger.warning(
                f"Could not resolve block for timestamp {timestamp}; "
                f"token balance for {address} is read at the default block and not cached"
            )

        balance = contract.functions.balanceOf(
            Web3.to_checksum_address(address)
        ).call(block_identifier=block_number)

        # Only cache balances tied to a resolved block.
        if block_number is not None:
            cache[cache_key] = balance
            save_cache(name="token_balance", cache=cache)
        return balance / (10 ** decimals) if balance else 0.0
    except Exception as e:
        logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
        return None
|
|
def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
    """Convert an ISO-8601 timestamp string to a datetime object.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing so that
    ``datetime.fromisoformat`` accepts it.

    Args:
        timestamp: ISO-8601 timestamp string.

    Returns:
        The parsed datetime, or ``None`` (with a warning logged) when the
        value is not a string or cannot be parsed.
    """
    try:
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    except (AttributeError, ValueError, TypeError):
        # AttributeError covers non-string input such as None, which has no
        # ``replace`` method.
        logger.warning(f"Invalid timestamp format: {timestamp}")
        return None
|
|
def request_with_retries(
    endpoint: str,
    params: Dict = None,
    headers: Dict = None,
    method: str = "GET",
    body: Dict = None,
    rate_limited_code: int = 429,
    retry_wait: int = 5,
    max_retries: int = 3,
    timeout: float = 30.0
) -> Tuple[bool, Dict]:
    """Perform a cached HTTP request, retrying on rate limits and errors.

    Responses are looked up in, and stored to, the "request" cache. The cache
    key is ``f"POST_{endpoint}_{body}"`` for POST and ``f"{endpoint}_{params}"``
    otherwise. A cached empty dict is reported as a failure.

    Args:
        endpoint: URL to request.
        params: Query parameters for non-POST requests.
        headers: HTTP headers.
        method: "POST" (case-insensitive) sends ``body`` as JSON; anything
            else is sent as GET.
        body: JSON body for POST requests.
        rate_limited_code: Status code treated as "rate limited".
        retry_wait: Seconds to sleep before retrying.
        max_retries: Maximum number of attempts.
        timeout: Seconds to wait for the server before the attempt fails.
            Without it a stalled connection would block forever.

    Returns:
        ``(True, payload)`` for a 200 response or a non-empty cached payload,
        otherwise ``(False, {})``. Exceptions are logged and retried, never
        raised.
    """
    for attempt in range(max_retries):
        try:
            payload = None
            if method.upper() == "POST":
                cache_key = f"POST_{endpoint}_{str(body or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response

                response = requests.post(
                    endpoint, headers=headers, json=body, timeout=timeout
                )

                if response.ok:
                    payload = response.json()
                    update_request_cache(cache_key, payload)
            else:
                cache_key = f"{endpoint}_{str(params or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response

                response = requests.get(
                    endpoint, headers=headers, params=params or {}, timeout=timeout
                )

                if response.status_code == 200:
                    payload = response.json()
                    update_request_cache(cache_key, payload)
                elif response.status_code == 404:
                    # Remember "not found" so the lookup is not repeated.
                    update_request_cache(cache_key, {})

            if response.status_code == rate_limited_code:
                logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
                time.sleep(retry_wait)
                continue

            if response.status_code != 200:
                logger.error(f"Request failed with status {response.status_code}")
                return False, {}

            # A 200 response was parsed (and cached) in the branch above.
            return True, payload

        except Exception as e:
            logger.error(f"Request failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(retry_wait)
                continue
            return False, {}

    # Every attempt was rate limited.
    return False, {}
|
|
def should_include_transfer_optimism(
    from_address: str
) -> bool:
    """Decide whether a transfer from ``from_address`` on Optimism counts.

    Empty and zero addresses are rejected. Addresses with no deployed code
    (``eth_getCode`` returns "0x") are accepted. Addresses with code are
    accepted only when the Safe transaction service knows them as a Safe.
    Any lookup failure or exception results in ``False``.
    """
    null_like_addresses = (
        "0x0000000000000000000000000000000000000000",
        "0x0",
        "",
    )
    if not from_address or from_address.lower() in null_like_addresses:
        return False

    try:
        # Ask the public Optimism RPC whether the sender has contract code.
        code_lookup_ok, code_lookup = request_with_retries(
            endpoint="https://mainnet.optimism.io",
            method="POST",
            headers={"Content-Type": "application/json"},
            body={
                "jsonrpc": "2.0",
                "method": "eth_getCode",
                "params": [from_address, "latest"],
                "id": 1,
            },
            rate_limited_code=429,
            retry_wait=5,
        )
        if not code_lookup_ok:
            logger.error("Failed to check contract code")
            return False

        # No code at the address: treated as a plain account.
        if code_lookup.get("result", "0x") == "0x":
            return True

        # The sender is a contract: include it only if it is a known Safe.
        is_known_safe, _ = request_with_retries(
            endpoint=f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/",
            headers={"Accept": "application/json"},
            rate_limited_code=429,
            retry_wait=5,
        )
        if not is_known_safe:
            logger.info(
                f"Excluding transfer from contract: {from_address}"
            )
        return is_known_safe

    except Exception as e:
        logger.error(f"Error checking address {from_address}: {e}")
        return False
|
|
def fetch_optimism_incoming_transfers(
    address: str,
    last_timestamp: int
) -> Dict:
    """
    Fetch incoming ETH and USDC transfers of a Safe on Optimism, grouped by date.

    Pages through the Safe transaction service ``incoming-transfers`` endpoint
    (at most 10 pages). A transfer is kept when it was executed at or before
    ``last_timestamp``, was not sent by ``address`` itself, passes
    ``should_include_transfer_optimism``, and is either a native ETH transfer
    with a positive amount or an ERC-20 transfer whose symbol is "usdc" or
    "eth" (case-insensitive).

    Args:
        address: Safe address whose incoming transfers are fetched.
        last_timestamp: Unix timestamp; later transfers are ignored.

    Returns:
        Dict mapping ``YYYY-MM-DD`` to a list of transfer dicts with the keys
        ``from_address``, ``amount``, ``token_address``, ``symbol``,
        ``timestamp``, ``tx_hash`` and ``type``. Only dates with at least one
        kept transfer appear. ``{}`` is returned on an unexpected error.
    """
    base_url = "https://safe-transaction-optimism.safe.global/api/v1"
    all_transfers_by_date = {}

    try:
        logger.info(f"Fetching Optimism transfers for address {address}...")

        transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 10

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            # Full payload dump is debug-level only; it used to be printed.
            logger.debug("incoming transfers %s", response_json)

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                tx_datetime = get_datetime_from_timestamp(timestamp)
                if tx_datetime is None:
                    continue

                if tx_datetime.timestamp() > last_timestamp:
                    continue

                tx_date = tx_datetime.strftime("%Y-%m-%d")

                # A missing or null sender is treated like a self-transfer and
                # skipped, instead of crashing on ``None.lower()`` and
                # discarding every transfer fetched so far.
                from_address = transfer.get("from") or address
                if from_address.lower() == address.lower():
                    continue

                if not should_include_transfer_optimism(from_address):
                    continue

                transfer_type = transfer.get("type", "")

                if transfer_type == "ERC20_TRANSFER":
                    token_info = transfer.get("tokenInfo") or {}
                    token_address = transfer.get("tokenAddress", "")

                    # Without token info the symbol is "Unknown", which the
                    # symbol filter below always rejects.
                    symbol = token_info.get("symbol") or "Unknown"
                    if symbol.lower() not in ["usdc", "eth"]:
                        continue

                    try:
                        decimals = int(token_info.get("decimals", 18) or 18)
                        value_raw = int(transfer.get("value", "0") or "0")
                    except (ValueError, TypeError):
                        logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
                        continue
                    amount = value_raw / (10**decimals)

                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount,
                        "token_address": token_address,
                        "symbol": symbol,
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "token"
                    }

                elif transfer_type == "ETHER_TRANSFER":
                    try:
                        value_wei = int(transfer.get("value", "0") or "0")
                        amount_eth = value_wei / 10**18

                        if amount_eth <= 0:
                            continue
                    except (ValueError, TypeError):
                        logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
                        continue

                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount_eth,
                        "token_address": "",
                        "symbol": "ETH",
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "eth"
                    }

                else:
                    # Other transfer types are ignored.
                    continue

                # Create the date bucket only when there is something to put
                # in it, so the result has no empty lists.
                all_transfers_by_date.setdefault(tx_date, []).append(transfer_data)
                processed_count += 1

                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")

            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor

        logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
        return all_transfers_by_date

    except Exception as e:
        logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
        return {}
| |
|
|
def fetch_optimism_outgoing_transfers(
    address: str,
    final_timestamp: int,
    from_address: str,
) -> Dict:
    """Fetch all outgoing transfers from the safe address on Optimism until a specific date.

    Pages through the Safe transaction service ``transfers`` endpoint (at most
    50 pages) and keeps transfers sent by ``address``, executed at or before
    ``final_timestamp``, with a positive amount.

    NOTE(review): in the original the token-recording code followed the ETH
    branch without an ``else``, so each ETH transfer was also recorded a second
    time as an "Unknown" token. This version records every outgoing transfer
    exactly once: native ETH as type "eth", anything else as type "token".
    The source indentation was lost, so confirm this matches the intent.

    Args:
        address: The safe address to fetch transfers for
        final_timestamp: The timestamp until which to fetch transfers
        from_address: The master address to check for reversions (accepted
            for interface compatibility; not used by this function)

    Returns:
        Dict: Dictionary of transfers organized by date (``YYYY-MM-DD``). Each
        transfer dict has the keys ``from_address``, ``to_address``,
        ``amount``, ``token_address``, ``symbol``, ``timestamp``, ``tx_hash``
        and ``type``. ``{}`` is returned on an unexpected error.
    """
    all_transfers = {}

    if not address:
        logger.warning(
            "No address provided for fetching Optimism outgoing transfers"
        )
        return all_transfers

    try:
        base_url = "https://safe-transaction-optimism.safe.global/api/v1"
        transfers_url = f"{base_url}/safes/{address}/transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 50

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching outgoing transfers page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5,
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])

            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            # Full payload dump is debug-level only; it used to be printed.
            logger.debug("outgoing_transfers %s", response_json)

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                try:
                    tx_datetime = datetime.fromisoformat(
                        timestamp.replace("Z", "+00:00")
                    )
                    tx_date = tx_datetime.strftime("%Y-%m-%d")
                except (ValueError, TypeError):
                    logger.warning(
                        f"Invalid timestamp format: {timestamp}"
                    )
                    continue

                if tx_datetime.timestamp() > final_timestamp:
                    continue

                # Only transfers sent by the safe itself are outgoing. A null
                # sender is skipped instead of crashing on ``None.lower()``.
                sender = transfer.get("from") or ""
                if sender.lower() != address.lower():
                    continue

                transfer_type = transfer.get("type", "")

                try:
                    value_raw = int(transfer.get("value", "0") or "0")
                except (ValueError, TypeError):
                    continue

                if transfer_type == "ETHER_TRANSFER":
                    amount_eth = value_raw / 10**18
                    if amount_eth <= 0:
                        continue

                    transfer_data = {
                        "from_address": address,
                        "to_address": transfer.get("to"),
                        "amount": amount_eth,
                        "token_address": ZERO_ADDRESS,
                        "symbol": "ETH",
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "eth",
                    }
                else:
                    token_info = transfer.get("tokenInfo") or {}
                    token_address = transfer.get("tokenAddress", "")
                    symbol = token_info.get("symbol", "Unknown")

                    try:
                        decimals = int(token_info.get("decimals", 18) or 18)
                    except (ValueError, TypeError):
                        continue

                    amount = value_raw / (10**decimals)
                    if amount <= 0:
                        continue

                    transfer_data = {
                        "from_address": transfer.get("from"),
                        "to_address": transfer.get("to"),
                        "amount": amount,
                        "token_address": token_address,
                        "symbol": symbol,
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "token",
                    }

                all_transfers.setdefault(tx_date, []).append(transfer_data)
                processed_count += 1

                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} outgoing transfers for address {address}...")

            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor

        logger.info(f"Completed Optimism outgoing transfers: {processed_count} found")
        return all_transfers

    except Exception as e:
        logger.error(f"Error fetching Optimism outgoing transfers: {e}")
        return {}
| |
|
|
def track_and_calculate_reversion_value(
    safe_address: str,
    chain: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Track ETH transfers to safe address and handle reversion logic.

    The sender of the earliest incoming ETH transfer is taken as the master
    address. Incoming ETH transfers from that address are the funding
    transfers. Outgoing ETH transfers from ``safe_address`` back to the master
    address are the reversions, whose USD value is computed by
    ``calculate_total_reversion_value``.

    Args:
        safe_address: Address of the safe being analysed.
        chain: Chain name; only used in log messages.
        incoming_transfers: Date -> list of incoming transfer dicts.
        outgoing_transfers: Date -> list of outgoing transfer dicts.

    Returns:
        Total USD value of the reversion transfers. ``0.0`` when there are no
        incoming transfers, no funding transfers, no reversions, or an
        unexpected error occurs.
    """
    try:
        if not incoming_transfers:
            logger.warning(f"No transfers found for {chain} chain")
            return 0.0

        def _flatten_sorted(transfers_by_date: Dict) -> List[Dict]:
            # Flatten the per-date lists and order by the timestamp string.
            flat = [
                transfer
                for transfers in transfers_by_date.values()
                for transfer in transfers
                if isinstance(transfer, dict) and "timestamp" in transfer
            ]
            flat.sort(key=lambda x: x["timestamp"])
            return flat

        sorted_incoming_transfers = _flatten_sorted(incoming_transfers)
        sorted_outgoing_transfers = _flatten_sorted(outgoing_transfers or {})

        eth_transfers = []
        reversion_transfers = []
        master_safe_address = None
        funding_seen = False

        for transfer in sorted_incoming_transfers:
            if transfer.get("symbol") != "ETH":
                continue
            # ``or ""`` guards against a null address, which would otherwise
            # raise on ``.lower()`` and abort the whole calculation.
            sender = (transfer.get("from_address") or "").lower()
            if not funding_seen:
                # The first ETH transfer is the initial funding; its sender
                # becomes the master address.
                funding_seen = True
                if sender:
                    master_safe_address = sender
                eth_transfers.append(transfer)
            elif sender == master_safe_address:
                eth_transfers.append(transfer)

        for transfer in sorted_outgoing_transfers:
            if transfer.get("symbol") != "ETH":
                continue
            recipient = (transfer.get("to_address") or "").lower()
            sender = (transfer.get("from_address") or "").lower()
            if recipient == master_safe_address and sender == safe_address.lower():
                reversion_transfers.append(transfer)

        # Nothing to price: return directly instead of relying on the
        # IndexError that an empty list triggers downstream.
        if not eth_transfers or not reversion_transfers:
            return 0.0

        return calculate_total_reversion_value(
            eth_transfers, reversion_transfers
        )

    except Exception as e:
        logger.error(f"Error tracking ETH transfers: {str(e)}")
        return 0.0
|
|
def _reversion_price_date(timestamp: str) -> str:
    """Format an ISO-8601 timestamp as the ``dd-mm-YYYY`` string used for price lookups."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).strftime("%d-%m-%Y")


def calculate_total_reversion_value(
    eth_transfers: List[Dict], reversion_transfers: List[Dict]
) -> float:
    """Calculate the total reversion value from the reversion transfers.

    Each reversion transfer's ETH amount is multiplied by a historical ETH
    price and the products are summed. Transfers whose price lookup yields a
    falsy value (``fetch_historical_eth_price`` returns 0.0 on failure) add
    nothing.

    NOTE(review): the first reversion is priced at the date of the last
    funding transfer in ``eth_transfers``, not at its own date; later ones use
    their own date. This is preserved from the original; confirm the intent.

    Args:
        eth_transfers: Funding ETH transfers, oldest first. May be empty, in
            which case the first reversion's own date is used.
        reversion_transfers: ETH transfers sent back to the master address.

    Returns:
        Total USD value; ``0.0`` when there are no reversion transfers.

    Raises:
        ValueError: If a reversion transfer's timestamp is not ISO-8601.
    """
    # Previously an empty list raised IndexError here.
    if not reversion_transfers:
        return 0.0

    reversion_date = None
    if eth_transfers:
        timestamp = eth_transfers[-1].get("timestamp", "")
        try:
            if timestamp.endswith("Z"):
                reversion_date = _reversion_price_date(timestamp)
            else:
                # Anything not ending in "Z" is treated as a Unix timestamp.
                reversion_date = datetime.fromtimestamp(int(timestamp)).strftime(
                    "%d-%m-%Y"
                )
        except (AttributeError, ValueError, TypeError) as e:
            logger.warning(f"Error parsing timestamp: {e}")

    if reversion_date is None:
        # Fall back to the date of the first reversion transfer.
        reversion_date = _reversion_price_date(
            reversion_transfers[0].get("timestamp", "")
        )

    reversion_value = 0.0
    for index, transfer in enumerate(reversion_transfers):
        transfer_date = _reversion_price_date(transfer.get("timestamp", ""))
        price_date = reversion_date if index == 0 else transfer_date
        eth_price = fetch_historical_eth_price(price_date)
        if eth_price:
            reversion_value += transfer.get("amount", 0) * eth_price

    return reversion_value
| |
def calculate_initial_investment_value_from_funding_events(
    chain: str,
    address: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Compute the USD value invested in a safe from its funding transfers.

    Every incoming transfer with a positive amount is valued at the historical
    price for its date. The reversion value returned by
    ``track_and_calculate_reversion_value`` is then subtracted.

    Args:
        chain: Chain name; only "optimism" is supported.
        address: Safe address, forwarded to the reversion calculation.
        incoming_transfers: ``YYYY-MM-DD`` -> list of incoming transfer dicts.
        outgoing_transfers: Date -> list of outgoing transfer dicts.

    Returns:
        The net investment in USD, floored at ``0.0``. ``0.0`` is also
        returned when there are no incoming transfers or the chain is not
        supported.
    """
    total_investment = 0.0

    if not incoming_transfers:
        logger.warning(f"No transfers found for {chain} chain")
        return 0.0

    if chain != "optimism":
        logger.warning(f"Unsupported chain: {chain}, skipping")
        return 0.0

    logger.info("Using Optimism-specific transfer processing")
    for date, date_transfers in incoming_transfers.items():
        for transfer in date_transfers:
            try:
                amount = transfer.get("amount", 0)
                token_symbol = transfer.get("symbol", "").upper()

                if amount <= 0:
                    continue

                # Price lookups take dd-mm-YYYY; the buckets are YYYY-MM-DD.
                date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")

                if token_symbol == "ETH":
                    price = fetch_historical_eth_price(date_str)
                else:
                    coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
                    price = (
                        fetch_historical_token_price(
                            coingecko_id, date_str, token_symbol
                        )
                        if coingecko_id
                        else None
                    )

                # No coin id (None) or a failed lookup (0.0): skip explicitly
                # instead of hitting ``amount * None`` and relying on the
                # broad except below.
                if not price:
                    logger.warning(
                        f"No price available for {token_symbol} on {date}, skipping transfer"
                    )
                    continue

                transfer_value = amount * price
                total_investment += transfer_value

                logger.info(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}")

            except Exception as e:
                # Best effort: one malformed transfer must not abort the rest.
                logger.error(f"Error processing transfer: {str(e)}")
                continue

    reversion_value = track_and_calculate_reversion_value(
        safe_address=address,
        chain=chain,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    logger.info(f"Total investment: {total_investment}")
    logger.info(f"Reversion value: {reversion_value}")
    total_investment = total_investment - reversion_value
    logger.info(f"Total investment after reversion: {total_investment}")

    logger.info(f"Total initial investment from {chain} chain: ${total_investment}")
    return total_investment if total_investment > 0 else 0.0
|
|
def calculate_initial_value_from_address_and_timestamp(
    address: str,
    final_timestamp: int,
) -> Tuple[float, int]:
    """Return the initial investment value of a safe and its earliest funding time.

    Fetches the safe's incoming and outgoing Optimism transfers up to
    ``final_timestamp``, values them with
    ``calculate_initial_investment_value_from_funding_events``, and finds the
    earliest incoming transfer timestamp (``final_timestamp`` when there is
    none earlier).

    Returns:
        ``(initial_investment_usd, initial_timestamp)``.
    """
    incoming_transfers = fetch_optimism_incoming_transfers(address, final_timestamp)
    logger.info("Fetched incoming transfers")

    # Sender of the first transfer in the first non-empty date bucket.
    from_address = next(
        (
            bucket[0].get('from_address')
            for bucket in incoming_transfers.values()
            if bucket
        ),
        None,
    )
    if from_address is None:
        logger.warning("No from_address found in incoming transfers")
        from_address = ""

    outgoing_transfers = fetch_optimism_outgoing_transfers(address, final_timestamp, from_address)
    logger.info(f"Fetched outgoing transfers {outgoing_transfers}")

    # Walk every timestamped incoming transfer and keep the earliest time.
    initial_timestamp = final_timestamp
    timestamped_transfers = (
        record
        for bucket in incoming_transfers.values()
        for record in bucket
        if "timestamp" in record
    )
    for record in timestamped_transfers:
        seen_at = datetime.fromisoformat(record["timestamp"].replace('Z', '+00:00')).timestamp()
        if seen_at < initial_timestamp:
            initial_timestamp = int(seen_at)

    initial_investment = calculate_initial_investment_value_from_funding_events(
        chain="optimism",
        address=address,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    return initial_investment, int(initial_timestamp)
|
|
def calculate_final_value_from_address_and_timestamp(
    address: str,
    timestamp: int,
) -> float:
    """
    Value the holdings of ``address`` at ``timestamp`` in USD.

    Sums the ETH balance and the balance of every token in
    ``WHITELISTED_TOKENS``, each multiplied by its historical price for the
    UTC date of ``timestamp``. A token that fails is logged and skipped; any
    other failure makes the function return 0.0.
    """
    final_value = 0.0

    try:
        # Native ETH holding.
        eth_balance = fetch_eth_balance(address, timestamp)
        if eth_balance > 0:
            eth_date = datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")
            eth_price = fetch_historical_eth_price(eth_date)
            if not eth_price or not eth_price > 0:
                logger.warning(f"Could not fetch ETH price for timestamp {timestamp}")
            else:
                eth_value = eth_balance * eth_price
                final_value += eth_value
                logger.info(f"ETH value: {eth_balance:.6f} ETH @ ${eth_price:.2f} = ${eth_value:.2f}")

        # Whitelisted ERC-20 holdings.
        for token_address, token_meta in WHITELISTED_TOKENS.items():
            symbol, decimals = token_meta
            try:
                token_balance = fetch_token_balance(
                    address=address,
                    token_address=token_address,
                    decimals=decimals,
                    timestamp=timestamp,
                )
                if token_balance is None or not token_balance > 0:
                    continue

                price_date = datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")
                symbol_key = symbol.lower()
                token_price = fetch_historical_token_price(
                    coin_id=COIN_ID_MAPPING.get(symbol_key, symbol_key),
                    date_str=price_date,
                    token_symbol=symbol
                )

                if token_price is None or not token_price > 0:
                    logger.warning(f"Could not fetch price for {symbol} at timestamp {timestamp}")
                    continue

                token_value = token_balance * token_price
                final_value += token_value
                logger.info(f"{symbol} value: {token_balance:.6f} @ ${token_price:.6f} = ${token_value:.2f}")

            except Exception as e:
                logger.error(f"Error processing token {symbol} ({token_address}): {e}")
                continue

    except Exception as e:
        logger.error(f"Error calculating final value for address {address}: {e}")
        return 0.0

    logger.info(f"Total final value for {address}: ${final_value:.2f}")
    return final_value
|
|
def _calculate_adjusted_apr(
    apr: float,
    initial_timestamp: int,
    final_timestamp: int
) -> float:
    """Adjust an APR for the ETH price change between two timestamps.

    The adjusted value is
    ``apr + (1 - final_eth_price / initial_eth_price) * 100``, rounded to two
    decimals, using the historical ETH prices for the UTC dates of the two
    timestamps.

    Args:
        apr: APR in percent. ``None`` or 0 yields 0.0 without any lookup.
        initial_timestamp: Unix timestamp of the start of the period.
        final_timestamp: Unix timestamp of the end of the period.

    Returns:
        The adjusted APR, or the unmodified ``apr`` when either ETH price is
        unavailable.
    """
    if apr is None or apr == 0:
        return 0.0

    initial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y"))
    final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y"))

    # fetch_historical_eth_price reports failure as 0.0, not None. The old
    # ``is not None`` guard therefore never fired, and a zero initial price
    # made the Decimal division below raise.
    if not initial_eth_price or not final_eth_price:
        logger.warning(
            f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}"
        )
        return apr

    adjustment_factor = Decimal("1") - (
        Decimal(str(final_eth_price)) / Decimal(str(initial_eth_price))
    )
    return round(
        float(apr) + float(adjustment_factor * Decimal("100")),
        2,
    )
|
|
def calculate_apr_and_roi(
    initial_value: float,
    final_value: float,
    initial_timestamp: int,
    final_timestamp: int
) -> Tuple[float, float, float]:
    """Compute APR, ETH-adjusted APR and ROI for an investment period.

    ROI is ``(final_value / initial_value - 1) * 100``. APR annualizes the ROI
    over the elapsed hours (at least one hour, 8760 hours per year); a
    negative annualized value is replaced by the plain ROI. The adjusted APR
    comes from ``_calculate_adjusted_apr``.

    Args:
        initial_value: Invested value in USD.
        final_value: Value at the end of the period in USD.
        initial_timestamp: Unix timestamp of the start of the period.
        final_timestamp: Unix timestamp of the end of the period.

    Returns:
        ``(apr, adjusted_apr, roi)``, each rounded to two decimals. All zeros
        when ``final_value`` or ``initial_value`` is not positive.
    """
    if final_value <= 0:
        logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    # A zero initial value would raise ZeroDivisionError below, and a negative
    # one gives a meaningless ratio.
    if initial_value <= 0:
        logger.warning("Initial value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    roi = ((final_value / initial_value) - 1) * 100

    # Clamp to one hour so very short periods do not blow up the ratio.
    hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)

    hours_in_year = 8760
    time_ratio = hours_in_year / hours

    apr = float(roi * time_ratio)
    if apr < 0:
        # Losses are reported as the plain ROI, not annualized.
        apr = roi

    adjust_apr = _calculate_adjusted_apr(
        apr=apr,
        initial_timestamp=initial_timestamp,
        final_timestamp=final_timestamp
    )

    return float(round(apr, 2)), float(round(adjust_apr, 2)), float(round(roi, 2))
|
|
|
|
def fix_apr_and_roi(df: DataFrame) -> DataFrame:
    """
    Fix APR and ROI values by recalculating them based on actual blockchain data.

    Rows are first filtered: addresses in ``EXCLUDED_ADDRESSES``, timestamps
    before 2025-06-06 and timestamps in the future are removed. Every
    remaining non-dummy row then gets its initial and final value recomputed
    and its ``apr``, ``adjusted_apr``, ``roi`` and ``calculation_metrics``
    columns updated. Rows that fail at any step, or whose initial value is not
    positive, are dropped. Dummy rows (``is_dummy`` truthy) are kept unchanged.

    Args:
        df: Frame with at least the columns ``address``, ``timestamp``
            (datetime-like), ``is_dummy`` and ``calculation_metrics`` (dict);
            ``volume`` is optional.

    Returns:
        A new filtered and updated frame. The input frame object itself is
        returned only when it is empty.
    """
    if df.empty:
        logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
        return df

    logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")

    # Drop excluded addresses.
    original_count = len(df)
    df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
    excluded_count = original_count - len(df)
    if excluded_count > 0:
        logger.info(f"Excluded {excluded_count} rows with excluded addresses")

    # Drop rows older than the cut-off date.
    original_count = len(df)
    df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
    old_data_count = original_count - len(df)
    if old_data_count > 0:
        logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")

    # Stop here when nothing is left, so the per-row timestamp conversion
    # below never runs on an empty frame.
    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df

    # Drop rows with future timestamps.
    current_time = datetime.now().timestamp()
    row_epochs = df['timestamp'].apply(lambda x: x.timestamp())
    future_rows = df[row_epochs > current_time]
    if not future_rows.empty:
        logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
        for idx, row in future_rows.iterrows():
            logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")
        df = df[row_epochs <= current_time]

    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df

    logger.info(f"Processing {len(df)} valid rows")

    # Own the data before writing: ``df`` is the result of boolean filtering,
    # and assigning into such a slice is chained assignment in pandas.
    df = df.copy()
    # Iterate over a snapshot so the writes to ``df`` cannot affect iteration.
    df_copy = df.copy()
    rows_to_drop = []
    processed_count = 0

    for idx, row in df_copy.iterrows():
        try:
            if row['is_dummy']:
                logger.debug(f"Skipping dummy row {idx}")
                continue

            processed_count += 1
            logger.info(f"Processing row {processed_count}/{len(df_copy)} - Address: {row['address']}")

            final_timestamp = int(row['timestamp'].timestamp())

            # Defensive re-check; future rows were already filtered above.
            if final_timestamp > current_time:
                logger.warning(f"Skipping row {idx} with future timestamp: {final_timestamp}")
                rows_to_drop.append(idx)
                continue

            calculation_metrics = row['calculation_metrics']

            try:
                initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )
            except Exception as e:
                logger.error(f"Error calculating initial value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

            try:
                final_value = calculate_final_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )

                # The row's volume, when positive, is added to the final value.
                volume = row.get("volume", 0)
                if volume and volume > 0:
                    final_value += volume
                    logger.info(f"Added volume ${volume:.2f} to final value for address {row['address']}")

            except Exception as e:
                logger.error(f"Error calculating final value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

            if initial_value <= 0:
                logger.warning(f"Initial value for address {row['address']} is non-positive ({initial_value}), skipping row.")
                rows_to_drop.append(idx)
                continue

            calculation_metrics['initial_value'] = initial_value
            calculation_metrics['final_value'] = final_value
            df.at[idx, 'calculation_metrics'] = calculation_metrics

            try:
                apr, adjusted_apr, roi = calculate_apr_and_roi(
                    initial_value=initial_value,
                    final_value=final_value,
                    initial_timestamp=initial_timestamp,
                    final_timestamp=final_timestamp
                )
                df.at[idx, 'apr'] = apr
                df.at[idx, 'adjusted_apr'] = adjusted_apr
                df.at[idx, 'roi'] = roi

                logger.info(f"Successfully processed address {row['address']}: APR={apr:.2f}, ROI={roi:.2f}")

            except Exception as e:
                logger.error(f"Error calculating APR/ROI for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

        except Exception as e:
            logger.error(f"Unexpected error processing row {idx}: {e}")
            rows_to_drop.append(idx)
            continue

    if rows_to_drop:
        logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
        df = df.drop(rows_to_drop)

    logger.info(f"Completed fix_apr_and_roi: {len(df)} rows remaining")
    return df
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: print the (initial investment, earliest funding
    # timestamp) pair for one sample safe and cut-off time.
    sample_safe_address = "0xc8E264f402Ae94f69bDEf8B1f035F7200cD2B0c7"
    sample_cutoff_timestamp = 1750711233

    print(
        calculate_initial_value_from_address_and_timestamp(
            sample_safe_address,
            sample_cutoff_timestamp,
        )
    )
|
|