import collections
import random
import numpy as np
from sklearn import model_selection
from chemicalchecker.util import logged
from .chemistry import generate_scaffold
TRIALS = 3
class Splitter(object):
    """Base class for train/test splitters.

    Holds the number of splits, the desired test size and the random state
    shared by the splitter subclasses in this module.
    """

    def __init__(self, n_splits=1, test_size=0.2, random_state=42,
                 train_size=None, **kwargs):
        """Initialize the splitter.

        Args:
            n_splits(int): Number of splits to generate (default=1).
            test_size(float|int): Proportion of test samples if < 1,
                absolute number of test samples otherwise (default=0.2).
            random_state(int): Random seed; when None a seed is drawn at
                random (default=42).
            train_size(int): Stored for API compatibility; not used by
                calc_sizes (default=None).
        """
        self.n_splits = n_splits
        self.test_size = test_size
        self.train_size = train_size
        if random_state is None:
            self.random_state = random.randint(1, 99999)
        else:
            self.random_state = random_state

    def calc_sizes(self, n):
        """Compute train/test sizes for a dataset of n samples.

        Sets self.n_test, self.n_train, self.prop (achieved test
        proportion) and self.n.
        """
        if self.test_size < 1:
            self.n_test = int(n * self.test_size)
        else:
            # BUGFIX: original read the bare name `test_size` (NameError);
            # the absolute test size lives on the instance.
            self.n_test = int(self.test_size)
        self.n_train = n - self.n_test
        self.prop = self.n_test / n
        self.n = n
def generate_scaffolds(smiles):
    """Group molecule indices by scaffold.

    Args:
        smiles(list): SMILES strings.

    Returns:
        dict: scaffold string mapped to the sorted list of positions (in
        `smiles`) of the molecules sharing that scaffold.
    """
    groups = collections.defaultdict(list)
    for position, smi in enumerate(smiles):
        groups[generate_scaffold(smi)].append(position)
    return {scaf: sorted(members) for scaf, members in groups.items()}
