Probabilistic Deep Learning with Tensorflow
source link: https://sandipanweb.wordpress.com/2021/09/01/probabilistic-deep-learning-with-tensorflow/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
In this blog, we shall discuss on how to implement probabilistic deep learning models using Tensorflow. The problems to be discussed in this blog appeared in the exercises / projects in the coursera course “Probabilistic Deep Learning“, by Imperial College, London, as a part of TensorFlow 2 for Deep Learning Specialization. The problem statements / descriptions are taken from the course itself.
Naive Bayes and logistic regression with Tensorflow Probability
In this problem, we shall develop a Naive Bayes classifier model to the Iris dataset using Distribution objects from TensorFlow Probability. We shall also explore the connection between the Naive Bayes classifier and logistic regression.
The Iris dataset
In this problem, we shall use the Iris dataset. It consists of 50 samples from each of three species of Iris (Iris setosa, Iris virginica and Iris versicolor). Four features were measured from each sample: the length and the width of the sepals and petals, in centimeters. For a reference, see the following papers:
- R. A. Fisher. “The use of multiple measurements in taxonomic problems”. Annals of Eugenics. 7 (2): 179–188, 1936.
Our goal will be to construct a Naive Bayes classifier model that predicts the correct class from the sepal length and sepal width features. Under certain assumptions about this classifier model, we shall explore the relation to logistic regression.
The following figures show the 3 categories of flowers from the Iris dataset, namely, setosa, versicolor and verginica, respectively.
Let’s start by importing the required libraries.
import
tensorflow as tf
import
tensorflow_probability as tfp
tfd
=
tfp.distributions
import
numpy as np
import
matplotlib.pyplot as plt
from
sklearn.metrics
import
accuracy_score
from
sklearn
import
datasets, model_selection
%
matplotlib inline
Load and prepare the data
We will first read in the Iris dataset, and split the dataset into training and test sets, using the following code snippet.
iris
=
datasets.load_iris()
# Use only the first two features: sepal length and width
data
=
iris.data[:, :
2
]
targets
=
iris.target
x_train, x_test, y_train, y_test
=
model_selection.train_test_split(data, targets, test_size
=
0.2
)
labels
=
{
0
:
'Iris-Setosa'
,
1
:
'Iris-Versicolour'
,
2
:
'Iris-Virginica'
}
label_colours
=
[
'blue'
,
'orange'
,
'green'
]
def
plot_data(x, y, labels, colours):
for
c
in
np.unique(y):
inx
=
np.where(y
=
=
c)
plt.scatter(x[inx,
0
], x[inx,
1
], label
=
labels[c], c
=
colours[c])
plt.title(
"Training set"
)
plt.xlabel(
"Sepal length (cm)"
)
plt.ylabel(
"Sepal width (cm)"
)
plt.legend()
plt.figure(figsize
=
(
8
,
5
))
plot_data(x_train, y_train, labels, label_colours)
plt.show()
Naive Bayes classifier
We will briefly review the Naive Bayes classifier model. The fundamental equation for this classifier is Bayes’ rule:
In the above, dd is the number of features or dimensions in the inputs X (in our case d=2), and K is the number of classes (in our case K=3). The distribution P(Y) is the class prior distribution, which is a discrete distribution over K classes. The distribution P(X|Y) is the class-conditional distribution over inputs.
The Naive Bayes classifier makes the assumption that the data features Xi are conditionally independent give the class Y (the ‘naive’ assumption). In this case, the class-conditional distribution decomposes as follows:
This simplifying assumption means that we typically need to estimate far fewer parameters for each of the distributions P(Xi|Y=yk) instead of the full joint distribution P(X|Y=yk).
Once the class prior distribution and class-conditional densities are estimated, the Naive Bayes classifier model can then make a class prediction for a new data input according to
Define the class prior distribution
We will begin by defining the class prior distribution. To do this we will simply take the maximum likelihood estimate, given by
where the superscript (n) indicates the n-th dataset example, and N is the total number of examples in the dataset. The above is simply the proportion of data examples belonging to class k.
Let’s now define a function that builds the prior distribution from the training data, and returns it as a Categorical
Distribution object.
- The input to your function
y
will be a numpy array of shape(num_samples,)
- The entries in
y
will be integer labels k=0,1,…,K−1 - The function should build and return the prior distribution as a
Categorical
distribution object- The probabilities for this distribution will be a length-KK vector, with entries corresponding to P(Y=yk) for k=0,1,…,K−1
- Your function should work for any value of K≥1
- This Distribution will have an empty batch shape and empty event shape
def
get_prior(y):
return
tfd.Categorical(probs
=
[np.mean(y
=
=
i)
for
i
in
range
(
3
)])
prior
=
get_prior(y_train)
# Plot the prior distribution
labels
=
[
'Iris-Setosa'
,
'Iris-Versicolour'
,
'Iris-Virginica'
]
plt.bar([
0
,
1
,
2
], prior.probs.numpy(), color
=
label_colours)
plt.xlabel(
"Class"
)
plt.ylabel(
"Prior probability"
)
plt.title(
"Class prior distribution"
)
plt.xticks([
0
,
1
,
2
], labels)
plt.show()
Define the class-conditional densities
Let’s now turn to the definition of the class-conditional distributions P(Xi|Y=yk) for i=0,1 and k=0,1,2. In our model, we will assume these distributions to be univariate Gaussian:
with mean parameters μik and standard deviation parameters σik, twelve parameters in all. We will again estimate these parameters using maximum likelihood. In this case, the estimates are given by
Note that the above are just the means and variances of the sample data points for each class.
Let’s now implement a function the computes the class-conditional Gaussian densities, using the maximum likelihood parameter estimates given above, and returns them in a single, batched MultivariateNormalDiag
Distribution object.
- The inputs to the function are
- a numpy array
x
of shape(num_samples, num_features)
for the data inputs - a numpy array
y
of shape(num_samples,)
for the target labels
- a numpy array
- The function should work for any number of classes K≥1 and any number of features d≥1
def
get_class_conditionals(x, y):
mu
=
[np.mean(x[y
=
=
k], axis
=
0
)
for
k
in
range
(
3
)]
sigma
=
[np.sqrt(np.mean((x[y
=
=
k]
-
mu[k])
*
*
2
, axis
=
0
))
for
k
in
range
(
3
)]
return
tfd.MultivariateNormalDiag(loc
=
mu, scale_diag
=
sigma)
class_conditionals
=
get_class_conditionals(x_train, y_train)
We can visualise the class-conditional densities with contour plots by running the cell below. Notice how the contours of each distribution correspond to a Gaussian distribution with diagonal covariance matrix, since the model assumes that each feature is independent given the class.
def
get_meshgrid(x0_range, x1_range, num_points
=
100
):
x0
=
np.linspace(x0_range[
0
], x0_range[
1
], num_points)
x1
=
np.linspace(x1_range[
0
], x1_range[
1
], num_points)
return
np.meshgrid(x0, x1)
def
contour_plot(x0_range, x1_range, prob_fn, batch_shape, colours, levels
=
None
, num_points
=
100
):
X0, X1
=
get_meshgrid(x0_range, x1_range, num_points
=
num_points)
Z
=
prob_fn(np.expand_dims(np.array([X0.ravel(), X1.ravel()]).T,
1
))
#print(Z.shape, batch_shape, 'x', *X0.shape)
Z
=
np.array(Z).T.reshape(batch_shape,
*
X0.shape)
for
batch
in
np.arange(batch_shape):
if
levels:
plt.contourf(X0, X1, Z[batch], alpha
=
0.2
, colors
=
colours, levels
=
levels)
else
:
plt.contour(X0, X1, Z[batch], colors
=
colours[batch], alpha
=
0.3
)
plt.figure(figsize
=
(
10
,
6
))
plot_data(x_train, y_train, labels, label_colours)
x0_min, x0_max
=
x_train[:,
0
].
min
(), x_train[:,
0
].
max
()
x1_min, x1_max
=
x_train[:,
1
].
min
(), x_train[:,
1
].
max
()
contour_plot((x0_min, x0_max), (x1_min, x1_max), class_conditionals.prob,
3
, label_colours)
plt.title(
"Training set with class-conditional density contours"
)
plt.show()
Make predictions from the model
Now the prior and class-conditional distributions are defined, you can use them to compute the model’s class probability predictions for an unknown test input, according to
The class prediction can then be taken as the class with the maximum probability:
Let’s now implement a function to return the model’s class probabilities for a given batch of test inputs of shape (batch_shape, 2)
, where the batch_shape
has rank at least one.
- The inputs to the function are the
prior
andclass_conditionals
distributions, and the inputsx
- The function should use these distributions to compute the probabilities for each class k as above
- As before, the function should work for any number of classes K≥1
- It should then compute the prediction by taking the class with the highest probability
- The predictions should be returned in a numpy array of shape
(batch_shape)
def
predict_class(prior, class_conditionals, x):
x
=
x[:, np.newaxis, :]
return
tf.argmax(tf.cast(class_conditionals.prob(x),tf.float32)
*
tf.cast(prior.probs,tf.float32),axis
=
1
).numpy()
predictions
=
predict_class(prior, class_conditionals, x_test)
# Evaluate the model accuracy on the test set
accuracy
=
accuracy_score(y_test, predictions)
print
(
"Test accuracy: {:.4f}"
.
format
(accuracy))
# Test accuracy: 0.8000
# Plot the model's decision regions
plt.figure(figsize
=
(
10
,
6
))
plot_data(x_train, y_train, labels, label_colours)
x0_min, x0_max
=
x_train[:,
0
].
min
(), x_train[:,
0
].
max
()
x1_min, x1_max
=
x_train[:,
1
].
min
(), x_train[:,
1
].
max
()
contour_plot((x0_min, x0_max), (x1_min, x1_max),
lambda
x: predict_class(prior, class_conditionals, x),
3
, label_colours, levels
=
[
-
0.5
,
0.5
,
1.5
,
2.5
],
num_points
=
500
)
plt.title(
"Training set with decision regions"
)
plt.show()
Binary classifier
We will now draw a connection between the Naive Bayes classifier and logistic regression.
First, we will update our model to be a binary classifier. In particular, the model will output the probability that a given input data sample belongs to the ‘Iris-Setosa’ class: P(Y=y0|X1,…,Xd). The remaining two classes will be pooled together with the label y1.
# Redefine the dataset to have binary labels
y_train_binary
=
np.array(y_train)
y_train_binary[np.where(y_train_binary
=
=
2
)]
=
1
y_test_binary
=
np.array(y_test)
y_test_binary[np.where(y_test_binary
=
=
2
)]
=
1
# Plot the training data
labels_binary
=
{
0
:
'Iris-Setosa'
,
1
:
'Iris-Versicolour / Iris-Virginica'
}
label_colours_binary
=
[
'blue'
,
'red'
]
plt.figure(figsize
=
(
8
,
5
))
plot_data(x_train, y_train_binary, labels_binary, label_colours_binary)
plt.show()
We will also make an extra modelling assumption that for each class kk, the class-conditional distribution P(Xi|Y=yk) for each feature i=0,1, has standard deviation σi, which is the same for each class k.
This means there are now six parameters in total: four for the means μik and two for the standard deviations σiσi (i,k=0,1).
We will again use maximum likelihood to estimate these parameters. The prior distribution will be as before, with the class prior probabilities given by
We will use our previous function get_prior
to redefine the prior distribution.
prior_binary
=
get_prior(y_train_binary)
# Plot the prior distribution
plt.bar([
0
,
1
], prior_binary.probs.numpy()[:
-
1
], color
=
label_colours_binary)
plt.xlabel(
"Class"
)
plt.ylabel(
"Prior probability"
)
plt.title(
"Class prior distribution"
)
plt.xticks([
0
,
1
], labels_binary)
plt.show()
For the class-conditional densities, the maximum likelihood estimate for the means are again given by
However, the estimate for the standard deviations σi is updated. There is also a closed-form solution for the shared standard deviations, but we will instead learn these from the data.
Let’s now implement a function that takes the training inputs and target labels as input, as well as an optimizer object, number of epochs and a TensorFlow Variable. This function should be written according to the following spec:
- The inputs to the function are:
- a numpy array
x
of shape(num_samples, num_features)
for the data inputs - a numpy array
y
of shape(num_samples,)
for the target labels - a
tf.Variable
objectscales
of length 2 for the standard deviations σiσi optimiser
: an optimiser objectepochs
: the number of epochs to run the training for
- a numpy array
- The function should first compute the means μik of the class-conditional Gaussians according to the above equation
- Then create a batched multivariate Gaussian distribution object using
MultivariateNormalDiag
with the means set to μik and the scales set toscales
- Run a custom training loop for
epochs
number of epochs, in which:- the average per-example negative log likelihood for the whole dataset is computed as the loss
- the gradient of the loss with respect to the
scales
variables is computed - the
scales
variables are updated by theoptimiser
object
- At each iteration, save the values of the
scales
variable and the loss - The function should return a tuple of three objects:
- a numpy array of shape
(epochs,)
of loss values - a numpy array of shape
(epochs, 2)
of values for thescales
variable at each iteration - the final learned batched
MultivariateNormalDiag
distribution object
- a numpy array of shape
mu
=
[np.mean(x_train[y_train_binary
=
=
k], axis
=
0
)
for
k
in
range
(
2
)]
def
learn_stdevs(x, y, scales, optimiser, epochs):
def
nll(x, y, distribution):
predictions
=
-
distribution.log_prob(x)
return
tf.reduce_sum(predictions[y
=
=
0
][:,
0
])
+
tf.reduce_sum(predictions[y
=
=
1
][:,
1
])
@tf
.function
def
get_loss_and_grads(x, y, distribution):
with tf.GradientTape() as tape:
tape.watch(distribution.trainable_variables)
loss
=
nll(x, y, distribution)
grads
=
tape.gradient(loss, distribution.trainable_variables)
return
loss, grads
shape
=
(
len
(
set
(y)), x.shape[
-
1
])
loc
=
np.zeros(shape, dtype
=
np.float32)
for
feature
in
range
(shape[
0
]):
for
category
in
range
(shape[
-
1
]):
data_point
=
x[y
=
=
category][:, feature]
loc[category, feature]
=
np.mean(data_point)
distribution
=
tfd.MultivariateNormalDiag(loc
=
loc, scale_diag
=
scales)
#b(2), e(2)
x
=
np.expand_dims(x ,
1
).astype(
'float32'
)
train_loss_results
=
[]
train_scale_results
=
[]
for
epoch
in
range
(epochs):
loss, grads
=
get_loss_and_grads(x, y, distribution)
optimiser.apply_gradients(
zip
(grads, distribution.trainable_variables))
scales
=
distribution.parameters[
'scale_diag'
].numpy()
train_loss_results.append(loss)
train_scale_results.append(scales)
if
epoch
%
10
=
=
0
:
print
(f
'epoch: {epoch}, loss: {loss}'
)
return
np.array(train_loss_results), np.array(train_scale_results), distribution
scales
=
tf.Variable([
1.
,
1.
])
opt
=
tf.keras.optimizers.Adam(learning_rate
=
0.01
)
epochs
=
500
# run the training loop
nlls, scales_arr, class_conditionals_binary
=
learn_stdevs(x_train, y_train_binary, scales, opt, epochs)
epoch:
0
, loss:
246.33450317382812
epoch:
10
, loss:
227.07168579101562
epoch:
20
, loss:
207.1158905029297
epoch:
30
, loss:
187.12120056152344
epoch:
40
, loss:
168.60015869140625
epoch:
50
, loss:
153.5633087158203
epoch:
60
, loss:
143.8475341796875
epoch:
70
, loss:
142.80393981933594
epoch:
80
, loss:
142.56259155273438
epoch:
90
, loss:
142.23074340820312
epoch:
100
, loss:
142.25711059570312
epoch:
110
, loss:
142.18955993652344
epoch:
120
, loss:
142.1979217529297
epoch:
130
, loss:
142.18882751464844
epoch:
140
, loss:
142.18991088867188
epoch:
150
, loss:
142.1887664794922
epoch:
160
, loss:
142.1888885498047
epoch:
170
, loss:
142.18875122070312
epoch:
180
, loss:
142.1887664794922
epoch:
190
, loss:
142.1887664794922
epoch:
200
, loss:
142.1887664794922
epoch:
210
, loss:
142.18875122070312
epoch:
220
, loss:
142.1887664794922
epoch:
230
, loss:
142.18873596191406
epoch:
240
, loss:
142.18878173828125
epoch:
250
, loss:
142.18875122070312
epoch:
260
, loss:
142.18875122070312
epoch:
270
, loss:
142.18875122070312
epoch:
280
, loss:
142.18875122070312
epoch:
290
, loss:
142.18875122070312
epoch:
300
, loss:
142.18878173828125
epoch:
310
, loss:
142.18875122070312
epoch:
320
, loss:
142.18875122070312
epoch:
330
, loss:
142.18875122070312
epoch:
340
, loss:
142.18875122070312
epoch:
350
, loss:
142.1887664794922
epoch:
360
, loss:
142.1887664794922
epoch:
370
, loss:
142.1887664794922
epoch:
380
, loss:
142.1887664794922
epoch:
390
, loss:
142.1887664794922
epoch:
400
, loss:
142.1887664794922
epoch:
410
, loss:
142.1887664794922
epoch:
420
, loss:
142.1887664794922
epoch:
430
, loss:
142.1887664794922
epoch:
440
, loss:
142.1887664794922
epoch:
450
, loss:
142.1887664794922
epoch:
460
, loss:
142.1887664794922
epoch:
470
, loss:
142.1887664794922
epoch:
480
, loss:
142.1887664794922
epoch:
490
, loss:
142.1887664794922
print("Class conditional means:")
print(class_conditionals_binary.loc.numpy())
print("\nClass conditional standard deviations:")
print(class_conditionals_binary.stddev().numpy())
Class conditional means:
[[4.9692307 3.3820512]
[6.2172837 2.8814814]]
Class conditional standard deviations:
[[0.5590086 0.34253535]
[0.5590086 0.34253535]]
# Plot the loss and convergence of the standard deviation parameters
fig, ax = plt.subplots(1, 2, figsize=(14, 5))
ax[0].plot(nlls)
ax[0].set_title("Loss vs epoch")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("Average negative log-likelihood")
for k in [0, 1]:
ax[1].plot(scales_arr[:, k], color=label_colours_binary[k], label=labels_binary[k])
ax[1].set_title("Standard deviation ML estimates vs epoch")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("Standard deviation")
plt.legend()
plt.show()
We can also plot the contours of the class-conditional Gaussian distributions as before, this time with just binary labelled data. Notice the contours are the same for each class, just with a different centre location.
We can also plot the decision regions for this binary classifier model, notice that the decision boundary is now linear.
The following animation shows how we can learn the standard deviation parameters for the class-conditional distributions for Naive Bayes using tensorflow, with the above code snippet.
Link to logistic regression
In fact, we can see that our predictive distribution P(Y=y0|X) can be written as follows:
With our additional modelling assumption of a shared covariance matrix Σ, it can be shown (using the Gaussian pdf) that a is in fact a linear function of X:
The model therefore takes the form P(Y=y0|X)=σ(wTX+w0), with weights w∈R2 and bias w0∈R. This is the form used by logistic regression, and explains why the decision boundary above is linear.
In the above we have outlined the derivation of the generative logistic regression model. The parameters are typically estimated with maximum likelihood, as we have done.
Finally, we will use the above equations to directly parameterize the output Bernoulli distribution of the generative logistic regression model.
Let’s now write the following function, according to the following specification:
- The inputs to the function are:
- the prior distribution
prior
over the two classes - the (batched) class-conditional distribution
class_conditionals
- the prior distribution
- The function should use the parameters of the above distributions to compute the weights and bias terms w and w0 as above
- The function should then return a tuple of two numpy arrays for w and w0
def
get_logistic_regression_params(prior, class_conditionals):
Sigma
=
class_conditionals.covariance().numpy()
SI
=
np.linalg.inv(Sigma)
p
=
prior.probs.numpy()
mu
=
class_conditionals.parameters[
'loc'
]
#.numpy()
w
=
SI @ (mu[
0
]
-
mu[
1
])
w0
=
-
0.5
*
mu[
0
].T@SI@mu[
0
]
+
0.5
*
mu[
1
].T@SI@mu[
1
]
+
np.log(p[
0
]
/
p[
1
])
return
w, w0
w, w0
=
get_logistic_regression_params(prior_binary, class_conditionals_binary)
We can now use these parameters to make a contour plot to display the predictive distribution of our logistic regression model.
Probabilistic generative models
Let’s start with generative models, using normalizing flow networks and the variational autoencoder algorithm. We shall create a synthetic dataset with a normalizing flow with randomised parameters. This dataset will then be used to train a variational autoencoder, and the trained model will be used to interpolate between the generated images. The concepts to be used will be
- Distribution objects
- Probabilistic layers
- Bijectors
- ELBO optimization
- KL divergence Regularizers.
The next figure represents the theory required for the implementation:
Let’s start by running importing the required libraries first.
import
tensorflow as tf
import
tensorflow_probability as tfp
tfd
=
tfp.distributions
tfb
=
tfp.bijectors
tfpl
=
tfp.layers
import
numpy as np
import
matplotlib.pyplot as plt
We shall create our own image dataset from contour plots of a transformed distribution using a random normalizing flow network and then use the variational autoencoder algorithm to train generative and inference networks, and synthesise new images by interpolating in the latent space.
The normalising flow
- To construct the image dataset, you will build a normalizing flow to transform the 2-D Gaussian random variable z = (z1, z2), which has mean 0 and covariance matrix Σ=σ^2.I2, with σ=0.3.
- This normalizing flow uses bijectors that are parameterized by the following random variables:
- θ ∼ U[0,2π)
- a ∼ N(3,1)
The complete normalising flow is given by the following chain of transformations:
- f1(z)=(z1,z2−2)
- f2(z)=(z1,z2/2),
- f3(z)=(z1,z2+a.z1^2),
- f4(z)=R.z, where R is a rotation matrix with angle θ,
- f5(z)=tanh(z), where the tanh function is applied elementwise.
The transformed random variable x is given by x=f5(f4(f3(f2(f1(z))))).
- We need to use or construct bijectors for each of the transformations fi, i=1,…,5 and use
tfb.Chain
andtfb.TransformedDistribution
to construct the final transformed distribution. - Ensure to implement the
log_det_jacobian
methods for any subclassed bijectors that you write. - Display a scatter plot of samples from the base distribution.
- Display 4 scatter plot images of the transformed distribution from your random normalizing flow, using samples of θ and a. Fix the axes of these 4 plots to the range [−1,1].
The following code block shows how to implement the above steps:
def
plot_distribution(samples, ax, title, col
=
'red'
):
ax.scatter(samples[:,
0
], samples[:,
1
], marker
=
'.'
, c
=
col, alpha
=
0.5
)
ax.set_xlim([
-
1
,
1
])
ax.set_ylim([
-
1
,
1
])
ax.set_title(title, size
=
15
)
# f3(𝑧)=(𝑧1,𝑧2+𝑎𝑧1^2)
class
Degree2Polynomial(tfb.Bijector):
def
__init__(
self
, a):
self
.a
=
a
super
(Degree2Polynomial,
self
).__init__(forward_min_event_ndims
=
1
, is_constant_jacobian
=
True
)
def
_forward(
self
, x):
return
tf.concat([x[..., :
1
], x[...,
1
:]
+
self
.a
*
tf.square(x[..., :
1
])], axis
=
-
1
)
def
_inverse(
self
, y):
return
tf.concat([y[..., :
1
], y[...,
1
:]
-
self
.a
*
tf.square(y[..., :
1
])], axis
=
-
1
)
def
_forward_log_det_jacobian(
self
, x):
return
tf.constant(
0.
, dtype
=
x.dtype)
# f4(𝑧)=Rz
class
Rotation(tfb.Bijector):
def
__init__(
self
, theta):
self
.R
=
tf.constant([[np.cos(theta),
-
np.sin(theta)],
[np.sin(theta), np.cos(theta)]], dtype
=
tf.float32)
super
(Rotation,
self
).__init__(forward_min_event_ndims
=
1
, is_constant_jacobian
=
True
)
def
_forward(
self
, x):
return
tf.linalg.matvec(
self
.R, x)
def
_inverse(
self
, y):
return
tf.linalg.matvec(tf.transpose(
self
.R), y)
def
_forward_log_det_jacobian(
self
, x):
return
tf.constant(
0.
, x.dtype)
def
get_normalizing_flow_dist(a, theta):
bijectors
=
[
tfb.Shift([
0.
,
-
2
]),
# f1
tfb.Scale([
1
,
1
/
2
]),
# f2
Degree2Polynomial(a),
# f3
Rotation(theta),
# f4
tfb.Tanh()
# f5
]
flow_bijector
=
tfb.Chain(
list
(
reversed
(bijectors)))
return
tfd.TransformedDistribution(distribution
=
base_distribution,
bijector
=
flow_bijector)
nsamples
=
10000
sigma
=
0.3
base_distribution
=
tfd.MultivariateNormalDiag(loc
=
tf.zeros(
2
), scale_diag
=
sigma
*
tf.ones(
2
))
samples
=
base_distribution.sample(nsamples)
fig, ax
=
plt.subplots(figsize
=
(
8
,
8
))
plot_distribution(samples, ax,
'Base distribution'
,
'blue'
)
plt.show()
fig, axes
=
plt.subplots(nrows
=
2
, ncols
=
2
, figsize
=
(
15
,
15
))
axes
=
axes.flatten()
plt.subplots_adjust(
0
,
0
,
1
,
0.925
,
0.05
,
0.05
)
colors
=
[
'red'
,
'green'
,
'orange'
,
'magenta'
]
for
i
in
range
(
4
):
a
=
tfd.Normal(loc
=
3
, scale
=
1
).sample(
1
)[
0
].numpy()
theta
=
tfd.Uniform(low
=
0
, high
=
2
*
np.pi).sample(
1
)[
0
].numpy()
transformed_distribution
=
get_normalizing_flow_dist(a, theta)
samples
=
transformed_distribution.sample(nsamples)
plot_distribution(samples, axes[i], r
'$\theta$={:.02f}, a={:.02f}'
.
format
(theta, a), colors[i])
plt.suptitle(
'Transformed Distribution with Normalizing Flow'
, size
=
20
)
plt.show()
Create the image dataset
- Let’s now use your random normalizing flow to generate an image dataset of contour plots from our random normalising flow network.
- First, let’s display a sample of 4 contour plot images from our normalizing flow network using 4 independently sampled sets of parameters, using the following
get_densities
function: this function calculates density values for a (batched) Distribution for use in a contour plot. - The dataset should consist of at least 1000 images, stored in a numpy array of shape
(N, 36, 36, 3)
. Each image in the dataset should correspond to a contour plot of a transformed distribution from a normalizing flow with an independently sampled set of parameters. It will take a few minutes to create the dataset. - As well as the
get_densities
function, the followingget_image_array_from_density_values
function will help to generate the dataset. This function creates a numpy array for an image of the contour plot for a given set of density values Z. Feel free to choose your own options for the contour plots. - Let’s display a sample of 20 images from your generated dataset in a figure.
X, Y
=
np.meshgrid(np.linspace(
-
1
,
1
,
100
), np.linspace(
-
1
,
1
,
100
))
inputs
=
np.transpose(np.stack((X, Y)), [
1
,
2
,
0
])
def
get_densities(transformed_distribution):
batch_shape
=
transformed_distribution.batch_shape
Z
=
transformed_distribution.prob(np.expand_dims(inputs,
2
))
Z
=
np.transpose(Z,
list
(
range
(
2
,
2
+
len
(batch_shape)))
+
[
0
,
1
])
return
Z
import
numpy as np
from
matplotlib.backends.backend_agg
import
FigureCanvasAgg as FigureCanvas
from
matplotlib.figure
import
Figure
def
get_image_array_from_density_values(Z):
assert
Z.shape
=
=
(
100
,
100
)
fig
=
Figure(figsize
=
(
0.5
,
0.5
))
canvas
=
FigureCanvas(fig)
ax
=
fig.gca()
ax.contourf(X, Y, Z, cmap
=
'hot'
, levels
=
100
)
ax.axis(
'off'
)
fig.tight_layout(pad
=
0
)
ax.margins(
0
)
fig.canvas.draw()
image_from_plot
=
np.frombuffer(fig.canvas.tostring_rgb(), dtype
=
np.uint8)
image_from_plot
=
image_from_plot.reshape(fig.canvas.get_width_height()[::
-
1
]
+
(
3
,))
return
image_from_plot
plt.figure(figsize
=
(
5
,
5
))
plt.subplots_adjust(
0
,
0
,
1
,
0.95
,
0.05
,
0.08
)
for
i
in
range
(
4
):
a
=
tfd.Normal(loc
=
3
, scale
=
1
).sample(
1
)[
0
].numpy()
theta
=
tfd.Uniform(low
=
0
, high
=
2
*
np.pi).sample(
1
)[
0
].numpy()
transformed_distribution
=
get_normalizing_flow_dist(a, theta)
transformed_distribution
=
tfd.BatchReshape(transformed_distribution, [
1
])
Z
=
get_densities(transformed_distribution)
image
=
get_image_array_from_density_values(Z.squeeze())
plt.subplot(
2
,
2
,i
+
1
), plt.imshow(image), plt.axis(
'off'
)
plt.title(r
'$\theta$={:.02f}, a={:.02f}'
.
format
(theta, a), size
=
10
)
plt.show()
N
=
1000
image_dataset
=
np.zeros((N,
36
,
36
,
3
))
for
i
in
range
(N):
a
=
tfd.Normal(loc
=
3
, scale
=
1
).sample(
1
)[
0
].numpy()
theta
=
tfd.Uniform(low
=
0
, high
=
2
*
np.pi).sample(
1
)[
0
].numpy()
transformed_distribution
=
tfd.BatchReshape(get_normalizing_flow_dist(a, theta), [
1
])
image_dataset[i,...]
=
get_image_array_from_density_values(get_densities(transformed_distribution).squeeze())
image_dataset
=
tf.convert_to_tensor(image_dataset, dtype
=
tf.float32)
image_dataset.shape
# TensorShape([1000, 36, 36, 3])
plt.figure(figsize
=
(
20
,
4
))
plt.subplots_adjust(
0
,
0
,
1
,
0.95
,
0.05
,
0.08
)
indices
=
np.random.choice(N,
20
)
for
i
in
range
(
20
):
image
=
image_dataset[indices[i]].numpy()
image
=
image
/
image.
max
()
plt.subplot(
2
,
10
,i
+
1
), plt.imshow(image), plt.axis(
'off'
)
plt.show()
Create tf.data.Dataset
objects
- Let’s now split your dataset to create
tf.data.Dataset
objects for training and validation data. - Using the
map
method, ;et’s normalize the pixel values so that they lie between 0 and 1. - These Datasets will be used to train a variational autoencoder (VAE). Use the
map
method to return a tuple of input and output Tensors where the image is duplicated as both input and output. - Randomly shuffle the training Dataset.
- Batch both datasets with a batch size of 20, setting
drop_remainder=True
. - Print the
element_spec
property for one of the Dataset objects.
n
=
len
(image_dataset)
tf_image_dataset
=
tf.data.Dataset.from_tensor_slices(image_dataset)
tf_image_dataset
=
tf_image_dataset.shuffle(
3
)
tf_image_dataset
=
tf_image_dataset.
map
(
lambda
x : x
/
tf.reduce_max(x))
tf_image_dataset
=
tf_image_dataset.
map
(
lambda
x: (x, x))
train_sz
=
int
(
0.8
*
n)
training
=
tf_image_dataset.take(train_sz)
validation
=
tf_image_dataset.skip(train_sz)
training
=
training.batch(batch_size
=
20
, drop_remainder
=
True
)
validation
=
validation.batch(batch_size
=
20
, drop_remainder
=
True
)
training.element_spec
#(TensorSpec(shape=(20, 36, 36, 3), dtype=tf.float32, name=None),
# TensorSpec(shape=(20, 36, 36, 3), dtype=tf.float32, name=None))
Build the encoder and decoder networks
- Let’s now create the encoder and decoder for the variational autoencoder algorithm.
- Let’s design these networks, subject to the following constraints:
- The encoder and decoder networks should be built using the
Sequential
class. - The encoder and decoder networks should use probabilistic layers where necessary to represent distributions.
- The prior distribution should be a zero-mean, isotropic Gaussian (identity covariance matrix).
- The encoder network should add the KL divergence loss to the model.
- The encoder and decoder networks should be built using the
- Print the model summary for the encoder and decoder networks.
from
tensorflow.keras.models
import
Sequential, Model
from
tensorflow.keras.layers
import
(Dense, Flatten, Reshape, Concatenate, Conv2D, UpSampling2D, BatchNormalization)
latent_dim
=
2
#50
prior
=
tfd.MultivariateNormalDiag(loc
=
tf.zeros(latent_dim))
def
get_kl_regularizer(prior_distribution):
return
tfpl.KLDivergenceRegularizer(prior_distribution,
weight
=
1.0
,
use_exact_kl
=
False
,
test_points_fn
=
lambda
q: q.sample(
3
),
test_points_reduce_axis
=
(
0
,
1
))
kl_regularizer
=
get_kl_regularizer(prior)
def
get_encoder(latent_dim, kl_regularizer):
return
Sequential([
Conv2D(filters
=
32
, kernel_size
=
3
, activation
=
'relu'
, strides
=
2
, padding
=
'same'
, input_shape
=
(
36
,
36
,
3
)),
BatchNormalization(),
Conv2D(filters
=
64
, kernel_size
=
3
, activation
=
'relu'
, strides
=
2
, padding
=
'same'
),
BatchNormalization(),
Conv2D(filters
=
128
, kernel_size
=
3
, activation
=
'relu'
, strides
=
3
, padding
=
'same'
),
BatchNormalization(),
Flatten(),
Dense(tfpl.MultivariateNormalTriL.params_size(latent_dim)),
tfpl.MultivariateNormalTriL(latent_dim, activity_regularizer
=
kl_regularizer)
], name
=
'encoder'
)
def
get_decoder(latent_dim):
return
Sequential([
Dense(
1152
, activation
=
'relu'
, input_shape
=
(latent_dim,)),
Reshape((
3
,
3
,
128
)),
UpSampling2D(size
=
(
3
,
3
)),
Conv2D(filters
=
64
, kernel_size
=
3
, activation
=
'relu'
, padding
=
'same'
),
UpSampling2D(size
=
(
2
,
2
)),
Conv2D(filters
=
32
, kernel_size
=
2
, activation
=
'relu'
, padding
=
'same'
),
UpSampling2D(size
=
(
2
,
2
)),
Conv2D(filters
=
128
, kernel_size
=
2
, activation
=
'relu'
, padding
=
'same'
),
Conv2D(filters
=
3
, kernel_size
=
2
, activation
=
None
, padding
=
'same'
),
Flatten(),
tfpl.IndependentBernoulli(event_shape
=
(
36
,
36
,
3
))
], name
=
'decoder'
)
encoder
=
get_encoder(latent_dim
=
2
, kl_regularizer
=
kl_regularizer)
#encoder.losses
encoder.summary()
Model:
"encoder"
_________________________________________________________________
Layer (
type
) Output Shape Param
#
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
conv2d (Conv2D) (
None
,
18
,
18
,
32
)
896
_________________________________________________________________
batch_normalization (BatchNo (
None
,
18
,
18
,
32
)
128
_________________________________________________________________
conv2d_1 (Conv2D) (
None
,
9
,
9
,
64
)
18496
_________________________________________________________________
batch_normalization_1 (Batch (
None
,
9
,
9
,
64
)
256
_________________________________________________________________
conv2d_2 (Conv2D) (
None
,
3
,
3
,
128
)
73856
_________________________________________________________________
batch_normalization_2 (Batch (
None
,
3
,
3
,
128
)
512
_________________________________________________________________
flatten (Flatten) (
None
,
1152
)
0
_________________________________________________________________
dense (Dense) (
None
,
5
)
5765
_________________________________________________________________
multivariate_normal_tri_l (M multiple
0
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
Total params:
99
,
909
Trainable params:
99
,
461
Non
-
trainable params:
448
_________________________________________________________________
decoder
=
get_decoder(latent_dim
=
2
)
decoder.summary()
Model:
"decoder"
_________________________________________________________________
Layer (
type
) Output Shape Param
#
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
dense_1 (Dense) (
None
,
1152
)
3456
_________________________________________________________________
reshape (Reshape) (
None
,
3
,
3
,
128
)
0
_________________________________________________________________
up_sampling2d (UpSampling2D) (
None
,
9
,
9
,
128
)
0
_________________________________________________________________
conv2d_3 (Conv2D) (
None
,
9
,
9
,
64
)
73792
_________________________________________________________________
up_sampling2d_1 (UpSampling2 (
None
,
18
,
18
,
64
)
0
_________________________________________________________________
conv2d_4 (Conv2D) (
None
,
18
,
18
,
32
)
8224
_________________________________________________________________
up_sampling2d_2 (UpSampling2 (
None
,
36
,
36
,
32
)
0
_________________________________________________________________
conv2d_5 (Conv2D) (
None
,
36
,
36
,
128
)
16512
_________________________________________________________________
conv2d_6 (Conv2D) (
None
,
36
,
36
,
3
)
1539
_________________________________________________________________
flatten_1 (Flatten) (
None
,
3888
)
0
_________________________________________________________________
independent_bernoulli (Indep multiple
0
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
Total params:
103
,
523
Trainable params:
103
,
523
Non
-
trainable params:
0
____________________________
def
reconstruction_loss(batch_of_images, decoding_dist):
return
-
tf.reduce_mean(decoding_dist.log_prob(batch_of_images))
Train the variational autoencoder
- Let’s now train the variational autoencoder. Build the VAE using the
Model
class and the encoder and decoder models. Print the model summary. - Compile the VAE with the negative log likelihood loss and train with the
fit
method, using the training and validation Datasets. - Plot the learning curves for loss vs epoch for both training and validation sets.
vae
=
Model(inputs
=
encoder.inputs, outputs
=
decoder(encoder.outputs))
optimizer
=
tf.keras.optimizers.Adam(learning_rate
=
0.0005
)
vae.
compile
(optimizer
=
optimizer, loss
=
reconstruction_loss)
history
=
vae.fit(training, validation_data
=
validation, epochs
=
20
)
Epoch
1
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
34s
777ms
/
step
-
loss:
1250.2296
-
val_loss:
1858.7103
Epoch
2
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
731ms
/
step
-
loss:
661.8687
-
val_loss:
1605.1261
Epoch
3
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
720ms
/
step
-
loss:
545.2802
-
val_loss:
1245.0518
Epoch
4
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
28s
713ms
/
step
-
loss:
489.1101
-
val_loss:
1024.5863
Epoch
5
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
718ms
/
step
-
loss:
453.3464
-
val_loss:
841.4725
Epoch
6
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
733ms
/
step
-
loss:
438.8413
-
val_loss:
742.0212
Epoch
7
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
751ms
/
step
-
loss:
433.2563
-
val_loss:
657.4024
Epoch
8
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
751ms
/
step
-
loss:
417.5353
-
val_loss:
602.7039
Epoch
9
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
726ms
/
step
-
loss:
409.8351
-
val_loss:
545.5004
Epoch
10
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
741ms
/
step
-
loss:
406.3284
-
val_loss:
507.9868
Epoch
11
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
741ms
/
step
-
loss:
402.9056
-
val_loss:
462.0777
Epoch
12
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
733ms
/
step
-
loss:
397.4801
-
val_loss:
444.4444
Epoch
13
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
741ms
/
step
-
loss:
398.2078
-
val_loss:
423.1287
Epoch
14
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
723ms
/
step
-
loss:
395.5187
-
val_loss:
411.3030
Epoch
15
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
30s
739ms
/
step
-
loss:
397.3987
-
val_loss:
407.5134
Epoch
16
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
721ms
/
step
-
loss:
399.3271
-
val_loss:
402.7288
Epoch
17
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
736ms
/
step
-
loss:
393.4259
-
val_loss:
401.4711
Epoch
18
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
726ms
/
step
-
loss:
390.5508
-
val_loss:
399.1924
Epoch
19
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
736ms
/
step
-
loss:
389.3187
-
val_loss:
401.1656
Epoch
20
/
20
40
/
40
[
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
=
]
-
29s
728ms
/
step
-
loss:
389.4718
-
val_loss:
393.5178
nepochs
=
20
plt.figure(figsize
=
(
8
,
5
))
plt.plot(
range
(nepochs), history.history[
'loss'
], label
=
'train-loss'
)
plt.plot(
range
(nepochs), history.history[
'val_loss'
], label
=
'valid-loss'
)
plt.legend()
plt.xlabel(
'epochs'
)
plt.ylabel(
'loss'
)
plt.show()
Use the encoder and decoder networks
- Let’s now put your encoder and decoder networks into practice!
- Randomly sample 1000 images from the dataset, and pass them through the encoder. Display the embeddings in a scatter plot (project to 2 dimensions if the latent space has dimension higher than two).
- Randomly sample 4 images from the dataset and for each image, display the original and reconstructed image from the VAE in a figure.
- Use the mean of the output distribution to display the images.
- Randomly sample 6 latent variable realisations from the prior distribution, and display the images in a figure.
- Again use the mean of the output distribution to display the images.
def
reconstruct(encoder, decoder, batch_of_images):
approx_distribution
=
encoder(batch_of_images)
decoding_dist
=
decoder(approx_distribution.mean())
return
decoding_dist.mean()
embedding
=
encoder(image_dataset
/
255
).mean()
fig, ax
=
plt.subplots(figsize
=
(
8
,
8
))
plt.scatter(embedding[:,
0
], embedding[:,
1
], c
=
'red'
, s
=
50
, edgecolor
=
'k'
)
plt.title(
'Embedding'
, size
=
20
)
plt.show()
plt.figure(figsize
=
(
6
,
12
))
plt.subplots_adjust(
0
,
0
,
1
,
0.95
,
0.05
,
0.08
)
indices
=
np.random.choice(
len
(image_dataset),
4
)
for
i
in
range
(
4
):
image
=
image_dataset[indices[i]].numpy()
image
=
image
/
image.
max
()
plt.subplot(
4
,
2
,
2
*
i
+
1
), plt.imshow(image), plt.axis(
'off'
)
reconstructions
=
reconstruct(encoder, decoder, np.expand_dims(image, axis
=
0
))
plt.subplot(
4
,
2
,
2
*
i
+
2
), plt.imshow(reconstructions[
0
].numpy()), plt.axis(
'off'
)
plt.suptitle(
'original (left column) vs. VAE-reconstructed (right column)'
, size
=
15
)
plt.show()
nsample
=
6
samples
=
np.random.uniform(
-
10
,
10
, (nsample, latent_dim))
#prior.sample(6)
fig, ax
=
plt.subplots(figsize
=
(
8
,
8
))
plt.scatter(samples[:,
0
], samples[:,
1
], color
=
'blue'
)
for
i
in
range
(nsample):
plt.text(samples[i,
0
]
+
0.05
, samples[i,
1
]
+
0.05
,
'embedding {}'
.
format
(i), fontsize
=
15
)
plt.title(
'Embeddings'
, size
=
20
)
plt.show()
reconstructions
=
decoder(samples).mean()
#print(samples.shape, reconstructions.shape)
plt.figure(figsize
=
(
8
,
6
))
plt.subplots_adjust(
0
,
0
,
1
,
0.9
,
0.05
,
0.08
)
indices
=
np.random.choice(
len
(image_dataset),
4
)
for
i
in
range
(nsample):
plt.subplot(
2
,
3
,i
+
1
), plt.imshow(reconstructions[i]), plt.title(
'image {}'
.
format
(i)), plt.axis(
'off'
)
plt.suptitle(
'VAE-reconstructions'
, size
=
20
)
plt.show()
The following animation of latent space interpolation shows the decoder’s generations, depending on the latent space.
To be continued…
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK