6

Probabilistic Deep Learning with Tensorflow

 2 years ago
source link: https://sandipanweb.wordpress.com/2021/09/01/probabilistic-deep-learning-with-tensorflow/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

In this blog, we shall discuss on how to implement probabilistic deep learning models using Tensorflow. The problems to be discussed in this blog appeared in the exercises / projects in the coursera course “Probabilistic Deep Learning“, by Imperial College, London, as a part of TensorFlow 2 for Deep Learning Specialization. The problem statements / descriptions are taken from the course itself.

Naive Bayes and logistic regression with Tensorflow Probability

In this problem, we shall develop a Naive Bayes classifier model to the Iris dataset using Distribution objects from TensorFlow Probability. We shall also explore the connection between the Naive Bayes classifier and logistic regression.

The Iris dataset

In this problem, we shall use the Iris dataset. It consists of 50 samples from each of three species of Iris (Iris setosa, Iris virginica and Iris versicolor). Four features were measured from each sample: the length and the width of the sepals and petals, in centimeters. For a reference, see the following papers:

  • R. A. Fisher. “The use of multiple measurements in taxonomic problems”. Annals of Eugenics. 7 (2): 179–188, 1936.

Our goal will be to construct a Naive Bayes classifier model that predicts the correct class from the sepal length and sepal width features. Under certain assumptions about this classifier model, we shall explore the relation to logistic regression.

The following figures show the 3 categories of flowers from the Iris dataset, namely, setosa, versicolor and verginica, respectively.

Let’s start by importing the required libraries.

import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn import datasets, model_selection
%matplotlib inline

Load and prepare the data

We will first read in the Iris dataset, and split the dataset into training and test sets, using the following code snippet.

iris = datasets.load_iris()
# Use only the first two features: sepal length and width
data = iris.data[:, :2]
targets = iris.target
x_train, x_test, y_train, y_test = model_selection.train_test_split(data, targets, test_size=0.2)
labels = {0: 'Iris-Setosa', 1: 'Iris-Versicolour', 2: 'Iris-Virginica'}
label_colours = ['blue', 'orange', 'green']
def plot_data(x, y, labels, colours):
for c in np.unique(y):
inx = np.where(y == c)
plt.scatter(x[inx, 0], x[inx, 1], label=labels[c], c=colours[c])
plt.title("Training set")
plt.xlabel("Sepal length (cm)")
plt.ylabel("Sepal width (cm)")
plt.legend()
plt.figure(figsize=(8, 5))
plot_data(x_train, y_train, labels, label_colours)
plt.show()

Naive Bayes classifier

We will briefly review the Naive Bayes classifier model. The fundamental equation for this classifier is Bayes’ rule:

In the above, dd is the number of features or dimensions in the inputs X (in our case d=2), and K is the number of classes (in our case K=3). The distribution P(Y) is the class prior distribution, which is a discrete distribution over K classes. The distribution P(X|Y) is the class-conditional distribution over inputs.

The Naive Bayes classifier makes the assumption that the data features Xi are conditionally independent give the class Y (the ‘naive’ assumption). In this case, the class-conditional distribution decomposes as follows:

This simplifying assumption means that we typically need to estimate far fewer parameters for each of the distributions P(Xi|Y=yk) instead of the full joint distribution P(X|Y=yk).

Once the class prior distribution and class-conditional densities are estimated, the Naive Bayes classifier model can then make a class prediction for a new data input according to

Define the class prior distribution

We will begin by defining the class prior distribution. To do this we will simply take the maximum likelihood estimate, given by

where the superscript (n) indicates the n-th dataset example, and N is the total number of examples in the dataset. The above is simply the proportion of data examples belonging to class k.

Let’s now define a function that builds the prior distribution from the training data, and returns it as a Categorical Distribution object.

  • The input to your function y will be a numpy array of shape (num_samples,)
  • The entries in y will be integer labels k=0,1,…,K−1
  • The function should build and return the prior distribution as a Categorical distribution object
    • The probabilities for this distribution will be a length-KK vector, with entries corresponding to P(Y=yk) for k=0,1,…,K−1
    • Your function should work for any value of K≥1
    • This Distribution will have an empty batch shape and empty event shape
def get_prior(y):
return tfd.Categorical(probs=[np.mean(y == i) for i in range(3)])
prior = get_prior(y_train)
# Plot the prior distribution
labels = ['Iris-Setosa', 'Iris-Versicolour', 'Iris-Virginica']
plt.bar([0, 1, 2], prior.probs.numpy(), color=label_colours)
plt.xlabel("Class")
plt.ylabel("Prior probability")
plt.title("Class prior distribution")
plt.xticks([0, 1, 2], labels)
plt.show()

Define the class-conditional densities

Let’s now turn to the definition of the class-conditional distributions P(Xi|Y=yk) for i=0,1 and k=0,1,2. In our model, we will assume these distributions to be univariate Gaussian:

