| |
| |
| |
| |
| |
| |
| import math |
| import torch |
| from torch import nn as nn |
| from torch.nn import functional as F |
| from torch.nn import init as init |
| from torch.nn.modules.batchnorm import _BatchNorm |
|
|
| from basicsr.utils import get_root_logger |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| @torch.no_grad() |
| def default_init_weights(module_list, scale=1, bias_fill=0, **kwargs): |
| """Initialize network weights. |
| |
| Args: |
| module_list (list[nn.Module] | nn.Module): Modules to be initialized. |
| scale (float): Scale initialized weights, especially for residual |
| blocks. Default: 1. |
| bias_fill (float): The value to fill bias. Default: 0 |
| kwargs (dict): Other arguments for initialization function. |
| """ |
| if not isinstance(module_list, list): |
| module_list = [module_list] |
| for module in module_list: |
| for m in module.modules(): |
| if isinstance(m, nn.Conv2d): |
| init.kaiming_normal_(m.weight, **kwargs) |
| m.weight.data *= scale |
| if m.bias is not None: |
| m.bias.data.fill_(bias_fill) |
| elif isinstance(m, nn.Linear): |
| init.kaiming_normal_(m.weight, **kwargs) |
| m.weight.data *= scale |
| if m.bias is not None: |
| m.bias.data.fill_(bias_fill) |
| elif isinstance(m, _BatchNorm): |
| init.constant_(m.weight, 1) |
| if m.bias is not None: |
| m.bias.data.fill_(bias_fill) |
|
|
|
|
| def make_layer(basic_block, num_basic_block, **kwarg): |
| """Make layers by stacking the same blocks. |
| |
| Args: |
| basic_block (nn.module): nn.module class for basic block. |
| num_basic_block (int): number of blocks. |
| |
| Returns: |
| nn.Sequential: Stacked blocks in nn.Sequential. |
| """ |
| layers = [] |
| for _ in range(num_basic_block): |
| layers.append(basic_block(**kwarg)) |
| return nn.Sequential(*layers) |
|
|
|
|
| class ResidualBlockNoBN(nn.Module): |
| """Residual block without BN. |
| |
| It has a style of: |
| ---Conv-ReLU-Conv-+- |
| |________________| |
| |
| Args: |
| num_feat (int): Channel number of intermediate features. |
| Default: 64. |
| res_scale (float): Residual scale. Default: 1. |
| pytorch_init (bool): If set to True, use pytorch default init, |
| otherwise, use default_init_weights. Default: False. |
| """ |
|
|
| def __init__(self, num_feat=64, res_scale=1, pytorch_init=False): |
| super(ResidualBlockNoBN, self).__init__() |
| self.res_scale = res_scale |
| self.conv1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True) |
| self.conv2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True) |
| self.relu = nn.ReLU(inplace=True) |
|
|
| if not pytorch_init: |
| default_init_weights([self.conv1, self.conv2], 0.1) |
|
|
| def forward(self, x): |
| identity = x |
| out = self.conv2(self.relu(self.conv1(x))) |
| return identity + out * self.res_scale |
|
|
|
|
| class Upsample(nn.Sequential): |
| """Upsample module. |
| |
| Args: |
| scale (int): Scale factor. Supported scales: 2^n and 3. |
| num_feat (int): Channel number of intermediate features. |
| """ |
|
|
| def __init__(self, scale, num_feat): |
| m = [] |
| if (scale & (scale - 1)) == 0: |
| for _ in range(int(math.log(scale, 2))): |
| m.append(nn.Conv2d(num_feat, 4 * num_feat, 3, 1, 1)) |
| m.append(nn.PixelShuffle(2)) |
| elif scale == 3: |
| m.append(nn.Conv2d(num_feat, 9 * num_feat, 3, 1, 1)) |
| m.append(nn.PixelShuffle(3)) |
| else: |
| raise ValueError(f'scale {scale} is not supported. ' |
| 'Supported scales: 2^n and 3.') |
| super(Upsample, self).__init__(*m) |
|
|
|
|
| def flow_warp(x, |
| flow, |
| interp_mode='bilinear', |
| padding_mode='zeros', |
| align_corners=True): |
| """Warp an image or feature map with optical flow. |
| |
| Args: |
| x (Tensor): Tensor with size (n, c, h, w). |
| flow (Tensor): Tensor with size (n, h, w, 2), normal value. |
| interp_mode (str): 'nearest' or 'bilinear'. Default: 'bilinear'. |
| padding_mode (str): 'zeros' or 'border' or 'reflection'. |
| Default: 'zeros'. |
| align_corners (bool): Before pytorch 1.3, the default value is |
| align_corners=True. After pytorch 1.3, the default value is |
| align_corners=False. Here, we use the True as default. |
| |
| Returns: |
| Tensor: Warped image or feature map. |
| """ |
| assert x.size()[-2:] == flow.size()[1:3] |
| _, _, h, w = x.size() |
| |
| grid_y, grid_x = torch.meshgrid( |
| torch.arange(0, h).type_as(x), |
| torch.arange(0, w).type_as(x)) |
| grid = torch.stack((grid_x, grid_y), 2).float() |
| grid.requires_grad = False |
|
|
| vgrid = grid + flow |
| |
| vgrid_x = 2.0 * vgrid[:, :, :, 0] / max(w - 1, 1) - 1.0 |
| vgrid_y = 2.0 * vgrid[:, :, :, 1] / max(h - 1, 1) - 1.0 |
| vgrid_scaled = torch.stack((vgrid_x, vgrid_y), dim=3) |
| output = F.grid_sample( |
| x, |
| vgrid_scaled, |
| mode=interp_mode, |
| padding_mode=padding_mode, |
| align_corners=align_corners) |
|
|
| |
| return output |
|
|
|
|
| def resize_flow(flow, |
| size_type, |
| sizes, |
| interp_mode='bilinear', |
| align_corners=False): |
| """Resize a flow according to ratio or shape. |
| |
| Args: |
| flow (Tensor): Precomputed flow. shape [N, 2, H, W]. |
| size_type (str): 'ratio' or 'shape'. |
| sizes (list[int | float]): the ratio for resizing or the final output |
| shape. |
| 1) The order of ratio should be [ratio_h, ratio_w]. For |
| downsampling, the ratio should be smaller than 1.0 (i.e., ratio |
| < 1.0). For upsampling, the ratio should be larger than 1.0 (i.e., |
| ratio > 1.0). |
| 2) The order of output_size should be [out_h, out_w]. |
| interp_mode (str): The mode of interpolation for resizing. |
| Default: 'bilinear'. |
| align_corners (bool): Whether align corners. Default: False. |
| |
| Returns: |
| Tensor: Resized flow. |
| """ |
| _, _, flow_h, flow_w = flow.size() |
| if size_type == 'ratio': |
| output_h, output_w = int(flow_h * sizes[0]), int(flow_w * sizes[1]) |
| elif size_type == 'shape': |
| output_h, output_w = sizes[0], sizes[1] |
| else: |
| raise ValueError( |
| f'Size type should be ratio or shape, but got type {size_type}.') |
|
|
| input_flow = flow.clone() |
| ratio_h = output_h / flow_h |
| ratio_w = output_w / flow_w |
| input_flow[:, 0, :, :] *= ratio_w |
| input_flow[:, 1, :, :] *= ratio_h |
| resized_flow = F.interpolate( |
| input=input_flow, |
| size=(output_h, output_w), |
| mode=interp_mode, |
| align_corners=align_corners) |
| return resized_flow |
|
|
|
|
| |
| def pixel_unshuffle(x, scale): |
| """ Pixel unshuffle. |
| |
| Args: |
| x (Tensor): Input feature with shape (b, c, hh, hw). |
| scale (int): Downsample ratio. |
| |
| Returns: |
| Tensor: the pixel unshuffled feature. |
| """ |
| b, c, hh, hw = x.size() |
| out_channel = c * (scale**2) |
| assert hh % scale == 0 and hw % scale == 0 |
| h = hh // scale |
| w = hw // scale |
| x_view = x.view(b, c, h, scale, w, scale) |
| return x_view.permute(0, 1, 3, 5, 2, 4).reshape(b, out_channel, h, w) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| class LayerNormFunction(torch.autograd.Function): |
|
|
| @staticmethod |
| def forward(ctx, x, weight, bias, eps): |
| ctx.eps = eps |
| N, C, H, W = x.size() |
| mu = x.mean(1, keepdim=True) |
| var = (x - mu).pow(2).mean(1, keepdim=True) |
| y = (x - mu) / (var + eps).sqrt() |
| ctx.save_for_backward(y, var, weight) |
| y = weight.view(1, C, 1, 1) * y + bias.view(1, C, 1, 1) |
| return y |
|
|
| @staticmethod |
| def backward(ctx, grad_output): |
| eps = ctx.eps |
|
|
| N, C, H, W = grad_output.size() |
| y, var, weight = ctx.saved_variables |
| g = grad_output * weight.view(1, C, 1, 1) |
| mean_g = g.mean(dim=1, keepdim=True) |
|
|
| mean_gy = (g * y).mean(dim=1, keepdim=True) |
| gx = 1. / torch.sqrt(var + eps) * (g - y * mean_gy - mean_g) |
| return gx, (grad_output * y).sum(dim=3).sum(dim=2).sum(dim=0), grad_output.sum(dim=3).sum(dim=2).sum( |
| dim=0), None |
|
|
| class LayerNorm2d(nn.Module): |
|
|
| def __init__(self, channels, eps=1e-6): |
| super(LayerNorm2d, self).__init__() |
| self.register_parameter('weight', nn.Parameter(torch.ones(channels))) |
| self.register_parameter('bias', nn.Parameter(torch.zeros(channels))) |
| self.eps = eps |
|
|
| def forward(self, x): |
| return LayerNormFunction.apply(x, self.weight, self.bias, self.eps) |
|
|
| |
| class MySequential(nn.Sequential): |
| def forward(self, *inputs): |
| for module in self._modules.values(): |
| if type(inputs) == tuple: |
| inputs = module(*inputs) |
| else: |
| inputs = module(inputs) |
| return inputs |
|
|
| import time |
| def measure_inference_speed(model, data, max_iter=200, log_interval=50): |
| model.eval() |
|
|
| |
| num_warmup = 5 |
| pure_inf_time = 0 |
| fps = 0 |
|
|
| |
| for i in range(max_iter): |
|
|
| torch.cuda.synchronize() |
| start_time = time.perf_counter() |
|
|
| with torch.no_grad(): |
| model(*data) |
|
|
| torch.cuda.synchronize() |
| elapsed = time.perf_counter() - start_time |
|
|
| if i >= num_warmup: |
| pure_inf_time += elapsed |
| if (i + 1) % log_interval == 0: |
| fps = (i + 1 - num_warmup) / pure_inf_time |
| print( |
| f'Done image [{i + 1:<3}/ {max_iter}], ' |
| f'fps: {fps:.1f} img / s, ' |
| f'times per image: {1000 / fps:.1f} ms / img', |
| flush=True) |
|
|
| if (i + 1) == max_iter: |
| fps = (i + 1 - num_warmup) / pure_inf_time |
| print( |
| f'Overall fps: {fps:.1f} img / s, ' |
| f'times per image: {1000 / fps:.1f} ms / img', |
| flush=True) |
| break |
| return fps |