| import argparse
|
| from collections import namedtuple
|
| import numpy as np
|
| import torch
|
| import cv2,os
|
| import torch
|
| import torch.nn.functional as F
|
| from collections import defaultdict
|
| from sklearn.cluster import DBSCAN
|
|
|
| """
|
| taken from https://github.com/githubharald/WordDetectorNN
|
| Download the models from https://www.dropbox.com/s/mqhco2q67ovpfjq/model.zip?dl=1 and pass the path to word_segment(.) as argument.
|
| """
|
|
|
| from typing import Type, Any, Callable, Union, List, Optional
|
|
|
| import torch.nn as nn
|
| from torch import Tensor
|
|
|
|
|
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Return a bias-free 3x3 convolution whose padding equals the dilation,
    so spatial size is preserved at stride 1."""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
|
|
|
|
|
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Return a bias-free 1x1 (pointwise) convolution."""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
|
|
|
|
|
class BasicBlock(nn.Module):
    """Residual block of two 3x3 convolutions (ResNet-18/34 style)."""

    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(BasicBlock, self).__init__()
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        # first conv may downsample (via stride); second keeps resolution
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        # shortcut path, optionally projected to match the residual branch
        shortcut = x if self.downsample is None else self.downsample(x)

        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        residual += shortcut
        return self.relu(residual)
|
|
|
|
|
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 reduce, 3x3, 1x1 expand (ResNet-50+).

    Any downsampling stride is applied on the 3x3 convolution; the final
    1x1 conv expands channels by ``expansion``.
    """

    expansion: int = 4

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # width of the middle 3x3 conv, scaled for wide/grouped variants
        width = int(planes * (base_width / 64.)) * groups

        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))

        y += shortcut
        return self.relu(y)
|
|
|
|
|
class ResNet(nn.Module):
    """ResNet backbone adapted for word detection.

    Differs from the usual classification ResNet in two visible ways: the
    stem convolution takes a single-channel (grayscale) input, and
    ``forward`` returns the feature maps of all five stages (deepest first)
    instead of class logits, so a decoder can fuse them at multiple scales.
    ``avgpool`` and ``fc`` are still constructed but never used by
    ``forward``.
    """

    def __init__(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element says whether the stride-2 downsampling of the
            # corresponding stage (layer2..layer4) is replaced by dilation
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        # stem: in_channels=1 (grayscale input)
        self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # four residual stages; _make_layer mutates self.inplanes, so order matters
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2,
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])
        # classification head: constructed for parity but unused by forward()
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # zero-init the last BN in each residual branch so every block
        # starts out as (approximately) an identity mapping
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block: Type[Union[BasicBlock, Bottleneck]], planes: int, blocks: int,
                    stride: int = 1, dilate: bool = False) -> nn.Sequential:
        """Stack ``blocks`` residual blocks; only the first may downsample.

        Side effect: advances ``self.inplanes`` (and possibly ``self.dilation``)
        for the next stage.
        """
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            # trade stride for dilation to keep spatial resolution
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            # projection shortcut so the skip matches the residual branch shape
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> tuple:
        # returns the 5-level feature pyramid, deepest (most downsampled) first
        x = self.conv1(x)
        x = self.bn1(x)
        out1 = self.relu(x)
        x = self.maxpool(out1)

        out2 = self.layer1(x)
        out3 = self.layer2(out2)
        out4 = self.layer3(out3)
        out5 = self.layer4(out4)

        return out5, out4, out3, out2, out1

    def forward(self, x: Tensor) -> tuple:
        return self._forward_impl(x)
|
|
|
|
|
def _resnet(
    arch: str,
    block: Type[Union[BasicBlock, Bottleneck]],
    layers: List[int],
    pretrained: bool,
    progress: bool,
    **kwargs: Any
) -> ResNet:
    """Build a ResNet from a block type and per-stage block counts.

    ``arch``, ``pretrained`` and ``progress`` are accepted for torchvision
    API compatibility but ignored here; weights are loaded from a local
    checkpoint by the caller instead.
    """
    return ResNet(block, layers, **kwargs)
|
|
|
|
|
def resnet18(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNet-18 backbone (BasicBlock, layer layout [2, 2, 2, 2]).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    return _resnet('resnet18', BasicBlock, [2, 2, 2, 2], pretrained, progress, **kwargs)
|
|
|
|
|
def resnet34(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNet-34 backbone (BasicBlock, layer layout [3, 4, 6, 3]).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    return _resnet('resnet34', BasicBlock, [3, 4, 6, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def resnet50(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNet-50 backbone (Bottleneck, layer layout [3, 4, 6, 3]).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def resnet101(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNet-101 backbone (Bottleneck, layer layout [3, 4, 23, 3]).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    return _resnet('resnet101', Bottleneck, [3, 4, 23, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def resnet152(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNet-152 backbone (Bottleneck, layer layout [3, 8, 36, 3]).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    return _resnet('resnet152', Bottleneck, [3, 8, 36, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def resnext50_32x4d(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNeXt-50 32x4d backbone (Bottleneck, [3, 4, 6, 3],
    32 groups of width 4 in each bottleneck).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    kwargs['groups'] = 32
    kwargs['width_per_group'] = 4
    return _resnet('resnext50_32x4d', Bottleneck, [3, 4, 6, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def resnext101_32x8d(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a ResNeXt-101 32x8d backbone (Bottleneck, [3, 4, 23, 3],
    32 groups of width 8 in each bottleneck).

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    kwargs['groups'] = 32
    kwargs['width_per_group'] = 8
    return _resnet('resnext101_32x8d', Bottleneck, [3, 4, 23, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def wide_resnet50_2(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a Wide ResNet-50-2 backbone (Bottleneck, [3, 4, 6, 3]).

    Same as ResNet-50 except the bottleneck 3x3 convolutions are twice as
    wide (width_per_group = 128); the outer 1x1 channel counts are unchanged.

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    kwargs['width_per_group'] = 64 * 2
    return _resnet('wide_resnet50_2', Bottleneck, [3, 4, 6, 3], pretrained, progress, **kwargs)
|
|
|
|
|
def wide_resnet101_2(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Construct a Wide ResNet-101-2 backbone (Bottleneck, [3, 4, 23, 3]).

    Same as ResNet-101 except the bottleneck 3x3 convolutions are twice as
    wide (width_per_group = 128); the outer 1x1 channel counts are unchanged.

    Args:
        pretrained: kept for torchvision API compatibility; ignored by _resnet.
        progress: kept for torchvision API compatibility; ignored by _resnet.
    """
    kwargs['width_per_group'] = 64 * 2
    return _resnet('wide_resnet101_2', Bottleneck, [3, 4, 23, 3], pretrained, progress, **kwargs)