with mean parameters μik and standard deviation parameters σik, twelve parameters in all. We will again estimate these parameters using maximum likelihood. In this case, the estimates are given by

Note that the above are just the means and variances of the sample data points for each class.

Let’s now implement a function the computes the class-conditional Gaussian densities, using the maximum likelihood parameter estimates given above, and returns them in a single, batched MultivariateNormalDiag Distribution object.

  • The inputs to the function are
    • a numpy array x of shape (num_samples, num_features) for the data inputs
    • a numpy array y of shape (num_samples,) for the target labels
  • The function should work for any number of classes K≥1 and any number of features d≥1
def get_class_conditionals(x, y):
mu = [np.mean(x[y == k], axis=0) for k in range(3)]
sigma = [np.sqrt(np.mean((x[y == k]-mu[k])**2, axis=0)) for k in range(3)]
return tfd.MultivariateNormalDiag(loc=mu, scale_diag=sigma)
class_conditionals = get_class_conditionals(x_train, y_train)

We can visualise the class-conditional densities with contour plots by running the cell below. Notice how the contours of each distribution correspond to a Gaussian distribution with diagonal covariance matrix, since the model assumes that each feature is independent given the class.

def get_meshgrid(x0_range, x1_range, num_points=100):
x0 = np.linspace(x0_range[0], x0_range[1], num_points)
x1 = np.linspace(x1_range[0], x1_range[1], num_points)
return np.meshgrid(x0, x1)
def contour_plot(x0_range, x1_range, prob_fn, batch_shape, colours, levels=None, num_points=100):
X0, X1 = get_meshgrid(x0_range, x1_range, num_points=num_points)
Z = prob_fn(np.expand_dims(np.array([X0.ravel(), X1.ravel()]).T, 1))
#print(Z.shape, batch_shape, 'x', *X0.shape)
Z = np.array(Z).T.reshape(batch_shape, *X0.shape)
for batch in np.arange(batch_shape):
if levels:
plt.contourf(X0, X1, Z[batch], alpha=0.2, colors=colours, levels=levels)
else:
plt.contour(X0, X1, Z[batch], colors=colours[batch], alpha=0.3)
plt.figure(figsize=(10, 6))
plot_data(x_train, y_train, labels, label_colours)
x0_min, x0_max = x_train[:, 0].min(), x_train[:, 0].max()
x1_min, x1_max = x_train[:, 1].min(), x_train[:, 1].max()
contour_plot((x0_min, x0_max), (x1_min, x1_max), class_conditionals.prob, 3, label_colours)
plt.title("Training set with class-conditional density contours")
plt.show()

Make predictions from the model

Now the prior and class-conditional distributions are defined, you can use them to compute the model’s class probability predictions for an unknown test input, according to

The class prediction can then be taken as the class with the maximum probability:

Let’s now implement a function to return the model’s class probabilities for a given batch of test inputs of shape (batch_shape, 2), where the batch_shape has rank at least one.

  • The inputs to the function are the prior and class_conditionals distributions, and the inputs x
  • The function should use these distributions to compute the probabilities for each class k as above
    • As before, the function should work for any number of classes K≥1
  • It should then compute the prediction by taking the class with the highest probability
  • The predictions should be returned in a numpy array of shape (batch_shape)
def predict_class(prior, class_conditionals, x):
x = x[:, np.newaxis, :]
return tf.argmax(tf.cast(class_conditionals.prob(x),tf.float32)*tf.cast(prior.probs,tf.float32),axis=1).numpy()
predictions = predict_class(prior, class_conditionals, x_test)
# Evaluate the model accuracy on the test set
accuracy = accuracy_score(y_test, predictions)
print("Test accuracy: {:.4f}".format(accuracy))
# Test accuracy: 0.8000
# Plot the model's decision regions
plt.figure(figsize=(10, 6))
plot_data(x_train, y_train, labels, label_colours)
x0_min, x0_max = x_train[:, 0].min(), x_train[:, 0].max()
x1_min, x1_max = x_train[:, 1].min(), x_train[:, 1].max()
contour_plot((x0_min, x0_max), (x1_min, x1_max),
lambda x: predict_class(prior, class_conditionals, x),
3, label_colours, levels=[-0.5, 0.5, 1.5, 2.5],
num_points=500)
plt.title("Training set with decision regions")
plt.show()

Binary classifier

We will now draw a connection between the Naive Bayes classifier and logistic regression.

First, we will update our model to be a binary classifier. In particular, the model will output the probability that a given input data sample belongs to the ‘Iris-Setosa’ class: P(Y=y0|X1,…,Xd). The remaining two classes will be pooled together with the label y1.

# Redefine the dataset to have binary labels
y_train_binary = np.array(y_train)
y_train_binary[np.where(y_train_binary == 2)] = 1
y_test_binary = np.array(y_test)
y_test_binary[np.where(y_test_binary == 2)] = 1
# Plot the training data
labels_binary = {0: 'Iris-Setosa', 1: 'Iris-Versicolour / Iris-Virginica'}
label_colours_binary = ['blue', 'red']
plt.figure(figsize=(8, 5))
plot_data(x_train, y_train_binary, labels_binary, label_colours_binary)
plt.show()