@logged
class ToppedSampler(object):
    """Sample so that coverage is maximized.

    Draws an ensemble of index subsets so that, across draws, every sample
    is selected at least once with a desired probability.
    """

    def __init__(self, max_samples, max_ensemble_size, chance, try_balance,
                 shuffle, brute=True):
        """Initialize the topped sampler.

        Args:
            max_samples(int): Maximum number of samples allowed per draw.
            max_ensemble_size(int): Maximum number of draws.
            chance(float): Desired probability of drawing a sample at least
                once.
            try_balance(bool): Try to balance, given the available samples.
                That is, instead of stratifying, give higher probability to
                the minority class.
            shuffle(bool): Shuffle indices.
            brute(bool): When trying to balance, be brute and do not sample
                by probability (default=True).
        """
        self.max_samples = max_samples
        self.max_ensemble_size = max_ensemble_size
        self.chance = chance
        self.try_balance = try_balance
        self.shuffle = shuffle
        self.brute = brute
        # never produce fewer than this many draws when subsampling
        self.min_ensemble_size = 3

    @staticmethod
    def get_resamp(s, N, chance):
        """Number of draws of size s out of N so that any given sample is
        seen at least once with probability `chance`."""
        p = 1 - float(s) / N
        return np.log(1 - chance) / np.log(p)

    def calc_ensemble_size(self):
        """Decide per-draw size and number of draws, clipped to the
        configured [min_ensemble_size, max_ensemble_size] range."""
        if self.N <= self.max_samples:
            self.samples = self.N
            self.ensemble_size = 1
        else:
            self.samples = self.max_samples
            self.ensemble_size = int(
                np.ceil(self.get_resamp(self.samples, self.N, self.chance)))
            self.ensemble_size = np.min(
                [self.max_ensemble_size, self.ensemble_size])
            self.ensemble_size = np.max(
                [self.min_ensemble_size, self.ensemble_size])

    @staticmethod
    def probabilities(y, bins):
        """Inverse-frequency sampling probabilities for labels y.

        Rare label values get higher probability; based on a histogram with
        the given number of bins.
        """
        h, b = np.histogram(y, bins)
        min_b = np.min(b)
        max_b = np.max(b)
        b = b[:-1]
        # rescale bin left-edges to span the original [min_b, max_b] range
        m = (max_b - min_b) / (np.max(b) - np.min(b))
        n = min_b - m * np.min(b)
        b = m * b + n
        w = np.interp(y, b, h).ravel()
        # invert frequencies; epsilon avoids a zero probability
        w = (1 - w / np.sum(h)) + 1e-6
        probas = w / np.sum(w)
        return probas

    def brute_sample(self, y):
        """Class-balanced subsample of binary labels, capped at
        max_samples. Returns a shuffled index array."""
        if len(y) <= self.max_samples:
            # BUGFIX: np.int was removed from NumPy; use the builtin
            idx = np.arange(len(y), dtype=int)
            np.random.shuffle(idx)
            return idx
        y0_idx = np.argwhere(y == 0).ravel()
        y1_idx = np.argwhere(y == 1).ravel()
        n0 = len(y0_idx)
        n1 = len(y1_idx)
        # the minority class gets at most half of the draw
        max_minority = int(self.max_samples / 2)
        if n0 >= n1:
            self.__log.info("0 >= 1 : %d %d" % (n0, n1))
            if n1 > max_minority:
                y1_idx = np.random.choice(y1_idx, max_minority, replace=False)
            n = self.max_samples - len(y1_idx)
            if n0 > n:
                y0_idx = np.random.choice(y0_idx, n, replace=False)
        else:
            self.__log.info("0 < 1 : %d %d" % (n0, n1))
            if n0 > max_minority:
                y0_idx = np.random.choice(y0_idx, max_minority, replace=False)
            n = self.max_samples - len(y0_idx)
            if n1 > n:
                y1_idx = np.random.choice(y1_idx, n, replace=False)
        idx = np.array(list(y0_idx) + list(y1_idx), dtype=int)
        np.random.shuffle(idx)
        self.__log.info("...1: %d total: %d" % (np.sum(y[idx]), len(idx)))
        return idx

    def ret(self, idxs):
        """Return indices as-is when shuffling was requested, sorted
        otherwise."""
        if self.shuffle:
            return idxs
        else:
            idxs_ = np.argsort(idxs)
            return idxs[idxs_]

    def sample(self, X=None, y=None, bins=10):
        """Main method: yield one index array per draw.

        Args:
            X(array): Data matrix; only its row count is used (default=None).
            y(array): Labels, used for balancing/stratification
                (default=None).
            bins(int): Histogram bins for probability-based balancing
                (default=10).

        Raises:
            Exception: If both X and y are None.
        """
        self.__log.info(
            "Topped Sampling | max samples %d" % (self.max_samples))
        if X is None and y is None:
            raise Exception("X and y are None")
        if X is None:
            self.N = len(y)
        else:
            self.N = X.shape[0]
        if self.N <= self.max_samples:
            self.__log.info("...topped sampling not necessary")
            # BUGFIX: use self.N; len(y) crashed when only X was given
            idxs = np.arange(self.N)
            if self.shuffle:
                np.random.shuffle(idxs)
            yield self.ret(idxs)
        else:
            self.calc_ensemble_size()
            if self.try_balance:
                if y is None:
                    for _ in range(0, self.ensemble_size):
                        idxs = np.random.choice(
                            self.N, self.samples, replace=False)
                        yield self.ret(idxs)
                else:
                    for _ in range(0, self.ensemble_size):
                        if self.brute:
                            idxs = self.brute_sample(y)
                        else:
                            probas = self.probabilities(y, bins=bins)
                            idxs = np.random.choice(
                                self.N, self.samples, p=probas,
                                replace=False)
                        yield self.ret(idxs)
            else:
                # BUGFIX: the original referenced ShuffleSplit /
                # StratifiedShuffleSplit without the model_selection
                # namespace (NameError) and iterated the splitter object
                # instead of calling its split() method.
                dummy_X = np.zeros((self.N, 1))
                if y is None:
                    splitter = model_selection.ShuffleSplit(
                        n_splits=self.ensemble_size, train_size=self.samples)
                    splits = splitter.split(dummy_X)
                else:
                    splitter = model_selection.StratifiedShuffleSplit(
                        n_splits=self.ensemble_size, train_size=self.samples)
                    splits = splitter.split(dummy_X, y)
                for split in splits:
                    # split[0] is the sampled ("train") side of the split
                    yield self.ret(split[0])
