Multivariate Bayesian variable selection regression

Converting tissue specific eQTL summary statistics to HDF5

Here I convert GTEx summary statistics to HDF5 format, making it easier to query and share the results.

This procedure was initially written and used for GTEx V6 data in 2015. It is now updated to deal with V8 data, for input to mashr.

Input data

Summar statistics data are in text format, one row per gene-snp pair. Columns are:

gene_id variant_id      tss_distance    ma_samples      ma_count        maf     pval_nominal    slope   slope_se

Each tissue has a separate text file. Additionally there are support files of gene transcription start site coordinates, and SNP coordinates.

Since each file has its own collection of genes, to make merger easier, we provide a list of genes extracted from file GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.gz. This file has over 56K genes although the eQTL analysis only has the regular > 20K genes. The gene list GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.annotation was generated from the ensembl_annotation workflow documented in analysis/20170929_Genotype.ipynb.

In [ ]:
[global]
cwd = '~/Documents/GTExV8'
sumstats_files = glob.glob("${cwd!a}/GTEx_Analysis_v8_eQTL_all_associations/*.allpairs.txt.gz")
v8db = "${cwd!a}/GTExV8.ciseQTL.h5"
parameter: msg = "GTEX V8 fastqtl analysis"
parameter: maxsize = 1000 # maximum number of groups per HDF5 file

Utility codes

In [ ]:
[convert_0, extract_0]
depends: Py_Module('tables')
report: output = '.sos/utils.py'
import sys, os, re, copy
import numpy as np, pandas as pd, tables as tb
tb.parameters.MAX_GROUP_WIDTH = 51200
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521

class Environment:
    def __init__(self):
        self.float = np.float64
        self.duplicate_tag = '_duplicated_'
        self.common_suffix = '.h5'

env = Environment()

class TBData(dict):
    def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
        '''bzip2 may not be compatible with other hdf5 applications; but zlib is fine'''
        self.__root = root.strip('/')
        self.__group = name
        self.__msg = msg
        try:
            if type(data) is dict:
                self.update(data)
            elif type(data) is str:
                # is file name
                self.__load(tb.open_file(data))
            else:
                # is file stream
                self.__load(data)
        except tb.exceptions.NoSuchNodeError:
            raise ValueError('Cannot find dataset {}!'.format(name))
        self.tb_filters = tb.Filters(complevel = 9, complib=complib)

    def sink(self, filename):
        with tb.open_file(filename, 'a') as f:
            if self.__root:
                try:
                    f.create_group("/", self.__root)
                except:
                    pass
            try:
                # there is existing data -- have to merge with current data
                # have to do this because the input file lines are not grouped by gene names!!
                # use try ... except to hopefully faster than if ... else
                # e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
                for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                    if element.name != 'colnames':
                        self[element.name] = np.concatenate((element[:], self[element.name]))
            except tb.exceptions.NoSuchNodeError:
                f.create_group("/" + self.__root, self.__group,
                               self.__msg if self.__msg else self.__group)
            for key in self:
                self.__store_array(key, f)
            f.flush()

    def dump(self, table, output = False):
        if output:
            pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames']).to_csv(sys.stdout, na_rep = 'NA')
            return None
        else:
            return pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames'])

    def __load(self, fstream):
        try:
            for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                self[element.name] = element[:]
            fstream.close()
        except:
            fstream.close()
            raise

    def __roll_back(self, group, name):
        try:
            n = getattr(group, name)
            n._f_remove()
        except AttributeError:
            pass

    def __store_array(self, name, fstream):
        if self.__root:
            element = getattr(getattr(fstream.root, self.__root), self.__group)
        else:
            element = getattr(fstream.root, self.__group)
        arr = self[name]
        if type(arr) is list:
            arr = np.array(arr)
        self.__roll_back(element, name)
        #
        if arr.shape != (0,):
            ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
                                       filters = self.tb_filters)
            ds[:] = arr

def get_tb_grps(filenames, group_name = None):
    if isinstance(filenames, str):
        filenames = [filenames]
    names = set()
    for filename in filenames:
        with tb.open_file(filename) as f:
            names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
    return sorted(names)

class SSData:
    def __init__(self, header = False):
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        self.count = -1

    def parse(self, line, ensg_version = 0):
        # input line is snp, gene, beta, t, pval
        if not line:
            self.__reset()
            self.current_name = None
            return 1
        line = line.strip().split()
        self.count += 1
        if self.header and self.count == 0:
            return 0
        #
        if ensg_version == 0:
            line[0] = line[0].split('.')[0]
        if self.previous_name is None:
            self.previous_name = line[0]
        self.current_name = line[0]
        if self.current_name != self.previous_name:
            self.__reset()
        self.data['buffer']['data'].append([line[7], line[8], line[6]])
        self.data['buffer']['rownames'].append(line[1][3:-4])
        return 0

    def __reset(self):
        self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
        self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
        self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
        self.data['output'] = copy.deepcopy(self.data['buffer'])
        self.data['buffer'] = {'data':[], 'rownames':[]}

    def dump(self):
        return self.data['output']

