Source code for chemicalchecker.tool.targetmate.base

"""
TargetMate base classes.
"""
import os
import pickle
import uuid

import joblib
import numpy as np
import shap
from chemicalchecker.util import logged
from sklearn.decomposition import PCA

from .io import InputData, Prediction, Explanation
from .signaturizer import SignaturizerSetup
from .tmsetup import ModelSetup
from .utils import metrics
from .utils import splitters


@logged
class Model(ModelSetup):
    """Generic model class."""

    def __init__(self, is_classifier, **kwargs):
        """Initialize TargetMate model.

        Args:
            is_classifier(bool): Is the model a classifier or a regressor?
        """
        ModelSetup.__init__(self, is_classifier, **kwargs)
        self.weights = None
        self.mod_dir = None
        self.mod_uncalib_dir = None

    def array_on_disk(self, ar):
        """Save an array as a temporary .npy file and return its path."""
        fn = os.path.join(self.arrays_tmp_path, str(uuid.uuid4()) + ".npy")
        np.save(fn, ar)
        return fn

    def check_array_from_disk(self, ar):
        """Load an array back from disk if a file path (string) is given."""
        if ar is None:
            return None
        if isinstance(ar, str):
            return np.load(ar, allow_pickle=True)
        else:
            return ar

    def sampler(self, X, y):
        """Yield capped, balanced samples of the training data."""
        self.shuffle = True
        spl = splitters.ToppedSampler(max_samples=self.max_train_samples,
                                      max_ensemble_size=self.max_train_ensemble,
                                      chance=self.train_sample_chance,
                                      try_balance=True,
                                      shuffle=self.shuffle,
                                      brute=True)
        self.__log.debug("Sampling")
        for shuff in spl.sample(X=X, y=y):
            yield shuff
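
    # Editor's note (illustrative, not in the original source): array_on_disk()
    # and check_array_from_disk() round-trip numpy arrays through temporary .npy
    # files, so that large matrices can be handed to HPC jobs as file paths
    # (strings) rather than as in-memory objects.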

    def find_base_mod(self, X, y, smiles, destination_dir):
        """Select a pipeline, for example, using HyperOpt."""
        destdir = destination_dir + "---base_model"
        X = self.check_array_from_disk(X)
        y = self.check_array_from_disk(y)
        smiles = self.check_array_from_disk(smiles)
        self.__log.info("Setting the base model")
        # Only the first sample drawn by the sampler is used to select the pipeline
        for shuff in self.sampler(X, y):
            break
        base_mod = self.algo.as_pipeline(X=X[shuff], y=y[shuff], smiles=smiles[shuff])
        self.__log.info("Saving base model in %s" % destdir)
        self.base_mod_dir = destdir
        with open(self.base_mod_dir, "wb") as f:
            joblib.dump(base_mod, f)

    def load_base_mod(self):
        """Load base model."""
        with open(self.base_mod_dir, "rb") as f:
            return joblib.load(f)

    def metric_calc(self, y_true, y_pred, metric=None):
        """Calculate metric. Returns (value, weight) tuple."""
        if not metric:
            metric = self.metric
        return metrics.Metric(metric)(y_true, y_pred)
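
    # Editor's note (illustrative, not in the original source): metric_calc()
    # returns a (value, weight) tuple from metrics.Metric; _weight() below keeps
    # only the weight (index 1) as the score for a given data type.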

    def _weight(self, X, y):
        """Weight the association between a certain data type and a y variable,
        using a cross-validation scheme and a relatively simple model."""
        X = self.check_array_from_disk(X)
        y = self.check_array_from_disk(y)
        for shuff in self.sampler(X, y):
            break
        X = X[shuff]
        y = y[shuff]
        mod = self.weight_algo.as_pipeline(X, y)
        Spl = splitters.GetSplitter(is_cv=False,
                                    is_classifier=self.is_classifier,
                                    is_stratified=True,
                                    scaffold_split=self.scaffold_split)
        kf = Spl(n_splits=1, test_size=0.2, random_state=42)
        y_pred = []
        y_true = []
        for train_idx, test_idx in kf.split(X, y):
            mod.fit(X[train_idx], y[train_idx])
            if self.is_classifier:
                y_pred += list(mod.predict_proba(X[test_idx])[:, 1])
            else:
                y_pred += list(mod.predict(X[test_idx]))
            y_true += list(y[test_idx])
        return self.metric_calc(np.array(y_true), np.array(y_pred))[1]

    def _check_y(self, y):
        """Randomly relabel samples if a class does not have enough members.
        Should be a corner case."""
        y = np.array(y)
        act = np.sum(y)
        ina = len(y) - act
        if act >= self.min_class_size_active and ina >= self.min_class_size_inactive:
            # Added by Paula: specific to each class
            return y
        if act < self.min_class_size_active:
            # Added by Paula: specific to each class
            m = self.min_class_size_active - act
            idxs = np.argwhere(y == 0).ravel()
            np.random.shuffle(idxs)
            idxs = idxs[:m]
            y[idxs] = 1
            return y
        if ina < self.min_class_size_inactive:
            # Added by Paula: specific to each class
            m = self.min_class_size_inactive - ina
            idxs = np.argwhere(y == 1).ravel()
            np.random.shuffle(idxs)
            idxs = idxs[:m]
            y[idxs] = 0
            return y

    def _fit(self, X, y, smiles=None, destination_dir=None):
        """Fit a model, using a specified pipeline.

        Args:
            X(array): Signatures matrix.
            y(array): Labels vector.
            smiles(array): SMILES strings matching X (default=None).
            destination_dir(str): File where to store the model. It must be
                specified; an exception is raised otherwise.
        """
        # Set up
        if destination_dir is None:
            raise Exception("destination_dir cannot be None")
        self.mod_dir = destination_dir
        self.mod_uncalib_dir = destination_dir + "-uncalib"
        # Check if arrays are stored as files on disk
        X = self.check_array_from_disk(X)
        y = self.check_array_from_disk(y)
        smiles = self.check_array_from_disk(smiles)
        # Get started
        self.find_base_mod(X, self._check_y(y), smiles,
                           destination_dir=destination_dir)
        base_mod = self.load_base_mod()
        for i, shuff in enumerate(self.sampler(X, y)):
            X_ = X[shuff]
            y_ = y[shuff]
            y_ = self._check_y(y_)
            self.__log.info("Fitting round %i" % i)
            if self.conformity:
                n_models = min(self.ccp_folds, np.sum(y_))
                self.__log.info("Preparing cross-conformity (n_models = %d)" % n_models)
                # Cross-conformal prediction
                mod = self.cross_conformal_func(base_mod, n_models=n_models)
                # Do normal as well
                mod_uncalib = base_mod
            else:
                self.__log.info("Using the selected pipeline")
                mod = base_mod
                mod_uncalib = None
            # Fit the model
            self.__log.info("Fitting (%d actives, %d inactives)" %
                            (np.sum(y_), len(shuff) - np.sum(y_)))
            mod.fit(X_, y_)
            if mod_uncalib is not None:
                self.__log.info("Fitting model, but without calibration")
                mod_uncalib.fit(X_, y_)
                for p in range(len(mod.predictors)):
                    # Added by Paula: way to reduce memory when storing models
                    mod.predictors[p].cal_x = None
                    mod.predictors[p].cal_y = None
            # Save the destination directory of the model
            destdir = self.mod_dir + "---%d" % i
            destdir_uncalib = self.mod_uncalib_dir + "---%d" % i
            self.__log.debug("Saving fitted model in %s" % destdir)
            with open(destdir, "wb") as f:
                joblib.dump(mod, f)
            if mod_uncalib is not None:
                self.__log.debug("Saving fitted uncalibrated model in %s" % destdir_uncalib)
                with open(destdir_uncalib, "wb") as f:
                    joblib.dump(mod_uncalib, f)
            else:
                self.__log.debug("Calibrated and uncalibrated models are the same %s" % destdir_uncalib)
                os.symlink(destdir, destdir_uncalib)
        self.__log.info("Fitting full model, but without calibration")
        if mod_uncalib is None:
            # No separate uncalibrated model was kept (conformity disabled):
            # reuse the selected pipeline for the complete, uncalibrated fit.
            mod_uncalib = mod
        if type(mod_uncalib).__name__ == 'RandomForestClassifier':
            mod_uncalib.oob_score = True
        mod_uncalib.fit(X, y)
        destdir_uncalib = self.mod_uncalib_dir + "---complete"
        with open(destdir_uncalib, "wb") as f:
            joblib.dump(mod_uncalib, f)

    def model_iterator(self, uncalib):
        """Iterate over the fitted models stored on disk."""
        if uncalib:
            mod_dir = self.mod_uncalib_dir
        else:
            mod_dir = self.mod_dir
        name = mod_dir.split("/")[-1]
        path = os.path.dirname(mod_dir)
        for m in os.listdir(path):
            if m.split("---")[0] != name:
                continue
            if m.split("---")[-1] == "base_model":
                continue
            fn = os.path.join(path, m)
            with open(fn, "rb") as f:
                mod = joblib.load(f)
            yield mod

    def _predict(self, X, destination_dir=None):
        """Make predictions.

        Returns:
            A (n_samples, n_classes) array. For now, n_classes = 2.
        """
        X = self.check_array_from_disk(X)
        self.__log.info("Predicting")
        preds = None
        smoothing = None
        for i, mod in enumerate(self.model_iterator(uncalib=False)):
            if smoothing is None:
                # Added by Paula: apply the same smoothing to all cross-conformal predictors
                smoothing = np.random.uniform(0, 1, size=(len(X), len(mod.classes)))
            if self.conformity:
                p = mod.predict(X, smoothing=smoothing)
            else:
                p = mod.predict_proba(X)
            if preds is None:
                preds = p
            else:
                preds = preds + p
        # Average over the ensemble of fitted models
        preds = preds / (i + 1)
        if destination_dir:
            self.__log.debug("Saving prediction in %s" % destination_dir)
            with open(destination_dir, "wb") as f:
                pickle.dump(preds, f)
        else:
            return preds

    def _predict_uncalib(self, X, destination_dir=None):
        """Make predictions with the uncalibrated model.

        Returns:
            A (n_samples, n_classes) array. For now, n_classes = 2.
        """
        X = self.check_array_from_disk(X)
        self.__log.info("Predicting")
        path = self.mod_uncalib_dir + "---complete"
        with open(path, "rb") as f:
            mod = joblib.load(f)
        preds = mod.predict_proba(X)
        if destination_dir:
            self.__log.debug("Saving uncalibrated prediction in %s" % destination_dir)
            with open(destination_dir, "wb") as f:
                pickle.dump(preds, f)
        else:
            return preds

    def _explain(self, X, destination_dir=None):
        """Explain the output of a model.

        Returns:
            SHAP values for the first fitted model found on disk.
        """
        X = self.check_array_from_disk(X)
        self.__log.info("Explaining")
        shaps = None
        for i, mod in enumerate(self.model_iterator(uncalib=True)):
            # TO-DO: Apply a kernel explainer for non-tree methods. Perhaps use
            # LIME when the computational cost is high.
            explainer = shap.TreeExplainer(mod)
            shaps = explainer.shap_values(X, check_additivity=False)
            break
        if destination_dir:
            self.__log.debug("Saving explanations in %s" % destination_dir)
            with open(destination_dir, "wb") as f:
                pickle.dump(shaps, f)
        else:
            return shaps

    # Subclasses (e.g. StackedModel) override these with full implementations.
    def fit(self):
        return self._fit

    def predict(self):
        return self._predict

    def explain(self):
        return self._explain


@logged
class SignaturedModel(Model, SignaturizerSetup):
    """Model that computes and consumes signatures for its input molecules."""
    # Commented by Paula:
    # class SignaturedModel(Model, Fingerprinter, Signaturizer):

    def __init__(self, **kwargs):
        """Initialize a signatured model."""
        Model.__init__(self, **kwargs)
        SignaturizerSetup.__init__(self, do_init=False, **kwargs)

    def get_data_fit(self, data, inchikey_idx=None, activity_idx=0,
                     srcid_idx=None, use_inchikey=False, **kwargs):
        """Prepare input data for fitting."""
        smiles_idx = kwargs.get('smiles_idx', None)
        inchi_idx = kwargs.get('inchi_idx', None)
        data = self.prepare_data(data, smiles_idx, inchi_idx, inchikey_idx,
                                 activity_idx, srcid_idx, use_inchikey)
        data = self.prepare_for_ml(data)
        return data

    def get_data_predict(self, data, smiles_idx=None, inchi_idx=None,
                         inchikey_idx=None, activity_idx=None, srcid_idx=None,
                         use_inchikey=False, **kwargs):
        """Prepare input data for prediction."""
        data = self.prepare_data(data, smiles_idx, inchi_idx, inchikey_idx,
                                 activity_idx, srcid_idx, use_inchikey)
        # TODO: Check that this is necessary...
        data = self.prepare_for_ml(data, predict=True)
        return data

    def get_Xy_from_data(self, data, idxs, scramble=False):
        """Given data and certain idxs, get X and y (if available)."""
        # Filter data by the specified indices
        res = data.as_dict(idxs)
        y = res["activity"]
        idxs = res["idx"]
        molecule = res["molecule"]
        inchikeys = res["inchikey"]
        X, idxs_ = self.read_signatures(datasets=self.datasets,
                                        idxs=idxs,
                                        smiles=molecule,
                                        inchikeys=inchikeys,
                                        is_tmp=self.is_tmp_signatures)
        if y is not None:
            y = y[idxs_]
            if scramble:
                self.__log.info("Scrambling y")
                np.random.shuffle(y)
        idxs = idxs[idxs_]
        molecule = molecule[idxs_]
        inchikeys = inchikeys[idxs_]
        self.__log.info("X shape: (%d, %d) / Y length: %d" %
                        (X.shape[0], X.shape[1], 0 if y is None else len(y)))
        # Save arrays on disk and keep only their file paths
        X = self.array_on_disk(X)
        if y is not None:
            y = self.array_on_disk(y)
        molecule = self.array_on_disk(molecule)
        inchikeys = self.array_on_disk(inchikeys)
        idxs = self.array_on_disk(idxs)
        results = {
            "X": X,
            "y": y,
            "idxs": idxs,
            "molecule": molecule,
            "inchikeys": inchikeys
        }
        self.__log.info("Arrays saved on disk %s" % results)
        return results
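

# Editor's sketch (not part of the original module): Model._predict() above
# averages the (n_samples, n_classes) probability matrices produced by each
# stored model. The helper below is a minimal, self-contained restatement of
# that averaging step; `per_model_probas` is assumed to be a non-empty list of
# equally shaped numpy arrays (np is imported at the top of this module).
def _example_average_fold_probabilities(per_model_probas):
    preds = None
    for i, p in enumerate(per_model_probas):
        # Accumulate the per-model probability matrices
        preds = p if preds is None else preds + p
    # Divide by the number of models to obtain the ensemble average
    return preds / (i + 1)

# Example:
#   _example_average_fold_probabilities([np.array([[0.2, 0.8]]),
#                                        np.array([[0.4, 0.6]])])
#   -> array([[0.3, 0.7]])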


@logged
class StackedModel(SignaturedModel):
    """Stacked TargetMate model."""

    def __init__(self, n_components=None, **kwargs):
        """Initialize the stacked model."""
        SignaturedModel.__init__(self, **kwargs)
        self.is_ensemble = False
        self.n_components = n_components
        self.pca = None

    def pca_fit(self, X):
        """Fit a PCA on a sample of X, if n_components is specified."""
        if not self.n_components:
            return
        if self.n_components > X.shape[1]:
            return
        self.__log.debug("Doing PCA fit")
        for shuff in self.sampler(X, y=None):
            break
        self.pca = PCA(n_components=self.n_components)
        self.pca.fit(X[shuff])

    def pca_transform(self, X):
        """Transform X with the fitted PCA (no-op if no PCA was fitted)."""
        self.__log.debug("Doing PCA transform")
        if not self.pca:
            return X
        return self.pca.transform(X)

    def _prepare_fit_stack(self, data, idxs, scramble):
        self.__log.info("Reading signatures from data")
        res = self.get_Xy_from_data(data, idxs, scramble)
        X = res["X"]
        y = res["y"]
        molecule = res["molecule"]
        if self.is_tmp_bases:
            dest = os.path.join(self.bases_tmp_path, "Stacked")
        else:
            dest = os.path.join(self.bases_models_path, "Stacked")
        return X, y, molecule, dest

    def _fit_stack(self, data, idxs, scramble):
        X, y, smiles, dest = self._prepare_fit_stack(data, idxs, scramble)
        return self._fit(X, y, smiles=smiles, destination_dir=dest)

    def fit_stack(self, data, idxs, wait, scramble):
        jobs = []
        if self.use_cc:
            self.__log.info("Reading and fitting altogether")
            if self.hpc:
                jobs += [self.func_hpc("_fit_stack", data, idxs, scramble,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._fit_stack(data, idxs, scramble)
        else:
            X, y, smiles, dest = self._prepare_fit_stack(data, idxs, scramble)
            if self.hpc:
                jobs += [self.func_hpc("_fit", X, y, smiles, dest,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._fit(X, y, smiles=smiles, destination_dir=dest)
        if wait:
            self.waiter(jobs)
        return jobs

    def fit(self, data, idxs=None, is_tmp=False, wait=True, scramble=False):
        """Fit the stacked model.

        Args:
            data(InputData): Input data.
            idxs(array): Indices to use for the signatures. If none are
                specified, all are used (default=None).
            is_tmp(bool): Save in the temporary directory or in the models
                directory (default=False).
            wait(bool): Wait for HPC jobs to finish (default=True).
            scramble(bool): Shuffle the y labels before fitting, e.g. for
                y-scrambling controls (default=False).
        """
        jobs = self.fit_stack(data, idxs=idxs, wait=wait, scramble=scramble)
        if not wait:
            return jobs
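
    # Editor's note (illustrative, not in the original source): fit() delegates
    # to fit_stack(), which either reads signatures and fits in a single step
    # (use_cc=True, optionally as an HPC job via func_hpc) or prepares X/y
    # locally and calls _fit() directly.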

    def _predict_(self, X, dest, dest_uncalib):
        self.__log.info("Saving paths for stacked")
        if self.is_tmp_bases:
            dest_ = os.path.join(self.bases_tmp_path, "Stacked")
            dest_uncalib_ = os.path.join(self.bases_tmp_path, "Stacked-uncalib")
        else:
            dest_ = os.path.join(self.bases_models_path, "Stacked")
            dest_uncalib_ = os.path.join(self.bases_models_path, "Stacked-uncalib")
        self.mod_dir = dest_
        self.mod_uncalib_dir = dest_uncalib_
        self._predict(X, dest)
        self._predict_uncalib(X, dest_uncalib)

    def _prepare_predict_stack(self, data, idxs):
        self.__log.info("Reading signatures from data")
        res = self.get_Xy_from_data(data, idxs)
        X = res["X"]
        y = res["y"]
        # X = self.pca_transform(X)
        if self.is_tmp_predictions:
            dest = os.path.join(self.predictions_tmp_path, "Stacked")
            dest_uncalib = os.path.join(self.predictions_tmp_path, "Stacked_uncalib")
        else:
            dest = os.path.join(self.predictions_models_path, "Stacked")
            dest_uncalib = os.path.join(self.predictions_models_path, "Stacked_uncalib")
        self.__log.info("Saving metadata in %s-meta" % dest)
        meta = {
            "y": self.check_array_from_disk(y),
            "idxs": idxs,
        }
        with open(dest + "-meta", "wb") as f:
            pickle.dump(meta, f)
        return X, dest, dest_uncalib

    def _predict_stack(self, data, idxs):
        X, dest, dest_uncalib = self._prepare_predict_stack(data, idxs)
        return self._predict_(X, dest, dest_uncalib)

    def predict_stack(self, data, idxs, wait):
        jobs = []
        if self.use_cc:
            if self.hpc:
                jobs += [self.func_hpc("_predict_stack", data, idxs,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._predict_stack(data, idxs)
        else:
            X, dest, dest_uncalib = self._prepare_predict_stack(data, idxs)
            self.__log.info("Starting predictions")
            if self.hpc:
                jobs += [self.func_hpc("_predict_", X, dest, dest_uncalib,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._predict_(X, dest, dest_uncalib)
        if wait:
            self.waiter(jobs)
        return jobs

    def load_predictions(self, datasets=None):
        datasets = self.get_datasets(datasets)
        # y_pred_calibrated = []
        if self.is_tmp_predictions:
            dest = os.path.join(self.predictions_tmp_path, "Stacked")
            dest_uncalib = os.path.join(self.predictions_tmp_path, "Stacked_uncalib")
        else:
            dest = os.path.join(self.predictions_models_path, "Stacked")
            dest_uncalib = os.path.join(self.predictions_models_path, "Stacked_uncalib")
        with open(dest, "rb") as f:
            y_pred_calibrated = pickle.load(f)
        with open(dest_uncalib, "rb") as f:
            y_pred_uncalibrated = pickle.load(f)
        with open(dest + "-meta", "rb") as f:
            meta = pickle.load(f)
        y_true = meta["y"]
        return Prediction(
            datasets=datasets,
            y_true=y_true,
            y_pred_calibrated=y_pred_calibrated,
            y_pred_uncalibrated=y_pred_uncalibrated,
            is_ensemble=self.is_ensemble,
            weights=None
        )

    def predict(self, data, idxs=None, datasets=None, is_tmp=True, wait=True):
        """Predict with the stacked model and return a Prediction object."""
        # self.is_tmp = is_tmp
        datasets = self.get_datasets(datasets)
        # self.signaturize(smiles=data.molecule, datasets=datasets, moleculetype=data.moleculetype)
        jobs = self.predict_stack(data, idxs=idxs, wait=wait)
        if wait:
            return self.load_predictions(datasets)
        else:
            return jobs

    def _explain_(self, X, dest):
        if self.is_tmp_bases:
            dest_ = os.path.join(self.bases_tmp_path, "Stacked")
        else:
            dest_ = os.path.join(self.bases_models_path, "Stacked")
        if self.conformity:
            self.mod_uncalib_dir = dest_ + "-uncalib"
        else:
            self.mod_dir = dest_
            self.mod_uncalib_dir = dest_ + "-uncalib"
        self._explain(X, dest)

    def _prepare_explain_stack(self, data, idxs):
        self.__log.info("Getting signatures from data")
        res = self.get_Xy_from_data(data, idxs)
        X = res["X"]
        # X = self.pca_transform(X)
        if self.is_tmp_predictions:
            dest = os.path.join(self.predictions_tmp_path, "Stacked-expl")
        else:
            dest = os.path.join(self.predictions_models_path, "Stacked-expl")
        return X, dest

    def _explain_stack(self, data, idxs):
        X, dest = self._prepare_explain_stack(data, idxs)
        self._explain_(X, dest)

    def explain_stack(self, data, idxs, wait):
        jobs = []
        if self.use_cc:
            if self.hpc:
                jobs += [self.func_hpc("_explain_stack", data, idxs,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._explain_stack(data, idxs)
        else:
            X, dest = self._prepare_explain_stack(data, idxs)
            if self.hpc:
                jobs += [self.func_hpc("_explain_", X, dest,
                                       cpu=self.n_jobs_hpc,
                                       job_base_path=self.tmp_path)]
            else:
                self._explain_(X, dest)
        if wait:
            self.waiter(jobs)
        return jobs

    def load_explanations(self, datasets=None):
        datasets = self.get_datasets(datasets)
        if self.is_tmp_predictions:
            dest = os.path.join(self.predictions_tmp_path, "Stacked-expl")
        else:
            dest = os.path.join(self.predictions_models_path, "Stacked-expl")
        if not os.path.exists(dest):
            self.__log.info("Explanations not found")
            return None
        with open(dest, "rb") as f:
            shaps = pickle.load(f)
        return Explanation(
            datasets=datasets,
            shaps=shaps,
            is_ensemble=self.is_ensemble
        )

    def explain(self, data, idxs=None, datasets=None, is_tmp=True, wait=True):
        """Explain stacked-model predictions and return an Explanation object."""
        # self.is_tmp = is_tmp
        datasets = self.get_datasets(datasets)
        self.signaturize(smiles=data.molecule, datasets=datasets,
                         moleculetype=data.moleculetype)
        jobs = self.explain_stack(data, idxs=idxs, wait=wait)
        if wait:
            return self.load_explanations(datasets)
        else:
            return jobs
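

# Editor's sketch (not part of the original module): the high-level StackedModel
# workflow implied by the methods above. `tm` is assumed to be an already
# configured StackedModel (the extra keyword arguments required by ModelSetup /
# SignaturizerSetup are omitted), `raw_records` is whatever input prepare_data()
# accepts, and the column indices below are placeholders.
def _example_stacked_workflow(tm, raw_records):
    # Parse the raw records and prepare them for machine learning
    data = tm.get_data_fit(raw_records, smiles_idx=0, activity_idx=1)
    # Fit the stacked (cross-conformal) model; pass wait=False to get HPC jobs back
    tm.fit(data)
    # Calibrated and uncalibrated predictions, wrapped in a Prediction object
    prediction = tm.predict(data)
    # SHAP-based explanations, wrapped in an Explanation object (or None)
    explanation = tm.explain(data)
    return prediction, explanation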