We will also make an extra modelling assumption that for each class kk, the class-conditional distribution P(Xi|Y=yk) for each feature i=0,1, has standard deviation σi, which is the same for each class k.

This means there are now six parameters in total: four for the means μik and two for the standard deviations σiσi (i,k=0,1).

We will again use maximum likelihood to estimate these parameters. The prior distribution will be as before, with the class prior probabilities given by

We will use our previous function get_prior to redefine the prior distribution.

prior_binary = get_prior(y_train_binary)
# Plot the prior distribution
plt.bar([0, 1], prior_binary.probs.numpy()[:-1], color=label_colours_binary)
plt.xlabel("Class")
plt.ylabel("Prior probability")
plt.title("Class prior distribution")
plt.xticks([0, 1], labels_binary)
plt.show()

For the class-conditional densities, the maximum likelihood estimate for the means are again given by

However, the estimate for the standard deviations σi is updated. There is also a closed-form solution for the shared standard deviations, but we will instead learn these from the data.

Let’s now implement a function that takes the training inputs and target labels as input, as well as an optimizer object, number of epochs and a TensorFlow Variable. This function should be written according to the following spec:

  • The inputs to the function are:
    • a numpy array x of shape (num_samples, num_features) for the data inputs
    • a numpy array y of shape (num_samples,) for the target labels
    • tf.Variable object scales of length 2 for the standard deviations σiσi
    • optimiser: an optimiser object
    • epochs: the number of epochs to run the training for
  • The function should first compute the means μik of the class-conditional Gaussians according to the above equation
  • Then create a batched multivariate Gaussian distribution object using MultivariateNormalDiag with the means set to μik and the scales set to scales
  • Run a custom training loop for epochs number of epochs, in which:
    • the average per-example negative log likelihood for the whole dataset is computed as the loss
    • the gradient of the loss with respect to the scales variables is computed
    • the scales variables are updated by the optimiser object
  • At each iteration, save the values of the scales variable and the loss
  • The function should return a tuple of three objects:
    • a numpy array of shape (epochs,) of loss values
    • a numpy array of shape (epochs, 2) of values for the scales variable at each iteration
    • the final learned batched MultivariateNormalDiag distribution object
mu = [np.mean(x_train[y_train_binary == k], axis=0) for k in range(2)]
def learn_stdevs(x, y, scales, optimiser, epochs):
def nll(x, y, distribution):
predictions = - distribution.log_prob(x)
return tf.reduce_sum(predictions[y==0][:,0]) + tf.reduce_sum(predictions[y==1][:,1])
@tf.function
def get_loss_and_grads(x, y, distribution):
with tf.GradientTape() as tape:
tape.watch(distribution.trainable_variables)
loss = nll(x, y, distribution)
grads = tape.gradient(loss, distribution.trainable_variables)
return loss, grads
shape = (len(set(y)), x.shape[-1])
loc = np.zeros(shape, dtype=np.float32)
for feature in range(shape[0]):
for category in range(shape[-1]):
data_point = x[y == category][:, feature]
loc[category, feature] = np.mean(data_point)
distribution = tfd.MultivariateNormalDiag(loc=loc, scale_diag=scales) #b(2), e(2)
x = np.expand_dims(x , 1).astype('float32')
train_loss_results = []
train_scale_results = []
for epoch in range(epochs):
loss, grads = get_loss_and_grads(x, y, distribution)
optimiser.apply_gradients(zip(grads, distribution.trainable_variables))
scales = distribution.parameters['scale_diag'].numpy()
train_loss_results.append(loss)
train_scale_results.append(scales)
if epoch % 10 == 0:
print(f'epoch: {epoch}, loss: {loss}')
return np.array(train_loss_results), np.array(train_scale_results), distribution
scales = tf.Variable([1., 1.])
opt = tf.keras.optimizers.Adam(learning_rate=0.01)
epochs = 500
# run the training loop
nlls, scales_arr, class_conditionals_binary = learn_stdevs(x_train, y_train_binary, scales, opt, epochs)
epoch: 0, loss: 246.33450317382812
epoch: 10, loss: 227.07168579101562
epoch: 20, loss: 207.1158905029297
epoch: 30, loss: 187.12120056152344
epoch: 40, loss: 168.60015869140625
epoch: 50, loss: 153.5633087158203
epoch: 60, loss: 143.8475341796875
epoch: 70, loss: 142.80393981933594
epoch: 80, loss: 142.56259155273438
epoch: 90, loss: 142.23074340820312
epoch: 100, loss: 142.25711059570312
epoch: 110, loss: 142.18955993652344
epoch: 120, loss: 142.1979217529297
epoch: 130, loss: 142.18882751464844
epoch: 140, loss: 142.18991088867188
epoch: 150, loss: 142.1887664794922
epoch: 160, loss: 142.1888885498047
epoch: 170, loss: 142.18875122070312
epoch: 180, loss: 142.1887664794922
epoch: 190, loss: 142.1887664794922
epoch: 200, loss: 142.1887664794922
epoch: 210, loss: 142.18875122070312
epoch: 220, loss: 142.1887664794922
epoch: 230, loss: 142.18873596191406
epoch: 240, loss: 142.18878173828125
epoch: 250, loss: 142.18875122070312
epoch: 260, loss: 142.18875122070312
epoch: 270, loss: 142.18875122070312
epoch: 280, loss: 142.18875122070312
epoch: 290, loss: 142.18875122070312
epoch: 300, loss: 142.18878173828125
epoch: 310, loss: 142.18875122070312
epoch: 320, loss: 142.18875122070312
epoch: 330, loss: 142.18875122070312
epoch: 340, loss: 142.18875122070312
epoch: 350, loss: 142.1887664794922
epoch: 360, loss: 142.1887664794922
epoch: 370, loss: 142.1887664794922
epoch: 380, loss: 142.1887664794922
epoch: 390, loss: 142.1887664794922
epoch: 400, loss: 142.1887664794922
epoch: 410, loss: 142.1887664794922
epoch: 420, loss: 142.1887664794922
epoch: 430, loss: 142.1887664794922
epoch: 440, loss: 142.1887664794922
epoch: 450, loss: 142.1887664794922
epoch: 460, loss: 142.1887664794922
epoch: 470, loss: 142.1887664794922
epoch: 480, loss: 142.1887664794922
epoch: 490, loss: 142.1887664794922
print("Class conditional means:")
print(class_conditionals_binary.loc.numpy())
print("\nClass conditional standard deviations:")
print(class_conditionals_binary.stddev().numpy())
Class conditional means:
[[4.9692307 3.3820512]
 [6.2172837 2.8814814]]

