import abc
import copy
import sklearn.preprocessing
from scipy import integrate
from tqdm import tqdm
import typing as T
import numpy as np
import tensorflow_probability as tfp
from tensorflow.python.types.core import TensorLike
import tsgm
class BaseSimulator(abc.ABC):
"""
Abstract base class for simulators. This class defines the interface for simulators.
Methods
-------
generate(num_samples: int, *args) -> tsgm.dataset.Dataset
Generate a dataset with the specified number of samples.
dump(path: str, format: str = "csv") -> None
Save the generated dataset to a file in the specified format.
"""
@abc.abstractmethod
    def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
"""
Abstract method to generate a dataset.
Parameters
----------
num_samples : int
Number of samples to generate.
*args
Additional arguments to be passed to the method.
Returns
-------
tsgm.dataset.Dataset
The generated dataset.
"""
pass
@abc.abstractmethod
    def dump(self, path: str, format: str = "csv") -> None:
"""
Abstract method to save the generated dataset to a file.
Parameters
----------
path : str
The file path where the dataset will be saved.
format : str, optional
The format in which to save the dataset, by default "csv".
"""
pass
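# A minimal sketch of a concrete subclass (illustrative only, not part of the
# library API): it satisfies the interface with a Gaussian-noise dataset whose
# (N, T, D) layout mirrors the simulators below.
#
#     class WhiteNoiseSimulator(BaseSimulator):
#         def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
#             X = np.random.normal(size=(num_samples, 24, 1))
#             return tsgm.dataset.Dataset(x=X, y=np.zeros(num_samples))
#
#         def dump(self, path: str, format: str = "csv") -> None:
#             X = self.generate(10).X  # assumes Dataset exposes an `X` accessor
#             np.savetxt(path, X.reshape(len(X), -1), delimiter=",")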
class Simulator(BaseSimulator):
"""
    Concrete class for a basic simulator. It stores dataset properties and an
    optional driver model, and implements fitting and cloning; the `generate`
    and `dump` methods are not implemented here and raise NotImplementedError.
Attributes
----------
_data : tsgm.dataset.DatasetProperties
Properties of the dataset to be used by the simulator.
_driver : Optional[tsgm.types.Model]
The model to be used for generating data.
"""
def __init__(self, data: tsgm.dataset.DatasetProperties, driver: T.Optional[tsgm.types.Model] = None):
"""
Initialize the Simulator with dataset properties and an optional model.
Parameters
----------
data : tsgm.dataset.DatasetProperties
Properties of the dataset to be used.
driver : Optional[tsgm.types.Model], optional
The model to be used for generating data, by default None.
"""
self._data = data
self._driver = driver
    def fit(self, **kwargs) -> None:
"""
Fit the model using the dataset properties.
Parameters
----------
**kwargs
Additional keyword arguments to pass to the model's fit method.
"""
if self._data.y is not None:
self._driver.fit(self._data.X, self._data.y, **kwargs)
else:
self._driver.fit(self._data.X, **kwargs)
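    # Usage sketch: `fit` forwards to the driver's own `fit`, attaching labels
    # only when the dataset properties carry them. Any driver with a compatible
    # `fit(X[, y], **kwargs)` signature (e.g. a Keras-style model) works;
    # `my_model` and `props` below are placeholders:
    #
    #     sim = Simulator(props, driver=my_model)  # props: tsgm.dataset.DatasetProperties
    #     sim.fit(epochs=10)  # -> my_model.fit(props.X, props.y, epochs=10)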
    def generate(self, num_samples: int, *args) -> TensorLike:
"""
Method to generate a dataset. Not implemented in this class.
Parameters
----------
num_samples : int
Number of samples to generate.
*args
Additional arguments to be passed to the method.
Returns
-------
TensorLike
The generated dataset.
Raises
------
NotImplementedError
This method is not implemented in this class.
"""
raise NotImplementedError
    def dump(self, path: str, format: str = "csv") -> None:
"""
Method to save the generated dataset to a file. Not implemented in this class.
Parameters
----------
path : str
The file path where the dataset will be saved.
format : str, optional
The format in which to save the dataset, by default "csv".
Raises
------
NotImplementedError
This method is not implemented in this class.
"""
raise NotImplementedError
    def clone(self) -> "Simulator":
"""
Create a deep copy of the simulator.
Returns
-------
Simulator
A deep copy of the current simulator instance.
"""
        return Simulator(copy.deepcopy(self._data), copy.deepcopy(self._driver))
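    # Design note: a clone carries its own deep-copied data and driver, so
    # fitting or mutating the clone leaves the original simulator untouched.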
class ModelBasedSimulator(Simulator):
"""
A simulator that is based on a model. This class extends the Simulator class and provides additional
methods for handling model parameters.
Methods
-------
params() -> T.Dict[str, T.Any]
Get a dictionary of the simulator's parameters.
set_params(params: T.Dict[str, T.Any]) -> None
Set the simulator's parameters from a dictionary.
generate(num_samples: int, *args) -> None
Generate a dataset with the specified number of samples.
"""
def __init__(self, data: tsgm.dataset.DatasetProperties):
"""
Initialize the ModelBasedSimulator with dataset properties.
Parameters
----------
data : tsgm.dataset.DatasetProperties
Properties of the dataset to be used.
"""
super().__init__(data)
    def params(self) -> T.Dict[str, T.Any]:
"""
Get a dictionary of the simulator's parameters.
Returns
-------
dict
A dictionary containing the simulator's parameters.
"""
params = copy.deepcopy(self.__dict__)
if "_data" in params:
del params["_data"]
if "_driver" in params:
del params["_driver"]
return params
    def set_params(self, params: T.Dict[str, T.Any]) -> None:
"""
Set the simulator's parameters from a dictionary.
Parameters
----------
params : dict
A dictionary containing the parameters to set.
"""
        for param_name, param_value in params.items():
            setattr(self, param_name, param_value)
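    # Round-trip sketch: `params` returns everything in the instance dict
    # except `_data` and `_driver`; `set_params` writes the values back as
    # attributes. (`noise_std` below is a hypothetical parameter name.)
    #
    #     sim.set_params({"noise_std": 0.1})
    #     sim.params()["noise_std"]  # -> 0.1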
@abc.abstractmethod
    def generate(self, num_samples: int, *args) -> None:
"""
Abstract method to generate a dataset. Must be implemented by subclasses.
Parameters
----------
num_samples : int
Number of samples to generate.
*args
Additional arguments to be passed to the method.
Raises
------
NotImplementedError
This method is not implemented in this class and must be overridden by subclasses.
"""
raise NotImplementedError
class NNSimulator(Simulator):
    """
    A simulator driven by a neural network model. `clone` deep-copies the
    dataset properties and clones the driver model.
    """
    def clone(self) -> "NNSimulator":
        return NNSimulator(copy.deepcopy(self._data), self._driver.clone())
class SineConstSimulator(ModelBasedSimulator):
"""
Sine and Constant Function Simulator class that extends the ModelBasedSimulator base class.
Attributes:
_scale: TensorFlow probability distribution for scaling factor.
_const: TensorFlow probability distribution for constant.
_shift: TensorFlow probability distribution for shift.
Methods:
__init__(data, max_scale=10.0, max_const=5.0): Initializes the simulator with dataset properties and optional parameters.
set_params(max_scale, max_const, *args, **kwargs): Sets the parameters for scale, constant, and shift distributions.
generate(num_samples, *args) -> tsgm.dataset.Dataset: Generates a dataset based on sine and constant functions.
clone() -> SineConstSimulator: Creates and returns a deep copy of the current simulator.
"""
def __init__(self, data: tsgm.dataset.DatasetProperties, max_scale: float = 10.0, max_const: float = 5.0) -> None:
"""
Initializes the SineConstSimulator with dataset properties and optional maximum scale and constant values.
Args:
data (tsgm.dataset.DatasetProperties): Dataset properties for the simulator.
max_scale (float, optional): Maximum value for the scale parameter. Defaults to 10.0.
max_const (float, optional): Maximum value for the constant parameter. Defaults to 5.0.
"""
super().__init__(data)
self.set_params(max_scale, max_const)
    def set_params(self, max_scale: float, max_const: float, *args, **kwargs):
"""
Sets the parameters for scale, constant, and shift distributions.
Args:
max_scale (float): Maximum value for the scale parameter.
max_const (float): Maximum value for the constant parameter.
"""
self._scale = tfp.distributions.Uniform(0, max_scale)
self._const = tfp.distributions.Uniform(0, max_const)
self._shift = tfp.distributions.Uniform(0, 2)
super().set_params({"max_scale": max_scale, "max_const": max_const})
    def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
"""
Generates a dataset based on sine and constant functions.
Args:
num_samples (int): Number of samples to generate.
Returns:
tsgm.dataset.Dataset: A dataset containing generated samples.
"""
result_X, result_y = [], []
        for _ in range(num_samples):
scales = self._scale.sample(self._data.D)
consts = self._const.sample(self._data.D)
shifts = self._shift.sample(self._data.D)
if np.random.random() < 0.5:
times = np.repeat(np.arange(0, self._data.T, 1)[:, None], self._data.D, axis=1) / 10
result_X.append(np.sin(times + shifts) * scales)
result_y.append(0)
else:
result_X.append(np.tile(consts, (self._data.T, 1)))
result_y.append(1)
return tsgm.dataset.Dataset(x=np.array(result_X), y=np.array(result_y))
    def clone(self) -> "SineConstSimulator":
"""
Creates a deep copy of the current SineConstSimulator instance.
Returns:
SineConstSimulator: A new instance of SineConstSimulator with copied data and parameters.
"""
        copy_simulator = SineConstSimulator(copy.deepcopy(self._data))
params = self.params()
copy_simulator.set_params(max_scale=params["max_scale"], max_const=params["max_const"])
return copy_simulator
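# Usage sketch: each SineConstSimulator sample is, with probability 1/2, a
# scaled and shifted multivariate sine wave (label 0) or a constant signal
# (label 1). `props` is a placeholder for a tsgm.dataset.DatasetProperties
# with T (sequence length) and D (dimensionality) set.
#
#     sim = SineConstSimulator(props, max_scale=2.0, max_const=1.0)
#     ds = sim.generate(num_samples=16)  # samples of shape (16, T, D), labels of shape (16,)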
class PredictiveMaintenanceSimulator(ModelBasedSimulator):
"""
Predictive Maintenance Simulator class that extends the ModelBasedSimulator base class.
The simulator is based on https://github.com/AaltoPML/human-in-the-loop-predictive-maintenance
From publication:
Nikitin, Alexander, and Samuel Kaski. "Human-in-the-loop large-scale predictive maintenance of
workstations." Proceedings of the 28th ACM SIGKDD Conference on Knowledge Discovery and Data Mining. 2022.
Attributes:
CAT_FEATURES (list): List of categorical feature indices.
encoders (dict): Dictionary of OneHotEncoders for categorical features.
Methods:
__init__(data): Initializes the simulator with dataset properties and sets encoders.
S(lmbd, t): Calculates the survival curve.
R(rho, lmbd, t): Calculates the recovery curve parameter.
set_params(**kwargs): Sets the parameters for the simulator.
mixture_function(a, x): Calculates the mixture function.
sample_equipment(num_samples): Samples equipment data and generates the dataset.
generate(num_samples): Generates the predictive maintenance dataset.
clone() -> PredictiveMaintenanceSimulator: Creates and returns a deep copy of the current simulator.
"""
# categorical features
    CAT_FEATURES = [0, 1, 2, 3, 4, 5, 6, 7]
def __init__(self, data: tsgm.dataset.DatasetProperties) -> None:
"""
Initializes the PredictiveMaintenanceSimulator with dataset properties and sets encoders for categorical features.
Args:
data (tsgm.dataset.DatasetProperties): Dataset properties for the simulator.
"""
        super().__init__(data)
self.encoders = {d: sklearn.preprocessing.OneHotEncoder() for d in self.CAT_FEATURES}
for d in self.CAT_FEATURES:
self.encoders[d].fit([[d], [d + 2], [d + 4], [d + 1], [d + 3], [d + 5], [d + 7]])
self.set_params()
    def S(self, lmbd, t):
"""
Calculates the survival curve.
Args:
lmbd: Lambda parameter for the exponential distribution.
t: Time variable.
Returns:
float: Survival probability at time t.
"""
return np.exp(-lmbd * t)
    def R(self, rho, lmbd, t):
"""
Calculates the recovery curve parameter.
Args:
rho: Rho parameter for the recovery function.
lmbd: Lambda parameter for the exponential distribution.
t: Time variable.
Returns:
float: Recovery curve parameter at time t.
"""
s_ = self.S(lmbd, t)
return (1 - s_) - rho
    def set_params(self, **kwargs):
"""
Sets the parameters for the simulator.
Args:
**kwargs: Arbitrary keyword arguments for setting simulator parameters.
"""
if "switches" in kwargs:
self._switches = kwargs["switches"]
else:
self._switches = {d: np.random.gamma(4, 2) for d in range(self._data.D)}
if "m_norms" in kwargs:
self._m_norms = kwargs["m_norms"]
else:
self._m_norms = {d: lambda: np.random.gamma(2, 1) for d in range(self._data.D)}
if "sigma_norms" in kwargs:
self._sigma_norms = kwargs["sigma_norms"]
else:
self._sigma_norms = {d: lambda: np.random.gamma(1, 1) for d in range(self._data.D)}
super().set_params({
"switches": self._switches,
"m_norms": self._m_norms,
"sigma_norms": self._sigma_norms
})
    def mixture_function(self, a, x):
"""
Calculates the mixture function.
Args:
a: Mixture parameter.
x: Input variable.
Returns:
float: Mixture function value.
"""
return (a**x - 1) / (a - 1)
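    # Worked example: the mixture maps [0, 1] monotonically onto [0, 1]; with
    # a = 3, mixture_function(3, 0) = 0, mixture_function(3, 0.5) ≈ 0.37, and
    # mixture_function(3, 1) = 1, so near-healthy equipment (survival s_ close
    # to 1) weights the normal measurement regime almost exclusively.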
    def sample_equipment(self, num_samples):
"""
Samples equipment data and generates the dataset.
Args:
num_samples (int): Number of samples to generate.
Returns:
tuple: A tuple containing the dataset and equipment information.
"""
equipment, dataset = [], []
for _ in tqdm(range(num_samples)):
last_norm_tmp = 0
lmbd = np.random.gamma(1, 0.005)
rho = np.random.gamma(1, 0.1)
equipment.append({
"lambda": lmbd,
"rho": rho
})
current_measurements = []
ss = []
fix_tmps = []
rnd = np.random.uniform(0, 1)
for t in range(self._data.T):
measurements = []
s_ = self.S(lmbd, t - last_norm_tmp)
r_ = self.R(rho, lmbd, t - last_norm_tmp)
ss.append(s_)
if rnd < r_:
rnd = np.random.uniform(0, 1)
last_norm_tmp = t
fix_tmps.append(t)
for d in range(self._data.D):
m_norm = self._m_norms[d]()
sigma_norm = self._sigma_norms[d]()
m_abnorm = m_norm + self._switches[d]
sigma_abnorm = 1.5 * sigma_norm
if d in self.CAT_FEATURES:
norm_functioning = np.random.choice([d, d + 2, d + 4], p=[0.7, 0.2, 0.1])
abnorm_functioning = np.random.choice([d + 1, d + 3, d + 5, d + 7], p=[0.2, 0.2, 0.4, 0.2])
else:
norm_functioning = np.random.normal(m_norm, sigma_norm)
abnorm_functioning = np.random.normal(m_abnorm, sigma_abnorm)
mixt = self.mixture_function(3, s_)
if d in self.CAT_FEATURES:
if rnd < 1 - s_:
measurements.extend(self.encoders[d].transform([[abnorm_functioning]]).toarray()[0])
else:
measurements.extend(self.encoders[d].transform([[norm_functioning]]).toarray()[0])
else:
measurements.extend([mixt * norm_functioning + (1 - mixt) * abnorm_functioning])
                if not len(current_measurements):
                    current_measurements = np.array([measurements])
                else:
                    current_measurements = np.concatenate((current_measurements, np.array(measurements)[np.newaxis, :]), axis=0)
equipment[-1]["fixes"] = fix_tmps
equipment[-1]["ss"] = ss
dataset.append(current_measurements)
dataset = np.transpose(np.array(dataset), [0, 2, 1])
return dataset, equipment
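    # Shape note: each per-sample array stacks T rows of one-hot-expanded
    # measurements, so after the transpose above the returned dataset has
    # shape (num_samples, num_expanded_features, T).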
    def generate(self, num_samples: int):
"""
        Generates the predictive maintenance dataset by delegating to sample_equipment.
Args:
num_samples (int): Number of samples to generate.
Returns:
tuple: A tuple containing the dataset and equipment information.
"""
return self.sample_equipment(num_samples)
    def clone(self) -> "PredictiveMaintenanceSimulator":
"""
Creates a deep copy of the current PredictiveMaintenanceSimulator instance.
Returns:
PredictiveMaintenanceSimulator: A new instance of PredictiveMaintenanceSimulator with copied data and parameters.
"""
        copy_simulator = PredictiveMaintenanceSimulator(copy.deepcopy(self._data))
params = self.params()
copy_simulator.set_params(
switches=params["switches"],
m_norms=params["m_norms"],
sigma_norms=params["sigma_norms"])
return copy_simulator
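# Usage sketch: unlike the other simulators, PredictiveMaintenanceSimulator's
# `generate` returns a (dataset, equipment) pair, where `equipment` holds each
# sample's degradation parameters ("lambda", "rho"), repair times ("fixes"),
# and survival curve ("ss"). `props` is a placeholder DatasetProperties.
#
#     sim = PredictiveMaintenanceSimulator(props)
#     dataset, equipment = sim.generate(num_samples=8)
#     equipment[0]["fixes"]  # time steps at which sample 0 was repaired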
def _lv_derivative(X, t, alpha, beta, delta, gamma):
    """Right-hand side of the Lotka-Volterra system:
    dx/dt = x * (alpha - beta * y),  dy/dt = y * (delta * x - gamma).
    """
    x, y = X
    dotx = x * (alpha - beta * y)   # prey: growth rate alpha, loss to predation beta
    doty = y * (delta * x - gamma)  # predator: growth from prey delta, death rate gamma
    return np.array([dotx, doty])
class LotkaVolterraSimulator(ModelBasedSimulator):
"""
Simulates the Lotka-Volterra equations, which model the dynamics of biological systems in which two species interact,
one as a predator and the other as prey.
For the details refer to https://en.wikipedia.org/wiki/Lotka%E2%80%93Volterra_equations
"""
def __init__(
self, data: tsgm.dataset.DatasetProperties,
alpha: float = 1, beta: float = 1, gamma: float = 1, delta: float = 1,
x0: float = 1, y0: float = 1) -> None:
"""
Initializes the Lotka-Volterra simulator with given parameters.
Args:
data (tsgm.dataset.DatasetProperties): The dataset properties.
alpha (float): The maximum prey per capita growth rate. Default is 1.
beta (float): The effect of the presence of predators on the prey death rate. Default is 1.
gamma (float): The predator's per capita death rate. Default is 1.
delta (float): The effect of the presence of prey on the predator's growth rate. Default is 1.
x0 (float): The initial population density of prey. Default is 1.
            y0 (float): The initial population density of the predator. Default is 1.
"""
        super().__init__(data)
self.set_params(
alpha=alpha,
beta=beta,
gamma=gamma,
delta=delta,
x0=x0,
y0=y0
)
    def set_params(self, alpha, beta, gamma, delta, x0, y0, **kwargs):
"""
Sets the parameters for the simulator.
Args:
alpha (float): The maximum prey per capita growth rate.
beta (float): The effect of the presence of predators on the prey death rate.
gamma (float): The predator's per capita death rate.
delta (float): The effect of the presence of prey on the predator's growth rate.
x0 (float): The initial population density of prey.
            y0 (float): The initial population density of the predator.
**kwargs: Arbitrary keyword arguments for setting simulator parameters.
"""
super().set_params({
"alpha": alpha,
"beta": beta,
"gamma": gamma,
"delta": delta,
"x0": x0,
"y0": y0,
})
    def generate(self, num_samples: int, tmax: float = 1) -> np.ndarray:
"""
Generates the simulation data based on the Lotka-Volterra equations.
Args:
num_samples (int): The number of sample points to generate.
tmax (float): The maximum time value for the simulation. Default is 1.
Returns:
np.ndarray: An array containing the population densities of prey and predators over time.
"""
t = np.linspace(0., tmax, num_samples)
X0 = [self.x0, self.y0]
res = integrate.odeint(_lv_derivative, X0, t, args=(self.alpha, self.beta, self.delta, self.gamma))
return res
    def clone(self) -> "LotkaVolterraSimulator":
"""
Creates a deep copy of the current LotkaVolterraSimulator instance.
Returns:
LotkaVolterraSimulator: A new instance of LotkaVolterraSimulator with copied data and parameters.
"""
        copy_simulator = LotkaVolterraSimulator(copy.deepcopy(self._data))
params = self.params()
copy_simulator.set_params(
alpha=params["alpha"],
beta=params["beta"],
gamma=params["gamma"],
delta=params["delta"],
x0=params["x0"],
y0=params["y0"])
return copy_simulator
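# Usage sketch: `generate` integrates the system with scipy's `odeint` and
# returns an array of shape (num_samples, 2) holding prey and predator
# densities at evenly spaced time points in [0, tmax]. The parameter values
# below are a classic oscillating configuration, not library defaults;
# `props` is a placeholder DatasetProperties.
#
#     sim = LotkaVolterraSimulator(props, alpha=2/3, beta=4/3, gamma=1, delta=1, x0=0.9, y0=1.8)
#     trajectory = sim.generate(num_samples=1000, tmax=50.0)
#     prey, predators = trajectory[:, 0], trajectory[:, 1]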