| import ccxt |
| import sys |
| import os |
| import argparse |
| import time |
| from datetime import datetime, timezone |
| import pytz |
| import threading |
| import csv |
| import pandas as pd |
| from sklearn.model_selection import train_test_split |
| from sklearn.ensemble import RandomForestRegressor |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.metrics import mean_squared_error |
| import joblib |
|
|
def fetch_markets(exchange):
    """Return all spot */USDT symbols listed on the exchange.

    Exits the process with status 1 on exchange/network errors, since
    nothing downstream can proceed without the market list.
    """
    try:
        markets = exchange.fetch_markets()
        return [
            market['symbol']
            for market in markets
            # .get() tolerates market entries missing the 'spot' flag.
            if market.get('spot') and market['symbol'].endswith('/USDT')
        ]
    except (ccxt.ExchangeError, ccxt.NetworkError, ccxt.DDoSProtection) as e:
        print(f"Exchange error: {str(e)}")
        # Bug fix: the old code sent SIGKILL to its own PID
        # (os.kill(os.getpid(), 9)), which bypasses all cleanup and made
        # the following sys.exit(-999) unreachable dead code. A plain
        # non-zero sys.exit is the conventional fatal-error path.
        sys.exit(1)
|
|
def fetch_ohlcv(exchange, symbol, timeframe, limit=1000):
    """Download the full OHLCV history for *symbol* since 2020-01-01.

    Pages through the exchange API *limit* candles per request and
    returns the concatenated list; returns [] if any request fails.
    """
    try:
        collected = []
        cursor = exchange.parse8601('2020-01-01T00:00:00Z')
        while True:
            batch = exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=limit)
            if not batch:
                break
            collected.extend(batch)
            # Resume one millisecond past the newest candle seen so far.
            cursor = batch[-1][0] + 1
            if len(batch) < limit:
                # Short page means the exchange has no more history.
                break
        return collected
    except Exception as e:
        print(f"Error fetching OHLCV data for {symbol}: {str(e)}")
        return []
|
|
def fetch_current_price(exchange, symbol):
    """Return the current ask price for *symbol*, or None on failure.

    None is also returned when the ticker has no 'ask' field, so callers
    must always handle a missing price.
    """
    try:
        ticker = exchange.fetch_ticker(symbol)
        # Idiom fix: .get() replaces the `ticker['ask'] if 'ask' in
        # ticker else None` double-lookup with a single call; behavior
        # is identical (missing key -> None, present-but-None -> None).
        return ticker.get('ask')
    except Exception as e:
        print(f"Error fetching current price for {symbol}: {str(e)}")
        return None
