| |
import math
import os
from pathlib import Path

import numpy as np
import pandas as pd
import scipy
import scipy.ndimage
import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import Sampler

from utils.evaluation_measures import compute_sed_eval_metrics
|
|
|
|
class Encoder:
    """Converts event annotations between seconds and frame indices, and
    between event lists and frame-level binary label matrices.

    Args:
        labels: list (or np.ndarray) of class label names; the order fixes
            the column index of each class.
        audio_len: clip length in seconds.
        frame_len: STFT frame length in samples (stored for reference; not
            used in the frame arithmetic below).
        frame_hop: hop size in samples between consecutive frames.
        net_pooling: temporal pooling factor applied by the network.
        sr: sample rate in Hz.
    """

    def __init__(self, labels, audio_len, frame_len, frame_hop, net_pooling=1, sr=16000):
        # Fix: the original tested `type(labels) in [np.ndarray, np.array]`,
        # but np.array is a function, not a type — isinstance is the correct
        # (and subclass-safe) check.
        if isinstance(labels, np.ndarray):
            labels = labels.tolist()
        self.labels = labels
        self.audio_len = audio_len
        self.frame_len = frame_len
        self.frame_hop = frame_hop
        self.sr = sr
        self.net_pooling = net_pooling
        n_samples = self.audio_len * self.sr
        # Round the frame count up to the next even number before pooling.
        self.n_frames = int(math.ceil(n_samples / 2 / self.frame_hop) * 2 / self.net_pooling)

    def _time_to_frame(self, time):
        """Convert seconds to a (possibly fractional) pooled frame index,
        clipped to [0, n_frames]."""
        sample = time * self.sr
        frame = sample / self.frame_hop
        return np.clip(frame / self.net_pooling, a_min=0, a_max=self.n_frames)

    def _frame_to_time(self, frame):
        """Convert a pooled frame index back to seconds, clipped to
        [0, audio_len]."""
        time = frame * self.net_pooling * self.frame_hop / self.sr
        return np.clip(time, a_min=0, a_max=self.audio_len)

    def encode_strong_df(self, events_df):
        """Encode a strong-label DataFrame (columns: event_label, onset,
        offset; times in seconds) into an (n_frames, n_labels) 0/1 matrix."""
        true_labels = np.zeros((self.n_frames, len(self.labels)))
        for _, row in events_df.iterrows():
            if not pd.isna(row['event_label']):
                label_idx = self.labels.index(row["event_label"])
                onset = int(self._time_to_frame(row["onset"]))
                # Ceil the offset so partially covered frames count as active.
                offset = int(np.ceil(self._time_to_frame(row["offset"])))
                true_labels[onset:offset, label_idx] = 1
        return true_labels

    def encode_weak(self, events):
        """Encode a list of event label names into a multi-hot vector."""
        labels = np.zeros((len(self.labels)))
        # An empty event list simply yields the all-zero vector.
        for event in events:
            labels[self.labels.index(event)] = 1
        return labels

    def decode_strong(self, outputs):
        """Decode an (n_frames, n_labels) binary matrix into a list of
        [label, onset_sec, offset_sec] events."""
        pred = []
        for i, label_column in enumerate(outputs.T):
            change_indices = self.find_contiguous_regions(label_column)
            for row in change_indices:
                onset = self._frame_to_time(row[0])
                offset = self._frame_to_time(row[1])
                # _frame_to_time already clips, but keep the explicit clamp
                # as a guard against future changes to that helper.
                onset = np.clip(onset, a_min=0, a_max=self.audio_len)
                offset = np.clip(offset, a_min=0, a_max=self.audio_len)
                pred.append([self.labels[i], onset, offset])
        return pred

    def decode_weak(self, outputs):
        """Decode a multi-hot vector into the list of active label names."""
        return [self.labels[i] for i, value in enumerate(outputs) if value == 1]

    def find_contiguous_regions(self, array):
        """Return an (n_regions, 2) array of [start, end) indices for each
        contiguous run of truthy values in a 1-D array."""
        # Indices where consecutive frames differ (region boundaries).
        change_indices = np.logical_xor(array[1:], array[:-1]).nonzero()[0]
        # Shift so each index points at the first frame after the change.
        change_indices += 1
        if array[0]:
            # Active at the very start: open the first region at frame 0.
            change_indices = np.r_[0, change_indices]
        if array[-1]:
            # Active at the very end: close the final region at array.size.
            change_indices = np.r_[change_indices, array.size]
        # Boundaries now alternate open/close, so pair them up.
        return change_indices.reshape((-1, 2))