@logged
class OutOfUniverseStratified(Splitter):
    """Stratified split where only molecules that are not present in the
    universe are accepted at testing time."""

    def __init__(self, cc=None, datasets=None,
                 cctype="sign1", inchikeys_universe=None, **kwargs):
        """Initialize.

        Args:
            cc(ChemicalChecker): ChemicalChecker instance.
            datasets(list): Datasets to consider as the universe
                (default=["A1.001"]).
            cctype(str): Signature type corresponding to the datasets.
            inchikeys_universe(list): List of inchikeys to be considered as
                the universe. If not None, overwrites datasets
                (default=None).

        Raises:
            Exception: If neither a ChemicalChecker instance nor an
                explicit universe is given.
        """
        Splitter.__init__(self, **kwargs)
        # avoid a mutable default argument
        if datasets is None:
            datasets = ["A1.001"]
        if inchikeys_universe is None:
            if cc is None:
                raise Exception("ChemicalChecker instance need be specified")
            univ = set()
            for ds in datasets:
                s = cc.signature(ds, cctype)
                univ.update(list(s.keys))
            univ = list(univ)
        else:
            univ = inchikeys_universe
        # keep only the connectivity layer of each InChIKey
        self.univ = set([k.split("-")[0] for k in univ])

    def one_split(self, ins_idxs, out_idxs, y, seed):
        """Build one train/test split.

        Test indices come from out-of-universe molecules only; train
        indices come from in-universe molecules, topped up with leftover
        out-of-universe ones when in-universe samples are scarce.

        Args:
            ins_idxs(array): Indices of in-universe samples.
            out_idxs(array): Indices of out-of-universe samples.
            y(array): Binary labels (0/1).
            seed(int): Random seed.

        Returns:
            (train_idxs, test_idxs) int arrays, or (None, None) when not
            enough out-of-universe samples of both classes are available.
        """
        assert set(y) == set([0, 1]), "Labels are different than 0/1"
        y = np.array(y)
        # Expected numbers
        exp_n_te = int(np.round(len(y) * self.test_size, 0))
        exp_n_tr = len(y) - exp_n_te
        prop_1_0 = np.sum(y) / len(y)
        exp_n_tr_0 = max(1, int(np.round(exp_n_tr * (1 - prop_1_0), 0)))
        exp_n_tr_1 = max(1, int(np.round(exp_n_tr * (prop_1_0), 0)))
        exp_n_te_0 = max(1, int(np.round(exp_n_te * (1 - prop_1_0), 0)))
        exp_n_te_1 = max(1, int(np.round(exp_n_te * (prop_1_0), 0)))
        # Masks
        ins_mask = np.array([False] * len(y))
        out_mask = np.array([False] * len(y))
        ins_mask[ins_idxs] = True
        out_mask[out_idxs] = True
        mask_0 = y == 0
        mask_1 = y == 1
        # Check availability
        obs_ins_0 = np.argwhere(np.logical_and(ins_mask, mask_0)).ravel()
        obs_ins_1 = np.argwhere(np.logical_and(ins_mask, mask_1)).ravel()
        obs_out_0 = np.argwhere(np.logical_and(out_mask, mask_0)).ravel()
        obs_out_1 = np.argwhere(np.logical_and(out_mask, mask_1)).ravel()
        obs_ins_n_0 = len(obs_ins_0)
        obs_ins_n_1 = len(obs_ins_1)
        obs_out_n_0 = len(obs_out_0)
        obs_out_n_1 = len(obs_out_1)
        if obs_out_n_1 == 0 or obs_out_n_0 == 0:
            # warning() replaces the deprecated Logger.warn alias
            self.__log.warning(
                "Unfortunately, not enough out-of-universe samples are available")
            return None, None
        # Decide how to sample: start with the test set
        self.__log.info("... sampling test first")
        test_idxs = []
        # negatives
        if obs_out_n_0 >= exp_n_te_0:
            self.__log.info("More 0s than needed, subsampling")
            n = exp_n_te_0
            np.random.seed(seed)
            test_idxs += list(
                np.random.choice(obs_out_0, size=n, replace=False))
        else:
            self.__log.info("Less 0s than needed, that's ok...")
            test_idxs += list(obs_out_0)
        # positives
        if obs_out_n_1 >= exp_n_te_1:
            self.__log.info("More 1s than needed, subsampling")
            n = exp_n_te_1
            np.random.seed(seed)
            test_idxs += list(
                np.random.choice(obs_out_1, size=n, replace=False))
        else:
            self.__log.info("Less 1s than needed, that's ok...")
            test_idxs += list(obs_out_1)
        # rebalance if necessary
        self.__log.debug("Re-balance if necessary")
        test_prop_1_0 = np.sum(y[test_idxs]) / len(test_idxs)
        self.__log.info(
            "Observed test prop 1/0: %.3f, full prop 1/0: %.3f" %
            (test_prop_1_0, prop_1_0))
        idxs_0 = np.argwhere(y == 0).ravel()
        idxs_1 = np.argwhere(y == 1).ravel()
        test_idxs_0 = list(set(test_idxs).intersection(idxs_0))
        test_idxs_1 = list(set(test_idxs).intersection(idxs_1))
        if test_prop_1_0 > prop_1_0:
            # too many positives in test: subsample them
            n_1 = int(np.round(len(test_idxs) * prop_1_0, 0))
            np.random.seed(seed)
            test_idxs_1 = np.random.choice(
                test_idxs_1, size=min(len(test_idxs_1), n_1), replace=False)
        else:
            # too many negatives in test: subsample them
            n_0 = int(np.round(len(test_idxs) * (1 - prop_1_0), 0))
            np.random.seed(seed)
            test_idxs_0 = np.random.choice(
                test_idxs_0, size=min(len(test_idxs_0), n_0), replace=False)
        test_idxs = list(test_idxs_0) + list(test_idxs_1)
        if len(test_idxs_0) == 0 or len(test_idxs_1) == 0:
            return None, None
        # continue with train set
        self.__log.info("... sampling train now")
        train_idxs = []
        # negatives
        if obs_ins_n_0 >= exp_n_tr_0:
            self.__log.info("More 0s than needed, subsampling")
            n = exp_n_tr_0
            np.random.seed(seed)
            train_idxs += list(
                np.random.choice(obs_ins_0, size=n, replace=False))
        else:
            self.__log.info(
                "Less 0s than needed, sampling from out-of-universe")
            train_idxs += list(obs_ins_0)
            # BUGFIX: the deficit is what in-universe could not provide
            # (exp_n_tr_0 - obs_ins_n_0), mirroring the positives branch;
            # the original subtracted the out-of-universe count.
            n = exp_n_tr_0 - obs_ins_n_0
            eligible = list(set(obs_out_0).difference(test_idxs))
            n = min(n, len(eligible))
            if n > 0:
                np.random.seed(seed)
                train_idxs += list(
                    np.random.choice(eligible, size=n, replace=False))
        # positives
        if obs_ins_n_1 >= exp_n_tr_1:
            self.__log.info("More 1s than needed, subsampling")
            n = exp_n_tr_1
            np.random.seed(seed)
            train_idxs += list(
                np.random.choice(obs_ins_1, size=n, replace=False))
        else:
            self.__log.info(
                "Less 1s than needed, sampling from out-of-universe")
            train_idxs += list(obs_ins_1)
            n = exp_n_tr_1 - obs_ins_n_1
            eligible = list(set(obs_out_1).difference(test_idxs))
            n = min(n, len(eligible))
            if n > 0:
                np.random.seed(seed)
                train_idxs += list(
                    np.random.choice(eligible, size=n, replace=False))
        # Shuffle and return (np.int was removed from NumPy; use int)
        train_idxs = np.array(train_idxs).astype(int)
        test_idxs = np.array(test_idxs).astype(int)
        np.random.seed(seed)
        np.random.shuffle(train_idxs)
        np.random.seed(seed)
        np.random.shuffle(test_idxs)
        return train_idxs, test_idxs

    def split(self, X, y):
        """Yield n_splits (train_idx, test_idx) pairs.

        Args:
            X(list): InChIKeys of the samples.
            y(array): Binary labels (0/1).
        """
        seed = self.random_state
        inchikeys = X
        # compare by InChIKey connectivity layer only
        inchikeys_conn = np.array([k.split("-")[0] for k in inchikeys])
        ins_idxs = []
        out_idxs = []
        for i, k in enumerate(inchikeys_conn):
            if k in self.univ:
                ins_idxs += [i]
            else:
                out_idxs += [i]
        ins_idxs = np.array(ins_idxs)
        out_idxs = np.array(out_idxs)
        self.__log.info("Inside universe: %d, Outside universe: %d" % (
            len(ins_idxs), len(out_idxs)))
        for _ in range(0, self.n_splits):
            train_idx, test_idx = self.one_split(ins_idxs, out_idxs, y, seed)
            seed += 1
            yield train_idx, test_idx
