"""
Base class for SINDy optimizers.
"""
import abc
import warnings
from typing import Callable
from typing import Tuple
import numpy as np
from scipy import sparse
from sklearn.linear_model import LinearRegression
from sklearn.linear_model._base import _preprocess_data
from sklearn.utils.extmath import safe_sparse_dot
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import check_X_y
from ..utils import AxesArray
from ..utils import drop_nan_samples
def _rescale_data(X, y, sample_weight):
"""Rescale data so as to support sample_weight"""
n_samples = X.shape[0]
sample_weight = np.asarray(sample_weight)
if sample_weight.ndim == 0:
sample_weight = np.full(n_samples, sample_weight, dtype=sample_weight.dtype)
sample_weight = np.sqrt(sample_weight)
sw_matrix = sparse.dia_matrix((sample_weight, 0), shape=(n_samples, n_samples))
X = safe_sparse_dot(sw_matrix, X)
y = safe_sparse_dot(sw_matrix, y)
return X, y
[docs]class ComplexityMixin:
@property
def complexity(self):
check_is_fitted(self)
return np.count_nonzero(self.coef_) + np.count_nonzero(self.intercept_)
[docs]class BaseOptimizer(LinearRegression, ComplexityMixin):
"""
Base class for SINDy optimizers. Subclasses must implement
a _reduce method for carrying out the bulk of the work of
fitting a model.
Parameters
----------
normalize_columns : boolean, optional (default False)
Normalize the columns of x (the SINDy library terms) before regression
by dividing by the L2-norm.
copy_X : boolean, optional (default True)
If True, X will be copied; else, it may be overwritten.
initial_guess : np.ndarray, shape (n_features,) or (n_targets, n_features),
optional (default None)
Initial guess for coefficients ``coef_``.
If None, the initial guess is obtained via a least-squares fit.
unbias: Whether to perform an extra step of unregularized linear
regression to unbias the coefficients for the identified
support. If an optimizer (``self.optimizer``) applies any type
of regularization, that regularization may bias coefficients,
improving the conditioning of the problem but harming the
quality of the fit. Setting ``unbias==True`` enables an extra
step wherein unregularized linear regression is applied, but
only for the coefficients in the support identified by the
optimizer. This helps to remove the bias introduced by
regularization.
Attributes
----------
coef_ : array, shape (n_features,) or (n_targets, n_features)
Weight vector(s).
ind_ : array, shape (n_features,) or (n_targets, n_features)
Array of bools indicating which coefficients of the
weight vector have not been masked out.
history_ : list
History of ``coef_`` over iterations of the optimization algorithm.
Theta_ : np.ndarray, shape (n_samples, n_features)
The Theta matrix to be used in the optimization. We save it as
an attribute because access to the full library of terms is
sometimes needed for various applications.
"""
def __init__(
self,
max_iter=20,
normalize_columns=False,
initial_guess=None,
copy_X=True,
unbias: bool = True,
):
super().__init__(fit_intercept=False, copy_X=copy_X)
if max_iter <= 0:
raise ValueError("max_iter must be positive")
self.max_iter = max_iter
self.iters = 0
if np.ndim(initial_guess) == 1:
initial_guess = initial_guess.reshape(1, -1)
self.initial_guess = initial_guess
self.normalize_columns = normalize_columns
self.unbias = unbias
# Force subclasses to implement this
@abc.abstractmethod
def _reduce(self):
"""
Carry out the bulk of the work of the fit function.
Subclass implementations MUST update self.coef_ as shape
(n_targets, n_inputs).
"""
raise NotImplementedError
[docs] def fit(self, x_, y, sample_weight=None, **reduce_kws):
"""
Fit the model.
Parameters
----------
x_ : array-like, shape (n_samples, n_features)
Training data
y : array-like, shape (n_samples,) or (n_samples, n_targets)
Target values
sample_weight : float or numpy array of shape (n_samples,), optional
Individual weights for each sample
reduce_kws : dict
Optional keyword arguments to pass to the _reduce method
(implemented by subclasses)
Returns
-------
self : returns an instance of self
"""
x_ = AxesArray(np.asarray(x_), {"ax_sample": 0, "ax_coord": 1})
y_axes = {"ax_sample": 0} if y.ndim == 1 else {"ax_sample": 0, "ax_coord": 1}
y = AxesArray(np.asarray(y), y_axes)
x_, y = drop_nan_samples(x_, y)
x_, y = check_X_y(x_, y, accept_sparse=[], y_numeric=True, multi_output=True)
x, y, X_offset, y_offset, X_scale = _preprocess_data(
x_,
y,
fit_intercept=False,
copy=self.copy_X,
sample_weight=sample_weight,
)
if sample_weight is not None:
x, y = _rescale_data(x, y, sample_weight)
self.iters = 0
if y.ndim == 1:
y = y.reshape(-1, 1)
coef_shape = (y.shape[1], x.shape[1])
self.ind_ = np.ones(coef_shape, dtype=bool)
self.Theta_ = x
x_normed = np.copy(x)
if self.normalize_columns:
reg = 1 / np.linalg.norm(x, 2, axis=0)
x_normed = x * reg
if self.initial_guess is None:
self.coef_ = np.linalg.lstsq(x_normed, y, rcond=None)[0].T
else:
if not self.initial_guess.shape == coef_shape:
raise ValueError(
"initial_guess shape is incompatible with training data. "
f"Expected: {coef_shape}. Received: {self.initial_guess.shape}."
)
self.coef_ = self.initial_guess
self.history_ = [self.coef_]
x_normed = np.asarray(x_normed)
self._reduce(x_normed, y, **reduce_kws)
self.ind_ = np.abs(self.coef_) > 1e-14
if self.unbias:
self._unbias(x_normed, y)
# Rescale coefficients to original units
if self.normalize_columns:
self.coef_ = np.multiply(reg, self.coef_)
if hasattr(self, "coef_full_"):
self.coef_full_ = np.multiply(reg, self.coef_full_)
for i in range(np.shape(self.history_)[0]):
self.history_[i] = np.multiply(reg, self.history_[i])
self._set_intercept(X_offset, y_offset, X_scale)
return self
def _unbias(self, x: np.ndarray, y: np.ndarray):
coef = np.zeros((y.shape[1], x.shape[1]))
for i in range(self.ind_.shape[0]):
if np.any(self.ind_[i]):
coef[i, self.ind_[i]] = (
LinearRegression(fit_intercept=False)
.fit(x[:, self.ind_[i]], y[:, i])
.coef_
)
self.coef_ = coef
[docs]class EnsembleOptimizer(BaseOptimizer):
"""Wrapper class for ensembling methods.
Parameters
----------
opt: BaseOptimizer
The underlying optimizer to run on each ensemble
bagging : boolean, optional (default False)
This parameter is used to allow for "ensembling", i.e. the
generation of many SINDy models (n_models) by choosing a random
temporal subset of the input data (n_subset) for each sparse
regression. This often improves robustness because averages
(bagging) or medians (bragging) of all the models are usually
quite high-performing. The user can also generate "distributions"
of many models, and calculate how often certain library terms
are included in a model.
library_ensemble : boolean, optional (default False)
This parameter is used to allow for "library ensembling",
i.e. the generation of many SINDy models (n_models) by choosing
a random subset of the candidate library terms to truncate. So,
n_models are generated by solving n_models sparse regression
problems on these "reduced" libraries. Once again, this often
improves robustness because averages (bagging) or medians
(bragging) of all the models are usually quite high-performing.
The user can also generate "distributions" of many models, and
calculate how often certain library terms are included in a model.
n_models : int, optional (default 20)
Number of models to generate via ensemble
n_subset : int, optional (default len(time base))
Number of time points to use for ensemble
n_candidates_to_drop : int, optional (default 1)
Number of candidate terms in the feature library to drop during
library ensembling.
replace : boolean, optional (default True)
If ensemble true, whether or not to time sample with replacement.
ensemble_aggregator : callable, optional (default numpy.median)
Method to aggregate model coefficients across different samples.
This method argument is only used if ``ensemble`` or ``library_ensemble``
is True.
The method should take in a list of 2D arrays and return a 2D
array of the same shape as the arrays in the list.
Example: :code:`lambda x: np.median(x, axis=0)`
Attributes
----------
coef_ : array, shape (n_features,) or (n_targets, n_features)
Regularized weight vector(s). This is the v in the objective
function.
coef_full_ : array, shape (n_features,) or (n_targets, n_features)
Weight vector(s) that are not subjected to the regularization.
This is the w in the objective function.
"""
def __init__(
self,
opt: BaseOptimizer,
bagging: bool = False,
library_ensemble: bool = False,
n_models: int = 20,
n_subset: int = None,
n_candidates_to_drop: int = 1,
replace: bool = True,
ensemble_aggregator: Callable = None,
):
if not hasattr(opt, "initial_guess"):
opt.initial_guess = None
super().__init__(
max_iter=opt.max_iter,
initial_guess=opt.initial_guess,
copy_X=opt.copy_X,
)
if not bagging and not library_ensemble:
raise ValueError(
"If not ensembling data or library terms, use another optimizer"
)
if bagging and n_subset is not None and n_subset < 1:
raise ValueError("n_subset must be a positive integer or None if bagging")
if library_ensemble and (
n_candidates_to_drop is None or n_candidates_to_drop < 1
):
raise ValueError(
"n_candidates_to_drop must be a positive integer if ensembling library"
)
if n_models < 1:
raise ValueError(
"n_candidates_to_drop must be a positive integer if ensembling library"
)
self.opt = opt
self.n_models = n_models
self.n_subset = n_subset
self.bagging = bagging
self.library_ensemble = library_ensemble
self.ensemble_aggregator = ensemble_aggregator
self.replace = replace
self.n_candidates_to_drop = n_candidates_to_drop
self.coef_list = []
self.unbias = False
def _reduce(self, x: AxesArray, y: np.ndarray) -> None:
x = AxesArray(np.asarray(x), {"ax_sample": 0, "ax_coord": 1})
n_samples = x.shape[x.ax_sample]
if self.bagging and self.n_subset is None:
self.n_subset = int(0.6 * n_samples)
if self.bagging and self.n_subset > n_samples and not self.replace:
warnings.warn(
"n_subset is larger than sample count without replacement; cannot bag."
)
n_subset = n_samples
else:
n_subset = self.n_subset
n_features = x.n_coord
if self.library_ensemble and self.n_candidates_to_drop > n_features:
warnings.warn(
"n_candidates_to_drop larger than number of features. Cannot "
"ensemble library."
)
n_candidates_to_drop = 0
else:
n_candidates_to_drop = self.n_candidates_to_drop
for _ in range(self.n_models):
if self.bagging:
x_ensemble, y_ensemble = _drop_random_samples(
x, y, n_subset, self.replace
)
else:
x_ensemble, y_ensemble = x, y
keep_inds = np.arange(n_features)
if self.library_ensemble:
keep_inds = np.sort(
np.random.choice(
range(n_features),
n_features - n_candidates_to_drop,
replace=False,
)
)
x_ensemble = x_ensemble.take(keep_inds, axis=x.ax_coord)
self.opt.fit(x_ensemble, y_ensemble)
new_coefs = np.zeros((y.shape[1], n_features))
new_coefs[:, keep_inds] = self.opt.coef_
self.coef_list.append(new_coefs)
# Get average coefficients
if self.ensemble_aggregator is None:
self.coef_ = np.median(self.coef_list, axis=0)
else:
self.coef_ = self.ensemble_aggregator(self.coef_list)
def _drop_random_samples(
x: AxesArray,
x_dot: AxesArray,
n_subset: int,
replace: bool,
) -> Tuple[AxesArray]:
n_samples = x.shape[x.ax_sample]
rand_inds = np.random.choice(range(n_samples), n_subset, replace=replace)
x_new = np.take(x, rand_inds, axis=x.ax_sample)
x_dot_new = np.take(x_dot, rand_inds, axis=x.ax_sample)
return x_new, x_dot_new