| import torch
|
| import numpy as np
|
| import matplotlib.pyplot as plt
|
| from sklearn.decomposition import PCA
|
| from sklearn.manifold import TSNE
|
| import umap
|
|
|
| def plot_dimensionality_reduction(tensor, method='all', labels=None, input_type='Unknown', task='Unknown'):
|
| """
|
| Plots 2D projections of high-dimensional data using PCA, t-SNE, or UMAP.
|
|
|
| Parameters:
|
| tensor (torch.Tensor): Input data of shape (n_samples, n_features).
|
| method (str or list): One of ['pca', 'tsne', 'umap'] or 'all' for all three.
|
| labels (array-like): Optional labels for coloring the scatter plot.
|
| input_type (str): Type of input data for title.
|
| task (str): Task description for title.
|
| """
|
| tensor = tensor.view(tensor.size(0), -1)
|
|
|
| if isinstance(tensor, torch.Tensor):
|
| tensor = tensor.cpu().numpy()
|
|
|
| methods = []
|
| if method == 'all':
|
| methods = ['pca', 'tsne', 'umap']
|
| elif isinstance(method, str):
|
| methods = [method]
|
| elif isinstance(method, list):
|
| methods = method
|
|
|
| plt.figure(figsize=(6 * len(methods), 5))
|
| plt.suptitle(f"Input: {input_type}, Task: {task}", fontsize=16)
|
|
|
| for i, m in enumerate(methods):
|
| if m == 'pca':
|
| reducer = PCA(n_components=2)
|
| title = 'PCA'
|
| elif m == 'tsne':
|
| reducer = TSNE(n_components=2, perplexity=2, random_state=42)
|
| title = 't-SNE'
|
| elif m == 'umap':
|
| reducer = umap.UMAP(n_components=2, random_state=42)
|
| title = 'UMAP'
|
| else:
|
| raise ValueError(f"Unknown method: {m}")
|
|
|
| reduced_data = reducer.fit_transform(tensor)
|
|
|
| plt.subplot(1, len(methods), i + 1)
|
|
|
| if labels is not None:
|
| unique_labels = np.unique(labels)
|
| cmap = plt.get_cmap('Spectral', len(unique_labels))
|
|
|
| scatter = plt.scatter(reduced_data[:, 0], reduced_data[:, 1], c=labels, cmap=cmap, alpha=0.75)
|
|
|
| cbar = plt.colorbar(scatter, ticks=unique_labels)
|
| cbar.set_ticklabels(unique_labels)
|
| else:
|
| plt.scatter(reduced_data[:, 0], reduced_data[:, 1], alpha=0.75)
|
|
|
| plt.title(title, fontsize=14)
|
| plt.xlabel("Component 1")
|
| plt.ylabel("Component 2")
|
| plt.grid(True, linestyle='--', alpha=0.5)
|
|
|
| plt.tight_layout(rect=[0, 0, 1, 0.95])
|
| plt.show()
|
|
|
| def plot_coverage(receivers, coverage_map, dpi=200, figsize=(6, 4), cbar_title=None, title=None,
|
| scatter_size=12, transmitter_position=None, transmitter_orientation=None,
|
| legend=False, limits=None, proj_3d=False, equal_aspect=False, tight_layout=True,
|
| colormap='tab20'):
|
|
|
| plot_params = {'cmap': colormap}
|
| if limits:
|
| plot_params['vmin'], plot_params['vmax'] = limits[0], limits[1]
|
|
|
|
|
| x, y = receivers[:, 0], receivers[:, 1]
|
|
|
|
|
| fig, ax = plt.subplots(dpi=dpi, figsize=figsize,
|
| subplot_kw={})
|
|
|
|
|
| ax.scatter(x, y, c=coverage_map, s=scatter_size, marker='s', edgecolors='black', linewidth=.15, **plot_params)
|
|
|
|
|
| ax.set_xlabel('x (m)')
|
| ax.set_ylabel('y (m)')
|
|
|
|
|
| if legend:
|
| ax.legend(loc='upper center', ncols=10, framealpha=0.5)
|
|
|
|
|
| if tight_layout:
|
| padding = 1
|
| mins = np.min(receivers, axis=0) - padding
|
| maxs = np.max(receivers, axis=0) + padding
|
|
|
| ax.set_xlim([mins[0], maxs[0]])
|
| ax.set_ylim([mins[1], maxs[1]])
|
|
|
|
|
| if equal_aspect:
|
| ax.set_aspect('equal')
|
|
|
|
|
| plt.show()
|
|
|
| import torch
|
| import torch.nn as nn
|
| import torch.optim as optim
|
| import torch.nn.functional as F
|
| from torch.utils.data import DataLoader, TensorDataset, random_split
|
| import numpy as np
|
| import matplotlib.pyplot as plt
|
| from sklearn.metrics import f1_score
|
|
|
|
|
| def get_data_loaders(data_tensor, labels_tensor, batch_size=32, split_ratio=0.8):
|
| dataset = TensorDataset(data_tensor, labels_tensor)
|
|
|
| train_size = int(split_ratio * len(dataset))
|
| test_size = len(dataset) - train_size
|
| train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
|
|
|
| train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
|
| test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
|
|
|
| return train_loader, test_loader
|
|
|
| class FCN(nn.Module):
|
| def __init__(self, input_dim, num_classes):
|
| super(FCN, self).__init__()
|
| self.fc1 = nn.Linear(input_dim, 128)
|
| self.bn1 = nn.BatchNorm1d(128)
|
| self.dropout1 = nn.Dropout(0.3)
|
|
|
| self.fc2 = nn.Linear(128, 64)
|
| self.bn2 = nn.BatchNorm1d(64)
|
| self.dropout2 = nn.Dropout(0.3)
|
|
|
| self.fc3 = nn.Linear(64, num_classes)
|
|
|
| def forward(self, x):
|
| x = F.relu(self.bn1(self.fc1(x)))
|
| x = self.dropout1(x)
|
| x = F.relu(self.bn2(self.fc2(x)))
|
| x = self.dropout2(x)
|
| return self.fc3(x)
|
|
|
|
|
|
|
| def train_model(model, train_loader, test_loader, epochs=20, lr=0.001, device="cpu", decay_step=10, decay_rate=0.5):
|
| model.to(device)
|
| optimizer = optim.Adam(model.parameters(), lr=lr)
|
| scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=decay_step, gamma=decay_rate)
|
| criterion = nn.CrossEntropyLoss()
|
|
|
| train_losses, test_f1_scores = [], []
|
|
|
| for epoch in range(epochs):
|
| model.train()
|
| epoch_loss = 0
|
| for batch_x, batch_y in train_loader:
|
| batch_x, batch_y = batch_x.to(device), batch_y.to(device)
|
|
|
| optimizer.zero_grad()
|
| outputs = model(batch_x)
|
| loss = criterion(outputs, batch_y)
|
| loss.backward()
|
| optimizer.step()
|
|
|
| epoch_loss += loss.item()
|
|
|
| train_losses.append(epoch_loss / len(train_loader))
|
| scheduler.step()
|
|
|
|
|
| f1 = evaluate_model(model, test_loader, device)
|
| test_f1_scores.append(f1)
|
|
|
| print(f"Epoch [{epoch+1}/{epochs}], Loss: {train_losses[-1]:.4f}, F1-score: {f1:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
|
|
|
| return train_losses, np.array([test_f1_scores])
|
|
|
|
|
| def evaluate_model(model, test_loader, device):
|
| model.eval()
|
| all_preds, all_labels = [], []
|
|
|
| with torch.no_grad():
|
| for batch_x, batch_y in test_loader:
|
| batch_x, batch_y = batch_x.to(device), batch_y.to(device)
|
|
|
| outputs = model(batch_x)
|
| preds = torch.argmax(outputs, dim=1)
|
|
|
| all_preds.extend(preds.cpu().numpy())
|
| all_labels.extend(batch_y.cpu().numpy())
|
|
|
| return f1_score(all_labels, all_preds, average='weighted')
|
|
|
|
|
| import matplotlib.cm as cm
|
|
|
| def plot_metrics(test_f1_scores, input_types, n_train=None, flag=0):
|
| """
|
| Plots the F1-score over epochs or number of training samples.
|
|
|
| Parameters:
|
| test_f1_scores (list): List of F1-score values per epoch or training samples.
|
| input_types (list): List of input type names.
|
| n_train (list, optional): Number of training samples (used when flag=1).
|
| flag (int): 0 for plotting F1-score over epochs, 1 for F1-score over training samples.
|
| """
|
| plt.figure(figsize=(7, 5), dpi=200)
|
| colors = cm.get_cmap('Spectral', test_f1_scores.shape[0])
|
| markers = ['o', 's', 'D', '^', 'v', 'P', '*', 'X', 'h']
|
|
|
| for r in range(test_f1_scores.shape[0]):
|
| color = colors(0.5 if test_f1_scores.shape[0] == 1 else r / (test_f1_scores.shape[0] - 1))
|
| marker = markers[r % len(markers)]
|
| if flag == 0:
|
| if test_f1_scores.shape[0] == 1:
|
| plt.plot(test_f1_scores[r], linewidth=2, color=color, label=f"{input_types[r]}")
|
| else:
|
| plt.plot(test_f1_scores[r], linewidth=2, marker=marker, markersize=5, markeredgewidth=1.5,
|
| markeredgecolor=color, color=color, label=f"{input_types[r]}")
|
| else:
|
| plt.plot(n_train, test_f1_scores[r], linewidth=2, marker=marker, markersize=6, markeredgewidth=1.5,
|
| markeredgecolor=color, markerfacecolor='none', color=color, label=f"{input_types[r]}")
|
| plt.xscale('log')
|
|
|
| x_label = "Epochs" if flag == 0 else "Number of training samples"
|
| plt.xlabel(f"{x_label}", fontsize=12)
|
| plt.ylabel("F1-score", fontsize=12)
|
|
|
| plt.legend()
|
| plt.grid(alpha=0.3)
|
| plt.show()
|
|
|
|
|
| def classify_by_euclidean_distance(train_loader, test_loader, device="cpu"):
|
| """
|
| Classifies test samples based on the Euclidean distance to the mean of training samples from each class.
|
| Computes the F1-score for evaluation.
|
|
|
| Parameters:
|
| - train_loader (DataLoader): DataLoader for training data.
|
| - test_loader (DataLoader): DataLoader for test data.
|
| - device (str): Device to run computations on ("cpu" or "cuda").
|
|
|
| Returns:
|
| - predictions (torch.Tensor): Predicted class for each test sample.
|
| - f1 (float): Weighted F1-score.
|
| """
|
|
|
|
|
| train_data_list, train_labels_list = [], []
|
| for batch_x, batch_y in train_loader:
|
| train_data_list.append(batch_x.to(device))
|
| train_labels_list.append(batch_y.to(device))
|
|
|
| train_data = torch.cat(train_data_list)
|
| train_labels = torch.cat(train_labels_list)
|
|
|
| unique_classes = torch.unique(train_labels)
|
| class_means = {}
|
|
|
|
|
| for cls in unique_classes:
|
| class_means[cls.item()] = train_data[train_labels == cls].mean(dim=0)
|
|
|
|
|
| class_means_tensor = torch.stack([class_means[cls.item()] for cls in unique_classes])
|
|
|
|
|
| test_data_list, test_labels_list = [], []
|
| for batch_x, batch_y in test_loader:
|
| test_data_list.append(batch_x.to(device))
|
| test_labels_list.append(batch_y.to(device))
|
|
|
| test_data = torch.cat(test_data_list)
|
| test_labels = torch.cat(test_labels_list)
|
|
|
|
|
| dists = torch.cdist(test_data, class_means_tensor)
|
|
|
|
|
| predictions = unique_classes[torch.argmin(dists, dim=1)]
|
|
|
|
|
| f1 = f1_score(test_labels.cpu().numpy(), predictions.cpu().numpy(), average='weighted')
|
|
|
| return f1
|
|
|
| def generate_gaussian_noise(data, snr_db):
|
| """
|
| Generate complex-valued Gaussian noise given an SNR and apply it to the data.
|
|
|
| Args:
|
| data (np.ndarray): Input data array of shape (n_samples, n_features), assumed to be complex-valued.
|
| snr_db (float): Signal-to-Noise Ratio in decibels (dB).
|
|
|
| Returns:
|
| np.ndarray: Complex-valued Gaussian noise of the same shape as data.
|
| """
|
|
|
| signal_power = np.mean(np.abs(data) ** 2, axis=1, keepdims=True)
|
|
|
|
|
| snr_linear = 10 ** (snr_db / 10)
|
| noise_power = signal_power / snr_linear
|
|
|
|
|
| noise_real = np.random.randn(*data.shape) * np.sqrt(noise_power / 2)
|
| noise_imag = np.random.randn(*data.shape) * np.sqrt(noise_power / 2)
|
|
|
|
|
| noise = noise_real + 1j * noise_imag
|
|
|
| return noise
|
|
|