import os
import h5py
import numpy as np
from chemicalchecker.core.signature_data import DataSignature
from chemicalchecker.util import logged
from signaturizer import Signaturizer as SignaturizerExternal
from .tmsetup import TargetMateSetup
from .utils import HPCUtils
from .utils import chemistry
# Threshold on the number of submitted-but-not-awaited HPC jobs: once the
# pending list grows beyond this value, Signaturizer._signaturize_signaturizer
# waits for those jobs and starts a fresh list before submitting more.
MAXQUEUE = 15
@logged
class BaseSignaturizer(TargetMateSetup, HPCUtils):
    """Common helpers to locate and read signatures.

    Signatures can be read from three places: the Chemical Checker itself
    (when ``use_cc`` is set), a "master" HDF5 file shared across collections
    (``master_sign_paths``), or an HDF5 file specific to the collection being
    analysed. All readers return a ``(V, idxs)`` pair.
    """

    def __init__(self, master_sign_paths=None, cctype="sign3", **kwargs):
        """Initialize base signaturizer.

        Args:
            master_sign_paths(dict): Path to signature files that are not
                specific to the collection being analysed (default=None).
            cctype(str): CC signature type to be used (sign0, sign1, sign2,
                sign3) (default='sign3').
            **kwargs: Forwarded to TargetMateSetup.__init__ and, for classic
                runs only, to HPCUtils.__init__.
        """
        TargetMateSetup.__init__(self, **kwargs)
        if self.is_classic:
            HPCUtils.__init__(self, **kwargs)
        self.master_sign_paths = master_sign_paths
        # cctype is deliberately stored without validation.
        self.cctype = cctype

    def master_key_type(self):
        """Check master key types.

        Every master signature file is opened and its ``keys`` dataset is
        passed to KeyTypeDetector; all files must agree on a single key type.

        Returns:
            The key type reported by KeyTypeDetector.detect().

        Raises:
            Exception: if master_sign_paths is None or empty, or if the files
                do not share one key type.
        """
        from chemicalchecker.util.keytype.detect import KeyTypeDetector
        if self.master_sign_paths is None:
            raise Exception("master_sign_paths is None")
        ktypes = set()
        for path in self.master_sign_paths.values():
            with h5py.File(path, "r") as hf:
                keys = hf["keys"][:]
            ktypes.add(KeyTypeDetector(keys).detect())
        if not ktypes:
            # Previously this fell through to an unbound local (NameError).
            raise Exception("master_sign_paths is empty")
        if len(ktypes) > 1:
            raise Exception("More than one key type detected")
        return next(iter(ktypes))

    def get_datasets(self, datasets=None):
        """Return the datasets to work on.

        Args:
            datasets(list): Requested datasets (default=None).

        Returns:
            ``self.datasets`` for classic runs or when no datasets are
            requested; otherwise the sorted intersection of ``self.datasets``
            and the requested ones.
        """
        if self.is_classic or datasets is None:
            return self.datasets
        return sorted(set(self.datasets).intersection(datasets))

    def get_destination_dir(self, dataset, is_tmp=None):
        """Return the path where the signature of a dataset is stored.

        Args:
            dataset(str): Dataset name, used as the last path component.
            is_tmp: If truthy, the temporary signatures path is used instead
                of the models one (default=None).
        """
        if is_tmp:
            base = self.signatures_tmp_path
        else:
            base = self.signatures_models_path
        return os.path.join(base, dataset)

    def get_master_idxs(self, dataset):
        """Map every key of a master signature file to its row position.

        Args:
            dataset(str): Key of ``master_sign_paths`` to open.

        Returns:
            dict: key -> position in the ``keys`` dataset of the file.
        """
        with h5py.File(self.master_sign_paths[dataset], "r") as hf:
            keys = hf["keys"][:]
        # NOTE(review): keys are used exactly as h5py returns them; depending
        # on the h5py version these may be bytes rather than str -- confirm
        # they match the type of the keys passed to master_mapping.
        return {key: i for i, key in enumerate(keys)}

    def master_mapping(self, keys, master_idxs):
        """Match query keys against the index of a master file.

        Args:
            keys(iterable): Query keys.
            master_idxs(dict): key -> row position, as returned by
                get_master_idxs.

        Returns:
            tuple: ``(idxs0, idxs1)`` integer arrays where ``idxs0`` holds the
            positions in ``keys`` that were found and ``idxs1`` the matching
            positions in the master file. Keys not found are skipped.
        """
        hits = [(i, master_idxs[key])
                for i, key in enumerate(keys) if key in master_idxs]
        # np.int was removed in NumPy 1.24; the builtin int is what it aliased.
        idxs0 = np.array([i for i, _ in hits]).astype(int)
        idxs1 = np.array([j for _, j in hits]).astype(int)
        return idxs0, idxs1

    # Signature readers
    def _read_signatures_by_inchikey_from_cc(self, dataset, inchikeys):
        """Read signatures from CC. InChIKeys are used.

        Args:
            dataset(str): CC dataset; ``self.dataset`` is used when None.
            inchikeys(iterable): InChIKeys to fetch.

        Returns:
            tuple: ``(V, idxs)`` where ``V`` is what ``get_vectors_lite``
            returned and ``idxs`` are the positions in ``inchikeys`` of the
            keys that were returned.
        """
        iks_dict = {k: i for i, k in enumerate(inchikeys)}
        if dataset is None:
            dataset = self.dataset
        self.__log.info("Reading signature of type %s", self.cctype)
        sign = self.cc.signature(dataset, self.cctype)
        self.__log.info("...data path: %s", sign.data_path)
        keys, V = sign.get_vectors_lite(inchikeys)
        self.__log.info("Signature read")
        idxs = np.array(
            [iks_dict[k] for k in keys if k in iks_dict]).astype(int)
        return V, idxs

    def _get_predicted_signatures(self, dataset, inchikeys):  # Added by Paula
        """Read signatures from CC by InChIKey.

        This performs exactly the same steps as
        _read_signatures_by_inchikey_from_cc, so it delegates to it.
        """
        return self._read_signatures_by_inchikey_from_cc(dataset, inchikeys)

    def _read_signatures_from_master(self, dataset, keys):
        """Read signatures from a master signature file.

        Args:
            dataset(str): Key of ``master_sign_paths``; ``self.dataset`` is
                used when None.
            keys(iterable): Query keys.

        Returns:
            tuple: ``(V, idxs)`` with the rows of the file's ``V`` dataset for
            the keys that were found and their positions in ``keys``.
        """
        if dataset is None:
            dataset = self.dataset
        master_idxs = self.get_master_idxs(dataset)
        idxs_or, idxs_mp = self.master_mapping(keys, master_idxs)
        with h5py.File(self.master_sign_paths[dataset], "r") as hf:
            # The full matrix is loaded before selecting rows: idxs_mp follows
            # the order of `keys`, which need not be increasing.
            V = hf["V"][:][idxs_mp]
        return V, idxs_or

    def _read_signatures_by_inchikey_from_master(self, dataset, inchikeys):
        """Read signatures from a master signature file. InChIKeys are used"""
        return self._read_signatures_from_master(dataset, inchikeys)

    def _read_signatures_by_smiles_from_master(self, dataset, smiles):
        """Read signatures from a master signature file. SMILES are used"""
        return self._read_signatures_from_master(dataset, smiles)

    def _read_signatures_by_idxs_from_local(self, dataset, smiles, idxs, inchikeys, sign_folder, is_tmp):
        """Read a signature from an HDF5 file. This must be specific to the collection being analyzed.

        Args:
            dataset(str): Dataset name (file name inside the folder).
            smiles: Not used by this reader.
            idxs: Row positions to read, or None to read every row.
            inchikeys: Not used by this reader.
            sign_folder(str): Folder holding the file; when falsy the
                destination directory of the dataset is used instead.
            is_tmp: Passed to get_destination_dir when sign_folder is falsy.

        Returns:
            tuple: ``(V, idxs)``. The matrix is read from ``V`` (CC or classic
            runs) or from ``signature``; in the latter case rows flagged in
            the ``failed`` dataset are dropped and ``idxs`` holds the
            positions (within the rows read) that were kept. Otherwise
            ``idxs`` simply enumerates the rows of ``V``.
        """
        if not sign_folder:
            destination_dir = self.get_destination_dir(dataset, is_tmp=is_tmp)
        else:
            destination_dir = os.path.join(sign_folder, dataset)
        name = "V" if (self.use_cc or self.is_classic) else "signature"
        with h5py.File(destination_dir, "r") as hf:
            V = hf[name][:]
            if idxs is not None:
                V = V[idxs]
            # NOTE(review): the original nesting of this check was lost with
            # the file's indentation. Failed molecules are filtered here
            # whether or not idxs was given -- confirm against the package.
            if name == "signature":
                failed = hf["failed"][:]
                if idxs is not None:
                    failed = failed[idxs]
                V = V[~failed]
                idxs = np.flatnonzero(~failed).astype(int)
            else:
                idxs = np.arange(V.shape[0]).astype(int)
        return V, idxs

    def read_signatures(self, dataset, smiles, inchikeys, sign_folder, is_tmp, idxs=None):
        """Read signatures from the appropriate source.

        The Chemical Checker is used when ``use_cc`` is set; otherwise a
        task-specific file is read when there are no master signature paths,
        and the master file is read (by InChIKey or SMILES, depending on its
        detected key type) when there are.

        Returns:
            tuple: ``(V, idxs)`` from the selected reader.

        Raises:
            Exception: if the identifiers required by the selected source
                (inchikeys or smiles) are None.
        """
        if self.use_cc:
            self.__log.info("Reading signatures from the Chemical Checker (inchikeys are used)")
            if inchikeys is None:
                raise Exception("inchikeys is None, cannot use_cc")
            return self._read_signatures_by_inchikey_from_cc(dataset=dataset, inchikeys=inchikeys)
        if self.master_sign_paths is None:
            self.__log.info("Reading signatures from a task-specific file")
            return self._read_signatures_by_idxs_from_local(
                dataset=dataset, smiles=smiles, inchikeys=inchikeys,
                idxs=idxs, sign_folder=sign_folder, is_tmp=is_tmp)
        self.__log.info("Reading signatures from a master signatures file")
        key_type = self.master_key_type()
        if key_type == "inchikey":
            if inchikeys is None:
                raise Exception("inchikeys is None, cannot use master signatures")
            return self._read_signatures_by_inchikey_from_master(dataset=dataset, inchikeys=inchikeys)
        elif key_type == "smiles":
            if smiles is None:
                raise Exception("smiles is None, cannot use master signatures")
            return self._read_signatures_by_smiles_from_master(dataset=dataset, smiles=smiles)
        # NOTE(review): any other key type falls through and returns None,
        # as in the original code.
