"""
Base class for SINDy optimizers.
"""
import abc
import warnings
from typing import Callable
from typing import Tuple
import numpy as np
from scipy import sparse
from sklearn.linear_model import LinearRegression
from sklearn.linear_model._base import _preprocess_data
from sklearn.utils.extmath import safe_sparse_dot
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import check_X_y
from ..utils import AxesArray
def _rescale_data(X, y, sample_weight):
"""Rescale data so as to support sample_weight"""
n_samples = X.shape[0]
sample_weight = np.asarray(sample_weight)
if sample_weight.ndim == 0:
sample_weight = np.full(n_samples, sample_weight, dtype=sample_weight.dtype)
sample_weight = np.sqrt(sample_weight)
sw_matrix = sparse.dia_matrix((sample_weight, 0), shape=(n_samples, n_samples))
X = safe_sparse_dot(sw_matrix, X)
y = safe_sparse_dot(sw_matrix, y)
return X, y
[docs]class ComplexityMixin:
@property
def complexity(self):
check_is_fitted(self)
return np.count_nonzero(self.coef_) + np.count_nonzero(self.intercept_)
[docs]class BaseOptimizer(LinearRegression, ComplexityMixin):
"""
Base class for SINDy optimizers. Subclasses must implement
a _reduce method for carrying out the bulk of the work of
fitting a model.
Parameters
----------
fit_intercept : boolean, optional (default False)
Whether to calculate the intercept for this model. If set to false, no
intercept will be used in calculations.
normalize_columns : boolean, optional (default False)
Normalize the columns of x (the SINDy library terms) before regression
by dividing by the L2-norm. Note that the 'normalize' option in sklearn
is deprecated in sklearn versions >= 1.0 and will be removed.
copy_X : boolean, optional (default True)
If True, X will be copied; else, it may be overwritten.
initial_guess : np.ndarray, shape (n_features,) or (n_targets, n_features),
optional (default None)
Initial guess for coefficients ``coef_``.
If None, the initial guess is obtained via a least-squares fit.
Attributes
----------
coef_ : array, shape (n_features,) or (n_targets, n_features)
Weight vector(s).
ind_ : array, shape (n_features,) or (n_targets, n_features)
Array of 0s and 1s indicating which coefficients of the
weight vector have not been masked out.
history_ : list
History of ``coef_`` over iterations of the optimization algorithm.
Theta_ : np.ndarray, shape (n_samples, n_features)
The Theta matrix to be used in the optimization. We save it as
an attribute because access to the full library of terms is
sometimes needed for various applications.
"""
def __init__(
self,
max_iter=20,
normalize_columns=False,
fit_intercept=False,
initial_guess=None,
copy_X=True,
):
super(BaseOptimizer, self).__init__(fit_intercept=fit_intercept, copy_X=copy_X)
if max_iter <= 0:
raise ValueError("max_iter must be positive")
self.max_iter = max_iter
self.iters = 0
if np.ndim(initial_guess) == 1:
initial_guess = initial_guess.reshape(1, -1)
self.initial_guess = initial_guess
self.normalize_columns = normalize_columns
# Force subclasses to implement this
@abc.abstractmethod
def _reduce(self):
"""
Carry out the bulk of the work of the fit function.
Subclass implementations MUST update self.coef_.
"""
raise NotImplementedError
[docs] def fit(self, x_, y, sample_weight=None, **reduce_kws):
"""
Fit the model.
Parameters
----------
x_ : array-like, shape (n_samples, n_features)
Training data
y : array-like, shape (n_samples,) or (n_samples, n_targets)
Target values
sample_weight : float or numpy array of shape (n_samples,), optional
Individual weights for each sample
reduce_kws : dict
Optional keyword arguments to pass to the _reduce method
(implemented by subclasses)
Returns
-------
self : returns an instance of self
"""
x_, y = check_X_y(x_, y, accept_sparse=[], y_numeric=True, multi_output=True)
x, y, X_offset, y_offset, X_scale = _preprocess_data(
x_,
y,
fit_intercept=self.fit_intercept,
copy=self.copy_X,
sample_weight=sample_weight,
)
if sample_weight is not None:
x, y = _rescale_data(x, y, sample_weight)
self.iters = 0
if y.ndim == 1:
y = y.reshape(-1, 1)
coef_shape = (y.shape[1], x.shape[1])
self.ind_ = np.ones(coef_shape, dtype=bool)
self.Theta_ = x
x_normed = np.copy(x)
if self.normalize_columns:
reg = 1 / np.linalg.norm(x, 2, axis=0)
x_normed = x * reg
if self.initial_guess is None:
self.coef_ = np.linalg.lstsq(x_normed, y, rcond=None)[0].T
else:
if not self.initial_guess.shape == coef_shape:
raise ValueError(
"initial_guess shape is incompatible with training data. "
f"Expected: {coef_shape}. Received: {self.initial_guess.shape}."
)
self.coef_ = self.initial_guess
self.history_ = [self.coef_]
x_normed = np.asarray(x_normed)
self._reduce(x_normed, y, **reduce_kws)
self.ind_ = np.abs(self.coef_) > 1e-14
# Rescale coefficients to original units
if self.normalize_columns:
self.coef_ = np.multiply(reg, self.coef_)
if hasattr(self, "coef_full_"):
self.coef_full_ = np.multiply(reg, self.coef_full_)
for i in range(np.shape(self.history_)[0]):
self.history_[i] = np.multiply(reg, self.history_[i])
self._set_intercept(X_offset, y_offset, X_scale)
return self
[docs]class EnsembleOptimizer(BaseOptimizer):
"""Wrapper class for ensembling methods.
Parameters
----------
opt: BaseOptimizer
The underlying optimizer to run on each ensemble
bagging : boolean, optional (default False)
This parameter is used to allow for "ensembling", i.e. the
generation of many SINDy models (n_models) by choosing a random
temporal subset of the input data (n_subset) for each sparse
regression. This often improves robustness because averages
(bagging) or medians (bragging) of all the models are usually
quite high-performing. The user can also generate "distributions"
of many models, and calculate how often certain library terms
are included in a model.
library_ensemble : boolean, optional (default False)
This parameter is used to allow for "library ensembling",
i.e. the generation of many SINDy models (n_models) by choosing
a random subset of the candidate library terms to truncate. So,
n_models are generated by solving n_models sparse regression
problems on these "reduced" libraries. Once again, this often
improves robustness because averages (bagging) or medians
(bragging) of all the models are usually quite high-performing.
The user can also generate "distributions" of many models, and
calculate how often certain library terms are included in a model.
n_models : int, optional (default 20)
Number of models to generate via ensemble
n_subset : int, optional (default len(time base))
Number of time points to use for ensemble
n_candidates_to_drop : int, optional (default 1)
Number of candidate terms in the feature library to drop during
library ensembling.
replace : boolean, optional (default True)
If ensemble true, whether or not to time sample with replacement.
ensemble_aggregator : callable, optional (default numpy.median)
Method to aggregate model coefficients across different samples.
This method argument is only used if ``ensemble`` or ``library_ensemble``
is True.
The method should take in a list of 2D arrays and return a 2D
array of the same shape as the arrays in the list.
Example: :code:`lambda x: np.median(x, axis=0)`
Attributes
----------
coef_ : array, shape (n_features,) or (n_targets, n_features)
Regularized weight vector(s). This is the v in the objective
function.
coef_full_ : array, shape (n_features,) or (n_targets, n_features)
Weight vector(s) that are not subjected to the regularization.
This is the w in the objective function.
unbias : boolean
Whether to perform an extra step of unregularized linear regression
to unbias the coefficients for the identified support.
``unbias`` is automatically set to False if a constraint is used and
is otherwise left uninitialized.
"""
def __init__(
self,
opt: BaseOptimizer,
bagging: bool = False,
library_ensemble: bool = False,
n_models: int = 20,
n_subset: int = None,
n_candidates_to_drop: int = 1,
replace: bool = True,
ensemble_aggregator: Callable = None,
):
if not hasattr(opt, "initial_guess"):
opt.initial_guess = None
super(EnsembleOptimizer, self).__init__(
max_iter=opt.max_iter,
fit_intercept=opt.fit_intercept,
initial_guess=opt.initial_guess,
copy_X=opt.copy_X,
)
if not bagging and not library_ensemble:
raise ValueError(
"If not ensembling data or library terms, use another optimizer"
)
if n_subset is not None and n_subset <= 0:
raise ValueError("n_subset must be a positive integer if bagging")
if n_candidates_to_drop is not None and n_candidates_to_drop <= 0:
raise ValueError(
"n_candidates_to_drop must be a positive integer if ensembling library"
)
self.opt = opt
if n_models is None or n_models == 0:
warnings.warn(
"n_models must be a positive integer. Explicitly initialized to zero"
" or None, defaulting to 20."
)
n_models = 20
self.n_models = n_models
self.n_subset = n_subset
self.bagging = bagging
self.library_ensemble = library_ensemble
self.ensemble_aggregator = ensemble_aggregator
self.replace = replace
self.n_candidates_to_drop = n_candidates_to_drop
self.coef_list = []
def _reduce(self, x: AxesArray, y: np.ndarray) -> None:
x = AxesArray(np.asarray(x), {"ax_sample": 0, "ax_coord": 1})
n_samples = x.shape[x.ax_sample]
if self.bagging and self.n_subset is None:
self.n_subset = int(0.6 * n_samples)
if self.bagging and self.n_subset > n_samples and not self.replace:
warnings.warn(
"n_subset is larger than sample count without replacement; cannot bag."
)
n_subset = n_samples
else:
n_subset = self.n_subset
n_features = x.shape[x.ax_coord]
if self.library_ensemble and self.n_candidates_to_drop > n_features:
warnings.warn(
"n_candidates_to_drop larger than number of features. Cannot "
"ensemble library."
)
n_candidates_to_drop = 0
else:
n_candidates_to_drop = self.n_candidates_to_drop
for _ in range(self.n_models):
if self.bagging:
x_ensemble, y_ensemble = _drop_random_samples(
x, y, n_subset, self.replace
)
else:
x_ensemble, y_ensemble = x, y
keep_inds = np.arange(n_features)
if self.library_ensemble:
keep_inds = np.sort(
np.random.choice(
range(n_features),
n_features - n_candidates_to_drop,
replace=False,
)
)
x_ensemble = x_ensemble.take(keep_inds, axis=x.ax_coord)
self.opt.fit(x_ensemble, y_ensemble)
new_coefs = np.zeros((y.shape[1], n_features))
new_coefs[:, keep_inds] = self.opt.coef_
self.coef_list.append(new_coefs)
# Get average coefficients
if self.ensemble_aggregator is None:
self.coef_ = np.median(self.coef_list, axis=0)
else:
self.coef_ = self.ensemble_aggregator(self.coef_list)
def _drop_random_samples(
x: AxesArray,
x_dot: AxesArray,
n_subset: int,
replace: bool,
) -> Tuple[AxesArray]:
n_samples = x.shape[x.ax_sample]
rand_inds = np.random.choice(range(n_samples), n_subset, replace=replace)
x_new = np.take(x, rand_inds, axis=x.ax_sample)
x_dot_new = np.take(x_dot, rand_inds, axis=x.ax_sample)
return x_new, x_dot_new