|
|
|
def compute_iou(ra, rb):
    """Intersection over union of two axis-aligned rectangles.

    Args:
        ra, rb: objects exposing xmin/xmax/ymin/ymax attributes and an
            ``area()`` method (e.g. AABB).

    Returns:
        IoU in [0, 1]; 0 when the rectangles are disjoint, and also when
        both are degenerate (zero area) so the union is empty.
    """
    # no overlap along either axis -> IoU is 0
    if ra.xmax < rb.xmin or rb.xmax < ra.xmin or ra.ymax < rb.ymin or rb.ymax < ra.ymin:
        return 0

    l = max(ra.xmin, rb.xmin)
    r = min(ra.xmax, rb.xmax)
    t = max(ra.ymin, rb.ymin)
    b = min(ra.ymax, rb.ymax)

    intersection = (r - l) * (b - t)
    union = ra.area() + rb.area() - intersection

    # guard: two touching zero-area boxes would otherwise divide by zero
    if union == 0:
        return 0

    return intersection / union
|
|
|
def compute_dist_mat(aabbs):
    """Symmetric Jaccard-distance matrix (1 - IoU) over all pairs of aabbs."""
    n = len(aabbs)
    dists = np.zeros((n, n))
    # only the lower triangle (j <= i) is computed; the matrix is mirrored
    for i in range(n):
        for j in range(i + 1):
            d = 1 - compute_iou(aabbs[i], aabbs[j])
            dists[i, j] = d
            dists[j, i] = d
    return dists
