import warnings
from itertools import combinations_with_replacement as combo_wr
from itertools import product
from typing import Tuple
from typing import Union
import cvxpy as cp
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import cho_factor
from scipy.linalg import cho_solve
from sklearn.exceptions import ConvergenceWarning
from ..utils import reorder_constraints
from .constrained_sr3 import ConstrainedSR3
class TrappingSR3(ConstrainedSR3):
"""
Trapping variant of sparse relaxed regularized regression.
This optimizer can be used to identify systems with globally
stable (bounded) solutions.
    Attempts to minimize one of two related objective functions:

    .. math::

        0.5\\|y-Xw\\|^2_2 + \\lambda R(w)
        + 0.5\\|Pw-A\\|^2_2/\\eta + \\delta_0(Cw-d)
        + \\delta_{\\Lambda}(A)

    or

    .. math::

        0.5\\|y-Xw\\|^2_2 + \\lambda R(w)
        + \\delta_0(Cw-d)
        + 0.5 * \\text{maximumeigenvalue}(A)/\\eta

    where :math:`R(w)` is a regularization function, which must be convex,
    :math:`\\delta_0` is an indicator function that enforces the hard
    constraint :math:`Cw = d`, and :math:`\\delta_{\\Lambda}` projects the
    :math:`A` matrix onto the space of negative definite matrices.
See the following references for more details:
Kaptanoglu, Alan A., et al. "Promoting global stability in
data-driven models of quadratic nonlinear dynamics."
arXiv preprint arXiv:2105.01843 (2021).
Parameters
----------
eta :
Determines the strength of the stability term ||Pw-A||^2 in the
optimization. The default value is very large so that the
algorithm default is to ignore the stability term. In this limit,
this should be approximately equivalent to the ConstrainedSR3 method.
eps_solver :
If threshold != 0, this specifies the error tolerance in the
        CVXPY (OSQP) solve. Default 1.0e-7; note that OSQP's own default
        is 1.0e-3.
relax_optim :
If relax_optim = True, use the relax-and-split method. If False,
try a direct minimization on the largest eigenvalue.
    inequality_constraints : bool, optional (default False)
        If True, the CVXPY methods must be used: either relax_optim = False,
        or relax_optim = True with threshold != 0.
alpha_A :
Determines the step size in the prox-gradient descent over A.
For convergence, need alpha_A <= eta, so default
alpha_A = eta is used.
alpha_m :
Determines the step size in the prox-gradient descent over m.
For convergence, need alpha_m <= eta / ||w^T * PQ^T * PQ * w||.
Typically 0.01 * eta <= alpha_m <= 0.1 * eta. (default eta * 0.1)
gamma :
Determines the negative interval that matrix A is projected onto.
        For most applications, gamma = -0.1 to -1.0 works well.
tol_m :
Tolerance used for determining convergence of the optimization
algorithm over m.
thresholder :
        Regularization function to use. For the current trapping SINDy,
        only the L1 and L2 norms and their weighted variants are implemented.
        Other convex norms could be added straightforwardly, but L0 requires
        reformulation because of its nonconvexity. (default 'L1')
accel :
Whether or not to use accelerated prox-gradient descent for (m, A).
(default False)
m0 :
Initial guess for trap center in the optimization. Default None
initializes vector elements randomly in [-1, 1]. shape (n_targets)
    A0 :
        Initial guess for the matrix A in the optimization.
        Shape (n_targets, n_targets). Default None, meaning A is initialized
        as A = diag(gamma).
Attributes
----------
    A_history_ : list
        History of the auxiliary variable A that approximates PW = A^S.
m_history_ : list
History of the shift vector m that determines the origin of the
trapping region.
PW_history_ : list
History of PW = A^S, the quantity we are attempting to make
negative definite.
    PWeigs_history_ : list
        History of the eigenvalues of PW = A^S at each iteration.
        Tracking these allows us to ascertain whether A^S is indeed
        being pulled towards the space of negative definite matrices.
PL_unsym_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_features)
Unsymmetrized linear coefficient part of the P matrix in ||Pw - A||^2
PL_ : np.ndarray, shape (n_targets, n_targets, n_targets, n_features)
Linear coefficient part of the P matrix in ||Pw - A||^2
PQ_ : np.ndarray, shape (n_targets, n_targets,
n_targets, n_targets, n_features)
Quadratic coefficient part of the P matrix in ||Pw - A||^2
objective_history_: list
History of the objective value at each iteration
Examples
--------
>>> import numpy as np
>>> from scipy.integrate import odeint
>>> from pysindy import SINDy
>>> from pysindy.optimizers import TrappingSR3
>>> lorenz = lambda z,t : [10*(z[1] - z[0]),
>>> z[0]*(28 - z[2]) - z[1],
>>> z[0]*z[1] - 8/3*z[2]]
>>> t = np.arange(0,2,.002)
>>> x = odeint(lorenz, [-8,8,27], t)
>>> opt = TrappingSR3(threshold=0.1)
>>> model = SINDy(optimizer=opt)
>>> model.fit(x, t=t[1]-t[0])
>>> model.print()
    x0' = -10.004 x0 + 10.004 x1
    x1' = 27.994 x0 + -0.993 x1 + -1.000 x0 x2
    x2' = -2.662 x2 + 1.000 x0 x1
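
    To turn on the stability (trapping) term, pass a finite ``eta``; the
    parameter values below are illustrative rather than tuned:

    >>> opt = TrappingSR3(threshold=0.1, eta=1.0, alpha_m=0.1, gamma=-0.1)
    >>> model = SINDy(optimizer=opt)
    >>> model.fit(x, t=t[1]-t[0])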
"""
def __init__(
self,
*,
eta: Union[float, None] = None,
eps_solver: float = 1e-7,
relax_optim: bool = True,
inequality_constraints=False,
alpha_A: Union[float, None] = None,
alpha_m: Union[float, None] = None,
gamma: float = -0.1,
tol_m: float = 1e-5,
thresholder: str = "l1",
accel: bool = False,
m0: Union[NDArray, None] = None,
A0: Union[NDArray, None] = None,
**kwargs,
):
super().__init__(thresholder=thresholder, **kwargs)
self.eps_solver = eps_solver
self.relax_optim = relax_optim
self.inequality_constraints = inequality_constraints
self.m0 = m0
self.A0 = A0
self.alpha_A = alpha_A
self.alpha_m = alpha_m
self.eta = eta
self.gamma = gamma
self.tol_m = tol_m
self.accel = accel
self.__post_init_guard()
def __post_init_guard(self):
"""Conduct initialization post-init, as required by scikitlearn API"""
if self.thresholder.lower() not in ("l1", "l2", "weighted_l1", "weighted_l2"):
raise ValueError("Regularizer must be (weighted) L1 or L2")
if self.eta is None:
warnings.warn(
"eta was not set, so defaulting to eta = 1e20 "
"with alpha_m = 1e-2 * eta, alpha_A = eta. Here eta is so "
"large that the stability term in the optimization "
"will be ignored."
)
self.eta = 1e20
self.alpha_m = 1e18
self.alpha_A = 1e20
else:
if self.alpha_m is None:
self.alpha_m = self.eta * 1e-2
if self.alpha_A is None:
self.alpha_A = self.eta
if self.eta <= 0:
raise ValueError("eta must be positive")
if self.alpha_m < 0 or self.alpha_m > self.eta:
raise ValueError("0 <= alpha_m <= eta")
if self.alpha_A < 0 or self.alpha_A > self.eta:
raise ValueError("0 <= alpha_A <= eta")
if self.gamma >= 0:
raise ValueError("gamma must be negative")
if self.tol <= 0 or self.tol_m <= 0 or self.eps_solver <= 0:
raise ValueError("tol and tol_m must be positive")
if self.inequality_constraints and self.relax_optim and self.threshold == 0.0:
            raise ValueError(
                "Inequality constraints require either relax_optim = False "
                "or relax_optim = True with threshold != 0."
            )
    def set_params(self, **kwargs):
        super().set_params(**kwargs)
        self.__post_init_guard()
def _set_Ptensors(
self, n_targets: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Make the projection tensors used for the algorithm."""
N = int((n_targets**2 + 3 * n_targets) / 2.0)
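        # For a library of linear plus quadratic monomials (no bias term),
        # the number of features is n + n(n+1)/2 = (n**2 + 3*n)/2,
        # e.g. n_targets = 3 gives N = 9.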
# delta_{il}delta_{jk}
PL_tensor_unsym = np.zeros((n_targets, n_targets, n_targets, N))
for i, j in combo_wr(range(n_targets), 2):
PL_tensor_unsym[i, j, i, j] = 1.0
# Now symmetrize PL
PL_tensor = (PL_tensor_unsym + np.transpose(PL_tensor_unsym, [1, 0, 2, 3])) / 2
        # if j == k, delta_{il} delta_{N - n_targets + j, n}
        # if j != k, delta_{il} delta_{n_targets + j + k - 1, n}
PQ_tensor = np.zeros((n_targets, n_targets, n_targets, n_targets, N))
for (i, j, k, kk), n in product(combo_wr(range(n_targets), 4), range(N)):
if (j == k) and (n == N - n_targets + j) and (i == kk):
PQ_tensor[i, j, k, kk, n] = 1.0
if (j != k) and (n == n_targets + j + k - 1) and (i == kk):
PQ_tensor[i, j, k, kk, n] = 1 / 2
return PL_tensor_unsym, PL_tensor, PQ_tensor
@staticmethod
def _check_P_matrix(
n_tgts: int, n_feat: int, n_feat_expected: int, PL: np.ndarray, PQ: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Check if P tensor is properly defined"""
if (
PQ is None
or PL is None
or PQ.shape != (n_tgts, n_tgts, n_tgts, n_tgts, n_feat)
or PL.shape != (n_tgts, n_tgts, n_tgts, n_feat)
or n_feat != n_feat_expected # library is not quadratic/incorrect shape
):
PL = np.zeros((n_tgts, n_tgts, n_tgts, n_feat))
PQ = np.zeros((n_tgts, n_tgts, n_tgts, n_tgts, n_feat))
warnings.warn(
"PQ and PL tensors not defined, wrong shape, or incompatible with "
"feature library shape. Ensure feature library is quadratic. "
"Setting tensors to zero"
)
if not np.allclose(
np.transpose(PL, [1, 0, 2, 3]), PL, atol=1e-10
) or not np.allclose(np.transpose(PQ, [0, 2, 1, 3, 4]), PQ, atol=1e-10):
raise ValueError("PQ/PL tensors were passed but have the wrong symmetry")
return PL, PQ
def _update_coef_constraints(self, H, x_transpose_y, P_transpose_A, coef_sparse):
"""Solves the coefficient update analytically if threshold = 0"""
g = x_transpose_y + P_transpose_A / self.eta
inv1 = np.linalg.pinv(H, rcond=1e-15)
inv2 = np.linalg.pinv(
self.constraint_lhs.dot(inv1).dot(self.constraint_lhs.T), rcond=1e-15
)
rhs = g.flatten() + self.constraint_lhs.T.dot(inv2).dot(
self.constraint_rhs - self.constraint_lhs.dot(inv1).dot(g.flatten())
)
rhs = rhs.reshape(g.shape)
return inv1.dot(rhs)
    def _update_A(self, A_old, PW):
        """Update the symmetrized A matrix"""
        eigvals, _ = np.linalg.eigh(A_old)
        _, eigvecsPW = np.linalg.eigh(PW)
        # Clip the eigenvalues of A at gamma so that A stays within the set
        # of sufficiently negative definite matrices, then rotate the result
        # into the eigenbasis of PW.
        A = np.diag(np.minimum(eigvals, self.gamma))
        return eigvecsPW @ A @ np.linalg.inv(eigvecsPW)
def _convergence_criterion(self):
"""Calculate the convergence criterion for the optimization over w"""
this_coef = self.history_[-1]
if len(self.history_) > 1:
last_coef = self.history_[-2]
else:
last_coef = np.zeros_like(this_coef)
err_coef = np.sqrt(np.sum((this_coef - last_coef) ** 2))
return err_coef
def _m_convergence_criterion(self):
"""Calculate the convergence criterion for the optimization over m"""
return np.sum(np.abs(self.m_history_[-2] - self.m_history_[-1]))
def _objective(self, x, y, coef_sparse, A, PW, q):
"""Objective function"""
# Compute the errors
R2 = (y - np.dot(x, coef_sparse)) ** 2
A2 = (A - PW) ** 2
L1 = self.threshold * np.sum(np.abs(coef_sparse.flatten()))
        # print progress every max(1, max_iter // 10) iterations
if self.verbose and q % max(1, self.max_iter // 10) == 0:
row = [
q,
0.5 * np.sum(R2),
0.5 * np.sum(A2) / self.eta,
L1,
0.5 * np.sum(R2) + 0.5 * np.sum(A2) / self.eta + L1,
]
print(
"{0:10d} ... {1:10.4e} ... {2:10.4e} ... {3:10.4e}"
" ... {4:10.4e}".format(*row)
)
return 0.5 * np.sum(R2) + 0.5 * np.sum(A2) / self.eta + L1
def _solve_m_relax_and_split(self, m_prev, m, A, coef_sparse, tk_previous):
"""
If using the relaxation formulation of trapping SINDy, solves the
(m, A) algorithm update.
"""
# prox-grad for (A, m)
# Accelerated prox gradient descent
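        # FISTA-style momentum: t_{k+1} = (1 + sqrt(1 + 4 t_k^2)) / 2, with
        # extrapolated point m_partial = m + ((t_k - 1) / t_{k+1}) * (m - m_prev)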
if self.accel:
tk = (1 + np.sqrt(1 + 4 * tk_previous**2)) / 2.0
m_partial = m + (tk_previous - 1.0) / tk * (m - m_prev)
tk_previous = tk
mPQ = np.tensordot(m_partial, self.PQ_, axes=([0], [0]))
else:
mPQ = np.tensordot(m, self.PQ_, axes=([0], [0]))
p = self.PL_ - mPQ
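        # Gradient of the stability term 0.5 * ||Pw - A||^2 / eta with respect
        # to m: since P = PL - m PQ, the chain rule gives
        # grad_m = (PQ : w)^T (A - Pw) / eta, computed below as PQWT_PW.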
PW = np.tensordot(p, coef_sparse, axes=([3, 2], [0, 1]))
PQW = np.tensordot(self.PQ_, coef_sparse, axes=([4, 3], [0, 1]))
A_b = (A - PW) / self.eta
PQWT_PW = np.tensordot(PQW, A_b, axes=([2, 1], [0, 1]))
        if self.accel:
            m_new = m_partial - self.alpha_m * PQWT_PW
        else:
            # step from the current m, where the gradient was evaluated
            m_new = m - self.alpha_m * PQWT_PW
        # Update A
        A_new = self._update_A(A - self.alpha_A * A_b, PW)
        # the current m becomes the previous iterate on the next call
        return m, m_new, A_new, tk_previous
def _solve_nonsparse_relax_and_split(self, H, xTy, P_transpose_A, coef_prev):
"""Update for the coefficients if threshold = 0."""
if self.use_constraints:
coef_sparse = self._update_coef_constraints(
H, xTy, P_transpose_A, coef_prev
).reshape(coef_prev.shape)
else:
cho = cho_factor(H)
coef_sparse = cho_solve(cho, xTy + P_transpose_A / self.eta).reshape(
coef_prev.shape
)
return coef_sparse
def _solve_m_direct(self, n_tgts, coef_sparse):
"""
If using the direct formulation of trapping SINDy, solves the
entire problem in CVXPY regardless of the threshold value.
Note that this is a convex-composite (i.e. technically nonconvex)
problem, solved in CVXPY, so convergence/quality guarantees are
not available here!
"""
        if np.all(self.PL_ == 0) and np.all(self.PQ_ == 0):
            return np.zeros(n_tgts)  # no optimization over m
else:
m_cp = cp.Variable(n_tgts)
L = np.tensordot(self.PL_, coef_sparse, axes=([3, 2], [0, 1]))
Q = np.reshape(
np.tensordot(self.PQ_, coef_sparse, axes=([4, 3], [0, 1])),
(n_tgts, n_tgts * n_tgts),
)
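            # Minimize the largest eigenvalue of the symmetrized linear
            # projection shifted by the quadratic projection,
            # lambda_max(L^S - m Q), pushing A^S toward negative definiteness.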
Ls = 0.5 * (L + L.T).flatten()
cost_m = cp.lambda_max(cp.reshape(Ls - m_cp @ Q, (n_tgts, n_tgts)))
prob_m = cp.Problem(cp.Minimize(cost_m))
# default solver is SCS here
prob_m.solve(eps=self.eps_solver, verbose=self.verbose_cvxpy)
m = m_cp.value
            if m is None:
                warnings.warn(
                    "Infeasible solve over m; try increasing or decreasing eta.",
                    ConvergenceWarning,
                )
                return None
return m
def _reduce(self, x, y):
"""
Perform at most ``self.max_iter`` iterations of the
TrappingSR3 algorithm.
Assumes initial guess for coefficients is stored in ``self.coef_``.
"""
self.A_history_ = []
self.m_history_ = []
self.PW_history_ = []
self.PWeigs_history_ = []
self.history_ = []
n_samples, n_features = x.shape
n_tgts = y.shape[1]
var_len = n_features * n_tgts
n_feat_expected = int((n_tgts**2 + 3 * n_tgts) / 2.0)
# Only relevant if the stability term is turned on.
self.PL_unsym_, self.PL_, self.PQ_ = self._set_Ptensors(n_tgts)
# make sure dimensions/symmetries are correct
self.PL_, self.PQ_ = self._check_P_matrix(
n_tgts, n_features, n_feat_expected, self.PL_, self.PQ_
)
# Set initial coefficients
if self.use_constraints and self.constraint_order.lower() == "target":
self.constraint_lhs = reorder_constraints(
self.constraint_lhs, n_features, output_order="target"
)
coef_sparse = self.coef_.T
# Print initial values for each term in the optimization
if self.verbose:
row = [
"Iteration",
"Data Error",
"Stability Error",
"L1 Error",
"Total Error",
]
print(
"{: >10} ... {: >10} ... {: >10} ... {: >10} ... {: >10}".format(*row)
)
# initial A
if self.A0 is not None:
A = self.A0
elif np.any(self.PQ_ != 0.0):
A = np.diag(self.gamma * np.ones(n_tgts))
else:
A = np.diag(np.zeros(n_tgts))
self.A_history_.append(A)
# initial guess for m
if self.m0 is not None:
trap_ctr = self.m0
else:
            np.random.seed(1)
            # each component uniformly random in [-1, 1), per the m0 docstring
            trap_ctr = 2 * np.random.rand(n_tgts) - np.ones(n_tgts)
self.m_history_.append(trap_ctr)
# Precompute some objects for optimization
x_expanded = np.zeros((n_samples, n_tgts, n_features, n_tgts))
for i in range(n_tgts):
x_expanded[:, i, :, i] = x
x_expanded = np.reshape(x_expanded, (n_samples * n_tgts, n_tgts * n_features))
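        # x_expanded is block-diagonal (one copy of x per target), so a single
        # flattened coefficient vector of length n_tgts * n_features can be
        # solved for across all targets at once.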
xTx = np.dot(x_expanded.T, x_expanded)
xTy = np.dot(x_expanded.T, y.flatten())
# if using acceleration
tk_prev = 1
trap_prev_ctr = trap_ctr
# Begin optimization loop
self.objective_history_ = []
for k in range(self.max_iter):
# update P tensor from the newest trap center
mPQ = np.tensordot(trap_ctr, self.PQ_, axes=([0], [0]))
p = self.PL_ - mPQ
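            # Flatten the projection tensor so the stability term can be
            # written as ||Pmatrix @ w - A.flatten()||^2 over the flattened
            # coefficient vector.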
Pmatrix = p.reshape(n_tgts * n_tgts, n_tgts * n_features)
coef_prev = coef_sparse
if self.relax_optim:
if self.threshold > 0.0:
# sparse relax_and_split
coef_sparse = self._update_coef_sparse_rs(
var_len, x_expanded, y, Pmatrix, A, coef_prev
)
else:
coef_sparse = self._update_coef_nonsparse_rs(
Pmatrix, A, coef_prev, xTx, xTy
)
trap_prev_ctr, trap_ctr, A, tk_prev = self._solve_m_relax_and_split(
trap_prev_ctr, trap_ctr, A, coef_sparse, tk_prev
)
else:
coef_sparse = self._update_coef_direct(
var_len, x_expanded, y, Pmatrix, coef_prev, n_tgts
)
trap_ctr = self._solve_m_direct(n_tgts, coef_sparse)
# If problem over xi becomes infeasible, break out of the loop
if coef_sparse is None:
coef_sparse = coef_prev
break
# If problem over m becomes infeasible, break out of the loop
if trap_ctr is None:
trap_ctr = trap_prev_ctr
break
self.history_.append(coef_sparse.T)
PW = np.tensordot(p, coef_sparse, axes=([3, 2], [0, 1]))
# (m,A) update finished, append the result
self.m_history_.append(trap_ctr)
self.A_history_.append(A)
            eigvals = np.linalg.eigvals(PW)
self.PW_history_.append(PW)
self.PWeigs_history_.append(np.sort(eigvals))
# update objective
self.objective_history_.append(self._objective(x, y, coef_sparse, A, PW, k))
if (
self._m_convergence_criterion() < self.tol_m
and self._convergence_criterion() < self.tol
):
break
else:
warnings.warn(
f"TrappingSR3 did not converge after {self.max_iter} iters.",
ConvergenceWarning,
)
self.coef_ = coef_sparse.T
def _update_coef_sparse_rs(self, var_len, x_expanded, y, Pmatrix, A, coef_prev):
xi, cost = self._create_var_and_part_cost(var_len, x_expanded, y)
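        # Relax-and-split cost: the data-misfit part of the objective plus the
        # stability penalty ||Pmatrix @ w - A||^2 / eta that couples w to the
        # auxiliary matrix A.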
cost = cost + cp.sum_squares(Pmatrix @ xi - A.flatten()) / self.eta
return self._update_coef_cvxpy(xi, cost, var_len, coef_prev, self.eps_solver)
def _update_coef_nonsparse_rs(self, Pmatrix, A, coef_prev, xTx, xTy):
pTp = np.dot(Pmatrix.T, Pmatrix)
H = xTx + pTp / self.eta
P_transpose_A = np.dot(Pmatrix.T, A.flatten())
return self._solve_nonsparse_relax_and_split(H, xTy, P_transpose_A, coef_prev)
def _update_coef_direct(self, var_len, x_expanded, y, Pmatrix, coef_prev, n_tgts):
xi, cost = self._create_var_and_part_cost(var_len, x_expanded, y)
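        # Direct formulation: instead of relaxing through an auxiliary A,
        # penalize the largest eigenvalue of A^S = Pw itself.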
cost += cp.lambda_max(cp.reshape(Pmatrix @ xi, (n_tgts, n_tgts))) / self.eta
return self._update_coef_cvxpy(xi, cost, var_len, coef_prev, self.eps_solver)