""" End-to-end pipeline: load crypto data -> engineer features -> train 15-min direction classifier. Single script to avoid intermediate file issues. """ import os import numpy as np import pandas as pd from datasets import load_dataset from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report import json import pickle SEED = 42 LOOKBACK = 60 AHEAD = 15 MAX_ROWS = 300_000 MAX_TRAIN_SAMPLES = 30_000 MAX_VAL_SAMPLES = 6_000 MAX_TEST_SAMPLES = 6_000 OUT_DIR = "/app/outputs" os.makedirs(OUT_DIR, exist_ok=True) def load_data(max_rows=MAX_ROWS): print("Loading BTC...") ds = load_dataset("WinkingFace/CryptoLM-Bitcoin-BTC-USDT", split=f"train[:{max_rows}]") df_btc = ds.to_pandas() for c in ["open", "volume"]: df_btc[c] = pd.to_numeric(df_btc[c], errors="coerce") df_btc = df_btc.rename(columns={c: f"btc_{c}" for c in df_btc.columns if c != "timestamp"}) print("Loading ETH...") ds = load_dataset("WinkingFace/CryptoLM-Ethereum-ETH-USDT", split=f"train[:{max_rows}]") df_eth = ds.to_pandas() for c in ["open", "volume"]: df_eth[c] = pd.to_numeric(df_eth[c], errors="coerce") df_eth = df_eth.rename(columns={c: f"eth_{c}" for c in df_eth.columns if c != "timestamp"}) df = pd.merge(df_btc, df_eth, on="timestamp", how="inner").sort_values("timestamp").reset_index(drop=True) df = df.dropna(subset=["btc_close", "eth_close"]).reset_index(drop=True) print(f"Merged rows: {len(df)}") return df def engineer_features(df): print("Engineering features...") df["eth_btc_ratio"] = df["eth_close"] / df["btc_close"] df["btc_ret_1m"] = df["btc_close"].pct_change() df["eth_ret_1m"] = df["eth_close"].pct_change() df["btc_vol_ma20"] = df["btc_volume"].rolling(20).mean() df["eth_vol_ma20"] = df["eth_volume"].rolling(20).mean() df["btc_range"] = (df["btc_high"] - df["btc_low"]) / df["btc_close"] df["eth_range"] = (df["eth_high"] - df["eth_low"]) / df["eth_close"] df["target"] = (df["btc_close"].shift(-AHEAD) > df["btc_close"]).astype(int) df = df.iloc[:-AHEAD].copy() return df def build_windows(df, lookback=LOOKBACK): print("Building windows...") exclude = {"timestamp", "btc_month", "eth_month", "target"} feat_cols = [c for c in df.columns if c not in exclude] df = df.dropna(subset=feat_cols + ["target"]).reset_index(drop=True) data = df[feat_cols].values.astype(np.float32) targets = df["target"].values.astype(np.int64) n = len(df) valid = ~np.isnan(data).any(axis=1) & ~np.isnan(targets) max_i = n - lookback - AHEAD + 1 X_list, y_list = [], [] for i in range(max_i): end = i + lookback tidx = end + AHEAD - 1 if valid[i:end].all() and valid[tidx]: X_list.append(data[i:end]) y_list.append(targets[tidx]) X = np.array(X_list, dtype=np.float32) y = np.array(y_list, dtype=np.int64) print(f"Samples: {X.shape}, pos_rate={y.mean():.3f}") return X, y def subsample(X, y, max_n, rng): if len(X) > max_n: idx = rng.choice(len(X), max_n, replace=False) return X[idx], y[idx] return X, y def evaluate_model(name, model, X_test, y_test, results): preds = model.predict(X_test) probs = model.predict_proba(X_test)[:, 1] acc = accuracy_score(y_test, preds) f1 = f1_score(y_test, preds) auc = roc_auc_score(y_test, probs) results[name] = {"accuracy": float(acc), "f1": float(f1), "auc": float(auc)} print(f" {name} test: acc={acc:.4f} f1={f1:.4f} auc={auc:.4f}") return results def main(): df = load_data() df = engineer_features(df) X, y = build_windows(df) n = len(X) te = int(n * 0.70) ve = int(n * 0.85) X_train, y_train = X[:te], y[:te] X_val, y_val = X[te:ve], y[te:ve] X_test, y_test = X[ve:], y[ve:] print(f"Split: train={len(X_train)}, val={len(X_val)}, test={len(X_test)}") rng = np.random.RandomState(SEED) X_train, y_train = subsample(X_train, y_train, MAX_TRAIN_SAMPLES, rng) X_val, y_val = subsample(X_val, y_val, MAX_VAL_SAMPLES, rng) X_test, y_test = subsample(X_test, y_test, MAX_TEST_SAMPLES, rng) print(f"Subsampled: train={len(X_train)}, val={len(X_val)}, test={len(X_test)}") def flat(X): return X.reshape(X.shape[0], -1) X_train_f = flat(X_train) X_val_f = flat(X_val) X_test_f = flat(X_test) valid = (np.isfinite(X_train_f).all(axis=0) & np.isfinite(X_val_f).all(axis=0) & np.isfinite(X_test_f).all(axis=0)) X_train_f = X_train_f[:, valid] X_val_f = X_val_f[:, valid] X_test_f = X_test_f[:, valid] print(f"Valid features: {X_train_f.shape[1]}") mean = X_train_f.mean(axis=0) std = X_train_f.std(axis=0) + 1e-8 X_train_f = (X_train_f - mean) / std X_val_f = (X_val_f - mean) / std X_test_f = (X_test_f - mean) / std results = {} print("\nTraining Random Forest...") rf = RandomForestClassifier(n_estimators=200, max_depth=12, min_samples_leaf=5, n_jobs=-1, random_state=SEED) rf.fit(X_train_f, y_train) results = evaluate_model("RandomForest", rf, X_test_f, y_test, results) print("\nTraining Logistic Regression...") lr = LogisticRegression(max_iter=500, random_state=SEED) lr.fit(X_train_f, y_train) results = evaluate_model("LogisticRegression", lr, X_test_f, y_test, results) best_name = max(results, key=lambda k: results[k]["auc"]) print(f"\nBest model: {best_name} (AUC={results[best_name]['auc']:.4f})") best_model = rf if best_name == "RandomForest" else lr with open(os.path.join(OUT_DIR, "model.pkl"), "wb") as f: pickle.dump(best_model, f) np.save(os.path.join(OUT_DIR, "feature_mean.npy"), mean) np.save(os.path.join(OUT_DIR, "feature_std.npy"), std) np.save(os.path.join(OUT_DIR, "valid_cols.npy"), valid) preds = best_model.predict(X_test_f) print("\nBest Model Classification Report (Test):") print(classification_report(y_test, preds, target_names=["down", "up"], digits=4)) metrics = { "best_model": best_name, "train_samples": int(len(X_train_f)), "val_samples": int(len(X_val_f)), "test_samples": int(len(X_test_f)), "n_features": int(X_train_f.shape[1]), "results": results, "best_test_accuracy": results[best_name]["accuracy"], "best_test_f1": results[best_name]["f1"], "best_test_auc": results[best_name]["auc"], } with open(os.path.join(OUT_DIR, "metrics.json"), "w") as f: json.dump(metrics, f, indent=2) print(f"\nArtifacts saved to {OUT_DIR}") if __name__ == "__main__": main()