Source code for tsgm.models.cvae

from tensorflow import keras
import tensorflow as tf
import typing as T

import tsgm.types
import tsgm.utils


class BetaVAE(keras.Model):
    """
    beta-VAE implementation for unlabeled time series.
    """
    def __init__(self, encoder: keras.Model, decoder: keras.Model, beta: float = 1.0, **kwargs) -> None:
        """
        :param encoder: An encoder model which takes a time series as input and returns the
            parameters of the latent distribution (mean, log-variance) and a sampled latent vector.
        :type encoder: keras.Model
        :param decoder: Takes as input a random noise vector of `latent_dim` length and returns
            a simulated time series.
        :type decoder: keras.Model
        :param beta: The weight of the KL divergence term in the loss, defaults to 1.0.
        :type beta: float
        """
        super(BetaVAE, self).__init__(**kwargs)
        self.beta = beta
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        # The sequence length and latent dimensionality are inferred from the decoder's shapes.
        self._seq_len = self.decoder.output_shape[1]
        self.latent_dim = self.decoder.input_shape[1]

    @property
    def metrics(self) -> T.List:
        """
        :returns: A list of metric trackers (total loss, reconstruction loss, and KL loss).
        """
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, X: tsgm.types.Tensor) -> tsgm.types.Tensor:
        """
        Encodes and decodes time series dataset X.

        :param X: A batch of time series in a format batch_size x seq_len x feat_dim.
        :type X: tsgm.types.Tensor

        :returns: Reconstructed samples
        :rtype: tsgm.types.Tensor
        """
        z_mean, _, _ = self.encoder(X)
        x_decoded = self.decoder(z_mean)
        if len(x_decoded.shape) == 1:
            x_decoded = tf.reshape(x_decoded, (1, -1))
        return x_decoded

    def _get_reconstruction_loss(self, X: tsgm.types.Tensor, Xr: tsgm.types.Tensor) -> float:
        reconst_loss = tsgm.utils.reconstruction_loss_by_axis(X, Xr, axis=0) + \
            tsgm.utils.reconstruction_loss_by_axis(X, Xr, axis=1) + \
            tsgm.utils.reconstruction_loss_by_axis(X, Xr, axis=2)
        return reconst_loss

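    # Note: `tsgm.utils.reconstruction_loss_by_axis` is assumed here to behave roughly as
    # follows (a sketch, not the authoritative implementation): for axis=0 it compares the
    # raw tensors, and for axis=1 / axis=2 it compares the tensors averaged over the time /
    # feature axis, i.e. approximately
    #     tf.reduce_sum(tf.math.squared_difference(tf.reduce_mean(X, axis=a), tf.reduce_mean(Xr, axis=a)))
    # which mirrors cBetaVAE._get_reconstruction_loss below.
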
    def train_step(self, data: tsgm.types.Tensor) -> T.Dict:
        """
        Performs a training step using a batch of data, stored in data.

        :param data: A batch of data in a format batch_size x seq_len x feat_dim
        :type data: tsgm.types.Tensor

        :returns: A dict with losses
        :rtype: T.Dict
        """
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = self._get_reconstruction_loss(data, reconstruction)
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # Weight the KL term by beta, consistently with the conditional model below.
            total_loss = reconstruction_loss + self.beta * kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def generate(self, n: int) -> tsgm.types.Tensor:
        """
        Generates new data from the model.

        :param n: the number of samples to be generated.
        :type n: int

        :returns: A tensor with generated samples.
        :rtype: tsgm.types.Tensor
        """
        z = tf.random.normal((n, self.latent_dim))
        return self.decoder(z)

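
# The usage sketch below is illustrative only and is not part of tsgm.models.cvae: the
# encoder/decoder architectures, the `Sampling` layer, and all sizes are assumptions.
# Any encoder returning (z_mean, z_log_var, z) and any decoder mapping a latent vector of
# length `latent_dim` to a (seq_len, feat_dim) series can be plugged into BetaVAE.
def _example_beta_vae_usage() -> None:
    import numpy as np

    seq_len, feat_dim, latent_dim = 24, 2, 8  # illustrative sizes

    class Sampling(keras.layers.Layer):
        # Reparameterization trick: z = mean + exp(0.5 * log_var) * eps.
        def call(self, inputs):
            z_mean, z_log_var = inputs
            eps = tf.random.normal(shape=tf.shape(z_mean))
            return z_mean + tf.exp(0.5 * z_log_var) * eps

    # Encoder: time series -> (z_mean, z_log_var, z).
    enc_in = keras.Input(shape=(seq_len, feat_dim))
    h = keras.layers.Dense(64, activation="relu")(keras.layers.Flatten()(enc_in))
    z_mean = keras.layers.Dense(latent_dim)(h)
    z_log_var = keras.layers.Dense(latent_dim)(h)
    z = Sampling()([z_mean, z_log_var])
    encoder = keras.Model(enc_in, [z_mean, z_log_var, z], name="encoder")

    # Decoder: latent vector of length latent_dim -> series of shape (seq_len, feat_dim).
    dec_in = keras.Input(shape=(latent_dim,))
    h = keras.layers.Dense(64, activation="relu")(dec_in)
    h = keras.layers.Dense(seq_len * feat_dim)(h)
    decoder = keras.Model(dec_in, keras.layers.Reshape((seq_len, feat_dim))(h), name="decoder")

    vae = BetaVAE(encoder, decoder, beta=2.0)
    vae.compile(optimizer=keras.optimizers.Adam())

    X = np.random.normal(size=(128, seq_len, feat_dim)).astype("float32")  # toy data
    vae.fit(X, epochs=1, batch_size=32)
    print(vae.generate(10).shape)  # -> (10, seq_len, feat_dim)

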
class cBetaVAE(keras.Model):
    """
    Conditional beta-VAE implementation for labeled time series.
    """
    def __init__(self, encoder: keras.Model, decoder: keras.Model, latent_dim: int, temporal: bool,
                 beta: float = 1.0, **kwargs) -> None:
        super(cBetaVAE, self).__init__(**kwargs)
        self.beta = beta
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        # `temporal` indicates whether labels are given per time step (batch_size x seq_len)
        # or per series (batch_size x n_classes).
        self._temporal = temporal
        self._seq_len = self.decoder.output_shape[1]
        self.latent_dim = latent_dim

    @property
    def metrics(self) -> T.List:
        """
        Returns the list of loss trackers: `[total_loss, reconstruction_loss, kl_loss]`.
        """
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def generate(self, labels: tsgm.types.Tensor) -> T.Tuple[tsgm.types.Tensor, tsgm.types.Tensor]:
        """
        Generates new data from the model, conditioned on labels.

        :param labels: the labels to condition the generated samples on; one row per sample to generate.
        :type labels: tsgm.types.Tensor

        :returns: a tuple of synthetically generated data and labels.
        :rtype: T.Tuple[tsgm.types.Tensor, tsgm.types.Tensor]
        """
        batch_size = tf.shape(labels)[0]
        z = tf.random.normal((batch_size, self._seq_len, self.latent_dim), dtype=labels.dtype)
        decoder_input = self._get_decoder_input(z, labels)
        return (self.decoder(decoder_input), labels)

    def call(self, data: tsgm.types.Tensor) -> tsgm.types.Tensor:
        """
        Encodes and decodes a time series dataset together with its labels.

        :param data: A tuple (X, labels), where X is a batch of time series in a format
            batch_size x seq_len x feat_dim and labels are the corresponding conditioning labels.
        :type data: tsgm.types.Tensor

        :returns: Reconstructed samples
        :rtype: tsgm.types.Tensor
        """
        X, labels = data
        encoder_input = self._get_encoder_input(X, labels)
        z_mean, _, _ = self.encoder(encoder_input)
        decoder_input = self._get_decoder_input(z_mean, labels)
        x_decoded = self.decoder(decoder_input)
        if len(x_decoded.shape) == 1:
            x_decoded = tf.reshape(x_decoded, (1, -1))
        return x_decoded

    def _get_reconstruction_loss(self, X: tsgm.types.Tensor, Xr: tsgm.types.Tensor) -> float:
        reconst_loss = tf.reduce_sum(tf.math.squared_difference(X, Xr)) + \
            tf.reduce_sum(tf.math.squared_difference(tf.reduce_mean(X, axis=1), tf.reduce_mean(Xr, axis=1))) + \
            tf.reduce_sum(tf.math.squared_difference(tf.reduce_mean(X, axis=2), tf.reduce_mean(Xr, axis=2)))
        return reconst_loss

    def _get_encoder_input(self, X: tsgm.types.Tensor, labels: tsgm.types.Tensor) -> tsgm.types.Tensor:
        if self._temporal:
            # Temporal labels (batch_size x seq_len) are appended as an extra feature channel.
            return tf.concat([X, labels[:, :, None]], axis=2)
        else:
            # Static labels (batch_size x n_classes) are repeated along the time axis.
            rep_labels = tf.repeat(labels[:, None, :], [self._seq_len], axis=1)
            return tf.concat([X, rep_labels], axis=2)

    def _get_decoder_input(self, z: tsgm.types.Tensor, labels: tsgm.types.Tensor) -> tsgm.types.Tensor:
        if self._temporal:
            rep_labels = labels[:, :, None]
        else:
            rep_labels = tf.repeat(labels[:, None, :], [self._seq_len], axis=1)
        z = tf.reshape(z, [-1, self._seq_len, self.latent_dim])
        return tf.concat([z, rep_labels], axis=2)

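    # Shape sketch for the two conditioning modes (illustrative sizes:
    # batch_size=4, seq_len=24, feat_dim=2, n_classes=3):
    #   temporal=True:  labels (4, 24) -> labels[:, :, None]           -> (4, 24, 1) -> concat with X -> (4, 24, 3)
    #   temporal=False: labels (4, 3)  -> repeated along the time axis -> (4, 24, 3) -> concat with X -> (4, 24, 5)
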
    def train_step(self, data: tsgm.types.Tensor) -> T.Dict[str, float]:
        """
        Performs a training step using a batch of data, stored in data.

        :param data: A tuple (X, labels), where X is a batch of time series in a format
            batch_size x seq_len x feat_dim and labels are the corresponding conditioning labels.
        :type data: tsgm.types.Tensor

        :returns: A dict with losses
        :rtype: T.Dict[str, float]
        """
        X, labels = data
        with tf.GradientTape() as tape:
            encoder_input = self._get_encoder_input(X, labels)
            z_mean, z_log_var, z = self.encoder(encoder_input)
            decoder_input = self._get_decoder_input(z_mean, labels)
            reconstruction = self.decoder(decoder_input)
            reconstruction_loss = self._get_reconstruction_loss(X, reconstruction)
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + self.beta * kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
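

# Illustrative usage sketch for the conditional model (not part of tsgm.models.cvae; the
# architectures and sizes below are assumptions). With `temporal=False`, labels are one
# vector per series: the encoder sees the series concatenated with repeated labels, and the
# decoder sees a (seq_len, latent_dim + n_classes) latent sequence.
def _example_cbeta_vae_usage() -> None:
    import numpy as np

    seq_len, feat_dim, latent_dim, n_classes = 24, 2, 8, 3  # illustrative sizes

    class Sampling(keras.layers.Layer):
        # Reparameterization trick: z = mean + exp(0.5 * log_var) * eps.
        def call(self, inputs):
            z_mean, z_log_var = inputs
            eps = tf.random.normal(shape=tf.shape(z_mean))
            return z_mean + tf.exp(0.5 * z_log_var) * eps

    # Encoder: (series || labels) -> flat latent of length seq_len * latent_dim.
    enc_in = keras.Input(shape=(seq_len, feat_dim + n_classes))
    h = keras.layers.Dense(64, activation="relu")(keras.layers.Flatten()(enc_in))
    z_mean = keras.layers.Dense(seq_len * latent_dim)(h)
    z_log_var = keras.layers.Dense(seq_len * latent_dim)(h)
    z = Sampling()([z_mean, z_log_var])
    encoder = keras.Model(enc_in, [z_mean, z_log_var, z], name="encoder")

    # Decoder: (latent sequence || labels) -> reconstructed series of shape (seq_len, feat_dim).
    dec_in = keras.Input(shape=(seq_len, latent_dim + n_classes))
    h = keras.layers.Dense(seq_len * feat_dim)(keras.layers.Flatten()(dec_in))
    decoder = keras.Model(dec_in, keras.layers.Reshape((seq_len, feat_dim))(h), name="decoder")

    cvae = cBetaVAE(encoder, decoder, latent_dim=latent_dim, temporal=False, beta=1.0)
    cvae.compile(optimizer=keras.optimizers.Adam())

    X = np.random.normal(size=(128, seq_len, feat_dim)).astype("float32")  # toy data
    y = keras.utils.to_categorical(np.random.randint(0, n_classes, 128), n_classes).astype("float32")
    cvae.fit(X, y, epochs=1, batch_size=32)

    X_syn, y_syn = cvae.generate(tf.constant(y[:10]))  # 10 series conditioned on the first 10 labels
    print(X_syn.shape)  # -> (10, seq_len, feat_dim)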