@logged
class Fingerprinter(BaseSignaturizer):
    """Set up a Fingerprinter. This is usually used as a baseline featurizer to compare with CC signatures."""

    def __init__(self, **kwargs):
        """Initialize the fingerprinter.

        Args:
            **kwargs: Forwarded to BaseSignaturizer.__init__.
        """
        # Inherit
        BaseSignaturizer.__init__(self, **kwargs)
        # Featurizer
        self.featurizer_func = chemistry.morgan_matrix
        if not self.use_cc:
            self.datasets = ["FP.000"]
            self.dataset = self.datasets[0]
        else:
            self.datasets = [self.classic_dataset]
            self.dataset = self.datasets[0]
            # NOTE(review): indentation was lost in this file; the cctype
            # override is assumed to belong to the use_cc branch -- confirm.
            self.cctype = self.classic_cctype

    def featurizer(self, smiles, destination_dir):
        """Compute fingerprints and store them in an HDF5 file.

        Args:
            smiles: Molecules passed to ``featurizer_func``; also stored as
                the ``keys`` dataset.
            destination_dir(str): Path of the HDF5 file to write. The matrix
                is stored as ``V`` cast to int8.
        """
        V = self.featurizer_func(smiles)
        keys = np.array(smiles, DataSignature.string_dtype())
        with h5py.File(destination_dir, "w") as hf:
            hf.create_dataset("V", data=V.astype(np.int8))
            hf.create_dataset("keys", data=keys)

    def _signaturize_fingerprinter(self, smiles, is_tmp=None, wait=True, **kwargs):
        """Calculate fingerprints

        Nothing is calculated when ``use_cc`` is set, when master signature
        paths exist, or when the destination file already exists.

        Args:
            smiles: Molecules to featurize.
            is_tmp: Passed to get_destination_dir (default=None).
            wait(bool): Wait for submitted jobs before returning
                (default=True).
            **kwargs: Accepted and ignored.

        Returns:
            list: Jobs submitted to the HPC (empty when run locally).
        """
        if self.use_cc:
            self.__log.info("use_cc was set to True, i.e. signatures are already calculated!")
            return []
        if self.master_sign_paths is not None:
            self.__log.info("Master signature paths exists")
            return []
        destination_dir = self.get_destination_dir(dataset=self.dataset, is_tmp=is_tmp)
        jobs = []
        if os.path.exists(destination_dir):
            self.__log.debug("Fingerprint file already exists: %s", destination_dir)
        else:
            self.__log.debug("Calculating fingerprint")
            if not self.hpc:
                self.featurizer(smiles, destination_dir)
            else:
                job = self.func_hpc("featurizer",
                                    smiles,
                                    destination_dir,
                                    cpu=4,
                                    wait=False)
                jobs.append(job)
        if wait:
            self.waiter(jobs)
        return jobs

    def read_signatures(self, idxs, smiles, inchikeys, is_tmp, sign_folder):
        """Read signatures

        Always reads from the collection-specific file of ``self.dataset``.
        """
        return self._read_signatures_by_idxs_from_local(
            dataset=self.dataset, smiles=smiles, idxs=idxs,
            inchikeys=inchikeys, is_tmp=is_tmp, sign_folder=sign_folder)

    def signaturize(self, **kwargs):
        """Calculate fingerprints; see _signaturize_fingerprinter."""
        return self._signaturize_fingerprinter(**kwargs)
