import warnings
from typing import Union
import numpy as np
from scipy.linalg import LinAlgWarning
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import ridge_regression
from sklearn.utils.validation import check_is_fitted
from .base import BaseOptimizer
[docs]class STLSQ(BaseOptimizer):
"""Sequentially thresholded least squares algorithm.
Defaults to doing Sequentially thresholded Ridge regression.
Attempts to minimize the objective function
:math:`\\|y - Xw\\|^2_2 + \\alpha \\|w\\|^2_2`
by iteratively performing least squares and masking out
elements of the weight array w that are below a given threshold.
See the following reference for more details:
Brunton, Steven L., Joshua L. Proctor, and J. Nathan Kutz.
"Discovering governing equations from data by sparse
identification of nonlinear dynamical systems."
Proceedings of the national academy of sciences
113.15 (2016): 3932-3937.
Parameters
----------
threshold : float, optional (default 0.1)
Minimum magnitude for a coefficient in the weight vector.
Coefficients with magnitude below the threshold are set
to zero.
alpha : float, optional (default 0.05)
Optional L2 (ridge) regularization on the weight vector.
max_iter : int, optional (default 20)
Maximum iterations of the optimization algorithm.
ridge_kw : dict, optional (default None)
Optional keyword arguments to pass to the ridge regression.
normalize_columns : boolean, optional (default False)
Normalize the columns of x (the SINDy library terms) before regression
by dividing by the L2-norm. Note that the 'normalize' option in sklearn
is deprecated in sklearn versions >= 1.0 and will be removed.
copy_X : boolean, optional (default True)
If True, X will be copied; else, it may be overwritten.
initial_guess : np.ndarray, shape (n_features) or (n_targets, n_features),
optional (default None)
Initial guess for coefficients ``coef_``.
If None, least-squares is used to obtain an initial guess.
verbose : bool, optional (default False)
If True, prints out the different error terms every iteration.
sparse_ind : list, optional (default None)
Indices to threshold and perform ridge regression upon.
If None, sparse thresholding and ridge regression is applied to all
indices.
Attributes
----------
coef_ : array, shape (n_features,) or (n_targets, n_features)
Weight vector(s).
ind_ : array, shape (n_features,) or (n_targets, n_features)
Array of 0s and 1s indicating which coefficients of the
weight vector have not been masked out, i.e. the support of
``self.coef_``.
history_ : list
History of ``coef_``. ``history_[k]`` contains the values of
``coef_`` at iteration k of sequentially thresholded least-squares.
Examples
--------
>>> import numpy as np
>>> from scipy.integrate import odeint
>>> from pysindy import SINDy
>>> from pysindy.optimizers import STLSQ
>>> lorenz = lambda z,t : [10*(z[1] - z[0]),
>>> z[0]*(28 - z[2]) - z[1],
>>> z[0]*z[1] - 8/3*z[2]]
>>> t = np.arange(0,2,.002)
>>> x = odeint(lorenz, [-8,8,27], t)
>>> opt = STLSQ(threshold=.1, alpha=.5)
>>> model = SINDy(optimizer=opt)
>>> model.fit(x, t=t[1]-t[0])
>>> model.print()
x0' = -9.999 1 + 9.999 x0
x1' = 27.984 1 + -0.996 x0 + -1.000 1 x1
x2' = -2.666 x1 + 1.000 1 x0
"""
def __init__(
self,
threshold=0.1,
alpha=0.05,
max_iter=20,
ridge_kw=None,
normalize_columns=False,
copy_X=True,
initial_guess=None,
verbose=False,
sparse_ind=None,
unbias=True,
):
super().__init__(
max_iter=max_iter,
copy_X=copy_X,
normalize_columns=normalize_columns,
unbias=unbias,
)
if threshold < 0:
raise ValueError("threshold cannot be negative")
if alpha < 0:
raise ValueError("alpha cannot be negative")
self.threshold = threshold
self.alpha = alpha
self.ridge_kw = ridge_kw
self.initial_guess = initial_guess
self.verbose = verbose
self.sparse_ind = sparse_ind
def _sparse_coefficients(
self, dim: int, ind_nonzero: np.ndarray, coef: np.ndarray, threshold: float
) -> (np.ndarray, np.ndarray):
"""Perform thresholding of the weight vector(s) (on specific indices
if ``self.sparse_ind`` is not None)"""
c = np.zeros(dim)
c[ind_nonzero] = coef
big_ind = np.abs(c) >= threshold
if self.sparse_ind is not None:
nonsparse_ind_mask = np.ones_like(ind_nonzero)
nonsparse_ind_mask[self.sparse_ind] = False
big_ind = big_ind | nonsparse_ind_mask
c[~big_ind] = 0
return c, big_ind
def _regress(self, x: np.ndarray, y: np.ndarray, dim: int, sparse_sub: np.ndarray):
"""Perform the ridge regression (on specific indices if
``self.sparse_ind`` is not None)"""
kw = self.ridge_kw or {}
if self.sparse_ind is None:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=LinAlgWarning)
try:
coef = ridge_regression(x, y, self.alpha, **kw)
except LinAlgWarning:
# increase alpha until warning stops
self.alpha = 2 * self.alpha
self.iters += 1
return coef
if self.sparse_ind is not None:
alpha_array = np.zeros((dim, dim))
alpha_array[sparse_sub, sparse_sub] = np.sqrt(self.alpha)
x_lin = np.concatenate((x, alpha_array), axis=0)
y_lin = np.concatenate((y, np.zeros((dim,))))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=LinAlgWarning)
try:
coef = (
LinearRegression(fit_intercept=False, **kw)
.fit(x_lin, y_lin)
.coef_
)
except LinAlgWarning:
# increase alpha until warning stops
self.alpha = 2 * self.alpha
self.iters += 1
return coef
def _no_change(self):
"""Check if the coefficient mask has changed after thresholding"""
this_coef = self.history_[-1].flatten()
if len(self.history_) > 1:
last_coef = self.history_[-2].flatten()
else:
last_coef = np.zeros_like(this_coef)
return all(bool(i) == bool(j) for i, j in zip(this_coef, last_coef))
def _reduce(self, x, y):
"""Performs at most ``self.max_iter`` iterations of the
sequentially-thresholded least squares algorithm.
Assumes an initial guess for coefficients and support are saved in
``self.coef_`` and ``self.ind_``.
"""
if self.initial_guess is not None:
self.coef_ = self.initial_guess
ind = self.ind_
n_samples, n_features = x.shape
n_targets = y.shape[1]
n_features_selected = np.sum(ind)
sparse_sub = [np.array(self.sparse_ind)] * y.shape[1]
optvar = np.zeros((n_targets, n_features))
# Print initial values for each term in the optimization
if self.verbose:
row = [
"Iteration",
"|y - Xw|^2",
"a * |w|_2",
"|w|_0",
"Total error: |y - Xw|^2 + a * |w|_2",
]
print(
"{: >10} ... {: >10} ... {: >10} ... {: >10}"
" ... {: >10}".format(*row)
)
for k in range(self.max_iter):
if np.count_nonzero(ind) == 0:
warnings.warn(
"Sparsity parameter is too big ({}) and eliminated all "
"coefficients".format(self.threshold)
)
optvar = np.zeros((n_targets, n_features))
break
optvar = np.zeros((n_targets, n_features))
for i in range(n_targets):
if np.count_nonzero(ind[i]) == 0:
warnings.warn(
"Sparsity parameter is too big ({}) and eliminated all "
"coefficients".format(self.threshold)
)
continue
coef_i = self._regress(
x[:, ind[i]], y[:, i], np.count_nonzero(ind[i]), sparse_sub[i]
)
coef_i, ind_i = self._sparse_coefficients(
n_features, ind[i], coef_i, self.threshold
)
if self.sparse_ind is not None:
vals_to_remove = np.intersect1d(
self.sparse_ind, np.where(coef_i == 0)
)
sparse_sub[i] = _remove_and_decrement(
self.sparse_ind, vals_to_remove
)
optvar[i] = coef_i
ind[i] = ind_i
self.history_.append(optvar)
if self.verbose:
R2 = np.sum((y - np.dot(x, optvar.T)) ** 2)
L2 = self.alpha * np.sum(optvar**2)
L0 = np.count_nonzero(optvar)
row = [k, R2, L2, L0, R2 + L2]
print(
"{0:10d} ... {1:10.4e} ... {2:10.4e} ... {3:10d}"
" ... {4:10.4e}".format(*row)
)
if np.sum(ind) == n_features_selected or self._no_change():
# could not (further) select important features
break
else:
warnings.warn(
"STLSQ did not converge after {self.max_iter} iterations.",
ConvergenceWarning,
)
if self.sparse_ind is None:
self.coef_ = optvar
self.ind_ = ind
else:
non_sparse_ind = np.setxor1d(self.sparse_ind, range(n_features))
self.coef_ = optvar[:, self.sparse_ind]
self.ind_ = ind[:, self.sparse_ind]
self.optvar_non_sparse_ = optvar[:, non_sparse_ind]
self.ind_non_sparse_ = ind[:, non_sparse_ind]
@property
def complexity(self):
check_is_fitted(self)
return np.count_nonzero(self.coef_) + np.count_nonzero(
[abs(self.intercept_) >= self.threshold]
)
def _unbias(self, x: np.ndarray, y: np.ndarray):
if not self.sparse_ind:
return super()._unbias(x, y)
regression_col_mask = np.zeros((y.shape[1], x.shape[1]), dtype=bool)
regression_col_mask[:, self.sparse_ind] = self.ind_
non_sparse_ind = np.setxor1d(self.sparse_ind, range(x.shape[1]))
regression_col_mask[:, non_sparse_ind] = self.ind_non_sparse_
for i in range(self.ind_.shape[0]):
if np.any(self.ind_[i]):
optvar = (
LinearRegression(fit_intercept=False)
.fit(x[:, regression_col_mask[i]], y[:, i])
.coef_
)
self.coef_[i] = optvar[self.sparse_ind]
self.optvar_non_sparse_[i] = optvar[non_sparse_ind]
def _remove_and_decrement(
existing_vals: Union[np.ndarray, list], vals_to_remove: Union[np.ndarray, list]
) -> np.ndarray:
"""Remove elements from existing values and decrement the elements
that are greater than the removed elements"""
for s in reversed(vals_to_remove):
existing_vals = np.delete(existing_vals, np.where(s == existing_vals))
existing_vals = np.where(existing_vals > s, existing_vals - 1, existing_vals)
return existing_vals