Class conditional standard deviations:
[[0.5590086  0.34253535]
 [0.5590086  0.34253535]]
# Plot the loss and convergence of the standard deviation parameters

fig, ax = plt.subplots(1, 2, figsize=(14, 5))
ax[0].plot(nlls)
ax[0].set_title("Loss vs epoch")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("Average negative log-likelihood")
for k in [0, 1]:
    ax[1].plot(scales_arr[:, k], color=label_colours_binary[k], label=labels_binary[k])
ax[1].set_title("Standard deviation ML estimates vs epoch")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("Standard deviation")
plt.legend()
plt.show()

We can also plot the contours of the class-conditional Gaussian distributions as before, this time with just binary labelled data. Notice the contours are the same for each class, just with a different centre location.

We can also plot the decision regions for this binary classifier model, notice that the decision boundary is now linear.

The following animation shows how we can learn the standard deviation parameters for the class-conditional distributions for Naive Bayes using tensorflow, with the above code snippet.

Link to logistic regression

In fact, we can see that our predictive distribution P(Y=y0|X) can be written as follows:

With our additional modelling assumption of a shared covariance matrix Σ, it can be shown (using the Gaussian pdf) that a is in fact a linear function of X:

The model therefore takes the form P(Y=y0|X)=σ(wTX+w0), with weights w∈R2 and bias w0∈R. This is the form used by logistic regression, and explains why the decision boundary above is linear.

In the above we have outlined the derivation of the generative logistic regression model. The parameters are typically estimated with maximum likelihood, as we have done.

Finally, we will use the above equations to directly parameterize the output Bernoulli distribution of the generative logistic regression model.

Let’s now write the following function, according to the following specification:

  • The inputs to the function are:
    • the prior distribution prior over the two classes
    • the (batched) class-conditional distribution class_conditionals
  • The function should use the parameters of the above distributions to compute the weights and bias terms w and w0 as above
  • The function should then return a tuple of two numpy arrays for w and w0
def get_logistic_regression_params(prior, class_conditionals):   
Sigma = class_conditionals.covariance().numpy()
SI = np.linalg.inv(Sigma)
p = prior.probs.numpy()
mu = class_conditionals.parameters['loc'] #.numpy()
w = SI @ (mu[0] - mu[1])
w0 = -0.5*mu[0].T@SI@mu[0] + 0.5*mu[1].T@SI@mu[1] + np.log(p[0]/p[1])
return w, w0
w, w0 = get_logistic_regression_params(prior_binary, class_conditionals_binary)

We can now use these parameters to make a contour plot to display the predictive distribution of our logistic regression model.

Probabilistic generative models

Let’s start with generative models, using normalizing flow networks and the variational autoencoder algorithm. We shall create a synthetic dataset with a normalizing flow with randomised parameters. This dataset will then be used to train a variational autoencoder, and the trained model will be used to interpolate between the generated images. The concepts to be used will be

  • Distribution objects
  • Probabilistic layers
  • Bijectors
  • ELBO optimization
  • KL divergence Regularizers.

