This is the core update for the VEM step of the M&M ASH model (version 2): the single-SNP calculation under the MASH model. The notebook prototypes the code in the libgaow repo.
$\newcommand{\bs}[1]{\boldsymbol{#1}}$ $\DeclareMathOperator*{\diag}{diag}$ $\DeclareMathOperator*{\cov}{cov}$ $\DeclareMathOperator*{\rank}{rank}$ $\DeclareMathOperator*{\var}{var}$ $\DeclareMathOperator*{\tr}{tr}$ $\DeclareMathOperator*{\veco}{vec}$ $\DeclareMathOperator*{\uniform}{\mathcal{U}niform}$ $\DeclareMathOperator*{\argmin}{arg\ min}$ $\DeclareMathOperator*{\argmax}{arg\ max}$ $\DeclareMathOperator*{\N}{N}$ $\DeclareMathOperator*{\gm}{Gamma}$ $\DeclareMathOperator*{\dif}{d}$
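For reference, the single-SNP model that the code below implements can be summarized as follows (this restates what `LikelihoodMASH` and `PosteriorMASH` compute, using the notation above): for effect $j$ with observed effects $\hat{\bs{b}}_j$ and standard errors $\bs{s}_j$,

$$\hat{\bs{b}}_j \mid \bs{b}_j \sim \N(\bs{b}_j, V_j), \qquad V_j = \diag(\bs{s}_j)\, V \diag(\bs{s}_j), \qquad \bs{b}_j \sim \sum_{p} \pi_p \N(\bs{0}, U_p),$$

where the first mixture component is the point mass at zero ($U_0 = \bs{0}$). Under component $p$ the posterior of $\bs{b}_j$ is $\N(\tilde{U}_{jp} V_j^{-1} \hat{\bs{b}}_j,\ \tilde{U}_{jp})$ with $\tilde{U}_{jp} = U_p (V_j^{-1} U_p + I)^{-1}$; the reported posterior summaries are mixtures of these over the posterior component weights.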
%cd /home/gaow/GIT/software/libgaow/py/src/
dat = readRDS('/home/gaow/Documents/GTExV8/Thyroid.Lung.FMO2.filled.rds')
str(dat)
attach(dat)
%get X Y --from R
import numpy as np
X
Y = Y.as_matrix()
Y
%save /home/gaow/GIT/software/libgaow/py/src/model_mash.py -f
#!/usr/bin/env python3
__author__ = "Gao Wang"
__copyright__ = "Copyright 2016, Stephens lab"
__email__ = "gaow@uchicago.edu"
__license__ = "MIT"
__version__ = "0.1.0"
import numpy as np, scipy as sp
from scipy.stats import norm, multivariate_normal as mvnorm
from collections import OrderedDict
def inv_sympd(m):
'''
Inverse of symmetric positive definite
https://stackoverflow.com/questions/40703042/more-efficient-way-to-invert-a-matrix-knowing-it-is-symmetric-and-positive-semi
'''
zz , _ = sp.linalg.lapack.dpotrf(m, False, False)
inv_m , info = sp.linalg.lapack.dpotri(zz)
# lapack only returns the upper or lower triangular part
return np.triu(inv_m) + np.triu(inv_m, k=1).T
def get_svs(s, V):
'''
diag(s) @ V @ diag(s)
'''
return (s * V.T).T * s
def safe_mvnorm_logpdf(val, cov):
    '''
    Multivariate normal log density that tolerates a singular covariance
    (e.g. the point-mass null component): observations that are numerically
    zero get +inf, everything else -inf.
    '''
    try:
        return mvnorm.logpdf(val, cov=cov)
    except np.linalg.LinAlgError:
        if len(val.shape) == 1:
            return np.inf if np.sum(np.abs(val)) < 1E-6 else -np.inf
        else:
            return np.array([np.inf if np.sum(np.abs(x)) < 1E-6 else -np.inf for x in val])
class LikelihoodMASH:
def __init__(self, data):
self.J = data.B.shape[0]
self.R = data.B.shape[1]
self.P = len(data.U)
self.data = data
self.data.lik = {'relative_likelihood' : None,
'lfactor': None,
'marginal_loglik': None,
'loglik': None,
'null_loglik': None,
'alt_loglik': None}
self.debug = None
def compute_log10bf(self):
self.data.l10bf = (self.data.lik['alt_loglik'] - self.data.lik['null_loglik']) / np.log(10)
def compute_relative_likelihood_matrix(self):
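        '''
        Build the J x P matrix of per-component likelihoods, rescaled within each
        row by its largest log-likelihood (lfactor) to avoid underflow when
        exponentiating.
        '''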
matrix_llik = self._calc_likelihood_matrix_comcov() if self.data.is_common_cov() \
else self._calc_likelihood_matrix()
lfactors = np.amax(matrix_llik, axis = 1)
self.data.lik['relative_likelihood'] = np.exp(matrix_llik - np.vstack(lfactors))
self.data.lik['lfactor'] = lfactors
def _calc_likelihood_matrix(self):
loglik = np.zeros((self.J, self.P))
for j in range(self.J):
sigma_mat = get_svs(self.data.S[j,:], self.data.V)
loglik[j,:] = np.array([safe_mvnorm_logpdf(self.data.B[j,:], sigma_mat + self.data.U[p]) for p in self.data.U])
return loglik
def _calc_likelihood_matrix_comcov(self):
sigma_mat = get_svs(self.data.S[0,:], self.data.V)
return np.array([safe_mvnorm_logpdf(self.data.B, sigma_mat + self.data.U[p]) for p in self.data.U]).T
def compute_loglik_from_matrix(self, options = ['marginal', 'alt', 'null']):
'''
data.lik.relative_likelihood first column is null, the rest are alt
'''
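        # marginal_loglik[j] = log( sum_p pi[p] * relative_likelihood[j, p] ) + lfactor[j];
        # 'alt' uses the non-null components with renormalized weights, 'null' only the first column.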
if 'marginal' in options:
self.data.lik['marginal_loglik'] = np.log(self.data.lik['relative_likelihood'] @ self.data.pi) + self.data.lik['lfactor']
self.data.lik['loglik'] = np.sum(self.data.lik['marginal_loglik'])
if 'alt' in options:
self.data.lik['alt_loglik'] = np.log(self.data.lik['relative_likelihood'][:,1:] @ (self.data.pi[1:] / (1 - self.data.pi[0]))) + self.data.lik['lfactor']
if 'null' in options:
self.data.lik['null_loglik'] = np.log(self.data.lik['relative_likelihood'][:,0]) + self.data.lik['lfactor']
class PosteriorMASH:
def __init__(self, data):
'''
// @param b_mat J by R
// @param s_mat J by R
// @param v_mat R by R
// @param U_cube list of prior covariance matrices, for each mixture component P by R by R
'''
self.J = data.B.shape[0]
self.R = data.B.shape[1]
self.P = len(data.U)
self.data = data
self.data.post_mean_mat = np.zeros((self.R, self.J))
self.data.post_mean2_mat = np.zeros((self.R, self.J))
self.data.neg_prob_mat = np.zeros((self.R, self.J))
self.data.zero_prob_mat = np.zeros((self.R, self.J))
def compute_posterior_weights(self):
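        '''
        posterior_weights[p, j] is proportional to pi[p] * relative_likelihood[j, p],
        normalized over the P components for each effect j.
        '''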
d = (self.data.pi * self.data.lik['relative_likelihood'])
self.data.posterior_weights = (d.T / np.sum(d, axis = 1))
def compute_posterior(self):
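        '''
        For each effect j: compute, for every mixture component, the posterior mean,
        second moment, P(beta < 0) and P(beta = 0), then average them over
        components using the posterior weights.
        '''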
for j in range(self.J):
Vinv_mat = inv_sympd(get_svs(self.data.S[j,:], self.data.V))
mu1_mat = np.zeros((self.R, self.P))
mu2_mat = np.zeros((self.R, self.P))
zero_mat = np.zeros((self.R, self.P))
neg_mat = np.zeros((self.R, self.P))
for p, name in enumerate(self.data.U.keys()):
U1_mat = self.get_posterior_cov(Vinv_mat, self.data.U[name])
mu1_mat[:,p] = self.get_posterior_mean_vec(self.data.B[j,:], Vinv_mat, U1_mat)
sigma_vec = np.sqrt(np.diag(U1_mat))
null_cond = (sigma_vec == 0)
mu2_mat[:,p] = np.square(mu1_mat[:,p]) + np.diag(U1_mat)
if not null_cond.all():
neg_mat[np.invert(null_cond),p] = norm.sf(mu1_mat[np.invert(null_cond),p], scale=sigma_vec[np.invert(null_cond)])
zero_mat[null_cond,p] = 1.0
self.data.post_mean_mat[:,j] = mu1_mat @ self.data.posterior_weights[:,j]
self.data.post_mean2_mat[:,j] = mu2_mat @ self.data.posterior_weights[:,j]
self.data.neg_prob_mat[:,j] = neg_mat @ self.data.posterior_weights[:,j]
self.data.zero_prob_mat[:,j] = zero_mat @ self.data.posterior_weights[:,j]
def compute_posterior_comcov(self):
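        '''
        Same computation as compute_posterior(), but when all effects share the
        same V_j the per-component quantities can be obtained for all J effects
        at once with matrix operations.
        '''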
Vinv_mat = inv_sympd(get_svs(self.data.S[0,:], self.data.V))
for p, name in enumerate(self.data.U.keys()):
zero_mat = np.zeros((self.R, self.J))
U1_mat = self.get_posterior_cov(Vinv_mat, self.data.U[name])
mu1_mat = self.get_posterior_mean_mat(self.data.B, Vinv_mat, U1_mat)
sigma_vec = np.sqrt(np.diag(U1_mat))
null_cond = (sigma_vec == 0)
sigma_mat = np.tile(sigma_vec, (self.J, 1)).T
neg_mat = np.zeros((self.R, self.J))
if not null_cond.all():
neg_mat[np.invert(null_cond),:] = norm.sf(mu1_mat[np.invert(null_cond),:], scale = sigma_mat[np.invert(null_cond),:])
mu2_mat = np.square(mu1_mat) + np.vstack(np.diag(U1_mat))
zero_mat[null_cond,:] = 1.0
self.data.post_mean_mat += self.data.posterior_weights[p,:] * mu1_mat
self.data.post_mean2_mat += self.data.posterior_weights[p,:] * mu2_mat
self.data.neg_prob_mat += self.data.posterior_weights[p,:] * neg_mat
self.data.zero_prob_mat += self.data.posterior_weights[p,:] * zero_mat
@staticmethod
def get_posterior_mean_vec(B, V_inv, U):
return U @ (V_inv @ B)
@staticmethod
def get_posterior_mean_mat(B, V_inv, U):
return (B @ V_inv @ U).T
@staticmethod
def get_posterior_cov(V_inv, U):
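        # posterior covariance of one mixture component: U1 = U (V^{-1} U + I)^{-1}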
return U @ inv_sympd(V_inv @ U + np.identity(U.shape[0]))
@classmethod
def apply(cls, data):
obj = cls(data)
obj.compute_posterior_weights()
if data.is_common_cov():
obj.compute_posterior_comcov()
else:
obj.compute_posterior()
class PriorMASH:
def __init__(self, data):
self.data = data
self.R = data.B.shape[1]
def expand_cov(self, use_pointmass = True):
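        '''
        Expand each prior covariance matrix over the squared grid values
        (one scaled copy per grid point); optionally prepend the point-mass
        'null' component (a zero matrix).
        '''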
def product(x,y):
for item in y:
yield x*item
res = OrderedDict()
if use_pointmass:
res['null'] = np.zeros((self.R, self.R))
        for p in self.data.U:
            for i, g in enumerate(product(self.data.U[p], np.square(self.data.grid))):
                res[f"{p}.{i+1}"] = g
        self.data.U = res
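A quick illustration of `expand_cov()` above (the `toy` object is hypothetical, only for this check): a single 'identity' prior expanded over grid `[0.5, 1]` plus the point mass gives $P = 3$ components, matching the length-3 `pi` vector used later in this notebook.
from types import SimpleNamespace
toy = SimpleNamespace(B = np.zeros((1, 2)), U = {'identity': np.identity(2)}, grid = [0.5, 1])
PriorMASH(toy).expand_cov()
print(list(toy.U.keys()))   # ['null', 'identity.1', 'identity.2']
print(toy.U['identity.1'])  # 0.25 * identity: U scaled by the squared grid value 0.5^2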
%save /home/gaow/GIT/software/libgaow/py/src/regression_data.py -f
#!/usr/bin/env python3
__author__ = "Gao Wang"
__copyright__ = "Copyright 2016, Stephens lab"
__email__ = "gaow@uchicago.edu"
__license__ = "MIT"
__version__ = "0.1.0"
#from .model_mash import PriorMASH, LikelihoodMASH, PosteriorMASH
import numpy as np
from scipy.stats import linregress
from sklearn.linear_model import LinearRegression
class RegressionData:
def __init__(self, X = None, Y = None, Z = None, B = None, S = None):
self.X = X
self.Y = Y
self.Z = Z
self.B = B
self.S = S
self.lik = None
if (self.X is not None and self.Y is not None) and (self.B is None and self.S is None):
self.get_summary_stats()
def get_summary_stats(self):
'''
perform univariate regression
FIXME: it is slower than lapply + .lm.fit in R
FIXME: this faster implementation is on my watch list:
https://github.com/ajferraro/fastreg
'''
self.B = np.zeros((self.X.shape[1], self.Y.shape[1]))
self.S = np.zeros((self.X.shape[1], self.Y.shape[1]))
for r, y in enumerate(self.Y.T):
self.B[:,r], self.S[:,r] = self.univariate_simple_regression(self.X, y)[:,[0,2]].T
@staticmethod
def univariate_simple_regression(X, y, Z=None):
if Z is not None:
model = LinearRegression()
model.fit(Z, y)
y = y - model.predict(Z)
return np.vstack([linregress(x, y) for x in X.T])[:,[0,1,4]]
    def __str__(self):
        from pprint import pformat
        return pformat(self.__dict__, indent = 4)
class MASHData(RegressionData):
def __init__(self, X = None, Y = None, Z = None, B = None, S = None):
RegressionData.__init__(self, X, Y, Z, B, S)
self.post_mean_mat = None
self.post_mean2_mat = None
self.neg_prob_mat = None
self.zero_prob_mat = None
self._is_common_cov = None
self.V = None
self.U = None
self.pi = None
self.posterior_weights = None
self.grid = None
self.l10bf = None
    def is_common_cov(self):
        if self._is_common_cov is None and self.S is not None:
            # common covariance requires every effect to have the same standard errors,
            # i.e. all rows of S are identical
            self._is_common_cov = (self.S == self.S[0,:]).all()
        return self._is_common_cov
def compute_posterior(self):
PosteriorMASH.apply(self)
def set_prior(self):
prior = PriorMASH(self)
prior.expand_cov()
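As a quick sanity check that `get_summary_stats()` produces one effect estimate and standard error per SNP and condition, here is a minimal simulated example (the `X_sim` / `Y_sim` names are made up for this check and are not used elsewhere):
import numpy as np
np.random.seed(1)
X_sim = np.random.binomial(2, 0.3, size = (100, 5)).astype(float)  # 100 samples, 5 "SNPs"
Y_sim = np.random.randn(100, 2)                                     # 2 conditions
dat_sim = RegressionData(X = X_sim, Y = Y_sim)                      # triggers get_summary_stats()
print(dat_sim.B.shape, dat_sim.S.shape)                             # both (5, 2)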
res = .lm.fit(cbind(1, X[,1:2]), Y[,1])
head(res$residuals)
head(Y[,1] - cbind(1, X[,1:2]) %*% coef(res))
res = lm(Y[,1] ~ X[,1:2])
head(res$residuals)
head(Y[,1] - cbind(1, X[,1:2]) %*% coef(res))
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X[:,0:2], Y[:,0])
res = Y[:,0] - model.predict(X[:,0:2])
res[:10]
def mash_compute_this_and_that(X, Y):
data = MASHData(X = X, Y = Y)
data.U = {'identity': np.identity(2)}
data.V = np.identity(2)
data.pi = np.array([0.9, 0.05, 0.05])
data.grid = [0.5, 1]
data.set_prior()
lik = LikelihoodMASH(data)
lik.compute_relative_likelihood_matrix()
lik.compute_loglik_from_matrix()
lik.compute_log10bf()
data.compute_posterior()
return data
res = mash_compute_this_and_that(X, Y)
print(res)
betahat = res.B
sebetahat = res.S
res.B.shape
%get betahat sebetahat
# work around an SoS data-exchange issue: the transferred matrices arrive flattened, so rebuild them row-wise on the R side
betahat = matrix(betahat, ncol = 2, byrow = T)
sebetahat = matrix(sebetahat, ncol = 2, byrow = T)
data = mashr::set_mash_data(betahat, sebetahat)
g = list(Ulist = list(identity = diag(2)), grid = c(0.5,1), pi = c(0.9,0.05,0.05), usepointmass = T)
output = mashr::mash(data, g = g, fixg = TRUE, outputlevel = 2)
res = output$result
str(res)
head(res$PosteriorMean, n = 10)
print(res.post_mean_mat.T[:10,:])
head(res$PosteriorSD, n = 10)
print(np.sqrt(res.post_mean2_mat - np.square(res.post_mean_mat)).T[:10,:])
head(res$NegativeProb, n = 10)
print(res.neg_prob_mat.T[:10,:])
print(res.lik['loglik'])
output$loglik
head(output$null_loglik, 10)
head(output$alt_loglik, 10)
print(dim(output$null_loglik))
head(output$vloglik)
res.lik['null_loglik'][:10]
res.lik['alt_loglik'][:10]
res.lik['marginal_loglik'][:10]
head(mashr::get_log10bf(output), 10)
res.l10bf[:10]
res.S = res.S ** 0  # set all standard errors to 1, so that is_common_cov() holds and the comcov code path is exercised
def mash_compute_this_and_that(B, S):
data = MASHData(B = B, S = S)
data.U = {'identity': np.identity(2)}
data.V = np.identity(2)
data.pi = np.array([0.9, 0.05, 0.05])
data.grid = [0.5, 1]
data.set_prior()
lik = LikelihoodMASH(data)
lik.compute_relative_likelihood_matrix()
lik.compute_loglik_from_matrix()
lik.compute_log10bf()
data.compute_posterior()
return data
res0 = mash_compute_this_and_that(res.B, res.S)
print(res0.lik['marginal_loglik'][:10])
print(res0.lik['loglik'])
res0.l10bf[:10]
data0 = mashr::set_mash_data(betahat, sebetahat ^ 0)
output0 = mashr::mash(data0, g = g, fixg = TRUE, outputlevel = 2)
print(head(output0$vloglik))
print(output0$loglik)
print(np.sqrt(res0.post_mean2_mat - np.square(res0.post_mean_mat)).T[:10,:])
print(res0.neg_prob_mat.T[:10,:])
print(res0.zero_prob_mat.T[:10,:])
head(mashr::get_log10bf(output0), 10)
head(output0$result$PosteriorSD, n = 10)
head(output0$result$NegativeProb, n = 10)