|
|
|
|
|
def cluster_aabbs(aabbs):
    """cluster aabbs using DBSCAN and the Jaccard distance between bounding boxes"""
    if len(aabbs) < 2:
        return aabbs

    labels = DBSCAN(eps=0.7, min_samples=3, metric='precomputed').fit(compute_dist_mat(aabbs)).labels_

    clusters = defaultdict(list)
    for aabb, label in zip(aabbs, labels):
        if label != -1:  # -1 marks DBSCAN noise points
            clusters[label].append(aabb)

    # fuse every cluster into a single box via coordinate-wise medians
    return [AABB(np.median([a.xmin for a in cluster]),
                 np.median([a.xmax for a in cluster]),
                 np.median([a.ymin for a in cluster]),
                 np.median([a.ymax for a in cluster]))
            for cluster in clusters.values()]
|
|
|
|
|
class AABB:
    """Axis-aligned bounding box with copy-returning (non-mutating) transforms."""

    def __init__(self, xmin, xmax, ymin, ymax):
        self.xmin = xmin
        self.xmax = xmax
        self.ymin = ymin
        self.ymax = ymax

    def scale(self, fx, fy):
        """Scale about the origin by fx horizontally and fy vertically."""
        return AABB(fx * self.xmin, fx * self.xmax, fy * self.ymin, fy * self.ymax)

    def scale_around_center(self, fx, fy):
        """Scale about the box center, keeping the center fixed."""
        cx = (self.xmin + self.xmax) / 2
        cy = (self.ymin + self.ymax) / 2
        return AABB(cx - fx * (cx - self.xmin),
                    cx + fx * (self.xmax - cx),
                    cy - fy * (cy - self.ymin),
                    cy + fy * (self.ymax - cy))

    def translate(self, tx, ty):
        """Shift the box by (tx, ty)."""
        return AABB(self.xmin + tx, self.xmax + tx, self.ymin + ty, self.ymax + ty)

    def as_type(self, t):
        """Convert all four coordinates with the callable t (e.g. int)."""
        return AABB(t(self.xmin), t(self.xmax), t(self.ymin), t(self.ymax))

    def enlarge_to_int_grid(self):
        """Grow outward so all coordinates lie on integer grid lines."""
        return AABB(np.floor(self.xmin), np.ceil(self.xmax),
                    np.floor(self.ymin), np.ceil(self.ymax))

    def clip(self, clip_aabb):
        """Clamp all coordinates into the extent of clip_aabb."""
        return AABB(min(max(self.xmin, clip_aabb.xmin), clip_aabb.xmax),
                    max(min(self.xmax, clip_aabb.xmax), clip_aabb.xmin),
                    min(max(self.ymin, clip_aabb.ymin), clip_aabb.ymax),
                    max(min(self.ymax, clip_aabb.ymax), clip_aabb.ymin))

    def area(self):
        """Width times height (can be negative for inverted boxes)."""
        return (self.xmax - self.xmin) * (self.ymax - self.ymin)

    def __str__(self):
        return f'AABB(xmin={self.xmin},xmax={self.xmax},ymin={self.ymin},ymax={self.ymax})'

    def __repr__(self):
        return str(self)
|
|
|
class MapOrdering:
    """order of the maps encoding the aabbs around the words"""
    # segmentation channels: per-pixel class (see encode/decode)
    SEG_WORD = 0          # inner (shrunk) word region
    SEG_SURROUNDING = 1   # ring of the full box around the word region
    SEG_BACKGROUND = 2    # everything else
    # geometry channels: per-pixel distance from the pixel to the box edges
    GEO_TOP = 3
    GEO_BOTTOM = 4
    GEO_LEFT = 5
    GEO_RIGHT = 6
    NUM_MAPS = 7
