| """ |
| End-to-end pipeline: load crypto data -> engineer features -> train 15-min direction classifier. |
| Single script to avoid intermediate file issues. |
| """ |
|
|
| import os |
| import numpy as np |
| import pandas as pd |
| from datasets import load_dataset |
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report |
| import json |
| import pickle |
|
|
# Reproducibility seed shared by the subsampler and both models.
SEED = 42
# Number of consecutive rows that make up one model input window.
LOOKBACK = 60
# Label horizon in rows: the target compares btc_close this many rows ahead.
AHEAD = 15
# Number of leading rows requested from each Hugging Face dataset.
MAX_ROWS = 300_000
# Per-split caps applied after the chronological split to keep training fast.
MAX_TRAIN_SAMPLES = 30_000
MAX_VAL_SAMPLES = 6_000
MAX_TEST_SAMPLES = 6_000
# Artifact directory; set the OUT_DIR environment variable to override it
# (an empty value falls back to the default).
OUT_DIR = os.environ.get("OUT_DIR") or "/app/outputs"
os.makedirs(OUT_DIR, exist_ok=True)
|
|
def _load_symbol(repo_id, prefix, max_rows):
    """Load one CryptoLM dataset and prefix every non-timestamp column.

    All five OHLCV columns are coerced to numeric (unparseable values become
    NaN) because each of them is used arithmetically downstream.

    Args:
        repo_id: Hugging Face dataset id.
        prefix: column prefix such as ``"btc"``.
        max_rows: number of leading rows requested from the ``train`` split.

    Returns:
        DataFrame with ``timestamp`` plus ``{prefix}_*`` columns, one row per
        timestamp.
    """
    df = load_dataset(repo_id, split=f"train[:{max_rows}]").to_pandas()
    for col in ("open", "high", "low", "close", "volume"):
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # A repeated timestamp would multiply rows in the inner merge that follows.
    df = df.drop_duplicates(subset="timestamp", keep="first")
    return df.rename(columns={c: f"{prefix}_{c}" for c in df.columns if c != "timestamp"})


def load_data(max_rows=MAX_ROWS):
    """Load BTC and ETH candles and inner-join them on ``timestamp``.

    Args:
        max_rows: number of leading rows requested from each dataset.

    Returns:
        DataFrame sorted by ``timestamp`` whose other columns are prefixed
        ``btc_`` / ``eth_``. Rows missing ``btc_close`` or ``eth_close`` are
        dropped and the index is reset.
    """
    print("Loading BTC...")
    df_btc = _load_symbol("WinkingFace/CryptoLM-Bitcoin-BTC-USDT", "btc", max_rows)

    print("Loading ETH...")
    df_eth = _load_symbol("WinkingFace/CryptoLM-Ethereum-ETH-USDT", "eth", max_rows)

    df = (
        pd.merge(df_btc, df_eth, on="timestamp", how="inner")
        .sort_values("timestamp")
        .dropna(subset=["btc_close", "eth_close"])
        .reset_index(drop=True)
    )
    print(f"Merged rows: {len(df)}")
    return df
|
|
def engineer_features(df, ahead=None):
    """Add cross-asset features and the binary BTC direction label.

    Works on a copy, so the caller's frame is left untouched.

    Args:
        df: merged frame with ``btc_*`` / ``eth_*`` close, high, low and volume
            columns, in time order (as produced by ``load_data``).
        ahead: label horizon in rows; defaults to the module-level ``AHEAD``.

    Returns:
        New frame with the added feature columns and ``target``. ``target`` is
        1 when ``btc_close`` ``ahead`` rows later is strictly greater than the
        current ``btc_close``, otherwise 0 (ties count as 0). The last
        ``ahead`` rows, whose future price is unknown, are removed.
    """
    if ahead is None:
        ahead = AHEAD
    print("Engineering features...")
    df = df.copy()
    df["eth_btc_ratio"] = df["eth_close"] / df["btc_close"]
    df["btc_ret_1m"] = df["btc_close"].pct_change()
    df["eth_ret_1m"] = df["eth_close"].pct_change()
    df["btc_vol_ma20"] = df["btc_volume"].rolling(20).mean()
    df["eth_vol_ma20"] = df["eth_volume"].rolling(20).mean()
    df["btc_range"] = (df["btc_high"] - df["btc_low"]) / df["btc_close"]
    df["eth_range"] = (df["eth_high"] - df["eth_low"]) / df["eth_close"]
    # Comparisons against the NaN past the end evaluate to False (0); those
    # rows are sliced off below.
    df["target"] = (df["btc_close"].shift(-ahead) > df["btc_close"]).astype(int)
    # Explicit stop: iloc[:-0] would be empty, and a horizon longer than the
    # frame must yield no rows rather than trimming from the wrong end.
    return df.iloc[: max(len(df) - ahead, 0)].copy()
