Multivariate Bayesian variable selection regression

QTL data preprocessing for Yuxin Zou

This is not GTEx related. This workflow converts data from an immune response eQTL study in primary human monocytes to mash format.

Here is the data. I keep this notebook in this repository because it mostly reuses code from the GTEx version of this workflow.

Input data

Summary statistics data are in text format, one row per gene-snp pair. Columns are:

gene_id  variant_id  tss_distance  pval_nominal  slope  se

Each condition has a separate text file.
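
To make the column layout concrete, here is a toy parsing sketch (illustrative only; the example row is made up). It mirrors the field indices that SSData.parse below relies on:

# Hypothetical example row; columns: gene_id variant_id tss_distance pval_nominal slope se
line = "ENSG00000227232.4 rs12345 -1024 0.023 0.15 0.07"
fields = line.strip().split()
gene = fields[0].split('.')[0]                          # drop the ENSG version suffix
snp = fields[1]
pval, beta, se = [float(fields[i]) for i in (3, 4, 5)]  # pval_nominal, slope, se
print(gene, snp, beta, se, pval)                        # the (beta, se, pval) triplet stored per gene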

In [1]:
[global]
import glob
cwd = '~/Documents/Misc/ImmuneQTL'
sumstats_files = glob.glob("${cwd!a}/nominal_*.txt")
sumstats_db = "${cwd!a}/ImmuneQTLSummary.h5"
parameter: msg = "Immune QTL study fastQTL analysis"
parameter: maxsize = 1000 # maximum number of groups per HDF5 file

Utility code

In [2]:
[convert_0, extract_0]
depends: Py_Module('tables')
report: output = '.sos/utils.py'
import sys, os, re, copy
import numpy as np, pandas as pd, tables as tb
tb.parameters.MAX_GROUP_WIDTH = 51200
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521

class Environment:
    def __init__(self):
        self.float = np.float64
        self.duplicate_tag = '_duplicated_'
        self.common_suffix = '.h5'

env = Environment()

class TBData(dict):
    def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
        '''bzip2 may not be compatible with other hdf5 applications, but zlib is fine'''
        self.__root = root.strip('/')
        self.__group = name
        self.__msg = msg
        try:
            if type(data) is dict:
                self.update(data)
            elif type(data) is str:
                # is file name
                self.__load(tb.open_file(data))
            else:
                # is file stream
                self.__load(data)
        except tb.exceptions.NoSuchNodeError:
            raise ValueError('Cannot find dataset {}!'.format(name))
        self.tb_filters = tb.Filters(complevel = 9, complib=complib)

    def sink(self, filename):
        with tb.open_file(filename, 'a') as f:
            if self.__root:
                try:
                    f.create_group("/", self.__root)
                except:
                    pass
            try:
                # there is existing data -- have to merge with current data
                # have to do this because the input file lines are not grouped by gene names!!
                # use try ... except, which is hopefully faster than if ... else
                # e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
                for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                    if element.name != 'colnames':
                        self[element.name] = np.concatenate((element[:], self[element.name]))
            except tb.exceptions.NoSuchNodeError:
                f.create_group("/" + self.__root, self.__group,
                               self.__msg if self.__msg else self.__group)
            for key in self:
                self.__store_array(key, f)
            f.flush()

    def dump(self, table, output = False):
        res = pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))},
                           index = self['rownames'])
        if output:
            res.to_csv(sys.stdout, na_rep = 'NA')
            return None
        return res

    def __load(self, fstream):
        try:
            for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                self[element.name] = element[:]
            fstream.close()
        except:
            fstream.close()
            raise

    def __roll_back(self, group, name):
        try:
            n = getattr(group, name)
            n._f_remove()
        except AttributeError:
            pass

    def __store_array(self, name, fstream):
        if self.__root:
            element = getattr(getattr(fstream.root, self.__root), self.__group)
        else:
            element = getattr(fstream.root, self.__group)
        arr = self[name]
        if type(arr) is list:
            arr = np.array(arr)
        self.__roll_back(element, name)
        #
        if arr.shape != (0,):
            ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
                                       filters = self.tb_filters)
            ds[:] = arr

def get_tb_grps(filenames, group_name = None):
    if isinstance(filenames, str):
        filenames = [filenames]
    names = set()
    for filename in filenames:
        with tb.open_file(filename) as f:
            names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
    return sorted(names)

class SSData:
    def __init__(self, header = False):
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        self.count = -1

    def parse(self, line, ensg_version = 0):
        # input line is gene, snp, tss_distance, pval, beta, se
        if not line:
            self.__reset()
            self.current_name = None
            return 1
        line = line.strip().split()
        self.count += 1
        if self.header and self.count == 0:
            return 0
        #
        line[0] = line[0].strip('"')
        if ensg_version == 0:
            line[0] = line[0].split('.')[0]
        if self.previous_name is None:
            self.previous_name = line[0]
        self.current_name = line[0]
        if self.current_name != self.previous_name:
            self.__reset()
        self.data['buffer']['data'].append([line[4], line[5], line[3]])
        self.data['buffer']['rownames'].append(line[1].strip('"'))
        return 0

    def __reset(self):
        self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
        self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
        self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
        self.data['output'] = copy.deepcopy(self.data['buffer'])
        self.data['buffer'] = {'data':[], 'rownames':[]}

    def dump(self):
        return self.data['output']

class DataMerger(TBData):
    def __init__(self, files, name, msg = None):
        TBData.__init__(self, {}, name, msg, complib = "zlib")
        self.files = sorted(files)
        self.__group = name

    def merge(self):
        data = {}
        one_snp = None
        failure_ct = 0
        # Collect data
        for item in self.files:
            tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
            try:
                data[tissue] = TBData(item, self.__group)
                if one_snp is None: one_snp = data[tissue]['rownames'][0]
            except ValueError:
                data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
                failure_ct += 1
            # Fix row names: in the GTEx data files some gene-snp pairs are
            # duplicated with different summary statistics!
            if data[tissue]['rownames'] is not None:
                data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
        if failure_ct == len(self.files):
            return 1
        # Merge data
        for idx, item in enumerate(['beta','se','pval']):
            self[item] = pd.concat([pd.DataFrame(
                {tissue : data[tissue]['data'][:,idx]},
                index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
                ) for tissue in sorted(data.keys())], axis = 1)
            if 'rownames' not in self:
                self['rownames'] = np.array(self[item].index, dtype = str)
            if 'colnames' not in self:
                self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
            self[item] = np.array(self[item].values, dtype = env.float)
        # np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
        # print(self['rownames'])
        # print(self['colnames'])
        return 0

    def __dedup(self, seq, filename):
        seen = {}
        dups = set()
        def __is_seen(x, seen):
            if x not in seen:
                seen[x] = 0
                return 0
            else:
                seen[x] += 1
                dups.add(x)
                return 1
        # Tag them
        obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
        # Log them
        if len(dups):
            filename = os.path.splitext(filename)[0]
            with open(filename + '.error', 'a') as f:
                for item in dups:
                    f.write('{}:{} appeared {} times in {}\n'.\
                            format(self.__group, item, seen[item] + 1, filename))
        return obs

def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
    '''Choose gene-snp pairs from data, controlled by num = (a, b):
    keep the best gene-snp pair if a is 1 (skip it if a is 0), plus
    b other gene-snp pairs sampled by the given method'''
    
    def random_sample(x, k):
        return sorted(set(np.random.choice(x, min(len(x), k), replace = False)))

    def equal_sample(x, k):
        if len(x) < k:
            return x
        f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
        return sorted(set([x[i] for i in f(k, len(x))]))

    output = {'colnames' : data['colnames']}
    lp = data.dump('pval')
    lp = lp[np.all(np.isfinite(lp), axis=1)]
    #
    if lp.empty:
        return None
    # Find max SNP-gene pair
    lp = -np.log10(lp)
    rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
    if num[0] > 0:
        output['max_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
        output['max'] = {}
        for k in ['beta', 'pval', 'se']:
            output['max'][k] = data[k][rowidx, :]
    if num[1] > 0:
        all_nullidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
        sample_nullidxes = random_sample(all_nullidxes, num[1]) if method == 'random' else equal_sample(all_nullidxes, num[1])
        output['null_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_nullidxes]
        output['null'] = {}
        for k in ['beta', 'pval', 'se']:
            output['null'][k] = data[k][sample_nullidxes, :]
    if not 'max' in output and not 'null' in output:
        output = None
    return output
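
The equal_space method in get_gs_pairs picks b SNP indices evenly spread along a gene's SNP list, via the index formula i*n//m + n//(2*m). A standalone check of that formula, for illustration only:

# Pick m (approximately) evenly spaced positions out of n, as equal_sample does
f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
print(f(4, 100))   # [12, 37, 62, 87]: the midpoints of 4 equal bins over 0..99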

Convert to HDF5 format

Per-condition conversion

In [1]:
[convert_1, extract_1]
depends: executable("h5copy")
input: sumstats_files, group_by = 1, pattern = "{path}/nominal_{name}_se.txt"
output: expand_pattern('{_path}/{_name}.h5')
task:
python: input = '.sos/utils.py'
    import warnings
    ssp = SSData(header = True)
    group_counts = 0
    with open(${_input!r}) as f:
        while True:
            line = f.readline()
            quit = ssp.parse(line)
            if ssp.current_name != ssp.previous_name:
                group_counts += 1
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", category = tb.FlavorWarning)
                    data = TBData(ssp.dump(), ssp.previous_name, ${msg!r})
                    data.sink("${_output!n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
                ssp.previous_name = ssp.current_name
            if quit:
                break
    if ${maxsize} > 0:
        from glob import glob
        tmpfiles = list(glob("${_output!n}_*.tmp"))
        for item in sorted(tmpfiles):
            for name in get_tb_grps(item):
                os.system('h5copy -i {0} -o ${_output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
    rm -f ${_output!n}_*.tmp
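
For a quick sanity check of one converted per-condition file, the per-gene groups can be listed with PyTables (already a dependency). This is an illustrative sketch, not part of the workflow; the file name is hypothetical:

import tables as tb
fn = 'condition.h5'   # hypothetical output of convert_1, named after the input file
with tb.open_file(fn) as f:
    genes = [node._v_name for node in f.root]   # one group per gene
    print(len(genes), 'genes; first:', genes[0])
    g = getattr(f.root, genes[0])
    print(g.colnames[:])                        # [b'beta', b'se', b'pval']
    print(g.data.shape)                         # (number of SNPs, 3)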

Merge per-condition HDF5 files into one HDF5 file

Create one table per summary statistic per gene. Rows are SNPs, columns are condition names (the code calls them tissue, a carry-over from the GTEx version of this workflow). This time I use zlib compression for better compatibility.

In [2]:
[convert_2, extract_2]
gene_list = ''
depends: executable("h5copy")
output: sumstats_db
task:
python: input = '.sos/utils.py'
    import warnings
    if ${gene_list!r}:
        gene_names = [${gene_list!r,}]
    else:
        gene_names = get_tb_grps([${input!r,}])
    failure_ct = 0
    for idx, item in enumerate(gene_names):
        ssm = DataMerger([${input!r,}], item, ${msg!r})
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.FlavorWarning)
            if ssm.merge() == 0:
                ssm.sink("${output!n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
            else:
                failure_ct += 1
    with open("${output!n}.log", 'w') as f:
        f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
    if ${maxsize} > 0:
        from glob import glob
        tmpfiles = list(glob("${output!n}_*.tmp"))
        for item in sorted(tmpfiles):
            for name in get_tb_grps(item):
                os.system('h5copy -i {0} -o ${output} -s "/{1}" -d "/{1}"'.format(item, name))
bash:
    rm -f ${_output!n}_*.tmp
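
Once merged, a single gene's tables can be pulled back as pandas data frames with the TBData class defined earlier. A usage sketch, assuming the generated utility module is loaded and the gene group exists (the gene ID below is hypothetical):

exec(open('.sos/utils.py').read())   # load the utility classes defined above
dat = TBData('ImmuneQTLSummary.h5', 'ENSG00000227232')
beta = dat.dump('beta')              # rows: SNPs, columns: conditions
print(beta.shape)
print(dat.dump('pval').head())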

Extract data for mash analysis

In [ ]:
[extract_3]
num_null = 9
gene_list = ''
output: "${input!n}.4MASH.h5"
task:
python: input = '.sos/utils.py'
    import warnings
    if ${gene_list!r}:
        gene_names = [${gene_list!r,}]
    else:
        gene_names = get_tb_grps([${input!r,}])
    output = {}
    output['null'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    output['max'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    failure_ct = 0
    for idx, name in enumerate(gene_names):
        # extract the best gene-snp pair or some null gene-snp pairs
        res = get_gs_pairs(TBData(${input!r}, name), name, (1, ${num_null}))
        #
        if res is None:
            failure_ct += 1
            continue
        for k in output:
            if not k in res:
                continue
            for kk in output[k]:
                if kk == 'rownames':
                    if output[k]['rownames'] is None:
                        output[k]['rownames'] = res["{}_rownames".format(k)]
                    else:
                        output[k]['rownames'].extend(res["{}_rownames".format(k)])
                elif kk == 'colnames':
                    if output[k]['colnames'] is None:
                        output[k]['colnames'] = res['colnames']
                else:
                    output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
    #
    if failure_ct < len(gene_names):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.FlavorWarning)
            for k in output:
                TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % (${msg!r}, k), complib = 'zlib').sink(${output!r})
    with open("${output!n}.log", 'w') as f:
        f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))

[extract_4]
# convert HDF5 to RDS
depends: R_library('rhdf5')
output: "${input!n}.rds"
task:
R:
    # Convert two-sided p-values to z-scores signed by the direction of effect
    ConvertP2Z <- function(pval, beta) {
      z <- abs(qnorm(pval / 2))
      z[which(beta < 0)] <- -1 * z[which(beta < 0)]
      return(z)
    }

    # Read one summary statistic set ("max" or "null") from the HDF5 database
    # and reshape each matrix to SNPs (rows) by conditions (columns)
    GetSS <- function(table, db) {
      dat <- rhdf5::h5read(db, table)
      dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
      for (name in c("beta", "se", "pval", "z")) {
        dat[[name]] <- t(dat[[name]])
        colnames(dat[[name]]) <- dat$colnames
        rownames(dat[[name]]) <- dat$rownames
      }
      dat$colnames <- dat$rownames <- NULL
      return(dat)
    }

    saveRDS(list(max = GetSS('max', ${input!r}), null = GetSS('null', ${input!r})), ${output!r})
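
For cross-checking ConvertP2Z, here is the same p-to-z conversion in Python (a sketch assuming scipy is available; not part of the workflow):

import numpy as np
from scipy import stats
def p2z(pval, beta):
    # two-sided p-value to |z|, then signed by the direction of effect
    z = np.abs(stats.norm.ppf(np.asarray(pval) / 2.0))
    return np.where(np.asarray(beta) < 0, -z, z)
print(p2z([0.05, 0.05], [0.2, -0.2]))   # approximately [ 1.96 -1.96]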

Run workflow

To convert to HDF5 only

sos run analysis/20171010_eQTLSummary_HDF5.ipynb convert

To convert to HDF5 AND extract mash input

sos run analysis/20171010_eQTLSummary_HDF5.ipynb extract
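
The extract workflow writes the selected gene-snp pair sets to a .4MASH.h5 file holding two groups, max and null. A quick way to eyeball their dimensions (illustrative only; the file name follows from sumstats_db above):

import tables as tb
with tb.open_file('ImmuneQTLSummary.4MASH.h5') as f:
    for grp in ('max', 'null'):
        g = getattr(f.root, grp)
        print(grp, g.beta.shape)   # gene-snp pairs x conditions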

Copyright © 2016-2020 Gao Wang et al. at the Stephens Lab, University of Chicago