From bfcf9896dd9484ad2a8b4381eb59ae022a1fe302 Mon Sep 17 00:00:00 2001
From: zhengp0
Date: Wed, 14 Feb 2024 11:26:37 -0800
Subject: [PATCH] use the right estimate_re function

---
 src/limetr/__init__.py | 222 ++++++++++++++++++++---------------------
 tests/test_limetr.py   | 146 ++++++++++++++++-----------
 2 files changed, 199 insertions(+), 169 deletions(-)

diff --git a/src/limetr/__init__.py b/src/limetr/__init__.py
index d17a824..4440ffe 100644
--- a/src/limetr/__init__.py
+++ b/src/limetr/__init__.py
@@ -78,11 +78,11 @@ def __init__(
         self.V = S**2
 
         # pass in the priors
-        self.use_constraints = (C is not None)
-        self.use_regularizer = (H is not None)
-        self.use_uprior = (uprior is not None)
-        self.use_gprior = (gprior is not None)
-        self.use_lprior = (lprior is not None)
+        self.use_constraints = C is not None
+        self.use_regularizer = H is not None
+        self.use_uprior = uprior is not None
+        self.use_gprior = gprior is not None
+        self.use_lprior = lprior is not None
 
         self.C = C
         self.JC = JC
@@ -104,17 +104,16 @@ def __init__(
         if self.use_regularizer:
             self.num_regularizer = H(np.zeros(self.k)).size
            self.hm = self.h[0]
-            self.hw = 1.0/self.h[1]**2
+            self.hw = 1.0 / self.h[1] ** 2
         else:
             self.num_regularizer = 0
 
         if self.use_uprior:
             self.uprior = uprior
         else:
-            self.uprior = np.array([
-                [-np.inf]*self.k_beta + [0.0]*self.k_gamma,
-                [np.inf]*self.k
-            ])
+            self.uprior = np.array(
+                [[-np.inf] * self.k_beta + [0.0] * self.k_gamma, [np.inf] * self.k]
+            )
         self.use_uprior = True
 
         self.lb = self.uprior[0]
@@ -123,40 +122,42 @@ def __init__(
         if self.use_gprior:
             self.gprior = gprior
             self.gm = gprior[0]
-            self.gw = 1.0/gprior[1]**2
+            self.gw = 1.0 / gprior[1] ** 2
 
         if self.use_lprior:
             self.lprior = lprior
             self.lm = lprior[0]
-            self.lw = np.sqrt(2.0)/lprior[1]
+            self.lw = np.sqrt(2.0) / lprior[1]
 
             # double dimension pass into ipopt
             self.k_total += self.k
 
             # extend the constraints matrix
             if self.use_constraints:
+
                 def constraints(x):
-                    v = x[:self.k]
-                    v_abs = x[self.k:]
+                    v = x[: self.k]
+                    v_abs = x[self.k :]
 
                     vec1 = C(v)
-                    vec2 = np.hstack((v_abs - (v - self.lm),
-                                      v_abs + (v - self.lm)))
+                    vec2 = np.hstack((v_abs - (v - self.lm), v_abs + (v - self.lm)))
 
                     return np.hstack((vec1, vec2))
 
                 def jacobian(x):
-                    v = x[:self.k]
+                    v = x[: self.k]
                     Id = np.eye(self.k)
 
                     mat1 = JC(v)
                     mat2 = np.block([[-Id, Id], [Id, Id]])
 
                     return np.vstack((mat1, mat2))
+
             else:
+
                 def constraints(x):
-                    v = x[:self.k]
-                    v_abs = x[self.k:]
+                    v = x[: self.k]
+                    v_abs = x[self.k :]
 
                     vec = np.hstack((v_abs - v, v_abs + v))
@@ -168,42 +169,41 @@ def jacobian(x):
 
                     return mat
 
-            self.num_constraints += 2*self.k
+            self.num_constraints += 2 * self.k
             self.constraints = constraints
             self.jacobian = jacobian
-            self.cl = np.hstack((self.cl, np.zeros(2*self.k)))
-            self.cu = np.hstack((self.cu, np.repeat(np.inf, 2*self.k)))
+            self.cl = np.hstack((self.cl, np.zeros(2 * self.k)))
+            self.cu = np.hstack((self.cu, np.repeat(np.inf, 2 * self.k)))
 
             # extend the regularizer matrix
             if self.use_regularizer:
+
                 def H_new(x):
-                    v = x[:self.k]
+                    v = x[: self.k]
 
                     return H(v)
 
                 def JH_new(x):
-                    v = x[:self.k]
+                    v = x[: self.k]
 
-                    return np.hstack((JH(v),
-                                      np.zeros((self.num_regularizer,
-                                                self.k))))
+                    return np.hstack((JH(v), np.zeros((self.num_regularizer, self.k))))
 
                 self.H = H_new
                 self.JH = JH_new
 
             # extend Uniform priors
             if self.use_uprior:
-                uprior_abs = np.array([[0.0]*self.k, [np.inf]*self.k])
+                uprior_abs = np.array([[0.0] * self.k, [np.inf] * self.k])
                 self.uprior = np.hstack((self.uprior, uprior_abs))
                 self.lb = self.uprior[0]
                 self.ub = self.uprior[1]
 
         # trimming option
-        self.use_trimming = (0.0 < inlier_percentage < 1.0)
+        self.use_trimming = 0.0 < inlier_percentage < 1.0
         self.inlier_percentage = inlier_percentage
-        self.num_inliers = np.floor(inlier_percentage*self.N)
+        self.num_inliers = np.floor(inlier_percentage * self.N)
         self.num_outliers = self.N - self.num_inliers
-        self.w = np.repeat(self.num_inliers/self.N, self.N)
+        self.w = np.repeat(self.num_inliers / self.N, self.N)
         self.active_trimming_id = None
 
         # specify solution to be None
@@ -239,7 +239,7 @@ def _check(self):
         assert 0.0 < self.inlier_percentage <= 1.0
 
         if self.k > self.N:
-            warn('information insufficient!')
+            warn("information insufficient!")
 
     def _get_vars(self, x: NDArray) -> tuple[NDArray, NDArray]:
         beta, gamma = x[self.idx_beta], x[self.idx_gamma]
@@ -251,10 +251,10 @@ def _get_nll_components(self, beta: NDArray) -> tuple[NDArray, ...]:
         if self.use_trimming:
             sqrt_w = np.sqrt(self.w)
             sqrt_W = sqrt_w.reshape(self.N, 1)
-            F_beta = self.F(beta)*sqrt_w
-            JF_beta = self.JF(beta)*sqrt_W
-            Y = self.Y*sqrt_w
-            Z = self.Z*sqrt_W
+            F_beta = self.F(beta) * sqrt_w
+            JF_beta = self.JF(beta) * sqrt_W
+            Y = self.Y * sqrt_w
+            Z = self.Z * sqrt_W
             V = self.V**self.w
         else:
             F_beta = self.F(beta)
@@ -275,21 +275,21 @@ def objective(self, x: NDArray) -> float:
         # residual and variance
         R = Y - F_beta
 
-        val = 0.5*self.N*np.log(2.0*np.pi)
+        val = 0.5 * self.N * np.log(2.0 * np.pi)
 
-        D = BDLMat(dvecs=V, lmats=Z*np.sqrt(gamma), dsizes=self.n)
-        val += 0.5*D.logdet()
-        val += 0.5*R.dot(D.invdot(R))
+        D = BDLMat(dvecs=V, lmats=Z * np.sqrt(gamma), dsizes=self.n)
+        val += 0.5 * D.logdet()
+        val += 0.5 * R.dot(D.invdot(R))
 
         # add gpriors
         if self.use_regularizer:
-            val += 0.5*self.hw.dot((self.H(x) - self.hm)**2)
+            val += 0.5 * self.hw.dot((self.H(x) - self.hm) ** 2)
 
         if self.use_gprior:
-            val += 0.5*self.gw.dot((x[:self.k] - self.gm)**2)
+            val += 0.5 * self.gw.dot((x[: self.k] - self.gm) ** 2)
 
         if self.use_lprior:
-            val += self.lw.dot(x[self.k:])
+            val += self.lw.dot(x[self.k :])
 
         return val
@@ -302,7 +302,7 @@ def gradient(self, x: NDArray) -> NDArray:
         # residual and variance
         R = Y - F_beta
 
-        D = BDLMat(dvecs=V, lmats=Z*np.sqrt(gamma), dsizes=self.n)
+        D = BDLMat(dvecs=V, lmats=Z * np.sqrt(gamma), dsizes=self.n)
 
         # gradient for beta
         DR = D.invdot(R)
@@ -310,20 +310,19 @@ def gradient(self, x: NDArray) -> NDArray:
         # gradient for gamma
         DZ = D.invdot(Z)
-        g_gamma = 0.5*np.sum(Z*DZ, axis=0) - \
-            0.5*np.sum(
-                np.add.reduceat(DZ.T*R, self.idx_split, axis=1)**2,
-                axis=1)
+        g_gamma = 0.5 * np.sum(Z * DZ, axis=0) - 0.5 * np.sum(
+            np.add.reduceat(DZ.T * R, self.idx_split, axis=1) ** 2, axis=1
+        )
 
         g = np.hstack((g_beta, g_gamma))
 
         # add gradient from the regularizer
         if self.use_regularizer:
-            g += self.JH(x).T.dot((self.H(x) - self.hm)*self.hw)
+            g += self.JH(x).T.dot((self.H(x) - self.hm) * self.hw)
 
         # add gradient from the gprior
         if self.use_gprior:
-            g += (x[:self.k] - self.gm)*self.gw
+            g += (x[: self.k] - self.gm) * self.gw
 
         # add gradient from the lprior
         if self.use_lprior:
@@ -336,22 +335,22 @@ def hessian(self, x: NDArray) -> NDArray:
         _, JF_beta, _, Z, V = self._get_nll_components(beta)
 
         sqrt_gamma = np.sqrt(gamma)
-        d = BDLMat(dvecs=V, lmats=Z*np.sqrt(gamma), dsizes=self.n)
+        d = BDLMat(dvecs=V, lmats=Z * np.sqrt(gamma), dsizes=self.n)
 
         split_idx = np.cumsum(self.n)[:-1]
         v_study = np.split(V, split_idx)
         z_study = np.split(Z, split_idx, axis=0)
-        dlmats = [DLMat(v, z*sqrt_gamma) for v, z in zip(v_study, z_study)]
+        dlmats = [DLMat(v, z * sqrt_gamma) for v, z in zip(v_study, z_study)]
 
         beta_fisher = JF_beta.T.dot(d.invdot(JF_beta))
         gamma_fisher = np.zeros((self.k_gamma, self.k_gamma))
         for i, dlmat in enumerate(dlmats):
-            gamma_fisher += 0.5*(z_study[i].T.dot(dlmat.invdot(z_study[i])))**2
+            gamma_fisher += 0.5 * (z_study[i].T.dot(dlmat.invdot(z_study[i]))) ** 2
 
         hessian = block_diag(beta_fisher, gamma_fisher)
 
         if self.use_regularizer:
             JH = self.JH(x)
-            hessian += (JH.T*self.hw).dot(JH)
+            hessian += (JH.T * self.hw).dot(JH)
 
         if self.use_gprior:
             idx = np.arange(self.k)
@@ -367,35 +366,29 @@ def objective_trimming(self, w: NDArray) -> float:
         r = self.Y - self.F(self.beta)
         d = self.V + t
 
-        val = 0.5*np.sum(r**2*w/d)
-        val += 0.5*self.N*np.log(2.0*np.pi) + 0.5*w.dot(np.log(d))
+        val = 0.5 * np.sum(r**2 * w / d)
+        val += 0.5 * self.N * np.log(2.0 * np.pi) + 0.5 * w.dot(np.log(d))
 
         return val
 
     def gradient_trimming(self, w: NDArray) -> NDArray:
         t = (self.Z**2).dot(self.gamma)
-        r = (self.Y - self.F(self.beta))**2
+        r = (self.Y - self.F(self.beta)) ** 2
         d = self.V + t
 
-        g = 0.5*r/d
-        g += 0.5*np.log(d)
+        g = 0.5 * r / d
+        g += 0.5 * np.log(d)
 
         return g
 
-    def optimize(self,
-                 x0: Optional[NDArray] = None,
-                 options: Optional[dict] = None):
+    def optimize(self, x0: Optional[NDArray] = None, options: Optional[dict] = None):
         if x0 is None:
             x0 = np.hstack((self.beta, self.gamma))
             if self.use_lprior:
                 x0 = np.hstack((x0, np.zeros(self.k)))
 
         constraints = []
         if self.use_lprior or self.use_constraints:
-            constraints = [LinearConstraint(
-                self.jacobian(x0),
-                self.cl,
-                self.cu
-            )]
+            constraints = [LinearConstraint(self.jacobian(x0), self.cl, self.cu)]
         self.info = minimize(
             self.objective,
             x0,
@@ -404,20 +397,22 @@ def optimize(self,
             hess=self.hessian,
             constraints=constraints,
             bounds=self.uprior.T,
-            options=options
+            options=options,
         )
 
         self.soln = self.info.x
         self.beta, self.gamma = self._get_vars(self.soln)
 
-    def fit(self,
-            x0: Optional[NDArray] = None,
-            inner_options: Optional[dict] = None,
-            outer_verbose: bool = False,
-            outer_max_iter: int = 100,
-            outer_step_size: float = 1.0,
-            outer_tol: float = 1e-6,
-            normalize_trimming_grad: bool = False):
+    def fit(
+        self,
+        x0: Optional[NDArray] = None,
+        inner_options: Optional[dict] = None,
+        outer_verbose: bool = False,
+        outer_max_iter: int = 100,
+        outer_step_size: float = 1.0,
+        outer_tol: float = 1e-6,
+        normalize_trimming_grad: bool = False,
+    ):
 
         if not self.use_trimming:
             self.optimize(x0=x0, options=inner_options)
@@ -436,29 +431,29 @@ def fit(self,
             if normalize_trimming_grad:
                 w_grad /= np.linalg.norm(w_grad)
             w_new = utils.proj_capped_simplex(
-                self.w - outer_step_size*w_grad,
+                self.w - outer_step_size * w_grad,
                 self.num_inliers,
-                active_id=self.active_trimming_id)
+                active_id=self.active_trimming_id,
+            )
 
-            err = np.linalg.norm(w_new - self.w)/outer_step_size
+            err = np.linalg.norm(w_new - self.w) / outer_step_size
            np.copyto(self.w, w_new)
 
             num_iter += 1
 
             if outer_verbose:
                 obj = self.objective_trimming(self.w)
-                print('iter %4d, obj %8.2e, err %8.2e' % (num_iter, obj, err))
+                print("iter %4d, obj %8.2e, err %8.2e" % (num_iter, obj, err))
 
             if num_iter >= outer_max_iter:
-                print('reach max outer iter')
+                print("reach max outer iter")
                 break
 
         return self.beta, self.gamma, self.w
 
-    def estimate_re(self,
-                    beta: NDArray = None,
-                    gamma: NDArray = None,
-                    use_gamma: bool = True) -> NDArray:
+    def estimate_re(
+        self, beta: NDArray = None, gamma: NDArray = None, use_gamma: bool = True
+    ) -> NDArray:
         beta = self.beta if beta is None else beta
         gamma = self.gamma if gamma is None else gamma
@@ -470,19 +465,19 @@ def estimate_re(self,
         u = []
         for i in range(self.m):
-            rhs = (z[i].T/v[i]).dot(r[i])
+            rhs = (z[i].T / v[i]).dot(r[i])
             if use_gamma:
-                q = (z[i].T/v[i]).dot(z[i])*gamma + np.identity(self.k_gamma)
-                u.append(gamma*np.linalg.inv(q).dot(rhs))
+                q = (z[i].T / v[i]).dot(z[i]) * gamma + np.identity(self.k_gamma)
+                u.append(gamma * np.linalg.inv(q).dot(rhs))
             else:
-                q = (z[i].T/v[i]).dot(z[i])
+                q = (z[i].T / v[i]).dot(z[i])
                 u.append(np.linalg.inv(q).dot(rhs))
 
         return np.vstack(u)
 
     def get_re_vcov(self):
         if self.soln is None:
-            print('Please fit the model first.')
+            print("Please fit the model first.")
             return None
 
         _, _, _, Z, V = self._get_nll_components(self.beta)
@@ -492,7 +487,7 @@ def get_re_vcov(self):
 
         vcov = []
         for i in range(self.m):
-            hessian = (z[i].T/v[i]).dot(z[i]) + np.diag(1.0/self.gamma)
+            hessian = (z[i].T / v[i]).dot(z[i]) + np.diag(1.0 / self.gamma)
             vcov.append(np.linalg.pinv(hessian))
 
         return vcov
@@ -502,14 +497,14 @@ def get_gamma_fisher(self, gamma: NDArray) -> NDArray:
         v = np.split(self.S**2, np.cumsum(self.n)[:-1])
         H = np.zeros((self.k_gamma, self.k_gamma))
         for i in range(self.m):
-            q = np.diag(v[i]) + (z[i]*gamma).dot(z[i].T)
+            q = np.diag(v[i]) + (z[i] * gamma).dot(z[i].T)
             q = z[i].T.dot(np.linalg.inv(q).dot(z[i]))
-            H += 0.5*(q**2)
+            H += 0.5 * (q**2)
         return H
 
     def sample_beta(self, size: int = 1) -> NDArray:
         hessian = self.hessian(self.soln)
-        beta_hessian = hessian[:self.k_beta, :self.k_beta]
+        beta_hessian = hessian[: self.k_beta, : self.k_beta]
         beta_vcov = np.linalg.inv(beta_hessian)
         return np.random.multivariate_normal(self.beta, beta_vcov, size=size)
 
@@ -520,8 +515,13 @@ def get_baseline_model(model: LimeTr):
     k_gamma = 1
     Y = model.Y.copy()
     intercept = np.ones((Y.size, 1))
-    def F(beta): return intercept.dot(beta)
-    def JF(beta): return intercept
+
+    def F(beta):
+        return intercept.dot(beta)
+
+    def JF(beta):
+        return intercept
+
     Z = intercept
     S = model.S.copy()
     w = model.w.copy()
@@ -536,51 +536,49 @@ def get_fe_pred(model: LimeTr):
 
 
 def get_re_pred(model: LimeTr):
-    re = model.estimateRE()
-    return np.sum(model.Z*np.repeat(re, model.n, axis=0), axis=1)
+    re = model.estimate_re()
+    return np.sum(model.Z * np.repeat(re, model.n, axis=0), axis=1)
 
 
 def get_varmat(model: LimeTr):
     S = model.S**model.w
-    Z = model.Z*np.sqrt(model.w)[:, None]
+    Z = model.Z * np.sqrt(model.w)[:, None]
     n = model.n
     gamma = model.gamma
-    return BDLMat(dvecs=S**2, lmats=Z*np.sqrt(gamma), dsizes=n)
+    return BDLMat(dvecs=S**2, lmats=Z * np.sqrt(gamma), dsizes=n)
 
 
 def get_marginal_rvar(model: LimeTr):
-    residual = (model.Y - get_fe_pred(model))*np.sqrt(model.w)
+    residual = (model.Y - get_fe_pred(model)) * np.sqrt(model.w)
     varmat = get_varmat(model)
-    return residual.dot(varmat.invdot(residual))/model.w.sum()
+    return residual.dot(varmat.invdot(residual)) / model.w.sum()
 
 
 def get_conditional_rvar(model: LimeTr):
-    residual = (model.Y - get_fe_pred(model) - get_re_pred(model))*np.sqrt(model.w)
-    varvec = (model.S**model.w)**2
-    return (residual/varvec).dot(residual)/model.w.sum()
+    residual = (model.Y - get_fe_pred(model) - get_re_pred(model)) * np.sqrt(model.w)
+    varvec = (model.S**model.w) ** 2
+    return (residual / varvec).dot(residual) / model.w.sum()
 
 
-def get_marginal_R2(model: LimeTr,
-                    baseline_model: LimeTr = None) -> float:
+def get_marginal_R2(model: LimeTr, baseline_model: LimeTr = None) -> float:
     if baseline_model is None:
         baseline_model = get_baseline_model(model)
-    return 1 - get_marginal_rvar(model)/get_marginal_rvar(baseline_model)
+    return 1 - get_marginal_rvar(model) / get_marginal_rvar(baseline_model)
 
 
-def get_conditional_R2(model: LimeTr,
-                       baseline_model: LimeTr = None):
+def get_conditional_R2(model: LimeTr, baseline_model: LimeTr = None):
     if baseline_model is None:
         baseline_model = get_baseline_model(model)
-    return 1 - get_conditional_rvar(model)/get_conditional_rvar(baseline_model)
+    return 1 - get_conditional_rvar(model) / get_conditional_rvar(baseline_model)
 
 
 def get_R2(model: LimeTr):
     baseline_model = get_baseline_model(model)
     return {
         "conditional_R2": get_conditional_R2(model, baseline_model),
-        "marginal_R2": get_marginal_R2(model, baseline_model)
+        "marginal_R2": get_marginal_R2(model, baseline_model),
     }
diff --git a/tests/test_limetr.py b/tests/test_limetr.py
index 66c6a14..cd8278c 100644
--- a/tests/test_limetr.py
+++ b/tests/test_limetr.py
@@ -1,6 +1,6 @@
 import numpy as np
 import pytest
-from limetr import LimeTr
+from limetr import LimeTr, get_conditional_R2, get_marginal_R2, get_rmse
 from scipy.linalg import block_diag
 
 
@@ -9,28 +9,28 @@ def lmtr_test_problem(
     use_constraints=False,
     use_regularizer=False,
     use_uprior=False,
-    use_gprior=False
+    use_gprior=False,
 ):
     np.random.seed(123)
     m = 10
-    n = [5]*m
+    n = [5] * m
     N = sum(n)
     k_beta = 3
     k_gamma = 2
     k = k_beta + k_gamma
 
     beta_t = np.random.randn(k_beta)
-    gamma_t = np.random.rand(k_gamma)*0.09 + 0.01
+    gamma_t = np.random.rand(k_gamma) * 0.09 + 0.01
 
     X = np.random.randn(N, k_beta)
     Z = np.random.randn(N, k_gamma)
-    S = np.random.rand(N)*0.09 + 0.01
+    S = np.random.rand(N) * 0.09 + 0.01
 
     V = S**2
-    D = np.diag(V) + (Z*gamma_t).dot(Z.T)
+    D = np.diag(V) + (Z * gamma_t).dot(Z.T)
 
     U = np.random.multivariate_normal(np.zeros(N), D)
-    E = np.random.randn(N)*S
+    E = np.random.randn(N) * S
 
     Y = X.dot(beta_t) + U + E
 
@@ -68,12 +68,12 @@ def JH(x, M=M):
     H, JH, h = None, None, None
 
     if use_uprior:
-        uprior = np.array([[0.0]*k, [np.inf]*k])
+        uprior = np.array([[0.0] * k, [np.inf] * k])
     else:
         uprior = None
 
     if use_gprior:
-        gprior = np.array([[0.0]*k, [2.0]*k])
+        gprior = np.array([[0.0] * k, [2.0] * k])
     else:
         gprior = None
 
@@ -82,17 +82,31 @@ def JH(x, M=M):
     else:
         inlier_percentage = 1.0
 
-    return LimeTr(n, k_beta, k_gamma, Y, F, JF, Z, S=S,
-                  C=C, JC=JC, c=c,
-                  H=H, JH=JH, h=h,
-                  uprior=uprior, gprior=gprior,
-                  inlier_percentage=inlier_percentage)
+    return LimeTr(
+        n,
+        k_beta,
+        k_gamma,
+        Y,
+        F,
+        JF,
+        Z,
+        S=S,
+        C=C,
+        JC=JC,
+        c=c,
+        H=H,
+        JH=JH,
+        h=h,
+        uprior=uprior,
+        gprior=gprior,
+        inlier_percentage=inlier_percentage,
+    )
 
 
 def lmtr_test_problem_lasso():
     np.random.seed(123)
     m = 100
-    n = [1]*m
+    n = [1] * m
     N = sum(n)
     k_beta = 10
     k_gamma = 1
@@ -106,7 +120,7 @@ def lmtr_test_problem_lasso():
     Y = X.dot(beta_t)
     S = np.repeat(1.0, N)
 
-    weight = 0.1*np.linalg.norm(X.T.dot(Y), np.inf)
+    weight = 0.1 * np.linalg.norm(X.T.dot(Y), np.inf)
 
     def F(beta):
         return X.dot(beta)
@@ -114,11 +128,10 @@ def F(beta):
     def JF(beta):
         return X
 
-    uprior = np.array([[-np.inf]*k_beta + [0.0], [np.inf]*k_beta + [0.0]])
-    lprior = np.array([[0.0]*k, [np.sqrt(2.0)/weight]*k])
+    uprior = np.array([[-np.inf] * k_beta + [0.0], [np.inf] * k_beta + [0.0]])
+    lprior = np.array([[0.0] * k, [np.sqrt(2.0) / weight] * k])
 
-    return LimeTr(n, k_beta, k_gamma, Y, F, JF, Z, S=S,
-                  uprior=uprior, lprior=lprior)
+    return LimeTr(n, k_beta, k_gamma, Y, F, JF, Z, S=S, uprior=uprior, lprior=lprior)
 
 
 def lmtr_objective(lmtr, x):
@@ -131,29 +144,30 @@ def lmtr_objective(lmtr, x):
     # residual and variance
     R = Y - F_beta
 
-    val = 0.5*lmtr.N*np.log(2.0*np.pi)
+    val = 0.5 * lmtr.N * np.log(2.0 * np.pi)
 
     # should only use when testing
     split_idx = np.cumsum(lmtr.n)[:-1]
     v_study = np.split(V, split_idx)
     z_study = np.split(Z, split_idx, axis=0)
-    D = block_diag(*[np.diag(v) + (z*gamma).dot(z.T)
-                     for v, z in zip(v_study, z_study)])
+    D = block_diag(
+        *[np.diag(v) + (z * gamma).dot(z.T) for v, z in zip(v_study, z_study)]
+    )
     inv_D = np.linalg.inv(D)
     eigvals = np.linalg.eigvals(D)
 
-    val += 0.5*np.sum(np.log(eigvals))
-    val += 0.5*R.dot(inv_D.dot(R))
+    val += 0.5 * np.sum(np.log(eigvals))
+    val += 0.5 * R.dot(inv_D.dot(R))
 
     # add gpriors
     if lmtr.use_regularizer:
-        val += 0.5*lmtr.hw.dot((lmtr.H(x) - lmtr.hm)**2)
+        val += 0.5 * lmtr.hw.dot((lmtr.H(x) - lmtr.hm) ** 2)
 
     if lmtr.use_gprior:
-        val += 0.5*lmtr.gw.dot((x[:lmtr.k] - lmtr.gm)**2)
+        val += 0.5 * lmtr.gw.dot((x[: lmtr.k] - lmtr.gm) ** 2)
 
     if lmtr.use_lprior:
-        val += lmtr.lw.dot(x[lmtr.k:])
+        val += lmtr.lw.dot(x[lmtr.k :])
 
     return val
 
@@ -163,9 +177,9 @@ def lmtr_gradient(lmtr, x, eps=1e-10):
     g = np.zeros(lmtr.k_total)
     z = x + 0j
     for i in range(lmtr.k_total):
-        z[i] += eps*1j
-        g[i] = lmtr_objective(lmtr, z).imag/eps
-        z[i] -= eps*1j
+        z[i] += eps * 1j
+        g[i] = lmtr_objective(lmtr, z).imag / eps
+        z[i] -= eps * 1j
 
     return g
 
@@ -174,9 +188,9 @@ def lmtr_gradient_trimming(lmtr, w, eps=1e-10):
     g = np.zeros(lmtr.N)
     z = w + 0j
     for i in range(lmtr.N):
-        z[i] += eps*1j
-        g[i] = lmtr.objective_trimming(z).imag/eps
-        z[i] -= eps*1j
+        z[i] += eps * 1j
+        g[i] = lmtr.objective_trimming(z).imag / eps
+        z[i] -= eps * 1j
 
     return g
 
@@ -184,11 +198,13 @@ def test_gradient():
     # setup test problem
     # -------------------------------------------------------------------------
-    model = lmtr_test_problem(use_trimming=True,
-                              use_constraints=True,
-                              use_regularizer=True,
-                              use_uprior=True,
-                              use_gprior=True)
+    model = lmtr_test_problem(
+        use_trimming=True,
+        use_constraints=True,
+        use_regularizer=True,
+        use_uprior=True,
+        use_gprior=True,
+    )
 
     # test the gradient
     # -------------------------------------------------------------------------
@@ -202,11 +218,13 @@ def test_hessian():
-    model = lmtr_test_problem(use_trimming=True,
-                              use_constraints=True,
-                              use_regularizer=True,
-                              use_uprior=True,
-                              use_gprior=True)
+    model = lmtr_test_problem(
+        use_trimming=True,
+        use_constraints=True,
+        use_regularizer=True,
+        use_uprior=True,
+        use_gprior=True,
+    )
     x = np.random.randn(model.k)
     x[model.idx_gamma] = 0.1
     hess = model.hessian(x)
@@ -220,7 +238,7 @@ def test_limetr_gradient_trimming():
     model = lmtr_test_problem(use_trimming=True)
 
     # decouple all the studies
-    model.n = np.array([1]*model.N)
+    model.n = np.array([1] * model.N)
 
     # test gradient_trimming
     # -------------------------------------------------------------------------
@@ -253,7 +271,7 @@ def test_limetr_lasso():
         if beta[i] == 0.0 and np.abs(g_beta[i]) < model.lw[i]:
             g_beta[i] = 0.0
         else:
-            g_beta[i] += np.sign(beta[i])*model.lw[i]
+            g_beta[i] += np.sign(beta[i]) * model.lw[i]
 
     assert np.abs(g_beta).max() < tol
 
@@ -261,10 +279,9 @@ def test_limetr_objective():
     # setup test problem
     # -------------------------------------------------------------------------
-    model = lmtr_test_problem(use_constraints=True,
-                              use_regularizer=True,
-                              use_uprior=True,
-                              use_gprior=True)
+    model = lmtr_test_problem(
+        use_constraints=True, use_regularizer=True, use_uprior=True, use_gprior=True
+    )
 
     # test objective
     # -------------------------------------------------------------------------
@@ -290,8 +307,11 @@ def test_limetr_objective_trimming():
     t = (model.Z**2).dot(model.gamma)
     d = model.V + t
 
-    tr_obj = 0.5*np.sum(r**2*w/d) + 0.5*model.N*np.log(2.0*np.pi)\
-        + 0.5*w.dot(np.log(d))
+    tr_obj = (
+        0.5 * np.sum(r**2 * w / d)
+        + 0.5 * model.N * np.log(2.0 * np.pi)
+        + 0.5 * w.dot(np.log(d))
+    )
     my_obj = model.objective_trimming(w)
 
     assert np.isclose(tr_obj, my_obj)
@@ -304,14 +324,26 @@ def test_estimate_re():
     re = model.estimate_re()
     F_beta, _, Y, Z, V = model._get_nll_components(model.beta)
 
-    r = F_beta + np.sum(Z*np.repeat(re, model.n, axis=0), axis=1) - Y
+    r = F_beta + np.sum(Z * np.repeat(re, model.n, axis=0), axis=1) - Y
     r = np.split(r, np.cumsum(model.n)[:-1])
     z = np.split(Z, np.cumsum(model.n)[:-1], axis=0)
     v = np.split(V, np.cumsum(model.n)[:-1])
 
-    g = np.vstack([
-        (zi.T/vi).dot(ri) + ui / model.gamma
-        for zi, vi, ri, ui in zip(z, v, r, re)
-    ])
+    g = np.vstack(
+        [(zi.T / vi).dot(ri) + ui / model.gamma for zi, vi, ri, ui in zip(z, v, r, re)]
+    )
 
     assert np.allclose(g, 0.0)
+
+
+def test_performance():
+    model = lmtr_test_problem()
+    model.optimize()
+
+    conditional_r2 = get_conditional_R2(model)
+    marginal_r2 = get_marginal_R2(model)
+    rmse = get_rmse(model)
+
+    assert isinstance(conditional_r2, float)
+    assert isinstance(marginal_r2, float)
+    assert isinstance(rmse, float)