@logged
class Signaturizer(BaseSignaturizer):
    """Set up a Signaturizer"""

    def __init__(self,
                 datasets=None,
                 sign_predict_paths=None,
                 **kwargs):
        """Set up a Signaturizer

        Args:
            datasets(list): CC datasets (A1.001-E5.999).
                By default, all datasets having a SMILES-to-sign predictor are
                used.
            sign_predict_paths(dict): pre-loaded predict_fn, keys are dataset
                codes, values are tuples of (sign, predict_fn)
            **kwargs: Forwarded to BaseSignaturizer.__init__.

        Raises:
            Exception: if the datasets are not sorted.
        """
        # Inherit
        BaseSignaturizer.__init__(self, **kwargs)
        # Width of one dataset's slice in a pre-stacked matrix (see
        # _check_prestack_friendly).
        self.sign_dim = 128
        # Datasets
        if not datasets:
            self.datasets = list(self.cc.datasets_exemplary())
        else:
            self.datasets = datasets
        if sorted(self.datasets) != list(self.datasets):
            raise Exception("Datasets must be sorted!")
        # preloaded neural networks
        if not sign_predict_paths:
            self.sign_predict_paths = {}
            for ds in self.datasets:
                self.__log.debug("Loading signature predictor for %s", ds)
                self.sign_predict_paths[ds] = self.cc.get_signature(
                    self.cctype, "full", ds)
        else:
            self.sign_predict_paths = sign_predict_paths

    def _dataseter(self, datasets):
        """Normalize a datasets argument to a sorted list-like of codes.

        Args:
            datasets: None/empty (``self.datasets`` is used), a single
                dataset code, or a sequence of codes.

        Raises:
            Exception: if the datasets are not sorted.
        """
        if not datasets:
            datasets = self.datasets
        if isinstance(datasets, str):
            datasets = [datasets]
        if sorted(datasets) != list(datasets):
            raise Exception("Datasets not sorted")
        return datasets

    def _check_prestack_friendly(self, datasets):
        """Check whether the requested datasets can be read pre-stacked.

        Args:
            datasets: Requested datasets (normalized with _dataseter).

        Returns:
            tuple: ``(prestacked_friendly, prestacked_mask)``. The first item
            is True when every requested dataset is listed in the pre-stacked
            signature. The mask is None when no column selection is needed
            (or when not friendly); otherwise it is a boolean array with
            ``sign_dim`` entries per pre-stacked dataset.
        """
        if self.prestacked_dataset is None:
            return False, None
        datasets = self._dataseter(datasets)
        s3 = self.cc.signature(self.prestacked_dataset, "sign3")
        with h5py.File(s3.data_path, "r") as hf:
            # h5py may hand back bytes; dataset codes are handled as str
            # elsewhere in this class, so decode before comparing.
            prestacked_datasets = [
                ds.decode() if isinstance(ds, bytes) else ds
                for ds in hf["datasets"][:]
            ]
        # Check datasets of pre-stacked signature
        wanted = set(datasets)
        if not wanted.issubset(prestacked_datasets):
            return False, None
        if list(datasets) == prestacked_datasets:
            return True, None
        mask = []
        for ds in prestacked_datasets:
            mask += [ds in wanted] * self.sign_dim
        return True, np.array(mask)

    # Calculate signatures
    def _predict_from_molecule(self, dataset, smiles, destination_dir, moleculetype):
        """Predict signatures with the external signaturizer package.

        Args:
            dataset(str): Dataset code; the part before the first "." selects
                the external model.
            smiles: Molecules; a NumPy array or any other sequence.
            destination_dir(str): Destination passed to ``predict``.
            moleculetype(str): Passed to ``predict`` as ``keytype``.
        """
        s3 = SignaturizerExternal(dataset.split(".")[0])
        molecules = smiles.tolist() if hasattr(smiles, "tolist") else list(smiles)
        s3.predict(molecules, destination_dir, keytype=moleculetype)

    def _signaturize_signaturizer(self, smiles, datasets=None, is_tmp=None, wait=True, moleculetype='SMILES',
                                  **kwargs):
        """Calculate signatures for every requested dataset.

        Datasets whose destination file already exists are skipped. Nothing
        is calculated when ``use_cc`` is set.

        Args:
            smiles: Molecules to signaturize.
            datasets(list): Datasets, filtered through get_datasets
                (default=None).
            is_tmp: Passed to get_destination_dir (default=None).
            wait(bool): Wait for pending jobs before returning
                (default=True).
            moleculetype(str): Key type of the molecules (default='SMILES').
            **kwargs: Accepted and ignored.

        Returns:
            list: HPC jobs still tracked when the method returns.
        """
        if self.use_cc:
            self.__log.info("use_cc was set to True, i.e. signatures are already calculated!")
            return []
        self.__log.info("Calculating sign for every molecule.")
        datasets = self.get_datasets(datasets)
        jobs = []
        for dataset in datasets:
            destination_dir = self.get_destination_dir(dataset, is_tmp)
            if os.path.exists(destination_dir):
                self.__log.debug("Signature %s file already exists: %s", dataset, destination_dir)
                continue
            self.__log.debug("Calculating sign for %s", dataset)
            if not self.hpc:
                self._predict_from_molecule(dataset, smiles, destination_dir, moleculetype)
                continue
            job = self.func_hpc("_predict_from_molecule", dataset, smiles,
                                destination_dir, moleculetype,
                                cpu=max(self.n_jobs_hpc, 8),
                                memory=16,
                                wait=False,
                                job_base_path=self.tmp_path,
                                delete_job_path=True)
            jobs.append(job)
            # Throttle: drain the queue before submitting further jobs.
            if len(jobs) > MAXQUEUE:
                self.waiter(jobs)
                jobs = []
        if wait:
            self.waiter(jobs)
        return jobs

    def signaturize(self, **kwargs):
        """Calculate signatures; see _signaturize_signaturizer."""
        return self._signaturize_signaturizer(**kwargs)

    # Read signatures
    def read_signatures_ensemble(self, datasets, smiles, inchikeys, idxs, is_tmp, sign_folder):
        """Return signatures as an ensemble

        This is a generator: one ``(V, idxs)`` pair is yielded per dataset,
        and nothing (including the datasets check) runs until it is iterated.
        """
        datasets = self._dataseter(datasets)
        for ds in datasets:
            yield BaseSignaturizer.read_signatures(
                self, dataset=ds, smiles=smiles, inchikeys=inchikeys,
                idxs=idxs, is_tmp=is_tmp, sign_folder=sign_folder)

    def read_signatures_stacked(self, datasets, smiles, inchikeys, idxs, is_tmp, sign_folder):
        """Return signatures in a stacked form

        Returns:
            tuple: ``(V, idxs)`` with the per-dataset matrices stacked
            horizontally.

        Raises:
            Exception: if the datasets do not all return the same indices.
        """
        datasets = self._dataseter(datasets)
        blocks = []
        ref_idxs = None
        for ds in datasets:
            v, ds_idxs = BaseSignaturizer.read_signatures(
                self, dataset=ds, smiles=smiles, inchikeys=inchikeys,
                idxs=idxs, is_tmp=is_tmp, sign_folder=sign_folder)
            blocks.append(v)
            if ref_idxs is None:
                ref_idxs = ds_idxs
            # array_equal also covers arrays of different length, which an
            # elementwise != cannot compare reliably.
            if not np.array_equal(ref_idxs, ds_idxs):
                raise Exception("When stacking signatures exactly the same keys need to be available for all molecules")
        return np.hstack(blocks), ref_idxs

    def read_signatures_prestacked(self, mask, datasets, smiles, inchikeys, idxs, is_tmp, sign_folder):
        """Return signatures in a stacked form from an already prestacked file

        Args:
            mask: Boolean column selector, or None to keep every column.
            datasets: Only validated with _dataseter; the data is always read
                from ``self.prestacked_dataset``.
        """
        self._dataseter(datasets)
        V, idxs = BaseSignaturizer.read_signatures(
            self, dataset=self.prestacked_dataset, smiles=smiles,
            inchikeys=inchikeys, idxs=idxs, is_tmp=is_tmp,
            sign_folder=sign_folder)
        if mask is None:
            return V, idxs
        return V[:, mask], idxs

    def read_signatures(self, is_ensemble, datasets, idxs, smiles, inchikeys, is_tmp, sign_folder=None):  # Changed sign folder to None
        """Read signatures as an ensemble, pre-stacked or stacked.

        Args:
            is_ensemble: If truthy, a generator of per-dataset ``(V, idxs)``
                pairs is returned; otherwise a single ``(V, idxs)`` pair,
                read from the pre-stacked signature when possible.
        """
        if is_ensemble:
            return self.read_signatures_ensemble(
                datasets=datasets, idxs=idxs, smiles=smiles,
                inchikeys=inchikeys, is_tmp=is_tmp, sign_folder=sign_folder)
        prestack_friendly, prestack_mask = self._check_prestack_friendly(datasets)
        if prestack_friendly:
            return self.read_signatures_prestacked(
                mask=prestack_mask, datasets=self.prestacked_dataset,
                idxs=idxs, smiles=smiles, inchikeys=inchikeys, is_tmp=is_tmp,
                sign_folder=sign_folder)
        return self.read_signatures_stacked(
            datasets=datasets, idxs=idxs, smiles=smiles, inchikeys=inchikeys,
            is_tmp=is_tmp, sign_folder=sign_folder)
