import abc
import itertools
import logging
import typing as T

import antropy
import numpy as np
import scipy
import sklearn
import sklearn.metrics
import sklearn.model_selection
from scipy.spatial.distance import pdist, squareform
from scipy.stats import entropy
from sklearn.metrics import precision_score
from tensorflow.python.types.core import TensorLike
from tqdm import tqdm

import tsgm
logger = logging.getLogger('utils')
logger.setLevel(logging.DEBUG)
DEFAULT_SPLIT_STRATEGY = sklearn.model_selection.KFold(
n_splits=3, random_state=42, shuffle=True)
def _dataset_or_tensor_to_tensor(D1: tsgm.dataset.DatasetOrTensor) -> tsgm.types.Tensor:
if isinstance(D1, tsgm.dataset.Dataset):
return D1.X
else:
return D1
class Metric(abc.ABC):
    """
    Abstract base class for all metrics.
    """
    @abc.abstractmethod
    def __call__(self, *args, **kwargs) -> float:
        pass
class DistanceMetric(Metric):
"""
Metric that measures similarity between synthetic and real time series
"""
def __init__(self, statistics: list, discrepancy: T.Callable) -> None:
"""
:param statistics: A list of summary statistics (callable)
:type statistics: list
:param discrepancy: Discrepancy function, measures the distance between the vectors of summary statistics.
:type discrepancy: typing.Callable
"""
self._statistics = statistics
self._discrepancy = discrepancy
    def stats(self, X: tsgm.types.Tensor) -> tsgm.types.Tensor:
"""
:param X: A time series dataset.
:type X: tsgm.types.Tensor.
:returns: a tensor with calculated summary statistics.
"""
return np.array(list(itertools.chain.from_iterable(s(X) for s in self._statistics))) if X is not None else None
    def discrepancy(self, stats1: tsgm.types.Tensor, stats2: tsgm.types.Tensor) -> float:
"""
:param stats1: A vector of summary statistics.
:type stats1: tsgm.types.Tensor.
:param stats2: A vector of summary statistics.
:type stats2: tsgm.types.Tensor.
:returns: the distance between two vectors calculated by self._discrepancy.
"""
return self._discrepancy(stats1, stats2)
    def __call__(self, D1: tsgm.dataset.DatasetOrTensor, D2: tsgm.dataset.DatasetOrTensor) -> float:
"""
:param D1: A time series dataset.
:type D1: tsgm.dataset.DatasetOrTensor.
:param D2: A time series dataset.
:type D2: tsgm.dataset.DatasetOrTensor.
        :returns: discrepancy between D1 & D2 computed over their summary statistics.
"""
if isinstance(D1, tsgm.dataset.Dataset) and isinstance(D2, tsgm.dataset.Dataset):
X1, X2 = D1.Xy_concat, D2.Xy_concat
else:
X1, X2 = D1, D2
stats1, stats2 = self.stats(X1), self.stats(X2)
return self.discrepancy(stats1, stats2)
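# A minimal usage sketch for DistanceMetric (illustrative only, not part of the
# library API): two random tensors are compared via mean/std summary statistics
# and an L2 discrepancy. The statistics and discrepancy below are arbitrary
# choices made for this example.
def _example_distance_metric() -> float:
    X_real = np.random.normal(size=(100, 64, 2))  # (n_samples, n_timesteps, n_features)
    X_syn = np.random.normal(loc=0.5, size=(100, 64, 2))
    metric = DistanceMetric(
        statistics=[lambda X: [np.mean(X)], lambda X: [np.std(X)]],
        discrepancy=lambda s1, s2: float(np.linalg.norm(s1 - s2)),
    )
    return metric(X_real, X_syn)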
class ConsistencyMetric(Metric):
"""
Predictive consistency metric measures whether a set of evaluators yield consistent results on real and synthetic data.
"""
def __init__(self, evaluators: T.List) -> None:
"""
:param evaluators: A list of evaluators (each item should implement method `.evaluate(D)`)
:type evaluators: list
"""
self._evaluators = evaluators
def _apply_models(self, D: tsgm.dataset.DatasetOrTensor, D_test: tsgm.dataset.DatasetOrTensor) -> T.List:
return [e.evaluate(D, D_test) for e in self._evaluators]
    def __call__(self, D1: tsgm.dataset.DatasetOrTensor, D2: tsgm.dataset.DatasetOrTensor, D_test: tsgm.dataset.DatasetOrTensor) -> float:
"""
:param D1: A time series dataset.
:type D1: tsgm.dataset.DatasetOrTensor.
        :param D2: A time series dataset.
        :type D2: tsgm.dataset.DatasetOrTensor.
        :param D_test: A held-out time series dataset passed to each evaluator.
        :type D_test: tsgm.dataset.DatasetOrTensor.
        :returns: consistency metric between D1 & D2.
"""
evaluations1 = self._apply_models(D1, D_test)
evaluations2 = self._apply_models(D2, D_test)
consistencies_cnt = 0
n_evals = len(evaluations1)
for i1 in tqdm(range(n_evals)):
for i2 in range(i1 + 1, n_evals):
                # a pair of evaluators is consistent if both datasets rank them the same way
                if (evaluations1[i1] > evaluations1[i2] and evaluations2[i1] > evaluations2[i2]) or \
                        (evaluations1[i1] < evaluations1[i2] and evaluations2[i1] < evaluations2[i2]) or \
                        (evaluations1[i1] == evaluations1[i2] and evaluations2[i1] == evaluations2[i2]):
                    consistencies_cnt += 1
total_pairs = n_evals * (n_evals - 1) / 2.0
return consistencies_cnt / total_pairs
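# A usage sketch for ConsistencyMetric (illustrative only): the "evaluators"
# here are trivial stand-ins that each expose `.evaluate(D, D_test)` and return
# a scalar score; real use would plug in downstream-model evaluators such as
# subclasses of BaseDownstreamEvaluator defined below.
def _example_consistency_metric() -> float:
    class _MeanEvaluator:
        # scores a dataset by the absolute difference of means with the test set
        def evaluate(self, D, D_test):
            return abs(float(np.mean(D)) - float(np.mean(D_test)))

    class _StdEvaluator:
        # scores a dataset by the absolute difference of standard deviations
        def evaluate(self, D, D_test):
            return abs(float(np.std(D)) - float(np.std(D_test)))

    X_real = np.random.normal(size=(64, 32, 1))
    X_syn = np.random.normal(size=(64, 32, 1))
    X_test = np.random.normal(size=(64, 32, 1))
    metric = ConsistencyMetric(evaluators=[_MeanEvaluator(), _StdEvaluator()])
    return metric(X_real, X_syn, X_test)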
class BaseDownstreamEvaluator(abc.ABC):
    def evaluate(self, *args, **kwargs):
        pass
class PrivacyMembershipInferenceMetric(Metric):
"""
The metric measures the possibility of membership inference attacks.
"""
def __init__(self, attacker: T.Any, metric: T.Optional[T.Callable] = None) -> None:
"""
        :param attacker: An attacker, a one-class classifier (OCC) that implements methods `.fit` and `.predict`
        :type attacker: typing.Any
        :param metric: Measures the quality of the attacker (precision by default)
        :type metric: typing.Callable
"""
self._attacker = attacker
self._metric = metric or sklearn.metrics.precision_score
    def __call__(self, d_tr: tsgm.dataset.Dataset, d_syn: tsgm.dataset.Dataset, d_test: tsgm.dataset.Dataset) -> float:
"""
        :param d_tr: Training dataset (the dataset that was used to produce `d_syn`).
        :type d_tr: tsgm.dataset.Dataset.
        :param d_syn: Synthetic dataset generated from `d_tr`.
        :type d_syn: tsgm.dataset.Dataset.
        :param d_test: Held-out test dataset drawn from the same distribution as `d_tr`.
        :type d_test: tsgm.dataset.Dataset.
:returns: how well the attacker can distinguish `d_tr` & `d_test` when it is trained on `d_syn`.
"""
self._attacker.fit(d_syn.Xy_concat)
labels = self._attacker.predict((d_tr + d_test).Xy_concat)
        correct_labels = [1] * len(d_tr) + [-1] * len(d_test)
        # sklearn-style metrics expect (y_true, y_pred)
        return 1 - self._metric(correct_labels, labels)
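# A usage sketch for PrivacyMembershipInferenceMetric (illustrative only). It
# assumes a `tsgm.dataset.Dataset(X, y)` constructor and that `Xy_concat`
# returns one row/tensor per sample; both are assumptions about the dataset
# API. The attacker is a hypothetical one-class classifier that flattens each
# sample before delegating to sklearn's OneClassSVM.
def _example_privacy_mia_metric() -> float:
    from sklearn.svm import OneClassSVM

    class _FlatteningOCC:
        # hypothetical attacker: flattens per-sample tensors to 2-D for the OCC
        def __init__(self):
            self._occ = OneClassSVM(gamma="auto")

        def fit(self, X):
            self._occ.fit(np.reshape(X, (len(X), -1)))

        def predict(self, X):
            return self._occ.predict(np.reshape(X, (len(X), -1)))

    def _make_dataset(n):
        X = np.random.normal(size=(n, 32, 1))
        y = np.random.randint(0, 2, size=n)
        return tsgm.dataset.Dataset(X, y)  # assumed constructor signature

    d_tr, d_syn, d_test = _make_dataset(64), _make_dataset(64), _make_dataset(64)
    metric = PrivacyMembershipInferenceMetric(attacker=_FlatteningOCC())
    return metric(d_tr, d_syn, d_test)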
class MMDMetric(Metric):
    """
    This metric calculates the maximum mean discrepancy (MMD) between real and synthetic samples.
    Args:
        D1 (tsgm.dataset.DatasetOrTensor): The first input dataset or tensor.
        D2 (tsgm.dataset.DatasetOrTensor): The second input dataset or tensor.
    Returns:
        float: The computed MMD value.
    Example:
        >>> metric = MMDMetric(kernel)
        >>> dataset, synth_dataset = tsgm.dataset.Dataset(...), tsgm.dataset.Dataset(...)
        >>> result = metric(dataset, synth_dataset)
        >>> print(result)
    """
def __init__(self, kernel: T.Callable = tsgm.utils.mmd.exp_quad_kernel) -> None:
self.kernel = kernel
def __call__(self, D1: tsgm.dataset.DatasetOrTensor, D2: tsgm.dataset.DatasetOrTensor) -> float:
if isinstance(D1, tsgm.dataset.Dataset) and D1.y is not None or isinstance(D2, tsgm.dataset.Dataset) and D2.y is not None:
logger.warning("It is currently impossible to run MMD for labeled time series. Labels will be ignored!")
X1, X2 = _dataset_or_tensor_to_tensor(D1), _dataset_or_tensor_to_tensor(D2)
return tsgm.utils.mmd.MMD(X1, X2, kernel=self.kernel)
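# A usage sketch for MMDMetric (illustrative only): two unlabeled random tensors
# are compared with the default exponentiated quadratic kernel. The float32 cast
# is a precaution for the TensorFlow-based kernel, not a documented requirement.
def _example_mmd_metric() -> float:
    X_real = np.random.normal(size=(128, 32, 1)).astype(np.float32)
    X_syn = np.random.normal(loc=0.3, size=(128, 32, 1)).astype(np.float32)
    return float(MMDMetric()(X_real, X_syn))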
class DiscriminativeMetric(Metric):
"""
The DiscriminativeMetric measures the discriminative performance of a model in distinguishing
between synthetic and real datasets.
This metric evaluates a discriminative model by training it on a combination of synthetic
and real datasets and assessing its performance on a test set.
:param d_hist: Real dataset.
:type d_hist: tsgm.dataset.DatasetOrTensor
:param d_syn: Synthetic dataset.
:type d_syn: tsgm.dataset.DatasetOrTensor
:param model: Discriminative model to be evaluated.
:type model: T.Callable
:param test_size: Proportion of the dataset to include in the test split
or the absolute number of test samples.
:type test_size: T.Union[float, int]
:param n_epochs: Number of training epochs for the model.
:type n_epochs: int
:param metric: Optional evaluation metric to use (default: accuracy).
:type metric: T.Optional[T.Callable]
:param random_seed: Optional random seed for reproducibility.
:type random_seed: T.Optional[int]
:return: Discriminative performance metric.
:rtype: float
Example:
--------
>>> from my_module import DiscriminativeMetric, MyDiscriminativeModel
>>> import tsgm.dataset
>>> import numpy as np
>>> import sklearn
>>>
>>> # Create real and synthetic datasets
>>> real_dataset = tsgm.dataset.Dataset(...) # Replace ... with appropriate arguments
>>> synthetic_dataset = tsgm.dataset.Dataset(...) # Replace ... with appropriate arguments
>>>
>>> # Create a discriminative model
>>> model = MyDiscriminativeModel() # Replace with the actual discriminative model class
>>>
>>> # Create and use the DiscriminativeMetric
>>> metric = DiscriminativeMetric()
>>> result = metric(real_dataset, synthetic_dataset, model, test_size=0.2, n_epochs=10)
>>> print(result)
"""
def __call__(self, d_hist: tsgm.dataset.DatasetOrTensor, d_syn: tsgm.dataset.DatasetOrTensor, model: T.Callable, test_size: T.Union[float, int], n_epochs: int, metric: T.Optional[T.Callable] = None, random_seed: T.Optional[int] = None) -> float:
X_hist, X_syn = _dataset_or_tensor_to_tensor(d_hist), _dataset_or_tensor_to_tensor(d_syn)
X_all, y_all = np.concatenate([X_hist, X_syn]), np.concatenate([[1] * len(d_hist), [0] * len(d_syn)])
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X_all, y_all, test_size=test_size, random_state=random_seed)
model.fit(X_train, y_train, epochs=n_epochs)
pred = model.predict(X_test)
        # check the prediction shape: 1-D array or N-D array
        if len(pred.shape) == 1:  # binary classification with sigmoid activation
            y_pred = (pred > 0.5).astype(int)
        else:  # multi-class classification with softmax activation
            y_pred = np.argmax(pred, axis=-1).astype(int)
if metric is None:
return sklearn.metrics.accuracy_score(y_test, y_pred)
else:
return metric(y_test, y_pred)
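# A usage sketch for DiscriminativeMetric (illustrative only): a tiny Keras
# classifier (an arbitrary stand-in, not a recommended architecture) is trained
# for one epoch to tell random "real" samples from random "synthetic" ones. A
# two-unit softmax head is used so the argmax branch above applies.
def _example_discriminative_metric() -> float:
    from tensorflow import keras

    X_real = np.random.normal(size=(128, 32, 1))
    X_syn = np.random.normal(loc=0.5, size=(128, 32, 1))
    model = keras.Sequential([
        keras.layers.Input(shape=(32, 1)),
        keras.layers.Flatten(),
        keras.layers.Dense(16, activation="relu"),
        keras.layers.Dense(2, activation="softmax"),
    ])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")
    metric = DiscriminativeMetric()
    return metric(X_real, X_syn, model, test_size=0.25, n_epochs=1, random_seed=42)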
def _spectral_entropy_per_feature(X: TensorLike) -> TensorLike:
    # spectral entropy of a single 1-D signal (Welch method, normalized)
    return antropy.spectral_entropy(X.ravel(), sf=1, method='welch', normalize=True)


def _spectral_entropy_per_sample(X: TensorLike) -> TensorLike:
    # column-wise spectral entropy of a single sample (time x features)
    if len(X.shape) == 1:
        X = X[:, None]
    return np.apply_along_axis(_spectral_entropy_per_feature, 0, X)


def _spectral_entropy_sum(X: TensorLike) -> TensorLike:
    # spectral entropies for every (sample, feature) slice of a batched tensor
    return np.apply_along_axis(_spectral_entropy_per_sample, 1, X)
class EntropyMetric(Metric):
"""
Calculates the spectral entropy of a dataset or tensor as a sum of individual entropies.
Args:
d (tsgm.dataset.DatasetOrTensor): The input dataset or tensor.
Returns:
float: The computed spectral entropy.
Example:
>>> metric = EntropyMetric()
>>> dataset = tsgm.dataset.Dataset(...)
>>> result = metric(dataset)
>>> print(result)
"""
    def __call__(self, d: tsgm.dataset.DatasetOrTensor) -> float:
"""
Calculate the spectral entropy of the input dataset or tensor.
Args:
d (tsgm.dataset.DatasetOrTensor): The input dataset or tensor.
Returns:
float: The computed spectral entropy.
"""
X = _dataset_or_tensor_to_tensor(d)
return np.sum(_spectral_entropy_sum(X), axis=None)
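# A usage sketch for EntropyMetric (illustrative only): the summed spectral
# entropy of a random batch of univariate series. A dataset object could be
# passed instead of the raw tensor.
def _example_entropy_metric() -> float:
    X = np.random.normal(size=(16, 64, 1))
    return float(EntropyMetric()(X))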
class ShannonEntropyMetric(Metric):
"""
Shannon Entropy calculated over the labels of a dataset.
This index is a measure of diversity that accounts for categories present in a dataset.
"""
    def _shannon_entropy(self, labels):
"""
Private method to calculate the Shannon Entropy for a given set of labels.
Parameters:
labels (array-like): The labels or categories for which the diversity measure is to be calculated.
Returns:
float: The Shannon Entropy value.
"""
_, counts = np.unique(labels, return_counts=True)
return entropy(counts)
    def __call__(self, d: tsgm.dataset.DatasetOrTensor) -> float:
"""
Calculate the Shannon entropy for the dataset.
Parameters:
d (tsgm.dataset.DatasetOrTensor): The dataset or tensor object containing the labels.
Returns:
float: The Shannon entropy value.
Raises:
AssertionError: If the dataset does not contain labels.
"""
        y = getattr(d, "y", None)
        assert y is not None, "The dataset must contain labels."
        return self._shannon_entropy(y)
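# A usage sketch for ShannonEntropyMetric (illustrative only); it assumes a
# `tsgm.dataset.Dataset(X, y)` constructor, which is an assumption about the
# dataset API rather than something this module defines.
def _example_shannon_entropy_metric() -> float:
    X = np.random.normal(size=(100, 32, 1))
    y = np.random.randint(0, 3, size=100)  # three label categories
    d = tsgm.dataset.Dataset(X, y)  # assumed constructor signature
    return float(ShannonEntropyMetric()(d))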
class PairwiseDistanceMetric(Metric):
"""
Measures pairwise distances in a set of time series.
"""
    def pairwise_euclidean_distances(self, ts: TensorLike) -> TensorLike:
"""
Computes the pairwise Euclidean distances for a set of time series.
Parameters:
            ts (numpy.ndarray): An array whose first axis indexes time series; remaining axes are flattened.
Returns:
numpy.ndarray: A 2D array representing the pairwise Euclidean distance matrix.
"""
distances = pdist(np.reshape(ts, (ts.shape[0], -1)), metric='euclidean')
return squareform(distances)
    def __call__(self, d: tsgm.dataset.DatasetOrTensor) -> TensorLike:
"""
Calculates the pairwise Euclidean distances for a dataset or tensor.
Parameters:
d (tsgm.dataset.DatasetOrTensor): The input dataset or tensor containing time series data.
Returns:
            numpy.ndarray: The pairwise Euclidean distance matrix of the input data.
"""
X = _dataset_or_tensor_to_tensor(d)
return self.pairwise_euclidean_distances(X)
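# A usage sketch for PairwiseDistanceMetric (illustrative only): the result is a
# symmetric (n_samples, n_samples) matrix of Euclidean distances between the
# flattened series.
def _example_pairwise_distance_metric() -> TensorLike:
    X = np.random.normal(size=(8, 32, 2))
    return PairwiseDistanceMetric()(X)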
class DemographicParityMetric(Metric):
"""
Measuring demographic parity between two datasets.
This metric assesses the difference in the distributions of a target variable among different groups in two datasets.
By default, it uses the Kolmogorov-Smirnov statistic to quantify the maximum vertical deviation between the cumulative distribution functions
of the target variable for the historical and synthetic data within each group.
Args:
d_hist (tsgm.dataset.DatasetOrTensor): The historical input dataset or tensor.
groups_hist (TensorLike): The group assignments for the historical data.
d_synth (tsgm.dataset.DatasetOrTensor): The synthetic input dataset or tensor.
groups_synth (TensorLike): The group assignments for the synthetic data.
metric (callable, optional): The metric used to compare the target variable distributions within each group.
Default is the Kolmogorov-Smirnov statistic.
Returns:
dict: A dictionary mapping each group to the computed demographic parity metric.
Example:
>>> metric = DemographicParityMetric()
>>> dataset_hist = tsgm.dataset.Dataset(...)
>>> dataset_synth = tsgm.dataset.Dataset(...)
        >>> groups_hist = np.array([0, 1, 0, 1, 1, 0])
        >>> groups_synth = np.array([1, 1, 0, 0, 0, 1])
>>> result = metric(dataset_hist, groups_hist, dataset_synth, groups_synth)
>>> print(result)
"""
_DEFAULT_KS_METRIC = lambda data1, data2: scipy.stats.ks_2samp(data1, data2).statistic # noqa: E731
    def __call__(self, d_hist: tsgm.dataset.DatasetOrTensor, groups_hist: TensorLike, d_synth: tsgm.dataset.DatasetOrTensor, groups_synth: TensorLike, metric: T.Callable = _DEFAULT_KS_METRIC) -> T.Dict:
"""
Calculate the demographic parity metric for the input datasets.
Args:
d_hist (tsgm.dataset.DatasetOrTensor): The historical input dataset or tensor.
groups_hist (TensorLike): The group assignments for the historical data.
d_synth (tsgm.dataset.DatasetOrTensor): The synthetic input dataset or tensor.
groups_synth (TensorLike): The group assignments for the synthetic data.
metric (callable, optional): The metric used to compare the target variable distributions within each group.
Default is the Kolmogorov-Smirnov statistic.
Returns:
dict: A dictionary mapping each group to the computed demographic parity metric.
"""
y_hist, y_synth = d_hist.y, d_synth.y
unique_groups_hist, unique_groups_synth = set(groups_hist), set(groups_synth)
all_groups = unique_groups_hist | unique_groups_synth
if len(all_groups) > len(unique_groups_hist) or len(all_groups) > len(unique_groups_synth):
logger.warning("Groups in historical and synthetic data are not entirely identical.")
result = {}
for g in all_groups:
y_g_hist, y_g_synth = y_hist[groups_hist == g], y_synth[groups_synth == g]
if not len(y_g_synth):
result[g] = np.inf
elif not len(y_g_hist):
result[g] = -np.inf
else:
result[g] = metric(y_g_hist, y_g_synth)
return result
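# A usage sketch for DemographicParityMetric (illustrative only); it assumes a
# `tsgm.dataset.Dataset(X, y)` constructor (an assumption about the dataset
# API). Group assignments are numpy arrays so that boolean masking works.
def _example_demographic_parity_metric() -> T.Dict:
    def _make_dataset(n, loc):
        X = np.random.normal(loc=loc, size=(n, 32, 1))
        y = np.random.normal(loc=loc, size=n)  # continuous target per series
        return tsgm.dataset.Dataset(X, y)  # assumed constructor signature

    d_hist, d_synth = _make_dataset(100, 0.0), _make_dataset(100, 0.2)
    groups_hist = np.random.randint(0, 2, size=100)
    groups_synth = np.random.randint(0, 2, size=100)
    return DemographicParityMetric()(d_hist, groups_hist, d_synth, groups_synth)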
class PredictiveParityMetric:
"""
Measuring predictive parity between two datasets.
This metric assesses the discrepancy in the predictive performance of a
model among different groups in two datasets.
By default, it uses precision to quantify the predictive performance of the model within each group.
Args:
y_true_hist (TensorLike): The true target values for the historical data.
y_pred_hist (TensorLike): The predicted target values for the historical data.
groups_hist (TensorLike): The group assignments for the historical data.
y_true_synth (TensorLike): The true target values for the synthetic data.
y_pred_synth (TensorLike): The predicted target values for the synthetic data.
groups_synth (TensorLike): The group assignments for the synthetic data.
metric (callable, optional): The metric used to compare the predictive performance within each group.
Default is precision score.
Returns:
dict: A dictionary mapping each group to the computed predictive parity metric.
Example:
>>> metric = PredictiveParityMetric()
        >>> y_true_hist = np.array([0, 1, 0, 1, 1, 0])
        >>> y_pred_hist = np.array([0, 1, 0, 0, 1, 1])
        >>> groups_hist = np.array([0, 1, 0, 1, 1, 0])
        >>> y_true_synth = np.array([1, 0, 1, 0, 0, 1])
        >>> y_pred_synth = np.array([1, 0, 1, 1, 0, 0])
        >>> groups_synth = np.array([1, 1, 0, 0, 0, 1])
>>> result = metric(y_true_hist, y_pred_hist, groups_hist, y_true_synth, y_pred_synth, groups_synth)
>>> print(result)
"""
# using precision score by default
_DEFAULT_METRIC = lambda y_true, y_pred: precision_score(y_true, y_pred, average='binary') # noqa: E731
    def __call__(self,
y_true_hist: TensorLike, y_pred_hist: TensorLike, groups_hist: TensorLike,
y_true_synth: TensorLike, y_pred_synth: TensorLike, groups_synth: TensorLike,
metric: T.Callable = _DEFAULT_METRIC) -> T.Dict[int, float]:
"""
Calculate the predictive parity metric for the input datasets.
Args:
y_true_hist (TensorLike): The true target values for the historical data.
y_pred_hist (TensorLike): The predicted target values for the historical data.
groups_hist (TensorLike): The group assignments for the historical data.
y_true_synth (TensorLike): The true target values for the synthetic data.
y_pred_synth (TensorLike): The predicted target values for the synthetic data.
groups_synth (TensorLike): The group assignments for the synthetic data.
metric (callable, optional): The metric used to compare the predictive performance within each group.
Default is precision score.
Returns:
dict: A dictionary mapping each group to the computed predictive parity metric.
"""
assert len(y_true_hist) == len(y_pred_hist) == len(groups_hist) == len(y_true_synth) == len(y_pred_synth) == len(groups_synth)
unique_groups_hist, unique_groups_synth = set(groups_hist), set(groups_synth)
all_groups = unique_groups_hist | unique_groups_synth
        if len(all_groups) > len(unique_groups_hist) or len(all_groups) > len(unique_groups_synth):
            logger.warning("Groups in historical and synthetic data are not entirely identical.")
result = {}
for g in all_groups:
y_true_g_hist, y_pred_g_hist = y_true_hist[groups_hist == g], y_pred_hist[groups_hist == g]
y_true_g_synth, y_pred_g_synth = y_true_synth[groups_synth == g], y_pred_synth[groups_synth == g]
if not len(y_true_g_synth) or not len(y_pred_g_synth):
result[g] = np.inf
elif not len(y_true_g_hist) or not len(y_pred_g_hist):
result[g] = -np.inf
else:
metric_hist = metric(y_true_g_hist, y_pred_g_hist)
metric_synth = metric(y_true_g_synth, y_pred_g_synth)
result[g] = metric_hist - metric_synth # Difference in metric scores between historical and synthetic data
return result