|
|
def build_windows(df, lookback=None):
    """Slice ``df`` into fixed-length feature windows, one label per window.

    Every column except ``timestamp``, ``btc_month``, ``eth_month`` and
    ``target`` is a feature. The window over rows ``i .. i + lookback - 1`` is
    labelled with ``target`` of its LAST row. That row's ``target`` already
    looks forward, so the label is the move right after the window ends.
    Windows containing any non-finite feature (NaN or inf), or whose label is
    missing, are skipped. Rows are never dropped beforehand, so every kept
    window covers consecutive rows of ``df``.

    Args:
        df: frame with feature columns and a 0/1 ``target`` column.
        lookback: window length in rows; defaults to the module-level
            ``LOOKBACK``.

    Returns:
        ``(X, y)``: ``X`` is float32 with shape
        ``(n_windows, lookback, n_features)``, ``y`` is int64 with shape
        ``(n_windows,)``, both in window-start order.

    Raises:
        ValueError: if ``lookback`` is smaller than 1.
    """
    if lookback is None:
        lookback = LOOKBACK
    if lookback < 1:
        raise ValueError(f"lookback must be >= 1, got {lookback}")
    print("Building windows...")
    exclude = {"timestamp", "btc_month", "eth_month", "target"}
    feat_cols = [c for c in df.columns if c not in exclude]

    data = df[feat_cols].to_numpy().astype(np.float32)
    label = df["target"].to_numpy().astype(np.float64)
    label_ok = np.isfinite(label)
    targets = np.where(label_ok, label, 0).astype(np.int64)
    row_ok = np.isfinite(data).all(axis=1)

    n = len(df)
    n_windows = max(n - lookback + 1, 0)
    # Prefix count of bad rows gives the number of bad rows per window in O(n).
    bad_prefix = np.concatenate(([0], np.cumsum(~row_ok)))
    bad_in_window = bad_prefix[lookback:lookback + n_windows] - bad_prefix[:n_windows]
    last_rows = np.arange(n_windows) + lookback - 1
    starts = np.flatnonzero((bad_in_window == 0) & label_ok[last_rows])

    # Gather every kept window in one fancy-index copy instead of stacking a
    # Python list of slices (which held two full copies at peak).
    X = data[starts[:, None] + np.arange(lookback)]
    y = targets[starts + lookback - 1]
    pos_rate = y.mean() if len(y) else float("nan")
    print(f"Samples: {X.shape}, pos_rate={pos_rate:.3f}")
    return X, y
|
|
def subsample(X, y, max_n, rng):
    """Randomly keep at most ``max_n`` paired rows of ``X`` and ``y``.

    When there are already ``max_n`` rows or fewer, the inputs are returned
    unchanged (the same objects). Otherwise ``max_n`` distinct row indices are
    drawn from ``rng`` without replacement and the same rows are taken from
    both arrays, in draw order.
    """
    n_total = len(X)
    if n_total <= max_n:
        return X, y
    keep = rng.choice(n_total, max_n, replace=False)
    return X[keep], y[keep]
|
|
def evaluate_model(name, model, X_test, y_test, results, split="test"):
    """Score a fitted binary classifier on one split and record its metrics.

    Args:
        name: key under which the metrics are stored in ``results``.
        model: fitted classifier exposing ``predict``, ``predict_proba`` and
            ``classes_``.
        X_test: feature matrix to score.
        y_test: true 0/1 labels for ``X_test``.
        results: dict updated in place with
            ``{name: {"accuracy": ..., "f1": ..., "auc": ...}}``.
        split: split name shown in the printed line (e.g. ``"val"``); the
            default keeps the original ``test`` output.

    Returns:
        The same ``results`` dict.
    """
    preds = model.predict(X_test)
    # predict_proba columns follow classes_; look up class 1 instead of
    # assuming it sits in column 1.
    pos_col = list(model.classes_).index(1)
    probs = model.predict_proba(X_test)[:, pos_col]
    acc = accuracy_score(y_test, preds)
    f1 = f1_score(y_test, preds)
    auc = roc_auc_score(y_test, probs)
    results[name] = {"accuracy": float(acc), "f1": float(f1), "auc": float(auc)}
    print(f" {name} {split}: acc={acc:.4f} f1={f1:.4f} auc={auc:.4f}")
    return results