@logged
class ShuffleScaffoldSplit(Splitter):
    """Random sampling based on scaffolds.

    It tries to satisfy the desired train/test proportion while keeping
    every scaffold group entirely on one side of the split.
    """

    def __init__(self, trials=TRIALS, **kwargs):
        """Initialize.

        Args:
            trials(int): Number of random shufflings to try; the one whose
                test proportion best matches the target is kept
                (default=TRIALS).
        """
        Splitter.__init__(self, **kwargs)
        self.trials = trials

    def one_split(self, smiles, seed):
        """Produce one (train_idx, test_idx) pair keeping scaffolds intact.

        Args:
            smiles(list): SMILES strings.
            seed(int): Random seed (incremented across trials).
        """
        self.calc_sizes(len(smiles))
        scaffolds = generate_scaffolds(smiles)
        scaffolds = [scaffolds[k] for k in sorted(scaffolds.keys())]
        cur_train_idx = None
        cur_test_idx = None
        cur_prop = None
        for _ in range(0, self.trials):
            train_idx = []
            test_idx = []
            scaff_idxs = [i for i in range(0, len(scaffolds))]
            random.seed(seed)
            random.shuffle(scaff_idxs)
            seed += 1
            for scaff_idx in scaff_idxs:
                idxs = scaffolds[scaff_idx]
                # a scaffold group goes entirely to train or test
                if len(train_idx) + len(idxs) > self.n_train:
                    test_idx += idxs
                else:
                    train_idx += idxs
            prop = len(test_idx) / self.n
            if cur_train_idx is None:
                cur_train_idx = train_idx
                cur_test_idx = test_idx
                cur_prop = prop
            else:
                # keep the trial closest to the target test proportion
                if np.abs(prop - self.prop) < np.abs(cur_prop - self.prop):
                    cur_train_idx = train_idx
                    cur_test_idx = test_idx
                    cur_prop = prop
        # np.int was removed from NumPy; use the builtin
        train_idx = np.array(cur_train_idx).astype(int)
        test_idx = np.array(cur_test_idx).astype(int)
        random.seed(seed)
        random.shuffle(train_idx)
        random.seed(seed)
        random.shuffle(test_idx)
        self.__log.info("Train size: %d, Test size: %d, Prop : %.2f" % (
            len(train_idx), len(test_idx), cur_prop))
        return train_idx, test_idx

    def split(self, X, y=None):
        """Yield n_splits (train_idx, test_idx) pairs.

        Args:
            X(list): SMILES strings.
            y: Ignored; present for API compatibility (default=None).
        """
        seed = self.random_state
        smiles = X
        for _ in range(0, self.n_splits):
            train_idx, test_idx = self.one_split(smiles, seed)
            seed += 1
            yield train_idx, test_idx
