详解Wassertein GAN:运用KerasMNIST上的完成

阅读论文 Wassertein GAN 时,我发明了解它最好的方法便是用代码来实实行质。于是本文中我将用本人的 Keras 上的代码来向大师简明先容一下这篇作品。


何为 GAN?


GAN,亦称为生成对立收集(Generative Adversarial Network),它是生成模子中的一类——即一种可以通过察看来自特定分布的教练数据,进而实验对这个分布举行预测的模子。这个模子新获取的样本「看起来」会和最初的教练样本相似。有些生成模子只会去进修教练数据分布的参数,有少许模子则只可从教练数据分布中提取样本,而有少许则可以二者兼顾。


目前,曾经保管了许众品种的生成模子:全可睹信心收集(Fully Visible Belief Network)、变分自编码器(Variational Autoencoder)、玻尔兹曼机(Boltzmann Machine),生成随机收集(Generative Stochastic Network),像素递归神经收集(PixelRNN)等等。以上的模子都因其所外征或接近的教练数据密度而有所区别。少许模子会去精细的外征教练数据,另少许则会以某种方法去和教练数据举行互动——比如说生成模子。GAN 便是这里所说的后者。阵势部生成模子的进修准绳都可被轮廓为「最大化相似度预测」——即让模子的参数可以尽可以地与教练数据相似。


GAN 的义务方法可以看成一个由两部分构成的游戏:生成器(Generator/G)和判别器(Discriminator/D)(一般而言,这两者都由神经收集构成)。生成器随机将一个噪声举措本人的输入,然后实验去生成一个样本,目标是让判别器无法判别这个样本是来自教练数据照旧来自生成器的。判别器这里,我们让它以监视进修方法来义务,精细而言便是让它察看实样本和生成器生成的样本,而且同时用标签告诉它这些样本区分来自哪里。某种原理上,判别器可以替代固定的耗损函数,而且实验进修与教练数据分布相关的方式。


何为 Wasserstein GAN?


就其实质而言,任何生成模子的目标都是让模子(习得地)的分布与实数据之间的差别抵达最小。然而,古板 GAN 中的判别器 D 并不会当模子与实的分布重叠度不敷时去供应足够的新闻来估量这个差别度——这导致生成器得不到一个强有力的反应新闻(特别是教练之初),另外生成器的稳定性也广泛缺乏。


Wasserstein GAN 本来的根底之上添加了少许新的方法,让判别器 D 去拟合模子与实分布之间的 Wasserstein 间隔。Wassersterin 间隔会大致估量出「调解一个分布去立室另一个分布还需求众少义务」。另外,其定义的方法十分值妥当心,它以致可以适用于非重叠的分布。


为了让判别器 D 可以有用地拟合 Wasserstein 间隔:


  • 其权重必需紧致空间(compact space)之内。为了抵达这个目标,其权重需求每步教练之后,被调解到-0.01 到+0.01 的合区间上。然而,论文作家供认,虽然这关于裁剪间距的挑选并不是抱负且高敏锐的(highly sensitive),可是它实行中却是有用的。更众新闻可参睹论文 6 到 7 页。
  • 因为判别器被教练到了更好的形态上,以是它可认为生成器供应一个有用的梯度。
  • 判别器顶层需求有线性激活。
  • 它需求一个实质上不会改正判别器输出的代价函数。

K.mean(y_true * y_pred)

以 keras 这段耗损函数为例:


  •  这里采用 mean 来顺应差别的批大小以及乘积。
  •  预测的值通过乘上 element(可运用的真值)来最大化输出结果(优化器一般会将耗损函数的值最小化)。


论文作家外示,与 vanlillaGAN 比较,WGAN 有一下优点:


  • 有原理的耗损目标。判别器 D 的耗损可以与生成样本(这些样本使得可以更少地监控教练进程)的质料很好地联系起来。
  • 稳定性取得改良。当判别器 D 的教练抵达了最佳,它便可认为生成器 G 的教练供应一个有用的耗损。这意味着,对判别器 D 和生成器 G 的教练不必样本数目上保持均衡(相反, Vanilla GAN 方法中而这是均衡的)。另外,作家也外示,实行中,他们的 WGAN 模子没有爆发过一次解体的状况。


开端编程!


我们会 Keras 上完成 ACGAN 的 Wasserstein variety。 ACGAN 这种生成对立收集中,其判别器 D 不光可以预测样本的实与否,同时还可以将其举行归类。


系澜代码附有部分标明。


[1] 导入库文献:

import os

import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina' # enable hi-res output

import numpy as np
import tensorflow as tf