|
|
def main():
    """Run the pipeline end to end and write artifacts to ``OUT_DIR``.

    Windows are split chronologically 70/15/15 into train/val/test, with an
    embargo after each boundary. Each split is randomly subsampled, flattened,
    restricted to columns that are finite in every split, and standardized
    with train statistics. A random forest and a logistic regression are fit
    on train. The model with the higher VALIDATION AUC is selected, so the
    reported test metrics are not biased by model selection.

    Writes ``model.pkl``, ``feature_mean.npy``, ``feature_std.npy``,
    ``valid_cols.npy`` and ``metrics.json``.

    Raises:
        ValueError: if there are too few windows to fill all three splits.
    """
    df = load_data()
    df = engineer_features(df)
    X, y = build_windows(df)

    # Windows come out in time order, so slicing by index is a chronological
    # split. Neighbouring windows share rows and labels look AHEAD rows ahead,
    # so skip LOOKBACK + AHEAD windows after each boundary to reduce leakage.
    n = len(X)
    te = int(n * 0.70)
    ve = int(n * 0.85)
    gap = LOOKBACK + AHEAD
    X_train, y_train = X[:te], y[:te]
    X_val, y_val = X[te + gap:ve], y[te + gap:ve]
    X_test, y_test = X[ve + gap:], y[ve + gap:]
    print(f"Split: train={len(X_train)}, val={len(X_val)}, test={len(X_test)}")
    if min(len(X_train), len(X_val), len(X_test)) == 0:
        raise ValueError(
            f"Too few windows ({n}) for a train/val/test split with a {gap}-window embargo"
        )

    rng = np.random.RandomState(SEED)
    X_train, y_train = subsample(X_train, y_train, MAX_TRAIN_SAMPLES, rng)
    X_val, y_val = subsample(X_val, y_val, MAX_VAL_SAMPLES, rng)
    X_test, y_test = subsample(X_test, y_test, MAX_TEST_SAMPLES, rng)
    print(f"Subsampled: train={len(X_train)}, val={len(X_val)}, test={len(X_test)}")

    # The tabular models take one flat row per window.
    X_train_f = X_train.reshape(X_train.shape[0], -1)
    X_val_f = X_val.reshape(X_val.shape[0], -1)
    X_test_f = X_test.reshape(X_test.shape[0], -1)

    # Keep only flattened columns that are finite in every split.
    valid = (np.isfinite(X_train_f).all(axis=0) &
             np.isfinite(X_val_f).all(axis=0) &
             np.isfinite(X_test_f).all(axis=0))
    X_train_f = X_train_f[:, valid]
    X_val_f = X_val_f[:, valid]
    X_test_f = X_test_f[:, valid]
    print(f"Valid features: {X_train_f.shape[1]}")

    mean = X_train_f.mean(axis=0)
    std = X_train_f.std(axis=0)
    # A column (near-)constant in train would be divided by ~1e-8 and explode
    # on val/test; leave such columns unscaled (mean-centred only).
    std = np.where(std > 1e-8, std, 1.0)
    X_train_f = (X_train_f - mean) / std
    X_val_f = (X_val_f - mean) / std
    X_test_f = (X_test_f - mean) / std

    models = {}

    print("\nTraining Random Forest...")
    rf = RandomForestClassifier(n_estimators=200, max_depth=12, min_samples_leaf=5, n_jobs=-1, random_state=SEED)
    rf.fit(X_train_f, y_train)
    models["RandomForest"] = rf

    print("\nTraining Logistic Regression...")
    lr = LogisticRegression(max_iter=500, random_state=SEED)
    lr.fit(X_train_f, y_train)
    models["LogisticRegression"] = lr

    # Validation AUC drives selection; test metrics are only reported.
    results, val_results = {}, {}
    for name, model in models.items():
        val_auc = roc_auc_score(y_val, model.predict_proba(X_val_f)[:, 1])
        val_results[name] = {"auc": float(val_auc)}
        print(f" {name} val: auc={val_auc:.4f}")
        results = evaluate_model(name, model, X_test_f, y_test, results)

    best_name = max(val_results, key=lambda k: val_results[k]["auc"])
    best_model = models[best_name]
    print(
        f"\nBest model: {best_name} (val AUC={val_results[best_name]['auc']:.4f}, "
        f"test AUC={results[best_name]['auc']:.4f})"
    )

    with open(os.path.join(OUT_DIR, "model.pkl"), "wb") as f:
        pickle.dump(best_model, f)
    np.save(os.path.join(OUT_DIR, "feature_mean.npy"), mean)
    np.save(os.path.join(OUT_DIR, "feature_std.npy"), std)
    np.save(os.path.join(OUT_DIR, "valid_cols.npy"), valid)

    preds = best_model.predict(X_test_f)
    print("\nBest Model Classification Report (Test):")
    print(classification_report(y_test, preds, target_names=["down", "up"], digits=4))

    metrics = {
        "best_model": best_name,
        "selection_metric": "val_auc",
        "train_samples": int(len(X_train_f)),
        "val_samples": int(len(X_val_f)),
        "test_samples": int(len(X_test_f)),
        "n_features": int(X_train_f.shape[1]),
        "results": results,
        "val_results": val_results,
        "best_val_auc": val_results[best_name]["auc"],
        "best_test_accuracy": results[best_name]["accuracy"],
        "best_test_f1": results[best_name]["f1"],
        "best_test_auc": results[best_name]["auc"],
    }
    with open(os.path.join(OUT_DIR, "metrics.json"), "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"\nArtifacts saved to {OUT_DIR}")


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|