@logged
class StratifiedShuffleScaffoldSplit(Splitter):
    """Stratified shuffle split based on scaffolds.

    It tries to preserve the proportion of samples in the train and test
    sets. The current algorithm is very rough...
    """

    def __init__(self, trials=TRIALS, **kwargs):
        """Initialize.

        Args:
            trials(int): Number of random shufflings to try; the one that
                best matches both the target test proportion and the class
                balance is kept (default=TRIALS).
        """
        # BUGFIX: the original called ShuffleScaffoldSplit.__init__ (not a
        # base of this class) with a hard-coded trials=10 that was then
        # overwritten; initializing the actual base is equivalent.
        Splitter.__init__(self, **kwargs)
        self.trials = trials

    def one_split(self, smiles, y, seed):
        """Produce one class-balanced, scaffold-preserving split.

        Args:
            smiles(list): SMILES strings.
            y(array): Binary labels (0/1).
            seed(int): Random seed (incremented across trials).
        """
        assert set(y) == set([0, 1]), "Labels are different than 0/1"
        y = np.array(y)
        self.calc_sizes(len(smiles))
        scaffolds = generate_scaffolds(smiles)
        scaffolds = [scaffolds[k] for k in sorted(scaffolds.keys())]
        balance = np.sum(y) / len(y)
        cur_train_idx = None
        cur_test_idx = None
        cur_prop = None
        cur_balance_train = None
        for _ in range(0, self.trials):
            train_idx = []
            test_idx = []
            scaff_idxs = [i for i in range(0, len(scaffolds))]
            random.seed(seed)
            random.shuffle(scaff_idxs)
            seed += 1
            for scaff_idx in scaff_idxs:
                idxs = scaffolds[scaff_idx]
                # a scaffold group goes entirely to train or test
                if len(train_idx) + len(idxs) > self.n_train:
                    test_idx += idxs
                else:
                    train_idx += idxs
            prop = len(test_idx) / self.n
            # guard against an (unlikely) empty train set
            balance_train = np.sum(y[train_idx]) / max(len(train_idx), 1)
            if cur_train_idx is None:
                cur_train_idx = train_idx
                cur_test_idx = test_idx
                cur_prop = prop
                cur_balance_train = balance_train
            else:
                # score = class-balance error + test-proportion error
                score = np.abs(balance - balance_train) + \
                    np.abs(prop - self.prop)
                cur_score = np.abs(balance - cur_balance_train) + \
                    np.abs(cur_prop - self.prop)
                if score < cur_score:
                    cur_train_idx = train_idx
                    cur_test_idx = test_idx
                    cur_prop = prop
                    cur_balance_train = balance_train
        # np.int was removed from NumPy; use the builtin
        train_idx = np.array(cur_train_idx).astype(int)
        test_idx = np.array(cur_test_idx).astype(int)
        random.seed(seed)
        random.shuffle(train_idx)
        random.seed(seed)
        random.shuffle(test_idx)
        self.__log.info(
            "Train size: %d, Test size: %d, Prop: %.2f, Balance: %.2f (%.2f)"
            % (len(train_idx), len(test_idx), cur_prop, cur_balance_train,
               balance))
        return train_idx, test_idx

    def split(self, X, y):
        """Yield n_splits (train_idx, test_idx) pairs.

        Args:
            X(list): SMILES strings.
            y(array): Binary labels (0/1).
        """
        seed = self.random_state
        smiles = X
        for _ in range(0, self.n_splits):
            train_idx, test_idx = self.one_split(smiles, y, seed)
            seed += 1
            yield train_idx, test_idx
