Source code for pysindy.optimizers.trapping_sr3

import warnings
from functools import partial
from itertools import combinations as combo_nr
from itertools import product
from itertools import repeat
from math import comb
from typing import cast
from typing import Optional
from typing import TypeVar
from typing import Union

import cvxpy as cp
import numpy as np
from numpy.typing import NDArray
from sklearn.exceptions import ConvergenceWarning

from .._typing import Float1D
from .._typing import Float2D
from .._typing import Float3D
from .._typing import Float4D
from .._typing import Float5D
from .._typing import Int1D
from ..feature_library.polynomial_library import n_poly_features
from ..feature_library.polynomial_library import PolynomialLibrary
from ..utils import reorder_constraints
from ._constrained_sr3 import ConstrainedSR3
from .base import FloatDType
from .base import NFeat
from .base import NTarget


class EnstrophyMat:
    """Pre-compute some useful factors of an enstrophy matrix

    The matrix, root, and root inverse are frequently used in transformation
    between the original and enstrophy bases
    """

    P: Float2D
    P_root: Float2D
    P_root_inv: Float2D

    def __init__(self, P):
        self.P = P
        lsv, sing_vals, rsv = np.linalg.svd(P)
        self.P_root = lsv @ np.diag(np.sqrt(sing_vals)) @ rsv
        self.P_root_inv = lsv @ np.diag(np.sqrt(1 / sing_vals)) @ rsv


