"""Obtain pairs given an incomplete dataset"""
from chemicalchecker.util import logged
import collections
import numpy as np
import random
import h5py
from sklearn.model_selection import StratifiedShuffleSplit, ShuffleSplit
from sklearn.multioutput import MultiOutputClassifier
from sklearn.naive_bayes import BernoulliNB, ComplementNB
import os
import pickle
from scipy.stats import rankdata
@logged
class Pairs:
"""Pairs class samples/undersamples to accomplish a certain proportion of negative-positive"""
def __init__(self, neg_pos_ratio=100, max_pos=1000, primary_side="right", test_size=0.2, n_splits=1, random_state=None):
"""Initialize Pairs class
Args:
neg_pos_ratio(float): Expected number of negatives per positives (default=10).
primary_side(str): When doing the sampling, focus on balancing 'right' or 'left' (default='right').
max_pos(int): Maximum number of positives to take into account (default=1000).
test_size(float): When splitting, proportion of test samples (default=0.2).
n_splits(int): When splitting, number of runs (default=1).
random_state(int): Random state (default=None).
"""
self.neg_pos_ratio = neg_pos_ratio
self.max_pos = max_pos
self.primary_side = primary_side
if self.primary_side == "right":
self.primary_idx = 1
self.secondary_idx = 0
elif self.primary_side == "left":
self.primary_idx = 0
self.secondary_idx = 1
else:
self.__log.error("Argument primary_side must be 'right' or 'left'")
raise
self.test_size = test_size
self.n_splits = n_splits
self.random_state = random_state
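    # Hedged usage sketch (illustration only, not part of the original module); the
    # argument values below are hypothetical:
    #
    #   sampler = Pairs(neg_pos_ratio=100, max_pos=1000, primary_side="right",
    #                   test_size=0.2, n_splits=1, random_state=42)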
def _calc_num_neg(self, num_pos):
return int(num_pos*self.neg_pos_ratio) + 1
def _choose(self, v, size, p):
return np.random.choice(list(v), size=size, replace=False, p=p)
def _tupler(self, k, v, y):
def to_tuple(k, x):
if self.primary_idx == 1:
return (x, k, y)
else:
return (k, x, y)
for x in v:
yield to_tuple(k, x)
def index_pairs(self, pairs, keys_left, keys_right):
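        """Map raw (key_left, key_right, 1/0) pairs to integer indices, keeping only pairs whose keys are present in both key universes."""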
self.keys_left = keys_left
self.keys_right = keys_right
self.__log.debug("Filtering pairs (only those being in the list are accepted)")
kl = dict((k, i) for i, k in enumerate(keys_left))
kr = dict((k, i) for i, k in enumerate(keys_right))
self.known_pairs = []
for p in pairs:
if p[0] not in kl: continue
if p[1] not in kr: continue
self.known_pairs += [(kl[p[0]], kr[p[1]], p[2])]
self.known_pairs = list(set(self.known_pairs))
        self.known_pairs = np.array(self.known_pairs).astype(int)
self.__log.debug("Original pairs: %d, Remaining pairs: %d" % (len(pairs), len(self.known_pairs)))
self.__log.debug("Original left : %d, Remaining left : %d" % (len(set([p[0] for p in pairs])), len(set(self.known_pairs[:,0]))))
self.__log.debug("Original right: %d, Remaining right: %d" % (len(set([p[1] for p in pairs])), len(set(self.known_pairs[:,1]))))
def sample_left(self, pairs, keys_left, keys_right, max_pos, bioteque_priors):
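        """Prior-weighted sampling so that every left-side instance gets the same number of positives (and the corresponding number of negatives).

        Args:
            pairs(list): List of (key_left, key_right, 1/0) values.
            keys_left(list): Keys universe of the left side.
            keys_right(list): Keys universe of the right side.
            max_pos(int): Number of positives to (over)sample per left instance.
            bioteque_priors(bool): If True, weight right-side keys with precomputed Bioteque priors; otherwise use frequency-based ranks.
        """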
from tqdm import tqdm
self.index_pairs(pairs, keys_left, keys_right)
self.__log.debug("Making sure each left instance has the same number of positives")
A = collections.defaultdict(list)
I = collections.defaultdict(list)
for i in range(0, len(self.known_pairs)):
p = self.known_pairs[i]
if p[-1] == 1:
A[p[0]] += [p[1]]
else:
I[p[0]] += [p[1]]
A = dict((k, set(v)) for k,v in A.items())
I = dict((k, set(v)) for k,v in I.items())
if bioteque_priors:
self.__log.debug("Getting bioteque priors")
priors_dict = {}
with open("/aloy/home/mduran/myscripts/dream-ctd2-targetmate/data/bioteque_priors.tsv", "r") as f:
import csv
reader = csv.reader(f, delimiter="\t")
for r in reader:
priors_dict[r[0]] = float(r[1])
priors = np.zeros(len(keys_right))
priors_inv = np.zeros(len(keys_right))
for i, p in enumerate(keys_right):
if p in priors_dict:
priors[i] = priors_dict[p]
priors_inv[i] = 1 - priors_dict[p]
#priors_inv = 1 - priors
else:
self.__log.debug("Getting priors for each on the right")
priors = np.zeros(len(keys_right))
for k,v in A.items():
for i in list(v):
priors[i] += 1
priors = rankdata(priors)
priors_inv = rankdata(-priors)
#priors += 1e-10
#priors_inv += 1e-10
        from scipy.stats import pearsonr
        self.__log.debug("Priors and inverse priors correlation (pearson %.2f)" % pearsonr(priors, priors_inv)[0])
self.__log.debug("Oversampling positives %d" % max_pos)
A_ = {}
for k,v in tqdm(A.items()):
v = list(v)
p = priors[v]
if np.sum(p)==0: continue
p = p/np.sum(p)
A_[k] = np.random.choice(v, size=max_pos, replace=True, p=p)
self.__log.debug("Now sample the negatives (%d ratio)" % self.neg_pos_ratio)
I_ = {}
universe = set([i for i in range(0, len(keys_right))])
for k,v in tqdm(A.items()):
if k not in A_: continue
if k in I:
n = int(max_pos*self.neg_pos_ratio - len(I[k]))
else:
n = int(max_pos*self.neg_pos_ratio)
v_ = list(universe.difference(v))
p = priors_inv[v_]
if np.sum(p) == 0: continue
p = p/np.sum(p)
I_[k] = np.random.choice(v_, size=n, replace=True, p=p)
self.__log.debug("Done. Now just appending")
        pairs = []
        for k, v in tqdm(A_.items()):
            for x in v:
                pairs += [(k, x, 1)]
        for k, v in tqdm(I_.items()):
            for x in v:
                pairs += [(k, x, 0)]
        pairs = np.array(pairs, dtype=int)
self.__log.debug("Shuffling pairs")
        idxs = np.arange(pairs.shape[0])
random.shuffle(idxs)
self.pairs = pairs[idxs]
self.__log.debug("Number of pairs: %d" % len(self.pairs))
def sample_random(self, pairs, keys_left, keys_right):
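        """Randomly sample negative pairs until the target negative:positive ratio is reached.

        Args:
            pairs(list): List of (key_left, key_right, 1/0) values.
            keys_left(list): Keys universe of the left side.
            keys_right(list): Keys universe of the right side.
        """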
self.index_pairs(pairs, keys_left, keys_right)
self.__log.debug("Sampling to achieve a negative:positive balance of %d" % self.neg_pos_ratio)
done = set([(self.known_pairs[i,0], self.known_pairs[i,1]) for i in range(0, self.known_pairs.shape[0])])
n_pos = np.sum(self.known_pairs[:,-1]==1)
n_neg = np.sum(self.known_pairs[:,-1]==0)
to_sample = int(n_pos*self.neg_pos_ratio) - n_neg
self.__log.debug("Known positives: %d, Known negatives: %d, Negatives to sample: %d" % (n_pos, n_neg, to_sample))
L = len(keys_left)
R = len(keys_right)
sampled = set()
for _ in range(0, to_sample*10):
pair = (np.random.choice(L), np.random.choice(R))
if pair in done: continue
if pair in sampled: continue
sampled.update([pair])
            if len(sampled) % 1000000 == 0:
self.__log.debug("%d negative pairs sampled (%.2f)" % (len(sampled), len(sampled)/to_sample))
if len(sampled) > to_sample:
break
self.__log.debug("Done with the sampling. Merging and shuffling.")
pairs = []
for i in range(0, self.known_pairs.shape[0]):
pairs += [(self.known_pairs[i,0], self.known_pairs[i,1], self.known_pairs[i,2])]
for s in list(sampled):
pairs += [(s[0],s[1],0)]
        pairs = np.array(pairs, dtype=int)
        idxs = np.arange(pairs.shape[0])
random.shuffle(idxs)
self.pairs = pairs[idxs]
self.__log.debug("Number of pairs: %d" % len(self.pairs))
    def sample_balanced(self, pairs, keys_left, keys_right):
"""Sample from the known pairs to obtain a longer list
Args:
pairs(list): List of (key_left, key_right, 1/0) values.
keys_left(list): Keys universe of the left side.
keys_right(list): Keys universe of the right side.
"""
self.index_pairs(pairs, keys_left, keys_right)
self.__log.debug("Sampling to achieve a negative:positive balance of %d" % self.neg_pos_ratio)
samp_counts = collections.defaultdict(int)
A = collections.defaultdict(list)
I = collections.defaultdict(list)
for p in self.known_pairs:
if p[-1] == 1:
A[p[self.primary_idx]] += [p[self.secondary_idx]]
else:
I[p[self.primary_idx]] += [p[self.secondary_idx]]
A = dict((k, np.array(v)) for k,v in A.items())
I = dict((k, np.array(v)) for k,v in I.items())
self.__log.debug("Looking for overabundant data")
max_A = self.max_pos
max_I = self._calc_num_neg(self.max_pos)
probas_A = None
probas_I = None
for k,v in A.items():
if len(v) > max_A:
A[k] = self._choose(v, size=max_A, p=probas_A)
self.__log.debug("Subsampling actives from %d, before %d, now %d" % (k, len(v), len(A[k])))
for k,v in I.items():
if k in A:
_max_I = np.min([self._calc_num_neg(len(A[k])), max_I])
else:
_max_I = max_I
if len(v) > _max_I:
I[k] = self._choose(v, size=_max_I, p=probas_I)
self.__log.debug("Oversampling negative class")
universe = set([x for d in [A, I] for k,v in d.items() for x in v])
self.__log.debug("Universe has %d entities" % len(universe))
from tqdm import tqdm
ks = []
for k, v in tqdm(A.items()):
n = self._calc_num_neg(len(v))
sampling_universe = universe.difference(v)
n = np.min([len(sampling_universe), n])
if k in I:
n -= len(I[k])
if n > 0:
sampling_universe = sampling_universe.difference(I[k])
n = np.min([len(sampling_universe), n])
if n > 0:
samp = self._choose(sampling_universe, size=n, p=probas_I)
I[k] = np.concatenate([I[k], samp])
else:
I[k] = self._choose(sampling_universe, size=n, p=probas_I)
ks += [k]
self.__log.debug("Assembling pairs")
self.pairs = []
for k in ks:
self.pairs += [t for t in self._tupler(k, A[k], 1)]
self.pairs += [t for t in self._tupler(k, I[k], 0)]
        self.pairs = np.array(self.pairs).astype(int)
self.__log.debug("Shuffling")
        idxs = np.arange(self.pairs.shape[0])
random.shuffle(idxs)
self.pairs = self.pairs[idxs]
self.__log.debug("Number of pairs: %d" % len(self.pairs))
def _get_y(self):
return np.array([p[-1] for p in self.pairs])
def _to_dict(self, key_col):
d = collections.defaultdict(list)
if key_col == 1:
val_col = 0
else:
val_col = 1
for i in range(0, self.pairs.shape[0]):
d[self.pairs[i, key_col]] += [(self.pairs[i, val_col], self.pairs[i, 2])]
return d
def _column_split(self, col):
def to_tuple(k, x, y):
if col == 1:
return (x, k, y)
else:
return (k, x, y)
def appender(t_idx, idxs, d):
t = []
for idx in t_idx:
k = idxs[idx]
for x in d[k]:
t += [to_tuple(k, x[0], x[1])]
idxs_ = [i for i in range(0, len(t))]
random.shuffle(idxs_)
            return np.array(t).astype(int)[idxs_]
idxs = list(set([p[col] for p in self.pairs]))
d = self._to_dict(col)
spl = ShuffleSplit(n_splits=self.n_splits, test_size=self.test_size, random_state=self.random_state)
for tr_idx, te_idx in spl.split(X=idxs):
train = appender(tr_idx, idxs, d)
test = appender(te_idx, idxs, d)
yield train, test
    def as_indices(self):
"""Pairs iterator, returns indices"""
for i in range(0, self.pairs.shape[0]):
            yield tuple(self.pairs[i])
    def as_keys(self):
"""Pairs iterator, remaps to keys"""
for i in range(0, self.pairs.shape[0]):
yield self.keys_left[self.pairs[i,0]], self.keys_right[self.pairs[i,1]], self.pairs[i,2]
    def naive_split(self):
"""Split pairs randomly"""
spl = StratifiedShuffleSplit(n_splits=self.n_splits, test_size=self.test_size, random_state=self.random_state)
for train_idx, test_idx in spl.split(X=self.pairs, y=self._get_y()):
yield self.pairs[train_idx], self.pairs[test_idx]
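    # Hedged example (illustration only): given a Pairs instance 'sampler' on which
    # one of the sample_* methods has been called, each split yields two integer
    # arrays of shape (n, 3) with columns (left_idx, right_idx, label).
    #
    #   for train, test in sampler.naive_split():
    #       y_train, y_test = train[:, -1], test[:, -1]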
    def right_split(self):
"""Split pairs by right side"""
return self._column_split(col=1)
    def left_split(self):
"""Split pairs by left side"""
return self._column_split(col=0)
    def left_right_split(self):
        """Split pairs by left and right sides (not implemented yet)"""
        # TODO: implement a split that holds out both left and right entities
        raise NotImplementedError("left_right_split is not implemented yet")
def save_h5(self, filename):
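        """Persist the sampled pairs to an HDF5 file under the 'pairs' dataset."""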
self.__log.debug("Saving to %s" % filename)
with h5py.File(filename, "w") as hf:
hf.create_dataset("pairs", data=self.pairs)
def onehot_proteins_signature():
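    """Write a one-hot identity signature for the proteins listed in the Bioteque priors file."""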
from chemicalchecker.core.signature_data import DataSignature
import csv
import h5py
with open("/aloy/home/mduran/myscripts/dream-ctd2-targetmate/data/bioteque_priors.tsv", "r") as f:
reader = csv.reader(f, delimiter="\t")
keys = []
for r in reader:
keys += [r[0]]
keys = np.array(keys)
    V = np.identity(len(keys)).astype(int)
with h5py.File("/aloy/home/mduran/myscripts/dream-ctd2-targetmate/paired_targetmate/X_r_1h.h5", "w") as hf:
hf.create_dataset("keys", data=np.array(keys, DataSignature.string_dtype()))
hf.create_dataset("V", data=V)