@logged
class DeepchemScaffoldSplit(Splitter):
    """Analogous to DeepChem implementation.

    First it sorts by scaffold set size, then fills the train set with the
    largest scaffold groups.
    """

    def __init__(self, **kwargs):
        Splitter.__init__(self, **kwargs)

    def split(self, X, y=None):
        """Yield a single deterministic scaffold-based split.

        Args:
            X(list): SMILES strings.
            y: Ignored; present for API compatibility (default=None).
        """
        seed = self.random_state
        smiles = X
        self.calc_sizes(len(smiles))
        scaffolds = generate_scaffolds(smiles)
        # largest scaffold groups first; ties broken by smallest index
        scaffolds = [v for k, v in sorted(
            scaffolds.items(), key=lambda x: (len(x[1]), x[1][0]),
            reverse=True)]
        train_idx = []
        test_idx = []
        for idxs in scaffolds:
            # a scaffold group goes entirely to train or test
            if len(train_idx) + len(idxs) > self.n_train:
                test_idx += idxs
            else:
                train_idx += idxs
        # np.int was removed from NumPy; use the builtin
        train_idx = np.array(train_idx).astype(int)
        test_idx = np.array(test_idx).astype(int)
        random.seed(seed)
        random.shuffle(train_idx)
        random.seed(seed)
        random.shuffle(test_idx)
        prop = len(test_idx) / (len(test_idx) + len(train_idx))
        self.__log.info("Train size: %d, Test size: %d, prop: %.2f" % (
            len(train_idx), len(test_idx), prop))
        yield train_idx, test_idx
