| import jittor as jt |
| from jittor import init |
| from jittor import nn |
| from jittor.dataset.mnist import MNIST |
| import jittor.transform as transform |
| import argparse |
| import os |
| import numpy as np |
| import math |
| import time |
| import cv2 |
|
|
# Run on GPU and prepare output directories for samples and checkpoints.
jt.flags.use_cuda = 1
os.makedirs('images', exist_ok=True)
os.makedirs("saved_models", exist_ok=True)


# Command-line hyper-parameters for GAN training.
parser = argparse.ArgumentParser()
parser.add_argument('--n_epochs', type=int, default=200, help='训练的时期数')  # number of training epochs
parser.add_argument('--batch_size', type=int, default=64, help='批次大小')  # mini-batch size
parser.add_argument('--lr', type=float, default=0.0002, help='学习率')  # Adam learning rate
parser.add_argument('--b1', type=float, default=0.5, help='梯度的一阶动量衰减')  # Adam beta1 (first-moment decay)
# Bug fix: help text said "first-order" (一阶) but b2 is Adam's SECOND-moment decay.
parser.add_argument('--b2', type=float, default=0.999, help='梯度的二阶动量衰减')  # Adam beta2 (second-moment decay)
parser.add_argument('--n_cpu', type=int, default=8, help='批处理生成期间要使用的 cpu 线程数')  # CPU worker threads for batching
parser.add_argument('--latent_dim', type=int, default=100, help='潜在空间的维度')  # dimensionality of the latent space
parser.add_argument('--img_size', type=int, default=28, help='每个图像尺寸的大小')  # side length of each (square) image
parser.add_argument('--channels', type=int, default=1, help='图像通道数')  # number of image channels
parser.add_argument('--sample_interval', type=int, default=400, help='图像样本之间的间隔')  # batches between saved sample grids


opt = parser.parse_args()
print(opt)
# (channels, height, width) of a single image, e.g. (1, 28, 28) for MNIST.
img_shape = (opt.channels, opt.img_size, opt.img_size)
|
|
| |
def save_image(img, path, nrow=None):
    """Tile a batch of images into an nrow x nrow grid and write it to disk.

    Parameters
    ----------
    img : np.ndarray
        Batch of shape (N, C, W, H) with values in [-1, 1] (the generator
        ends in Tanh).  The tiling math assumes a single channel (C == 1).
    path : str
        Destination path passed to cv2.imwrite.
    nrow : int or None
        Images per row/column of the grid.  Bug fix: the original default of
        None crashed with a TypeError (`W*nrow*nrow` on None); it now falls
        back to int(sqrt(N)), i.e. a square grid.
    """
    grid = tile_images(img, nrow)
    # Clip and cast explicitly so cv2 receives a well-formed 8-bit image
    # instead of relying on its implicit handling of float arrays.
    cv2.imwrite(path, np.clip(grid, 0, 255).astype(np.uint8))


def tile_images(img, nrow=None):
    """Module-internal helper: rearrange (N, C, W, H) images with values in
    [-1, 1] into a single (W*nrow, H*nrow, C) float grid scaled to [0, 255].
    """
    N, C, W, H = img.shape
    if nrow is None:
        nrow = int(math.isqrt(N))  # default: square grid
    # Stack every image row-wise into one tall strip of shape
    # (C, W*nrow*nrow, H): image k occupies rows [k*W, (k+1)*W).
    strip = img.reshape([-1, W * nrow * nrow, H])
    # First grid column: the first nrow images stacked vertically.
    grid = strip[:, :W * nrow, :]
    for col in range(1, nrow):
        # Append the next vertical strip of nrow images to the right;
        # the grid grows from (C, W*nrow, H) to (C, W*nrow, H*nrow).
        grid = np.concatenate([grid, strip[:, W * nrow * col:W * nrow * (col + 1), :]], axis=2)
    # Map [-1, 1] -> [0, 255].
    grid = (grid + 1.0) / 2.0 * 255
    # (C, W*nrow, H*nrow) -> (W*nrow, H*nrow, C) as expected by cv2.
    return grid.transpose((1, 2, 0))
|
|
| |
class Generator(nn.Module):
    """MLP generator: maps a latent vector z of size opt.latent_dim to an
    image of shape img_shape, with a Tanh output so pixels lie in [-1, 1]."""

    def __init__(self):
        super(Generator, self).__init__()

        def mlp_stage(n_in, n_out, normalize=True):
            # One hidden stage: Linear, optional BatchNorm, LeakyReLU.
            stage = [nn.Linear(n_in, n_out)]
            if normalize:
                stage.append(nn.BatchNorm1d(n_out, 0.8))
            stage.append(nn.LeakyReLU(scale=0.2))
            return stage

        # Widths double at each stage: latent_dim -> 128 -> 256 -> 512 -> 1024.
        layers = []
        layers += mlp_stage(opt.latent_dim, 128, normalize=False)
        layers += mlp_stage(128, 256)
        layers += mlp_stage(256, 512)
        layers += mlp_stage(512, 1024)
        layers.append(nn.Linear(1024, int(np.prod(img_shape))))
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)

    def execute(self, z):
        # Run the MLP, then reshape the flat output to (batch, *img_shape).
        flat = self.model(z)
        return flat.view((flat.shape[0], *img_shape))