The next figure represents the theory required for the implementation:

Let’s start by running importing the required libraries first. 

import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
tfpl = tfp.layers
import numpy as np
import matplotlib.pyplot as plt

We shall create our own image dataset from contour plots of a transformed distribution using a random normalizing flow network and then use the variational autoencoder algorithm to train generative and inference networks, and synthesise new images by interpolating in the latent space.

The normalising flow

  • To construct the image dataset, you will build a normalizing flow to transform the 2-D Gaussian random variable z = (z1, z2), which has mean 0 and covariance matrix Σ=σ^2.I2, with σ=0.3.
  • This normalizing flow uses bijectors that are parameterized by the following random variables:
    • θ ∼ U[0,2π)
    • a ∼ N(3,1)

The complete normalising flow is given by the following chain of transformations:

  • f1(z)=(z1,z2−2)
  • f2(z)=(z1,z2/2),
  • f3(z)=(z1,z2+a.z1^2),
  • f4(z)=R.z, where R is a rotation matrix with angle θ,
  • f5(z)=tanh(z), where the tanh function is applied elementwise.

The transformed random variable x is given by x=f5(f4(f3(f2(f1(z))))).

  • We need to use or construct bijectors for each of the transformations fi, i=1,…,5 and use tfb.Chain and tfb.TransformedDistribution to construct the final transformed distribution.
  • Ensure to implement the log_det_jacobian methods for any subclassed bijectors that you write.
  • Display a scatter plot of samples from the base distribution.
  • Display 4 scatter plot images of the transformed distribution from your random normalizing flow, using samples of θ and a. Fix the axes of these 4 plots to the range [−1,1].

The following code block shows how to implement the above steps:

def plot_distribution(samples, ax, title, col='red'):
ax.scatter(samples[:, 0], samples[:, 1], marker='.', c=col, alpha=0.5)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_title(title, size=15)
# f3(𝑧)=(𝑧1,𝑧2+𝑎𝑧1^2)
class Degree2Polynomial(tfb.Bijector):
def __init__(self, a):
self.a = a
super(Degree2Polynomial, self).__init__(forward_min_event_ndims=1, is_constant_jacobian=True)
def _forward(self, x):
return tf.concat([x[..., :1], x[..., 1:] + self.a * tf.square(x[..., :1])], axis=-1)
def _inverse(self, y):
return tf.concat([y[..., :1], y[..., 1:] - self.a * tf.square(y[..., :1])], axis=-1)
def _forward_log_det_jacobian(self, x):
return tf.constant(0., dtype=x.dtype)
# f4(𝑧)=Rz
class Rotation(tfb.Bijector):
def __init__(self, theta):
self.R = tf.constant([[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]], dtype=tf.float32)
super(Rotation, self).__init__(forward_min_event_ndims=1, is_constant_jacobian=True)
def _forward(self, x):
return tf.linalg.matvec(self.R, x)
def _inverse(self, y):
return tf.linalg.matvec(tf.transpose(self.R), y)
def _forward_log_det_jacobian(self, x):
return tf.constant(0., x.dtype)
def get_normalizing_flow_dist(a, theta):
bijectors = [
tfb.Shift([0.,-2]),   # f1
tfb.Scale([1,1/2]),   # f2
Degree2Polynomial(a), # f3
Rotation(theta),      # f4
tfb.Tanh()            # f5
]
flow_bijector = tfb.Chain(list(reversed(bijectors)))
return tfd.TransformedDistribution(distribution=base_distribution,
bijector=flow_bijector)
nsamples= 10000
sigma = 0.3
base_distribution = tfd.MultivariateNormalDiag(loc=tf.zeros(2), scale_diag=sigma*tf.ones(2))
samples = base_distribution.sample(nsamples)
fig, ax = plt.subplots(figsize=(8,8))
plot_distribution(samples, ax, 'Base distribution', 'blue')
plt.show()
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15,15))
axes = axes.flatten()
plt.subplots_adjust(0, 0, 1, 0.925, 0.05, 0.05)
colors = ['red', 'green', 'orange', 'magenta']
for i in range(4):
a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
theta = tfd.Uniform(low = 0, high = 2*np.pi).sample(1)[0].numpy()
transformed_distribution = get_normalizing_flow_dist(a, theta)
samples = transformed_distribution.sample(nsamples)
plot_distribution(samples, axes[i], r'$\theta$={:.02f}, a={:.02f}'.format(theta, a), colors[i])
plt.suptitle('Transformed Distribution with Normalizing Flow', size=20)
plt.show()

