| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| from __future__ import print_function |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import numpy as np |
| from collections import OrderedDict |
|
|
| |
| |
| import os |
| import random |
|
|
| |
| |
| SEED1 = 1337 |
| NEW_LINE = "\n" |
|
|
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
def set_seed(seed):
    """Seed every RNG in play (Python, NumPy, torch CPU and CUDA).

    The previous version only set the cuDNN determinism flags and never
    seeded any generator, so runs were still non-reproducible.

    :param seed: integer seed applied to all random number generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # no-op when CUDA is unavailable
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Force deterministic cuDNN kernel selection (slower, but reproducible).
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
|
|
| |
def angle_incidence_calculation(b, c, alpha, last_ray=False):
    """Angle of incidence of a lidar ray via the law of cosines.

    :param b: length of the first ray (array, or scalar for the last ray).
    :param c: length of the neighbouring ray.
    :param alpha: angular step between the two rays.
    :param last_ray: when True, use the angle at the far end of ray c
        (the final ray has no successor and is treated separately).
    :return: |pi/2 - inner_angle|, wrapped in a leading axis of size 1.
    """
    # Third side of the triangle spanned by the two consecutive rays.
    a = np.sqrt(b * b + c * c - 2.0 * b * c * np.cos(alpha))

    if last_ray:
        # Inner angle opposite side b.
        inner = np.arccos([(a * a + c * c - b * b) / (2.0 * a * c)])
    else:
        # Inner angle opposite side c.
        inner = np.arccos([(a * a + b * b - c * c) / (2.0 * a * b)])

    # Incidence is measured from the surface normal.
    return np.abs(np.pi / 2.0 - inner)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
POINTS = 1081


class VaeTestDataset(torch.utils.data.Dataset):
    """Lidar dataset yielding normalized scan, intensity, incidence angle and label.

    `img_path/dataset.txt` lists recording folders (lines containing '-');
    each folder's `<file_name>.txt` lists per-sample .npy files found in its
    scans_lidar/, intensities_lidar/ and semantic_label/ subdirectories.

    Fixes vs. the original: file handles are managed with `with` (they leaked
    on parse errors), and the dead pre-allocated zero arrays in __getitem__
    (immediately overwritten by np.load) are removed.
    """

    def __init__(self, img_path, file_name):
        self.scan_file_names = []
        self.intensity_file_names = []
        self.label_file_names = []

        # Dataset-wide normalization statistics (precomputed offline).
        self.s_mu = 4.518406      # scan (range) mean / std
        self.s_std = 8.2914915
        self.i_mu = 3081.8167     # intensity mean / std
        self.i_std = 1529.4413
        self.a_mu = 0.5959513     # incidence-angle mean / std
        self.a_std = 0.4783924

        # 'with' guarantees the handles close even if parsing raises.
        with open(img_path + 'dataset.txt', 'r') as fp_folder:
            for folder_line in fp_folder.read().split(NEW_LINE):
                if '-' not in folder_line:
                    # Folder names are identified by containing a '-'.
                    continue
                folder_path = folder_line
                with open(img_path + folder_path + '/' + file_name + '.txt', 'r') as fp_file:
                    for line in fp_file.read().split(NEW_LINE):
                        if '.npy' in line:
                            self.scan_file_names.append(img_path + folder_path + '/scans_lidar/' + line)
                            self.intensity_file_names.append(img_path + folder_path + '/intensities_lidar/' + line)
                            self.label_file_names.append(img_path + folder_path + '/semantic_label/' + line)

        self.length = len(self.scan_file_names)
        print("dataset length: ", self.length)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        # Load raw per-sample arrays.  The slicing below assumes 1-D scans
        # of length POINTS — TODO confirm the on-disk .npy shape.
        intensity = np.load(self.intensity_file_names[idx])
        scan = np.load(self.scan_file_names[idx])
        label = np.load(self.label_file_names[idx])

        # Incidence angle for every consecutive ray pair; the sensor spans
        # 270 degrees over POINTS rays, so alpha is the angular step.
        b = scan[:-1]
        c = scan[1:]
        alpha = np.ones(POINTS - 1) * ((270 * np.pi / 180) / (POINTS - 1))
        theta = angle_incidence_calculation(b, c, alpha)

        # The last ray has no successor and is handled separately.
        b_last = scan[-2]
        c_last = scan[-1]
        alpha_last = (270 * np.pi / 180) / (POINTS - 1)
        theta_last = angle_incidence_calculation(b_last, c_last, alpha_last, last_ray=True)
        angle_incidence = np.concatenate((theta[0], theta_last), axis=0)

        # NaN/inf cleanup happens AFTER the angle computation (as in the
        # original); invalid angles are cleaned independently below.
        scan[np.isnan(scan)] = 0.
        scan[np.isinf(scan)] = 0.

        intensity[np.isnan(intensity)] = 0.
        intensity[np.isinf(intensity)] = 0.

        angle_incidence[np.isnan(angle_incidence)] = 0.
        angle_incidence[np.isinf(angle_incidence)] = 0.

        label[np.isnan(label)] = 0.
        label[np.isinf(label)] = 0.

        # Standardize each modality with the dataset-wide statistics.
        scan = (scan - self.s_mu) / self.s_std
        intensity = (intensity - self.i_mu) / self.i_std
        angle_incidence = (angle_incidence - self.a_mu) / self.a_std

        return {
            'scan': torch.FloatTensor(scan),
            'intensity': torch.FloatTensor(intensity),
            'angle_incidence': torch.FloatTensor(angle_incidence),
            'label': torch.FloatTensor(label),
        }
|
|
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
class Residual(nn.Module):
    """Pre-activation 1-D residual unit: ReLU, Conv(k3), BN, ReLU, Conv(k1), BN, plus skip."""

    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super(Residual, self).__init__()
        branch = [
            nn.ReLU(True),
            nn.Conv1d(in_channels, num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm1d(num_residual_hiddens),
            nn.ReLU(True),
            nn.Conv1d(num_residual_hiddens, num_hiddens,
                      kernel_size=1, stride=1, bias=False),
            nn.BatchNorm1d(num_hiddens),
        ]
        self._block = nn.Sequential(*branch)

    def forward(self, x):
        # Identity shortcut around the convolutional branch.
        return x + self._block(x)
|
|
class ResidualStack(nn.Module):
    """A fixed-length chain of Residual units followed by a final ReLU."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(ResidualStack, self).__init__()
        self._num_residual_layers = num_residual_layers
        blocks = [Residual(in_channels, num_hiddens, num_residual_hiddens)
                  for _ in range(num_residual_layers)]
        self._layers = nn.ModuleList(blocks)

    def forward(self, x):
        for layer in self._layers:
            x = layer(x)
        # Closing activation: each Residual ends on a BatchNorm.
        return F.relu(x)