|
|
|
|
|
def encode(shape, gt, f=1.0):
    """Encode ground-truth word boxes as segmentation + geometry maps.

    Args:
        shape: (height, width) of the output maps.
        gt: iterable of AABB word boxes (coordinates before scaling by f).
        f: factor mapping box coordinates into map coordinates.

    Returns:
        np.ndarray of shape (MapOrdering.NUM_MAPS, height, width).
    """
    gt_map = np.zeros((MapOrdering.NUM_MAPS,) + shape)
    for aabb in gt:
        aabb = aabb.scale(f, f)

        # Maps are indexed [map, y, x], so x must be clipped to the width
        # (shape[1]) and y to the height (shape[0]). The previous version
        # had these swapped, which was only harmless for square maps.
        aabb_clip = AABB(0, shape[1] - 1, 0, shape[0] - 1)

        # inner word region: box shrunk to half size around its center;
        # surrounding region: full box minus the inner word region
        aabb_word = aabb.scale_around_center(0.5, 0.5).as_type(int).clip(aabb_clip)
        aabb_sur = aabb.as_type(int).clip(aabb_clip)
        gt_map[MapOrdering.SEG_SURROUNDING, aabb_sur.ymin:aabb_sur.ymax + 1, aabb_sur.xmin:aabb_sur.xmax + 1] = 1
        gt_map[MapOrdering.SEG_SURROUNDING, aabb_word.ymin:aabb_word.ymax + 1, aabb_word.xmin:aabb_word.xmax + 1] = 0
        gt_map[MapOrdering.SEG_WORD, aabb_word.ymin:aabb_word.ymax + 1, aabb_word.xmin:aabb_word.xmax + 1] = 1

        # geometry maps: distance from each inner-word pixel to the box edges
        for x in range(aabb_word.xmin, aabb_word.xmax + 1):
            for y in range(aabb_word.ymin, aabb_word.ymax + 1):
                gt_map[MapOrdering.GEO_TOP, y, x] = y - aabb.ymin
                gt_map[MapOrdering.GEO_BOTTOM, y, x] = aabb.ymax - y
                gt_map[MapOrdering.GEO_LEFT, y, x] = x - aabb.xmin
                gt_map[MapOrdering.GEO_RIGHT, y, x] = aabb.xmax - x

    # background is whatever is neither word nor surrounding
    gt_map[MapOrdering.SEG_BACKGROUND] = np.clip(1 - gt_map[MapOrdering.SEG_WORD] - gt_map[MapOrdering.SEG_SURROUNDING],
                                                 0, 1)

    return gt_map
|
|
|
|
|
def subsample(idx, max_num):
    """restrict fg indices to a maximum number"""
    total = len(idx[0])
    if total > max_num:
        # pick max_num evenly spaced positions over the original indices
        step = total / max_num
        picks = [int(j * step) for j in range(max_num)]
        idx = (np.asarray([idx[0][p] for p in picks], np.int64),
               np.asarray([idx[1][p] for p in picks], np.int64))
    return idx
|
|
|
|
|
def fg_by_threshold(thres, max_num=None):
    """all pixels above threshold are fg pixels, optionally limited to a maximum number"""

    def func(seg_map):
        # indices of all pixels strictly above the threshold
        idx = np.where(seg_map > thres)
        return idx if max_num is None else subsample(idx, max_num)

    return func
