Source code for pysindy.optimizers.stlsq

import warnings

import numpy as np
from scipy.linalg import LinAlgWarning
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ridge_regression
from sklearn.utils.validation import check_is_fitted

from .base import BaseOptimizer


class STLSQ(BaseOptimizer):
    """Sequentially thresholded least squares algorithm.
    Defaults to doing Sequentially thresholded Ridge regression.

    Attempts to minimize the objective function
    :math:`\\|y - Xw\\|^2_2 + \\alpha \\|w\\|^2_2`
    by iteratively performing least squares and masking out
    elements of the weight array w that are below a given threshold.

    See the following reference for more details:

        Brunton, Steven L., Joshua L. Proctor, and J. Nathan Kutz.
        "Discovering governing equations from data by sparse
        identification of nonlinear dynamical systems."
        Proceedings of the national academy of sciences
        113.15 (2016): 3932-3937.

    Parameters
    ----------
    threshold : float, optional (default 0.1)
        Minimum magnitude for a coefficient in the weight vector.
        Coefficients with magnitude below the threshold are set
        to zero.

    alpha : float, optional (default 0.05)
        Optional L2 (ridge) regularization on the weight vector.

    max_iter : int, optional (default 20)
        Maximum iterations of the optimization algorithm.

    ridge_kw : dict, optional (default None)
        Optional keyword arguments to pass to the ridge regression.

    fit_intercept : boolean, optional (default False)
        Whether to calculate the intercept for this model. If set to false, no
        intercept will be used in calculations.

    normalize_columns : boolean, optional (default False)
        Normalize the columns of x (the SINDy library terms) before regression
        by dividing by the L2-norm. Note that the 'normalize' option in sklearn
        is deprecated in sklearn versions >= 1.0 and will be removed.

    copy_X : boolean, optional (default True)
        If True, X will be copied; else, it may be overwritten.

    initial_guess : np.ndarray, shape (n_features) or (n_targets, n_features), \
            optional (default None)
        Initial guess for coefficients ``coef_``.
        If None, least-squares is used to obtain an initial guess.

    verbose : bool, optional (default False)
        If True, prints out the different error terms every iteration.

    Attributes
    ----------
    coef_ : array, shape (n_features,) or (n_targets, n_features)
        Weight vector(s).

    ind_ : array, shape (n_features,) or (n_targets, n_features)
        Array of 0s and 1s indicating which coefficients of the
        weight vector have not been masked out, i.e. the support of
        ``self.coef_``.

    history_ : list
        History of ``coef_``. ``history_[k]`` contains the values of
        ``coef_`` at iteration k of sequentially thresholded least-squares.

    Examples
    --------
    >>> import numpy as np
    >>> from scipy.integrate import odeint
    >>> from pysindy import SINDy
    >>> from pysindy.optimizers import STLSQ
    >>> lorenz = lambda z,t : [10*(z[1] - z[0]),
    ...                        z[0]*(28 - z[2]) - z[1],
    ...                        z[0]*z[1] - 8/3*z[2]]
    >>> t = np.arange(0,2,.002)
    >>> x = odeint(lorenz, [-8,8,27], t)
    >>> opt = STLSQ(threshold=.1, alpha=.5)
    >>> model = SINDy(optimizer=opt)
    >>> model.fit(x, t=t[1]-t[0])
    >>> model.print()
    x0' = -9.999 1 + 9.999 x0
    x1' = 27.984 1 + -0.996 x0 + -1.000 1 x1
    x2' = -2.666 x1 + 1.000 1 x0
    """

    def __init__(
        self,
        threshold=0.1,
        alpha=0.05,
        max_iter=20,
        ridge_kw=None,
        normalize_columns=False,
        fit_intercept=False,
        copy_X=True,
        initial_guess=None,
        verbose=False,
    ):
        """Store the hyper-parameters after validating their sign.

        Raises
        ------
        ValueError
            If ``threshold`` or ``alpha`` is negative.
        """
        super().__init__(
            max_iter=max_iter,
            fit_intercept=fit_intercept,
            copy_X=copy_X,
            normalize_columns=normalize_columns,
        )

        if threshold < 0:
            raise ValueError("threshold cannot be negative")
        if alpha < 0:
            raise ValueError("alpha cannot be negative")

        self.threshold = threshold
        self.alpha = alpha
        self.ridge_kw = ridge_kw
        self.initial_guess = initial_guess
        self.verbose = verbose

    def _sparse_coefficients(self, dim, ind, coef, threshold):
        """Perform thresholding of the weight vector(s).

        Scatters ``coef`` into a zero vector of length ``dim`` at the
        positions selected by ``ind``, then zeroes every entry whose
        magnitude is below ``threshold``.

        Returns
        -------
        c : np.ndarray
            The thresholded, full-length coefficient vector.
        big_ind : np.ndarray of bool
            Mask of the entries of ``c`` that survived thresholding.
        """
        c = np.zeros(dim)
        c[ind] = coef
        big_ind = np.abs(c) >= threshold
        c[~big_ind] = 0
        return c, big_ind

    def _regress(self, x, y):
        """Perform the ridge regression.

        Calls ``ridge_regression(x, y, self.alpha, **self.ridge_kw)`` and
        increments ``self.iters`` once per call.

        Returns
        -------
        coef : np.ndarray
            Coefficients returned by ``ridge_regression``.
        """
        kw = self.ridge_kw or {}
        with warnings.catch_warnings():
            # Ill-conditioning warnings are silenced, not escalated: under an
            # "ignore" filter a LinAlgWarning is never raised as an exception,
            # so there is nothing to catch or retry here.
            warnings.filterwarnings("ignore", category=LinAlgWarning)
            coef = ridge_regression(x, y, self.alpha, **kw)
        self.iters += 1
        return coef

    def _no_change(self):
        """Check if the coefficient mask has changed after thresholding.

        Compares the support (nonzero pattern) of the two most recent
        entries of ``self.history_``. When only one entry exists, it is
        compared against an all-zero support.

        Returns
        -------
        bool
            True when both supports are identical.
        """
        this_support = self.history_[-1].flatten() != 0
        if len(self.history_) > 1:
            last_support = self.history_[-2].flatten() != 0
        else:
            last_support = np.zeros_like(this_support)
        return np.array_equal(this_support, last_support)

    def _reduce(self, x, y):
        """Performs at most ``self.max_iter`` iterations of the
        sequentially-thresholded least squares algorithm.

        Assumes an initial guess for coefficients and support are saved in
        ``self.coef_`` and ``self.ind_``.

        On exit ``self.coef_`` and ``self.ind_`` hold the final coefficients
        and support, and every iterate has been appended to
        ``self.history_``. ``self.ind_`` is updated in place, row by row.
        """
        if self.initial_guess is not None:
            self.coef_ = self.initial_guess

        ind = self.ind_
        _, n_features = x.shape
        n_targets = y.shape[1]
        # Size of the support on entry; the loop stops as soon as an
        # iteration leaves the support at this size.
        n_features_selected = np.sum(ind)

        # Print initial values for each term in the optimization
        if self.verbose:
            row = [
                "Iteration",
                "|y - Xw|^2",
                "a * |w|_2",
                "|w|_0",
                "Total error: |y - Xw|^2 + a * |w|_2",
            ]
            print(
                "{: >10} ... {: >10} ... {: >10} ... {: >10}"
                " ... {: >10}".format(*row)
            )

        # Sentinel: stays None only when the loop body never executes
        # (i.e. ``max_iter`` is 0).
        coef = None
        for k in range(self.max_iter):
            if np.count_nonzero(ind) == 0:
                warnings.warn(
                    "Sparsity parameter is too big ({}) and eliminated all "
                    "coefficients".format(self.threshold)
                )
                coef = np.zeros((n_targets, n_features))
                break

            coef = np.zeros((n_targets, n_features))
            for i in range(n_targets):
                if np.count_nonzero(ind[i]) == 0:
                    # This target has an empty support: leave its row at
                    # zero and carry on with the remaining targets.
                    warnings.warn(
                        "Sparsity parameter is too big ({}) and eliminated all "
                        "coefficients".format(self.threshold)
                    )
                    continue
                # Fit on the active columns only, then re-threshold.
                coef_i = self._regress(x[:, ind[i]], y[:, i])
                coef_i, ind_i = self._sparse_coefficients(
                    n_features, ind[i], coef_i, self.threshold
                )
                coef[i] = coef_i
                ind[i] = ind_i

            self.history_.append(coef)
            if self.verbose:
                R2 = np.sum((y - np.dot(x, coef.T)) ** 2)
                L2 = self.alpha * np.sum(coef**2)
                L0 = np.count_nonzero(coef)
                row = [k, R2, L2, L0, R2 + L2]
                print(
                    "{0:10d} ... {1:10.4e} ... {2:10.4e} ... {3:10d}"
                    " ... {4:10.4e}".format(*row)
                )
            if np.sum(ind) == n_features_selected or self._no_change():
                # could not (further) select important features
                break
        else:
            warnings.warn(
                "STLSQ._reduce did not converge after {} iterations.".format(
                    self.max_iter
                ),
                ConvergenceWarning,
            )
            if coef is None:
                # No iteration ran, so fall back to the incoming coefficients.
                coef = self.coef_
                warnings.warn(
                    "STLSQ._reduce has no iterations left to determine coef",
                    ConvergenceWarning,
                )
        self.coef_ = coef
        self.ind_ = ind

    @property
    def complexity(self):
        """Number of nonzero coefficients, plus the number of intercept
        entries whose magnitude is at least ``self.threshold``.

        Calls ``check_is_fitted`` on the estimator before reading its
        fitted attributes.
        """
        check_is_fitted(self)

        return np.count_nonzero(self.coef_) + np.count_nonzero(
            [abs(self.intercept_) >= self.threshold]
        )