| |
| import torch |
| import torch.nn as nn |
| import torchvision |
| from torchvision import datasets, transforms |
| import time |
| from torch.nn import functional as F |
| from math import floor, ceil |
| from torch.utils.data import DataLoader,TensorDataset |
| |
# Select GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Training hyperparameters.
num_epochs = 60        # total passes over the training set
batch_size = 1000      # samples per gradient step
learning_rate = 0.001  # initial Adam learning rate (decayed later in the loop)
|
|
| |
|
|
# Standard MNIST normalization (mean, std) applied after ToTensor scales
# pixels into [0, 1].
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        (0.1307,), (0.3081,))
])

# BUG FIX: `transform` was defined but never used -- the original code bypassed
# it by manually scaling the raw uint8 tensors with `/ 255.0`, so the inputs
# were never mean/std normalized. Pass the transform to the datasets and let
# the DataLoader apply it per sample.
train_set = torchvision.datasets.MNIST(root='MNIST', train=True,
                                       download=True, transform=transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)

test_set = torchvision.datasets.MNIST(root='MNIST', train=False,
                                      download=True, transform=transform)
# Shuffling the test set serves no purpose for evaluation; use a fixed order.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
|
|
|
|
|
|
| |
def conv3x3(in_channels, out_channels, stride=1):
    """Return a 3x3 convolution with padding 1.

    Spatial size is preserved when stride == 1 and halved (rounded up)
    when stride == 2.

    Args:
        in_channels: number of input feature maps.
        out_channels: number of output feature maps.
        stride: convolution stride (default 1).
    """
    return nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=True,
    )
|
|
|
|
| |
class ResidualBlock(nn.Module):
    """Basic two-convolution residual block.

    Computes relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x)), where the
    shortcut is the identity unless a `downsample` module is supplied to
    match a changed spatial size / channel count.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        # Main branch: conv-bn-relu-conv-bn; only the first conv may stride.
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        # Project the input onto the main branch's shape when required.
        shortcut = x if self.downsample is None else self.downsample(x)
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        y = y + shortcut
        return self.relu(y)
|
|
|
|
| |
class ResNet(nn.Module):
    """Small ResNet for 28x28 single-channel images (e.g. MNIST).

    Stage widths are 16 -> 32 -> 64, with stride-2 downsampling entering
    stages 2 and 3, so a 28x28 input produces a 64 x 7 x 7 feature map that
    is flattened into the 3136-wide classifier head.

    Args:
        block: residual block class (e.g. ResidualBlock).
        layers: number of blocks per stage, e.g. [2, 2, 2].
        num_classes: size of the output logit vector.
    """

    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 16
        # Stem: lift the 1-channel input to 16 feature maps.
        self.conv = conv3x3(1, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        # Three residual stages; stages 2 and 3 halve the spatial size.
        self.layer1 = self.make_layer(block, 16, layers[0], stride=1)
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        # FIX: the original also created `self.avg_pool = nn.AvgPool2d(8)` but
        # never used it in forward(); removed as dead code. It holds no
        # parameters, so saved checkpoints are unaffected (and applying an
        # 8x8 pool to the 7x7 feature map would have raised an error anyway).
        # Classifier head: 64 * 7 * 7 = 3136 flattened features (28x28 input).
        self.fc1 = nn.Linear(3136, 128)
        self.normfc12 = nn.LayerNorm(128, eps=1e-5)
        self.fc2 = nn.Linear(128, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Stack `blocks` residual blocks; only the first may downsample/widen."""
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            # Projection shortcut to match the main branch's output shape.
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels
        # Remaining blocks keep shape, so no downsample is needed.
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a (N, 1, 28, 28) batch to (N, num_classes) logits."""
        out = self.relu(self.bn(self.conv(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.view(out.size(0), -1)  # flatten to (N, 3136)
        out = self.relu(self.normfc12(self.fc1(out)))
        return self.fc2(out)
|
|
|
|
| |
| |
# Build the network (three stages of two residual blocks each), move it to
# the selected device, and set up the loss and optimizer.
model = ResNet(ResidualBlock, [2, 2, 2])
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
|
|
|
|
| |
def update_lr(optimizer, lr):
    """Set the learning rate of every parameter group in `optimizer` to `lr`."""
    for group in optimizer.param_groups:
        group['lr'] = lr
|
|
|
|
| |
def test(model, test_loader):
    """Evaluate `model` on `test_loader` and print the overall accuracy.

    Switches the model to eval mode and runs under no_grad. Uses the
    module-level `device` for tensor placement.
    """
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            logits = model(images)
            # Predicted class = index of the largest logit per sample.
            _, predicted = torch.max(logits, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))
|
|
|
|
| |
# ---- Training loop ----
total_step = len(train_loader)
curr_lr = learning_rate
for epoch in range(num_epochs):
    # BUG FIX: test() leaves the model in eval mode, so without this call
    # every epoch after the first trained with BatchNorm frozen in eval mode.
    # Restore training mode at the start of each epoch.
    model.train()
    in_epoch = time.time()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass and loss.
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 100 == 0:
            print("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                  .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
    # Per-epoch evaluation and wall-clock timing.
    test(model, test_loader)
    elapsed = time.time() - in_epoch
    print(f"use {elapsed // 60}min{elapsed % 60}s")
    # Decay the learning rate by 3x every 20 epochs.
    if (epoch + 1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)
| |
# Final evaluation: the original inlined a byte-for-byte duplicate of the
# evaluation loop in test(); reuse the function instead (identical output).
test(model, test_loader)

# Persist the trained weights next to the working directory's parent.
torch.save(model.state_dict(), '../resnet.ckpt')
|