|
|
def format_candle_time(timestamp):
    """Render a millisecond epoch timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    moment = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
|
|
def save_history_to_file(symbol, timeframe, ohlcv):
    """Write candles to downloaded_history/<SYMBOL>_<tf>_<start>_<end>.csv.

    Does nothing (beyond a notice) when *ohlcv* is empty. Timestamps are
    written as formatted UTC strings, not raw epoch milliseconds.
    """
    directory = "downloaded_history"
    # exist_ok avoids the race between an exists() check and makedirs().
    os.makedirs(directory, exist_ok=True)

    if not ohlcv:
        print(f"No OHLCV data to save for {symbol}")
        return

    # Date portion (no time) of the first and last candle, for the filename.
    start_date = format_candle_time(ohlcv[0][0]).split()[0]
    end_date = format_candle_time(ohlcv[-1][0]).split()[0]
    filename = (
        f"{directory}/{symbol.replace('/', '_')}_{timeframe}_{start_date}_{end_date}.csv"
    ).replace(" ", "_").replace(":", "-")

    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume'])
        for candle in ohlcv:
            timestamp, open_price, high_price, low_price, close_price, volume = candle
            writer.writerow([format_candle_time(timestamp), open_price, high_price,
                             low_price, close_price, volume])

    # Bug fix: the message previously printed the literal "(unknown)"
    # instead of the path that was actually written.
    print(f"Saved history to {filename}")
|
|
def load_data_from_files(directory, symbol_filter=None):
    """Load every CSV in *directory*, optionally filtered by filename prefix.

    Filenames are expected in the form written by save_history_to_file:
        <BASE>_<QUOTE>_<timeframe>_<start>_<end>.csv
    Returns (combined DataFrame with added 'symbol'/'Timeframe' columns,
    list of loaded file paths); the frame is empty when nothing matched.
    """
    all_data = []
    filenames = []
    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            if symbol_filter and not filename.startswith(symbol_filter):
                continue
            filepath = os.path.join(directory, filename)
            df = pd.read_csv(filepath)
            parts = filename[:-len(".csv")].split('_')
            # Bug fix: symbols are stored as BASE_QUOTE (e.g. BTC_USDT),
            # so the symbol spans the first TWO underscore-separated
            # parts and the timeframe is the third. The old code used
            # parts[0]/parts[1], yielding symbol='BTC' and
            # Timeframe='USDT'.
            df['symbol'] = '_'.join(parts[:2])
            df['Timeframe'] = parts[2] if len(parts) > 2 else ''
            all_data.append(df)
            filenames.append(filepath)
    return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame(), filenames
|
|
def preprocess_data(df):
    """Turn raw candle rows into (features X, next-close target y).

    Mutates *df* in place: sets a datetime index on 'Timestamp', sorts,
    adds a 'Target' column (the following candle's close), and drops
    rows whose target is unknown.
    """
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    df.set_index('Timestamp', inplace=True)
    df.sort_index(inplace=True)
    # Bug fix: a plain Close.shift(-1) on the combined frame leaks the
    # FIRST close of the next symbol into the LAST row of the previous
    # one when several symbols/timeframes are loaded together. Shift
    # within each symbol when the column is present; fall back to the
    # old behavior for plain single-series frames.
    if 'symbol' in df.columns:
        df['Target'] = df.groupby('symbol')['Close'].shift(-1)
    else:
        df['Target'] = df['Close'].shift(-1)
    df.dropna(inplace=True)

    X = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    y = df['Target']

    return X, y
|
|
def train_model(X, y):
    """Fit a RandomForestRegressor on standardized features; print held-out RMSE.

    Returns (model, scaler). The train/test split is chronological
    (shuffle=False): randomly shuffling time-series rows lets the model
    train on candles that come AFTER the test candles, so the reported
    RMSE would be optimistically biased by look-ahead leakage.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False
    )
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train_scaled, y_train)

    y_pred = model.predict(X_test_scaled)
    mse = mean_squared_error(y_test, y_pred)
    rmse = mse**0.5
    print(f"Model trained. RMSE: {rmse:.4f}")

    return model, scaler
|
|
def predict_next_candle(model, scaler, data):
    """Scale the OHLCV columns of *data* and return the model's predictions."""
    features = data[['Open', 'High', 'Low', 'Close', 'Volume']]
    scaled = scaler.transform(features)
    return model.predict(scaled)
|
|
def analyze_symbol(exchange, symbol, timeframe, output_file):
    """Download full history for *symbol*, persist it to CSV, then report
    the candle with the widest high-low range plus the current ask price.

    The human-readable summary is printed and appended to *output_file*;
    any failure is logged and swallowed so a scan over many symbols
    continues past individual errors.
    """
    try:
        ohlcv = fetch_ohlcv(exchange, symbol, timeframe)
        if not ohlcv:
            return

        save_history_to_file(symbol, timeframe, ohlcv)

        # Candle with the greatest spread; index 2 = high, index 3 = low.
        widest = max(ohlcv, key=lambda candle: candle[2] - candle[3])
        timestamp = widest[0]
        open_price = widest[1]
        highest_price = widest[2]
        lowest_price = widest[3]
        close_price = widest[4]
        candle_date_time = format_candle_time(timestamp)

        greatest_candle_info = (
            f"Symbol: {symbol}, Timeframe: {timeframe}, "
            f"Open: {open_price:.4f}, Close: {close_price:.4f}, "
            f"High: {highest_price:.4f}, Low: {lowest_price:.4f}, "
            f"Range: {highest_price - lowest_price:.4f}, "
            f"Greatest Candle DateTime: {candle_date_time}\n"
        )

        current_price = fetch_current_price(exchange, symbol)
        if current_price is None:
            return

        current_time = datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S')
        result = (
            f"[{current_time}] {symbol} (BINANCE:{symbol.replace('/', '')}) "
            f"Timeframe: {timeframe}, Current price: {current_price:.4f}\n"
            f"{greatest_candle_info}\n"
        )
        print(result.strip())

        with open(output_file, 'a') as f:
            f.write(result)

    except Exception as e:
        print(f"Error analyzing symbol {symbol}: {str(e)}")
