Here I convert GTEx summary statistics to HDF5 format, making it easier to query and share the results.
This procedure was initially written and used for GTEx V6 data in 2015. It has now been updated to handle V8 data, for input to `mashr`.
Summary statistics data are in text format, one row per gene-snp pair. The columns are:
gene_id variant_id tss_distance ma_samples ma_count maf pval_nominal slope slope_se
Each tissue has a separate text file. Additionally, there are support files containing gene transcription start site coordinates and SNP coordinates.
Since each file has its own collection of genes, to make merging easier, we provide a list of genes extracted from the file `GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.gz`. This file has over 56K genes, although the eQTL analysis only involves the usual > 20K genes. The gene list `GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.annotation` was generated by the `ensembl_annotation` workflow documented in `analysis/20170929_Genotype.ipynb`.
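Before running the conversion it helps to peek at the raw input. A minimal sketch (the tissue file name below is just an example) that reads the first few rows with pandas to confirm the column layout:

```python
import pandas as pd

# Example per-tissue file; any *.allpairs.txt.gz from the release will do
sumstats = "GTEx_Analysis_v8_eQTL_all_associations/Whole_Blood.allpairs.txt.gz"

# Read only a few rows -- the full files are several GB each
peek = pd.read_csv(sumstats, sep=r"\s+", nrows=5)
print(peek.columns.tolist())
# expect: gene_id, variant_id, tss_distance, ma_samples, ma_count,
#         maf, pval_nominal, slope, slope_se
print(peek.head())
```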
[global]
cwd = '~/Documents/GTExV8'
sumstats_files = glob.glob("${cwd!a}/GTEx_Analysis_v8_eQTL_all_associations/*.allpairs.txt.gz")
v8db = "${cwd!a}/GTExV8.ciseQTL.h5"
parameter: msg = "GTEX V8 fastqtl analysis"
parameter: maxsize = 1000 # maximum number of groups per HDF5 file
[convert_0, extract_0]
depends: Py_Module('tables')
report: output = '.sos/utils.py'
import sys, os, re, copy
import numpy as np, pandas as pd, tables as tb
tb.parameters.MAX_GROUP_WIDTH = 51200
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521
class Environment:
def __init__(self):
self.float = np.float64
self.duplicate_tag = '_duplicated_'
self.common_suffix = '.h5'
env = Environment()
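# TBData: dict-like container for one gene's summary statistic arrays
# (e.g. 'data', or 'beta'/'se'/'pval', plus 'rownames'/'colnames').
# sink() writes/merges them as compressed carrays under one HDF5 group per gene.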
class TBData(dict):
def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
'''bzip2 may not be compatible with other hdf5 applications; but zlib is fine'''
self.__root = root.strip('/')
self.__group = name
self.__msg = msg
try:
if type(data) is dict:
self.update(data)
elif type(data) is str:
# is file name
self.__load(tb.open_file(data))
else:
# is file stream
self.__load(data)
except tb.exceptions.NoSuchNodeError:
raise ValueError('Cannot find dataset {}!'.format(name))
self.tb_filters = tb.Filters(complevel = 9, complib=complib)
def sink(self, filename):
with tb.open_file(filename, 'a') as f:
if self.__root:
try:
f.create_group("/", self.__root)
except:
pass
try:
# there is existing data -- have to merge with current data
# have to do this because the input file lines are not grouped by gene names!!
# use try ... except to hopefully faster than if ... else
# e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
if element.name != 'colnames':
self[element.name] = np.concatenate((element[:], self[element.name]))
except tb.exceptions.NoSuchNodeError:
f.create_group("/" + self.__root, self.__group,
self.__msg if self.__msg else self.__group)
for key in self:
self.__store_array(key, f)
f.flush()
def dump(self, table, output = False):
if output:
pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames']).to_csv(sys.stdout, na_rep = 'NA')
return None
else:
return pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames'])
def __load(self, fstream):
try:
for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
self[element.name] = element[:]
fstream.close()
except:
fstream.close()
raise
def __roll_back(self, group, name):
try:
n = getattr(group, name)
n._f_remove()
except AttributeError:
pass
def __store_array(self, name, fstream):
if self.__root:
element = getattr(getattr(fstream.root, self.__root), self.__group)
else:
element = getattr(fstream.root, self.__group)
arr = self[name]
if type(arr) is list:
arr = np.array(arr)
self.__roll_back(element, name)
#
if arr.shape != (0,):
ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
filters = self.tb_filters)
ds[:] = arr
def get_tb_grps(filenames, group_name = None):
if isinstance(filenames, str):
filenames = [filenames]
names = set()
for filename in filenames:
with tb.open_file(filename) as f:
names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
return sorted(names)
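# SSData: streaming parser for *.allpairs.txt.gz input. For each line it buffers
# [slope, slope_se, pval_nominal] and the trimmed variant ID for the current gene;
# when gene_id changes (or the input ends) the finished buffer is moved to 'output'.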
class SSData:
def __init__(self, header = False):
self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
self.header = header
self.previous_name = self.current_name = None
self.count = -1
def parse(self, line, ensg_version = 0):
# input line is snp, gene, beta, t, pval
if not line:
self.__reset()
self.current_name = None
return 1
line = line.strip().split()
self.count += 1
if self.header and self.count == 0:
return 0
#
if ensg_version == 0:
line[0] = line[0].split('.')[0]
if self.previous_name is None:
self.previous_name = line[0]
self.current_name = line[0]
if self.current_name != self.previous_name:
self.__reset()
self.data['buffer']['data'].append([line[7], line[8], line[6]])
self.data['buffer']['rownames'].append(line[1][3:-4])
return 0
def __reset(self):
self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
self.data['output'] = copy.deepcopy(self.data['buffer'])
self.data['buffer'] = {'data':[], 'rownames':[]}
def dump(self):
return self.data['output']
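# DataMerger: for a single gene, loads its group from every per-tissue HDF5 file
# and aligns tissues into SNP-by-tissue matrices for 'beta', 'se' and 'pval';
# tissues lacking the gene contribute a single NaN row.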
class DataMerger(TBData):
def __init__(self, files, name, msg = None):
TBData.__init__(self, {}, name, msg, complib = "zlib")
self.files = sorted(files)
self.__group = name
def merge(self):
data = {}
one_snp = None
failure_ct = 0
# Collect data
for item in self.files:
tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
try:
data[tissue] = TBData(item, self.__group)
if one_snp is None: one_snp = data[tissue]['rownames'][0]
except ValueError:
data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
failure_ct += 1
# Fix row name
# Because in GTEx data file there are duplicated gene-snp pairs having different sumstats!!
if data[tissue]['rownames'] is not None:
data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
if failure_ct == len(self.files):
return 1
# Merge data
for idx, item in enumerate(['beta','se','pval']):
self[item] = pd.concat([pd.DataFrame(
{tissue : data[tissue]['data'][:,idx]},
index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
) for tissue in sorted(data.keys())], axis = 1)
if 'rownames' not in self:
self['rownames'] = np.array(self[item].index, dtype = str)
if 'colnames' not in self:
self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
            self[item] = np.array(self[item].values, dtype = env.float)
# np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
# print(self['rownames'])
# print(self['colnames'])
return 0
def __dedup(self, seq, filename):
seen = {}
dups = set()
def __is_seen(x, seen):
if x not in seen:
seen[x] = 0
return 0
else:
seen[x] += 1
dups.add(x)
return 1
# Tag them
obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
# Log them
if len(dups):
filename = os.path.splitext(filename)[0]
with open(filename + '.error', 'a') as f:
for item in dups:
f.write('{}:{} appeared {} times in {}\n'.\
format(self.__group, item, seen[item] + 1, filename))
return obs
def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
'''choose gene-snp pairs from data, controlled by num = (a, b)
for the best gene-snp pair (a = 0 or 1), and b other random
gene-snp pairs'''
def random_sample(x, k):
return sorted(set(np.random.choice(x, min(len(x), k))))
def equal_sample(x, k):
f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
return sorted(set([x[i] for i in f(k, len(x))]))
output = {'colnames' : data['colnames']}
lp = data.dump('pval')
lp = lp[np.all(np.isfinite(lp), axis=1)]
#
if lp.empty:
return None
# Find max SNP-gene pair
lp = -np.log10(lp)
rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
if num[0] > 0:
output['max_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
output['max'] = {}
for k in ['beta', 'pval', 'se']:
output['max'][k] = data[k][rowidx, :]
if num[1] > 0:
all_nullidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
        sample_nullidxes = random_sample(all_nullidxes, num[1]) if method == 'random' else equal_sample(all_nullidxes, num[1])
output['null_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_nullidxes]
output['null'] = {}
for k in ['beta', 'pval', 'se']:
output['null'][k] = data[k][sample_nullidxes, :]
if not 'max' in output and not 'null' in output:
output = None
return output
Benchmark from previous runs:
time | original size | HDF5 size | method | data type |
---|---|---|---|---|
118 min | 5G (179,083,485 lines) | 3.8G | TBData, bzip2 | float32 |
118 min | 5G (179,083,485 lines) | 5.1G | TBData, bzip2 | float64 |
138 min | 5G (179,083,485 lines) | 4.9G | TBData, zlib | float32 |
39 min | 5G (179,083,485 lines) | 6.9G | HPData, zlib | float64 |
I will use `bzip2` compression and `float32` for the initial conversion, then extract the `mashr` input and save it to RDS format.
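As a rough illustration (not the actual benchmark code, which ran over the full 5G inputs), the two compressors can be compared on a small array with the same PyTables calls used below:

```python
import os
import numpy as np
import tables as tb

# Write the same float32 array under both compressors and compare file sizes
x = np.random.randn(1000000).astype(np.float32)
for complib in ("bzip2", "zlib"):
    fn = "/tmp/compression_test_{}.h5".format(complib)
    with tb.open_file(fn, "w") as f:
        ds = f.create_carray("/", "x", tb.Atom.from_dtype(x.dtype), x.shape,
                             filters=tb.Filters(complevel=9, complib=complib))
        ds[:] = x
    print(complib, os.path.getsize(fn))
```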
Convert per-tissue summary statistics to HDF5 files.
[convert_1, extract_1]
depends: executable("h5copy")
input: sumstats_files, group_by = 1, pattern = "{name}.allpairs.txt.gz"
output: expand_pattern('{_name}.h5')
task:
python: input = '.sos/utils.py'
import warnings
import gzip
ssp = SSData(header = True)
group_counts = 0
with gzip.open(${_input!r}) as f:
while True:
line = f.readline().decode()
quit = ssp.parse(line)
if ssp.current_name != ssp.previous_name:
group_counts += 1
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
# warnings.filterwarnings("ignore", category = tb.PerformanceWarning)
data = TBData(ssp.dump(), ssp.previous_name, ${msg!r})
data.sink("${_output!n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
ssp.previous_name = ssp.current_name
if quit:
break
if ${maxsize} > 0:
from glob import glob
tmpfiles = list(glob("${_output!n}_*.tmp"))
for item in sorted(tmpfiles):
for name in get_tb_grps(item):
os.system('h5copy -i {0} -o ${_output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
rm -f ${_output!n}_*.tmp
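Each per-tissue file now contains one HDF5 group per gene, holding a `data` array (columns `beta`, `se`, `pval`, as recorded in `colnames`) and `rownames` with the trimmed variant IDs. A minimal sketch of reading one gene back; the file and gene names are placeholders:

```python
import pandas as pd
import tables as tb

# Placeholder file / gene; use `h5ls <file>` to list available gene groups
h5file, gene = "Whole_Blood.h5", "ENSG00000223972"
with tb.open_file(h5file) as f:
    grp = f.get_node("/" + gene)
    dat = pd.DataFrame(grp.data[:],
                       index=[x.decode() for x in grp.rownames[:]],
                       columns=[x.decode() for x in grp.colnames[:]])
print(dat.head())
```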
Create tables for each summary statistic per gene. Rows are SNPs, columns are tissue names. This time I use `zlib` compression for better compatibility.
[convert_2, extract_2]
gene_list = get_output("cut -f4 -d ' ' ${cwd!a}/GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.annotation").strip().split()
depends: executable("h5copy")
output: v8db
task:
python: input = '.sos/utils.py'
import warnings
if ${gene_list!r}:
gene_names = [${gene_list!r,}]
else:
gene_names = get_tb_grps([${input!r,}])
failure_ct = 0
for idx, item in enumerate(gene_names):
ssm = DataMerger([${input!r,}], item, ${msg!r})
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
if ssm.merge() == 0:
ssm.sink("${output!n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
else:
failure_ct += 1
with open("${output!n}.log", 'w') as f:
f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
if ${maxsize} > 0:
from glob import glob
tmpfiles = list(glob("${output!n}_*.tmp"))
for item in sorted(tmpfiles):
for name in get_tb_grps(item):
os.system('h5copy -i {0} -o ${output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
rm -f ${_output!n}_*.tmp
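The merged database can then be queried per gene with the same `TBData` container defined in `utils.py` above (assuming its definitions are in scope; the gene ID is a placeholder):

```python
# Placeholder gene ID; any group name from `h5ls GTExV8.ciseQTL.h5` works
gene = "ENSG00000223972"
dat = TBData("GTExV8.ciseQTL.h5", gene)
beta = dat.dump("beta")   # pandas DataFrame, SNPs in rows, tissues in columns
pval = dat.dump("pval")
print(beta.shape, pval.shape)
```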
We need to extract the max signals based on the single-tissue analysis, as well as some null data to serve as training / testing sets.
For now we only consider genes having complete data in all tissues. The gene list file here is prepared via:
h5ls GTExV8.ciseQTL.h5 | awk '{print $1}' > GTExV8.ciseQTL.genes
[extract_3]
num_null = 9
gene_list = get_output("cat ${input!n}.genes").strip().split()
output: "${input!n}.4MASH.h5"
task:
python: input = '.sos/utils.py'
import warnings
if ${gene_list!r}:
gene_names = [${gene_list!r,}]
else:
gene_names = get_tb_grps([${input!r,}])
output = {}
output['null'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
output['max'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
failure_ct = 0
for idx, name in enumerate(gene_names):
# extract the best gene-snp pair or some null gene-snp pairs
res = get_gs_pairs(TBData(${input!r}, name), name, (1, ${num_null}))
#
if res is None:
failure_ct += 1
continue
for k in output:
if not k in res:
continue
for kk in output[k]:
if kk == 'rownames':
if output[k]['rownames'] is None:
output[k]['rownames'] = res["{}_rownames".format(k)]
else:
output[k]['rownames'].extend(res["{}_rownames".format(k)])
elif kk == 'colnames':
if output[k]['colnames'] is None:
output[k]['colnames'] = res['colnames']
else:
output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
#
if failure_ct < len(gene_names):
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
for k in output:
TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % (${msg!r}, k), complib = 'zlib').sink(${output!r})
with open("${output!n}.log", 'w') as f:
f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))
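The output file has two top-level groups, `max` and `null`, each storing `beta`, `se` and `pval` as gene-snp pair by tissue matrices along with matching `rownames` and `colnames`. A quick structural check with PyTables:

```python
import tables as tb

with tb.open_file("GTExV8.ciseQTL.4MASH.h5") as f:
    for grp in ("max", "null"):
        node = f.get_node("/" + grp)
        print(grp, node.beta.shape, node.se.shape, node.pval.shape)
```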
[extract_4]
# convert HDF5 to RDS
depends: R_library('rhdf5')
output: "${input!n}.rds"
task:
R:
ConvertP2Z <- function(pval, beta) {
z <- abs(qnorm(pval / 2))
z[which(beta < 0)] <- -1 * z[which(beta < 0)]
return(z)
}
GetSS <- function(table, db) {
dat <- rhdf5::h5read(db, table)
dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
for (name in c("beta", "se", "pval", "z")) {
dat[[name]] <- t(dat[[name]])
colnames(dat[[name]]) <- dat$colnames
rownames(dat[[name]]) <- dat$rownames
}
dat$colnames <- dat$rownames <- NULL
return(dat)
}
saveRDS(list(max = GetSS('max', ${input!r}), null = GetSS('null', ${input!r})), ${output!r})
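For reference, `ConvertP2Z` maps a two-sided p-value and the sign of the effect estimate to a z-score, z = sign(beta) * |qnorm(p/2)|. A Python sketch of the same conversion (using scipy, which is not otherwise required by this workflow):

```python
import numpy as np
from scipy.stats import norm

def convert_p2z(pval, beta):
    """Two-sided p-value plus sign of the effect -> signed z-score."""
    z = np.abs(norm.ppf(pval / 2))
    return np.where(beta < 0, -z, z)
```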
To convert to HDF5 only:

sos run analysis/20170926_eQTLSummary_HDF5.ipynb convert

To convert to HDF5 AND extract `mash` input:

sos run analysis/20170926_eQTLSummary_HDF5.ipynb extract
For the V8 data, 39784 genes are found to have data in at least one of the 49 tissues. Of these, 15632 have non-missing data in all tissues and are thus extracted. The resulting HDF5 file is 153M (also converted to an RDS file, which is 211M).