Source code for chemicalchecker.tool.targetmate.evaluation.prediction

import os
import pickle
import re
import shutil
import joblib

import h5py

import numpy as np
from chemicalchecker.util import logged

from chemicalchecker.core import ChemicalChecker
from signaturizer import Signaturizer as SignaturizerExternal
from ..utils import chemistry

from ..utils import HPCUtils
from chemicalchecker.util import Config
import uuid
from sklearn.preprocessing import RobustScaler

MAXQUEUE = 15

[docs]@logged class Prediction(HPCUtils): """ Helper class to carry out external prediction. Args: directory_root(str): where to search for models depth(int): number of directories to search into data(list): list of inchis to test output_directory(str): folder to output predictions datasets(list/str): dataset to search inchis """ def __init__(self, directory_root, data, output_directory="./", depth=2, datasets=None, molrepo="chembl", compressed='', hpc=False, n_jobs_hpc=8, tmp_path=None, chunk_size=None, overwrite=False, wipe=True, log="INFO", use_cc=True, is_classic=False, keytype='InChiKey', cctype="sign3", signature_type=None, pchembl=None, fold=None, running=False, **kwargs): HPCUtils.__init__(self, **kwargs) self.root = os.path.abspath(directory_root) assert type(data) == str or type(data) == np.ndarray or type(data) == list, "Unknown data format" self.datasets = datasets self.data = data self.output = os.path.abspath(output_directory) if not os.path.isdir(self.output): os.mkdir(self.output) self.depth = depth self.molrepo = molrepo.upper() self.hpc = hpc if self.hpc: self.n_jobs_hpc = n_jobs_hpc if not tmp_path: subpath = self.root.rstrip("/").split("/")[-1] self.tmp_path = os.path.join(Config().PATH.CC_TMP, "targetmate", subpath, str(uuid.uuid4())) else: self.tmp_path = os.path.abspath(tmp_path) if not os.path.isdir(self.tmp_path): os.makedirs(self.tmp_path, exist_ok=True) self.chunk_size = chunk_size self.wipe = wipe self.overwrite = overwrite self.compressed = compressed self.use_cc = use_cc self.is_classic = is_classic self.keytype = keytype self.cctype = cctype if signature_type is not None: if signature_type == 'CC': self.use_cc = True elif signature_type == 'Signaturizer': self.use_cc = False self.is_classic = False elif signature_type == 'ECFP4': self.use_cc = False self.is_classic = False elif type(signature_type) == str: self.__log.info("Signature type not available") else: self.__log.info("Setting signature type from variables") if pchembl is not None: self.pchembl = "pchembl_{:d}".format(pchembl * 100) else: self.pchembl = pchembl self.fold = fold self.running = running def _prepare_cc(self, datasets, cctype): cc = ChemicalChecker() if datasets is None: datasets = cc.datasets_exemplary() sign = [] for dataset in datasets: sign += [cc.signature(dataset, cctype)] return sign def _signaturize_fingerprinter(self, smiles): """Calculate fingerprints""" V = chemistry.morgan_matrix(smiles) return V def _signaturize_signaturizer(self, molecules, moleculetype='SMILES'): V = None for ds in self.datasets: s3_ = SignaturizerExternal(ds.split(".")[0]) s = s3_.predict(molecules, keytype=moleculetype) v = s.signature[~s.failed] if V is None: V = v else: V = np.hstack([V, v]) return V def _read_signatures_by_inchikey_from_cc(self, inchikeys): """Read signatures from CC. InChIKeys are used.""" iks_dict = dict((k, i) for i, k in enumerate(inchikeys)) V = None for s in self.sign: keys, v = s.get_vectors_lite(inchikeys) if V is None: V = v else: V = np.hstack([V, v]) idxs = np.array([iks_dict[k] for k in keys if k in iks_dict]).astype(np.int) # self.__log.debug("Signature read: {}".format(V.shape)) return V, idxs def model_iterator(self, mod_dir, uncalib): for m in os.listdir(mod_dir): m_ = m.split(".z")[0] # TODO: Consider additional variable so that can change type of compression if m_.split("---")[-1] == "base_model": continue if uncalib is False: if m_.split("-")[1] == "uncalib": continue else: if m_.split("-")[1] != "uncalib": continue fn = os.path.join(mod_dir, m) with open(fn, "rb") as f: mod = joblib.load(fn) yield mod def search_models(self, model_directory): dat = [] if type(model_directory) == str: for n in os.listdir(model_directory): if os.path.isdir(os.path.join(model_directory, n)): dat.append(os.path.join(model_directory, n)) elif type(model_directory) == list: for m in model_directory: for n in os.listdir(m): if os.path.isdir(os.path.join(m, n)): dat.append(os.path.join(m, n)) return dat def get_models(self, targets): directories = self.search_models(self.root) self.depth -= 1 while self.depth > 0: directories = self.search_models(directories) self.depth -= 1 directories = [d for d in directories if os.path.isdir(os.path.join(d, "bases"))] if self.pchembl is not None: directories = [d for d in directories if self.pchembl in d] if targets is not None: directories = [d for d in directories for t in targets if t in d] if self.running: directories = [d for d in directories if os.path.exists(os.path.join(d, "bases", "Stacked---0.z"))] return directories def chunker(self, seq, size): return (seq[pos:pos + size] for pos in range(0, len(seq), size)) def load_signatures(self, sign_f): with h5py.File(sign_f, "r") as f: # List all groups X = f["signature"][:] # TODO: Currently only for signaturizer, apply to CC as well return X def _is_done(self, target): if os.path.exists(os.path.join(self.output, target + ".pkl")): self.__log.info("File exists in %s" % self.output) return True else: return False def _predict(self, mod, X, target, iteration): if iteration is not None: destination_path = os.path.join(self.output, "%s_%d.pkl" % (target, iteration)) else: destination_path = os.path.join(self.output, "%s.pkl" % target) if os.path.exists(destination_path): os.remove( destination_path) if self.fold is not None: if not os.path.isdir(os.path.join(mod, "bases", self.fold)): return None else: if not os.path.isdir(os.path.join(mod, "bases")): return None preds = None if self.fold is not None: mod_path = os.path.join(mod, "bases", self.fold) else: mod_path = os.path.join(mod, "bases") smoothing = None for i, mod in enumerate(self.model_iterator(mod_path, uncalib=False)): if smoothing is None: smoothing = np.random.uniform(0, 1, size=( len(X), len(mod.classes))) # Added by Paula: Apply same smoothing to all ccp p = mod.predict(X, smoothing=smoothing) if preds is None: preds = p else: preds = preds + p preds = preds / (i + 1) with open(destination_path, "wb") as f: pickle.dump(preds, f) def predict(self, mod, X, target, iteration=None): return self._predict(mod, X, target, iteration) def incorporate_background(self, X, mod): test_idx = len(X) with open(os.path.join(mod, "trained_data.pkl"), "rb") as f: td = pickle.load(f) if self.use_cc: train, idx = self._read_signatures_by_inchikey_from_cc(td.inchikey) else: if self.is_classic: train = self._signaturize_fingerprinter(td.molecule) else: train = self._signaturize_signaturizer(td.molecule, td.moleculetype) X = np.vstack([X, train]) idx = np.zeros(len(X)) idx[:test_idx] = 1 return X, idx def predict_background(self, mod, X, target, iteration=None): if iteration is not None: destination_path = os.path.join(self.output, "%s_%d.pkl" % (target, iteration)) else: destination_path = os.path.join(self.output, "%s.pkl" % target) if os.path.exists(destination_path): os.remove( destination_path) if self.fold is not None: if not os.path.isdir(os.path.join(mod, "bases", self.fold)): return None else: if not os.path.isdir(os.path.join(mod, "bases")): return None preds = None zscore = None if self.fold is not None: mod_path = os.path.join(mod, "bases", self.fold) else: mod_path = os.path.join(mod, "bases") X, test_idx = self.incorporate_background(X, mod) i = False smoothing = None # Added by Paula: currently set class size to 2, eed to make so that it depends on model for i, mod in enumerate(self.model_iterator(mod_path, uncalib=False)): if smoothing is None: smoothing = np.random.uniform(0, 1, size=( len(X), len(mod.classes))) # Added by Paula: Apply same smoothing to all ccp p = mod.predict(X, smoothing=smoothing) if preds is None: preds = p else: preds = preds + p RS = RobustScaler().fit(p[test_idx == 0, 1].reshape(-1, 1)) if zscore is None: zscore = RS.transform(p[:, 1].reshape(-1, 1)).flatten() else: zscore = zscore + RS.transform(p[:, 1].reshape(-1, 1)).flatten() if i is False: return None preds = preds / (i + 1) zscore = zscore / (i + 1) preds = np.vstack([preds.transpose(), zscore, test_idx]).transpose() self.__log.debug("Storing prediciton: {:s}".format(destination_path)) with open(destination_path, "wb") as f: pickle.dump(preds, f) def run(self, target_name_location=-2, zscore=False, targets=None): if self.molrepo == 'CHEMBL': p = re.compile('CHEMBL[0-9]+') else: assert target_name_location < 0, "Incorrect target name location: must be negative integer" self.__log.info("Getting models") directories = self.get_models() self.__log.info("Will calculate predictions for {:d} proteins".format(len(directories))) self.__log.info("Loading compounds to predict") jobs = [] if type(self.data) == str: self.__log.info("File to data used: loading signatures") self.data = os.path.abspath(self.data) data = self.load_signatures(self.data) elif (type(self.data) == np.ndarray) and (self.data.dtype == np.float32): self.__log.info("Signatures introduced") data = self.data elif type(self.data) == list: if self.use_cc: self.sign = self._prepare_cc(self.datasets, self.cctype) data, idx = self._read_signatures_by_inchikey_from_cc(self.data) else: if self.is_classic: data = self._signaturize_fingerprinter(self.data) else: data = self._signaturize_signaturizer(self.data, self.keytype) for d in directories: if self.molrepo == 'CHEMBL': target = p.search(d).group() else: target = d.split("/")[target_name_location] if targets is not None: if target not in targets: continue if not self.overwrite: if self._is_done(target): self.__log.warn("{:s} already exists".format(target)) continue self.__log.info("Working on {:s}".format(target)) self.__log.info("Predicting compounds") if self.hpc: if zscore: jobs += [ self.func_hpc("predict_background", d, data, target, None, cpu=self.n_jobs_hpc, job_base_path=self.tmp_path)] else: if self.chunk_size: # TODO: Add alert so that if compound library is larger than X recommend using chunksize for j, c in tqdm(enumerate(self.chunker(data, self.chunk_size))): jobs += [ self.func_hpc("predict", d, c, target, j, cpu=self.n_jobs_hpc, job_base_path=self.tmp_path)] else: jobs += [self.func_hpc("predict", d, data, target, None, cpu=self.n_jobs_hpc, job_base_path=self.tmp_path)] if len(jobs) > MAXQUEUE: self.waiter(jobs) jobs = [] else: if zscore: self.predict_background(d, data, target) else: self.predict(d, data, target) if self.hpc: self.waiter(jobs) if self.wipe: self.__log.info("Deleting temporary directories") shutil.rmtree(self.tmp_path, ignore_errors=True)