"""
Speaker Identification using PCA and Classical ML Models
========================================================

Analyzes ECAPA embeddings using PCA and evaluates:
  - Logistic Regression
  - SVM (Linear)
  - SVM (RBF/Gaussian)
  - k-Nearest Neighbors (k-NN)

Deliverables:
  - PCA visualization plots (2D)
  - Accuracy comparison table (all models x PCA dims)
  - Precision, Recall, F1, Confusion Matrices
  - Trained ML models (saved with joblib)
"""

import os
import time
import json
from pathlib import Path

import matplotlib
matplotlib.use("Agg")  # Non-interactive backend for server
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from joblib import dump
from sklearn.base import clone
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.svm import SVC
from tqdm.auto import tqdm

# ============================================================
# Configuration
# ============================================================
RANDOM_STATE = 42
TEST_SIZE = 0.1  # 10% for final test
VAL_SIZE = 0.1111  # ~10% of remaining (0.1111 * 0.9 ≈ 0.10)
DATA_PATH = "voxceleb1_dev_ecapa_features.csv"
OUTPUT_DIR = Path("results")
MODELS_DIR = OUTPUT_DIR / "models"
PLOTS_DIR = OUTPUT_DIR / "plots"

# Feature-set key -> human-readable label. Shared by the accuracy table and
# the bar-chart legends so the unreduced features are never labelled "PCA".
DIM_LABELS = {
    "192": "Original (192)",
    "100": "PCA (100)",
    "50": "PCA (50)",
}
BAR_WIDTH = 0.25
BAR_COLORS = ["#2196F3", "#4CAF50", "#FF9800"]

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
MODELS_DIR.mkdir(parents=True, exist_ok=True)
PLOTS_DIR.mkdir(parents=True, exist_ok=True)


def _safe_filename(key):
    """Return *key* with spaces turned into underscores and parentheses removed."""
    return key.replace(" ", "_").replace("(", "").replace(")", "")


def _plot_metric_bars(results, model_names, metric, ylabel, title, out_path, y_floor=None):
    """Save a grouped bar chart of one metric per model and feature set.

    Args:
        results: Mapping of "<model>_<dim>" keys to metric dictionaries.
        model_names: Model names, in the order they appear on the x axis.
        metric: Key to read from each metric dictionary (e.g. "accuracy").
        ylabel: Label for the y axis.
        title: Figure title.
        out_path: Destination image path.
        y_floor: When given, the y axis runs from this value up to 1.0; the
            lower bound is reduced automatically if any bar would otherwise
            fall below it and be clipped out of view.
    """
    model_names = list(model_names)
    fig, ax = plt.subplots(figsize=(12, 7))
    x = np.arange(len(model_names))
    lowest = 1.0
    for i, (dim, label) in enumerate(DIM_LABELS.items()):
        values = [results.get(f"{m}_{dim}", {}).get(metric, 0) for m in model_names]
        lowest = min([lowest, *values])
        ax.bar(x + i * BAR_WIDTH, values, BAR_WIDTH, label=label, color=BAR_COLORS[i])
    ax.set_xlabel("Model", fontsize=13)
    ax.set_ylabel(ylabel, fontsize=13)
    ax.set_title(title, fontsize=15)
    ax.set_xticks(x + BAR_WIDTH)
    ax.set_xticklabels(model_names, rotation=15, ha="right")
    if y_floor is not None:
        # Keep the requested zoom when every bar fits; otherwise widen the
        # range so no bar is hidden below the axis.
        ax.set_ylim(min(y_floor, max(0.0, lowest - 0.05)), 1.0)
    ax.legend()
    ax.grid(axis="y", linestyle="--", alpha=0.4)
    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


print("=" * 60)
print("Speaker Identification - PCA + ML Pipeline")
print("=" * 60)

# ============================================================
# 1. Load Data
# ============================================================
print("\n[1/8] Loading dataset...")
t0 = time.time()
df = pd.read_csv(DATA_PATH)
feature_cols = [c for c in df.columns if c.startswith("emb_")]
if not feature_cols:
    # Fail early with a clear message instead of deep inside scikit-learn.
    raise ValueError(f"No 'emb_*' feature columns found in {DATA_PATH}")