|
|
|
|
def decode_pred_batch(outputs, weak_preds, filenames, encoder, thresholds, median_filter, decode_weak, pad_idx=None):
    """Decode a batch of frame-level predictions into per-threshold event DataFrames.

    Args:
        outputs: tensor (batch, n_classes, n_frames) of strong predictions in [0, 1].
        weak_preds: tensor (batch, n_classes) of clip-level predictions.
        filenames: list of audio file paths, one per batch item.
        encoder: object whose decode_strong() turns a (frames, classes) binary
            matrix into [label, onset, offset] rows (e.g. Encoder above).
        thresholds: iterable of binarization thresholds; one DataFrame per threshold.
        median_filter: per-class median-filter sizes in frames.
        decode_weak: falsy = ignore weak preds; 1/True = zero out classes whose
            weak prediction is below threshold; >1 = weak predictions only
            (kept classes forced active, no frame-level binarization).
        pad_idx: optional tensor of valid-length ratios (0..1) per batch item.

    Returns:
        dict mapping threshold -> DataFrame with columns
        [event_label, onset, offset, filename].
    """
    pred_dfs = {threshold: pd.DataFrame() for threshold in thresholds}
    for batch_idx in range(outputs.shape[0]):
        for c_th in thresholds:
            output = outputs[batch_idx]
            if pad_idx is not None:
                # Fix: `.item` was a bound-method reference (TypeError when
                # multiplied) — it must be called.
                true_len = int(output.shape[-1] * pad_idx[batch_idx].item())
                # Fix: truncate the frame (last) axis; the first axis here is
                # the class axis, which `output[:true_len]` sliced instead.
                output = output[..., :true_len]
            output = output.transpose(0, 1).detach().cpu().numpy()
            if decode_weak:
                for class_idx in range(weak_preds.size(1)):
                    if weak_preds[batch_idx, class_idx] < c_th:
                        output[:, class_idx] = 0
                    elif decode_weak > 1:
                        output[:, class_idx] = 1
            if decode_weak < 2:
                output = output > c_th
            for mf_idx in range(len(median_filter)):
                # scipy.ndimage.filters was deprecated and later removed; the
                # public scipy.ndimage namespace is the supported path.
                output[:, mf_idx] = scipy.ndimage.median_filter(output[:, mf_idx], (median_filter[mf_idx]))
            pred = encoder.decode_strong(output)
            pred = pd.DataFrame(pred, columns=["event_label", "onset", "offset"])
            pred["filename"] = Path(filenames[batch_idx]).stem + ".wav"
            # DataFrame._append is private API, removed in pandas 2.x.
            pred_dfs[c_th] = pd.concat([pred_dfs[c_th], pred], ignore_index=True)
    return pred_dfs