|
|
def write_predictions_to_file(output_file, predictions):
    """Append one formatted entry per prediction tuple to *output_file*.

    Each tuple is (symbol, last_timestamp, timeframe, prediction);
    integer timestamps are interpreted as epoch milliseconds in UTC,
    anything else is assumed to already be datetime-like.
    """
    with open(output_file, 'a') as f:
        for symbol, last_timestamp, timeframe, prediction in predictions:
            # Millisecond epoch ints become timezone-aware UTC datetimes.
            if isinstance(last_timestamp, int):
                last_timestamp = datetime.fromtimestamp(
                    last_timestamp / 1000, tz=timezone.utc
                )
            stamp = last_timestamp.strftime('%Y-%m-%d %H:%M:%S')
            entry = (
                f"Prediction for {symbol} ({timeframe}):\n"
                f"- Last Timestamp: {stamp}\n"
                f"- Predicted Value: {prediction[-1]:.4f}\n"
                "\n"
            )
            f.write(entry)
|
|
def worker(exchange, symbols, timeframe, output_file):
    """Analyze each requested symbol that the exchange actually lists.

    Performance fix: the valid-market list is fetched ONCE up front and
    turned into a set; the original called fetch_markets() — a network
    round-trip — inside the loop, once per symbol, and paid an O(n)
    membership scan on the returned list each time.
    """
    valid_symbols = set(fetch_markets(exchange))
    for symbol in symbols:
        if symbol in valid_symbols:
            analyze_symbol(exchange, symbol, timeframe, output_file)
        else:
            print(f"Skipping invalid symbol {symbol}")
|
|
def main():
    """CLI entry point.

    Modes (checked in priority order):
      --train        train a model on downloaded BTC_USDT history and
                     save model.pkl / scaler.pkl
      --use-existing load model.pkl / scaler.pkl and append next-candle
                     predictions to the output file
      (default)      scan every Binance spot */USDT market for its
                     widest historical candle
    """
    parser = argparse.ArgumentParser(description='Show details of the greatest historical candle, save historical data, and train a model.')
    parser.add_argument('--timeframe', type=str, required=True, help='Timeframe for the candlestick analysis')
    parser.add_argument('--train', action='store_true', help='Train model using existing historical data')
    parser.add_argument('--use-existing', action='store_true', help='Use existing historical data files')
    args = parser.parse_args()

    timeframe = args.timeframe
    train_model_flag = args.train
    use_existing_flag = args.use_existing
    script_name = os.path.basename(__file__).split('.')[0]
    result_directory = f"scan_results_{script_name}"

    # exist_ok avoids the check-then-create race.
    os.makedirs(result_directory, exist_ok=True)

    output_file = os.path.join(result_directory, f"{datetime.now(pytz.UTC).strftime('%Y%m%d_%H%M%S')}_{timeframe}_greatest_candles.txt")

    if train_model_flag:
        historical_data_dir = "downloaded_history"
        symbol_filter = 'BTC_USDT'
        df, filenames = load_data_from_files(historical_data_dir, symbol_filter)
        if df.empty:
            print("No historical data found for training.")
            return

        print("Training model using the following BTC-related files:")
        for filename in filenames:
            # Bug fix: previously printed the literal "(unknown)" instead
            # of the file path.
            print(f"- {filename}")

        X, y = preprocess_data(df)
        if X.empty or y.empty:
            print("No valid data available for training.")
            return

        model, scaler = train_model(X, y)

        # Persist for later --use-existing runs.
        joblib.dump(model, 'model.pkl')
        joblib.dump(scaler, 'scaler.pkl')

    elif use_existing_flag:
        model = joblib.load('model.pkl')
        scaler = joblib.load('scaler.pkl')

        historical_data_dir = "downloaded_history"
        symbol_filter = 'BTC_USDT'
        df, filenames = load_data_from_files(historical_data_dir, symbol_filter)
        if df.empty:
            print("No historical data found for prediction.")
            return

        print("Using the following BTC-related files for prediction:")
        for filename in filenames:
            # Bug fix: previously printed the literal "(unknown)" instead
            # of the file path.
            print(f"- {filename}")

        predictions = []
        for symbol in df['symbol'].unique():
            symbol_data = df[df['symbol'] == symbol]
            timeframe = symbol_data['Timeframe'].iloc[0]
            # NOTE(review): df was not run through preprocess_data here, so
            # .index[-1] is a positional RangeIndex value, not a candle
            # timestamp — write_predictions_to_file will label the entry
            # with a near-epoch date. Verify intent before relying on it.
            last_timestamp = symbol_data.index[-1]
            next_candle_prediction = predict_next_candle(model, scaler, symbol_data)
            predictions.append((symbol, last_timestamp, timeframe, next_candle_prediction))

        write_predictions_to_file(output_file, predictions)

    else:
        exchange = ccxt.binance()
        symbols = fetch_markets(exchange)
        worker(exchange, symbols, timeframe, output_file)
|
|
| if __name__ == "__main__": |
| main() |
|
|