"""
Base class for feature library classes.
"""
import abc
import warnings
from functools import wraps
from typing import Sequence
import numpy as np
from scipy import sparse
from sklearn import __version__
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_is_fitted
from ..utils import AxesArray
from ..utils import comprehend_axes
from ..utils import validate_no_reshape
from ..utils import wrap_axes
[docs]class BaseFeatureLibrary(TransformerMixin):
"""
Base class for feature libraries.
Forces subclasses to implement ``fit``, ``transform``,
and ``get_feature_names`` methods.
Parameters
----------
library_ensemble : boolean, optional (default False)
Whether or not to use library bagging (regress on subset of the
candidate terms in the library)
ensemble_indices : integer array, optional (default [0])
The indices to use for ensembling the library.
"""
def __init__(self, library_ensemble=None, ensemble_indices=[0]):
if library_ensemble is not None:
warnings.warn(
"Library ensembling is no longer performed by feature libraries. Use "
"EnsemblingOptimizer to fit an ensemble model.",
DeprecationWarning,
)
self.library_ensemble = library_ensemble
if np.any(np.asarray(ensemble_indices) < 0):
raise ValueError("Library ensemble indices must be 0 or positive integers.")
self.ensemble_indices = ensemble_indices
[docs] def reshape_samples_to_spatial_grid(self, x: np.ndarray) -> AxesArray:
"""Adapt predictions to fitted spatial grid."""
spatial_grid = self.get_spatial_grid()
if spatial_grid is None:
return AxesArray(x, {"ax_sample": 0, "ax_coord": 1})
# PDELibrary can have a zero spatial dimension for SINDyPI.
shape = [dim for dim in self.get_spatial_grid().shape[:-1] if dim != 0]
x = np.reshape(x, (*shape, -1, x.shape[-1]))
return AxesArray(
x,
{
"ax_spatial": list(range(len(shape))),
"ax_sample": len(shape),
"ax_coord": len(shape) + 1,
},
)
[docs] def correct_shape(self, x: AxesArray):
"""Correct the shape of x, given what we know of the problem"""
if len(x.shape) == 1:
data = np.asarray(x).reshape((-1, 1))
return AxesArray(data, {"ax_time": 0, "ax_coord": 1})
elif len(x.shape) > 2 and type(self) is BaseFeatureLibrary:
warnings.warn(
"Data shapes with more than 2 axes are "
"deprecated for the default problem. We assume that time "
"axis comes first, then coordinate axis, then all other "
"axes continue the time axis.",
DeprecationWarning,
)
return x
[docs] def calc_trajectory(self, diff_method, x, t):
axes = x.__dict__
x_dot = diff_method(x, t=t)
return AxesArray(x_dot, axes)
[docs] def get_spatial_grid(self):
return None
# Force subclasses to implement this
[docs] @abc.abstractmethod
def fit(self, x, y=None):
"""
Compute number of output features.
Parameters
----------
x : array-like, shape (n_samples, n_features)
The data.
Returns
-------
self : instance
"""
raise NotImplementedError
# Force subclasses to implement this
# Force subclasses to implement this
[docs] @abc.abstractmethod
def get_feature_names(self, input_features=None):
"""Return feature names for output features.
Parameters
----------
input_features : list of string, length n_features, optional
String names for input features if available. By default,
"x0", "x1", ... "xn_features" is used.
Returns
-------
output_feature_names : list of string, length n_output_features
"""
raise NotImplementedError
def _ensemble(self, xp):
"""
If library bagging, return xp without
the terms at ensemble_indices
"""
warnings.warn(
"Library ensembling is no longer performed by feature libraries. Use "
"EnsemblingOptimizer to fit an ensemble model.",
UserWarning,
)
if self.library_ensemble:
if self.n_output_features_ <= len(self.ensemble_indices):
raise ValueError(
"Error: you are trying to chop more library terms "
"than are available to remove!"
)
inds = range(self.n_output_features_)
inds = np.delete(inds, self.ensemble_indices)
return [x[..., inds] for x in xp]
else:
return xp
def __add__(self, other):
return ConcatLibrary([self, other])
def __mul__(self, other):
return TensoredLibrary([self, other])
def __rmul__(self, other):
return TensoredLibrary([self, other])
@property
def size(self):
check_is_fitted(self)
return self.n_output_features_
[docs]def x_sequence_or_item(wrapped_func):
"""Allow a feature library's method to handle list or item inputs."""
@wraps(wrapped_func)
def func(self, x, *args, **kwargs):
if isinstance(x, Sequence):
xs = [AxesArray(xi, comprehend_axes(xi)) for xi in x]
result = wrapped_func(self, xs, *args, **kwargs)
if isinstance(result, Sequence): # e.g. transform() returns x
return [AxesArray(xp, comprehend_axes(xp)) for xp in result]
return result # e.g. fit() returns self
else:
if not sparse.issparse(x):
x = AxesArray(x, comprehend_axes(x))
def reconstructor(x):
return x
else: # sparse arrays
reconstructor = type(x)
axes = comprehend_axes(x)
wrap_axes(axes, x)
result = wrapped_func(self, [x], *args, **kwargs)
if isinstance(result, Sequence): # e.g. transform() returns x
return reconstructor(result[0])
return result # e.g. fit() returns self
return func
[docs]class ConcatLibrary(BaseFeatureLibrary):
"""Concatenate multiple libraries into one library. All settings
provided to individual libraries will be applied.
Parameters
----------
libraries : list of libraries
Library instances to be applied to the input matrix.
library_ensemble : boolean, optional (default False)
Whether or not to use library bagging (regress on subset of the
candidate terms in the library).
ensemble_indices : integer array, optional (default [0])
The indices to use for ensembling the library. For instance, if
ensemble_indices = [0], it chops off the first column of the library.
Attributes
----------
libraries_ : list of libraries
Library instances to be applied to the input matrix.
n_input_features_ : int
The total number of input features.
WARNING: This is deprecated in scikit-learn version 1.0 and higher so
we check the sklearn.__version__ and switch to n_features_in if needed.
n_output_features_ : int
The total number of output features. The number of output features
is the sum of the numbers of output features for each of the
concatenated libraries.
Examples
--------
>>> import numpy as np
>>> from pysindy.feature_library import FourierLibrary, CustomLibrary
>>> from pysindy.feature_library import ConcatLibrary
>>> x = np.array([[0.,-1],[1.,0.],[2.,-1.]])
>>> functions = [lambda x : np.exp(x), lambda x,y : np.sin(x+y)]
>>> lib_custom = CustomLibrary(library_functions=functions)
>>> lib_fourier = FourierLibrary()
>>> lib_concat = ConcatLibrary([lib_custom, lib_fourier])
>>> lib_concat.fit()
>>> lib.transform(x)
"""
def __init__(
self,
libraries: list,
library_ensemble=False,
ensemble_indices=[0],
):
super(ConcatLibrary, self).__init__(
library_ensemble=library_ensemble, ensemble_indices=ensemble_indices
)
self.libraries_ = libraries
[docs] @x_sequence_or_item
def fit(self, x_full, y=None):
"""
Compute number of output features.
Parameters
----------
x : array-like, shape (n_samples, n_features)
The data.
Returns
-------
self : instance
"""
n_features = x_full[0].shape[x_full[0].ax_coord]
if float(__version__[:3]) >= 1.0:
self.n_features_in_ = n_features
else:
self.n_input_features_ = n_features
# First fit all libs provided below
fitted_libs = [lib.fit(x_full, y) for lib in self.libraries_]
# Calculate the sum of output features
self.n_output_features_ = sum([lib.n_output_features_ for lib in fitted_libs])
# Save fitted libs
self.libraries_ = fitted_libs
return self
[docs] def get_feature_names(self, input_features=None):
"""Return feature names for output features.
Parameters
----------
input_features : list of string, length n_features, optional
String names for input features if available. By default,
"x0", "x1", ... "xn_features" is used.
Returns
-------
output_feature_names : list of string, length n_output_features
"""
feature_names = list()
for lib in self.libraries_:
lib_feat_names = lib.get_feature_names(input_features)
feature_names += lib_feat_names
return feature_names
[docs] def calc_trajectory(self, diff_method, x, t):
return self.libraries_[0].calc_trajectory(diff_method, x, t)
[docs]class TensoredLibrary(BaseFeatureLibrary):
"""Tensor multiple libraries together into one library. All settings
provided to individual libraries will be applied.
Parameters
----------
libraries : list of libraries
Library instances to be applied to the input matrix.
library_ensemble : boolean, optional (default False)
Whether or not to use library bagging (regress on subset of the
candidate terms in the library).
ensemble_indices : integer array, optional (default [0])
The indices to use for ensembling the library. For instance, if
ensemble_indices = [0], it chops off the first column of the library.
Attributes
----------
libraries_ : list of libraries
Library instances to be applied to the input matrix.
inputs_per_library_ : numpy nd.array
Array that specifies which inputs should be used for each of the
libraries you are going to tensor together. Used for building
GeneralizedLibrary objects.
n_input_features_ : int
The total number of input features.
WARNING: This is deprecated in scikit-learn version 1.0 and higher so
we check the sklearn.__version__ and switch to n_features_in if needed.
n_output_features_ : int
The total number of output features. The number of output features
is the product of the numbers of output features for each of the
libraries that were tensored together.
Examples
--------
>>> import numpy as np
>>> from pysindy.feature_library import FourierLibrary, CustomLibrary
>>> from pysindy.feature_library import TensoredLibrary
>>> x = np.array([[0.,-1],[1.,0.],[2.,-1.]])
>>> functions = [lambda x : np.exp(x), lambda x,y : np.sin(x+y)]
>>> lib_custom = CustomLibrary(library_functions=functions)
>>> lib_fourier = FourierLibrary()
>>> lib_tensored = lib_custom * lib_fourier
>>> lib_tensored.fit(x)
>>> lib_tensored.transform(x)
"""
def __init__(
self,
libraries: list,
library_ensemble=False,
inputs_per_library=None,
ensemble_indices=[0],
):
super(TensoredLibrary, self).__init__(
library_ensemble=library_ensemble, ensemble_indices=ensemble_indices
)
self.libraries_ = libraries
self.inputs_per_library_ = inputs_per_library
def _combinations(self, lib_i, lib_j):
"""
Compute combinations of the numerical libraries.
Returns
-------
lib_full : All combinations of the numerical library terms.
"""
# the shape here should be fixed with ax_coord....
shape = np.array(lib_i.shape)
shape[lib_i.ax_coord] = (
lib_i.shape[lib_i.ax_coord] * lib_j.shape[lib_j.ax_coord]
)
lib_full = np.reshape(
lib_i[..., :, np.newaxis] * lib_j[..., np.newaxis, :],
shape,
)
return lib_full
def _name_combinations(self, lib_i, lib_j):
"""
Compute combinations of the library feature names.
Returns
-------
lib_full : All combinations of the library feature names.
"""
lib_full = []
for i in range(len(lib_i)):
for j in range(len(lib_j)):
lib_full.append(lib_i[i] + " " + lib_j[j])
return lib_full
def _set_inputs_per_library(self, inputs_per_library):
"""
Extra function to make building a GeneralizedLibrary object easier
"""
self.inputs_per_library_ = inputs_per_library
[docs] @x_sequence_or_item
def fit(self, x_full, y=None):
"""
Compute number of output features.
Parameters
----------
x : array-like, shape (n_samples, n_features)
The data.
Returns
-------
self : instance
"""
n_features = x_full[0].shape[x_full[0].ax_coord]
if float(__version__[:3]) >= 1.0:
self.n_features_in_ = n_features
else:
self.n_input_features_ = n_features
# If parameter is not set, use all the inputs
if self.inputs_per_library_ is None:
temp_inputs = np.tile(range(n_features), len(self.libraries_))
self.inputs_per_library_ = np.reshape(
temp_inputs, (len(self.libraries_), n_features)
)
# First fit all libs provided below
fitted_libs = [
lib.fit(
[x[..., np.unique(self.inputs_per_library_[i, :])] for x in x_full], y
)
for i, lib in enumerate(self.libraries_)
]
# Calculate the sum of output features
output_sizes = [lib.n_output_features_ for lib in fitted_libs]
self.n_output_features_ = 1
for osize in output_sizes:
self.n_output_features_ *= osize
# Save fitted libs
self.libraries_ = fitted_libs
return self
[docs] def get_feature_names(self, input_features=None):
"""Return feature names for output features.
Parameters
----------
input_features : list of string, length n_features, optional
String names for input features if available. By default,
"x0", "x1", ... "xn_features" is used.
Returns
-------
output_feature_names : list of string, length n_output_features
"""
feature_names = list()
for i in range(len(self.libraries_)):
lib_i = self.libraries_[i]
if input_features is None:
input_features_i = [
"x%d" % k for k in np.unique(self.inputs_per_library_[i, :])
]
else:
input_features_i = np.asarray(input_features)[
np.unique(self.inputs_per_library_[i, :])
].tolist()
lib_i_feat_names = lib_i.get_feature_names(input_features_i)
for j in range(i + 1, len(self.libraries_)):
lib_j = self.libraries_[j]
if input_features is None:
input_features_j = [
"x%d" % k for k in np.unique(self.inputs_per_library_[j, :])
]
else:
input_features_j = np.asarray(input_features)[
np.unique(self.inputs_per_library_[j, :])
].tolist()
lib_j_feat_names = lib_j.get_feature_names(input_features_j)
feature_names += self._name_combinations(
lib_i_feat_names, lib_j_feat_names
)
return feature_names
[docs] def calc_trajectory(self, diff_method, x, t):
return self.libraries_[0].calc_trajectory(diff_method, x, t)