@logged
class StandardStratifiedShuffleSplit(Splitter):
    """A wrapper for the sklearn stratified shuffle split."""

    def __init__(self, **kwargs):
        self.__log.info("Standard stratified shuffle splitter")
        Splitter.__init__(self, **kwargs)

    def split(self, X, y):
        """Yield (train_idx, test_idx) pairs from sklearn's splitter,
        configured with this instance's n_splits, test_size and
        random_state."""
        splitter = model_selection.StratifiedShuffleSplit(
            n_splits=self.n_splits,
            test_size=self.test_size,
            random_state=self.random_state)
        yield from splitter.split(X=X, y=y)
def GetSplitter(is_cv, is_classifier, is_stratified, scaffold_split,
                outofuniverse_split):
    """Select the splitter, depending on the characteristics of the problem.

    Args:
        is_cv(bool): Use a cross-validation (KFold-style) splitter.
        is_classifier(bool): The task is classification.
        is_stratified(bool): Preserve class proportions in the splits.
        scaffold_split(bool): Split by molecular scaffold.
        outofuniverse_split(bool): Keep out-of-universe molecules for test.

    Returns:
        A splitter class, or None for non-classifiers (not implemented).
    """
    # BUGFIX: the regression branch left `spl` unbound, so the final
    # return raised NameError; initialize it explicitly.
    spl = None
    if is_classifier:
        if is_cv:
            if is_stratified:
                spl = model_selection.StratifiedKFold
            else:
                spl = model_selection.KFold
        else:
            if outofuniverse_split:
                spl = OutOfUniverseStratified
            else:
                if scaffold_split:
                    if is_stratified:
                        spl = StratifiedShuffleScaffoldSplit
                    else:
                        spl = ShuffleScaffoldSplit
                else:
                    if is_stratified:
                        spl = StandardStratifiedShuffleSplit
                    else:
                        spl = model_selection.ShuffleSplit
    else:
        # TO-DO: regression splitters not implemented yet
        pass
    return spl