Source code for tsgm.models.augmentations

import math
import numpy as np
import numpy.typing as npt
import random
import scipy.interpolate
from dtaidistance import dtw_barycenter
from typing import List, Dict, Any, Optional, Tuple, Union
from tensorflow.python.types.core import TensorLike

import logging


AugmentationOutput = Union[TensorLike, Tuple[TensorLike, TensorLike]]


logger = logging.getLogger("augmentations")
logger.setLevel(logging.DEBUG)


class BaseAugmenter:
    def __init__(
        self,
        per_feature: bool,
    ) -> None:
        self.per_channel = per_feature

    def _get_seeds(self, total_num: int, n_seeds: int) -> TensorLike:
        seeds_idx = np.random.choice(range(total_num), size=n_seeds, replace=True)
        return seeds_idx

    def generate(
        self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1
    ) -> AugmentationOutput:
        raise NotImplementedError

class BaseCompose:
    def __init__(
        self,
        augmentations: List[BaseAugmenter],
    ) -> None:
        if isinstance(augmentations, (BaseCompose, BaseAugmenter)):
            augmentations = [augmentations]
        self.augmentations = augmentations

    def __len__(self) -> int:
        return len(self.augmentations)

    def __call__(self, *args, **data) -> Dict[str, Any]:
        raise NotImplementedError

    def __getitem__(self, item: int) -> BaseAugmenter:
        return self.augmentations[item]

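# Subclassing sketch (hypothetical, not part of the module): BaseCompose only
# stores and indexes augmenters; a concrete compose could, for instance, chain
# them by feeding each augmenter's output into the next:
#
#     class SequentialCompose(BaseCompose):
#         def __call__(self, X, n_samples=1):
#             for aug in self.augmentations:
#                 X = aug.generate(X=X, n_samples=n_samples)
#             return X
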
class GaussianNoise(BaseAugmenter):
    """Apply Gaussian noise to the input time series.

    The mean and variance of the noise are passed to :meth:`generate`.

    Args:
        per_feature (bool): if set to True, noise will be sampled for each
            feature independently. Otherwise, the noise will be sampled once
            for all features. Default: True
    """

    def __init__(
        self,
        per_feature: bool = True,
    ) -> None:
        super(GaussianNoise, self).__init__(per_feature)

    def generate(self, X: TensorLike, y: Optional[TensorLike] = None,
                 n_samples: int = 1, mean: float = 0,
                 variance: float = 1.0) -> AugmentationOutput:
        """
        Generate synthetic data with Gaussian noise.

        :param X: Input data tensor of shape (n_data, n_timesteps, n_features).
        :type X: TensorLike
        :param y: Optional labels tensor. If provided, labels will also be returned.
        :type y: Optional[TensorLike]
        :param n_samples: Number of augmented samples to generate. Default is 1.
        :type n_samples: int
        :param mean: The mean of the noise. Default is 0.
        :type mean: float
        :param variance: The variance of the noise. Default is 1.0.
        :type variance: float

        :return: Augmented data tensor of shape (n_samples, n_timesteps, n_features)
            and optionally augmented labels if `y` is provided.
        :rtype: Union[TensorLike, Tuple[TensorLike, TensorLike]]
        """
        seeds_idx = self._get_seeds(total_num=X.shape[0], n_seeds=n_samples)
        sigma = variance ** 0.5
        has_labels = y is not None
        if self.per_channel:
            gauss = np.random.normal(
                mean, sigma, (n_samples, X.shape[1], X.shape[2])
            )
        else:
            gauss = np.random.normal(mean, sigma, (n_samples, X.shape[1]))
            gauss = np.expand_dims(gauss, -1)
        synthetic_X = X[seeds_idx] + gauss
        if has_labels:
            synthetic_y = y[seeds_idx]
            return np.array(synthetic_X), np.array(synthetic_y)
        else:
            return np.array(synthetic_X)

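# Usage sketch (illustrative, not part of the module). Assuming `X` is an
# array of shape (n_data, n_timesteps, n_features):
#
#     X = np.random.normal(size=(100, 64, 3))
#     aug = GaussianNoise(per_feature=True)
#     X_aug = aug.generate(X=X, n_samples=10, mean=0.0, variance=0.5)
#     assert X_aug.shape == (10, 64, 3)
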
class SliceAndShuffle(BaseAugmenter):
    """Slice the time series into k pieces and create a new time series by
    shuffling the slices.

    Args:
        per_feature (bool): if set to True, each feature is sliced
            independently. Otherwise, all features are sliced in the same way.
            Default: False
    """

    def __init__(
        self,
        per_feature: bool = False,
    ) -> None:
        super(SliceAndShuffle, self).__init__(per_feature)

    def generate(self, X: TensorLike, y: Optional[TensorLike] = None,
                 n_samples: int = 1, n_segments: int = 2) -> AugmentationOutput:
        """
        Generate synthetic data using the slice-and-shuffle strategy.
        Slice positions are selected at random.

        :param X: Input data tensor of shape (n_data, n_timesteps, n_features).
        :type X: TensorLike
        :param y: Optional labels tensor. If provided, labels will also be returned.
        :type y: Optional[TensorLike]
        :param n_samples: Number of augmented samples to generate. Default is 1.
        :type n_samples: int
        :param n_segments: The number of slices. Default is 2.
        :type n_segments: int

        :return: Augmented data tensor of shape (n_samples, n_timesteps, n_features)
            and optionally augmented labels if `y` is provided.
        :rtype: Union[TensorLike, Tuple[TensorLike, TensorLike]]
        """
        assert 0 < n_segments <= X.shape[1]

        seeds_idx = self._get_seeds(total_num=X.shape[0], n_seeds=n_samples)

        synthetic_data = []
        has_labels = y is not None
        if has_labels:
            new_labels = []
        for i in seeds_idx:
            sequence = X[i]
            if self.per_channel:
                raise NotImplementedError(
                    "SliceAndShuffle separately by feature is not supported yet."
                )
            else:
                # Randomly pick n_segments - 1 points where to slice
                idxs = np.random.randint(0, sequence.shape[0], size=n_segments - 1)
                slices = []
                start_idx = 0
                for j in sorted(idxs):
                    s = sequence[start_idx:j]
                    start_idx = j
                    slices.append(s)
                slices.append(sequence[start_idx:])
                np.random.shuffle(slices)
                # Concatenate the shuffled slices
                sequence = np.concatenate(slices)
            synthetic_data.append(sequence)
            if has_labels:
                new_labels.append(y[i])
        if has_labels:
            return np.array(synthetic_data), np.array(new_labels)
        else:
            return np.array(synthetic_data)

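# Usage sketch (illustrative): each generated sample is a seed series cut into
# `n_segments` pieces at random positions and reassembled in shuffled order;
# when labels are given, the seed's label is kept:
#
#     X = np.random.normal(size=(100, 64, 3))
#     y = np.random.randint(0, 2, size=100)
#     X_aug, y_aug = SliceAndShuffle().generate(X=X, y=y, n_samples=10, n_segments=4)
#     assert X_aug.shape == (10, 64, 3) and y_aug.shape == (10,)
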
class Shuffle(BaseAugmenter):
    """
    Shuffles the features of a time series.
    Shuffling is beneficial when the features correspond to interchangeable sensors.
    """

    def __init__(self) -> None:
        super(Shuffle, self).__init__(per_feature=False)

    def _n_repeats(self, n: int, total_num: int) -> int:
        return math.ceil(n / total_num)

    def generate(self, X: TensorLike, y: Optional[TensorLike] = None,
                 n_samples: int = 1) -> AugmentationOutput:
        """
        Generate synthetic data using the shuffle strategy.
        Features are randomly shuffled to generate novel samples.

        :param X: Input data tensor of shape (n_data, n_timesteps, n_features).
        :type X: TensorLike
        :param y: Optional labels tensor. If provided, labels will also be returned.
        :type y: Optional[TensorLike]
        :param n_samples: Number of augmented samples to generate. Default is 1.
        :type n_samples: int

        :return: Augmented data tensor of shape (n_samples, n_timesteps, n_features)
            and optionally augmented labels if `y` is provided.
        :rtype: Union[TensorLike, Tuple[TensorLike, TensorLike]]
        """
        seeds_idx = self._get_seeds(total_num=X.shape[0], n_seeds=n_samples)

        n_features = X.shape[2]
        # One feature permutation is drawn per full pass over the dataset,
        # so each block of len(X) generated samples reuses the same permutation
        n_repeats = self._n_repeats(n_samples, total_num=len(X))
        shuffle_ids = [
            np.random.choice(np.arange(n_features), n_features, replace=False)
            for _ in range(n_repeats)
        ]

        synthetic_data = []
        has_labels = y is not None
        if has_labels:
            new_labels = []
        for num, i in enumerate(seeds_idx):
            sequence = X[i]
            id_repeat = self._n_repeats(num + 1, total_num=len(X))
            synthetic_data.append(sequence[:, shuffle_ids[id_repeat - 1]])
            if has_labels:
                new_labels.append(y[i])
        if has_labels:
            return np.array(synthetic_data), np.array(new_labels)
        else:
            return np.array(synthetic_data)

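# Usage sketch (illustrative): each generated sample is a seed series with its
# feature axis permuted:
#
#     X = np.random.normal(size=(100, 64, 3))
#     X_aug = Shuffle().generate(X=X, n_samples=5)
#     assert X_aug.shape == (5, 64, 3)
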
class MagnitudeWarping(BaseAugmenter):
    """
    Magnitude warping changes the magnitude of each sample by convolving
    the data window with a smooth curve varying around one.
    https://dl.acm.org/doi/pdf/10.1145/3136755.3136817
    """

    def __init__(self) -> None:
        super(MagnitudeWarping, self).__init__(per_feature=False)

    def generate(self, X: TensorLike, y: Optional[TensorLike] = None,
                 n_samples: int = 1, sigma: float = 0.2,
                 n_knots: int = 4) -> AugmentationOutput:
        """
        Generates augmented samples via magnitude warping for (X, y).

        :param X: Input data tensor of shape (n_data, n_timesteps, n_features).
        :type X: TensorLike
        :param y: Optional labels tensor. If provided, labels will also be returned.
        :type y: Optional[TensorLike]
        :param n_samples: Number of augmented samples to generate. Default is 1.
        :type n_samples: int
        :param sigma: Standard deviation of the random warping curve. Default is 0.2.
        :type sigma: float
        :param n_knots: Number of knots used for the warping curve. Default is 4.
        :type n_knots: int

        :return: Augmented data tensor of shape (n_samples, n_timesteps, n_features)
            and optionally augmented labels if `y` is provided.
        :rtype: Union[TensorLike, Tuple[TensorLike, TensorLike]]
        """
        n_data = X.shape[0]
        n_timesteps = X.shape[1]
        n_features = X.shape[2]

        orig_steps = np.arange(n_timesteps)
        random_warps = np.random.normal(
            loc=1.0, scale=sigma, size=(n_samples, n_knots + 2, n_features)
        )
        warp_steps = (
            np.ones((n_features, 1)) * np.linspace(0, n_timesteps - 1.0, num=n_knots + 2)
        ).T
        result = np.zeros((n_samples, n_timesteps, n_features))
        has_labels = y is not None
        if has_labels:
            result_y = np.zeros((n_samples, 1))
        for i in range(n_samples):
            random_sample_id = random.randint(0, n_data - 1)
            warper = np.array(
                [
                    scipy.interpolate.CubicSpline(
                        warp_steps[:, dim], random_warps[i, :, dim]
                    )(orig_steps)
                    for dim in range(n_features)
                ]
            ).T
            result[i] = X[random_sample_id] * warper
            if has_labels:
                result_y[i] = y[random_sample_id]
        if has_labels:
            return result, result_y
        else:
            return result

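# Usage sketch (illustrative): randomly chosen seed series are multiplied by
# smooth cubic-spline curves fluctuating around 1:
#
#     X = np.random.normal(size=(100, 64, 3))
#     X_aug = MagnitudeWarping().generate(X=X, n_samples=10, sigma=0.2, n_knots=4)
#     assert X_aug.shape == (10, 64, 3)
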
class WindowWarping(BaseAugmenter):
    """
    Window warping stretches or contracts a randomly selected window of the
    time series and resamples the result to the original length.
    https://halshs.archives-ouvertes.fr/halshs-01357973/document
    """

    def __init__(self) -> None:
        super(WindowWarping, self).__init__(per_feature=False)

    def generate(self, X: TensorLike, y: Optional[TensorLike] = None,
                 window_ratio: float = 0.2, scales: Tuple = (0.25, 1.0),
                 n_samples: int = 1) -> AugmentationOutput:
        """
        Generates augmented samples via window warping for (X, y).

        :param X: Input data tensor of shape (n_data, n_timesteps, n_features).
        :type X: TensorLike
        :param y: Optional labels tensor. If provided, labels will also be returned.
        :type y: Optional[TensorLike]
        :param window_ratio: The ratio of the window size relative to the total
            number of timesteps. Default is 0.2.
        :type window_ratio: float
        :param scales: A tuple of scale factors the warped window is drawn from.
            Default is (0.25, 1.0).
        :type scales: tuple
        :param n_samples: Number of augmented samples to generate. Default is 1.
        :type n_samples: int

        :return: Augmented data tensor of shape (n_samples, n_timesteps, n_features)
            and optionally augmented labels if `y` is provided.
        :rtype: Union[TensorLike, Tuple[TensorLike, TensorLike]]
        """
        n_data = X.shape[0]
        n_timesteps = X.shape[1]
        n_features = X.shape[2]

        scales_per_sample = np.random.choice(scales, n_samples)
        warp_size = max(np.round(window_ratio * n_timesteps).astype(np.int64), 1)
        result = np.zeros((n_samples, n_timesteps, n_features))
        result_y = np.zeros((n_samples, 1))
        has_labels = y is not None
        for i in range(n_samples):
            window_starts = np.random.randint(
                low=0, high=n_timesteps - warp_size, size=(n_samples)
            )
            window_ends = window_starts + warp_size
            random_sample_id = random.randint(0, n_data - 1)
            random_sample = X[random_sample_id]
            for dim in range(n_features):
                start_seg = random_sample[: window_starts[i], dim]
                warp_ts_size = max(round(warp_size * scales_per_sample[i]), 1)
                # Resample the selected window to the scaled size
                window_seg = np.interp(
                    x=np.linspace(0, warp_size - 1, num=warp_ts_size),
                    xp=np.arange(warp_size),
                    fp=random_sample[window_starts[i] : window_ends[i], dim],
                )
                end_seg = random_sample[window_ends[i] :, dim]
                warped = np.concatenate((start_seg, window_seg, end_seg))
                # Resample the warped series back to the original length
                result[i, :, dim] = np.interp(
                    np.arange(n_timesteps),
                    np.linspace(0, n_timesteps - 1.0, num=warped.size),
                    warped,
                ).T
            if has_labels:
                result_y[i] = y[random_sample_id]
        if has_labels:
            return result, result_y
        else:
            return result

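# Usage sketch (illustrative): a random window covering ~20% of the timesteps
# is rescaled by a factor drawn from `scales`, and the warped series is then
# resampled back to the original length:
#
#     X = np.random.normal(size=(100, 64, 3))
#     X_aug = WindowWarping().generate(X=X, n_samples=10, window_ratio=0.2,
#                                      scales=(0.25, 1.0))
#     assert X_aug.shape == (10, 64, 3)
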
class DTWBarycentricAveraging(BaseAugmenter):
    """
    DTW Barycenter Averaging (DBA) [1] method estimated through
    Expectation-Maximization algorithm [2], as in
    https://github.com/tslearn-team/tslearn/

    References
    ----------
    .. [1] F. Petitjean, A. Ketterlin & P. Gancarski. A global averaging method
       for dynamic time warping, with applications to clustering. Pattern
       Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693.
    .. [2] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods
       for Averaging in Dynamic Time Warping Spaces. Pattern Recognition, 74,
       340-358.
    """

    def __init__(self):
        super(DTWBarycentricAveraging, self).__init__(per_feature=False)

    def generate(
        self,
        X: TensorLike,
        y: Optional[TensorLike] = None,
        n_samples: int = 1,
        num_initial_samples: Optional[int] = None,
        initial_timeseries: Optional[List[TensorLike]] = None,
        initial_labels: Optional[List[int]] = None,
        **kwargs,
    ) -> AugmentationOutput:
        """
        Parameters
        ----------
        X : TensorLike, the time series dataset
        y : TensorLike or None, the classes
        n_samples : int, total number of samples to generate
            (split across the classes when y is given)
        num_initial_samples : int or None (default: None)
            The number of time series to draw (per class) from the dataset
            before computing DTW_BA. If None, use the entire set (per class).
        initial_timeseries : array or None (default: None)
            Initial time series to start the optimization process from, with
            shape (n_samples, original_size, d). If None, n_samples series are
            drawn from X at random.
        initial_labels : array or None (default: None)
            Labels for the samples from `initial_timeseries`.

        Returns
        -------
        np.array of shape (n_samples, original_size, d), and an np.array of
        labels of shape (n_samples, 1) if y is given.
        """
        assert initial_timeseries is None or len(initial_timeseries) == n_samples
        has_labels = y is not None
        if isinstance(X, list):
            X = np.asarray(X)
        if isinstance(y, list):
            y = np.asarray(y)

        random_samples = random.choices(range(X.shape[0]), k=n_samples)
        if initial_timeseries is None:
            initial_timeseries = X[random_samples]

        if has_labels:
            if initial_labels is None:
                initial_labels = y[random_samples]
            y_new = []
            X_new = []
            unique_labels = np.unique(initial_labels)
            for label in unique_labels:
                logger.debug(f"DTWBA Class {label}...")
                cur_initial_timeseries = initial_timeseries[np.ravel(initial_labels) == label]
                n_samples_per_label = len(cur_initial_timeseries)
                X_class = X[np.ravel(y) == label]
                y_new += [label] * n_samples_per_label
                X_new.append(
                    self._dtwba(
                        X_subset=X_class,
                        n_samples=n_samples_per_label,
                        num_initial_samples=num_initial_samples,
                        initial_timeseries=cur_initial_timeseries,
                        **kwargs,
                    )
                )
            return np.concatenate(X_new), np.array(y_new).reshape(-1, 1)
        else:
            return self._dtwba(
                X_subset=X,
                n_samples=n_samples,
                num_initial_samples=num_initial_samples,
                initial_timeseries=initial_timeseries,
                **kwargs,
            )

    def _dtwba(
        self,
        X_subset: TensorLike,
        n_samples: int,
        num_initial_samples: Optional[int],
        initial_timeseries: Optional[TensorLike],
        **kwargs,
    ) -> npt.NDArray:
        samples = []
        for st in initial_timeseries:
            samples.append(
                dtw_barycenter.dba(
                    s=X_subset,
                    c=st,
                    nb_initial_samples=num_initial_samples,  # TODO
                    use_c=True,
                    **kwargs,
                )
            )
        return np.array(samples)
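
# Usage sketch (illustrative; the exact behavior of dtw_barycenter.dba depends
# on the installed dtaidistance version). Each generated sample is a DTW
# barycenter of the per-class subset, seeded from a randomly drawn series:
#
#     X = np.random.normal(size=(20, 64))   # 20 univariate series
#     y = np.random.randint(0, 2, size=20)
#     X_aug, y_aug = DTWBarycentricAveraging().generate(X=X, y=y, n_samples=6)
#     # X_aug holds 6 barycenters in total, split across the two classes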