class DataMerger(TBData):
    def __init__(self, files, name, msg = None):
        TBData.__init__(self, {}, name, msg, complib = "zlib")
        self.files = sorted(files)
        self.__group = name

    def merge(self):
        data = {}
        one_snp = None
        failure_ct = 0
        # Collect data
        for item in self.files:
            tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
            try:
                data[tissue] = TBData(item, self.__group)
                if one_snp is None: one_snp = data[tissue]['rownames'][0]
            except ValueError:
                data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
                failure_ct += 1
            # Fix row name
            # Because in GTEx data file there are duplicated gene-snp pairs having different sumstats!!
            if data[tissue]['rownames'] is not None:
                data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
        if failure_ct == len(self.files):
            return 1
        # Merge data
        for idx, item in enumerate(['beta','se','pval']):
            self[item] = pd.concat([pd.DataFrame(
                {tissue : data[tissue]['data'][:,idx]},
                index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
                ) for tissue in sorted(data.keys())], axis = 1)
            if 'rownames' not in self:
                self['rownames'] = np.array(self[item].index, dtype = str)
            if 'colnames' not in self:
                self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
            self[item] = np.array(self[item].as_matrix(), dtype = env.float)
        # np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
        # print(self['rownames'])
        # print(self['colnames'])
        return 0

    def __dedup(self, seq, filename):
        seen = {}
        dups = set()
        def __is_seen(x, seen):
            if x not in seen:
                seen[x] = 0
                return 0
            else:
                seen[x] += 1
                dups.add(x)
                return 1
        # Tag them
        obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
        # Log them
        if len(dups):
            filename = os.path.splitext(filename)[0]
            with open(filename + '.error', 'a') as f:
                for item in dups:
                    f.write('{}:{} appeared {} times in {}\n'.\
                            format(self.__group, item, seen[item] + 1, filename))
        return obs