class SignaturizerSetup(Signaturizer, Fingerprinter):
    """Set up a signaturizer

    Dispatches to Fingerprinter for classic runs that do not use the
    Chemical Checker, and to Signaturizer otherwise.
    """

    def __init__(self, **kwargs):
        """Initialize as a Fingerprinter or a Signaturizer.

        Args:
            **kwargs: Forwarded to the selected parent initializer.
        """
        # NOTE(review): is_classic and use_cc are read before any parent
        # __init__ has run, so they must already be resolvable on the
        # instance (e.g. as class-level attributes) -- confirm.
        if self.is_classic and not self.use_cc:
            Fingerprinter.__init__(self, **kwargs)
        else:
            Signaturizer.__init__(self, **kwargs)

    def signaturize(self, smiles, **kwargs):
        """Calculate fingerprints or signatures for the given molecules.

        Args:
            smiles: Molecules to process.
            **kwargs: Forwarded to the selected implementation.
        """
        if self.is_classic and not self.use_cc:
            return Fingerprinter.signaturize(self, smiles=smiles, **kwargs)
        if self.use_stacked_signature:
            # NOTE(review): `stacker` is not defined in the part of
            # Signaturizer visible here; confirm a parent class provides it.
            return Signaturizer.stacker(self, smiles=smiles, **kwargs)
        return Signaturizer.signaturize(self, smiles=smiles, **kwargs)

    def _read_signatures_(self, datasets, idxs, smiles, inchikeys, is_tmp, sign_folder):
        """Read signatures through the Fingerprinter or Signaturizer reader."""
        if self.is_classic and not self.use_cc:
            return Fingerprinter.read_signatures(
                self, idxs=idxs, smiles=smiles, inchikeys=inchikeys,
                is_tmp=is_tmp, sign_folder=sign_folder)
        return Signaturizer.read_signatures(
            self, is_ensemble=self.is_ensemble, datasets=datasets, idxs=idxs,
            smiles=smiles, inchikeys=inchikeys, is_tmp=is_tmp,
            sign_folder=sign_folder)

    def read_signatures(self, datasets=None, idxs=None, smiles=None, inchikeys=None, is_tmp=None, sign_folder=None):
        """Read signatures; every argument is optional and defaults to None."""
        return self._read_signatures_(
            datasets=datasets, idxs=idxs, smiles=smiles, inchikeys=inchikeys,
            is_tmp=is_tmp, sign_folder=sign_folder)