[docs] class TrappingSR3(ConstrainedSR3): """ Generalized trapping variant of sparse relaxed regularized regression. This optimizer can be used to identify quadratically nonlinear systems with either a-priori globally or locally stable (bounded) solutions. This optimizer can be used to minimize five different objective functions: .. math:: 0.5\\|y-Xw\\|^2_2 + \\lambda \\times R(w) + 0.5\\|Pw-A\\|^2_2/\\eta + \\delta_0(Cw-d) + \\delta_{\\Lambda}(A) + \\alpha \\|Qijk\\| + \\beta \\|Q_{ijk} + Q_{jik} + Q_{kij}\\| where :math:`R(w)` is a regularization function, C is a constraint matrix detailing affine constraints on the model coefficients, A is a proxy for the quadratic contributions to the energy evolution, and :math:`Q_{ijk}` are the quadratic coefficients in the model. For provably globally bounded solutions, use :math:`\\alpha >> 1`, :math:`\\beta >> 1`, and equality constraints. For maximizing the local stability radius of the model one has the choice to do this by (1) minimizing the values in :math:`Q_{ijk}`, (2) promoting models with skew-symmetrix :math:`Q_{ijk}` coefficients, or (3) using inequality constraints for skew-symmetry in :math:`Q_{ijk}`. See the following references for more details: Kaptanoglu, Alan A., et al. "Promoting global stability in data-driven models of quadratic nonlinear dynamics." arXiv preprint arXiv:2105.01843 (2021). Parameters ---------- eta : Determines the strength of the stability term :math:`||Pw-A||^2` in the optimization. The default value is very large so that the algorithm default is to ignore the stability term. In this limit, this should be approximately equivalent to the ConstrainedSR3 method. eps_solver : If reg_weight_lam != 0, this specifies the error tolerance in the CVXPY (OSQP) solve. Default 1.0e-7 alpha: Determines the strength of the local stability term :math:`||Qijk||^2` in the optimization. The default value (1e20) is very large so that the algorithm default is to ignore this term. beta: Determines the strength of the local stability term :math:`||Qijk + Qjik + Qkij||^2` in the optimization. The default value is very large so that the algorithm default is to ignore this term. mod_matrix: Lyapunov matrix. Trapping theorems apply to energy :math:`\\propto \\dot y \\cdot y`, but also to any :math:`\\propto \\dot y P \\cdot y` for Lyapunov matrix :math:`P`. Defaults to the identity matrix. alpha_A : Determines the step size in the prox-gradient descent over A. For convergence, need alpha_A <= eta, so default alpha_A = eta is used. alpha_m : Determines the step size in the prox-gradient descent over m. For convergence, need alpha_m <= eta / ||w^T * PQ^T * PQ * w||. Typically 0.01 * eta <= alpha_m <= 0.1 * eta. (default eta * 0.1) gamma : Determines the negative interval that matrix A is projected onto. For most applications gamma = 0.1 - 1.0 works pretty well. tol_m : Tolerance used for determining convergence of the optimization algorithm over m. regularizer : Regularization function to use. For current trapping SINDy, only the L1 and L2 norms are implemented. Note that other convex norms could be straightforwardly implemented, but L0 requires reformulation because of nonconvexity. (default 'L1') m0 : Initial guess for trap center in the optimization. Default None initializes vector elements randomly in [-1, 1]. shape (n_targets) A0 : Initial guess for vector A in the optimization. Shape (n_targets, n_targets) Default None, meaning A is initialized as A = diag(gamma). Attributes ---------- A_history_ : list History of the auxiliary variable A that approximates diag(PW). m_history_ : list History of the shift vector m that determines the origin of the trapping region. PW_history_ : list History of PW = A^S, the quantity we are attempting to make negative definite. PWeigs_history_ : list History of diag(PW), a list of the eigenvalues of A^S at each iteration. Tracking this allows us to ascertain if A^S is indeed being pulled towards the space of negative definite matrices. PL_unsym_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_features) Unsymmetrized linear coefficient part of the P matrix in :math:``||Pw - A||^2`` PL_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_features) Linear coefficient part of the P matrix in :math:``||Pw - A||^2`` PQ_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_targets, n_features) Quadratic coefficient part of the P matrix in :math:``||Pw - A||^2`` PT_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_targets, n_features) Transpose of 1st dimension and 2nd dimension of quadratic coefficient part of the P matrix in :math:``||Pw - A||^2`` objective_history_ : list History of the value of the objective at each step. Note that the trapping SINDy problem is nonconvex, meaning that this value may increase and decrease as the algorithm works. Examples -------- >>> import numpy as np >>> from scipy.integrate import odeint >>> from pysindy import SINDy >>> from pysindy.optimizers import TrappingSR3 >>> lorenz = lambda z,t : [10*(z[1] - z[0]), >>> z[0]*(28 - z[2]) - z[1], >>> z[0]*z[1] - 8/3*z[2]] >>> t = np.arange(0,2,.002) >>> x = odeint(lorenz, [-8,8,27], t) >>> opt = TrappingSR3(reg_weight_lam=0.1) >>> model = SINDy(optimizer=opt) >>> model.fit(x, t=t[1]-t[0]) >>> model.print() x0' = -10.004 1 + 10.004 x0 x1' = 27.994 1 + -0.993 x0 + -1.000 1 x1 x2' = -2.662 x1 + 1.000 1 x0 """ def __init__( self, *, _n_tgts: int = None, _include_bias: bool = False, _interaction_only: bool = False, method: str = "global", eta: Union[float, None] = None, eps_solver: float = 1e-7, alpha: Optional[float] = None, beta: Optional[float] = None, mod_matrix: Optional[NDArray] = None, alpha_A: Union[float, None] = None, alpha_m: Union[float, None] = None, gamma: float = -0.1, tol_m: float = 1e-5, regularizer: str = "l1", m0: Union[NDArray, None] = None, A0: Union[NDArray, None] = None, **kwargs, ): # n_tgts, constraints, etc are data-dependent parameters and belong in # _reduce/fit (). The following is a hack until we refactor how # constraints are applied in ConstrainedSR3 and MIOSR self._include_bias = _include_bias self._interaction_only = _interaction_only self._n_tgts = _n_tgts self.mod_matrix = mod_matrix if _n_tgts is None: warnings.warn( "Trapping Optimizer initialized without _n_tgts. It will likely" " be unable to fit data" ) self._n_tgts = 1 if self.mod_matrix is None: mod_matrix = np.eye(self._n_tgts) self.enstrophy = EnstrophyMat(mod_matrix) if method == "global": if hasattr(kwargs, "constraint_separation_index"): constraint_separation_index = kwargs["constraint_separation_index"] elif kwargs.get("inequality_constraints", False): constraint_separation_index = kwargs["constraint_lhs"].shape[0] else: constraint_separation_index = 0 constraint_rhs, constraint_lhs = _make_constraints( self._n_tgts, include_bias=_include_bias ) constraint_lhs = np.tensordot(constraint_lhs, self.enstrophy.P, axes=1) constraint_order = kwargs.pop("constraint_order", "feature") if constraint_order == "target": constraint_lhs = np.transpose(constraint_lhs, [0, 2, 1]) constraint_lhs = np.reshape(constraint_lhs, (constraint_lhs.shape[0], -1)) try: constraint_lhs = np.concatenate( (kwargs.pop("constraint_lhs"), constraint_lhs), 0 ) constraint_rhs = np.concatenate( (kwargs.pop("constraint_rhs"), constraint_rhs), 0 ) except KeyError: pass super().__init__( constraint_lhs=constraint_lhs, constraint_rhs=constraint_rhs, constraint_separation_index=constraint_separation_index, constraint_order=constraint_order, equality_constraints=True, regularizer=regularizer, **kwargs, ) self.method = "global" elif method == "local": super().__init__(regularizer=regularizer, **kwargs) self.method = "local" else: raise ValueError(f"Can either use 'global' or 'local' method, not {method}") self.eps_solver = eps_solver self.m0 = m0 self.A0 = A0 self.alpha_A = alpha_A self.alpha_m = alpha_m self.eta = eta self.alpha = alpha self.beta = beta self.gamma = gamma self.tol_m = tol_m self.__post_init_guard() def __post_init_guard(self): """Conduct initialization post-init, as required by scikitlearn API""" if self.regularizer.lower() not in ("l1", "l2", "weighted_l1", "weighted_l2"): raise ValueError("Regularizer must be (weighted) L1 or L2") if self.eta is None: warnings.warn( "eta was not set, so defaulting to eta = 1e20 " "with alpha_m = 1e-2 * eta, alpha_A = eta. Here eta is so " "large that the stability term in the optimization " "will be ignored." ) self.eta = 1e20 self.alpha_m = 1e18 self.alpha_A = 1e20 else: if self.alpha_m is None: self.alpha_m = self.eta * 1e-2 if self.alpha_A is None: self.alpha_A = self.eta if self.eta <= 0: raise ValueError("eta must be positive") if self.alpha is None: self.alpha = 1e20 warnings.warn( "alpha was not set, so defaulting to alpha = 1e20 " "which is so" "large that the ||Qijk|| term in the optimization " "will be essentially ignored." ) if self.beta is None: self.beta = 1e20 warnings.warn( "beta was not set, so defaulting to beta = 1e20 " "which is so" "large that the ||Qijk + Qjik + Qkij|| " "term in the optimization will be essentially ignored." ) if self.alpha_m < 0 or self.alpha_m > self.eta: raise ValueError("0 <= alpha_m <= eta") if self.alpha_A < 0 or self.alpha_A > self.eta: raise ValueError("0 <= alpha_A <= eta") if self.gamma >= 0: raise ValueError("gamma must be negative") if self.tol <= 0 or self.tol_m <= 0 or self.eps_solver <= 0: raise ValueError("tol and tol_m must be positive") if self.inequality_constraints and np.any(self.reg_weight_lam == 0.0): raise ValueError("Inequality constraints requires reg_weight_lam!=0") if self.A0 is None: self.A0 = np.diag(self.gamma * np.ones(self._n_tgts)) if self.m0 is None: np.random.seed(1) self.m0 = (np.random.rand(self._n_tgts) - np.ones(self._n_tgts)) * 2
[docs] def set_params(self, **kwargs): super().set_params(**kwargs) self.__post_init_guard()
@staticmethod def _build_PC(polyterms: list[tuple[int, Int1D]]) -> Float3D: r"""Build the matrix that projects out the constant term of a library Coefficients in each polynomial equation :math:`i\in \{1,\dots, r\}` can be stored in an array arranged as written out on paper (e.g. :math:` f_i(x) = a^i_0 + a^i_1 x_1, a^i_2 x_1x_2, \dots a^i_N x_r^2`) or in a series of matrices :math:`E \in \mathbb R^r`, :math:`L\in \mathbb R^{r\times r}`, and (without loss of generality) in :math:`Q\in \mathbb R^{r \times r \times r}, where each :math:`Q^{(i)}_{j,k}` is symmetric in the last two indexes. This function builds the projection tensor for extracting the constant terms :math:`E` from a set of coefficients in the first representation. Args: polyterms: the ordering and meaning of terms in the equations. Each entry represents a term in the equation and comprises its index and an array of exponents for each variable Returns: 3rd order tensor """ n_targets, n_features, _, _, _ = _build_lib_info(polyterms) c_terms = [ind for ind, exps in polyterms if sum(exps) == 0] PC = np.zeros((n_targets, n_targets, n_features)) if c_terms: # either a length 0 or length 1 list PC[range(n_targets), range(n_targets), c_terms[0]] = 1.0 return PC @staticmethod def _build_PL(polyterms: list[tuple[int, Int1D]]) -> tuple[Float4D, Float4D]: r"""Build the matrix that projects out the linear terms of a library Coefficients in each polynomial equation :math:`i\in \{1,\dots, r\}` can be stored in an array arranged as written out on paper (e.g. :math:` f_i(x) = a^i_0 + a^i_1 x_1, a^i_2 x_1x_2, \dots a^i_N x_r^2`) or in a series of matrices :math:`E \in \mathbb R^r`, :math:`L\in \mathbb R^{r\times r}`, and (without loss of generality) in :math:`Q\in \mathbb R^{r \times r \times r}, where each :math:`Q^{(i)}_{j,k}` is symmetric in the last two indexes. This function builds the projection tensor for extracting the linear terms :math:`L` from a set of coefficients in the first representation. The function also calculates the projection tensor for extracting the symmetrized version of L Args: polyterms: the ordering and meaning of terms in the equations. Each entry represents a term in the equation and comprises its index and an array of exponents for each variable Returns: Two 4th order tensors, the first one symmetric in the first two indexes. """ n_targets, n_features, lin_terms, _, _ = _build_lib_info(polyterms) PL_tensor_unsym = np.zeros((n_targets, n_targets, n_targets, n_features)) tgts = range(n_targets) for j in range(n_targets): PL_tensor_unsym[tgts, j, tgts, lin_terms[j]] = 1 PL_tensor = (PL_tensor_unsym + np.transpose(PL_tensor_unsym, [1, 0, 2, 3])) / 2 return cast(Float4D, PL_tensor), cast(Float4D, PL_tensor_unsym) @staticmethod def _build_PQ(polyterms: list[tuple[int, Int1D]]) -> Float5D: r"""Build the matrix that projects out the quadratic terms of a library Coefficients in each polynomial equation :math:`i\in \{1,\dots, r\}` can be stored in an array arranged as written out on paper (e.g. :math:` f_i(x) = a^i_0 + a^i_1 x_1, a^i_2 x_1x_2, \dots a^i_N x_r^2`) or in a series of matrices :math:`E \in \mathbb R^r`, :math:`L\in \mathbb R^{r\times r}`, and (without loss of generality) in :math:`Q\in \mathbb R^{r \times r \times r}, where each :math:`Q^{(i)}_{j,k}` is symmetric in the last two indexes. This function builds the projection tensor for extracting the quadratic forms :math:`Q` from a set of coefficients in the first representation. Args: polyterms: the ordering and meaning of terms in the equations. Each entry represents a term in the equation and comprises its index and an array of exponents for each variable Returns: 5th order tensor symmetric in second and third indexes. """ n_targets, n_features, _, pure_terms, mixed_terms = _build_lib_info(polyterms) PQ = np.zeros((n_targets, n_targets, n_targets, n_targets, n_features)) tgts = range(n_targets) for j, k in product(*repeat(range(n_targets), 2)): if j == k: PQ[tgts, j, k, tgts, pure_terms[j]] = 1.0 if j != k: PQ[tgts, j, k, tgts, mixed_terms[frozenset({j, k})]] = 1 / 2 return cast(Float5D, PQ) def _set_Ptensors( self, n_targets: int ) -> tuple[Float3D, Float4D, Float4D, Float5D, Float5D, Float5D]: """Make the projection tensors used for the algorithm.""" lib = PolynomialLibrary(2, include_bias=self._include_bias).fit( np.zeros((1, n_targets)) ) polyterms = [(t_ind, exps) for t_ind, exps in enumerate(lib.powers_)] PC_tensor = self._build_PC(polyterms) PL_tensor, PL_tensor_unsym = self._build_PL(polyterms) PQ_tensor = self._build_PQ(polyterms) PT_tensor = PQ_tensor.transpose([1, 0, 2, 3, 4]) # PM is the sum of PQ and PQ which projects out the sum of Qijk and Qjik # These are the quadtratic terms of the energy growth PM_tensor = cast(Float5D, PQ_tensor + PT_tensor) return PC_tensor, PL_tensor_unsym, PL_tensor, PQ_tensor, PT_tensor, PM_tensor def _update_A(self, A_old, PW): """Update the proxy enstrophy quadratic form, :math:`A`? Currently, this function projects a proxy of the quadratic form onto the negative definite cone (w/tol gamma) and then "projects" the exitisting quadratic form onto those same eigenvalues """ eigvals, eigvecs = np.linalg.eigh(A_old) eigPW, eigvecsPW = np.linalg.eigh(PW) r = A_old.shape[0] A = np.diag(eigvals) for i in range(r): if eigvals[i] > self.gamma: A[i, i] = self.gamma return eigvecsPW @ A @ np.linalg.inv(eigvecsPW) def _convergence_criterion(self): """Calculate the convergence criterion for the optimization over w""" this_coef = self.history_[-1] if len(self.history_) > 1: last_coef = self.history_[-2] else: last_coef = np.zeros_like(this_coef) err_coef = np.sqrt(np.sum((this_coef - last_coef) ** 2)) return err_coef def _m_convergence_criterion(self): """Calculate the convergence criterion for the optimization over m""" return np.sum(np.abs(self.m_history_[-2] - self.m_history_[-1])) def _objective(self, x, y, coef_sparse, A, PW, k): """Objective function""" # Compute the errors sindy_loss = (y - np.dot(x, coef_sparse)) ** 2 relax_loss = (A - PW) ** 2 Qijk = np.einsum("ya,abcde,ed", self.enstrophy.P, self.PQ_, coef_sparse) # Qijk is H0 in the paper Qijk_permsum = _permutation_asymmetry(Qijk) * 3 H0tilde = _convert_quad_terms_to_ens_basis(Qijk_permsum, self.enstrophy) L1 = self.reg_weight_lam * np.sum(np.abs(coef_sparse.flatten())) sindy_loss = 0.5 * np.sum(sindy_loss) relax_loss = 0.5 * np.sum(relax_loss) / self.eta nonlin_ens_loss = 0.5 * np.sum(Qijk**2) / self.alpha cubic_ens_loss = 0.5 * np.sum(H0tilde**2) / self.beta obj = sindy_loss + relax_loss + L1 if self.method == "local": obj += nonlin_ens_loss + nonlin_ens_loss if self.verbose and k % max(1, self.max_iter // 10) == 0: print( f"{k:5d} ... {sindy_loss:8.3e} ... {relax_loss:8.3e} ... {L1:8.2e}" f" ... {nonlin_ens_loss:8.2e} ... {cubic_ens_loss:8.2e} ... {obj:8.2e}" ) return obj def _update_coef_sparse_rs(self, var_len, x_expanded, y, Pmatrix, A, coef_prev): """Solve coefficient update with CVXPY if reg_weight_lam != 0""" xi, cost = self._create_var_and_part_cost(var_len, x_expanded, y) cost = cost + cp.sum_squares(Pmatrix @ xi - A.flatten()) / self.eta if self.method == "local": p_Q = np.reshape(self.PQ_, (-1, var_len), "F") p_PQ = np.tensordot(self.enstrophy.P, self.PQ_, axes=([1], [0])) p_PQ_ep = _permutation_asymmetry(p_PQ) p_H0tilde = _convert_quad_terms_to_ens_basis(p_PQ_ep, self.enstrophy) p_H0tilde = np.reshape(p_H0tilde, (-1, var_len), "F") cost = cost + 0.5 * cp.sum_squares(p_Q @ xi) / self.alpha cost = cost + 0.5 * cp.sum_squares(p_H0tilde @ xi) / self.beta return self._update_coef_cvxpy(xi, cost, var_len, coef_prev, self.eps_solver) def _update_coef_nonsparse_rs( self, x: Float2D, y: Float2D, P_A: Float4D, quad_energy_coeff_A: Float2D, ): """Solve a partial minimization for w, the SINDy coefficients Letting :math:`Q_{ijk} =P_Q w` and :math:`A=P_A w`, Partially minimizes the sum of: * Error in predicting SINDy dynamics, :math:`||Xw-y||` * Deviation from previous iterate of quadratic energy coefficients, :math:`||A-A^{-}||^2` * (Optionally) the size of the quadratic ODE terms, :math:`||Q_{ijk}||^2` * (Optionally) the symmetry-breaking of the quadratic ODE terms, :math:`||Q_{ijk} + Q_{jki} + Q_{kij}||^2` Args: x: The samples of SINDy features y: The SINDy derivatives P_A: Projects coefficients w onto energy quadratic form. quad_energy_coeff_A: The energy quadratic form from the previous iteration of trapping SINDy coef_prev """ _, _, n_tgts, n_features = P_A.shape var_len = n_tgts * n_features # Input variable still has 2 dimensions, so hessians have 4 hess = np.zeros((n_tgts, n_features, n_tgts, n_features)) xTx = x.T @ x for tgt in range(n_tgts): hess[tgt, :, tgt, :] = xTx pTp = np.einsum("abcd,baef->cdef", P_A, P_A) hess += pTp / self.eta if self.method == "local": PQTPQ = np.tensordot(self.PQ_, self.PQ_, axes=([0, 1, 2], [0, 1, 2])) p_PQ = np.einsum("ya,abcde->ybcde", self.enstrophy.P, self.PQ_) p_H0 = _permutation_asymmetry(p_PQ) * 3 p_H0tilde = _convert_quad_terms_to_ens_basis(p_H0, self.enstrophy) PQTPQ_ep = np.tensordot(p_H0tilde, p_H0tilde, axes=([0, 1, 2], [0, 1, 2])) hess += PQTPQ / self.alpha + PQTPQ_ep / self.beta PaTA = np.einsum("bacd,ab->cd", P_A, quad_energy_coeff_A) # We are still, across most of SR3, handling (NFeat,NTarget) shaped arrays xTy = x.T @ y PaTA = np.transpose(PaTA) grad_const = xTy + PaTA / self.eta hess = np.transpose(hess, [1, 0, 3, 2]) hess = np.reshape(hess, (var_len, var_len)) grad_const = np.reshape(grad_const, (var_len)) coef_flat = self._solve_nonsparse_relax_and_split(hess, grad_const) return coef_flat.reshape(n_features, n_tgts) def _solve_m_relax_and_split( self, trap_ctr: Float1D, prev_A: Float2D, coef_sparse: np.ndarray[tuple[NFeat, NTarget], FloatDType], ) -> tuple[Float1D, Float2D]: r"""Updates the trap center Ideally, the step would find a trap center that reduces the enstrophy quadratic form as close as possible to the negative semidefinite cone. .. math:: \underset{m, A\in \mathcal S^{--}}{\arg\min}||(L-Qm)^S - A||^2 where the trap center is :math:`m`. However, the algorithm simply performs one step of gradient update on the trap center and a gradient-like step of the proxy enstrophy quadratic form. TODO: improve variable names, test out variants such as completely optimizing over trap center, limiting A update to projection onto negative definite cone, or using updated trap center in A update. See eqn 31-35 in Kaptanoglu et al 2021 and Algorithm 1 Returns: new trap center (:math:`m`) and proxy enstrophy quadratic terms (:math:`A`) """ # prox-gradient descent for (A, m) # Calculate As p_AS = _create_A_symm(self.PL_unsym_, self.PM_, trap_ctr, self.enstrophy) AS_coeff = np.tensordot(p_AS, coef_sparse, axes=([3, 2], [0, 1])) # Calculate error in quadratic balance, and adjust trap center relax_err_wrt_proxy = (prev_A - AS_coeff) / self.eta # Calculate quadratic terms of As as a function of m A_wrt_m = np.tensordot(self.PM_, coef_sparse, axes=([4, 3], [0, 1])) A_wrt_m = np.einsum( "ya,abc,bz->yzc", self.enstrophy.P_root, A_wrt_m, self.enstrophy.P_root_inv ) A_wrt_m = (A_wrt_m + np.transpose(A_wrt_m, [1, 0, 2])) / 2 # PMT_PW is gradient of relaxation wrt trap center (eqn 35) PMT_PW = np.tensordot(A_wrt_m, relax_err_wrt_proxy, axes=([2, 1], [0, 1])) trap_new = trap_ctr - self.alpha_m * PMT_PW # Update A A_new = self._update_A(prev_A - self.alpha_A * relax_err_wrt_proxy, AS_coeff) return trap_new, A_new def _solve_nonsparse_relax_and_split(self, hess, gradient_constant): """Update for the coefficients if reg_weight_lam = 0.""" if self.use_constraints: coef_nonsparse = _equality_constrained_linlsq( hess, gradient_constant, self.constraint_lhs, self.constraint_rhs ) else: hess_inv = np.linalg.pinv(hess, rcond=1e-10) coef_nonsparse = hess_inv.dot(gradient_constant) return coef_nonsparse def _reduce(self, x, y): """ Perform at most ``self.max_iter`` iterations of the TrappingSR3 algorithm. Assumes initial guess for coefficients is stored in ``self.coef_``. """ self.A_history_ = [] self.m_history_ = [] self.p_history_ = [] self.PW_history_ = [] self.PWeigs_history_ = [] self.history_ = [] n_samples, n_tgts = y.shape n_features = n_poly_features( n_tgts, 2, include_bias=self._include_bias, interaction_only=self._interaction_only, ) var_len = n_features * n_tgts ( self.PC_, self.PL_unsym_, self.PL_, self.PQ_, self.PT_, self.PM_, ) = self._set_Ptensors(n_tgts) # Set initial coefficients if self.use_constraints and self.constraint_order.lower() == "target": self.constraint_lhs = reorder_constraints( self.constraint_lhs, n_features, output_order="feature" ) coef_sparse: np.ndarray[tuple[NFeat, NTarget], FloatDType] = self.coef_.T # Print initial values for each term in the optimization if self.verbose: row = [ "Iter", "|y-Xw|^2", "|Pw-A|^2/eta", "|w|_1", "|Qijk|/a", "|Qijk+...|/b", "Total:", ] print( "{: >5} ... {: >8} ... {: >10} ... {: >5}" " ... {: >8} ... {: >10} ... {: >8}".format(*row) ) A = self.A0 self.A_history_.append(A) trap_ctr = self.m0 self.m_history_.append(trap_ctr) # Precompute some objects for optimization x_expanded = np.zeros((n_samples, n_tgts, n_features, n_tgts)) for i in range(n_tgts): x_expanded[:, i, :, i] = x x_expanded = np.reshape(x_expanded, (n_samples * n_tgts, n_tgts * n_features)) # keep track of last solution in case method fails trap_prev_ctr = trap_ctr # Begin optimization loop objective_history = [] for k in range(self.max_iter): # update p_AS tensor from the newest trap center p_AS = _create_A_symm(self.PL_unsym_, self.PM_, trap_ctr, self.enstrophy) Pmatrix = p_AS.reshape(n_tgts * n_tgts, n_tgts * n_features) self.p_history_.append(p_AS) coef_prev = coef_sparse if np.any(self.reg_weight_lam > 0.0) or self.inequality_constraints: coef_sparse = self._update_coef_sparse_rs( var_len, x_expanded, y, Pmatrix, A, coef_prev ) else: coef_sparse = self._update_coef_nonsparse_rs(x, y, p_AS, A) # If problem over xi becomes infeasible, break out of the loop if coef_sparse is None: coef_sparse = coef_prev break trap_ctr, A = self._solve_m_relax_and_split(trap_ctr, A, coef_sparse) # If problem over m becomes infeasible, break out of the loop if trap_ctr is None: trap_ctr = trap_prev_ctr break self.history_.append(coef_sparse.T) PW = np.tensordot(p_AS, coef_sparse, axes=([3, 2], [0, 1])) # (m,A) update finished, append the result self.m_history_.append(trap_ctr) self.A_history_.append(A) eigvals, eigvecs = np.linalg.eig(PW) self.PW_history_.append(PW) self.PWeigs_history_.append(np.sort(eigvals)) # update objective objective_history.append(self._objective(x, y, coef_sparse, A, PW, k)) if ( self._m_convergence_criterion() < self.tol_m and self._convergence_criterion() < self.tol ): break else: warnings.warn( f"TrappingSR3 did not converge after {self.max_iter} iters.", ConvergenceWarning, ) self.coef_ = coef_sparse.T self.objective_history = objective_history
def _make_constraints(n_tgts: int, **kwargs): """Create constraints for the Quadratic terms in TrappingSR3. These are the constraints from equation 5 of the Trapping SINDy paper. Args: n_tgts: number of coordinates or modes for which you're fitting an ODE. kwargs: Keyword arguments to PolynomialLibrary such as ``include_bias``. Returns: A tuple of the constraint zeros, and a constraint matrix to multiply by the coefficient matrix of Polynomial terms. Number of constraints is ``n_tgts + 2 * math.comb(n_tgts, 2) + math.comb(n_tgts, 3)``. Constraint matrix is of shape ``(n_constraint, n_feature, n_tgt)``. To get "feature" order constraints, use ``np.reshape(constraint_matrix, (n_constraints, -1))``. To get "target" order constraints, transpose axis 1 and 2 before reshaping. """ lib = PolynomialLibrary(2, **kwargs).fit(np.zeros((1, n_tgts))) terms = [(t_ind, exps) for t_ind, exps in enumerate(lib.powers_)] _, n_terms, linear_terms, pure_terms, mixed_terms = _build_lib_info(terms) # index of tgt -> index of its pure quadratic term pure_terms = {np.argmax(exps): t_ind for t_ind, exps in terms if max(exps) == 2} # two indexes of tgts -> index of their mixed quadratic term mixed_terms = { frozenset(np.argwhere(exponent == 1).flatten()): t_ind for t_ind, exponent in terms if max(exponent) == 1 and sum(exponent) == 2 } constraint_mat = np.vstack( ( _pure_constraints(n_tgts, n_terms, pure_terms), _antisymm_double_constraint(n_tgts, n_terms, pure_terms, mixed_terms), _antisymm_triple_constraints(n_tgts, n_terms, mixed_terms), ) ) return np.zeros(len(constraint_mat)), constraint_mat def _pure_constraints(n_tgts: int, n_terms: int, pure_terms: dict[int, int]) -> Float2D: """Set constraints for coefficients adorning terms like a_i^3 = 0""" constraint_mat = np.zeros((n_tgts, n_terms, n_tgts)) for constr_ind, (tgt_ind, term_ind) in zip(range(n_tgts), pure_terms.items()): constraint_mat[constr_ind, term_ind, tgt_ind] = 1.0 return constraint_mat def _antisymm_double_constraint( n_tgts: int, n_terms: int, pure_terms: dict[int, int], mixed_terms: dict[frozenset[int], int], ) -> Float2D: """Set constraints for coefficients adorning terms like a_i^2 * a_j=0""" constraint_mat_1 = np.zeros((len(mixed_terms), n_terms, n_tgts)) # a_i^2 * a_j constraint_mat_2 = np.zeros((len(mixed_terms), n_terms, n_tgts)) # a_i * a_j^2 for constr_ind, ((tgt_i, tgt_j), mix_term) in enumerate(mixed_terms.items()): constraint_mat_1[constr_ind, mix_term, tgt_i] = 1.0 constraint_mat_1[constr_ind, pure_terms[tgt_i], tgt_j] = 1.0 constraint_mat_2[constr_ind, mix_term, tgt_j] = 1.0 constraint_mat_2[constr_ind, pure_terms[tgt_j], tgt_i] = 1.0 return np.concatenate((constraint_mat_1, constraint_mat_2), axis=0) def _antisymm_triple_constraints( n_tgts: int, n_terms: int, mixed_terms: dict[frozenset[int], int] ) -> Float2D: constraint_mat = np.zeros((comb(n_tgts, 3), n_terms, n_tgts)) # a_ia_ja_k def find_symm_term(a, b): return mixed_terms[frozenset({a, b})] for constr_ind, (tgt_i, tgt_j, tgt_k) in enumerate(combo_nr(range(n_tgts), 3)): constraint_mat[constr_ind, find_symm_term(tgt_j, tgt_k), tgt_i] = 1 constraint_mat[constr_ind, find_symm_term(tgt_k, tgt_i), tgt_j] = 1 constraint_mat[constr_ind, find_symm_term(tgt_i, tgt_j), tgt_k] = 1 return constraint_mat def _build_lib_info( polyterms: list[tuple[int, Int1D]], ) -> tuple[int, int, dict[int, int], dict[int, int], dict[frozenset[int], int]]: """From polynomial, calculate various useful info Args: polyterms. The output of PolynomialLibrary.powers_. Each term is a tuple of it's index in the ordering and a 1D array of the exponents of each feature. Returns: the number of targets the number of features a dictionary from each target to its linear term index a dictionary from each target to its quadratic term index a dictionary from each pair of targets to its mixed term index """ try: n_targets = len(polyterms[0][1]) except IndexError: raise ValueError("Passed a polynomial library with no terms") n_features = len(polyterms) mixed_terms = { frozenset(np.argwhere(exps == 1).flatten()): t_ind for t_ind, exps in polyterms if max(exps) == 1 and sum(exps) == 2 } pure_terms = {np.argmax(exps): t_ind for t_ind, exps in polyterms if max(exps) == 2} linear_terms = { np.argmax(exps): t_ind for t_ind, exps in polyterms if sum(exps) == 1 } return n_targets, n_features, linear_terms, pure_terms, mixed_terms def _equality_constrained_linlsq( hess: Float2D, grad_const: Float2D, constraint_lhs: Float2D, constraint_rhs: Float2D ): """Solve the constrained least squares problem via lagrange multipliers For the inversion of the lagrange gradient matrix, see `wikipedia <https://en.wikipedia.org/wiki/Block_matrix#Inversion>`_ Arguments: hess: the hessian of the loss term. For regression Ax = b, this is A^TA. Must be a square (positive definite) matrix grad_const: the constant part of the gradient of the loss term. For regression Ax = b, this is A^Tb. Must be a column vector. constraint_lhs: matrix on left hand side of constraint equation Cx=d. Must have same second dimension as hess constraint_rhs: vector on right hand side of constraint equation Cx=d. Must be a column vector. Returns: Column vector """ C = constraint_lhs d = constraint_rhs # Careful with ill-conditioned matrices! inv1 = np.linalg.pinv(hess, rcond=1e-10) inv2 = np.linalg.pinv(C @ inv1 @ C.T, rcond=1e-10) return inv1 @ (grad_const + C.T @ inv2 @ (d - C @ inv1 @ grad_const)) TwoOrFourD = TypeVar("TwoOrFourD", Float2D, Float4D) def _create_A_symm( L_obj: TwoOrFourD, M_obj: Union[Float3D, Float5D], trap_ctr: Float1D, ens: EnstrophyMat, ) -> TwoOrFourD: r"""Create the enstrophy/energy growth quadratic form In the paper, this is :math:`A^S`. This function can be used to create either the matrix itself or a projector from SINDy coefficient layout to the matrix. Note that L and Q themselves are the unsymmetrized variants. Args: L_obj: The linear terms in the original differential equation. This can either be the coefficients themselves, or a projector onto the coefficients M_obj: The quadratic form of the original differential equation, plus its transpose of the 2nd and 3rd axes. See eqn 3.8 of Schlegel and Noack 2015. This can be the quadratic form, or a projector onto the quadratic form. If a projector, it must match L_obj. trap_ctr: The posited center of the trapping region. ens: the enstrophy matrix of the system """ mPM = np.einsum("ijk...,k->ij...", M_obj, trap_ctr) A = np.einsum("ya,ab...,bz->yz...", ens.P_root, L_obj + mPM, ens.P_root_inv) A_S = (A + np.einsum("ij...->ji...", A)) / 2 return A_S Q_Arr = TypeVar("Q_Arr", Float3D, Float5D) def _permutation_asymmetry(Q_obj: Q_Arr) -> Q_Arr: r"""Calculate the permutation-asymmetric part of the first 3 axes of Q In the paper, this defines the directions of cubic energy growth. It is used to create :math:`\tilde{Q}'`, its 2D flattening, :math:`H_0`, and its enstrophy-basis (z-space) version, :math:`\tilde {H_0}` This works on both the true quadratic terms as well as the projector onto the quadratic terms. Note: The paper uses three times this quantity. """ p1 = partial(np.einsum, "ijk...->jki...") p2 = partial(np.einsum, "ijk...->kij...") return (Q_obj + p1(Q_obj) + p2(Q_obj)) / 3 def _convert_quad_terms_to_ens_basis(PQ: Q_Arr, ens: EnstrophyMat) -> Q_Arr: r"""Convert quadratic enstrophy terms to enstrophy basis. In the paper, this captures the change from :math:`\tilde{Q}=PQ`, the quadratic enstrophy terms acting on :math:`y`, to the quadratic terms acting on :math:`z=P^{1/2}y`. It is also used to convert the cubic enstrophy growth terms to cubic growth terms in the enstrophy basis, i.e. :math:`\tilde {H_0}` from :math:`H_0`. This works on both the true quadratic terms as well as the projector onto the quadratic terms """ return np.einsum( "xa,abc...,by,cz->xyz...", ens.P_root_inv, PQ, ens.P_root_inv, ens.P_root_inv )