def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
    '''choose gene-snp pairs from data, controlled by num = (a, b)
    for the best gene-snp pair (a = 0 or 1), and b other random 
    gene-snp pairs'''
    
    def random_sample(x, k):
        return sorted(set(np.random.choice(x, min(len(x), k))))

    def equal_sample(x, k):
        f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
        return sorted(set([x[i] for i in f(k, len(x))]))

    output = {'colnames' : data['colnames']}
    lp = data.dump('pval')
    lp = lp[np.all(np.isfinite(lp), axis=1)]
    #
    if lp.empty:
        return None
    # Find max SNP-gene pair
    lp = -np.log10(lp)
    rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
    if num[0] > 0:
        output['max_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
        output['max'] = {}
        for k in ['beta', 'pval', 'se']:
            output['max'][k] = data[k][rowidx, :]
    if num[1] > 0:
        all_nullidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
        sample_nullidxes = randome_sample(all_nullidxes, num[1]) if method == 'random' else equal_sample(all_nullidxes, num[1])
        output['null_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_nullidxes]
        output['null'] = {}
        for k in ['beta', 'pval', 'se']:
            output['null'][k] = data[k][sample_nullidxes, :]
    if not 'max' in output and not 'null' in output:
        output = None
    return output

Convert to HDF5 format

Benchmark from previous runs:

time original size HDF5 size method data type
118 min 5G (179,083,485 lines) 3.8G TBData, bzip2 float32
118 min 5G (179,083,485 lines) 5.1G TBData, bzip2 float64
138 min 5G (179,083,485 lines) 4.9G TBData, zlib float32
39 min 5G (179,083,485 lines) 6.9G HPData, zlib float64

I will use bzip2 compression and float32 to make initial compression, then extract for mashr input and save to RDS format.

Per tissue conversion

Converts per tissue summary statistics to HDF5 files.

In [1]:
[convert_1, extract_1]
depends: executable("h5copy")
input: sumstats_files, group_by = 1, pattern = "{name}.allpairs.txt.gz"
output: expand_pattern('{_name}.h5')
task:
python: input = '.sos/utils.py'
    import warnings
    import gzip
    ssp = SSData(header = True)
    group_counts = 0
    with gzip.open(${_input!r}) as f:
        while True:
            line = f.readline().decode()
            quit = ssp.parse(line)
            if ssp.current_name != ssp.previous_name:
                group_counts += 1
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
                    # warnings.filterwarnings("ignore", category = tb.PerformanceWarning)
                    data = TBData(ssp.dump(), ssp.previous_name, ${msg!r})
                    data.sink("${_output!n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
                ssp.previous_name = ssp.current_name
            if quit:
                break
    if ${maxsize} > 0:
        from glob import glob
        tmpfiles = list(glob("${_output!n}_*.tmp"))
        for item in sorted(tmpfiles):
            for name in get_tb_grps(item):
                os.system('h5copy -i {0} -o ${_output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
    rm -f ${_output!n}_*.tmp

Merge per tissue HDF5 to one HDF5

Create tables for each summary statistic per gene. Rows are SNPs, columns are tissue names. This time I use zlib compression for better compatibility.

In [2]:
[convert_2, extract_2]
gene_list = get_output("cut -f4 -d ' ' ${cwd!a}/GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_reads.gct.annotation").strip().split()
depends: executable("h5copy")
output: v8db
task:
python: input = '.sos/utils.py'
    import warnings
    if ${gene_list!r}:
        gene_names = [${gene_list!r,}]
    else:
        gene_names = get_tb_grps([${input!r,}])
    failure_ct = 0
    for idx, item in enumerate(gene_names):
        ssm = DataMerger([${input!r,}], item, ${msg!r})
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
            if ssm.merge() == 0:
                ssm.sink("${output!n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
            else:
                failure_ct += 1
    with open("${output!n}.log", 'w') as f:
        f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
    if ${maxsize} > 0:
        from glob import glob
        tmpfiles = list(glob("${output!n}_*.tmp"))
        for item in sorted(tmpfiles):
            for name in get_tb_grps(item):
                os.system('h5copy -i {0} -o ${output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
    rm -f ${_output!n}_*.tmp

Extract data for mash analysis

We need to extract the max-signals based on single tissue analysis, as well as some null data as training / testing sets.

For now we only consider genes having complete data for all tissues. Gene list file here is prepared via:

h5ls GTExV8.ciseQTL.h5 |  awk '{print $1}' > GTExV8.ciseQTL.genes
In [ ]:
[extract_3]
num_null = 9
gene_list = get_output("cat ${input!n}.genes").strip().split()
output: "${input!n}.4MASH.h5"
task:
python: input = '.sos/utils.py'
    import warnings
    if ${gene_list!r}:
        gene_names = [${gene_list!r,}]
    else:
        gene_names = get_tb_grps([${input!r,}])
    output = {}
    output['null'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    output['max'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    failure_ct = 0
    for idx, name in enumerate(gene_names):
        # extract the best gene-snp pair or some null gene-snp pairs
        res = get_gs_pairs(TBData(${input!r}, name), name, (1, ${num_null}))
        #
        if res is None:
            failure_ct += 1
            continue
        for k in output:
            if not k in res:
                continue
            for kk in output[k]:
                if kk == 'rownames':
                    if output[k]['rownames'] is None:
                        output[k]['rownames'] = res["{}_rownames".format(k)]
                    else:
                        output[k]['rownames'].extend(res["{}_rownames".format(k)])
                elif kk == 'colnames':
                    if output[k]['colnames'] is None:
                        output[k]['colnames'] = res['colnames']
                else:
                    output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
    #
    if failure_ct < len(gene_names):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
            for k in output:
                TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % (${msg!r}, k), complib = 'zlib').sink(${output!r})
    with open("${output!n}.log", 'w') as f:
        f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))

[extract_4]
# convert HDF5 to RDS
depends: R_library('rhdf5')
output: "${input!n}.rds"
task:
R:
ConvertP2Z <- function(pval, beta) {
  z <- abs(qnorm(pval / 2))
  z[which(beta < 0)] <- -1 * z[which(beta < 0)]
  return(z)
}

GetSS <- function(table, db) {
  dat <- rhdf5::h5read(db, table)
  dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
  for (name in c("beta", "se", "pval", "z")) {
    dat[[name]] <- t(dat[[name]])
    colnames(dat[[name]]) <- dat$colnames
    rownames(dat[[name]]) <- dat$rownames
  }
  dat$colnames <- dat$rownames <- NULL
  return(dat)
}

saveRDS(list(max = GetSS('max', ${input!r}), null = GetSS('null', ${input!r})), ${output!r})

Run workflow

To convert to HDF5 only

sos run analysis/20170926_eQTLSummary_HDF5.ipynb convert

To convert to HDF5 AND extract mash input

sos run analysis/20170926_eQTLSummary_HDF5.ipynb extract

For V8 data, 39784 genes are found having data in at least one of the 49 tissues. Out of them, 15632 have non-missing data in all tissues and thus extracted. The resulting HDF5 file is 153M (converted also to RDS file which is 211M).


Copyright © 2016-2020 Gao Wang et al at Stephens Lab, University of Chicago