Create the image dataset

  • Let’s now use your random normalizing flow to generate an image dataset of contour plots from our random normalising flow network.
  • First, let’s display a sample of 4 contour plot images from our normalizing flow network using 4 independently sampled sets of parameters, using the following get_densities function: this function calculates density values for a (batched) Distribution for use in a contour plot.
  • The dataset should consist of at least 1000 images, stored in a numpy array of shape (N, 36, 36, 3). Each image in the dataset should correspond to a contour plot of a transformed distribution from a normalizing flow with an independently sampled set of parameters. It will take a few minutes to create the dataset.
  • As well as the get_densities function, the following get_image_array_from_density_values function will help to generate the dataset. This function creates a numpy array for an image of the contour plot for a given set of density values Z. Feel free to choose your own options for the contour plots.
  • Let’s display a sample of 20 images from your generated dataset in a figure.
X, Y = np.meshgrid(np.linspace(-1, 1, 100), np.linspace(-1, 1, 100))
inputs = np.transpose(np.stack((X, Y)), [1, 2, 0])
def get_densities(transformed_distribution):
batch_shape = transformed_distribution.batch_shape
Z = transformed_distribution.prob(np.expand_dims(inputs, 2))
Z = np.transpose(Z, list(range(2, 2+len(batch_shape))) + [0, 1])
return Z
import numpy as np
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
def get_image_array_from_density_values(Z):
assert Z.shape == (100, 100)
fig = Figure(figsize=(0.5, 0.5))
canvas = FigureCanvas(fig)
ax = fig.gca()
ax.contourf(X, Y, Z, cmap='hot', levels=100)
ax.axis('off')
fig.tight_layout(pad=0)
ax.margins(0)
fig.canvas.draw()
image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (3,))
return image_from_plot
plt.figure(figsize=(5,5))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
for i in range(4):
a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
theta = tfd.Uniform(low = 0, high = 2*np.pi).sample(1)[0].numpy()
transformed_distribution = get_normalizing_flow_dist(a, theta)
transformed_distribution = tfd.BatchReshape(transformed_distribution, [1])
Z = get_densities(transformed_distribution)
image = get_image_array_from_density_values(Z.squeeze())
plt.subplot(2,2,i+1), plt.imshow(image), plt.axis('off')
plt.title(r'$\theta$={:.02f}, a={:.02f}'.format(theta, a), size=10)
plt.show()
N = 1000
image_dataset = np.zeros((N, 36, 36, 3))
for i in range(N):
a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
theta = tfd.Uniform(low = 0, high = 2*np.pi).sample(1)[0].numpy()
transformed_distribution = tfd.BatchReshape(get_normalizing_flow_dist(a, theta), [1])
image_dataset[i,...] = get_image_array_from_density_values(get_densities(transformed_distribution).squeeze())
image_dataset = tf.convert_to_tensor(image_dataset, dtype=tf.float32)
image_dataset.shape
# TensorShape([1000, 36, 36, 3])
plt.figure(figsize=(20,4))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
indices = np.random.choice(N, 20)
for i in range(20):
image = image_dataset[indices[i]].numpy()
image = image / image.max()
plt.subplot(2,10,i+1), plt.imshow(image), plt.axis('off')
plt.show()

Create tf.data.Dataset objects

  • Let’s now split your dataset to create tf.data.Dataset objects for training and validation data.
  • Using the map method, ;et’s normalize the pixel values so that they lie between 0 and 1.
  • These Datasets will be used to train a variational autoencoder (VAE). Use the map method to return a tuple of input and output Tensors where the image is duplicated as both input and output.
  • Randomly shuffle the training Dataset.
  • Batch both datasets with a batch size of 20, setting drop_remainder=True.
  • Print the element_spec property for one of the Dataset objects.
n = len(image_dataset)
tf_image_dataset = tf.data.Dataset.from_tensor_slices(image_dataset)
tf_image_dataset = tf_image_dataset.shuffle(3)
tf_image_dataset = tf_image_dataset.map(lambda x : x / tf.reduce_max(x))
tf_image_dataset = tf_image_dataset.map(lambda x: (x, x))
train_sz = int(0.8*n)
training = tf_image_dataset.take(train_sz)
validation = tf_image_dataset.skip(train_sz)
training = training.batch(batch_size=20, drop_remainder=True)
validation = validation.batch(batch_size=20, drop_remainder=True)
training.element_spec
#(TensorSpec(shape=(20, 36, 36, 3), dtype=tf.float32, name=None),
# TensorSpec(shape=(20, 36, 36, 3), dtype=tf.float32, name=None))