import keras.backend as K
from keras.datasets import mnist
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.initializers import *
from keras.callbacks import *
from keras.utils.generic_utils import Progbar

[2].Runtime 配备

# random seed
RND = 777

# output settings
RUN = 'B'
OUT_DIR = 'out/' + RUN
TENSORBOARD_DIR = '/tensorboard/wgans/' + RUN
SAVE_SAMPLE_IMAGES = False

# GPU # to run on
GPU = "0"

BATCH_SIZE = 100
ITERATIONS = 20000

# size of the random vector used to initialize G
Z_SIZE = 100

[3]生成器 G 每举行一次迭代,判别器 D 都需求举行 D_ITERS 次迭代。


  • 因为 WGAN 中让判别器质料可以优化这件事更加主要,以是判别器 D 与生成器 G 教练次数上呈非对称比例。
  • 论文的 v2 版本中,判别器 D 生成器 G 每 1000 次迭代的前 25 次都会教练 100 次,另外,判别器也会当生成器每举行了 500 次迭代以后教练 100 次。

D_ITERS = 5

[4]其它准备:

# create output dir
if not os.path.isdir(OUT_DIR): os.makedirs(OUT_DIR)

# make only specific GPU to be utilized
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = GPU

# seed random generator for repeatability
np.random.seed(RND)

# force Keras to use last dimension for image channels
K.set_image_dim_ordering('tf')

[5]判别器的耗损函数:


  • 因为判别器 D 一方面力图教练数据分布与生成器 G 生成的数据之间习得一个 Wasserstein 间隔的拟合,另一方面判别器 D 又是线性激活的,以是这里我们不需求去改正它的输出结果。
  • 因为曾经运用了耗损函数 Mean,以是我们可以差别的批大小之间比较输出结果。
  • 预测结果等于真值(true value)与元素的点乘(element-wise multiplication),为了让判别器 D 的输出可以最大化(一般,优化器都力图去让耗损函数的值抵达最小),真值需求取-1。

def d_loss(y_true, y_pred):
    return K.mean(y_true * y_pred)

[6].创立判别器 D


判别器将图像举措输入,然后给出两个输出:

  • 用线性激活来评判生成图像的「虚假度」(最大化以用于生成图像)。
  •  用 softmax 激活来对图像品种举行预测。
  • 因为权重是从标准差为 0.02 的正态分布中初始化出来的,以是最初的剪裁不会去掉所有的权重。


 def create_D():


    # weights are initlaized from normal distribution with below params
    weight_init = RandomNormal(mean=0., stddev=0.02)

    input_image = Input(shape=(28, 28, 1), name='input_image')

    x = Conv2D(
        32, (3, 3),
        padding='same',
        name='conv_1',
        kernel_initializer=weight_init)(input_image)
    x = LeakyReLU()(x)
    x = MaxPool2D(pool_size=2)(x)
    x = Dropout(0.3)(x)

    x = Conv2D(
        64, (3, 3),
        padding='same',
        name='conv_2',
        kernel_initializer=weight_init)(x)
    x = MaxPool2D(pool_size=1)(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)

    x = Conv2D(
        128, (3, 3),
        padding='same',
        name='conv_3',
        kernel_initializer=weight_init)(x)
    x = MaxPool2D(pool_size=2)(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)

    x = Conv2D(
        256, (3, 3),
        padding='same',
        name='coonv_4',
        kernel_initializer=weight_init)(x)
    x = MaxPool2D(pool_size=1)(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)

    features = Flatten()(x)

    output_is_fake = Dense(
        1, activation='linear', name='output_is_fake')(features)

    output_class = Dense(
        10, activation='softmax', name='output_class')(features)

    return Model(
        inputs=[input_image], outputs=[output_is_fake, output_class], name='D')


[7].创立生成器


生成器有两个输入:


  • 一个尺寸为Z_SIZE的潜随机变量。
  • 我们期望生成的数字类型(integer o 到 9)。


为了到场这些输入(input),integer 类型会内部转换成一个1 x DICT_LEN(本例中DICT_LEN = 10)的希罕向量,然后乘上嵌入的维度为 DICT_LEN x Z_SIZE的矩阵,结果取得一个维度为1 x Z_SIZE的鳞集向量。然后该向量乘上(点 乘)可以的输入(input),颠末众个上菜样和卷积层,着末其维度就可以和教练图像的维度立室了。