print(f" Dataset shape: {df.shape}")
print(f" Features: {len(feature_cols)}-dim ECAPA embeddings")
print(f" Unique speakers: {df['speaker_id'].nunique()}")
print(f" Load time: {time.time() - t0:.1f}s")

# ============================================================
# 2. Train / Validation / Test Split (80/10/10)
# ============================================================
print("\n[2/8] Splitting data 80/10/10 (speaker-stratified)...")
t0 = time.time()

# First split: 90% train+val, 10% test
df_trainval, df_test = train_test_split(
    df,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
    stratify=df["speaker_id"],
)

# Second split: 80% train, 10% val (from the 90%)
df_train, df_val = train_test_split(
    df_trainval,
    test_size=VAL_SIZE,
    random_state=RANDOM_STATE,
    stratify=df_trainval["speaker_id"],
)

print(f" Train: {len(df_train)} ({len(df_train)/len(df)*100:.1f}%)")
print(f" Val: {len(df_val)} ({len(df_val)/len(df)*100:.1f}%)")
print(f" Test: {len(df_test)} ({len(df_test)/len(df)*100:.1f}%)")
print(f" Split time: {time.time() - t0:.1f}s")

# Encode labels (fitted on the full label column so every split shares one mapping)
le = LabelEncoder()
le.fit(df["speaker_id"])

X_train = df_train[feature_cols].values
X_val = df_val[feature_cols].values
X_test = df_test[feature_cols].values

y_train_enc = le.transform(df_train["speaker_id"])
y_val_enc = le.transform(df_val["speaker_id"])
y_test_enc = le.transform(df_test["speaker_id"])

num_classes = len(le.classes_)
print(f" Number of classes (speakers): {num_classes}")

# ============================================================
# 3. Standardize Features
# ============================================================
print("\n[3/8] Standardizing features...")
t0 = time.time()
scaler = StandardScaler()
X_train_sc = scaler.fit_transform(X_train)  # statistics come from train only
X_val_sc = scaler.transform(X_val)
X_test_sc = scaler.transform(X_test)
print(f" Scaled train shape: {X_train_sc.shape}")
print(f" Scale time: {time.time() - t0:.1f}s")

# ============================================================
# 4. PCA Transformation (192, 100, 50, 2)
# ============================================================
print("\n[4/8] Applying PCA...")
t0 = time.time()

pca_100 = PCA(n_components=100, random_state=RANDOM_STATE)
pca_50 = PCA(n_components=50, random_state=RANDOM_STATE)
pca_2 = PCA(n_components=2, random_state=RANDOM_STATE)

# Fit on train, transform all
X_train_pca100 = pca_100.fit_transform(X_train_sc)
X_val_pca100 = pca_100.transform(X_val_sc)
X_test_pca100 = pca_100.transform(X_test_sc)

X_train_pca50 = pca_50.fit_transform(X_train_sc)
X_val_pca50 = pca_50.transform(X_val_sc)
X_test_pca50 = pca_50.transform(X_test_sc)

X_train_pca2 = pca_2.fit_transform(X_train_sc)
X_val_pca2 = pca_2.transform(X_val_sc)
X_test_pca2 = pca_2.transform(X_test_sc)

var_100 = pca_100.explained_variance_ratio_.sum()
var_50 = pca_50.explained_variance_ratio_.sum()
var_2 = pca_2.explained_variance_ratio_.sum()

print(f" PCA 100 explained variance: {var_100:.4f}")
print(f" PCA 50 explained variance: {var_50:.4f}")
print(f" PCA 2 explained variance: {var_2:.4f}")
print(f" PCA time: {time.time() - t0:.1f}s")

# ============================================================
# 5. PCA 2D Visualization
# ============================================================
print("\n[5/8] Generating PCA 2D visualization...")
num_speakers = len(np.unique(y_train_enc))
# pyplot.get_cmap is used because matplotlib.cm.get_cmap was removed in
# Matplotlib 3.9; the second argument resamples to one colour per speaker.
cmap = plt.get_cmap("nipy_spectral", num_speakers)