|
|
| |
class Discriminator(nn.Module):
    """MLP discriminator: flattens an image and outputs (via Sigmoid) the
    probability in (0, 1) that it comes from the real data distribution."""

    def __init__(self):
        super(Discriminator, self).__init__()
        # Widths halve at each stage: prod(img_shape) -> 512 -> 256 -> 1.
        layers = [
            nn.Linear(int(np.prod(img_shape)), 512),
            nn.LeakyReLU(scale=0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(scale=0.2),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*layers)

    def execute(self, img):
        # Flatten (batch, C, W, H) -> (batch, C*W*H) before the MLP.
        flat = img.view((img.shape[0], (- 1)))
        return self.model(flat)
|
|
| |
| ''' |
| 源码: |
| class BCELoss(Module): |
| def __init__(self, weight=None, size_average=True): |
| self.weight = weight |
| self.size_average = size_average |
| def execute(self, output, target): |
| return bce_loss(output, target, self.weight, self.size_average) |
| |
| # weight:表示对loss中每个元素的加权权值,默认为None |
| # size_average:指定输出的格式,包括'mean','sum' |
| # output:判别器对生成的数据的判别结果(64*1) |
| # target:判别器对真实的数据的判别结果(64*1) |
| def bce_loss(output, target, weight=None, size_average=True): |
| # jt.maximum(x,y):返回x和y的元素最大值 |
| # 公式:损失值 = -权重*[ 理想结果*log(判别结果) + (1-理想结果)*log(1-判别结果) ] |
| loss = - ( |
| target * jt.log(jt.maximum(output, 1e-20)) |
| + |
| (1 - target) * jt.log(jt.maximum(1 - output, 1e-20)) |
| ) |
| if weight is not None: |
| loss *= weight |
| if size_average: |
| return loss.mean()# 求均值 |
| else: |
| return loss.sum()# 求和 |
| ''' |
| |
| adversarial_loss = nn.BCELoss() |
|
|
| |
generator = Generator()
discriminator = Discriminator()


# Preprocessing for MNIST: resize to img_size, convert to a single gray
# channel, and normalize pixels from [0, 1] to [-1, 1] so they match the
# generator's Tanh output range.
# Bug fix: renamed from `transform` so this instance no longer shadows the
# imported `jittor.transform` module.
data_transform = transform.Compose([
    transform.Resize(size=opt.img_size),
    transform.Gray(),
    transform.ImageNormalize(mean=[0.5], std=[0.5]),
])
dataloader = MNIST(train=True, transform=data_transform).set_attrs(batch_size=opt.batch_size, shuffle=True)


# Separate Adam optimizers for the generator and the discriminator.
optimizer_G = jt.optim.Adam(generator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))
optimizer_D = jt.optim.Adam(discriminator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))


# Benchmarking switches: warmup_times == -1 means normal training; any other
# value turns the loop below into a timing-only run of `run_times` iterations.
warmup_times = -1
run_times = 3000
total_time = 0.
cnt = 0
|
|
| |
| |
| |
|
|
for epoch in range(opt.n_epochs):
    for (i, (real_imgs, _)) in enumerate(dataloader):

        # Label tensors for the BCE loss: `valid` is all ones ("real"),
        # `fake` is all zeros ("generated"); both are (batch_size, 1).
        # stop_grad() keeps gradients from flowing into the constants.
        valid = jt.ones([real_imgs.shape[0], 1]).stop_grad()
        fake = jt.zeros([real_imgs.shape[0], 1]).stop_grad()

        # -----------------
        #  Train generator
        # -----------------

        # Sample latent noise z ~ N(0, 1) of shape (batch_size, latent_dim).
        z = jt.array(np.random.normal(0, 1, (real_imgs.shape[0], opt.latent_dim)).astype(np.float32))

        # gen_imgs: generated batch of shape (batch_size, channels, img_size, img_size).
        gen_imgs = generator(z)

        # The generator wants the discriminator to score its output as real,
        # so its loss compares D(gen_imgs) against the all-ones labels:
        # in the ideal case D(gen_imgs) == 1 and the BCE loss is 0.
        g_loss = adversarial_loss(discriminator(gen_imgs), valid)

        optimizer_G.step(g_loss)

        # ---------------------
        #  Train discriminator
        # ---------------------

        # The discriminator should score real training images as 1:
        # ideally D(real_imgs) == 1 and this term is 0.
        real_loss = adversarial_loss(discriminator(real_imgs), valid)

        # ...and score generated images as 0: ideally D(gen_imgs) == 0
        # and this term is 0.
        fake_loss = adversarial_loss(discriminator(gen_imgs), fake)

        # Average the two terms so real and fake batches are weighted equally.
        d_loss = ((real_loss + fake_loss) / 2)

        optimizer_D.step(d_loss)

        if warmup_times==-1:
            # Normal training: log losses (numpy() materializes the Var) and
            # periodically save a 5x5 grid of generated samples.
            print(('[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]' % (epoch, opt.n_epochs, i, len(dataloader), d_loss.numpy()[0], g_loss.numpy()[0])))

            batches_done = ((epoch * len(dataloader)) + i)

            if ((batches_done % opt.sample_interval) == 0):
                # Bug fix: only 'images' was created at startup, so writes to
                # 'images/GAN_images' silently failed; create it here.
                os.makedirs('images/GAN_images', exist_ok=True)
                save_image(gen_imgs.data[:25], ('images/GAN_images/%d.png' % batches_done), nrow=5)
        else:
            # Benchmark mode: time `run_times` iterations after a warm-up.
            jt.sync_all()
            cnt += 1
            print(cnt)
            if cnt == warmup_times:
                # Warm-up finished: flush pending ops and start the clock.
                jt.sync_all(True)
                sta = time.time()
            if cnt > warmup_times + run_times:
                jt.sync_all(True)
                total_time = time.time() - sta
                print(f"run {run_times} iters cost {total_time} seconds, and avg {total_time / run_times} one iter.")
                exit(0)

    # Checkpoint both networks every 10 epochs.
    if (epoch+1) % 10 == 0:
        generator.save("saved_models/generator_last.pkl")
        discriminator.save("saved_models/discriminator_last.pkl")
|
|