def create_G(Z_SIZE=Z_SIZE):
    DICT_LEN = 10
    EMBEDDING_LEN = Z_SIZE

    # weights are initialized from normal distribution with below params
    weight_init = RandomNormal(mean=0., stddev=0.02)

    # class#
    input_class = Input(shape=(1, ), dtype='int32', name='input_class')
    # encode class# to the same size as Z to use hadamard multiplication later on
    e = Embedding(
        DICT_LEN, EMBEDDING_LEN,
        embeddings_initializer='glorot_uniform')(input_class)
    embedded_class = Flatten(name='embedded_class')(e)

    # latent var
    input_z = Input(shape=(Z_SIZE, ), name='input_z')

    # hadamard product
    h = multiply([input_z, embedded_class], name='h')

    # cnn part
    x = Dense(1024)(h)
    x = LeakyReLU()(x)

    x = Dense(128 * 7 * 7)(x)
    x = LeakyReLU()(x)
    x = Reshape((7, 7, 128))(x)

    x = UpSampling2D(size=(2, 2))(x)
    x = Conv2D(256, (5, 5), padding='same', kernel_initializer=weight_init)(x)
    x = LeakyReLU()(x)

    x = UpSampling2D(size=(2, 2))(x)
    x = Conv2D(128, (5, 5), padding='same', kernel_initializer=weight_init)(x)
    x = LeakyReLU()(x)

    x = Conv2D(
        1, (2, 2),
        padding='same',
        activation='tanh',
        name='output_generated_image',
        kernel_initializer=weight_init)(x)

    return Model(inputs=[input_z, input_class], outputs=x, name='G')

[8].将判别器 D 和生成器 G 整合到一个模子中:

D = create_D()

D.compile(
    optimizer=RMSprop(lr=0.00005),
    loss=[d_loss, 'sparse_categorical_crossentropy'])

input_z = Input(shape=(Z_SIZE, ), name='input_z_')
input_class = Input(shape=(1, ),name='input_class_', dtype='int32')

G = create_G()

# create combined D(G) model
output_is_fake, output_class = D(G(inputs=[input_z, input_class]))
DG = Model(inputs=[input_z, input_class], outputs=[output_is_fake, output_class])

DG.compile(
    optimizer=RMSprop(lr=0.00005),
    loss=[d_loss, 'sparse_categorical_crossentropy']
)


[9].加载 MNIST 数据集:

# load mnist data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# use all available 70k samples from both train and test sets
X_train = np.concatenate((X_train, X_test))
y_train = np.concatenate((y_train, y_test))

# convert to -1..1 range, reshape to (sample_i, 28, 28, 1)
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
X_train = np.expand_dims(X_train, axis=3)

[10].生成样本以及将目标和图像发送到 TensorBorad 的适用东西:

# save 10x10 sample of generated images
def generate_samples(n=0, save=True):

    zz = np.random.normal(0., 1., (100, Z_SIZE))
    generated_classes = np.array(list(range(0, 10)) * 10)
    generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])

    rr = []
    for c in range(10):
        rr.append(
            np.concatenate(generated_images[c * 10:(1 + c) * 10]).reshape(
                280, 28))
    img = np.hstack(rr)

    if save:
        plt.imsave(OUT_DIR + '/samples_%07d.png' % n, img, cmap=plt.cm.gray)

    return img

# write tensorboard summaries
sw = tf.summary.FileWriter(TENSORBOARD_DIR)
def update_tb_summary(step, write_sample_images=True):

    s = tf.Summary()

    # losses as is
    for names, vals in zip((('D_real_is_fake', 'D_real_class'),
                            ('D_fake_is_fake', 'D_fake_class'), ('DG_is_fake',
                                                                 'DG_class')),
                           (D_true_losses, D_fake_losses, DG_losses)):

        v = s.value.add()
        v.simple_value = vals[-1][1]
        v.tag = names[0]

        v = s.value.add()
        v.simple_value = vals[-1][2]
        v.tag = names[1]

    # D loss: -1*D_true_is_fake - D_fake_is_fake
    v = s.value.add()
    v.simple_value = -D_true_losses[-1][1] - D_fake_losses[-1][1]
    v.tag = 'D loss (-1*D_real_is_fake - D_fake_is_fake)'

    # generated image
    if write_sample_images:
        img = generate_samples(step, save=True)
        s.MergeFromString(tf.Session().run(
            tf.summary.image('samples_%07d' % step,
                             img.reshape([1, *img.shape, 1]))))

    sw.add_summary(s, step)
    sw.flush()


[11].教练


教练进程包罗了以下方法:


1、消弭对判别器 D 权重的掌握,让它们变得可进修。

2、调解判别器的权重(调解到-0.01 到+0.01 合区间上)。

3、向判别器 D 供应实的样本,通过耗损函数中将其乘上-1 来尽可以最大化它的输出,最小化它的值。