fig, ax = plt.subplots(figsize=(14, 10))
ax.scatter(
    X_train_pca2[:, 0],
    X_train_pca2[:, 1],
    c=y_train_enc,
    cmap=cmap,
    alpha=0.45,
    s=8,
    linewidths=0,
    rasterized=True,
    marker="o",
)
ax.set_title("2D PCA Projection of ECAPA Embeddings (Train Set)", fontsize=16)
ax.set_xlabel(f"PC1 ({pca_2.explained_variance_ratio_[0] * 100:.2f}% variance)", fontsize=13)
ax.set_ylabel(f"PC2 ({pca_2.explained_variance_ratio_[1] * 100:.2f}% variance)", fontsize=13)
ax.grid(True, linestyle="--", alpha=0.3)
plt.tight_layout()
pca_plot_path = PLOTS_DIR / "pca_2d_visualization.png"
fig.savefig(pca_plot_path, dpi=150)
plt.close(fig)
print(f" Saved: {pca_plot_path}")

# ============================================================
# 6. Train Models
# ============================================================
print("\n[6/8] Training models...")

# Feature sets: "192" = original (scaled), "100" = PCA100, "50" = PCA50.
# Each entry is (train, val, test); only train and test are used below.
feature_sets = {
    "192": (X_train_sc, X_val_sc, X_test_sc),
    "100": (X_train_pca100, X_val_pca100, X_test_pca100),
    "50": (X_train_pca50, X_val_pca50, X_test_pca50),
}

# Model name -> list of unfitted template estimators. A fresh copy of each
# template is fitted per feature set.
model_defs = {
    "Logistic Regression": [
        LogisticRegression(max_iter=2000, solver="lbfgs", n_jobs=-1,
                           random_state=RANDOM_STATE, verbose=0),
    ],
    "SVM (Linear)": [
        SVC(kernel="linear", C=1.0, random_state=RANDOM_STATE),
    ],
    "SVM (RBF)": [
        SVC(kernel="rbf", C=1.0, gamma="scale", random_state=RANDOM_STATE),
    ],
    "k-NN": [
        KNeighborsClassifier(n_neighbors=5, metric="minkowski", n_jobs=-1),
    ],
}

results = {}

for model_name, model_list in model_defs.items():
    print(f"\n --- {model_name} ---")
    for model in model_list:
        for dim_name, (X_tr, _X_va, X_te) in feature_sets.items():
            key = f"{model_name}_{dim_name}"
            print(f" Training {key} ...", end=" ", flush=True)

            t_train = time.time()
            # clone() yields an unfitted estimator with the same parameters,
            # so one feature set's fit never leaks into the next.
            model_clone = clone(model)
            model_clone.fit(X_tr, y_train_enc)
            train_time = time.time() - t_train

            # Evaluate on test set
            t_pred = time.time()
            y_pred = model_clone.predict(X_te)
            pred_time = time.time() - t_pred

            acc = accuracy_score(y_test_enc, y_pred)
            prec = precision_score(y_test_enc, y_pred, average="macro", zero_division=0)
            rec = recall_score(y_test_enc, y_pred, average="macro", zero_division=0)
            f1 = f1_score(y_test_enc, y_pred, average="macro", zero_division=0)
            cm = confusion_matrix(y_test_enc, y_pred)

            # NOTE: the confusion matrix is stored as a nested list, one row
            # per class, so the JSON written in step 7b grows with the square
            # of the number of classes.
            results[key] = {
                "accuracy": acc,
                "precision_macro": prec,
                "recall_macro": rec,
                "f1_macro": f1,
                "train_time_s": train_time,
                "pred_time_s": pred_time,
                "confusion_matrix": cm.tolist(),
            }

            # Save model
            model_path = MODELS_DIR / f"{_safe_filename(key)}.joblib"
            dump(model_clone, model_path)

            print(f"acc={acc:.4f} prec={prec:.4f} rec={rec:.4f} f1={f1:.4f} "
                  f"train={train_time:.1f}s pred={pred_time:.1f}s")

# ============================================================
# 7. Save Results
# ============================================================
print("\n[7/8] Saving results...")

# 7a. Accuracy comparison table (one row per model, one column per feature set)
acc_table = pd.DataFrame([
    {
        "Model": model_name,
        **{
            label: results.get(f"{model_name}_{dim}", {}).get("accuracy", None)
            for dim, label in DIM_LABELS.items()
        },
    }
    for model_name in model_defs
])
acc_table_path = OUTPUT_DIR / "accuracy_comparison_table.csv"
acc_table.to_csv(acc_table_path, index=False)
print("\n Accuracy Comparison Table:")
print(acc_table.to_string(index=False))
print(f" Saved: {acc_table_path}")