|
|
|
|
class ConcatDatasetBatchSampler(Sampler):
    """Batch sampler over a concatenation of datasets that draws a fixed
    number of indices from each constituent dataset for every batch.

    Args:
        samplers: one sampler (or index iterable) per concatenated dataset.
        batch_sizes: per-dataset number of samples contributed to each batch.
        epoch: initial epoch, forwarded to child samplers that support it.
    """

    def __init__(self, samplers, batch_sizes, epoch=0):
        self.batch_sizes = batch_sizes
        self.samplers = samplers
        # Offset of each dataset's index range within the concatenated dataset.
        self.offsets = [0] + np.cumsum([len(x) for x in self.samplers]).tolist()[:-1]
        self.epoch = epoch
        self.set_epoch(self.epoch)

    def _iter_one_dataset(self, c_batch_size, c_sampler, c_offset):
        """Yield full batches of globally offset indices from one sampler."""
        batch = []
        for idx in c_sampler:
            batch.append(c_offset + idx)
            if len(batch) == c_batch_size:
                yield batch
                # Fix: start a fresh list after each yield — the original
                # never reset it, so only one (still-mutating) batch was
                # ever produced.
                batch = []

    def set_epoch(self, epoch):
        """Propagate the epoch to child samplers (shuffling determinism)."""
        if hasattr(self.samplers[0], "epoch"):
            for s in self.samplers:
                s.set_epoch(epoch)

    def __iter__(self):
        iterators = [iter(i) for i in self.samplers]
        tot_batch = []
        for _ in range(len(self)):
            # Each batch takes batch_sizes[k] consecutive draws from sampler k.
            for samp_idx in range(len(self.samplers)):
                c_batch = []
                while len(c_batch) < self.batch_sizes[samp_idx]:
                    c_batch.append(self.offsets[samp_idx] + next(iterators[samp_idx]))
                tot_batch.extend(c_batch)
            yield tot_batch
            tot_batch = []

    def __len__(self):
        # The dataset that exhausts first bounds the number of batches.
        min_len = float("inf")
        for idx, sampler in enumerate(self.samplers):
            c_len = len(sampler) // self.batch_sizes[idx]
            min_len = min(c_len, min_len)
        return min_len
|
|
|
|
class ExponentialWarmup(object):
    """Exponential learning-rate warm-up wrapper around an optimizer.

    The learning rate ramps from ~max_lr * exp(exponent) up to max_lr over
    `rampup_length` steps, following exp(exponent * phase^2) where phase
    decays linearly from 1 to 0.
    """

    def __init__(self, optimizer, max_lr, rampup_length, exponent=-5.0):
        self.optimizer = optimizer
        self.rampup_length = rampup_length
        self.max_lr = max_lr
        self.step_num = 1
        self.exponent = exponent

    def zero_grad(self):
        """Proxy to the wrapped optimizer's zero_grad()."""
        self.optimizer.zero_grad()

    def _get_lr(self):
        """Current learning rate: max_lr scaled by the warm-up factor."""
        return self.max_lr * self._get_scaling_factor()

    def _set_lr(self, lr):
        """Write `lr` into every parameter group of the optimizer."""
        for group in self.optimizer.param_groups:
            group["lr"] = lr

    def step(self):
        """Advance one warm-up step and push the new LR to the optimizer."""
        self.step_num += 1
        self._set_lr(self._get_lr())

    def _get_scaling_factor(self):
        """Warm-up multiplier in (0, 1]; 1.0 once the ramp is complete."""
        if self.rampup_length == 0:
            return 1.0
        current = np.clip(self.step_num, 0.0, self.rampup_length)
        phase = 1.0 - current / self.rampup_length
        return float(np.exp(self.exponent * phase * phase))
|
|
|
|
def update_ema(net, ema_net, step, ema_factor):
    """Update ema_net's parameters in place as an exponential moving average
    of net's parameters, and return ema_net.

    The decay ramps up as 1 - 1/step during early training (so the EMA
    tracks the student closely at first) and is capped at ema_factor.
    """
    alpha = min(1 - 1 / step, ema_factor)
    for ema_params, params in zip(ema_net.parameters(), net.parameters()):
        ema_params.data.copy_(alpha * ema_params.data + (1 - alpha) * params.data)
    return ema_net
|
|
|
|
def log_sedeval_metrics(predictions, ground_truth, save_dir=None):
    """Return the set of metrics from sed_eval.

    Args:
        predictions: pd.DataFrame, the dataframe of predictions.
        ground_truth: path to a tab-separated ground-truth file readable by
            pandas.
        save_dir: str, path to the folder where to save the event and
            segment based metrics outputs (created if missing).

    Returns:
        tuple, event-based macro-F1 and micro-F1, segment-based macro-F1
        and micro-F1. All zeros when there are no predictions.
    """
    if predictions.empty:
        return 0.0, 0.0, 0.0, 0.0

    gt = pd.read_csv(ground_truth, sep="\t")
    event_res, segment_res = compute_sed_eval_metrics(predictions, gt)

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)
        # Persist the human-readable reports for both granularities.
        for report_name, report in (("event_f1.txt", event_res), ("segment_f1.txt", segment_res)):
            with open(os.path.join(save_dir, report_name), "w") as f:
                f.write(str(report))

    event_results = event_res.results()
    segment_results = segment_res.results()
    return (
        event_results["class_wise_average"]["f_measure"]["f_measure"],
        event_results["overall"]["f_measure"]["f_measure"],
        segment_results["class_wise_average"]["f_measure"]["f_measure"],
        segment_results["overall"]["f_measure"]["f_measure"],
    )
