This is not GTEx related. This workflow converts data from an immune response eQTL study in primary human monocytes to mash format.
Here is the data. I keep this notebook here because it is mostly the codes from this notebook.
Summary statistics data are in text format, one row per gene-snp pair. Columns are:
gene_id variant_id tss_distance pval_nominal slope se
Each condition has a separate text file.
[global]
cwd = '~/Documents/Misc/ImmuneQTL'
sumstats_files = glob.glob("${cwd!a}/nominal_*.txt")
sumstats_db = "${cwd!a}/ImmuneQTLSummary.h5"
parameter: msg = "Immune QTL study fastQTL analysis"
parameter: maxsize = 1000 # maximum number of groups per HDF5 file
[convert_0, extract_0]
depends: Py_Module('tables')
report: output = '.sos/utils.py'
import sys, os, re, copy
import numpy as np, pandas as pd, tables as tb
tb.parameters.MAX_GROUP_WIDTH = 51200
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521
class Environment:
def __init__(self):
self.float = np.float64
self.duplicate_tag = '_duplicated_'
self.common_suffix = '.h5'
env = Environment()
class TBData(dict):
def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
'''bzip2 may not be compatible with other hdf5 applications; but zlib is fine'''
self.__root = root.strip('/')
self.__group = name
self.__msg = msg
try:
if type(data) is dict:
self.update(data)
elif type(data) is str:
# is file name
self.__load(tb.open_file(data))
else:
# is file stream
self.__load(data)
except tb.exceptions.NoSuchNodeError:
raise ValueError('Cannot find dataset {}!'.format(name))
self.tb_filters = tb.Filters(complevel = 9, complib=complib)
def sink(self, filename):
with tb.open_file(filename, 'a') as f:
if self.__root:
try:
f.create_group("/", self.__root)
except:
pass
try:
# there is existing data -- have to merge with current data
# have to do this because the input file lines are not grouped by gene names!!
# use try ... except to hopefully faster than if ... else
# e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
if element.name != 'colnames':
self[element.name] = np.concatenate((element[:], self[element.name]))
except tb.exceptions.NoSuchNodeError:
f.create_group("/" + self.__root, self.__group,
self.__msg if self.__msg else self.__group)
for key in self:
self.__store_array(key, f)
f.flush()
def dump(self, table, output = False):
if output:
pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames']).to_csv(sys.stdout, na_rep = 'NA')
return None
else:
return pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames'])
def __load(self, fstream):
try:
for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
self[element.name] = element[:]
fstream.close()
except:
fstream.close()
raise
def __roll_back(self, group, name):
try:
n = getattr(group, name)
n._f_remove()
except AttributeError:
pass
def __store_array(self, name, fstream):
if self.__root:
element = getattr(getattr(fstream.root, self.__root), self.__group)
else:
element = getattr(fstream.root, self.__group)
arr = self[name]
if type(arr) is list:
arr = np.array(arr)
self.__roll_back(element, name)
#
if arr.shape != (0,):
ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
filters = self.tb_filters)
ds[:] = arr
def get_tb_grps(filenames, group_name = None):
if isinstance(filenames, str):
filenames = [filenames]
names = set()
for filename in filenames:
with tb.open_file(filename) as f:
names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
return sorted(names)
class SSData:
def __init__(self, header = False):
self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
self.header = header
self.previous_name = self.current_name = None
self.count = -1
def parse(self, line, ensg_version = 0):
# input line is snp, gene, beta, t, pval
if not line:
self.__reset()
self.current_name = None
return 1
line = line.strip().split()
self.count += 1
if self.header and self.count == 0:
return 0
#
line[0] = line[0].strip('"')
if ensg_version == 0:
line[0] = line[0].split('.')[0]
if self.previous_name is None:
self.previous_name = line[0]
self.current_name = line[0]
if self.current_name != self.previous_name:
self.__reset()
self.data['buffer']['data'].append([line[4], line[5], line[3]])
self.data['buffer']['rownames'].append(line[1].strip('"'))
return 0
def __reset(self):
self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
self.data['output'] = copy.deepcopy(self.data['buffer'])
self.data['buffer'] = {'data':[], 'rownames':[]}
def dump(self):
return self.data['output']
class DataMerger(TBData):
def __init__(self, files, name, msg = None):
TBData.__init__(self, {}, name, msg, complib = "zlib")
self.files = sorted(files)
self.__group = name
def merge(self):
data = {}
one_snp = None
failure_ct = 0
# Collect data
for item in self.files:
tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
try:
data[tissue] = TBData(item, self.__group)
if one_snp is None: one_snp = data[tissue]['rownames'][0]
except ValueError:
data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
failure_ct += 1
# Fix row name
# Because in GTEx data file there are duplicated gene-snp pairs having different sumstats!!
if data[tissue]['rownames'] is not None:
data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
if failure_ct == len(self.files):
return 1
# Merge data
for idx, item in enumerate(['beta','se','pval']):
self[item] = pd.concat([pd.DataFrame(
{tissue : data[tissue]['data'][:,idx]},
index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
) for tissue in sorted(data.keys())], axis = 1)
if 'rownames' not in self:
self['rownames'] = np.array(self[item].index, dtype = str)
if 'colnames' not in self:
self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
self[item] = np.array(self[item].as_matrix(), dtype = env.float)
# np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
# print(self['rownames'])
# print(self['colnames'])
return 0
def __dedup(self, seq, filename):
seen = {}
dups = set()
def __is_seen(x, seen):
if x not in seen:
seen[x] = 0
return 0
else:
seen[x] += 1
dups.add(x)
return 1
# Tag them
obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
# Log them
if len(dups):
filename = os.path.splitext(filename)[0]
with open(filename + '.error', 'a') as f:
for item in dups:
f.write('{}:{} appeared {} times in {}\n'.\
format(self.__group, item, seen[item] + 1, filename))
return obs
def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
'''choose gene-snp pairs from data, controlled by num = (a, b)
for the best gene-snp pair (a = 0 or 1), and b other random
gene-snp pairs'''
def random_sample(x, k):
return sorted(set(np.random.choice(x, min(len(x), k))))
def equal_sample(x, k):
if len(x) < k:
return x
f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
return sorted(set([x[i] for i in f(k, len(x))]))
output = {'colnames' : data['colnames']}
lp = data.dump('pval')
lp = lp[np.all(np.isfinite(lp), axis=1)]
#
if lp.empty:
return None
# Find max SNP-gene pair
lp = -np.log10(lp)
rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
if num[0] > 0:
output['max_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
output['max'] = {}
for k in ['beta', 'pval', 'se']:
output['max'][k] = data[k][rowidx, :]
if num[1] > 0:
all_nullidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
sample_nullidxes = randome_sample(all_nullidxes, num[1]) if method == 'random' else equal_sample(all_nullidxes, num[1])
output['null_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_nullidxes]
output['null'] = {}
for k in ['beta', 'pval', 'se']:
output['null'][k] = data[k][sample_nullidxes, :]
if not 'max' in output and not 'null' in output:
output = None
return output
[convert_1, extract_1]
depends: executable("h5copy")
input: sumstats_files, group_by = 1, pattern = "{path}/nominal_{name}_se.txt"
output: expand_pattern('{_path}/{_name}.h5')
task:
python: input = '.sos/utils.py'
import warnings
ssp = SSData(header = True)
group_counts = 0
with open(${_input!r}) as f:
while True:
line = f.readline()
quit = ssp.parse(line)
if ssp.current_name != ssp.previous_name:
group_counts += 1
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
data = TBData(ssp.dump(), ssp.previous_name, ${msg!r})
data.sink("${_output!n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
ssp.previous_name = ssp.current_name
if quit:
break
if ${maxsize} > 0:
from glob import glob
tmpfiles = list(glob("${_output!n}_*.tmp"))
for item in sorted(tmpfiles):
for name in get_tb_grps(item):
os.system('h5copy -i {0} -o ${_output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
rm -f ${_output!n}_*.tmp
Create tables for each summary statistic per gene. Rows are SNPs, columns are tissue names. This time I use zlib
compression for better compatibility.
[convert_2, extract_2]
gene_list = ''
depends: executable("h5copy")
output: sumstats_db
task:
python: input = '.sos/utils.py'
import warnings
if ${gene_list!r}:
gene_names = [${gene_list!r,}]
else:
gene_names = get_tb_grps([${input!r,}])
failure_ct = 0
for idx, item in enumerate(gene_names):
ssm = DataMerger([${input!r,}], item, ${msg!r})
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
if ssm.merge() == 0:
ssm.sink("${output!n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
else:
failure_ct += 1
with open("${output!n}.log", 'w') as f:
f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
if ${maxsize} > 0:
from glob import glob
tmpfiles = list(glob("${output!n}_*.tmp"))
for item in sorted(tmpfiles):
for name in get_tb_grps(item):
os.system('h5copy -i {0} -o ${output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
rm -f ${_output!n}_*.tmp
[extract_3]
num_null = 9
gene_list = ''
output: "${input!n}.4MASH.h5"
task:
python: input = '.sos/utils.py'
import warnings
if ${gene_list!r}:
gene_names = [${gene_list!r,}]
else:
gene_names = get_tb_grps([${input!r,}])
output = {}
output['null'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
output['max'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
failure_ct = 0
for idx, name in enumerate(gene_names):
# extract the best gene-snp pair or some null gene-snp pairs
res = get_gs_pairs(TBData(${input!r}, name), name, (1, ${num_null}))
#
if res is None:
failure_ct += 1
continue
for k in output:
if not k in res:
continue
for kk in output[k]:
if kk == 'rownames':
if output[k]['rownames'] is None:
output[k]['rownames'] = res["{}_rownames".format(k)]
else:
output[k]['rownames'].extend(res["{}_rownames".format(k)])
elif kk == 'colnames':
if output[k]['colnames'] is None:
output[k]['colnames'] = res['colnames']
else:
output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
#
if failure_ct < len(gene_names):
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
for k in output:
TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % (${msg!r}, k), complib = 'zlib').sink(${output!r})
with open("${output!n}.log", 'w') as f:
f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))
[extract_4]
# convert HDF5 to RDS
depends: R_library('rhdf5')
output: "${input!n}.rds"
task:
R:
ConvertP2Z <- function(pval, beta) {
z <- abs(qnorm(pval / 2))
z[which(beta < 0)] <- -1 * z[which(beta < 0)]
return(z)
}
GetSS <- function(table, db) {
dat <- rhdf5::h5read(db, table)
dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
for (name in c("beta", "se", "pval", "z")) {
dat[[name]] <- t(dat[[name]])
colnames(dat[[name]]) <- dat$colnames
rownames(dat[[name]]) <- dat$rownames
}
dat$colnames <- dat$rownames <- NULL
return(dat)
}
saveRDS(list(max = GetSS('max', ${input!r}), null = GetSS('null', ${input!r})), ${output!r})
To convert to HDF5 only
sos run analysis/20171010_eQTLSummary_HDF5.ipynb convert
To convert to HDF5 AND extract mash
input
sos run analysis/20171010_eQTLSummary_HDF5.ipynb extract