# 7b. Full results JSON (all metrics)
results_path = OUTPUT_DIR / "full_results.json"
with open(results_path, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2)
print(f" Saved: {results_path}")

# 7c. PCA explained variance
pca_var_df = pd.DataFrame({
    "PCA Dimension": [100, 50, 2],
    "Explained Variance Ratio": [var_100, var_50, var_2],
})
pca_var_path = OUTPUT_DIR / "pca_explained_variance.csv"
pca_var_df.to_csv(pca_var_path, index=False)
print(f" Saved: {pca_var_path}")

# ============================================================
# 8. Visualizations
# ============================================================
print("\n[8/8] Generating visualizations...")

# 8a. Accuracy bar chart
acc_bar_path = PLOTS_DIR / "accuracy_comparison_bar.png"
_plot_metric_bars(
    results,
    model_defs,
    metric="accuracy",
    ylabel="Accuracy",
    title="Classification Accuracy by Model and PCA Dimensionality",
    out_path=acc_bar_path,
    y_floor=0.90,
)
print(f" Saved: {acc_bar_path}")

# 8b. Confusion matrix for the best model, chosen as the entry with the
# highest measured accuracy rather than a hard-coded key.
best_key = max(results, key=lambda k: results[k]["accuracy"])
best_cm = np.array(results[best_key]["confusion_matrix"])

# For large number of classes, show a normalized, evenly spaced sample
if best_cm.shape[0] > 50:
    fig, ax = plt.subplots(figsize=(14, 12))
    # Normalize row-wise; the epsilon avoids division by zero on empty rows
    cm_norm = best_cm.astype(float) / (best_cm.sum(axis=1, keepdims=True) + 1e-10)
    sample_size = min(50, best_cm.shape[0])
    indices = np.linspace(0, best_cm.shape[0] - 1, sample_size, dtype=int)
    cm_sample = cm_norm[np.ix_(indices, indices)]
    sns.heatmap(cm_sample, ax=ax, cmap="Blues", cbar_kws={"label": "Proportion"})
    ax.set_title(f"Confusion Matrix (Normalized) - {best_key} (sample {sample_size}x{sample_size})", fontsize=14)
    ax.set_xlabel("Predicted Speaker", fontsize=12)
    ax.set_ylabel("True Speaker", fontsize=12)
else:
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(best_cm, ax=ax, cmap="Blues", fmt="d")
    ax.set_title(f"Confusion Matrix - {best_key}", fontsize=14)
    ax.set_xlabel("Predicted Speaker", fontsize=12)
    ax.set_ylabel("True Speaker", fontsize=12)

plt.tight_layout()
cm_path = PLOTS_DIR / f"confusion_matrix_{_safe_filename(best_key)}.png"
fig.savefig(cm_path, dpi=150)
plt.close(fig)
print(f" Saved: {cm_path}")

# 8c. F1 Score bar chart
f1_bar_path = PLOTS_DIR / "f1_comparison_bar.png"
_plot_metric_bars(
    results,
    model_defs,
    metric="f1_macro",
    ylabel="Macro F1 Score",
    title="Macro F1 Score by Model and PCA Dimensionality",
    out_path=f1_bar_path,
)
print(f" Saved: {f1_bar_path}")

# ============================================================
# Summary
# ============================================================
print("\n" + "=" * 60)
print("PIPELINE COMPLETE")
print("=" * 60)
print(f"\nResults directory: {OUTPUT_DIR.resolve()}")
print(f" Models: {MODELS_DIR.resolve()}")
print(f" Plots: {PLOTS_DIR.resolve()}")
print(f"\nTotal models saved: {len(results)}")
print("\nTop 5 results by accuracy:")
sorted_results = sorted(results.items(), key=lambda item: item[1]["accuracy"], reverse=True)
for key, val in sorted_results[:5]:
    print(f" {key:40s} acc={val['accuracy']:.4f} f1={val['f1_macro']:.4f}")
print("\nDone!")