|
|
|
|
class Scaler(nn.Module):
    """Normalizes input features, either per-instance or from dataset-level
    statistics.

    Args:
        statistic: "instance" (computed from the input itself) or "dataset"
            (uses stored statistics).
        normtype: "mean", "standard", or (instance only) "minmax".
        dims: dimensions over which instance statistics are reduced.
        eps: small constant avoiding division by zero.
    """

    def __init__(self, statistic="instance", normtype="minmax", dims=(0, 2), eps=1e-8):
        super(Scaler, self).__init__()
        self.statistic = statistic
        self.normtype = normtype
        self.dims = dims
        self.eps = eps

    def load_state_dict(self, state_dict, strict=True):
        # Only dataset-level statistics carry state worth loading.
        if self.statistic == "dataset":
            super(Scaler, self).load_state_dict(state_dict, strict)

    def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
                              missing_keys, unexpected_keys, error_msgs):
        if self.statistic == "dataset":
            super(Scaler, self)._load_from_state_dict(state_dict, prefix, local_metadata, strict,
                                                      missing_keys, unexpected_keys, error_msgs)

    def forward(self, input):
        if self.statistic == "dataset":
            # NOTE(review): self.mean / self.mean_squared are assumed to be
            # attached externally (e.g. restored from a checkpoint) — they
            # are not initialised in this class. Confirm against callers.
            if self.normtype == "mean":
                return input - self.mean
            if self.normtype == "standard":
                std = torch.sqrt(self.mean_squared - self.mean ** 2)
                return (input - self.mean) / (std + self.eps)
            raise NotImplementedError

        if self.statistic == "instance":
            if self.normtype == "mean":
                return input - torch.mean(input, self.dims, keepdim=True)
            if self.normtype == "standard":
                mu = torch.mean(input, self.dims, keepdim=True)
                sigma = torch.std(input, self.dims, keepdim=True)
                return (input - mu) / (sigma + self.eps)
            if self.normtype == "minmax":
                lo = torch.amin(input, dim=self.dims, keepdim=True)
                hi = torch.amax(input, dim=self.dims, keepdim=True)
                return (input - lo) / (hi - lo + self.eps)
            raise NotImplementedError

        raise NotImplementedError
|
|
|
|
class AsymmetricalFocalLoss(nn.Module):
    """Binary cross-entropy variant with separate focusing exponents for the
    positive (gamma) and negative (zeta) terms.

    With gamma = zeta = 0 this reduces to plain binary cross-entropy.
    """

    def __init__(self, gamma=0, zeta=0):
        super(AsymmetricalFocalLoss, self).__init__()
        self.gamma = gamma  # focusing exponent for positive targets
        self.zeta = zeta    # focusing exponent for negative targets

    def forward(self, pred, target):
        # Clamp the logs at -100 so pred values of exactly 0 or 1 cannot
        # inject -inf into the mean.
        log_p = torch.clamp_min(torch.log(pred), -100)
        log_not_p = torch.clamp_min(torch.log(1 - pred), -100)
        pos_term = ((1 - pred) ** self.gamma) * target * log_p
        neg_term = (pred ** self.zeta) * (1 - target) * log_not_p
        return torch.mean(-(pos_term + neg_term))
|
|
|
|
def take_log(feature):
    """Convert a linear-amplitude feature to decibels, clamped to [-50, 80].

    NOTE(review): a fresh AmplitudeToDB transform is built on every call;
    assumed cheap enough here — hoist if profiling says otherwise.
    """
    to_db = torchaudio.transforms.AmplitudeToDB(stype="amplitude")
    to_db.amin = 1e-5  # raise the amplitude floor before taking the log
    db = to_db(feature)
    return torch.clamp(db, min=-50, max=80)
|
|
|
|
def count_parameters(model):
    """Return the total number of trainable parameters in `model`."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
|
|