
"""Universe of molecules to be used for the sampling of negative data.
"""
import os
import uuid
import pickle
import random
import numpy as np
import collections

from sklearn.svm import OneClassSVM
from sklearn.cluster import MiniBatchKMeans

from ..utils.chemistry import maccs_matrix, morgan_arena, load_morgan_arena
from ..utils.chemistry import similarity_matrix

from chemicalchecker.util import logged
from chemicalchecker.database import Molrepo
from chemicalchecker.util.parser import Converter
from chemicalchecker.core import ChemicalChecker


class UniverseLoader(pickle.Unpickler):
    """Unpickler that resolves the pickled Universe class to this module."""

    def find_class(self, module, name):
        if name == "Universe":
            from .univs import Universe
            return Universe
        return super().find_class(module, name)
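
# A minimal loading sketch for UniverseLoader (assumes a "universe.pkl"
# previously written by Universe.save(); the path is hypothetical). The
# custom unpickler resolves "Universe" to this module even when the pickle
# was written under a different import path:
#
#     with open("./my_universe/universe.pkl", "rb") as f:
#         univ = UniverseLoader(f).load()
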
DEFAULTPATH = "/aloy/web_checker/repo_data/targetmate/universe/default/"

@logged
class Universe:

    def __init__(self, cc_root=None, molrepo=None, k=None, model_path=None,
                 tmp_path='/tmp/tm/tmp_universe', min_actives_oneclass=10,
                 max_actives_oneclass=1000,
                 representative_mols_per_cluster=10, trials=1000000,
                 only_bioactive=False):
        """Initialize the Universe class.

        Args:
            cc_root(str): Chemical Checker root directory (default=None).
            molrepo(str): Molrepo to use; ChEMBL if not specified
                (default=None).
            k(int): Number of partitions for the k-means clustering
                (default=sqrt(N/2)).
            model_path(str): Folder where the universe should be stored
                (default=.).
            tmp_path(str): Temporary directory
                (default=/tmp/tm/tmp_universe).
            min_actives_oneclass(int): Minimum number of actives to use in
                the OneClassSVM (default=10).
            max_actives_oneclass(int): Maximum number of actives to use in
                the OneClassSVM (default=1000).
            representative_mols_per_cluster(int): Number of molecules to
                sample from each cluster (default=10).
            trials(int): Number of sampling trials before giving up
                (default=1000000).
            only_bioactive(bool): Only include known bioactive compounds in
                the chemical space, i.e. those found in the Chemical
                Checker (default=False).
        """
        self.k = k
        self.tmp_path = os.path.abspath(tmp_path)
        if not os.path.isdir(self.tmp_path):
            os.makedirs(self.tmp_path)
        if not model_path:
            if not os.path.exists(DEFAULTPATH):
                self.model_path = os.path.join(
                    os.path.dirname(os.path.abspath(".")), "default")
            else:
                self.model_path = os.path.join(DEFAULTPATH)
        else:
            self.model_path = os.path.abspath(model_path)
        if not os.path.isdir(self.model_path):
            os.makedirs(self.model_path)
        if not molrepo:
            self.molrepo = "chembl"
        else:
            self.molrepo = molrepo
        self.min_actives_oneclass = min_actives_oneclass
        self.max_actives_oneclass = max_actives_oneclass
        self.representative_mols_per_cluster = representative_mols_per_cluster
        self.trials = trials
        self.only_bioactive = only_bioactive

    def save(self):
        pkl_file = os.path.join(self.model_path, "universe.pkl")
        with open(pkl_file, "wb") as f:
            pickle.dump(self, f)

    def smiles(self):
        with open(self.smiles_file, "rb") as f:
            return pickle.load(f)

    def clusters_dict(self):
        with open(self.clusters_dict_file, "rb") as f:
            return pickle.load(f)

    def representative_smiles(self):
        with open(self.representative_smiles_file, "rb") as f:
            return pickle.load(f)

    @staticmethod
    def load_universe(model_path=None):
        if not model_path:
            if os.path.exists(DEFAULTPATH):
                file_name = os.path.join(DEFAULTPATH, "universe.pkl")
            else:
                file_name = os.path.join(os.path.abspath("."), "universe.pkl")
        else:
            file_name = os.path.join(
                os.path.abspath(model_path), "universe.pkl")
        with open(file_name, "rb") as f:
            univ = UniverseLoader(f).load()
        return univ

    def fetch_molecules(self):
        self.__log.debug("Downloading molrepo")
        converter = Converter()
        molrepo = Molrepo.get_fields_by_molrepo_name(
            self.molrepo, ["inchikey", "src_id", "inchi"])
        smiles = []
        if self.only_bioactive:
            s = ChemicalChecker().signature("A1.001", "sign3")
            valid_inchikeys = s.keys
        for mol in molrepo:
            if self.only_bioactive:
                if mol[0] not in valid_inchikeys:
                    continue
            smi = converter.inchi_to_smiles(mol[-1])
            # Each entry is (smiles, inchi, src_id, inchikey).
            smiles += [(smi, mol[-1], mol[1], mol[0])]
        self.smiles_file = os.path.join(self.model_path, "smiles.pkl")
        with open(self.smiles_file, "wb") as f:
            pickle.dump(smiles, f)

    def cluster(self):
        smiles = self.smiles()
        maccs = maccs_matrix([smi[0] for smi in smiles])
        if not self.k:
            self.k = int(np.sqrt(maccs.shape[0] / 2)) + 1
        kmeans = MiniBatchKMeans(
            n_clusters=self.k, init_size=np.max([3 * self.k, 300]))
        kmeans.fit(maccs)
        clusters = kmeans.predict(maccs)
        clusters_dict = collections.defaultdict(list)
        for i, c in enumerate(clusters):
            clusters_dict[c] += [i]
        self.clusters_dict_file = os.path.join(
            self.model_path, "clusters_dict.pkl")
        with open(self.clusters_dict_file, "wb") as f:
            pickle.dump(clusters_dict, f)
        representative_smiles = []
        for c, idxs in clusters_dict.items():
            idxs_ = random.choices(
                idxs, k=self.representative_mols_per_cluster)
            for i in idxs_:
                representative_smiles += [(c, smiles[i])]
        self.representative_smiles_file = os.path.join(
            self.model_path, "representative_smiles.pkl")
        with open(self.representative_smiles_file, "wb") as f:
            pickle.dump(representative_smiles, f)

    def calculate_arena(self):
        fps_file = os.path.join(self.model_path, "arena.h5")
        representative_smiles = self.representative_smiles()
        morgan_arena([smi[1][0] for smi in representative_smiles], fps_file)
        self.arena_file = fps_file

    def fit_oneclass_svm(self, actives):
        tag = str(uuid.uuid4())
        actives_list = list(actives)
        if len(actives_list) < self.min_actives_oneclass:
            actives_list = random.choices(
                actives_list, k=self.min_actives_oneclass)
        else:
            actives_list = actives_list[:self.max_actives_oneclass]
        actives_list_smiles = [smi[0] for smi in actives_list]
        fps_file = os.path.join(self.tmp_path, tag + "_actives_arena.h5")
        actives_arena = morgan_arena(actives_list_smiles, fps_file)
        # Tanimoto similarities between actives, used as a precomputed kernel.
        sim_mat = similarity_matrix(
            actives_list_smiles, actives_arena, len(actives_list_smiles))
        clf = OneClassSVM(kernel="precomputed")
        clf.fit(sim_mat)
        os.remove(fps_file)
        return clf, actives_list_smiles

    def fit(self):
        self.__log.info("Fetching molecules")
        self.fetch_molecules()
        self.__log.info("Clustering")
        self.cluster()
        self.__log.info("Calculating arena")
        self.calculate_arena()
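
    # A minimal end-to-end sketch of the fit pipeline (assumes a configured
    # Chemical Checker installation and a reachable "chembl" Molrepo;
    # "./my_universe" is a hypothetical path):
    #
    #     univ = Universe(model_path="./my_universe")
    #     univ.fit()    # fetch_molecules -> cluster -> calculate_arena
    #     univ.save()   # persists the object as model_path/universe.pkl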

    def predict(self, actives, inactives, inactives_per_active=100,
                min_actives=10, naive=False, biased_universe=0,
                maximum_potential_actives=5, random_state=None):
        """Sample candidate inactives from the universe.

        Args:
            actives(list or set): Should include (smiles, id, inchikey).
            inactives(list or set): Should include (smiles, id, inchikey).
            inactives_per_active(int): Number of inactives to sample from
                the universe. Can be None (default=100).
            min_actives(int): Minimum number of actives (default=10).
            naive(bool): Sample naively (randomly), without using the
                OneClassSVM (default=False).
            biased_universe(float): Proportion of closer molecules to
                sample as putative inactives (default=0).
            maximum_potential_actives(int): Maximum number of
                representative molecules within the active hyperplane
                before a cluster is discarded; used for the biased
                universe (default=5).
            random_state(int): Seed for the random module (default=None).
        """
        self.__log.info("Sampling candidate inactives")
        self.__log.info("Representative molecules: {:d}".format(
            self.representative_mols_per_cluster))
        if random_state is not None:
            random.seed(random_state)
        common_iks = set([smi[-1] for smi in actives]).intersection(
            [smi[-1] for smi in inactives])
        actives = set([smi for smi in actives if smi[-1] not in common_iks])
        if len(actives) < min_actives:
            return None
        inactives = set(
            [smi for smi in inactives if smi[-1] not in common_iks])
        if not inactives_per_active:
            return actives, inactives, set(), np.array([])
        # Inchikeys
        actives_iks = set([smi[-1] for smi in actives])
        inactives_iks = set([smi[-1] for smi in inactives])
        # Inactives sampling procedure
        N = int(len(actives) * inactives_per_active) + 1
        if len(inactives) >= N:
            return actives, inactives, set(), np.array([])
        # Added by Paula: prioritize real data; if there are more known
        # inactives than actives, maintain all compounds.
        N = N - len(inactives)
        # Load relevant data
        smiles = self.smiles()
        if naive:
            self.__log.debug("Naively sampling candidates")
            # Select permitted candidates
            candidates_iks = set([smi[-1] for smi in smiles]).difference(
                actives_iks.union(inactives_iks))
            if not candidates_iks:
                return actives, inactives, set(), np.array([])
            candidates_dict = dict(
                (smi[-1], (smi[0], smi[1], smi[2], j))
                for j, smi in enumerate(smiles))
            # random.sample requires a sequence on recent Python versions.
            candidates_iks = random.sample(
                sorted(candidates_iks), int(np.min([N, len(candidates_iks)])))
            candidates = set([
                (candidates_dict[ik][0], candidates_dict[ik][1],
                 candidates_dict[ik][2], ik) for ik in candidates_iks])
            candidate_idx = [candidates_dict[ik][-1]
                             for ik in candidates_iks]
            return actives, inactives, candidates, np.array(candidate_idx)
        else:
            # Load relevant data
            self.__log.debug("Loading relevant data")
            representative_smiles = self.representative_smiles()
            clusters_dict = self.clusters_dict()
            self.__log.debug(
                "Sampling candidates based on OneClassSVM and clusters")
            # Fitting one-class SVM
            clf, actives_list_smiles = self.fit_oneclass_svm(actives)
            # Loading universe fingerprint arena
            arena = load_morgan_arena(self.arena_file)
            # Calculating similarity matrix
            sim_mat = similarity_matrix(
                actives_list_smiles, arena, len(representative_smiles))
            sim_mat = sim_mat.T
            # Predicting using one-class SVM
            if biased_universe:
                # Added by Paula 31/10/2020
                self.__log.info("Using biased universe")
                dec = clf.decision_function(sim_mat)
                biased_weight_dict = collections.defaultdict(list)
                mean = np.mean(dec)
                std = np.std(dec)
                for i, d in enumerate(dec):
                    if d - mean >= std:
                        biased_weight_dict[
                            representative_smiles[i][0]] += [np.abs(d)]
            # Assigning weights to clusters
            prd = clf.predict(sim_mat)
            weight_dict = collections.defaultdict(int)
            for i, p in enumerate(prd):
                if p == -1:
                    weight_dict[representative_smiles[i][0]] += 1
            candidate_clusters = sorted(weight_dict.keys())
            candidate_weights = [weight_dict[k] for k in candidate_clusters]
            # Sample from candidates
            trials = self.trials
            t = 0
            candidates = []
            candidate_idx = []
            if biased_universe:
                # Added by Paula
                vals = np.concatenate(list(biased_weight_dict.values()))
                mean = np.mean(vals)
                std = np.std(vals)
                # Discard clusters with too many representatives inside the
                # active hyperplane; weight the rest inversely to their mean
                # decision value, so closer-to-active clusters dominate.
                biased_candidate_clusters = [
                    k for k, v in sorted(biased_weight_dict.items(),
                                         key=lambda item: item[1])
                    if len(v) <= maximum_potential_actives]
                biased_candidate_weights = [
                    1 / np.mean(biased_weight_dict[k])
                    for k in biased_candidate_clusters]
                while len(candidates) < int(N * biased_universe) \
                        and t < trials:
                    c = random.choices(
                        biased_candidate_clusters, k=1,
                        weights=biased_candidate_weights)[0]
                    i = random.choice(clusters_dict[c])
                    cand = smiles[i]
                    if cand[-1] not in actives_iks \
                            and cand[-1] not in inactives_iks:
                        if cand not in candidates:
                            candidates.extend([cand])
                            candidate_idx.extend([i])
                    t += 1
            # Bookkeeping: per-cluster sampling counts.
            count = collections.defaultdict(int)
            while len(candidates) < N and t < trials:
                if random_state is not None:
                    random.seed(t)  # DELETE THIS AFTER RUNNING TESTS!!
                c = random.choices(
                    candidate_clusters, k=1, weights=candidate_weights)[0]
                i = random.choice(clusters_dict[c])
                cand = smiles[i]
                count[c] = count[c] + 1
                if cand[-1] not in actives_iks \
                        and cand[-1] not in inactives_iks:
                    if cand not in candidates:
                        candidates.extend([cand])
                        candidate_idx.extend([i])
                t += 1
            if len(candidates) < N:
                # Not enough sampled within the trial budget: top up from the
                # rest of the universe.
                all_iks = actives_iks.union(inactives_iks).union(
                    [cc[-1] for cc in candidates])
                remaining_universe = [
                    [j, smi] for j, smi in enumerate(smiles)
                    if smi[-1] not in all_iks]
                N = N - len(candidates)
                if N >= len(remaining_universe):
                    candidates.extend([ru[-1] for ru in remaining_universe])
                    candidate_idx.extend(
                        [ru[0] for ru in remaining_universe])
                else:
                    remaining_universe = random.sample(
                        remaining_universe, k=N)
                    candidates.extend([ru[-1] for ru in remaining_universe])
                    candidate_idx.extend(
                        [ru[0] for ru in remaining_universe])
            return actives, inactives, candidates, np.array(candidate_idx)
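
# A minimal sampling sketch (variable names are hypothetical; actives and
# inactives are iterables of (smiles, id, inchikey) tuples, as documented
# in predict()):
#
#     univ = Universe.load_universe("./my_universe")
#     res = univ.predict(active_triples, inactive_triples,
#                        inactives_per_active=100, random_state=42)
#     if res is not None:
#         actives, inactives, candidates, candidate_idx = res
#         # "candidates" are putative inactives drawn from the universe;
#         # "candidate_idx" indexes them into univ.smiles().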