|
|
|
|
|
def fg_by_cc(thres, max_num):
    """take a maximum number of pixels per connected component, but at least 3 (->DBSCAN minPts)"""

    def func(seg_map):
        mask = (seg_map > thres).astype(np.uint8)
        num_labels, label_img = cv2.connectedComponents(mask, connectivity=4)
        # budget per component, never below DBSCAN's min_samples of 3
        per_cc = max(max_num // (num_labels + 1), 3)

        ys = np.empty(0, np.int64)
        xs = np.empty(0, np.int64)
        for label in range(1, num_labels):  # label 0 is background
            cc_ys, cc_xs = subsample(np.where(label_img == label), per_cc)
            ys = np.append(ys, cc_ys)
            xs = np.append(xs, cc_xs)
        return ys, xs

    return func
|
|
|
|
|
def decode(pred_map, comp_fg=fg_by_threshold(0.5), f=1):
    """Turn a predicted map stack back into a list of AABBs.

    comp_fg selects the foreground pixels from the word-segmentation map;
    each selected pixel votes for one box via its four edge distances,
    and f scales the boxes back into image coordinates.
    """
    ys, xs = comp_fg(pred_map[MapOrdering.SEG_WORD])
    per_pixel_preds = pred_map[..., ys, xs].T
    aabbs = []
    for yc, xc, pred in zip(ys, xs, per_pixel_preds):
        top = pred[MapOrdering.GEO_TOP]
        bottom = pred[MapOrdering.GEO_BOTTOM]
        left = pred[MapOrdering.GEO_LEFT]
        right = pred[MapOrdering.GEO_RIGHT]
        box = AABB(xc - left, xc + right, yc - top, yc + bottom)
        aabbs.append(box.scale(f, f))
    return aabbs
|
|
|
|
|
def main():
    """Round-trip demo: encode one box, decode it back, visualize all maps."""
    import matplotlib.pyplot as plt
    aabbs_in = [AABB(10, 30, 30, 60)]
    encoded = encode((50, 50), aabbs_in, f=0.5)
    aabbs_out = decode(encoded, f=2)
    print(aabbs_out[0])

    # segmentation channels shown together as an RGB image
    plt.subplot(151)
    plt.imshow(encoded[MapOrdering.SEG_WORD:MapOrdering.SEG_BACKGROUND + 1].transpose(1, 2, 0))

    # the four geometry channels, one subplot each
    geo_channels = (MapOrdering.GEO_TOP, MapOrdering.GEO_BOTTOM,
                    MapOrdering.GEO_LEFT, MapOrdering.GEO_RIGHT)
    for offset, channel in enumerate(geo_channels):
        plt.subplot(152 + offset)
        plt.imshow(encoded[channel])

    plt.show()
|
|
|
|
|
def compute_scale_down(input_size, output_size):
    """compute scale down factor of neural network, given input and output size"""
    out_h = output_size[0]
    in_h = input_size[0]
    # ratio of heights; widths are assumed to scale by the same factor
    return out_h / in_h
|
|
|
|
|
def prob_true(p):
    """return True with probability p"""
    draw = np.random.random()  # uniform sample in [0, 1)
    return draw < p
|
|
|
|
|
class UpscaleAndConcatLayer(torch.nn.Module):
    """Upsample a coarse feature map to size s, concatenate it with a finer
    map, and fuse the result with a 3x3 convolution followed by ReLU.

    cx: channels of the small (coarse) input map
    cy: channels of the large (fine) input map
    cz: channels of the fused output map
    """

    def __init__(self, cx, cy, cz):
        super(UpscaleAndConcatLayer, self).__init__()
        self.conv = torch.nn.Conv2d(cx + cy, cz, 3, padding=1)

    def forward(self, x, y, s):
        upscaled = F.interpolate(x, s)
        stacked = torch.cat((upscaled, y), 1)
        return F.relu(self.conv(stacked))
|
|
|
|
|
class WordDetectorNet(torch.nn.Module):
    """Word detector: ResNet-18 encoder plus an upsampling decoder that emits
    MapOrdering.NUM_MAPS channels at half the input resolution."""

    input_size = (448, 448)
    output_size = (224, 224)
    scale_down = compute_scale_down(input_size, output_size)

    def __init__(self):
        super(WordDetectorNet, self).__init__()
        self.backbone = resnet18()
        # decoder: progressively upscale and merge with encoder features
        self.up1 = UpscaleAndConcatLayer(512, 256, 256)  # -> input / 16
        self.up2 = UpscaleAndConcatLayer(256, 128, 128)  # -> input / 8
        self.up3 = UpscaleAndConcatLayer(128, 64, 64)    # -> input / 4
        self.up4 = UpscaleAndConcatLayer(64, 64, 32)     # -> input / 2
        self.conv1 = torch.nn.Conv2d(32, MapOrdering.NUM_MAPS, 3, 1, padding=1)

    @staticmethod
    def scale_shape(s, f):
        """Divide spatial shape s by integer factor f (must divide evenly)."""
        assert s[0] % f == 0 and s[1] % f == 0
        return s[0] // f, s[1] // f

    def output_activation(self, x, apply_softmax):
        """Split raw output into segmentation and geometry parts and activate
        them: optional softmax over the 3 segmentation channels, sigmoid
        scaled to the input height for the 4 geometry channels."""
        seg_slice = x[:, MapOrdering.SEG_WORD:MapOrdering.SEG_BACKGROUND + 1]
        seg = torch.softmax(seg_slice, dim=1) if apply_softmax else seg_slice
        geo = torch.sigmoid(x[:, MapOrdering.GEO_TOP:]) * self.input_size[0]
        return torch.cat([seg, geo], dim=1)

    def forward(self, x, apply_softmax=False):
        s = x.shape[2:]
        bb5, bb4, bb3, bb2, bb1 = self.backbone(x)

        x = self.up1(bb5, bb4, self.scale_shape(s, 16))
        x = self.up2(x, bb3, self.scale_shape(s, 8))
        x = self.up3(x, bb2, self.scale_shape(s, 4))
        x = self.up4(x, bb1, self.scale_shape(s, 2))
        return self.output_activation(self.conv1(x), apply_softmax)
|
|
|
|
|
def ceil32(val):
    """Round val up to the next multiple of 32 (identity on exact multiples)."""
    remainder = val % 32
    if remainder == 0:
        return val
    return val + (32 - remainder)
|
|
|
def word_segment(path, output_folder, model_path):
    """Detect words in the image at ``path`` and write one cropped image per
    detected word into ``output_folder`` (created if missing).

    Args:
        path: input image file, read as grayscale.
        output_folder: directory receiving crops named ``im_<idx>.png``.
        model_path: WordDetectorNet checkpoint (state dict) to load.
    """
    os.makedirs(output_folder, exist_ok=True)

    max_side_len = 5000  # cap on image side length before inference
    thres = 0.5          # word-segmentation foreground threshold
    max_aabbs = 1000     # upper bound on fg pixels fed to clustering

    # fall back to CPU instead of failing hard on CUDA-less machines
    # (the original hard-coded map_location='cuda' and net.cuda())
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    orig = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    net = WordDetectorNet()
    net.load_state_dict(torch.load(model_path, map_location=device))
    net.eval()
    net.to(device)

    # downscale so both sides fit max_side_len, then pad to multiples of 32
    # (white background) as required by the network's downsampling stages
    f = min(max_side_len / orig.shape[0], max_side_len / orig.shape[1])
    if f < 1:
        orig = cv2.resize(orig, dsize=None, fx=f, fy=f)
    img = np.ones((ceil32(orig.shape[0]), ceil32(orig.shape[1])), np.uint8) * 255
    img[:orig.shape[0], :orig.shape[1]] = orig

    # normalize to [-0.5, 0.5) and add batch/channel dimensions
    img = (img / 255 - 0.5).astype(np.float32)
    imgs = torch.from_numpy(img[None, None, ...]).to(device)
    with torch.no_grad():
        y = net(imgs, apply_softmax=True)
        y_np = y.to('cpu').numpy()
    scale_up = 1 / compute_scale_down(WordDetectorNet.input_size, WordDetectorNet.output_size)

    img_np = imgs[0, 0].to('cpu').numpy()
    pred_map = y_np[0]

    # decode per-pixel predictions into boxes, clip to image, merge by DBSCAN
    aabbs = decode(pred_map, comp_fg=fg_by_cc(thres, max_aabbs), f=scale_up)
    h, w = img_np.shape
    aabbs = [aabb.clip(AABB(0, w - 1, 0, h - 1)) for aabb in aabbs]
    clustered_aabbs = cluster_aabbs(aabbs)

    # crops come from the normalized network input, mapped back to [0, 255]
    # (the original also re-read the source image here but never used it)
    for idx, bb in enumerate(clustered_aabbs):
        crop = (img_np[int(bb.ymin):int(bb.ymax), int(bb.xmin):int(bb.xmax)] + 0.5) * 255
        cv2.imwrite(f'{output_folder}/im_{idx}.png', crop)
|
|
|