|
|
| |
| |
class Encoder(nn.Module):
    """Two stride-2 conv stages (4x temporal downsampling) plus a residual stack."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Encoder, self).__init__()
        half_hiddens = num_hiddens // 2
        # Stage 1: halve the sequence length, lift to num_hiddens//2 channels.
        self._conv_1 = nn.Sequential(
            nn.Conv1d(in_channels, half_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )
        # Stage 2: halve again, lift to num_hiddens channels.  No ReLU here:
        # the residual stack begins with its own activation.
        self._conv_2 = nn.Sequential(
            nn.Conv1d(half_hiddens, num_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens),
        )
        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

    def forward(self, inputs):
        h = self._conv_1(inputs)
        h = self._conv_2(h)
        return self._residual_stack(h)
|
|
| |
class Decoder(nn.Module):
    """Residual stack followed by two transposed-conv upsampling stages.

    Upsamples by 2x twice (the second stage adds output_padding=1, so the
    output length is odd) and projects to `out_channels` with a final conv.
    """

    def __init__(self, out_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()
        half_hiddens = num_hiddens // 2

        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

        # First upsampling stage (named _conv_trans_2 to mirror Encoder._conv_2).
        self._conv_trans_2 = nn.Sequential(
            nn.ReLU(True),
            nn.ConvTranspose1d(num_hiddens, half_hiddens,
                               kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )

        # Second upsampling stage plus the output projection.
        self._conv_trans_1 = nn.Sequential(
            nn.ConvTranspose1d(half_hiddens, half_hiddens,
                               kernel_size=4, stride=2, padding=1,
                               output_padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
            nn.Conv1d(half_hiddens, out_channels,
                      kernel_size=3, stride=1, padding=1),
        )

    def forward(self, inputs):
        h = self._residual_stack(inputs)
        h = self._conv_trans_2(h)
        return self._conv_trans_1(h)
|
|
class VAE_Encoder(nn.Module):
    """Convolutional encoder producing per-position Gaussian parameters.

    Reshapes the input to (batch, input_channel, POINTS), runs it through
    `Encoder`, and maps the features with two 1x1 conv heads to the latent
    mean and log standard deviation.
    """

    def __init__(self, input_channel, num_hiddens, num_residual_layers, num_residual_hiddens, embedding_dim):
        super(VAE_Encoder, self).__init__()
        self.input_channels = input_channel

        self._encoder = Encoder(input_channel,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # 1x1 convolutions: one head for the mean, one for log std-dev.
        self._encoder_z_mu = nn.Conv1d(num_hiddens, embedding_dim,
                                       kernel_size=1, stride=1)
        self._encoder_z_log_sd = nn.Conv1d(num_hiddens, embedding_dim,
                                           kernel_size=1, stride=1)

    def forward(self, x):
        x = x.reshape(-1, self.input_channels, POINTS)
        encoded = self._encoder(x)
        return self._encoder_z_mu(encoded), self._encoder_z_log_sd(encoded)
|
|
| |
class S3Net(nn.Module):
    """VAE-style semantic segmentation network for 1-D lidar scans.

    Encodes scan / intensity / incidence-angle channels into a 270-position
    latent Gaussian, samples via the reparameterization trick, and decodes
    to per-class semantic channels.
    """

    def __init__(self, input_channels, output_channels):
        super(S3Net, self).__init__()
        self.input_channels = input_channels
        # Encoder halves the length twice: 1081 -> 540 -> 270.
        self.latent_dim = 270
        self.output_channels = output_channels

        # Architecture constants.
        num_hiddens = 64
        num_residual_hiddens = 32
        num_residual_layers = 2
        embedding_dim = 1

        self._encoder = VAE_Encoder(self.input_channels,
                                    num_hiddens,
                                    num_residual_layers,
                                    num_residual_hiddens,
                                    embedding_dim)

        # Lift the 1-channel latent back to num_hiddens channels for decoding.
        self._decoder_z_mu = nn.ConvTranspose1d(embedding_dim, num_hiddens,
                                                kernel_size=1, stride=1)
        self._decoder = Decoder(self.output_channels,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # Per-position class probabilities over the channel dimension.
        self.softmax = nn.Softmax(dim=1)

    def vae_reparameterize(self, z_mu, z_log_sd):
        """
        :param z_mu: mean from the encoder's latent space
        :param z_log_sd: log standard deviation from the encoder's latent space
        :output: reparameterized latent variable z, Monte Carlo KL divergence
        """
        z_mu = z_mu.reshape(-1, self.latent_dim, 1)
        z_log_sd = z_log_sd.reshape(-1, self.latent_dim, 1)

        # Prior p(z) = N(0, I) and approximate posterior q(z|x).
        pz = torch.distributions.Normal(loc=torch.zeros_like(z_mu),
                                        scale=torch.ones_like(z_log_sd))
        qz_x = torch.distributions.Normal(loc=z_mu, scale=torch.exp(z_log_sd))

        # rsample keeps the draw differentiable w.r.t. mu and sd.
        z = qz_x.rsample()

        # Single-sample Monte-Carlo estimate of KL(q || p).
        kl_divergence = (pz.log_prob(z) - qz_x.log_prob(z)).sum(dim=1)
        kl_loss = -kl_divergence.mean()

        return z, kl_loss

    def forward(self, x_s, x_i, x_a):
        """Forward pass over scan (x_s), intensity (x_i) and angle (x_a)."""
        # Stack the three modalities as channels of one signal.
        x_s = x_s.reshape(-1, 1, POINTS)
        x_i = x_i.reshape(-1, 1, POINTS)
        x_a = x_a.reshape(-1, 1, POINTS)
        x = torch.cat([x_s, x_i, x_a], dim=1)

        z_mu, z_log_sd = self._encoder(x)
        z, kl_loss = self.vae_reparameterize(z_mu, z_log_sd)

        # Decode the sampled latent back to semantic channel logits.
        z = z.reshape(-1, 1, 270)
        x_d = self._decoder_z_mu(z)
        semantic_channels = self._decoder(x_d)

        semantic_scan = self.softmax(semantic_channels)

        return semantic_scan, semantic_channels, kl_loss
|
|
| |
| |
|
|
| |
| |
|
|