4、向判别器 D 供应假的样本试图最小化其输出。

5、按照上文讲述的判别器迭代教练方法重复方法 3 和 4。

6、固定判别器 D 的权重。

7、教练一对判别器和生成器,竭力去最小化其输出。因为这种手腕优化了生成器 G 的权重,以是前面曾经教练好了的权重固定的判别器才会将生成的假样本判别为真图像。


progress_bar = Progbar(target=ITERATIONS)

DG_losses = []
D_true_losses = []
D_fake_losses = []

for it in range(ITERATIONS):

    if len(D_true_losses) > 0:
        progress_bar.update(
            it,
            values=[ # avg of 5 most recent
                    ('D_real_is_fake', np.mean(D_true_losses[-5:], axis=0)[1]),
                    ('D_real_class', np.mean(D_true_losses[-5:], axis=0)[2]),
                    ('D_fake_is_fake', np.mean(D_fake_losses[-5:], axis=0)[1]),
                    ('D_fake_class', np.mean(D_fake_losses[-5:], axis=0)[2]),
                    ('D(G)_is_fake', np.mean(DG_losses[-5:],axis=0)[1]),
                    ('D(G)_class', np.mean(DG_losses[-5:],axis=0)[2])
            ]
        )
        
    else:
        progress_bar.update(it)

    # 1: train D on real+generated images

    if (it % 1000) < 25 or it % 500 == 0: # 25 times in 1000, every 500th
        d_iters = 100
    else:
        d_iters = D_ITERS

    for d_it in range(d_iters):

        # unfreeze D
        D.trainable = True
        for l in D.layers: l.trainable = True

        # clip D weights

        for l in D.layers:
            weights = l.get_weights()
            weights = [np.clip(w, -0.01, 0.01) for w in weights]
            l.set_weights(weights)

        # 1.1: maximize D output on reals === minimize -1*(D(real))

        # draw random samples from real images
        index = np.random.choice(len(X_train), BATCH_SIZE, replace=False)
        real_images = X_train[index]
        real_images_classes = y_train[index]

        D_loss = D.train_on_batch(real_images, [-np.ones(BATCH_SIZE), 
          real_images_classes])
        D_true_losses.append(D_loss)

        # 1.2: minimize D output on fakes 

        zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))
        generated_classes = np.random.randint(0, 10, BATCH_SIZE)
        generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])

        D_loss = D.train_on_batch(generated_images, [np.ones(BATCH_SIZE),
          generated_classes])
        D_fake_losses.append(D_loss)

    # 2: train D(G) (D is frozen)
    # minimize D output while supplying it with fakes, 
    # telling it that they are reals (-1)

    # freeze D
    D.trainable = False
    for l in D.layers: l.trainable = False

    zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE)) 
    generated_classes = np.random.randint(0, 10, BATCH_SIZE)

    DG_loss = DG.train_on_batch(
        [zz, generated_classes.reshape((-1, 1))],
        [-np.ones(BATCH_SIZE), generated_classes])

    DG_losses.append(DG_loss)

    if it % 10 == 0:
        update_tb_summary(it, write_sample_images=(it % 250 == 0))


结论


视频的每一秒都是 250 次教练迭代。运用 Wasserstein GAN 的一个好处便是它有着耗损与样实质料之间的联系。如我们系澜看到的耗损图,当举行了约 8000 次教练之后,耗损开端迫近 0,视频中 32 秒位置时,我们曾经开端可以看到像样的图像了。




附论文地址:https://arxiv.org/pdf/1701.07875.pdf


参考文献

1. Wasserstein GAN paper (https://arxiv.org/pdf/1701.07875.pdf) – Martin Arjovsky, Soumith Chintala, Léon Bottou

2. NIPS 2016 Tutorial: Generative Adversarial Networks (https://arxiv.org/pdf/1701.00160.pdf) – Ian Goodfellow

3. Original PyTorch code for the Wasserstein GAN paper (https://github.com/martinarjovsky/WassersteinGAN)

4. Conditional Image Synthesis with Auxiliary Classifier GANs (https://arxiv.org/pdf/1610.09585v3.pdf) – Augustus Odena, Christopher Olah, Jonathon Shlens

5. Keras ACGAN implementation (https://github.com/lukedeo/keras-acgan) – Luke de Oliveira

6. Code for the article (https://gist.github.com/myurasov/6ecf449b32eb263e7d9a7f6e9aed5dc2)


原文链接:https://myurasov.github.io/2017/09/24/wasserstein-gan-keras.html?r


初学工程生成模子Wassertein GAN完成Keras
暂无评论
暂无评论~