Build the encoder and decoder networks

  • Let’s now create the encoder and decoder for the variational autoencoder algorithm.
  • Let’s design these networks, subject to the following constraints:
    • The encoder and decoder networks should be built using the Sequential class.
    • The encoder and decoder networks should use probabilistic layers where necessary to represent distributions.
    • The prior distribution should be a zero-mean, isotropic Gaussian (identity covariance matrix).
    • The encoder network should add the KL divergence loss to the model.
  • Print the model summary for the encoder and decoder networks.
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Dense, Flatten, Reshape, Concatenate, Conv2D, UpSampling2D, BatchNormalization)
latent_dim = 2 #50
prior = tfd.MultivariateNormalDiag(loc=tf.zeros(latent_dim))
def get_kl_regularizer(prior_distribution):
return tfpl.KLDivergenceRegularizer(prior_distribution,
weight=1.0,
use_exact_kl=False,
test_points_fn=lambda q: q.sample(3),
test_points_reduce_axis=(0,1))       
kl_regularizer = get_kl_regularizer(prior)
def get_encoder(latent_dim, kl_regularizer):
return Sequential([
Conv2D(filters=32, kernel_size=3, activation='relu', strides=2, padding='same', input_shape=(36,36,3)),
BatchNormalization(),
Conv2D(filters=64, kernel_size=3, activation='relu', strides=2, padding='same'),
BatchNormalization(),
Conv2D(filters=128, kernel_size=3, activation='relu', strides=3, padding='same'),
BatchNormalization(),
Flatten(),
Dense(tfpl.MultivariateNormalTriL.params_size(latent_dim)),
tfpl.MultivariateNormalTriL(latent_dim, activity_regularizer=kl_regularizer)
], name='encoder')     
def get_decoder(latent_dim):
return Sequential([
Dense(1152, activation='relu', input_shape=(latent_dim,)),
Reshape((3,3,128)),
UpSampling2D(size=(3,3)),
Conv2D(filters=64, kernel_size=3, activation='relu', padding='same'),
UpSampling2D(size=(2,2)),
Conv2D(filters=32, kernel_size=2, activation='relu', padding='same'),
UpSampling2D(size=(2,2)),
Conv2D(filters=128, kernel_size=2, activation='relu', padding='same'),
Conv2D(filters=3, kernel_size=2, activation=None, padding='same'),
Flatten(),  
tfpl.IndependentBernoulli(event_shape=(36,36,3))
], name='decoder')   
encoder = get_encoder(latent_dim=2, kl_regularizer=kl_regularizer)
#encoder.losses
encoder.summary()
Model: "encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #  
=================================================================
conv2d (Conv2D)              (None, 18, 18, 32)        896      
_________________________________________________________________
batch_normalization (BatchNo (None, 18, 18, 32)        128      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 9, 9, 64)          18496    
_________________________________________________________________
batch_normalization_1 (Batch (None, 9, 9, 64)          256      
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 3, 3, 128)         73856    
_________________________________________________________________
batch_normalization_2 (Batch (None, 3, 3, 128)         512      
_________________________________________________________________
flatten (Flatten)            (None, 1152)              0        
_________________________________________________________________
dense (Dense)                (None, 5)                 5765     
_________________________________________________________________
multivariate_normal_tri_l (M multiple                  0        
=================================================================
Total params: 99,909
Trainable params: 99,461
Non-trainable params: 448
_________________________________________________________________
decoder = get_decoder(latent_dim=2)
decoder.summary()
Model: "decoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #  
=================================================================
dense_1 (Dense)              (None, 1152)              3456     
_________________________________________________________________
reshape (Reshape)            (None, 3, 3, 128)         0        
_________________________________________________________________
up_sampling2d (UpSampling2D) (None, 9, 9, 128)         0        
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 9, 9, 64)          73792    
_________________________________________________________________
up_sampling2d_1 (UpSampling2 (None, 18, 18, 64)        0        
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 18, 18, 32)        8224     
_________________________________________________________________
up_sampling2d_2 (UpSampling2 (None, 36, 36, 32)        0        
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 36, 36, 128)       16512    
_________________________________________________________________
conv2d_6 (Conv2D)            (None, 36, 36, 3)         1539     
_________________________________________________________________
flatten_1 (Flatten)          (None, 3888)              0        
_________________________________________________________________
independent_bernoulli (Indep multiple                  0        
=================================================================
Total params: 103,523
Trainable params: 103,523
Non-trainable params: 0
____________________________
def reconstruction_loss(batch_of_images, decoding_dist):
return -tf.reduce_mean(decoding_dist.log_prob(batch_of_images))

Train the variational autoencoder

  • Let’s now train the variational autoencoder. Build the VAE using the Model class and the encoder and decoder models. Print the model summary.
  • Compile the VAE with the negative log likelihood loss and train with the fit method, using the training and validation Datasets.
  • Plot the learning curves for loss vs epoch for both training and validation sets.
vae = Model(inputs=encoder.inputs, outputs=decoder(encoder.outputs))
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer, loss=reconstruction_loss)
history = vae.fit(training, validation_data=validation, epochs=20)
Epoch 1/20
40/40 [==============================] - 34s 777ms/step - loss: 1250.2296 - val_loss: 1858.7103
Epoch 2/20
40/40 [==============================] - 29s 731ms/step - loss: 661.8687 - val_loss: 1605.1261
Epoch 3/20
40/40 [==============================] - 29s 720ms/step - loss: 545.2802 - val_loss: 1245.0518
Epoch 4/20
40/40 [==============================] - 28s 713ms/step - loss: 489.1101 - val_loss: 1024.5863
Epoch 5/20
40/40 [==============================] - 29s 718ms/step - loss: 453.3464 - val_loss: 841.4725
Epoch 6/20
40/40 [==============================] - 29s 733ms/step - loss: 438.8413 - val_loss: 742.0212
Epoch 7/20
40/40 [==============================] - 30s 751ms/step - loss: 433.2563 - val_loss: 657.4024
Epoch 8/20
40/40 [==============================] - 30s 751ms/step - loss: 417.5353 - val_loss: 602.7039
Epoch 9/20
40/40 [==============================] - 29s 726ms/step - loss: 409.8351 - val_loss: 545.5004
Epoch 10/20
40/40 [==============================] - 30s 741ms/step - loss: 406.3284 - val_loss: 507.9868
Epoch 11/20
40/40 [==============================] - 30s 741ms/step - loss: 402.9056 - val_loss: 462.0777
Epoch 12/20
40/40 [==============================] - 29s 733ms/step - loss: 397.4801 - val_loss: 444.4444
Epoch 13/20
40/40 [==============================] - 30s 741ms/step - loss: 398.2078 - val_loss: 423.1287
Epoch 14/20
40/40 [==============================] - 29s 723ms/step - loss: 395.5187 - val_loss: 411.3030
Epoch 15/20
40/40 [==============================] - 30s 739ms/step - loss: 397.3987 - val_loss: 407.5134
Epoch 16/20
40/40 [==============================] - 29s 721ms/step - loss: 399.3271 - val_loss: 402.7288
Epoch 17/20
40/40 [==============================] - 29s 736ms/step - loss: 393.4259 - val_loss: 401.4711
Epoch 18/20
40/40 [==============================] - 29s 726ms/step - loss: 390.5508 - val_loss: 399.1924
Epoch 19/20
40/40 [==============================] - 29s 736ms/step - loss: 389.3187 - val_loss: 401.1656
Epoch 20/20
40/40 [==============================] - 29s 728ms/step - loss: 389.4718 - val_loss: 393.5178
nepochs = 20
plt.figure(figsize=(8,5))
plt.plot(range(nepochs), history.history['loss'], label='train-loss')
plt.plot(range(nepochs), history.history['val_loss'], label='valid-loss')
plt.legend()
plt.xlabel('epochs')
plt.ylabel('loss')
plt.show()

Use the encoder and decoder networks

  • Let’s now put your encoder and decoder networks into practice!
  • Randomly sample 1000 images from the dataset, and pass them through the encoder. Display the embeddings in a scatter plot (project to 2 dimensions if the latent space has dimension higher than two).
  • Randomly sample 4 images from the dataset and for each image, display the original and reconstructed image from the VAE in a figure.
    • Use the mean of the output distribution to display the images.
  • Randomly sample 6 latent variable realisations from the prior distribution, and display the images in a figure.
    • Again use the mean of the output distribution to display the images.
def reconstruct(encoder, decoder, batch_of_images):
approx_distribution = encoder(batch_of_images)
decoding_dist = decoder(approx_distribution.mean())
return decoding_dist.mean()
embedding = encoder(image_dataset / 255).mean()
fig, ax = plt.subplots(figsize=(8,8))
plt.scatter(embedding[:,0], embedding[:,1], c='red', s=50, edgecolor='k')
plt.title('Embedding', size=20)
plt.show()
plt.figure(figsize=(6,12))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
indices = np.random.choice(len(image_dataset), 4)
for i in range(4):
image = image_dataset[indices[i]].numpy()
image = image / image.max()
plt.subplot(4,2,2*i+1), plt.imshow(image), plt.axis('off')
reconstructions = reconstruct(encoder, decoder, np.expand_dims(image, axis=0))
plt.subplot(4,2,2*i+2), plt.imshow(reconstructions[0].numpy()), plt.axis('off')
plt.suptitle('original (left column) vs. VAE-reconstructed (right column)', size=15)
plt.show()
nsample = 6
samples = np.random.uniform(-10, 10, (nsample, latent_dim)) #prior.sample(6)
fig, ax = plt.subplots(figsize=(8,8))
plt.scatter(samples[:,0], samples[:,1], color='blue')
for i in range(nsample):
plt.text(samples[i,0] + 0.05, samples[i,1] + 0.05, 'embedding {}'.format(i), fontsize=15)
plt.title('Embeddings', size=20)
plt.show()
reconstructions = decoder(samples).mean()
#print(samples.shape, reconstructions.shape)
plt.figure(figsize=(8,6))
plt.subplots_adjust(0, 0, 1, 0.9, 0.05, 0.08)
indices = np.random.choice(len(image_dataset), 4)
for i in range(nsample):
plt.subplot(2,3,i+1), plt.imshow(reconstructions[i]), plt.title('image {}'.format(i)), plt.axis('off')
plt.suptitle('VAE-reconstructions', size=20)
plt.show()

The following animation of latent space interpolation shows the decoder’s generations, depending on the latent space.

To be continued…


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK