Source code for chemicalchecker.tool.targetmate.tmsetup

"""Set up TargetMate"""

import os
import shutil
import uuid
import pickle
import numpy as np
import joblib

from sklearn.model_selection import StratifiedKFold, KFold

from chemicalchecker.util import logged
from chemicalchecker.core import ChemicalChecker
from chemicalchecker.util import Config

from .utils import HPCUtils
from .utils import conformal
from .utils.log import set_logging
from .io import read_data, reassemble_activity_sets
from .universes import Universe
from .models.vanillaconfigs import VanillaClassifierConfigs


@logged
class TargetMateSetup(HPCUtils):
    """Set up the base TargetMate class"""

    def __init__(self,
                 models_path,
                 tmp_path=None,
                 cc_root=None,
                 is_classic=False,
                 classic_dataset="A1.001",
                 classic_cctype="sign0",
                 prestacked_dataset=None,
                 overwrite=True,
                 n_jobs=None,
                 n_jobs_hpc=8,
                 max_train_samples=10000,
                 max_train_ensemble=10,
                 train_sample_chance=0.95,
                 standardize=False,
                 is_cv=False,
                 is_stratified=True,
                 n_splits=3,
                 test_size_hyperopt=0.2,
                 scaffold_split=False,
                 outofuniverse_split=False,
                 outofuniverse_datasets=["A1.001"],
                 outofuniverse_cctype="sign1",
                 conformity=True,
                 hpc=False,
                 do_init=True,
                 search_n_iter=25,
                 train_timeout=7200,
                 shuffle=False,
                 log="INFO",
                 use_stacked_signature=False,
                 is_tmp_bases=True,
                 is_tmp_signatures=True,
                 is_tmp_predictions=True,
                 use_cc=True,
                 **kwargs):
        """Basic setup of the TargetMate.

        Args:
            models_path(str): Directory where models will be stored.
            tmp_path(str): Directory where temporary data will be stored
                (relevant at predict time) (default=None).
            cc_root(str): CC root folder (default=None).
            is_classic(bool): Use a classic chemical fingerprint instead of
                CC signatures (default=False).
            classic_dataset(str): Dataset code for the classic fingerprint
                (default="A1.001").
            classic_cctype(str): Signature type for the classic dataset
                (default="sign0").
            prestacked_dataset(str): Pre-stacked dataset signature
                (default=None).
            overwrite(bool): Clean the models_path directory (default=True).
            n_jobs(int): Number of CPUs to use, all by default (default=None).
            n_jobs_hpc(int): Number of CPUs to use on the HPC (default=8).
            max_train_samples(int): Maximum number of training samples to use
                (default=10000).
            max_train_ensemble(int): Maximum size of an ensemble (important
                when many samples are available) (default=10).
            train_sample_chance(float): Chance of visiting a sample
                (default=0.95).
            standardize(bool): Standardize small-molecule structures
                (default=False).
            is_cv(bool): In hyper-parameter optimization, do cross-validation
                (default=False).
            is_stratified(bool): In hyper-parameter optimization, do a
                stratified split (default=True).
            n_splits(int): If hyper-parameter optimization is done, number of
                splits (default=3).
            test_size_hyperopt(float): If hyper-parameter optimization is
                done, size of the test set (default=0.2).
            scaffold_split(bool): Evaluate the model with scaffold splits
                (default=False).
            outofuniverse_split(bool): Evaluate the model with
                out-of-universe splits (default=False).
            outofuniverse_datasets(list): Datasets to consider as part of the
                universe in the out-of-universe split.
            outofuniverse_cctype(str): Signature type of the datasets
                considered to be part of the out-of-universe split.
            conformity(bool): Do cross-conformal prediction (default=True).
            hpc(bool): Use HPC (default=False).
            search_n_iter(int): Number of iterations in a hyper-parameter
                search (default=25).
            train_timeout(int): Maximum time (in seconds) for training a
                classifier; applies to autosklearn (default=7200).
            use_cc(bool): Use pre-computed CC signatures (default=True).
        """
        if not do_init:
            return
        HPCUtils.__init__(self, **kwargs)
        # Jobs
        if not n_jobs:
            self.n_jobs = self.cpu_count()
        else:
            self.n_jobs = n_jobs
        # Models path
        self.models_path = os.path.abspath(models_path)
        # Temporary path
        if not tmp_path:
            subpath = self.models_path.rstrip("/").split("/")[-1]
            self.tmp_path = os.path.join(
                Config().PATH.CC_TMP, "targetmate", subpath,
                str(uuid.uuid4()))
        else:
            self.tmp_path = os.path.join(
                os.path.abspath(tmp_path), str(uuid.uuid4()))
        self.is_tmp_bases = is_tmp_bases
        self.is_tmp_signatures = is_tmp_signatures
        self.is_tmp_predictions = is_tmp_predictions
        if not os.path.exists(self.tmp_path):
            os.makedirs(self.tmp_path, exist_ok=True)
        self.bases_tmp_path, self.signatures_tmp_path, \
            self.predictions_tmp_path = self.directory_tree(self.tmp_path)
        self._bases_tmp_path, self._signatures_tmp_path, \
            self._predictions_tmp_path = self.bases_tmp_path, \
            self.signatures_tmp_path, self.predictions_tmp_path
        self.arrays_tmp_path = os.path.join(self.tmp_path, "arrays")
        os.makedirs(self.arrays_tmp_path, exist_ok=True)
        # Initialize the ChemicalChecker
        self.cc = ChemicalChecker(cc_root)
        # Use a classic or a CC fingerprint
        self.is_classic = is_classic
        self.classic_dataset = classic_dataset
        self.classic_cctype = classic_cctype
        # Stacked signature
        self.use_stacked_signature = use_stacked_signature
        self.prestacked_dataset = prestacked_dataset
        # Standardize
        self.standardize = standardize
        # Do conformal modeling
        self.conformity = conformity
        # Cap the number of training samples per classifier
        self.max_train_samples = max_train_samples
        self.max_train_ensemble = max_train_ensemble
        self.train_sample_chance = train_sample_chance
        # Do cross-validation
        self.overwrite = overwrite
        self.is_cv = is_cv
        # Stratified
        self.is_stratified = is_stratified
        # Number of splits
        self.n_splits = n_splits
        # Test size
        self.test_size_hyperopt = test_size_hyperopt
        # Scaffold splits
        self.scaffold_split = scaffold_split
        # Out-of-universe splits
        self.outofuniverse_split = outofuniverse_split
        if outofuniverse_datasets is None:
            self.outofuniverse_datasets = ["A1.001"]
        else:
            self.outofuniverse_datasets = outofuniverse_datasets
        self.outofuniverse_cctype = outofuniverse_cctype
        # Use HPC
        self.n_jobs_hpc = n_jobs_hpc
        self.hpc = hpc
        # Search iterations
        self.search_n_iter = search_n_iter
        # Timeout
        self.train_timeout = train_timeout
        # Shuffle
        self.shuffle = shuffle
        # Logging
        self.log = log
        # set_logging(self.log)
        # Others
        self._is_fitted = False
        self._is_trained = False
        # self.is_tmp = False
        # Log path information
        self.__log.info("MODELS PATH: %s" % self.models_path)
        self.__log.info("TMP PATH: %s" % self.tmp_path)
        self.use_cc = use_cc

    # Directories functions
    @staticmethod
    def directory_tree(root):
        bases_path = os.path.join(root, "bases")
        if not os.path.exists(bases_path):
            os.mkdir(bases_path)
        signatures_path = os.path.join(root, "signatures")
        if not os.path.exists(signatures_path):
            os.mkdir(signatures_path)
        predictions_path = os.path.join(root, "predictions")
        if not os.path.exists(predictions_path):
            os.mkdir(predictions_path)
        return bases_path, signatures_path, predictions_path

    def create_models_path(self):
        if not os.path.exists(self.models_path):
            self.__log.warning(
                "Specified models directory does not exist: %s",
                self.models_path)
            os.makedirs(self.models_path, exist_ok=True)
        else:
            if self.overwrite:
                # Clean the models directory
                self.__log.debug("Cleaning %s" % self.models_path)
                shutil.rmtree(self.models_path, ignore_errors=True)
                os.makedirs(self.models_path, exist_ok=True)
        self.bases_models_path, self.signatures_models_path, \
            self.predictions_models_path = self.directory_tree(
                self.models_path)
        self._bases_models_path, self._signatures_models_path, \
            self._predictions_models_path = self.bases_models_path, \
            self.signatures_models_path, self.predictions_models_path

    def reset_path_bases(self):
        if self.is_tmp_bases:
            self.bases_tmp_path = self._bases_tmp_path
        else:
            self.bases_models_path = self._bases_models_path
    def repath_bases_by_fold(self, fold_number, is_tmp=True, reset=True,
                             only_train=False):
        """Redefine paths of a TargetMate instance. Used by the Validation
        class."""
        if reset:
            self.reset_path_bases()
        if not only_train:
            if is_tmp:
                self.bases_tmp_path = os.path.join(
                    self.bases_tmp_path, "%02d" % fold_number)
                if not os.path.exists(self.bases_tmp_path):
                    os.mkdir(self.bases_tmp_path)
            else:
                self.bases_models_path = os.path.join(
                    self.bases_models_path, "%02d" % fold_number)
                if not os.path.exists(self.bases_models_path):
                    os.mkdir(self.bases_models_path)
    def reset_path_predictions(self, is_tmp=True):
        """Reset the predictions path"""
        if is_tmp:
            self.predictions_tmp_path = self._predictions_tmp_path
        else:
            self.predictions_models_path = self._predictions_models_path
    def repath_predictions_by_fold(self, fold_number, is_tmp=True,
                                   reset=True):
        """Redefine paths of a TargetMate instance. Used by the Validation
        class."""
        if reset:
            self.reset_path_predictions(is_tmp=is_tmp)
        if is_tmp:
            self.predictions_tmp_path = os.path.join(
                self.predictions_tmp_path, "%02d" % fold_number)
            if not os.path.exists(self.predictions_tmp_path):
                os.mkdir(self.predictions_tmp_path)
        else:
            self.predictions_models_path = os.path.join(
                self.predictions_models_path, "%02d" % fold_number)
            if not os.path.exists(self.predictions_models_path):
                os.mkdir(self.predictions_models_path)
    def repath_predictions_by_set(self, is_train, is_tmp=True, reset=True):
        """Redefine paths of a TargetMate instance. Used by the Validation
        class."""
        if reset:
            self.reset_path_predictions(is_tmp=is_tmp)
        s = "train" if is_train else "test"
        if is_tmp:
            self.predictions_tmp_path = os.path.join(
                self.predictions_tmp_path, s)
            if not os.path.exists(self.predictions_tmp_path):
                os.mkdir(self.predictions_tmp_path)
        else:
            self.predictions_models_path = os.path.join(
                self.predictions_models_path, s)
            if not os.path.exists(self.predictions_models_path):
                os.mkdir(self.predictions_models_path)
    def repath_predictions_by_fold_and_set(self, fold_number, is_train,
                                           is_tmp=True, reset=True,
                                           only_train=False):
        if not only_train:
            self.repath_predictions_by_fold(
                fold_number=fold_number, is_tmp=is_tmp, reset=reset)
            self.repath_predictions_by_set(
                is_train=is_train, is_tmp=is_tmp, reset=False)
        else:
            self.repath_predictions_by_set(
                is_train=is_train, is_tmp=is_tmp, reset=True)

    # Read input data
    def read_data(self, data, smiles_idx, inchi_idx, inchikey_idx,
                  activity_idx, srcid_idx, use_inchikey, standardize=None,
                  valid_inchikeys=None):
        if not standardize:
            standardize = self.standardize
        # Read data
        self.__log.info("Reading data, parsing molecules")
        return read_data(data, smiles_idx, inchi_idx, inchikey_idx,
                         activity_idx, srcid_idx, standardize, use_inchikey,
                         valid_inchikeys=valid_inchikeys)

    # Loading functions
    @staticmethod
    def load(models_path):
        """Load a previously stored TargetMate instance."""
        # Note: the pickle must be opened in binary mode, and the file name
        # joined to the models path (not passed as an absolute component).
        with open(os.path.join(models_path, "TargetMate.pkl"), "rb") as f:
            return pickle.load(f)
    def load_base_model(self, destination_dir, append_pipe=False):
        """Load a base model"""
        mod = joblib.load(destination_dir)
        if append_pipe:
            with open(destination_dir + ".pipe", "rb") as f:
                self.pipes += [pickle.load(f)]
        return mod
    def load_data(self):
        self.__log.debug("Loading training data (only evidence)")
        fn = os.path.join(self.models_path, "trained_data.pkl")
        if not os.path.exists(fn):
            return
        with open(fn, "rb") as f:
            return pickle.load(f)

    # Saving functions
    def save(self):
        """Save the TargetMate instance"""
        # We avoid saving signature instances
        self.sign_predict_fn = None
        with open(os.path.join(self.models_path, "TargetMate.pkl"),
                  "wb") as f:
            pickle.dump(self, f)
    def save_data(self, data):
        self.__log.debug("Saving training data (only evidence)")
        with open(os.path.join(self.models_path, "trained_data.pkl"),
                  "wb") as f:
            pickle.dump(data, f)

    # Wipe
    def wipe(self):
        """Delete temporary data"""
        self.__log.debug("Removing %s" % self.tmp_path)
        shutil.rmtree(self.tmp_path, ignore_errors=True)
        for job_path in self.job_paths:
            if os.path.exists(job_path):
                self.__log.debug("Removing %s" % job_path)
                shutil.rmtree(job_path, ignore_errors=True)
    def compress_models(self):
        """Store models in compressed format for persistence"""
        mod_dir = self.bases_models_path
        for m in os.listdir(mod_dir):
            fn = os.path.join(mod_dir, m)
            mod = joblib.load(fn)
            joblib.dump(mod, fn + ".z")
            os.remove(fn)
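
# Illustrative usage (not part of the original module): a minimal sketch of
# the base setup life cycle, assuming a working ChemicalChecker install and
# configuration. The path below is hypothetical.
#
#   tm = TargetMateSetup(models_path="/tmp/tm_models", overwrite=False)
#   tm.create_models_path()  # builds bases/, signatures/ and predictions/
#   tm.save()                # writes <models_path>/TargetMate.pkl
#   tm = TargetMateSetup.load(tm.models_path)
#   tm.wipe()                # removes the uuid-stamped temporary directory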
@logged
class TargetMateClassifierSetup(TargetMateSetup):
    """Set up a TargetMate classifier.

    It can sample negatives from a universe of molecules (e.g. ChEMBL).
    """

    def __init__(self,
                 algo=None,
                 model_config="autosklearn",
                 weight_algo="naive_bayes",
                 ccp_folds=10,
                 min_class_size=10,
                 # Added by Paula: different active/inactive minimum values
                 min_class_size_active=None,
                 min_class_size_inactive=None,
                 active_value=1,
                 inactive_value=None,
                 inactives_per_active=100,
                 metric="bacc",
                 universe_path=None,
                 naive_sampling=False,
                 biased_universe=0,
                 maximum_potential_actives=5,
                 universe_random_state=None,
                 **kwargs):
        """Set up a TargetMate classifier.

        Args:
            algo(str): Base algorithm to use (see the model configuration
                files) (default=None).
            model_config(str): Model configuration for the base classifier
                (default="autosklearn").
            weight_algo(str): Model used to weigh the contribution of an
                individual classifier. Should be fast. For the moment, only
                vanilla classifiers are accepted (default="naive_bayes").
            ccp_folds(int): Number of cross-conformal prediction folds. The
                default generator used is Stratified K-Folds (default=10).
            min_class_size(int): Minimum class size acceptable to train the
                classifier (default=10).
            min_class_size_active(int): Minimum active class size acceptable
                to train the classifier; if not stated, min_class_size is
                used (default=None).
            min_class_size_inactive(int): Minimum inactive class size
                acceptable to train the classifier; if not stated,
                min_class_size is used (default=None).
            active_value(int): When reading data, the activity value
                considered to be active (default=1).
            inactive_value(int): When reading data, the activity value
                considered to be inactive. If none is specified, any value
                different from active_value is considered to be inactive
                (default=None).
            inactives_per_active(int): Number of inactives to sample for
                each active. If None, only experimental actives and
                inactives are considered (default=100).
            metric(str): Metric used to select the pipeline (default="bacc").
            universe_path(str): Path to the universe. If not specified, the
                default one is used (default=None).
            naive_sampling(bool): Sample naively (randomly), without using
                the OneClassSVM (default=False).
            biased_universe(float): Proportion of closer molecules to sample
                as putative inactives (default=0).
        """
""" # Inherit from TargetMateSetup TargetMateSetup.__init__(self, **kwargs) # Metric to use self.metric = metric # Cross-conformal folds self.ccp_folds = ccp_folds # Determine number of jobs if self.hpc: n_jobs = self.n_jobs_hpc else: n_jobs = self.n_jobs # Set the base classifier self.algo = algo self.model_config = model_config if self.model_config == "vanilla": self.algo = VanillaClassifierConfigs(self.algo, n_jobs=n_jobs) if self.model_config == "grid": from .models.gridconfigs import GridClassifierConfigs self.algo = GridClassifierConfigs(self.algo, n_jobs=n_jobs, n_iter=self.search_n_iter) if self.model_config == "hyperopt": from .models.hyperoptconfigs import HyperoptClassifierConfigs self.algo = HyperoptClassifierConfigs(self.algo, metric=self.metric, n_jobs=n_jobs, n_iter=self.search_n_iter, timeout=self.train_timeout, is_cv=self.is_cv, is_stratified=self.is_stratified, n_splits=self.n_splits, test_size=self.test_size_hyperopt, scaffold_split=self.scaffold_split) if self.model_config == "tpot": from .models.tpotconfigs import TPOTClassifierConfigs self.algo = TPOTClassifierConfigs(self.algo, n_jobs=n_jobs) if self.model_config == "autosklearn": from .models.autosklearnconfigs import AutoSklearnClassifierConfigs self.algo = AutoSklearnClassifierConfigs(n_jobs=n_jobs, tmp_path=self.tmp_path, train_timeout=self.train_timeout, log=self.log) # Weight algo self.weight_algo = VanillaClassifierConfigs(weight_algo, n_jobs=self.n_jobs) # TO-DO: This is run locally for now. # Minimum size of the minority class if min_class_size_active is None and min_class_size_inactive is None: # Added by Paula: change number of actives/inactives per model self.min_class_size_active = min_class_size self.min_class_size_inactive = min_class_size elif min_class_size_active is not None and min_class_size_inactive is None: self.min_class_size_active = min_class_size_active self.min_class_size_inactive = min_class_size elif min_class_size_active is None and min_class_size_inactive is not None: self.min_class_size_active = min_class_size self.min_class_size_inactive = min_class_size_inactive else: self.min_class_size_active = min_class_size_active self.min_class_size_inactive = min_class_size_inactive # Active value self.active_value = active_value # Inactive value self.inactive_value = inactive_value # Inactives per active self.inactives_per_active = inactives_per_active # Load universe self.universe = Universe.load_universe(universe_path) # naive_sampling self.naive_sampling = naive_sampling # Others self.cross_conformal_func = conformal.get_cross_conformal_classifier self.biased_universe = biased_universe self.universe_random_state = universe_random_state self.maximum_potential_actives = maximum_potential_actives def _reassemble_activity_sets(self, act, inact, putinact, inchi=False): self.__log.info("Reassembling activities. 
Convention: 1 = Active, -1 = Inactive, 0 = Sampled") return reassemble_activity_sets(act, inact, putinact, inchi=inchi) def prepare_data(self, data, smiles_idx, inchi_idx, inchikey_idx, activity_idx, srcid_idx, use_inchikey): # Read data if self.use_cc: s = self.cc.signature(self.classic_dataset, self.classic_cctype) valid_inchikeys = s.keys else: valid_inchikeys = None data = self.read_data(data, smiles_idx=smiles_idx, inchi_idx=inchi_idx, inchikey_idx=inchikey_idx, activity_idx=activity_idx, srcid_idx=srcid_idx, use_inchikey=use_inchikey, valid_inchikeys=valid_inchikeys) self.ny = np.sum(data.activity == 1) if self.ny < self.min_class_size_active or (len(data.activity) - self.ny) < self.min_class_size_inactive: # Added by Paula: different number of actives or inactivess self.__log.warning("Not enough data (%d)" % self.ny) return None # Create file structure self.create_models_path() # Save training data self.save_data(data) # Sample inactives, if necessary actives = set() inactives = set() for d in data: if d.activity == self.active_value: actives.update([(d.molecule, d.idx, d.inchikey)]) else: if not self.inactive_value: inactives.update([(d.molecule, d.idx, d.inchikey)]) else: if d.activity == self.inactive_value: inactives.update([(d.molecule, d.idx, d.inchikey)]) act, inact, putinact, self.putative_idx = self.universe.predict(actives, inactives, inactives_per_active=self.inactives_per_active, min_actives=self.min_class_size_active, # Added by Paula: change to specifically active class naive=self.naive_sampling, biased_universe=self.biased_universe, maximum_potential_actives = self.maximum_potential_actives, random_state= self.universe_random_state) # Added by Paula: sample proportion of universe closer to actives self.__log.info("Actives %d / Known inactives %d / Putative inactives %d" % (len(act), len(inact), len(putinact))) print("Actives %d / Known inactives %d / Putative inactives %d" % (len(act), len(inact), len(putinact))) self.__log.debug("Assembling") inchi = (smiles_idx is None) and (inchi_idx is not None) data = self._reassemble_activity_sets(act, inact, putinact, inchi) if self.shuffle: self.__log.debug("Shuffling") data.shuffle() return data def prepare_for_ml(self, data, predict=False): if data is None: return None """Prepare data for ML, i.e. convert to 1/0 and check that there are enough samples for training""" self.__log.debug("Prepare for machine learning (converting to 1/0") # Consider putative inactives as inactives (e.g. set -1 to 0) self.__log.debug("Considering putative inactives as inactives for training") data.activity[data.activity <= 0] = 0 # Check that there are enough molecules for training. self.ny = np.sum(data.activity) if self.ny < self.min_class_size_active or (len(data.activity) - self.ny) < self.min_class_size_inactive: # Added by Paula: seperate minimums for actives and inactives self.__log.warning( "Not enough valid molecules in the minority class..." + "Just keeping training data") self._is_fitted = True #self.save() return None self.__log.info("Actives %d / Merged inactives %d" % (self.ny, len(data.activity) - self.ny)) return data
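
# Illustrative usage (not part of the original module): a minimal sketch of
# the classifier setup and data-preparation flow. The file name and column
# indices below are hypothetical.
#
#   tm = TargetMateClassifierSetup(models_path="/tmp/tm_models",
#                                  model_config="vanilla",
#                                  algo="random_forest",
#                                  inactives_per_active=100)
#   data = tm.prepare_data("activities.tsv", smiles_idx=0, inchi_idx=None,
#                          inchikey_idx=None, activity_idx=1, srcid_idx=None,
#                          use_inchikey=False)
#   # prepare_data returns None when either class is below its minimum size;
#   # otherwise, putative inactives (label -1) are merged into the inactive
#   # class (label 0) before training:
#   data = tm.prepare_for_ml(data)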
@logged
class TargetMateRegressorSetup(TargetMateSetup):
    """Set up a TargetMate regressor"""
    pass
class ModelSetup(TargetMateClassifierSetup, TargetMateRegressorSetup):

    def __init__(self, is_classifier, **kwargs):
        if is_classifier:
            TargetMateClassifierSetup.__init__(self, **kwargs)
        else:
            TargetMateRegressorSetup.__init__(self, **kwargs)
        self.is_classifier = is_classifier

    def prepare_data(self, data, smiles_idx, inchi_idx, inchikey_idx,
                     activity_idx, srcid_idx, use_inchikey):
        if self.is_classifier:
            return TargetMateClassifierSetup.prepare_data(
                self, data, smiles_idx, inchi_idx, inchikey_idx,
                activity_idx, srcid_idx, use_inchikey)
        else:
            return TargetMateRegressorSetup.prepare_data(
                self, data, smiles_idx, inchi_idx, inchikey_idx,
                activity_idx, srcid_idx, use_inchikey)

    def prepare_for_ml(self, data, predict=False):
        if self.is_classifier:
            return TargetMateClassifierSetup.prepare_for_ml(
                self, data, predict=predict)
        else:
            return TargetMateRegressorSetup.prepare_for_ml(self, data)
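
# Illustrative usage (not part of the original module): ModelSetup simply
# dispatches to the classifier or the regressor setup. Note that
# TargetMateRegressorSetup is currently a stub, so only the classifier path
# is functional.
#
#   ms = ModelSetup(is_classifier=True, models_path="/tmp/tm_models")
#   data = ms.prepare_for_ml(data)  # routed to TargetMateClassifierSetup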