Source code for chemicalchecker.core.diagnostics

"""Diagnostics for Signatures.

Performs an array of validations and diagnostic analyses.
"""
import os
import random
import pickle
import shutil
import collections
import numpy as np
import tempfile
from sklearn.cluster import DBSCAN
from scipy.stats import gaussian_kde
from scipy.spatial.distance import pdist
from sklearn.decomposition import PCA
from sklearn.metrics import roc_curve, auc, precision_recall_curve, \
    average_precision_score
from sklearn.preprocessing import normalize
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import NearestNeighbors

from chemicalchecker.util import logged, Config
from chemicalchecker.util.plot import DiagnosisPlot
#from chemicalchecker.util.decorator import safe_return
from chemicalchecker.util.hpc import HPC


@logged
class Diagnosis(object):
    """Diagnosis class."""

    def __init__(self, sign, ref_cc=None, ref_cctype='sign0', save=True,
                 plot=True, overwrite=False, load=True, n=10000, seed=42,
                 cpu=4):
        """Initialize a Diagnosis instance.

        Args:
            sign (CC signature): The CC signature object to be diagnosed.
            ref_cc (ChemicalChecker): A CC instance used as reference. If
                None, the CC instance of `sign` is used. (default=None)
            ref_cctype (str): The CC signature type used as reference.
                (default='sign0')
            save (bool): Whether to save results in the `diags` folder of
                the signature. (default=True)
            plot (bool): Whether to save plots in the `diags` folder of the
                signature. (default=True)
            overwrite (bool): Whether to overwrite the results of the
                diagnosis. (default=False)
            n (int): Number of molecules to sample. (default=10000)
            seed (int): Random seed for reproducible subsampling.
                (default=42)
            cpu (int): Number of CPUs to use. (default=4)
        """
        np.random.seed(seed)
        random.seed(seed)
        self.seed = seed
        if ref_cc is None:
            ref_cc = sign.get_cc()
        self.ref_cc = ref_cc
        self.save = save
        self.plot = plot
        self.plotter = DiagnosisPlot(self)
        self.sign = sign
        self.ref_cctype = ref_cctype
        self.subsample_n = n
        self.cpu = cpu
        self.memory = 5
        # check that the reference CC has enough signatures of the
        # reference cctype
        available_sign = ref_cc.report_available(
            molset="full", signature=ref_cctype)
        if len(available_sign["full"]) < 25:
            self.__log.warning(
                "Reference CC '%s' does not have enough '%s'" %
                (ref_cc.name, ref_cctype))
            self.ref_cctype = 'sign2'
            self.__log.warning("Switching to `%s`" % self.ref_cctype)
        # define current diag_path
        self.name = '%s_%s' % (ref_cc.name, self.ref_cctype)
        self.path = os.path.join(sign.diags_path, self.name)
        if os.path.isdir(self.path):
            if overwrite:
                self.clear()
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        if self.plot and not self.save:
            self.__log.warning(
                "Saving is necessary to plot. Setting 'save' to True.")
            self.save = True
        self._V = None
        self._keys = None

    @property
    def V(self):
        if self._V is None:
            self._V, self._keys = self._subsample()
        return self._V

    @property
    def keys(self):
        if self._keys is None:
            self._V, self._keys = self._subsample()
        return self._keys

    def _subsample(self):
        fn = os.path.join(self.path, "subsampled_data.pkl")
        if not os.path.exists(fn):
            self.__log.debug("Subsampling signature")
            V, keys = self.sign.subsample(self.subsample_n, seed=self.seed)
            if self.save:
                self.__log.debug("Saving signature subsample %s" % fn)
                with open(fn, "wb") as fh:
                    d = {"V": V, "keys": keys}
                    pickle.dump(d, fh)
            return V, keys
        self.__log.debug("Loading signature subsample")
        with open(fn, "rb") as f:
            d = pickle.load(f)
        return d["V"], d["keys"]
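    # Usage sketch (hypothetical paths and dataset codes, not part of this
    # module): a Diagnosis is typically built around an existing signature
    # and lazily subsamples it on first access of `V` or `keys`, e.g.:
    #
    #     from chemicalchecker import ChemicalChecker
    #     from chemicalchecker.core.diagnostics import Diagnosis
    #     cc = ChemicalChecker('/path/to/cc_root')      # assumed CC root
    #     sign = cc.get_signature('sign1', 'full', 'A1.001')
    #     diag = Diagnosis(sign, n=5000, seed=42)
    #     V, keys = diag.V, diag.keys  # triggers (and caches) subsampling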
    def clear(self):
        """Remove all diagnostic data."""
        self.__log.debug("Deleting %s" % self.path)
        shutil.rmtree(self.path)
    def _todo(self, fn, inner=False):
        """Check if a diagnosis step still needs to be run."""
        return not os.path.exists(os.path.join(self.path, fn + ".pkl"))

    def _load_diagnosis_pickle(self, fn):
        with open(os.path.join(self.path, fn), "rb") as f:
            results = pickle.load(f)
        return results

    def _returner(self, *args, results, fn, save=None, plot=None,
                  plotter_function, kw_plotter=dict(), **kwargs):
        # fall back to the instance-wide flags when not explicitly given
        if save is None:
            save = self.save
        if plot is None:
            plot = self.plot
        fn_ = os.path.join(self.path, fn + ".pkl")
        if results is None:
            results = pickle.load(open(fn_, "rb"))
        if save:
            pickle.dump(results, open(fn_, "wb"))
            if plot:
                return plotter_function(results=results, **kw_plotter)
            else:
                return fn_
        else:
            if plot:
                return plotter_function(results=results, **kw_plotter)
            else:
                return results

    def _shared_keys(self, sign):
        return sorted(list(set(self.keys) & set(sign.keys)))

    def _paired_keys(self, my_keys, vs_keys):
        keys = sorted(set(my_keys) & set(vs_keys))
        my_keys = keys
        vs_keys = keys
        return np.array(my_keys), np.array(vs_keys)

    def _paired_conn_layers(self, my_keys, vs_keys):
        vs_conn_set = set([ik.split("-")[0] for ik in vs_keys])
        my_conn_set = set([ik.split("-")[0] for ik in my_keys])
        common_conn = my_conn_set & vs_conn_set
        vs_keys_conn = dict(
            (ik.split("-")[0], ik) for ik in vs_keys
            if ik.split("-")[0] in common_conn)
        my_keys_conn = dict(
            (ik.split("-")[0], ik) for ik in my_keys
            if ik.split("-")[0] in common_conn)
        common_conn = sorted(common_conn)
        my_keys = [my_keys_conn[c] for c in common_conn]
        vs_keys = [vs_keys_conn[c] for c in common_conn]
        return np.array(my_keys), np.array(vs_keys)

    def _select_datasets(self, datasets, exemplary):
        if datasets is None:
            datasets = self.ref_cc.datasets
        dss = []
        for ds in datasets:
            if exemplary:
                # TODO: to change in case of CC expansion from 25 spaces
                if ds[0] in "ABCDE" and ds[1] in "12345" \
                        and ds[-3:] == "001":
                    dss += [ds]
            else:
                dss += [ds]
        return dss

    def _get_signatures(self, *args, keys=None, max_keys=10000,
                        max_features=None, shuffle=False, **kwargs):
        if max_keys is None:
            max_keys = len(self.keys)
        if keys is None:
            V = self.V
            keys = np.array(self.keys)
            if shuffle:
                idxs = np.random.choice(
                    len(keys), np.min([max_keys, len(keys)]), replace=False)
                V = V[idxs]
                keys = keys[idxs]
            else:
                V = V[:max_keys]
                keys = keys[:max_keys]
        else:
            keys = set(keys).intersection(self.sign.keys)
            self.__log.debug("%d keys found" % len(keys))
            # random.sample requires a sequence, not a set
            keys = sorted(random.sample(sorted(keys),
                                        np.min([max_keys, len(keys)])))
            self.__log.debug("Fetching signatures")
            keys, V = self.sign.get_vectors(keys)
            if shuffle:
                idxs = np.random.choice(
                    len(keys), np.min([max_keys, len(keys)]), replace=False)
                V = V[idxs]
                keys = keys[idxs]
            else:
                V = V[:max_keys]
                keys = keys[:max_keys]
        if max_features is not None and max_features < V.shape[1]:
            idxs = np.random.choice(V.shape[1], max_features, replace=False)
            V = V[:, idxs]
        return V, keys

    def _distance_distribution(self, metric, max_keys=200):
        """Distance distribution, sampled with replacement.

        Args:
            metric (str): 'cosine' or 'euclidean'.
            max_keys (int): Maximum number of keys to use for the pairwise
                distance calculation. (default=200)
        """
        if metric != "cosine" and metric != "euclidean":
            raise Exception("metric needs to be 'cosine' or 'euclidean'")
        n = self.V.shape[0]
        idxs = np.random.choice(n, max_keys)
        dists = pdist(self.V[idxs], metric=metric)
        order = np.argsort(dists)
        results = {
            "dists": dists[order]
        }
        return results
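    # Note on the caching pattern used throughout: each public diagnostic
    # first checks `_todo(fn)` (no "<fn>.pkl" in the diags folder yet),
    # computes `results` only in that case, and hands everything to
    # `_returner`, which pickles, reloads and/or plots depending on the
    # `save`/`plot` flags. A minimal sketch of a new diagnostic following
    # this convention (the names `my_stat` and `plotter.my_stat` are
    # hypothetical):
    #
    #     def my_stat(self, *args, **kwargs):
    #         fn = "my_stat"
    #         if self._todo(fn):
    #             results = {"mean": np.mean(self.V)}
    #         else:
    #             results = None  # _returner reloads it from disk
    #         return self._returner(results=results, fn=fn,
    #                               plotter_function=self.plotter.my_stat,
    #                               **kwargs)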
    def canvas_hpc(self, tmpdir, **kwargs):
        """Run the canvas diagnosis as an HPC job.

        Args:
            tmpdir (str): Folder (usually in scratch) where the job
                directory is generated.
            kwargs (dict): HPC parameters (e.g. `cpu`, `wait`, `job_name`)
                and `skip_plots`, a list of canvas plots to skip.
        """
        cc_config = kwargs.get("cc_config", os.environ['CC_CONFIG'])
        cfg = Config(cc_config)
        job_name = "_".join(["CC", self.sign.cctype.upper(),
                             self.sign.dataset, "DIAGNOSIS"])
        job_path = tempfile.mkdtemp("_" + job_name, dir=tmpdir)
        # create job directory if not available
        if not os.path.isdir(job_path):
            os.mkdir(job_path)
        # in case some plots of the canvases must be skipped
        skip_plots = kwargs.get("skip_plots", [])
        # create script file
        script_lines = f"""
import sys, os
import pickle
from chemicalchecker import ChemicalChecker
from chemicalchecker.core.diagnostics import Diagnosis
ChemicalChecker.set_verbosity('DEBUG')
task_id = sys.argv[1]
filename = sys.argv[2]
diag = pickle.load(open(filename, 'rb'))[task_id][0]
diag.canvas(size='small', savefig=True, skip_plots={skip_plots})
diag.canvas(size='medium', savefig=True, skip_plots={skip_plots})
print('JOB DONE')
"""
        script_name = os.path.join(job_path, 'diagnostics_script.py')
        with open(script_name, 'w') as fh:
            fh.write(script_lines)
        # HPC parameters
        params = kwargs
        params["num_jobs"] = 1
        params["jobdir"] = job_path
        params["job_name"] = kwargs.get('job_name', job_name)
        params["elements"] = [self]
        params["wait"] = kwargs.get('wait', False)
        params["check_error"] = kwargs.get('check_error', False)
        params["cpu"] = kwargs.get('cpu', self.cpu)
        params["mem_by_core"] = kwargs.get('mem_by_core', 4)
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}" \
                  " singularity exec {} python {} <TASK_ID> <FILE>"
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
            singularity_image, script_name)
        # submit jobs
        cluster = HPC.from_config(cfg)
        cluster.submitMultiJob(command, **params)
        return cluster
    @staticmethod
    def diagnostics_hpc(tmpdir, cc_root, cctype, molset, dss, cc_reference,
                        **kwargs):
        """Run diagnostics as HPC jobs.

        Args:
            tmpdir (str): Folder (usually in scratch) where the job
                directory is generated.
            cc_root (str): CC root path.
            cctype (str): CC type (sign0, sign1, sign2, sign3) on which the
                method is applied.
            molset (str): 'full' or 'reference'.
            dss (list): Datasets to run the diagnostics on.
            cc_reference (str): Root of another CC version to use as
                diagnostic reference.
        """
        cc_config = kwargs.get("cc_config", os.environ['CC_CONFIG'])
        cfg = Config(cc_config)
        job_path = tempfile.mkdtemp(
            prefix='jobs_diagnostics_' + cctype + "_", dir=tmpdir)
        # create job directory if not available
        if not os.path.isdir(job_path):
            os.mkdir(job_path)
        dataset_codes = list()
        for ds in dss:
            dataset_codes.append(ds)
        sign_args_tmp = kwargs.get('sign_args', {})
        sign_kwargs_tmp = kwargs.get('sign_kwargs', {})
        dataset_params = list()
        for ds_code in dataset_codes:
            sign_args = list()
            sign_args.extend(sign_args_tmp.get(ds_code, list()))
            sign_kwargs = dict()
            sign_kwargs.update(sign_kwargs_tmp.get(ds_code, dict()))
            sign_args.insert(0, cctype)
            sign_args.insert(1, molset)
            sign_args.insert(2, ds_code)
            dataset_params.append((sign_args, sign_kwargs))
        # create script file
        script_lines = [
            "import sys, os",
            "import pickle",
            "from chemicalchecker import ChemicalChecker",
            "from chemicalchecker.core.diagnostics import Diagnosis",
            "task_id = sys.argv[1]",
            "filename = sys.argv[2]",
            "inputs = pickle.load(open(filename, 'rb'))",
            "sign_args = inputs[task_id][0][0]",
            "sign_kwargs = inputs[task_id][0][1]",
            "cc = ChemicalChecker('{cc_root}')",
            "sign = cc.get_signature(*sign_args, **sign_kwargs)",
            "cc_ref = ChemicalChecker('{cc_reference}')",
            "diag = Diagnosis(sign, cc_ref)",
            "fig = diag.canvas()",
            "fig.savefig(os.path.join(sign.diags_path, diag.name + '.png'))",
            "print('JOB DONE')"
        ]
        if cc_reference == "":
            cc_reference = cc_root
        script_name = os.path.join(job_path, 'diagnostics_script.py')
        with open(script_name, 'w') as fh:
            for line in script_lines:
                fh.write(line.format(cc_root=cc_root,
                                     cc_reference=cc_reference) + '\n')
        # HPC parameters
        params = {}
        params["num_jobs"] = len(dataset_codes)
        params["jobdir"] = job_path
        params["job_name"] = "CC_" + cctype.upper() + "_DIAGNOSIS"
        params["elements"] = dataset_params
        params["wait"] = False
        params["check_error"] = False
        params["memory"] = 5  # trial and error
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}" \
                  " singularity exec {} python {} <TASK_ID> <FILE>"
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
            singularity_image, script_name)
        # submit jobs
        cluster = HPC.from_config(cfg)
        cluster.submitMultiJob(command, **params)
        return cluster
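    # Usage sketch (hypothetical paths, not part of this module):
    # submitting diagnostics for a few datasets through the HPC helper
    # above; an empty `cc_reference` falls back to `cc_root`:
    #
    #     cluster = Diagnosis.diagnostics_hpc(
    #         '/scratch/jobs', '/path/to/cc_root', 'sign1', 'full',
    #         ['A1.001', 'B1.001'], cc_reference='')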
    # @safe_return(None)
    def euclidean_distances(self, *args, n_pairs=10000, **kwargs):
        """Euclidean distance distribution.

        Args:
            n_pairs (int): Number of pairs to sample. (default=10000)
        """
        self.__log.debug("Euclidean distances")
        fn = "euclidean_distances"
        if self._todo(fn):
            results = self._distance_distribution(metric="euclidean")
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.euclidean_distances,
            **kwargs)
    # @safe_return(None)
    def cosine_distances(self, *args, n_pairs=10000, **kwargs):
        """Cosine distance distribution.

        Args:
            n_pairs (int): Number of pairs to sample. (default=10000)
        """
        self.__log.debug("Cosine distances")
        fn = "cosine_distances"
        if self._todo(fn):
            results = self._distance_distribution(metric="cosine")
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.cosine_distances,
            **kwargs)
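    # Both distance diagnostics share `_distance_distribution`, which
    # samples up to 200 keys with replacement and returns the sorted
    # condensed pairwise distances. A usage sketch (return values follow
    # the `_returner` contract):
    #
    #     diag.euclidean_distances(save=True, plot=False)  # -> pkl path
    #     diag.cosine_distances(save=True, plot=True)      # -> plotter output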
    # @safe_return(None)
    def cross_coverage(self, dataset, *args, ref_cctype='sign1',
                       molset="full", try_conn_layer=False, redo=False,
                       **kwargs):
        """Intersection of coverages.

        Args:
            dataset (str): The dataset to check the coverage against.
        """
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        qualified_name = '_'.join([dataset, ref_cctype, molset])
        fn = os.path.join(self.path, "cross_coverage_%s" % qualified_name)
        results = None
        if self._todo(fn) or redo:
            metadata = self.ref_cc.sign_metadata(
                'keys', molset, dataset, ref_cctype)
            if metadata is not None:
                vs_keys = metadata
                my_keys = self.sign.keys
                # apply inchikey connectivity layer if possible
                if try_conn_layer:
                    keys1, keys2 = self._paired_conn_layers(
                        my_keys, vs_keys)
                else:
                    keys1, keys2 = self._paired_keys(my_keys, vs_keys)
                results = {
                    "inter": len(keys1),
                    "my_overlap": len(keys1) / len(my_keys),
                    "vs_overlap": len(keys1) / len(vs_keys)
                }
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.cross_coverage,
            kw_plotter={"sign_qualified_name": qualified_name},
            **kwargs)
    # @safe_return(None)
    def cross_roc(self, sign, *args, n_samples=10000, n_neighbors=5,
                  neg_pos_ratio=1, apply_mappings=False,
                  try_conn_layer=False, metric='cosine', redo=False,
                  val_type='roc', **kwargs):
        """Perform validations.

        Args:
            sign (signature): A CC signature object to validate against.
            n_samples (int): Number of samples. (default=10000)
            apply_mappings (bool): Whether to use mappings to compute the
                validation. Signatures that have been redundancy-reduced
                (i.e. `reference`) have fewer molecules. The keys are
                molecules from the `full` signature and the values are
                molecules from the `reference` set. (default=False)
            try_conn_layer (bool): Try with the InChIKey connectivity
                layer. (default=False)
            metric (str): 'cosine' or 'euclidean'. (default='cosine')
            val_type (str): 'roc' or 'pr'. (default='roc')
            save (bool): Specific save parameter. If not specified, the
                global one is used. (default=None)
        """
        fn = os.path.join(self.path, "cross_roc_%s" % sign.qualified_name)
        if self._todo(fn) or redo:
            r = self.cross_coverage(sign.dataset, ref_cctype=sign.cctype,
                                    molset=sign.molset,
                                    apply_mappings=apply_mappings,
                                    try_conn_layer=try_conn_layer,
                                    save=False, redo=redo, plot=False)
            if r["inter"] < n_neighbors:
                self.__log.warning("Not enough shared molecules")
                return None
            my_keys = self.sign.keys
            vs_keys = sign.keys
            # apply inchikey connectivity layer if possible
            if try_conn_layer:
                keys1, keys2 = self._paired_conn_layers(my_keys, vs_keys)
            else:
                keys1, keys2 = self._paired_keys(my_keys, vs_keys)
            # get a fraction of the shared samples
            if n_samples < len(keys1):
                idxs = sorted(np.random.choice(
                    len(keys1), n_samples, replace=False))
                keys1 = keys1[idxs]
                keys2 = keys2[idxs]
            # fit nearest neighbors on the other signature
            vs_vectors = sign.get_vectors(keys2)[1]
            nn = NearestNeighbors(n_neighbors=n_neighbors, metric=metric,
                                  n_jobs=self.cpu)
            nn.fit(vs_vectors)
            # get positive pairs
            neighs = nn.kneighbors(vs_vectors)[1]
            del vs_vectors
            flat_neigh = neighs.flatten()
            indexes = np.repeat(np.arange(0, neighs.shape[0]), n_neighbors)
            pos_pairs = np.vstack([indexes, flat_neigh]).T
            # avoid identities
            pos_pairs = pos_pairs[pos_pairs[:, 0] != pos_pairs[:, 1]]
            pos_pairs = set([tuple(p) for p in pos_pairs])
            # get negative pairs
            idxs1 = np.repeat(np.arange(0, neighs.shape[0]),
                              n_neighbors * 2)
            idxs2 = np.repeat(np.arange(0, neighs.shape[0]),
                              n_neighbors * 2)
            np.random.shuffle(idxs2)
            neg_pairs = np.vstack([idxs1, idxs2]).T
            neg_pairs = neg_pairs[neg_pairs[:, 0] != neg_pairs[:, 1]]
            neg_pairs = set([tuple(p) for p in neg_pairs])
            neg_pairs = neg_pairs - pos_pairs
            neg_pairs = list(neg_pairs)[:len(pos_pairs)]
            # calculate distances of the same pairs but in our signature
            my_vectors = self.sign.get_vectors(keys1)[1]
            if metric == "cosine":
                from scipy.spatial.distance import cosine as metric_fn
            if metric == "euclidean":
                from scipy.spatial.distance import euclidean as metric_fn

            def _compute_dists(pairs):
                dists = list()
                for p1, p2 in pairs:
                    dists.append(metric_fn(my_vectors[p1], my_vectors[p2]))
                dists = np.array(dists)
                # cosine distance in sparse matrices might be NaN or inf
                dists = dists[np.isfinite(dists)]
                return dists

            pos_dists = _compute_dists(pos_pairs)
            neg_dists = _compute_dists(neg_pairs)
            del my_vectors
            # correct negative/positive ratio
            if len(neg_dists) > len(pos_dists) * neg_pos_ratio:
                np.random.shuffle(neg_dists)
                neg_dists = neg_dists[:int(len(pos_dists) * neg_pos_ratio)]
            else:
                np.random.shuffle(pos_dists)
                pos_dists = pos_dists[:int(len(neg_dists) / neg_pos_ratio)]
            # final arrays for performance calculation
            y_t = np.array([1] * len(pos_dists) + [0] * len(neg_dists))
            y_p = np.hstack([pos_dists, neg_dists])
            # convert distances to similarity-like scores (higher = closer)
            y_p = -np.abs(np.array(y_p))
            # roc calculation
            fpr, tpr, _ = roc_curve(y_t, y_p)
            # pr calculation
            precision, recall, _ = precision_recall_curve(y_t, y_p)
            # write results dictionary
            results = {
                "fpr": fpr,
                "tpr": tpr,
                "auc": auc(fpr, tpr),
                "precision": precision,
                "recall": recall,
                "average_precision_score": average_precision_score(y_t, y_p)
            }
        else:
            results = None
        if val_type == 'pr':
            plotter_function_arg = self.plotter.cross_pr
        else:
            plotter_function_arg = self.plotter.cross_roc
        return self._returner(
            results=results, fn=fn,
            plotter_function=plotter_function_arg,
            kw_plotter={"sign": sign},
            **kwargs)
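    # Usage sketch (hypothetical dataset code, not part of this module):
    # validate the diagnosed signature against neighborhoods recovered in
    # another CC space:
    #
    #     vs_sign = diag.ref_cc.signature('B1.001', 'sign1')
    #     res = diag.cross_roc(vs_sign, n_neighbors=5,
    #                          save=False, plot=False)
    #     # res["auc"] near 0.5 suggests neighborhoods are not preserved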
    # @safe_return(None)
    def projection(self, *args, keys=None, focus_keys=None, max_keys=10000,
                   perplexity=None, max_pca=100, redo=False, **kwargs):
        """t-SNE projection of CC signatures.

        Args:
            keys (list): Keys to be projected. If None, keys are randomly
                sampled. (default=None)
            focus_keys (list): Keys to be highlighted in the projection.
                (default=None)
            max_keys (int): Maximum number of keys to include in the
                projection. (default=10000)
        """
        self.__log.debug("Projection")
        fn = "projection"
        if self._todo(fn) or redo:
            from MulticoreTSNE import MulticoreTSNE as TSNE
            X, keys = self._get_signatures(keys=keys, max_keys=max_keys)
            self.__log.debug("Fit-transforming t-SNE")
            if X.shape[1] > max_pca:
                self.__log.debug("First doing a PCA")
                # check on max_pca value
                min_samples_features = min(X.shape[0], X.shape[1])
                if min_samples_features <= max_pca:
                    max_pca = min_samples_features
                X = PCA(n_components=max_pca).fit_transform(X)
            init = PCA(n_components=2).fit_transform(X)
            if perplexity is None:
                perp = int(np.sqrt(X.shape[0]))
                perp = np.max([5, perp])
                perp = np.min([50, perp])
                self.__log.debug("Chosen perplexity %d" % perp)
            else:
                perp = perplexity
            tsne = TSNE(perplexity=perp, init=init, n_jobs=self.cpu)
            P_ = tsne.fit_transform(X)
            P = P_[:len(keys)]
            results = {
                "P": P,
                "keys": keys
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.projection,
            kw_plotter={'focus_keys': focus_keys},
            **kwargs)
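    # Usage sketch: the 2D t-SNE coordinates are cached as "projection.pkl"
    # and reused by the `*_projection` diagnostics below, so `projection`
    # must be run (with save=True) before any of them:
    #
    #     diag.projection(save=True, plot=False)
    #     diag.intensities(save=True, plot=False)
    #     diag.intensities_projection(save=True, plot=True)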
    # @safe_return(None)
    def image(self, *args, keys=None, max_keys=100, shuffle=False,
              **kwargs):
        self.__log.debug("Image")
        fn = "image"
        if self._todo(fn):
            X, keys = self._get_signatures(keys=keys, max_keys=max_keys,
                                           shuffle=shuffle)
            results = {
                "X": X,
                "keys": keys
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.image,
            **kwargs)

    def _iqr(self, axis, keys, max_keys):
        X, keys = self._get_signatures(keys=keys, max_keys=max_keys)
        p25 = np.percentile(X, 25, axis=axis)
        p50 = np.percentile(X, 50, axis=axis)
        p75 = np.percentile(X, 75, axis=axis)
        results = {
            "p25": p25,
            "p50": p50,
            "p75": p75
        }
        return results

    # @safe_return(None)
    def features_iqr(self, *args, keys=None, max_keys=10000, **kwargs):
        self.__log.debug("Features IQR")
        fn = "features_iqr"
        if self._todo(fn):
            results = self._iqr(axis=0, keys=keys, max_keys=max_keys)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.features_iqr,
            **kwargs)

    # @safe_return(None)
    def keys_iqr(self, *args, keys=None, max_keys=1000, **kwargs):
        self.__log.debug("Keys IQR")
        fn = "keys_iqr"
        if self._todo(fn):
            results = self._iqr(axis=1, keys=keys, max_keys=max_keys)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.keys_iqr,
            **kwargs)

    def _bins(self, *args, axis, keys=None, max_keys=10000, n_bins=100,
              **kwargs):
        X, keys = self._get_signatures(keys=keys, max_keys=max_keys)
        bs = np.linspace(np.min(X), np.max(X), n_bins)
        H = np.apply_along_axis(
            lambda v: np.histogram(v, bins=bs)[0], axis, X)
        if axis == 1:
            H = H.T
        results = {
            "H": H,
            "bins": bs,
            "p50": np.median(X, axis=axis),
        }
        return results

    # @safe_return(None)
    def features_bins(self, *args, **kwargs):
        self.__log.debug("Features bins")
        fn = "features_bins"
        if self._todo(fn):
            results = self._bins(axis=0, **kwargs)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.features_bins,
            **kwargs)

    # @safe_return(None)
    def keys_bins(self, *args, **kwargs):
        self.__log.debug("Keys bins")
        fn = "keys_bins"
        if self._todo(fn):
            results = self._bins(axis=1, **kwargs)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.keys_bins,
            **kwargs)

    # @safe_return(None)
    def values(self, *args, max_values=10000, **kwargs):
        self.__log.debug("Values")
        fn = "values"
        if self._todo(fn):
            V = self.V.ravel()
            idxs = np.random.choice(len(V), min(max_values, len(V)),
                                    replace=False)
            V = V[idxs]
            kernel = gaussian_kde(V)
            positions = np.linspace(np.min(V), np.max(V), 1000)
            values = kernel(positions)
            results = {
                "x": positions,
                "y": values
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.values,
            **kwargs)

    def _projection_binned(self, P, scores, n_bins):
        # check nans
        mask = ~np.isnan(scores)
        P = P[mask, ]
        scores = scores[mask]
        self.__log.debug("Binning projection")
        bins_x = np.linspace(np.min(P[:, 0]), np.max(P[:, 0]) + 1e-6,
                             n_bins)
        bins_y = np.linspace(np.min(P[:, 1]), np.max(P[:, 1]) + 1e-6,
                             n_bins)
        H = np.zeros((len(bins_y), len(bins_x)))
        S = np.zeros(H.shape)
        for j in range(0, len(bins_x) - 1):
            min_x = bins_x[j]
            max_x = bins_x[j + 1]
            maskx = np.logical_and(P[:, 0] >= min_x, P[:, 0] < max_x)
            P_ = P[maskx]
            scores_ = scores[maskx]
            for i in range(0, len(bins_y) - 1):
                min_y = bins_y[i]
                max_y = bins_y[i + 1]
                masky = np.logical_and(P_[:, 1] >= min_y, P_[:, 1] < max_y)
                ss = scores_[masky]
                if len(ss) > 0:
                    H[i, j] = len(ss)
                    S[i, j] = np.mean(ss)
        bins_x = np.array([(bins_x[i] + bins_x[i + 1]) / 2
                           for i in range(0, len(bins_x) - 1)])
        bins_y = np.array([(bins_y[i] + bins_y[i + 1]) / 2
                           for i in range(0, len(bins_y) - 1)])
        results = {
            "H": H,
            "S": S,
            "bins_x": bins_x,
            "bins_y": bins_y,
            "scores": np.array([np.max(scores), np.min(scores)]),
            "lims": np.array([[np.min(P[:, 0]), np.min(P[:, 1])],
                              [np.max(P[:, 0]), np.max(P[:, 1])]])
        }
        return results

    # @safe_return(None)
    def confidences(self, *args, **kwargs):
        self.__log.debug("Confidences")
        fn = "confidences"
        if self._todo(fn):
            keys = self.keys
            mask = np.isin(
                list(self.sign.keys), list(keys), assume_unique=True)
            confidences = self.sign.get_h5_dataset('confidence')[mask]
            confidences = np.array(confidences)
            kernel = gaussian_kde(confidences)
            positions = np.linspace(
                np.min(confidences), np.max(confidences), 1000)
            values = kernel(positions)
            results = {
                "keys": self.keys,
                "confidences": confidences,
                "x": positions,
                "y": values
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.confidences,
            **kwargs)

    # @safe_return(None)
    def confidences_projection(self, *args, n_bins=20, **kwargs):
        self.__log.debug("Confidences projection")
        if self._todo("confidences", inner=True):
            raise Exception("confidences must be done first")
        if self._todo("projection", inner=True):
            raise Exception("projection must be done first")
        fn = "confidences_projection"
        if self._todo(fn):
            results_proj = self._load_diagnosis_pickle("projection.pkl")
            results_inte = self._load_diagnosis_pickle("confidences.pkl")
            if np.any(results_proj["keys"] != results_inte["keys"]):
                raise Exception("keys do not coincide...")
            results = self._projection_binned(
                results_proj["P"], results_inte["confidences"], n_bins)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.confidences_projection,
            **kwargs)

    # @safe_return(None)
    def intensities(self, *args, **kwargs):
        self.__log.debug("Intensities")
        fn = "intensities"
        if self._todo(fn):
            V, keys = self._get_signatures(**kwargs)
            m = np.mean(V, axis=0)
            s = np.std(V, axis=0)
            # a zero std would give NaN; constant columns are irrelevant
            # anyway, so drop them
            m = m[s != 0]
            V = V[:, s != 0]
            s = s[s != 0]
            norm = np.apply_along_axis(lambda v: np.abs((v - m) / s), 1, V)
            intensities = np.sum(norm, axis=1)
            kernel = gaussian_kde(intensities)
            positions = np.linspace(
                np.min(intensities), np.max(intensities), 1000)
            values = kernel(positions)
            results = {
                "keys": keys,
                "intensities": intensities,
                "x": positions,
                "y": values
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.intensities,
            **kwargs)

    # @safe_return(None)
    def intensities_projection(self, *args, n_bins=20, **kwargs):
        self.__log.debug("Intensities projection")
        if self._todo("intensities", inner=True):
            raise Exception("intensities must be done first")
        if self._todo("projection", inner=True):
            raise Exception("projection must be done first")
        fn = "intensities_projection"
        if self._todo(fn):
            results_proj = self._load_diagnosis_pickle("projection.pkl")
            results_inte = self._load_diagnosis_pickle("intensities.pkl")
            if np.any(results_proj["keys"] != results_inte["keys"]):
                raise Exception("keys do not coincide...")
            results = self._projection_binned(
                results_proj["P"], results_inte["intensities"], n_bins)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.intensities_projection,
            **kwargs)

    def _latent(self, sign):
        fn = os.path.join(self.path, "latent-%d.pkl" % self.V.shape[0])
        if os.path.exists(fn):
            results = pickle.load(open(fn, "rb"))
        else:
            pca = PCA(n_components=0.9)
            V = sign.subsample(self.V.shape[0])[0]
            pca.fit(V)
            results = {"explained_variance": pca.explained_variance_ratio_}
        return results

    # @safe_return(None)
    def dimensions(self, *args, datasets=None, exemplary=True,
                   ref_cctype='sign1', molset="full", **kwargs):
        """Get dimensions of the signature and compare to other signatures."""
        self.__log.debug("Dimensions")
        fn = "dimensions"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn):
            datasets = self._select_datasets(datasets, exemplary)
            results = {}
            for ds in datasets:
                metadata = self.ref_cc.sign_metadata(
                    'dimensions', molset, ds, ref_cctype)
                if metadata is not None:
                    nr_keys, nr_feats = metadata
                    results[ds] = {
                        "keys": nr_keys,
                        "features": nr_feats
                    }
            results["MY"] = {
                "keys": self.sign.shape[0],
                "features": self.sign.shape[1]
            }
        else:
            results = None
        kw_plotter = {
            "exemplary": exemplary,
            "cctype": ref_cctype,
            "molset": molset}
        kw_plotter.update(kwargs)
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.dimensions,
            kw_plotter=kw_plotter,
            **kwargs)
    # @safe_return(None)
    def across_coverage(self, *args, datasets=None, exemplary=True,
                        ref_cctype='sign1', **kwargs):
        """Check coverage against a collection of other CC signatures.

        Args:
            datasets (list): List of datasets. If None, all available are
                used. (default=None)
            exemplary (bool): Whether to use only exemplary datasets
                (recommended). (default=True)
            ref_cctype (str): CC signature type. (default='sign1')
            kwargs (dict): Parameters of the cross_coverage method.
        """
        self.__log.debug("Across coverage")
        fn = "across_coverage"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn):
            datasets = self._select_datasets(datasets, exemplary)
            results = {}
            for ds in datasets:
                results[ds] = self.cross_coverage(
                    ds, ref_cctype=ref_cctype, save=False, redo=True,
                    plot=False)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.across_coverage,
            kw_plotter={
                "exemplary": exemplary,
                "cctype": ref_cctype},
            **kwargs)
    def _sample_accuracy_individual(self, sign, n_neighbors, min_shared,
                                    metric):
        # do nearest neighbors (start with the target signature)
        shared_keys = self._shared_keys(sign)
        keys, V1 = sign.get_vectors(shared_keys)
        if keys is None or len(keys) < min_shared:
            return None
        n_neighbors = np.min([V1.shape[0], n_neighbors])
        nn = NearestNeighbors(n_neighbors=n_neighbors, metric=metric,
                              n_jobs=self.cpu)
        nn.fit(V1)
        neighs1_ = nn.kneighbors(V1)[1][:, 1:]
        # do nearest neighbors for self
        mask = np.isin(list(self.keys), list(shared_keys),
                       assume_unique=True)
        V0 = self.V[mask]
        nn = NearestNeighbors(n_neighbors=n_neighbors, metric=metric,
                              n_jobs=self.cpu)
        nn.fit(V0)
        neighs0_ = nn.kneighbors(V0)[1][:, 1:]
        # reindex
        keys_dict = dict((k, i) for i, k in enumerate(self.keys))
        neighs0 = np.zeros(neighs0_.shape).astype(int)
        neighs1 = np.zeros(neighs1_.shape).astype(int)
        rows = []
        for i in range(0, neighs0_.shape[0]):
            rows += [keys_dict[keys[i]]]
            for j in range(0, neighs0_.shape[1]):
                neighs0[i, j] = keys_dict[keys[neighs0_[i, j]]]
                neighs1[i, j] = keys_dict[keys[neighs1_[i, j]]]
        return {"neighs0": neighs0, "neighs1": neighs1, "rows": rows}

    def _rbo(self, neighs0, neighs1, p):
        from chemicalchecker.tool.rbo.rbo import rbo
        scores = []
        for i in range(0, neighs0.shape[0]):
            scores += [rbo(neighs0[i, :], neighs1[i, :], p=p).ext]
        return np.array(scores)
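    # Rank-biased overlap (RBO) compares, per key, its neighbor list in
    # this signature (`neighs0`) with its neighbor list in the reference
    # signature (`neighs1`); the persistence p=0.9 weights the top of the
    # ranking most heavily. A conceptual sketch of what `_rbo` computes
    # row by row:
    #
    #     rbo([1, 2, 3], [1, 2, 3], p=0.9).ext  # identical lists -> 1.0
    #     rbo([1, 2, 3], [4, 5, 6], p=0.9).ext  # disjoint lists  -> 0.0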
    # @safe_return(None)
    def ranks_agreement(self, *args, datasets=None, exemplary=True,
                        ref_cctype="sign0", n_neighbors=100, min_shared=100,
                        metric="minkowski", p=0.9, **kwargs):
        """Sample-specific accuracy.

        Estimated as the general agreement with the rest of the CC.
        """
        self.__log.debug("Sample-specific agreement to the rest of CC")
        fn = "ranks_agreement"
        # relax the threshold for very small datasets instead of skipping
        if self.sign.shape[0] < min_shared:
            min_shared = 0.7 * self.sign.shape[0]
        if ref_cctype is None:
            ref_cctype = self.sign.cctype

        def q67(r):
            return np.percentile(r, 67)

        def stat(R, func):
            s = []
            for r in R:
                if len(r) == 0:
                    s += [np.nan]
                else:
                    s += [func(r)]
            return np.array(s)

        if self._todo(fn):
            datasets = self._select_datasets(datasets, exemplary)
            results_datasets = {}
            for ds in datasets:
                sign = self.ref_cc.signature(ds, ref_cctype)
                dn = self._sample_accuracy_individual(
                    sign, n_neighbors, min_shared, metric)
                if dn is None:
                    continue
                sc = self._rbo(dn["neighs0"], dn["neighs1"], p=p)
                results_datasets[ds] = [r for r in zip(dn["rows"], sc)]
            d = {}
            for k in self.keys:
                d[k] = []
            for k, v in results_datasets.items():
                for r in v:
                    d[self.keys[r[0]]] += [(k, r[1])]
            R = []
            for k in self.keys:
                R += [d[k]]
            R_ = [[x[1] for x in r] for r in R]
            results = {
                "all": R,
                "max": stat(R_, np.max),
                "median": stat(R_, np.median),
                "mean": stat(R_, np.mean),
                "q67": stat(R_, q67),
                "size": np.array([len(r) for r in R_]),
                "keys": self.keys
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.ranks_agreement,
            kw_plotter={
                "exemplary": exemplary,
                "cctype": ref_cctype},
            **kwargs)
    # @safe_return(None)
    def ranks_agreement_projection(self, *args, n_bins=20, **kwargs):
        self.__log.debug("Ranks agreement projection")
        if self._todo("ranks_agreement", inner=True):
            raise Exception("ranks_agreement must be done first")
        if self._todo("projection", inner=True):
            raise Exception("projection must be done first")
        fn = "ranks_agreement_projection"
        if self._todo(fn):
            results_proj = self._load_diagnosis_pickle("projection.pkl")
            results_rnks = self._load_diagnosis_pickle(
                "ranks_agreement.pkl")
            if np.any(results_proj["keys"] != results_rnks["keys"]):
                raise Exception("keys do not coincide...")
            results = self._projection_binned(
                results_proj["P"], results_rnks["mean"], n_bins)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.ranks_agreement_projection,
            **kwargs)

    # @safe_return(None)
    def global_ranks_agreement(self, *args, n_neighbors=100, min_shared=100,
                               metric="minkowski", p=0.9, ref_cctype=None,
                               **kwargs):
        """Sample-specific global accuracy.

        Estimated as the general agreement with the rest of the CC, based
        on a Z-global ranking.
        """
        self.__log.debug(
            "Sample-specific agreement to the rest of CC,"
            " based on a Z-global ranking")
        fn = "global_ranks_agreement"
        # relax the threshold for datasets with fewer than min_shared
        # molecules instead of skipping them
        if self.sign.shape[0] < min_shared:
            # TODO: is this plot significant anyways?
            min_shared = 0.7 * self.sign.shape[0]
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        exemplary_datasets = self._select_datasets(None, True)
        datasets = kwargs.get("datasets", exemplary_datasets)

        def q67(r):
            return np.percentile(r, 67)

        def stat(R, func):
            s = []
            for r in R:
                if len(r) == 0:
                    s += [np.nan]
                else:
                    s += [func(r)]
            return np.array(s)

        if self._todo(fn):
            results_datasets = {}
            for ds in datasets:
                sign = self.ref_cc.signature(ds, ref_cctype)
                dn = self._sample_accuracy_individual(
                    sign, n_neighbors, min_shared, metric)
                if dn is None:
                    continue
                sc = self._rbo(dn["neighs0"], dn["neighs1"], p=p)
                results_datasets[ds] = [r for r in zip(dn["rows"], sc)]
            d = {}
            for k in self.keys:
                d[k] = []
            for k, v in results_datasets.items():
                for r in v:
                    d[self.keys[r[0]]] += [(k, r[1])]
            R = []
            for k in self.keys:
                R += [d[k]]
            R_ = [[x[1] for x in r] for r in R]
            results = {
                "all": R,
                "max": stat(R_, np.max),
                "median": stat(R_, np.median),
                "mean": stat(R_, np.mean),
                "q67": stat(R_, q67),
                "size": np.array([len(r) for r in R_]),
                "keys": self.keys
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.global_ranks_agreement,
            **kwargs)
    # @safe_return(None)
    def global_ranks_agreement_projection(self, *args, n_bins=20, **kwargs):
        self.__log.debug("Global ranks agreement projection")
        if self._todo("global_ranks_agreement", inner=True):
            raise Exception("global_ranks_agreement must be done first")
        if self._todo("projection", inner=True):
            raise Exception("projection must be done first")
        fn = "global_ranks_agreement_projection"
        if self._todo(fn):
            results_proj = self._load_diagnosis_pickle("projection.pkl")
            results_rnks = self._load_diagnosis_pickle(
                "global_ranks_agreement.pkl")
            if np.any(results_proj["keys"] != results_rnks["keys"]):
                raise Exception("keys do not coincide...")
            results = self._projection_binned(
                results_proj["P"], results_rnks["mean"], n_bins)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.global_ranks_agreement_projection,
            **kwargs)

    # @safe_return(None)
    def across_roc(self, *args, datasets=None, exemplary=True,
                   ref_cctype=None, redo=False, include_datasets=None,
                   **kwargs):
        """Check ROC against a collection of other CC signatures.

        Args:
            datasets (list): List of datasets. If None, all available are
                used. (default=None)
            exemplary (bool): Whether to use only exemplary datasets
                (recommended). (default=True)
            ref_cctype (str): CC signature type. (default=None)
            redo (bool): Whether to redo the plot. (default=False)
            include_datasets (list): Specific datasets to add when
                exemplary is set to True. (default=None)
            kwargs (dict): Parameters of the cross_roc method.
        """
        self.__log.debug("Across ROC")
        fn = "across_roc"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn) or redo:
            datasets = self._select_datasets(datasets, exemplary)
            results = {}
            for ds in datasets:
                sign = self.ref_cc.signature(ds, ref_cctype)
                results[ds] = self.cross_roc(sign, save=False, redo=True,
                                             plot=False)
            if include_datasets and exemplary:
                for incl_ds in include_datasets:
                    sign = self.ref_cc.signature(incl_ds, ref_cctype)
                    results[incl_ds] = self.cross_roc(
                        sign, save=False, redo=True, plot=False)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.across_roc,
            kw_plotter={
                "exemplary": exemplary,
                "cctype": ref_cctype},
            **kwargs)
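    # Usage sketch: `across_roc` simply loops `cross_roc` over the
    # (exemplary) datasets of the reference CC and stores one result per
    # dataset:
    #
    #     diag.across_roc(ref_cctype='sign1', save=True, plot=False)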
    # @safe_return(None)
    def neigh_roc(self, ds, *args, ref_cctype=None,
                  n_neighbors=[1, 5, 10, 50, 100], **kwargs):
        """Check ROC against another signature at different NN levels.

        Args:
            ds (str): Dataset against which to run the ROC analysis.
            ref_cctype (str): CC signature type. (default=None)
            n_neighbors (list): Numbers of top nearest neighbors for which
                to compute the ROC. (default=[1, 5, 10, 50, 100])
            kwargs (dict): Parameters of the cross_roc method.
        """
        self.__log.debug("Multiple Neighbors ROC")
        fn = "neigh_roc"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn):
            results = {}
            sign = self.ref_cc.signature(ds, ref_cctype)
            for nn in n_neighbors:
                results[nn] = self.cross_roc(
                    sign, save=False, redo=True, n_neighbors=nn,
                    plot=False)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.neigh_roc,
            kw_plotter={"ds": ds},
            **kwargs)
    # @safe_return(None)
    def atc_roc(self, *args, ref_cctype=None, redo=False, **kwargs):
        self.__log.debug("ATC ROC")
        fn = "atc_roc"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        ds = "E1.001"
        if self._todo(fn) or redo:
            sign = self.ref_cc.signature(ds, ref_cctype)
            results = self.cross_roc(sign, redo=True, save=False,
                                     plot=False)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.atc_roc,
            **kwargs)

    # @safe_return(None)
    def moa_roc(self, *args, ref_cctype=None, redo=False, **kwargs):
        self.__log.debug("MoA ROC")
        fn = "moa_roc"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        ds = "B1.001"
        if self._todo(fn) or redo:
            sign = self.ref_cc.signature(ds, ref_cctype)
            results = self.cross_roc(sign, redo=True, save=False,
                                     plot=False)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.moa_roc,
            **kwargs)

    # @safe_return(None)
    def roc(self, ds, *args, ref_cctype=None, redo=False, **kwargs):
        self.__log.debug("ROC")
        fn = "roc"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn) or redo:
            sign = self.ref_cc.signature(ds, ref_cctype)
            results = self.cross_roc(
                sign, redo=True, save=False, plot=False, val_type='roc')
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.roc,
            kw_plotter={"ds": ds},
            **kwargs)

    # @safe_return(None)
    def pr(self, ds, *args, ref_cctype=None, redo=False, **kwargs):
        self.__log.debug("PrecisionRecall")
        fn = "pr"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn) or redo:
            sign = self.ref_cc.signature(ds, ref_cctype)
            results = self.cross_roc(
                sign, redo=True, save=False, plot=False, val_type='pr')
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.pr,
            kw_plotter={"ds": ds},
            **kwargs)

    # @safe_return(None)
    def redundancy(self, *args, **kwargs):
        self.__log.debug("Redundancy")
        fn = "redundancy"
        if self._todo(fn):
            from chemicalchecker.util.remove_near_duplicates import \
                RNDuplicates
            self.__log.debug("Removing near duplicates")
            rnd = RNDuplicates(cpu=self.cpu)
            mappings = rnd.remove(self.sign.data_path, save_dest=None,
                                  just_mappings=True)
            mappings = np.array(sorted(mappings.items()))
            mps = []
            for i in range(0, mappings.shape[0]):
                mps += [(mappings[i, 0], mappings[i, 1])]
            mps = np.array(mps)
            n_full = mps.shape[0]
            n_ref = len(set(mps[:, 1]))
            red_counts = collections.defaultdict(int)
            for i in range(0, mps.shape[0]):
                red_counts[mps[i, 1]] += 1
            red_counts = [(k, v) for k, v in sorted(
                red_counts.items(), key=lambda item: -item[1])]
            results = {
                "n_full": n_full,
                "n_ref": n_ref,
                "counts": red_counts
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.redundancy,
            **kwargs)

    def _cluster(self, expected_coverage, n_neighbors, min_samples,
                 top_clusters):
        self.__log.debug("Clustering")
        P = self._load_diagnosis_pickle("projection.pkl")["P"]
        min_samples = int(
            np.max([np.min([P.shape[0] * 0.01, min_samples]), 5]))
        self.__log.debug("Estimating epsilon")
        if n_neighbors is None:
            n_neighbors = [1, 2, 3, 5, 10]
        else:
            n_neighbors = [n_neighbors]

        def do_clustering(n_neigh):
            nn = NearestNeighbors(n_neighbors=n_neigh + 1, n_jobs=self.cpu)
            nn.fit(P)
            dists = nn.kneighbors(P)[0][:, n_neigh]
            # FIXME: bins='auto' is preferable, but might trigger memory
            # errors, e.g. C3 sign1
            h = np.histogram(dists, bins=100)
            y = np.cumsum(h[0]) / np.sum(h[0])
            x = h[1][1:]
            eps = x[np.where(y > expected_coverage)[0][0]]
            self.__log.debug("Running DBSCAN")
            cl = DBSCAN(eps=eps, min_samples=min_samples,
                        n_jobs=self.cpu).fit(P)
            labels = cl.labels_
            n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
            n_noise_ = list(labels).count(-1)
            lab_counts = collections.defaultdict(int)
            for lab in labels:
                lab_counts[lab] += 1
            lab_counts = [(k, v) for k, v in sorted(
                lab_counts.items(), key=lambda item: -item[1])]
            return (labels, lab_counts, eps, n_clusters_, n_noise_)

        best_score = 0
        best_n_neigh = None
        for n_neigh in n_neighbors:
            labels, lab_counts, eps, n_clusters_, n_noise_ = do_clustering(
                n_neigh)
            lc = list([x[1] for x in lab_counts[:top_clusters]])
            if len(lc) < top_clusters:
                lc += [0] * (top_clusters - len(lc))
            score = np.median(lc)
            if best_n_neigh is None:
                best_n_neigh = n_neigh
                best_score = score
            if best_score < score:
                best_n_neigh = n_neigh
                best_score = score
        labels, lab_counts, eps, n_clusters_, n_noise_ = do_clustering(
            best_n_neigh)
        results = {
            "P": P,
            "min_samples": min_samples,
            "labels": labels,
            "lab_counts": lab_counts,
            "epsilon": eps,
            "n_clusters": n_clusters_,
            "n_noise": n_noise_
        }
        return results

    # @safe_return(None)
    def cluster_sizes(self, *args, expected_coverage=0.95, n_neighbors=None,
                      min_samples=10, top_clusters=20, **kwargs):
        if self._todo("projection", inner=True):
            raise Exception("projection needs to be done first")
        self.__log.debug("Cluster sizes")
        fn = "clusters"
        if self._todo(fn):
            results = self._cluster(
                expected_coverage, n_neighbors, min_samples, top_clusters)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.cluster_sizes,
            **kwargs)

    # @safe_return(None)
    def clusters_projection(self, *args, **kwargs):
        self.__log.debug("Projection clusters")
        if self._todo("projection", inner=True):
            raise Exception("projection needs to be done first")
        fn = "clusters"
        if self._todo(fn, inner=True):
            raise Exception("clusters needs to be done first")
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.clusters_projection,
            **kwargs)

    # @safe_return(None)
    def key_coverage(self, *args, datasets=None, exemplary=True,
                     ref_cctype='sign1', molset='full', **kwargs):
        self.__log.debug("Key coverages")
        fn = "key_coverage"
        if ref_cctype is None:
            ref_cctype = self.ref_cctype
        if self._todo(fn):
            datasets = self._select_datasets(datasets, exemplary)
            results_shared = {}
            for ds in datasets:
                metadata = self.ref_cc.sign_metadata(
                    'keys', molset, ds, ref_cctype)
                if metadata is not None:
                    sign_keys = metadata
                    results_shared[ds] = sorted(
                        list(set(self.keys) & set(sign_keys)))
            results_counts = {}
            for k in self.keys:
                results_counts[k] = 0
            for ds, v in results_shared.items():
                for k in v:
                    results_counts[k] += 1
            results = {
                "shared": results_shared,
                "counts": results_counts,
                "key_counts": np.array(
                    [results_counts[k] for k in self.keys])
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.key_coverage,
            kw_plotter={"exemplary": exemplary},
            **kwargs)

    # @safe_return(None)
    def key_coverage_projection(self, *args, n_bins=20, **kwargs):
        if self._todo("key_coverage", inner=True):
            raise Exception("key_coverage needs to be run first")
        if self._todo("projection", inner=True):
            raise Exception("projection needs to be done first")
        fn = "key_coverage_projection"
        if self._todo(fn):
            results_proj = self._load_diagnosis_pickle("projection.pkl")
            results_cov = self._load_diagnosis_pickle("key_coverage.pkl")
            results = self._projection_binned(
                results_proj["P"], np.array(results_cov["key_counts"]),
                n_bins=n_bins)
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.key_coverage_projection,
            **kwargs)

    # @safe_return(None)
    def orthogonality(self, *args, max_features=1000, **kwargs):
        fn = "orthogonality"
        if self._todo(fn):
            V, keys = self._get_signatures(max_features=max_features,
                                           **kwargs)
            V = normalize(V, norm="l2", axis=0)
            dots = []
            for i in range(0, V.shape[1] - 1):
                for j in range(i + 1, V.shape[1]):
                    dots += [np.dot(V[:, i], V[:, j].T)]
            results = {
                "dots": np.array(dots)
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.orthogonality,
            **kwargs)

    # @safe_return(None)
    def outliers(self, *args, n_estimators=1000, **kwargs):
        """Compute the anomaly score of the input samples.

        The lower, the more abnormal. Negative scores represent outliers,
        positive scores represent inliers.
        """
        fn = "outliers"
        if self._todo(fn):
            max_features = int(np.sqrt(self.V.shape[1]) + 1)
            max_samples = min(1000, int(self.V.shape[0] / 2 + 1))
            mod = IsolationForest(n_estimators=n_estimators,
                                  contamination=0.1,
                                  max_samples=max_samples,
                                  max_features=max_features,
                                  n_jobs=self.cpu)
            pred = mod.fit_predict(self.V)
            scores = mod.decision_function(self.V)
            results = {
                "scores": scores,
                "pred": pred
            }
        else:
            results = None
        return self._returner(
            results=results, fn=fn,
            plotter_function=self.plotter.outliers,
            **kwargs)
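    # Interpretation sketch: `pred` is -1 for outliers and 1 for inliers,
    # as returned by IsolationForest.fit_predict, while `scores` come from
    # decision_function (the lower and more negative, the more anomalous):
    #
    #     res = diag.outliers(save=False, plot=False)
    #     n_out = (res["pred"] == -1).sum()  # number of flagged molecules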
    def available(self):
        return self.plotter.available()

    def canvas_small(self, title=None, skip_plots=[]):
        shared_kw = dict(save=True, plot=False)
        if "cosine_distances" not in skip_plots:
            self.cosine_distances(**shared_kw)
        if "euclidean_distances" not in skip_plots:
            self.euclidean_distances(**shared_kw)
        if "projection" not in skip_plots:
            self.projection(**shared_kw)
        if "image" not in skip_plots:
            self.image(**shared_kw)
        if "features_bins" not in skip_plots:
            self.features_bins(**shared_kw)
        if "keys_bins" not in skip_plots:
            self.keys_bins(**shared_kw)
        if "values" not in skip_plots:
            self.values(**shared_kw)
        if "redundancy" not in skip_plots:
            self.redundancy(**shared_kw)
        fig = self.plotter.canvas(
            size="small", title=title, skip_plots=skip_plots)
        return fig

    def canvas_medium(self, title=None, skip_plots=[]):
        # copy to avoid mutating the caller's (or the default) list below
        skip_plots = list(skip_plots)
        shared_kw = dict(save=True, plot=False)
        if "cosine_distances" not in skip_plots:
            self.cosine_distances(**shared_kw)
        if "euclidean_distances" not in skip_plots:
            self.euclidean_distances(**shared_kw)
        if "projection" not in skip_plots:
            self.projection(**shared_kw)
        if "image" not in skip_plots:
            self.image(**shared_kw)
        if "features_bins" not in skip_plots:
            self.features_bins(**shared_kw)
        if "keys_bins" not in skip_plots:
            self.keys_bins(**shared_kw)
        if "values" not in skip_plots:
            self.values(**shared_kw)
        if "redundancy" not in skip_plots:
            self.redundancy(**shared_kw)
        if "intensities" not in skip_plots:
            self.intensities(**shared_kw)
        if "intensities_projection" not in skip_plots:
            self.intensities_projection(**shared_kw)
        if self.sign.cctype == 'sign3':
            if "confidences" not in skip_plots:
                self.confidences(**shared_kw)
            if "confidences_projection" not in skip_plots:
                self.confidences_projection(**shared_kw)
        if "cluster_sizes" not in skip_plots:
            self.cluster_sizes(**shared_kw)
        if "clusters_projection" not in skip_plots:
            self.clusters_projection(**shared_kw)
        if "outliers" not in skip_plots:
            self.outliers(**shared_kw)
        # these plots require CC-wide metadata
        if "dimensions" not in skip_plots:
            self.dimensions(**shared_kw)
        if "key_coverage" not in skip_plots:
            self.key_coverage(**shared_kw)
        if "key_coverage_projection" not in skip_plots:
            self.key_coverage_projection(**shared_kw)
        if "across_coverage" not in skip_plots:
            self.across_coverage(**shared_kw)
        # these plots require CC-wide signatures
        if "atc_roc" not in skip_plots:
            self.atc_roc(**shared_kw)
        if "moa_roc" not in skip_plots:
            self.moa_roc(**shared_kw)
        if "across_roc" not in skip_plots:
            self.across_roc(**shared_kw)
        if self.global_ranks_agreement(**shared_kw) is None:
            self.__log.debug(
                "Skipping plots Global Ranks Agreement and Global Ranks"
                " Agreement Projection")
            skip_plots.extend(["global_ranks_agreement",
                               "global_ranks_agreement_projection"])
        else:
            self.global_ranks_agreement_projection(**shared_kw)
        fig = self.plotter.canvas(
            size="medium", title=title, skip_plots=skip_plots)
        return fig

    def custom_comparative_vertical(self, title=None, skip_plots=[]):
        shared_kw = dict(save=True, plot=False)
        self.projection(**shared_kw)
        self.intensities(**shared_kw)
        if self.sign.cctype == 'sign3':
            self.confidences(**shared_kw)
        self.clusters_projection(**shared_kw)
        self.atc_roc(**shared_kw)
        self.moa_roc(**shared_kw)
        self.across_roc(**shared_kw)
        fig = self.plotter.canvas(
            size="compare_v", title=title, skip_plots=skip_plots)
        return fig

    def canvas_large(self, title=None, skip_plots=[]):
        # TODO: not implemented yet
        pass

    def canvas(self, size="medium", title=None, savefig=False,
               dest_dir=None, savefig_kwargs={'facecolor': 'white'},
               skip_plots=[]):
        self.__log.debug(
            "Computing or retrieving data for canvas %s." % size)
        self.__log.debug("Skipping the following plots: %s" % skip_plots)
        if size == "small":
            fig = self.canvas_small(title=title, skip_plots=skip_plots)
        elif size == "medium":
            fig = self.canvas_medium(title=title, skip_plots=skip_plots)
        elif size == "large":
            fig = self.canvas_large(title=title, skip_plots=skip_plots)
        elif size == "compare_v":
            fig = self.custom_comparative_vertical(
                title=title, skip_plots=skip_plots)
        else:
            return None
        if savefig:
            fn = "_".join([self.sign.dataset, self.sign.cctype,
                           self.name, size]) + '.png'
            if dest_dir is None:
                fn_dest = os.path.join(self.path, fn)
            else:
                fn_dest = os.path.join(dest_dir, fn)
            self.__log.debug("Saving plot to: %s" % fn_dest)
            fig.savefig(fn_dest, **savefig_kwargs)
        return fig
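# Usage sketch (hypothetical paths, mirroring the HPC scripts above): the
# typical entry point is `canvas`, which computes any missing diagnostics
# and assembles them into a single figure:
#
#     cc = ChemicalChecker('/path/to/cc_root')
#     sign = cc.get_signature('sign2', 'full', 'A1.001')
#     diag = Diagnosis(sign)
#     fig = diag.canvas(size='medium', savefig=True)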