import os
import h5py
import json
import random
import datetime
import numpy as np
from numpy import linalg as LA
from sklearn.manifold import MDS
from chemicalchecker.core.signature_base import BaseSignature
from chemicalchecker.core.signature_data import DataSignature
from chemicalchecker.util import logged
from chemicalchecker.util.plot import Plot
@logged
class Default(BaseSignature, DataSignature):
    """2D projection signature.

    Projects a signature matrix to two dimensions with t-SNE or MDS,
    pre-clustering large datasets with k-means so that only cluster
    centroids are embedded directly.
    """
def __init__(self, signature_path, dataset, **params):
"""Initialize the projection class.
Args:
signature_path(str): the path to the signature directory.
dataset(object): The dataset object with all info related
metric(str): The metric used in the KNN algorithm: euclidean or cosine (default: cosine)
type(int): The type of plot technology used between tsne and mds (default:tsne)
cpu(int): The number of cores to use (default:1)
perplexity(int): Perplexity for the NN-grapn (default:30)
angle(float): Angle in BH-TSNE, from 0 to 0.5 (default:0.5)
"""
        # Calling init on the base class to trigger file existence checks
BaseSignature.__init__(
self, signature_path, dataset, **params)
self.__log.debug('signature path is: %s', signature_path)
proj_name = self.__class__.__name__
self.data_path = os.path.join(signature_path, "proj_%s.h5" % proj_name)
self.model_path = os.path.join(self.model_path, proj_name)
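        # temporarily clear the umask so the model/stats directories are
        # created group-writable (0o775) regardless of the environment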
if not os.path.isdir(self.model_path):
original_umask = os.umask(0)
os.makedirs(self.model_path, 0o775)
os.umask(original_umask)
self.stats_path = os.path.join(self.stats_path, proj_name)
if not os.path.isdir(self.stats_path):
original_umask = os.umask(0)
os.makedirs(self.stats_path, 0o775)
os.umask(original_umask)
DataSignature.__init__(self, self.data_path)
self.__log.debug('data_path: %s', self.data_path)
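        # default file names and projection hyper-parameters; a subset of
        # these can be overridden through **params below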
self.index_file = "faiss_proj.index"
self.projections_file = "centroids.h5"
self.start_k = 1000
self.type = "tsne"
self.cpu = 1
self.k_neig = None
self.min_members = 5
self.num_subdim = 8
self.min_k = 1
self.max_k = None
self.n_points = 100
self.balance = None
self.significance = 0.05
self.metric = "cosine"
self.perplexity = 30
self.angle = 0.5
        for param, value in params.items():
            self.__log.debug('parameter %s : %s', param, value)
        # only these parameters may be overridden through **params
        for key in ("type", "cpu", "metric", "perplexity", "angle"):
            if key in params:
                setattr(self, key, params[key])
    def fit(self, signature, validations=True):
        """Learn a 2D projection of the given signature and persist it."""
try:
import faiss
except ImportError:
raise ImportError("requires faiss " +
"https://github.com/facebookresearch/faiss")
try:
import hdbscan
except ImportError:
raise ImportError("requires hdbscan " +
"https://hdbscan.readthedocs.io/en/latest/")
try:
from MulticoreTSNE import MulticoreTSNE as TSNE
except ImportError:
raise ImportError("requires MulticoreTSNE " +
"http://github.com/DmitryUlyanov/Multicore-TSNE")
BaseSignature.fit(self)
plot = Plot(self.dataset, self.stats_path)
mappings = None
faiss.omp_set_num_threads(self.cpu)
if os.path.isfile(signature.data_path):
self.data = np.float32(signature.data)
self.data_type = signature.data_type
self.keys = signature.keys
mappings = signature.mappings
else:
raise Exception(
"The file " + signature.data_path + " does not exist")
self.__log.info("Applying kmeans")
size = self.data.shape[0]
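        # heuristic: aim for half as many k-means centroids as molecules,
        # bounded between start_k (1000) and 15000, and never more
        # centroids than data points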
self.k = int(max(self.start_k, min(15000, size / 2)))
if self.k > size:
self.k = size
niter = 20
d = self.data.shape[1]
kmeans = faiss.Kmeans(d, self.k, niter=niter)
if self.k != self.data.shape[0]:
kmeans.train(self.data)
else:
kmeans.k = self.k
if kmeans.k != self.data.shape[0]:
self.__log.info(
"Applying first projection and another clustering to get final points")
if self.type == "tsne":
tsne = TSNE(n_jobs=self.cpu, perplexity=self.perplexity, angle=self.angle,
n_iter=1000, metric=self.metric)
Proj = tsne.fit_transform(kmeans.centroids.astype(np.float64))
if self.type == "mds":
mds = MDS(n_jobs=self.cpu)
Proj = mds.fit_transform(kmeans.centroids.astype(np.float64))
X = Proj[:, 0]
Y = Proj[:, 1]
            if len(X) <= self.start_k:
                min_size = 5
            else:
                # np.interp clamps at its boundaries, so this already yields
                # 15 for len(X) >= 10000
                min_size = int(
                    np.interp(len(X), [self.start_k, 10000], [5, 15]))
clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size)
combined = np.vstack((X, Y)).T
clusterer.fit(combined)
mask_clust = clusterer.labels_ != -1
final_indexes = np.where(mask_clust)
final_Proj = Proj[mask_clust]
if self.metric == "euclidean":
index = faiss.IndexFlatL2(
kmeans.centroids[final_indexes].shape[1])
else:
index = faiss.IndexFlatIP(
kmeans.centroids[final_indexes].shape[1])
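            # for cosine, unit-normalise the centroids before adding them to
            # the inner-product index, so faiss IP scores equal cosine
            # similarities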
if self.metric == "cosine":
norms = LA.norm(kmeans.centroids[final_indexes], axis=1)
index.add(kmeans.centroids[final_indexes] / norms[:, None])
else:
index.add(kmeans.centroids[final_indexes])
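            # soft-assign every molecule to its 3 nearest kept centroids;
            # these weights drive the barycentric placement in
            # _project_points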
points_proj = {}
points_proj["weights"], points_proj["ids"] = self._get_weights(
index, self.data, 3)
xlim, ylim = plot.projection_plot(final_Proj, bw=0.1, levels=10)
projections = self._project_points(final_Proj, points_proj)
else:
if self.perplexity is None:
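                # heuristic: neighbourhood size sqrt(N)/2 clipped to
                # [30, 150], with perplexity set to a third of that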
neigh = np.max(
[30, np.min([150, int(np.sqrt(self.data.shape[0]) / 2)])])
self.perplexity = int(neigh / 3)
if self.type == "tsne":
tsne = TSNE(n_jobs=self.cpu, perplexity=self.perplexity, angle=self.angle,
n_iter=1000, metric=self.metric)
final_Proj = tsne.fit_transform(self.data.astype(np.float64))
if self.type == "mds":
mds = MDS(n_jobs=self.cpu)
final_Proj = mds.fit_transform(self.data.astype(np.float64))
if self.metric == "euclidean":
index = faiss.IndexFlatL2(self.data.shape[1])
index.add(self.data)
else:
index = faiss.IndexFlatIP(self.data.shape[1])
norms = LA.norm(self.data, axis=1)
index.add(self.data / norms[:, None])
xlim, ylim = plot.projection_plot(final_Proj, bw=0.1, levels=10)
projections = self._project_points(final_Proj)
with h5py.File(self.data_path, "w") as hf:
inchikey_proj = {}
for i in range(len(self.keys)):
k = self.keys[i]
inchikey_proj[k] = projections[i]
hf.create_dataset("V", data=projections)
hf.create_dataset("keys", data=np.array(self.keys, DataSignature.string_dtype()))
name = str(self.dataset) + "_proj"
hf.create_dataset(
"name", data=[name.encode(encoding='UTF-8', errors='strict')])
hf.create_dataset(
"date", data=[datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode(encoding='UTF-8', errors='strict')])
hf.create_dataset("metric", data=[self.metric.encode(
encoding='UTF-8', errors='strict')])
if mappings is not None:
hf.create_dataset("mappings", data=np.array(mappings, DataSignature.string_dtype()))
faiss.write_index(index, os.path.join(
self.model_path, self.index_file))
with h5py.File(os.path.join(
self.model_path, self.projections_file), "w") as hf:
hf.create_dataset("Proj", data=final_Proj)
if validations:
self.__log.info("Doing MoA & ATC validations")
if mappings is not None:
inchikey_mappings = dict(mappings)
else:
inchikey_mappings = None
ks_moa, auc_moa, frac_moa = plot.vector_validation(
self, "proj", prefix="moa", mappings=inchikey_mappings)
ks_atc, auc_atc, frac_atc = plot.vector_validation(
self, "proj", prefix="atc", mappings=inchikey_mappings)
# Saving results
INFO = {
"molecules": final_Proj.shape[0],
"moa_ks_d": ks_moa[0],
"moa_ks_p": ks_moa[1],
"moa_auc": auc_moa,
"atc_ks_d": ks_atc[0],
"atc_ks_p": ks_atc[1],
"atc_auc": auc_atc,
"xlim": xlim,
"ylim": ylim
}
with open(os.path.join(self.stats_path, 'proj_stats.json'), 'w') as fp:
json.dump(INFO, fp)
self.mark_ready()
    def predict(self, signature, destination):
"""Use the fitted models to go from input to output."""
try:
import faiss
except ImportError:
raise ImportError("requires faiss " +
"https://github.com/facebookresearch/faiss")
BaseSignature.predict(self)
mappings = None
faiss.omp_set_num_threads(self.cpu)
input_data_file = ''
if isinstance(signature, str):
input_data_file = signature
else:
input_data_file = signature.data_path
if os.path.isfile(input_data_file):
dh5 = h5py.File(input_data_file, 'r')
if "keys" not in dh5.keys() or "V" not in dh5.keys():
raise Exception(
"H5 file %s does not contain datasets 'keys' and 'V'"
% signature.data_path)
self.data = np.array(dh5["V"][:], dtype=np.float32)
self.data_type = dh5["V"].dtype
self.keys = dh5["keys"][:]
if "mappings" in dh5.keys():
mappings = dh5["mappings"][:]
dh5.close()
else:
raise Exception("The file " + input_data_file + " does not exist")
if destination is None:
raise Exception(
"Predict method requires a destination file to output results")
if not os.path.isfile(os.path.join(self.model_path, self.index_file)) or not os.path.isfile(os.path.join(self.model_path, self.projections_file)):
            raise Exception(
                "This projection does not have prediction information. "
                "Please call the fit method before using predict.")
index = faiss.read_index(os.path.join(
self.model_path, self.index_file))
dh5 = h5py.File(os.path.join(
self.model_path, self.projections_file), 'r')
Proj = dh5["Proj"][:]
dh5.close()
# base_points = faiss.vector_float_to_array(index.xb).reshape(-1, index.d)
points_proj = {}
points_proj["weights"], points_proj["ids"] = self._get_weights(
index, self.data, 3)
projections = self._project_points(Proj, points_proj)
with h5py.File(destination, "w") as hf:
inchikey_proj = {}
for i in range(len(self.keys)):
k = self.keys[i]
inchikey_proj[k] = projections[i]
hf.create_dataset("V", data=projections)
hf.create_dataset("keys", data=np.array(self.keys, DataSignature.string_dtype()))
name = str(self.dataset) + "_proj"
hf.create_dataset(
"name", data=[name.encode(encoding='UTF-8', errors='strict')])
hf.create_dataset(
"date", data=[datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode(encoding='UTF-8', errors='strict')])
hf.create_dataset("metric", data=[self.metric.encode(
encoding='UTF-8', errors='strict')])
if mappings is not None:
hf.create_dataset("mappings", data=np.array(mappings, DataSignature.string_dtype()))
def _get_weights(self, index, data, neigh):
"""Get weights for the first 'neigh' neighbours ."""
if self.metric == "cosine":
norms = LA.norm(data, axis=1)
D, I = index.search(data / norms[:, None], neigh)
            # inner products of unit vectors are cosine similarities in
            # [-1, 1]; map them to [0, 1] weights
            D = np.maximum(0.0, (1.0 + D) / 2.0)
else:
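            # estimate the maximum pairwise distance by searching the whole
            # index from a few random query points, then rescale distances
            # to [0, 1] weights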
            n_sample = min(10, data.shape[0])
            D, I = index.search(data[np.array(random.sample(
                range(data.shape[0]), n_sample))], index.ntotal)
max_val = np.max(D)
D, I = index.search(data, neigh)
D[:, :] = np.maximum((max_val - D[:, :]) / max_val, 0)
return D, I
    def _project_points(self, Proj, extra_data=None):
        """Map points to 2D, averaging neighbouring centroid projections."""
X = Proj[:, 0]
Y = Proj[:, 1]
if extra_data is not None:
weights = extra_data["weights"]
ids = extra_data["ids"]
size = weights.shape[0]
proj_data = np.zeros((size, 2))
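            # each point is placed at the weighted average of its nearest
            # centroids' 2D coordinates; a point that (almost) coincides
            # with a centroid snaps to that centroid's position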
for i in range(0, size):
if any(np.isclose(weights[i], 1.0)):
pos = ids[i][0]
if i in ids[i] or len(weights) == len(X):
pos = i
proj_data[i][0] = X[pos]
proj_data[i][1] = Y[pos]
else:
proj_data[i][0] = np.average(
X[np.array(ids[i])], weights=weights[i])
proj_data[i][1] = np.average(
Y[np.array(ids[i])], weights=weights[i])
else:
size = X.shape[0]
proj_data = np.zeros((size, 2))
for i in range(0, size):
proj_data[i][0] = X[i]
proj_data[i][1] = Y[i]
return proj_data