Source code for chemicalchecker.core.sign1

"""Signature type 1.

Signatures type 1 are basically processed signatures.

This implies a mild compression (usually into a latent space) of the
signatures, with a dimensionality that typically retains 90% of the original
variance. They keep most of the complexity of the original data and can be
used for similarity calculations.

The typical preprocessing is a PCA (continuous data) or a TF-IDF LSI
(sparse data).
"""
import os
import h5py
import uuid
import shutil
import pickle
import numpy as np
from tqdm import tqdm
from numpy import linalg as LA
from scipy.signal import savgol_filter
from scipy.spatial.distance import cosine
from sklearn.metrics import accuracy_score

from .chemcheck import ChemicalChecker
from chemicalchecker.util import Config
from .signature_data import DataSignature
from .signature_base import BaseSignature

from chemicalchecker.util import logged
from chemicalchecker.util.transform.pca import Pca
from chemicalchecker.util.transform.scale import Scale


DEFAULT_T = 0.01


@logged
class sign1(BaseSignature, DataSignature):
    """Signature type 1 class."""

    def __init__(self, signature_path, dataset, **kwargs):
        """Initialize a Signature.

        Args:
            signature_path(str): The path to the signature directory.
            dataset(str): The dataset code.
        """
        # Calling init on the base class to trigger file existence checks
        BaseSignature.__init__(self, signature_path, dataset, **kwargs)
        self.data_path = os.path.join(self.signature_path, "sign1.h5")
        DataSignature.__init__(self, self.data_path)
    def copy_sign0_to_sign1(self, s0, s1, just_data=False):
        """Copy data (and triplets) from sign0 to sign1."""
        is_basesig = False
        if isinstance(s0, BaseSignature):
            if isinstance(s1, BaseSignature):
                is_basesig = True
        if is_basesig:
            if s0.molset != s1.molset:
                raise Exception(
                    "Copying from signature 0 to 1 is only allowed for "
                    "same molsets (reference or full)")
        self.__log.debug("Copying HDF5 dataset")
        shutil.copyfile(s0.data_path, s1.data_path)
        with h5py.File(s1.data_path, "a") as hf:
            if 'name' in hf.keys():
                del hf['name']
            hf.create_dataset("name", data=np.array(
                [str(self.dataset) + "sig"], DataSignature.string_dtype()))
        if not just_data:
            fn0 = os.path.join(s0.model_path, "triplets.h5")
            if os.path.exists(fn0):
                self.__log.debug("Copying triplets")
                fn1 = os.path.join(s1.model_path, "triplets.h5")
                shutil.copyfile(fn0, fn1)
    def duplicate(self, s1):
        self.__log.debug("Duplicating V matrix to V_tmp")
        with h5py.File(s1.data_path, "a") as hf:
            if "V_tmp" in hf.keys():
                self.__log.debug("Deleting V_tmp")
                del hf["V_tmp"]
            hf.create_dataset("V_tmp", hf["V"].shape, dtype=hf["V"].dtype)
            for i in range(0, hf["V"].shape[0]):
                hf["V_tmp"][i] = hf["V"][i][:]
    def was_sparse(self, max_keys=1000, zero_prop=0.5):
        """Guess if the matrix was sparse."""
        vals = self.subsample(max_keys)[0].ravel()
        if np.sum(vals == 0) / len(vals) > zero_prop:
            self.__log.debug("Matrix was probably sparse")
            return True
        else:
            self.__log.debug("Matrix was probably not sparse")
            return False
    def pipeline_file(self):
        fn = os.path.join(self.get_molset(
            "reference").model_path, "pipeline.pkl")
        return fn

    def load_model(self, name):
        fn = os.path.join(self.get_molset(
            "reference").model_path, "%s.pkl" % name)
        with open(fn, "rb") as f:
            mod = pickle.load(f)
        self.__log.debug("\n----> Loading model: " + fn)
        return mod

    def delete_tmp(self, s1):
        self.__log.debug("Deleting V_tmp")
        with h5py.File(s1.data_path, "r+") as hf:
            if "V_tmp" in hf.keys():
                del hf["V_tmp"]
    def fit(self, sign0=None, latent=True, scale=True, metric_learning=False,
            semisupervised=False, scale_kwargs={}, pca_kwargs={},
            lsi_kwargs={}, **kwargs):
        """Fit signature 1 given a signature 0.

        Args:
            sign0: A signature 0.
        """
        try:
            from chemicalchecker.util.transform.metric_learn import \
                UnsupervisedMetricLearn, SemiSupervisedMetricLearn
        except ImportError:
            raise ImportError("requires tensorflow https://tensorflow.org")
        try:
            from chemicalchecker.util.transform.lsi import Lsi
        except ImportError as ex:
            raise ex
        BaseSignature.fit(self, **kwargs)
        self.clear()
        # signature specific checks
        if sign0 is None:
            sign0 = self.get_sign('sign0').get_molset("full")
        if sign0.cctype != "sign0":
            raise Exception("A signature type 0 is expected!")
        if sign0.molset != "full":
            raise Exception(
                "Fit should be done with the full signature 0 "
                "(even if inside reference is used)")
        # preparing signatures
        self.update_status("Getting data")
        s0_ref = sign0.get_molset("reference")
        s1_ref = self.get_molset("reference")
        s1_ref.clear()
        self.__log.debug("Placing sign0 to sign1 (done for reference)")
        self.copy_sign0_to_sign1(s0_ref, s1_ref)
        self.__log.debug("Placing sign0 to sign1 (done for full)")
        self.copy_sign0_to_sign1(sign0, self)
        self.__log.debug("Duplicating signature (tmp) (done for reference)")
        self.duplicate(s1_ref)
        self.__log.debug("Duplicating signature (tmp) (done for full)")
        self.duplicate(self)
        self.__log.debug("Checking if matrix was sparse or not")
        sparse = s1_ref.was_sparse()
        tmp = False
        if metric_learning:
            tmp = True
        if sparse:
            self.__log.debug("Sparse matrix pipeline")
            if latent:
                self.update_status("LSI")
                self.__log.debug("Looking for latent variables with "
                                 "TFIDF-LSI (done for tmp)")
                mod = Lsi(self, tmp=tmp, **lsi_kwargs)
                mod.fit()
            else:
                self.__log.debug("Not looking for latent variables")
        else:
            self.__log.debug("Dense matrix pipeline")
            if scale:
                self.update_status("Scaling")
                mod = Scale(self, tmp=False, **scale_kwargs)
                mod.fit()
            else:
                self.__log.debug("Not scaling")
            if latent:
                self.update_status("PCA")
                mod = Pca(self, **pca_kwargs)
                mod.fit()
            else:
                self.__log.debug("Not looking for latent variables")
        if metric_learning:
            self.update_status("Metric Learning")
            if semisupervised:
                self.__log.debug("Semi-supervised metric learning")
                mod = SemiSupervisedMetricLearn(self, tmp=False)
                mod.fit()
            else:
                self.__log.debug("Unsupervised metric learning")
                self.__log.debug("First doing neighbors")
                mod = UnsupervisedMetricLearn(self, tmp=False)
                mod.fit()
        # save pipeline
        pipeline = {
            "sparse": sparse,
            "latent": latent,
            "scale": scale,
            "metric_learning": metric_learning,
            "semisupervised": semisupervised
        }
        self.delete_tmp(self)
        self.delete_tmp(s1_ref)
        fn = self.pipeline_file()
        with open(fn, "wb") as f:
            pickle.dump(pipeline, f)
        with h5py.File(s1_ref.data_path, "a") as hf:
            hf.create_dataset("metric", data=np.array(
                ["cosine"], DataSignature.string_dtype()))
        # finalize signature
        BaseSignature.fit_end(self, **kwargs)
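    # Usage sketch (illustrative, not part of the original module). fit()
    # expects the *full* molset of sign0 and handles the reference molset
    # internally. The dataset code 'A1.001' is hypothetical and this assumes
    # ChemicalChecker is exposed at the package top level with a configured
    # CC root:
    #
    #     from chemicalchecker import ChemicalChecker
    #
    #     cc = ChemicalChecker()
    #     s0 = cc.get_signature('sign0', 'full', 'A1.001')
    #     s1 = cc.get_signature('sign1', 'full', 'A1.001')
    #     s1.fit(sign0=s0, latent=True, scale=True)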
    def predict(self, sign0, destination):
        """Use the learned model to predict the signature.

        Args:
            sign0(signature): A valid signature type 0.
            destination(None|path|signature): If None the prediction results
                are returned as a dictionary, if str it is used as the path
                for the H5 data, if an empty signature type 1 its data_path
                is used as destination. Currently only a signature object is
                supported.
        """
        if not isinstance(destination, BaseSignature):
            """
            tag = str(uuid.uuid4())
            tmp_path = os.path.join(self.model_path, tag)
            cc = ChemicalChecker(tmp_path)
            s1 = cc.get_signature(self.cctype, self.molset, self.cctype)
            destination = s1
            """
            raise NotImplementedError(
                "'destination' must be a valid signature object.")
        if not os.path.isfile(sign0.data_path):
            raise Exception("The file " + sign0.data_path + " does not exist")
        self.copy_sign0_to_sign1(sign0, destination, just_data=True)
        self.__log.debug("Reading pipeline")
        fn = self.pipeline_file()
        with open(fn, "rb") as f:
            pipeline = pickle.load(f)
        self.__log.debug("Starting pipeline")
        if not pipeline["sparse"] and pipeline["scale"]:
            self.__log.debug("Scaling")
            mod = self.load_model("scale")
            mod.model_path = self.model_path
            # use the config path for CC_TMP and not the generated model one
            mod.tmp_path = Config().PATH.CC_TMP
            mod.predict(destination)
        self.__log.debug("Transformation")
        if pipeline["metric_learning"]:
            if pipeline["semisupervised"]:
                mod = self.load_model("semiml")
            else:
                mod = self.load_model("unsupml")
        else:
            if pipeline["latent"]:
                if pipeline["sparse"]:
                    mod = self.load_model("lsi")
                else:
                    mod = self.load_model("pca")
            else:
                mod = None
        if mod is not None:
            # avoid taking the info from the pickle in case it was copied
            mod.model_path = self.model_path
            # use the config path for CC_TMP and not the generated model one
            mod.tmp_path = Config().PATH.CC_TMP
            mod.predict(destination)
        destination.refresh()
        self.__log.debug("Prediction done!")
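    # Usage sketch (illustrative, not part of the original module). predict()
    # writes into an existing, empty signature object rather than returning
    # values; `s1` is assumed to be a previously fitted sign1 and `s1_dest`
    # an empty sign1 created elsewhere (names and dataset code hypothetical):
    #
    #     s0_new = cc.get_signature('sign0', 'full', 'A1.001')
    #     s1.predict(s0_new, destination=s1_dest)
    #     V_pred = s1_dest[:]          # predicted signature matrix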
    def neighbors(self, tmp, metric="cosine", k_neig=1000, cpu=4):
        """Compute and store nearest neighbors for the reference set."""
        try:
            import faiss
        except ImportError:
            raise ImportError(
                "requires faiss https://github.com/facebookresearch/faiss")
        s1 = self.get_molset("reference")
        if metric not in ["cosine", "euclidean"]:
            raise Exception("Metric must be 'cosine' or 'euclidean'")
        faiss.omp_set_num_threads(cpu)
        if tmp:
            V_name = "V_tmp"
        else:
            V_name = "V"
        data_path = os.path.join(s1.model_path, "neig.h5")
        self.__log.debug(
            "Calculating nearest neighbors. Saving in: %s" % data_path)
        with h5py.File(s1.data_path, 'r') as dh5, \
                h5py.File(data_path, 'w') as dh5out:
            datasize = dh5[V_name].shape
            # data_type = dh5[V_name].dtype
            self.__log.debug("...data size is (%d, %d)" %
                             (datasize[0], datasize[1]))
            k = min(datasize[0], k_neig)
            dh5out.create_dataset("row_keys", data=dh5["keys"][:])
            dh5out["col_keys"] = h5py.SoftLink('/row_keys')
            dh5out.create_dataset(
                "indices", (datasize[0], k), dtype=np.int32)
            dh5out.create_dataset(
                "distances", (datasize[0], k), dtype=np.float32)
            dh5out.create_dataset("shape", data=(datasize[0], k))
            dh5out.create_dataset(
                "metric",
                data=[metric.encode(encoding='UTF-8', errors='strict')])
            if metric == "euclidean":
                index = faiss.IndexFlatL2(datasize[1])
            else:
                # inner product on L2-normalized vectors == cosine similarity
                index = faiss.IndexFlatIP(datasize[1])
            for chunk in s1.chunker():
                data_temp = np.array(dh5[V_name][chunk], dtype=np.float32)
                if metric == "cosine":
                    normst = LA.norm(data_temp, axis=1)
                    index.add(data_temp / normst[:, None])
                else:
                    index.add(data_temp)
            for chunk in s1.chunker():
                data_temp = np.array(dh5[V_name][chunk], dtype=np.float32)
                if metric == "cosine":
                    normst = LA.norm(data_temp, axis=1)
                    Dt, It = index.search(data_temp / normst[:, None], k)
                else:
                    Dt, It = index.search(data_temp, k)
                dh5out["indices"][chunk] = It
                if metric == "cosine":
                    # convert cosine similarity to cosine distance
                    dh5out["distances"][chunk] = np.maximum(0.0, 1.0 - Dt)
                else:
                    dh5out["distances"][chunk] = Dt
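    # Illustrative sketch (not part of the original module): the cosine
    # branch above uses the standard trick of L2-normalizing the vectors and
    # searching an inner-product index, since the inner product of unit
    # vectors equals cosine similarity. A minimal standalone version, on
    # hypothetical toy data, assuming faiss is installed:
    #
    #     import numpy as np
    #     import faiss
    #
    #     X = np.random.rand(500, 128).astype(np.float32)
    #     Xn = X / np.linalg.norm(X, axis=1, keepdims=True)  # unit vectors
    #     index = faiss.IndexFlatIP(Xn.shape[1])             # inner product
    #     index.add(Xn)
    #     sims, idxs = index.search(Xn[:5], 10)              # top-10 per query
    #     dists = np.maximum(0.0, 1.0 - sims)                # cosine distances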
    def get_triplets(self, reference):
        """Read triplets of signatures across the CC."""
        if reference:
            fn = os.path.join(self.get_molset(
                "reference").model_path, "triplets.h5")
        else:
            fn = os.path.join(self.model_path, "triplets.h5")
        if not os.path.exists(fn):
            return None
        self.__log.debug("Getting triplets from %s" % fn)
        with h5py.File(fn, "r") as hf:
            triplets = hf["triplets"][:]
        return triplets
    def get_self_triplets(self, local_neig_path, num_triplets=10000000):
        """Get triplets of signatures only looking at itself."""
        s1 = self.get_molset("reference")
        if local_neig_path:
            neig_path = os.path.join(s1.model_path, "neig.h5")
        else:
            # fall back to the neighbors instance path (mirrors optimal_t)
            neig_path = s1.get_neig().data_path
        opt_t = self.optimal_t(local_neig_path=local_neig_path)
        # Heuristic to correct opt_t, dependent on the size of the data
        LB = 10000
        UB = 100000
        TMAX = 50
        TMIN = 10

        def get_t_max(n):
            n = np.clip(n, LB, UB)
            a = (TMAX - TMIN) / (LB - UB)
            b = TMIN - a * UB
            return a * n + b

        with h5py.File(neig_path, "r") as hf:
            N, kn = hf["indices"].shape
            opt_t = np.min([opt_t, 0.01])
            k = np.clip(opt_t * N, 5, 100)
            k = np.min([k, kn * 0.5 + 1])
            k = np.max([k, 5])
            k = np.min([k, get_t_max(N)])
            k = int(k)
            self.__log.debug("... selected T is %d" % k)
            nn_indices = hf["indices"][:]
            nn_pos = nn_indices[:, 1:(k + 1)]
            nn_neg = nn_indices[:, (k + 1):]
        self.__log.debug("Starting sampling (pos:%d, neg:%d)" %
                         (nn_pos.shape[1], nn_neg.shape[1]))
        n_sample = np.clip(int(num_triplets / N), 1, 100)
        triplets = []
        med_neg = nn_neg.shape[1]
        nn_pos_prob = [(len(nn_pos) - i) for i in range(0, nn_pos.shape[1])]
        nn_neg_prob = [(len(nn_neg) - i) for i in range(0, nn_neg.shape[1])]
        nn_pos_prob = np.array(nn_pos_prob) / np.sum(nn_pos_prob)
        nn_neg_prob = np.array(nn_neg_prob) / np.sum(nn_neg_prob)
        for i in range(0, N):
            # sample positives with replacement
            pos = np.random.choice(
                nn_pos[i], n_sample, p=nn_pos_prob, replace=True)
            if n_sample > med_neg:
                # sample "medium" negatives
                neg = np.random.choice(
                    nn_neg[i], med_neg, p=nn_neg_prob, replace=False)
                # for the rest, sample "easy" negatives
                forb = set(list(nn_pos[i]) + list(nn_neg[i]))
                cand = [j for j in range(0, N) if j not in forb]
                if len(cand) > 0:
                    neg_ = np.random.choice(
                        cand, min(len(cand), n_sample - med_neg),
                        replace=True)
                    neg = np.array(list(neg) + list(neg_))
            else:
                neg = np.random.choice(nn_neg[i], n_sample, replace=False)
            if len(pos) > len(neg):
                neg = np.random.choice(neg, len(pos), replace=True)
            elif len(pos) < len(neg):
                neg = np.random.choice(neg, len(pos), replace=False)
            else:
                pass
            for p, n in zip(pos, neg):
                triplets += [(i, p, n)]
        triplets = np.array(triplets).astype(int)
        fn = os.path.join(s1.model_path, "triplets_self.h5")
        self.__log.debug("Triplets path: %s" % fn)
        with h5py.File(fn, "w") as hf:
            hf.create_dataset("triplets", data=triplets)
        return triplets
    def score(self, reference, max_triplets=10000):
        """Score based on triplets.

        Args:
            max_triplets(int): Maximum number of triplets to consider.
        """
        self.__log.debug(
            "Score the transformation based on triplets accuracy")
        if reference:
            sign = self.get_molset("reference")
        else:
            sign = self
        triplets = self.get_triplets(reference)
        idxs = np.random.choice(triplets.shape[0], max_triplets,
                                replace=False)
        triplets = triplets[idxs]
        acc = 0
        for t in tqdm(triplets):
            a = sign[int(t[0])]
            p = sign[int(t[1])]
            n = sign[int(t[2])]
            if cosine(a, p) < cosine(a, n):
                acc += 1
        acc /= len(triplets)
        return acc
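    # Illustrative sketch (not part of the original module): score() reports
    # the fraction of (anchor, positive, negative) triplets for which the
    # anchor is closer, in cosine distance, to the positive than to the
    # negative. The same criterion on hypothetical toy data:
    #
    #     import numpy as np
    #     from scipy.spatial.distance import cosine
    #
    #     V = np.random.rand(100, 20)                  # toy signature matrix
    #     triplets = np.array([[0, 1, 2], [3, 4, 5]])  # (anchor, pos, neg)
    #     hits = [cosine(V[a], V[p]) < cosine(V[a], V[n])
    #             for a, p, n in triplets]
    #     accuracy = np.mean(hits)                     # triplet accuracy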
    def optimal_t(self, max_triplets=10000, min_triplets=1000,
                  local_neig_path=False, save=True):
        """Find the optimal (recommended) number of neighbors.

        Based on the accuracy of triplets across the CC. The Neighbors class
        needs to be precomputed. Only done for the reference set (it doesn't
        really make sense to do it for the full).

        Args:
            max_triplets(int): Maximum number of triplets to consider
                (default=10000).
            save(bool): Store an opt_t.h5 file (default=True).
        """
        # lazily loading if already computed
        fname = os.path.join(
            self.get_molset("reference").model_path, "opt_t.h5")
        if os.path.isfile(fname):
            with h5py.File(fname, "r") as hf:
                opt_t = hf['opt_t'][0]
            return opt_t
        self.__log.debug("Reading triplets")
        triplets = self.get_triplets(reference=True)
        if triplets is None:
            self.__log.debug("No triplets were found. Returning default t")
            return DEFAULT_T
        if len(triplets) < min_triplets:
            self.__log.warning("Not enough triplets... t is %f" % DEFAULT_T)
            return DEFAULT_T
        self.__log.debug("Selecting available anchors")
        if len(triplets) > max_triplets:
            idxs = np.random.choice(len(triplets), max_triplets,
                                    replace=False)
            triplets = triplets[idxs]
        anchors = sorted(set(triplets[:, 0]))
        anchors_idxs = dict((k, i) for i, k in enumerate(anchors))
        self.__log.debug("Reading from nearest neighbors")
        if not local_neig_path:
            self.__log.debug(
                "Getting neighbors data from a proper neig instance")
            neig = self.get_molset("reference").get_neig()
            neig_path = neig.data_path
        else:
            neig_path = os.path.join(self.get_molset(
                "reference").model_path, "neig.h5")
            self.__log.debug(
                "Getting neighbors data from file: %s" % neig_path)
        with h5py.File(neig_path, "r") as hf:
            nn = hf["indices"][anchors][:, 1:101]
        self.__log.debug("Negatives and positives")
        positives = [(anchors_idxs[t[0]], t[1]) for t in triplets]
        negatives = [(anchors_idxs[t[0]], t[2]) for t in triplets]
        pairs = positives + negatives
        truth = [1] * len(positives) + [0] * len(negatives)
        self.__log.debug("Setting the range of search")
        # N = nn.shape[0]
        kranges = []
        for i in range(5, 10):
            kranges += [i]
        for i in range(10, 50, 2):
            kranges += [i]
        for i in range(50, 100, 5):
            kranges += [i]
        self.__log.debug("Screening")
        accus = []
        for k in kranges:
            n_pairs = []
            for i in range(0, nn.shape[0]):
                n_pairs += [(i, n) for n in nn[i, :k]]
            pred = []
            n_pairs = set(n_pairs)
            for p in pairs:
                if p in n_pairs:
                    pred += [1]
                else:
                    pred += [0]
            accus += [(k, accuracy_score(truth, pred))]
        idx = np.argmax(savgol_filter([x[1] for x in accus], 11, 3))
        opt_k = accus[idx][0]
        opt_t = opt_k / nn.shape[0]
        if save:
            self.__log.debug("Saving")
            with h5py.File(fname, "w") as hf:
                hf.create_dataset("accuracies", data=np.array(accus))
                hf.create_dataset("opt_t", data=np.array([opt_t]))
                hf.create_dataset("opt_k", data=np.array([opt_k]))
        return opt_t
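    # Illustrative sketch (not part of the original module): optimal_t()
    # smooths the accuracy-vs-k curve with a Savitzky-Golay filter before
    # taking the argmax, so the chosen k is robust to noise. The selection
    # step on hypothetical toy accuracies:
    #
    #     import numpy as np
    #     from scipy.signal import savgol_filter
    #
    #     ks = np.arange(5, 40)                        # candidate neighbor counts
    #     accs = np.exp(-(ks - 20) ** 2 / 200.0) \
    #         + np.random.rand(len(ks)) * 0.05         # noisy accuracy curve
    #     smooth = savgol_filter(accs, 11, 3)          # window=11, polyorder=3
    #     opt_k = ks[np.argmax(smooth)]
    #     opt_t = opt_k / 1000                         # fraction of dataset size (toy N)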