Source code for chemicalchecker.core.molkit

"""Molecule representation.

A simple class to inspect small molecules, easily convert between different
identifier, and ultimately find if signatures available.
"""
import os
import pickle
import tempfile
import numpy as np
import collections
import pandas as pd
import networkx as nx
from tqdm import tqdm
from functools import partial
from collections import defaultdict

from chemicalchecker.util.hpc import HPC
from .signature_data import DataSignature
from chemicalchecker.util.psql import psql
from chemicalchecker.util import logged, Config
from chemicalchecker.util.keytype import KeyTypeDetector
from chemicalchecker.util.parser.converter import Converter


@logged
class Molset(object):
    """Molset class.

    Given a CC instance, provides access to features of an input set of
    molecules for one or more datasets of interest. The data is organized
    in a DataFrame which will be annotated with observed and/or predicted
    features by simple kNN inference.
    """

    def __init__(self, cc, molecules, mol_type=None, add_image=True,
                 generic_scaffold=False, molecules_ids=None):
        """Initialize a Molset instance.

        Args:
            cc (ChemicalChecker): Chemical Checker instance.
            molecules (list): A list of molecules in the homogeneous format
                'mol_type'. This can also be a DataFrame, but we expect it
                to be generated by this same class, hence complete and
                coherent in terms of identifiers.
            mol_type (str): Type of identifier; options are 'inchi',
                'inchikey', 'smiles' or 'name'. If 'name' is used we query
                external resources (cactus.nci and PubChem) and some
                molecules might be missing. If 'inchikey' is used, we first
                query our local DB and then external resources (as a bonus,
                what we fetch is added to our local DB). The DataFrame is
                completed with the other identifiers if not present yet.
            add_image (bool): If True a molecule image column is added.
            generic_scaffold (bool): If True the generic Bemis-Murcko
                scaffold is computed.
            molecules_ids (list): Optional external IDs for the molecules;
                defaults to a 0-based range.
        """
        self.cc = cc
        self.conv = Converter()
        if isinstance(molecules, pd.DataFrame):
            self.df = molecules.copy()
            all_inks = self.df[self.df['InChIKey'] != '']['InChIKey'].tolist()
            self.inchikeys = sorted(list(set(all_inks)))
        else:
            if len(molecules) != len(set(molecules)):
                self.__log.warning('molecules provided are not unique'
                                   ', IDs might be off')
            if mol_type is None:
                mol_type = KeyTypeDetector.type(molecules[0])
                if mol_type is None:
                    self.__log.warning(
                        "Molecule '%s' not recognized as either"
                        " 'inchikey', 'inchi' or 'smiles'. "
                        "Considering type as 'name'." % molecules[0])
                    mol_type = 'name'
            if molecules_ids is None:
                molecules_ids = list(range(len(molecules)))
            molecules = [x.strip() for x in molecules]
            self.__log.info('%s molecules provided' % len(molecules))
            self.df = pd.DataFrame()
            if mol_type.lower() == 'inchi':
                mol_col = 'InChI'
            elif mol_type.lower() == 'smiles':
                mol_col = 'SMILES'
            elif mol_type.lower() == 'name':
                mol_col = 'Name'
            elif mol_type.lower() == 'inchikey':
                mol_col = 'InChIKey'
            else:
                raise Exception("Molecule type '%s' not supported" % mol_type)
            self.df[mol_col] = molecules
            self.df['ID'] = molecules_ids
            if mol_type.lower() == 'name':
                name_inchi = self.get_name_inchi_map(molecules)
                self.df['InChI'] = self.df[mol_col].map(name_inchi)
                self._add_inchikeys()
                self._add_smiles()
                self.df = self.df[['Name', 'InChIKey', 'InChI', 'SMILES',
                                   'ID']]
            elif mol_type.lower() == 'inchikey':
                ink_inchi = self.get_inchikey_inchi_map(molecules)
                self.df['InChI'] = self.df[mol_col].map(ink_inchi)
                self._add_smiles()
                self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
            elif mol_type.lower() == 'inchi':
                self._add_inchikeys()
                self._add_smiles()
                self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
            elif mol_type.lower() == 'smiles':
                self._add_inchi(mol_col)
                self._add_inchikeys()
                self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
            self._add_scaffold(generic=generic_scaffold)
            self.inchikeys = self.df[
                self.df['InChIKey'] != '']['InChIKey'].tolist()
        self.df = self.df.sort_values('InChIKey')
        self.df.dropna(inplace=True)
        if len(self.df) == 0:
            raise Exception('No valid molecules passed.')
        self.df.reset_index(inplace=True, drop=True)
        if add_image:
            Molset.add_image(self.df)
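    # Usage sketch (illustrative, not part of the class; the CC root path
    # and SMILES strings below are hypothetical): build a Molset from a
    # list of SMILES and inspect the resulting identifier DataFrame.
    #
    #   from chemicalchecker import ChemicalChecker
    #   cc = ChemicalChecker('/path/to/cc_root')
    #   mols = Molset(cc, ['CCO', 'c1ccccc1'], mol_type='smiles',
    #                 add_image=False)
    #   print(mols.df[['InChIKey', 'InChI', 'SMILES', 'ID']])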
    def save(self, destination, overwrite=False):
        """Save a Molset instance.

        Args:
            destination (str): The destination path.
            overwrite (bool): Whether to overwrite in case the file exists.
        """
        self.__log.debug("saving to {}".format(destination))
        persist = (self.cc.cc_root, self.df)
        if os.path.isfile(destination):
            if overwrite:
                pickle.dump(persist, open(destination, 'wb'))
            else:
                self.__log.warning('file %s already present!' % destination)
        else:
            pickle.dump(persist, open(destination, 'wb'))
    def __getitem__(self, keys):
        """Forward access to DataFrame columns.

        This allows accessing DataFrame columns while keeping the molecule
        structure visualization.

        Args:
            keys (list): A list of columns of the DataFrame.
        """
        from rdkit.Chem import PandasTools
        df = self.df[keys]
        PandasTools.ChangeMoleculeRendering(df)
        return df
    @classmethod
    def load(cls, filename, cc=None, cc_root=None, add_image=False):
        """Load a Molset instance.

        Args:
            filename (str): The path of the file to load.
            cc (ChemicalChecker): An already initialized CC instance.
            cc_root (str): Path to the root of a Chemical Checker. If None
                we try using the saved path. If that is not reachable we
                use the default (cc_config dependent) CC.
            add_image (bool): If True a molecule image is added.
        """
        from .chemcheck import ChemicalChecker
        cls.__log.debug("loading from {}".format(filename))
        cc_root_pkl, df = pickle.load(open(filename, 'rb'))
        if cc is not None:
            if cc.cc_root != cc_root_pkl:
                cls.__log.warning(
                    'The CC instance used for generating the Molset'
                    ' (%s) is different from the currently used (%s)'
                    % (cc_root_pkl, cc.cc_root))
            return cls(cc, df, add_image=add_image)
        if cc_root is None:
            try:
                cc_root = cc_root_pkl
                cc = ChemicalChecker(cc_root)
            except Exception:
                cls.__log.warning(
                    'The CC instance used for generating the Molset cannot'
                    ' be reached. The default CC will be used.')
                cc = ChemicalChecker()
        else:
            try:
                cc = ChemicalChecker(cc_root)
            except Exception:
                cls.__log.warning(
                    'The `cc_root` provided does not point to a valid CC'
                    ' instance. The default CC will be used.')
                cc = ChemicalChecker()
        if cc.cc_root != cc_root:
            cls.__log.warning(
                'The CC instance used for generating the Molset'
                ' (%s) is different from the currently used (%s)'
                % (cc_root, cc.cc_root))
        return cls(cc, df, add_image=add_image)
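    # Round-trip sketch (paths are illustrative): `save` pickles the
    # (cc_root, DataFrame) tuple, and `load` restores it while re-attaching
    # a CC instance.
    #
    #   mols.save('/tmp/molset.pkl', overwrite=True)
    #   mols2 = Molset.load('/tmp/molset.pkl', cc=cc)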
    @staticmethod
    def add_image(df):
        from rdkit.Chem import PandasTools
        df['SMILES'].fillna('', inplace=True)
        PandasTools.AddMoleculeColumnToFrame(
            df, smilesCol='SMILES', molCol='Structure')

    @classmethod
    def combine(cls, cc, molset1, molset2, add_image=False,
                generic_scaffold=False):
        # merge the dataframes on InChIKey and InChI
        mol_cols = ['SMILES', 'Scaffold', 'Structure']
        cols1 = [i for i in molset1.df.columns if i not in mol_cols]
        cols2 = [i for i in molset2.df.columns if i not in mol_cols]
        df = pd.merge(molset1.df[cols1], molset2.df[cols2],
                      on=['InChIKey', 'InChI'], how='outer')
        # add the inchikeys and smiles
        mix = cls(cc, df, add_image=False)
        mix._add_smiles()
        mix._add_inchikeys()
        mix._add_scaffold(generic=generic_scaffold)
        if add_image:
            Molset.add_image(mix.df)
        df = mix.df
        # take care of other annotation columns
        all_mol_cols = mol_cols + ['InChI']
        anno_cols1 = [i for i in molset1.df.columns if i not in all_mol_cols]
        anno_cols2 = [i for i in molset2.df.columns if i not in all_mol_cols]
        # handle disjoint annotation columns
        disjoint = set(anno_cols1) ^ set(anno_cols2)
        for col in disjoint:
            if col == 'Structure':
                continue
            # if the column does not contain lists, wrap each element in a
            # single-element list
            if not isinstance(df[col].dropna().iloc[0], list):
                df[col] = df[col].apply(
                    lambda x: x if pd.isnull(x) else [x])
            # replace NaNs with empty lists
            df[col] = df[col].apply(
                lambda x: x if isinstance(x, list) else [])
        # combine shared annotation columns as lists
        shared = (set(anno_cols1) & set(anno_cols2)) - {'InChIKey'}
        for col in shared:
            if col == 'Structure':
                continue
            colx = col + '_x'
            coly = col + '_y'
            if not isinstance(df[colx].dropna().iloc[0], list):
                df[colx] = df[colx].apply(
                    lambda x: x if pd.isnull(x) else [x])
            df[colx] = df[colx].apply(
                lambda x: x if isinstance(x, list) else [])
            if not isinstance(df[coly].dropna().iloc[0], list):
                df[coly] = df[coly].apply(
                    lambda x: x if pd.isnull(x) else [x])
            df[coly] = df[coly].apply(
                lambda x: x if isinstance(x, list) else [])
            df[col] = df[colx] + df[coly]
            df[col] = df[col].apply(set)
            df[col] = df[col].apply(list)
            if all(df[col].apply(len) == 1):
                df[col] = df[col].apply(lambda x: x[0])
        mols_col = ['InChIKey', 'InChI', 'SMILES', 'Scaffold']
        if add_image:
            mols_col.append('Structure')
        mix.df = df[mols_col + sorted(list(shared)) +
                    sorted(list(disjoint))]
        mix.df = mix.df.sort_values('InChIKey')
        mix.df.reset_index(inplace=True, drop=True)
        return mix

    def get_name_inchi_map(self, names):
        cpd_inchi = dict()
        for cpd in tqdm(names, desc='getting Name-InChI map'):
            try:
                inchi = self.conv.chemical_name_to_inchi(cpd)
                cpd_inchi[cpd] = inchi
            except Exception as ex:
                self.__log.warning(str(ex))
        return cpd_inchi

    def get_inchikey_inchi_map(self, inks):
        ink_inchi = dict()
        for ink in tqdm(inks, desc='getting InChIKey-InChI map'):
            try:
                inchi = self.conv.inchikey_to_inchi(ink)
            except Exception:
                self.__log.warning('%s has no InChI' % ink)
                continue
            ink_inchi[ink] = inchi
        return ink_inchi

    def _safe_func(self, fn, arg, default=np.nan):
        try:
            return fn(arg)
        except Exception:
            fn_name = 'None'
            if hasattr(fn, '__name__'):
                fn_name = fn.__name__
            self.__log.warning(
                f'Problems calling `{fn_name}` for mol {arg}')
            return default

    def _add_inchikeys(self):
        if 'InChIKey' not in self.df.columns:
            fn = partial(self._safe_func, self.conv.inchi_to_inchikey)
            self.df['InChIKey'] = self.df['InChI'].dropna().apply(fn)

    def _add_smiles(self):
        if 'SMILES' not in self.df.columns:
            fn = partial(self._safe_func, self.conv.inchi_to_smiles)
            self.df['SMILES'] = self.df['InChI'].dropna().apply(fn)

    def _add_inchi(self, mol_col):
        if 'InChI' not in self.df.columns:
            fn = partial(self._safe_func,
                         lambda x: self.conv.smiles_to_inchi(x)[1])
            self.df['InChI'] = self.df[mol_col].dropna().apply(fn)

    def _add_scaffold(self, generic=False):
        """Add the Bemis-Murcko scaffold."""
        if 'Scaffold' not in self.df.columns:
            conv_fn = partial(self.conv.smiles_to_scaffold, generic=generic)
            fn = partial(self._safe_func, conv_fn)
            self.df['Scaffold'] = self.df['SMILES'].dropna().apply(fn)
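    # Sketch of `combine` (hypothetical inputs): merging two Molsets keeps
    # a single row per molecule and turns shared annotation columns into
    # merged lists.
    #
    #   ms_a = Molset(cc, smiles_list_a, mol_type='smiles', add_image=False)
    #   ms_b = Molset(cc, smiles_list_b, mol_type='smiles', add_image=False)
    #   mix = Molset.combine(cc, ms_a, ms_b)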
    def annotate(self, dataset_code, shorten_dscode=True,
                 include_features=False, feature_map=None,
                 include_values=False, features_from_raw=False,
                 filter_values=True, include_sign0=False):
        """Annotate the DataFrame with features fetched from CC spaces.

        The minimal annotation is whether the molecule is present or not in
        the dataset of interest. Features and values are optionally
        available; they are fetched from the raw preprocess file of a given
        space (before dropping any data) or from sign0.

        Args:
            dataset_code (str): The CC dataset code, e.g. 'B4.001'.
            shorten_dscode (bool): If True get rid of the '.001' part of
                the dataset code.
            include_features (bool): Include features of the molecules from
                their raw preprocess sign0.
            feature_map (dict): A dictionary used to map features to a
                human-interpretable format.
            include_values (bool): If True an additional column is added
                with a list of (feature, value) pairs.
            features_from_raw (bool): If True the features are fetched from
                the raw sign0, which includes all features for all
                molecules. If False sign0 features are used, i.e. after the
                Sanitizer step which possibly removes features or
                molecules.
            filter_values (bool): If True, values == 0 are filtered out.
                False is recommended when the dataset holds continuous
                data.
            include_sign0 (bool): If True the full sign0 for each molecule
                is included in the DataFrame.
        """
        # get raw preprocess data
        s0 = self.cc.signature(dataset_code, 'sign0')
        s0_raw = DataSignature(os.path.join(
            s0.signature_path, 'raw', 'preprocess.h5'))
        incc = np.isin(self.inchikeys, s0_raw.keys)
        mol_ds = sum(incc)
        tot_mol = len(self.inchikeys)
        self.__log.info(
            f'{mol_ds}/{tot_mol} molecules found in {dataset_code}')
        dscode = dataset_code
        if shorten_dscode:
            dscode = dataset_code[:2]
        # get presence in space
        self.df[dscode] = 0
        available_inks = np.array(self.inchikeys)[incc]
        idx_df = self.df[self.df['InChIKey'].isin(available_inks)].index
        self.df.loc[idx_df, dscode] = 1
        sorted_inks, values = None, None
        # get the features if requested
        if include_features:
            if features_from_raw:
                sorted_inks, values = s0_raw.get_vectors(
                    available_inks, dataset_name='X')
                feat_ref = s0_raw
            else:
                sorted_inks, values = s0.get_vectors(available_inks)
                feat_ref = s0
            if values is None or len(values) == 0:
                self.__log.warning(f'No features found for {dataset_code}!')
            else:
                ink_feat = defaultdict(list)
                for ink, val in zip(sorted_inks, values):
                    feat_idxs = np.argwhere(val).flatten()
                    ink_feat[ink] = feat_ref.features[feat_idxs].tolist()
                feat_name = dscode + '_features'
                self.df[feat_name] = self.df['InChIKey'].map(ink_feat)
                if feature_map is not None:
                    self.df[feat_name] = self.df[feat_name].apply(
                        lambda x: [feature_map[i] for i in x
                                   if isinstance(x, list)]).tolist()
                    self.df[feat_name] = self.df[feat_name].apply(
                        lambda x: [i for i in x if i is not None]).tolist()
        # add the corresponding feature values if requested
        if include_values:
            if sorted_inks is None:
                if features_from_raw:
                    sorted_inks, values = s0_raw.get_vectors(
                        available_inks, dataset_name='X')
                    feat_ref = s0_raw
                else:
                    sorted_inks, values = s0.get_vectors(available_inks)
                    feat_ref = s0
            ink_val = defaultdict(list)
            for ink, val in zip(sorted_inks, values):
                feat_idxs = np.arange(len(val))
                if filter_values:
                    feat_idxs = np.argwhere(val != 0).flatten()
                ink_val[ink] = list(zip(
                    feat_ref.features[feat_idxs].tolist(), val[feat_idxs]))
            val_name = dscode + '_values'
            self.df[val_name] = self.df['InChIKey'].map(ink_val)
            if feature_map is not None:
                self.df[val_name] = self.df[val_name].apply(
                    lambda x: [(feature_map[k], v) for k, v in x
                               if isinstance(x, list)]).tolist()
        # add sign0 if requested
        if include_sign0:
            sorted_inks, values = s0.get_vectors(available_inks)
            s0_name = dscode + '_sign0'
            s0_dict = dict(zip(sorted_inks, values))
            self.df[s0_name] = self.df['InChIKey'].map(s0_dict)
    def predict(self, dataset_code, shorten_dscode=True,
                applicability_thr_query=0, applicability_thr_nn=0,
                max_nr_nn=1000, pvalue_thr_nn=1e-4, limit_top_nn=1000,
                return_stats=False, return_sign0=False, blacklist=[],
                aggregation_thr=0.0, return_probas=False):
        """Annotate the DataFrame with predicted features based on neighbors.

        In this case we can potentially get annotations for every molecule.
        The molecules are first signaturized (sign4), filtered by
        applicability, and then searched against molecules within the CC
        sign4. Nearest neighbors are selected for each molecule based on
        user-specified parameters. Only NN that are found in the sign0 of
        the space are preserved. The annotations of these NN are aggregated
        (multiple strategies are possible) and finally assigned to the
        molecule.

        Args:
            dataset_code (str): The CC dataset code, e.g. 'B4.001'.
            shorten_dscode (bool): If True get rid of the '.001' part of
                the dataset code.
            applicability_thr_query (float): Only queries with a sign4
                applicability above this threshold will be searched for
                neighbors.
            applicability_thr_nn (float): Only neighbors with a sign4
                applicability above this threshold will be used for feature
                inference.
            max_nr_nn (int): Maximum number of neighbors to possibly
                consider.
            pvalue_thr_nn (float): Filter neighbors based on the p-value of
                their distance.
            limit_top_nn (int): Only keep the top N neighbors for feature
                predictions.
            return_stats (bool): If True also return a DataFrame with
                statistics on the NN search filtering steps.
            return_sign0 (bool): If True keep the sign0-format prediction
                (only for binary spaces).
            blacklist (list): List of InChIKeys of molecules to disregard
                during the neighbors search. Used for comparison to other
                prediction approaches where these molecules are the test
                set.
            aggregation_thr (float): Threshold on the aggregated neighbor
                feature frequency above which a feature is predicted.
            return_probas (bool): If True the probabilities for each
                predicted class are also kept.
        """
        dscode = dataset_code
        if shorten_dscode:
            dscode = dataset_code[:2]
        # signaturize: get sign4 predictions for the queries
        s4 = self.cc.signature(dataset_code, 'sign4')
        s4_app_keys = s4.keys
        s4_app = s4.get_h5_dataset('applicability').ravel()
        tmp_pred_file = tempfile.NamedTemporaryFile().name
        query_inks = self.df['InChIKey'].tolist()
        query_inchies = self.df['InChI'].tolist()
        pred = s4.predict_from_string(query_inchies, tmp_pred_file,
                                      keytype='InChI', keys=query_inks)
        pred_sign = pred[:]
        pred_keys = pred.keys
        pred_appl = pred.get_h5_dataset('applicability').ravel()
        os.remove(tmp_pred_file)
        # filter queries by applicability
        appl_mask = pred_appl > applicability_thr_query
        self.__log.info(
            'Filtering %i queries for applicability thr' % sum(~appl_mask))
        query_inks = pred_keys[appl_mask]
        pred_sign = pred_sign[appl_mask]
        # faiss sign4 NN search
        neig4 = self.cc.get_signature('neig4', 'reference', dataset_code)
        nn = neig4.get_kth_nearest(pred_sign, k=max_nr_nn)
        self.__log.info("nn distances shape %s" %
                        str(nn['distances'].shape))
        # identify the distance threshold based on the requested p-value
        s0 = self.cc.signature(dataset_code, 'sign0')
        metric = neig4.get_h5_dataset('metric')[0]
        # this always returns the distances based on the non-redundant set
        bg_dist = s4.background_distances(
            metric, limit_inks=s0.keys, name='s0')
        dst_thr_idx = np.argwhere(bg_dist['pvalue'] == pvalue_thr_nn).ravel()
        dst_thr = bg_dist['distance'][dst_thr_idx][0]
        self.__log.info("bg_distance '%s' thr %.4f" % (metric, dst_thr))
        # precomputing a dictionary with the list of significant neighbors
        # for each query speeds things up
        query_nn_dict = defaultdict(list)
        for query_idx, hit_idx in np.argwhere(nn['distances'] < dst_thr):
            query_ink = query_inks[query_idx]
            query_nn_dict[query_ink].append(nn['keys'][query_idx, hit_idx])
        # analyze each query individually
        s4_ref = self.cc.get_signature('sign4', 'reference', dataset_code)
        query_nn_dict_map = defaultdict(list)
        stats = list()
        for query_ink, nn_inks in tqdm(query_nn_dict.items(), desc='get NN'):
            # neighbors are already filtered by distance (precomputed above)
            len_dist = len(nn_inks)
            # expand the neighbors list to include redundant molecules;
            # this is important because things that are redundant in sign4
            # might not be redundant in sign0
            # FIXME: how to vectorize this kind of operation?
            nn_inks_mapped_idx = np.isin(s4_ref.mappings[:, 1], nn_inks)
            nn_inks = s4_ref.mappings[:, 0][nn_inks_mapped_idx]
            len_mapp = len(nn_inks)
            # remove the query itself and filter blacklisted neighbors
            nn_inks = nn_inks[~np.isin(nn_inks,
                                       np.hstack([blacklist, query_ink]))]
            len_blacklist = len(nn_inks)
            # limit to neighbors with good applicability
            nn_apps = s4_app[np.isin(s4_app_keys, nn_inks)]
            nn_inks = nn_inks[nn_apps > applicability_thr_nn]
            len_app = len(nn_inks)
            # limit to neighbors with sign0
            s0_mask = np.isin(nn_inks, s0.keys)
            nn_inks = nn_inks[s0_mask]
            len_s0 = len(nn_inks)
            # limit to top N neighbors
            nn_inks = nn_inks[:limit_top_nn]
            query_nn_dict_map[query_ink] = nn_inks.tolist()
            len_topn = len(nn_inks)
            stats.append({'query_ink': query_ink,
                          'distance': len_dist,
                          'blacklist': len_blacklist,
                          'applicability': len_app,
                          'in_sign0': len_s0,
                          'limit_topN': len_topn})
        stats_df = pd.DataFrame(stats)
        self.df['%s_NN' % dscode] = self.df['InChIKey'].map(query_nn_dict_map)
        # generate a table with the sign0 annotation for each neighbor
        # found; we can recursively use a Molset for this purpose!
        all_nn_inks = set.union(*[set(n) for n in query_nn_dict_map.values()])
        all_nn_inks = sorted(list(all_nn_inks))
        conv = Converter()
        inchies = list()
        for ink in tqdm(all_nn_inks, desc='get NN InChI'):
            try:
                inchi = conv.inchikey_to_inchi(ink)
            except Exception:
                self.__log.warning('%s has no InChI' % ink)
                continue
            inchies.append(inchi)
        nn_molset = Molset(self.cc, inchies, mol_type='inchi')
        nn_molset.annotate(dataset_code, include_features=True,
                           shorten_dscode=shorten_dscode,
                           features_from_raw=False, include_sign0=True)
        # aggregate NN features
        probas = defaultdict(list)
        preds = defaultdict(list)
        for ink, nn_inks in tqdm(query_nn_dict_map.items(), desc='preds'):
            if len(nn_inks) == 0:
                continue
            neighs = nn_molset.df[nn_molset.df['InChIKey'].isin(nn_inks)]
            if len(neighs) == 0:
                continue
            nn_s0 = np.vstack(neighs['%s_sign0' % dscode].values)
            # binarize (e.g. B4 is multiclass)
            nn_s0 = (nn_s0 > 0).astype(int)
            if np.sum(nn_s0) > 0:
                s0_probas = np.mean(nn_s0, axis=0)
                probas[ink] = s0_probas
                preds[ink] = (s0_probas > aggregation_thr).astype(int)
        self.df['%s_probas_sign0' % dscode] = self.df['InChIKey'].map(probas)
        self.df['%s_predicted_sign0' % dscode] = self.df['InChIKey'].map(
            preds)
        # get a readable list of features
        ink_feat = defaultdict(list)
        for idx, row in self.df.iterrows():
            ink = row['InChIKey']
            val = row['%s_predicted_sign0' % dscode]
            feat_idxs = np.argwhere(val).flatten()
            ink_feat[ink] = s0.features[feat_idxs].tolist()
        feat_name = dscode + '_predicted_features'
        self.df[feat_name] = self.df['InChIKey'].map(ink_feat)
        if not return_sign0:
            del self.df['%s_predicted_sign0' % dscode]
        if not return_probas:
            del self.df['%s_probas_sign0' % dscode]
        if return_stats:
            return stats_df, nn_molset
        else:
            return nn_molset
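    # Prediction sketch (thresholds are illustrative): infer sign0 features
    # from nearest neighbors in sign4 space; a Molset with the annotated
    # neighbors is returned.
    #
    #   nn_molset = mols.predict('B4.001', pvalue_thr_nn=1e-4,
    #                            limit_top_nn=100)
    #   mols.df[['InChIKey', 'B4_NN', 'B4_predicted_features']].head()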
    @staticmethod
    def get_uniprot_annotation(entries):
        """Get information on UniProt entries."""
        from bioservices import UniProt
        uniprot = UniProt()
        df = list()
        for up in tqdm(entries):
            try:
                df.append(uniprot.get_df(f'accession:{up}'))
            except Exception:
                print(up, 'NOT FOUND!')
        df = pd.concat(df)
        df = df.reset_index(drop=True)
        return df
    @staticmethod
    def get_chembl_protein_classes(chembldb='chembl_27', prefix_level=False):
        """Get a protein class id to name dictionary."""
        R = psql.qstring(
            "SELECT protein_class_id, parent_id, pref_name "
            "FROM protein_classification", chembldb)
        class_names = defaultdict(lambda: None)
        if prefix_level:
            edgelist = [(n[0], n[1]) for n in R[1:]]
            nodes = [n[0] for n in R]
            G = nx.Graph(edgelist)
            level = {n: len(nx.shortest_path(G, n, 0)) - 1 for n in nodes}
            class_names.update(
                {'Class:%i' % r[0]: 'L%s:%s' % (level[r[0]], r[2])
                 for r in R})
        else:
            class_names.update({'Class:%i' % r[0]: r[2] for r in R})
        return class_names
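    # Usage sketch (assuming a reachable 'chembl_27' PostgreSQL database;
    # the class id and name below are hypothetical): map ChEMBL protein
    # class ids to names, optionally prefixed with their level in the
    # classification tree.
    #
    #   class_names = Molset.get_chembl_protein_classes(prefix_level=True)
    #   class_names['Class:11']  # e.g. 'L2:Kinase'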
    @staticmethod
    def get_chembl_hierarchy(chembldb='chembl_27'):
        """Get protein classes and proteins in tree format."""
        pc_query = ("SELECT protein_class_id, parent_id, pref_name "
                    "FROM protein_classification")
        R = psql.qstring(pc_query, chembldb)
        edges = [(0, 1)] + [(p, c) for c, p, _ in R[1:]]
        nodes = [(n[0], {'id': 'Class:%s' % n[0], 'name': n[2]}) for n in R]
        G = nx.DiGraph()
        G.update(nodes=nodes, edges=edges)
        trg_query = (
            "SELECT c.accession, pc.protein_class_id "
            "FROM component_sequences c "
            "JOIN component_class cc ON c.component_id = cc.component_id "
            "JOIN protein_classification pc "
            "ON cc.protein_class_id = pc.protein_class_id;")
        R = psql.qstring(trg_query, chembldb)
        trg_label = set(ac for ac, pc in R)
        id_offset = max([n[0] for n in nodes]) + 1
        tid_label = {i + id_offset: {'id': t} for i, t in
                     enumerate(sorted([x for x in trg_label if x]))}
        label_tid = {t: i + id_offset for i, t in
                     enumerate(sorted([x for x in trg_label if x]))}
        trg_parent = [(pc, label_tid[ac]) for ac, pc in R if ac in label_tid]
        G.update(nodes=tuple(tid_label.items()), edges=trg_parent)
        return G
    @staticmethod
    def get_chebi(chebi_obo):
        """Get a ChEBI id to name dictionary."""
        chebi_dict = defaultdict(str)
        with open(chebi_obo, "r") as fh:
            terms = fh.read().split("[Term]\n")
        for term in terms[1:]:
            term = term.split("\n")
            chebi_id = term[0].split("id: ")[1]
            chebi_name = term[1].split("name: ")[1]
            chebi_dict[chebi_id] = chebi_name
        return chebi_dict
    def signaturize(self, datasets='exemplary'):
        """Annotate the DataFrame with predicted sign4 signatures.

        Args:
            datasets (list): The CC dataset codes, e.g. ['B4.001']. Use
                'exemplary' to get signatures for all exemplary CC spaces.
        """
        query_inks = self.df['InChIKey'].tolist()
        query_inchies = self.df['InChI'].tolist()
        tmp_pred_file = './tmp.h5'
        if datasets == 'exemplary':
            datasets = list(self.cc.datasets_exemplary())
        self.__log.debug("signaturizing datasets: {}".format(str(datasets)))
        for ds in tqdm(datasets, disable=len(datasets) == 1):
            s4 = self.cc.signature(ds, 'sign4')
            pred = s4.predict_from_string(query_inchies, tmp_pred_file,
                                          keytype='InChI', keys=query_inks)
            pred_sign = pred[:]
            pred_appl = pred.get_h5_dataset('applicability').ravel()
            self.df['%s_sign' % ds] = [r for r in pred_sign]
            self.df['%s_appl' % ds] = pred_appl
            os.remove(tmp_pred_file)
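    # Signaturization sketch: add one sign4 vector column and one
    # applicability column per dataset (the dataset list and the
    # 128-dimensional shape assume the standard CC setup).
    #
    #   mols.signaturize(datasets=['B4.001'])
    #   mols.df['B4.001_sign'].iloc[0].shape  # (128,)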
    def project(self, projector, projector_name=None, datasets='exemplary'):
        """Annotate the DataFrame with 2D projections of the signatures.

        Args:
            projector (object): Any pre-initialized projector exposing a
                'fit_transform' method.
            projector_name (str): The name of the projector, used to define
                the new DataFrame columns.
            datasets (list): The CC dataset codes, e.g. ['B4.001']. Use
                'exemplary' to get signatures for all exemplary CC spaces.
        """
        if datasets == 'exemplary':
            datasets = list(self.cc.datasets_exemplary())
        if projector_name is None:
            projector_name = type(projector).__name__
        self.__log.debug("projecting datasets {} with {}".format(
            str(datasets), projector_name))
        for ds in tqdm(datasets, disable=len(datasets) == 1):
            if '%s_sign' % ds not in self.df.columns:
                self.signaturize(datasets=[ds])
            sign = np.vstack(self.df['%s_sign' % ds].values)
            proj = projector.fit_transform(sign)
            x_name = '%s_%s1' % (ds, projector_name)
            self.df = pd.concat(
                [self.df, pd.Series(proj[:, 0], name=x_name)], axis=1)
            y_name = '%s_%s2' % (ds, projector_name)
            self.df = pd.concat(
                [self.df, pd.Series(proj[:, 1], name=y_name)], axis=1)
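    # Projection sketch (using scikit-learn PCA as an example projector):
    # any object exposing `fit_transform` works.
    #
    #   from sklearn.decomposition import PCA
    #   mols.project(PCA(n_components=2), projector_name='PCA',
    #                datasets=['B4.001'])
    #   mols.df[['B4.001_PCA1', 'B4.001_PCA2']].head()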
    def func_hpc(self, func_name, func_args, func_kwargs, hpc_kwargs,
                 data_path=None):
        """Execute any object method on the configured HPC.

        Args:
            func_name (str): The name of the method.
            func_args (tuple): The positional arguments of the method to be
                called.
            func_kwargs (dict): The keyword arguments of the method to be
                called.
            hpc_kwargs (dict): Arguments for the HPC class.
            data_path (str): The path to the Molset object; if None a copy
                of the data is made in the job folder.
        """
        # qualified name
        qual_name = '_'.join([self.__class__.__name__, func_name])
        # get cc config
        cc_config = hpc_kwargs.get("cc_config", os.environ['CC_CONFIG'])
        self.__log.debug("cc_config used: {}".format(cc_config))
        cfg = Config(cc_config)
        # create job directory
        job_base_path = cfg.PATH.CC_TMP
        job_name = "_".join(["CC", qual_name])
        tmp_dir = tempfile.mktemp(prefix=job_name, dir=job_base_path)
        job_path = hpc_kwargs.get("job_path", tmp_dir)
        if not os.path.isdir(job_path):
            os.mkdir(job_path)
        # pickle function args and kwargs
        pkl_args_fn = '%s_args.pkl' % qual_name
        pkl_args_path = os.path.join(job_path, pkl_args_fn)
        pickle.dump((func_args, func_kwargs), open(pkl_args_path, 'wb'))
        # pickle the data
        if data_path is None:
            pkl_data_fn = '%s_data.pkl' % qual_name
            data_path = os.path.join(job_path, pkl_data_fn)
            self.save(data_path)
        self.__log.info('data will be saved to: %s' % data_path)
        # hpc parameters
        hpc = dict()
        hpc["cpu"] = hpc_kwargs.get("cpu", 1)
        hpc["wait"] = hpc_kwargs.get("wait", False)
        hpc["jobdir"] = hpc_kwargs.get("job_path", job_path)
        hpc["num_jobs"] = hpc_kwargs.get("num_jobs", 1)
        hpc["job_name"] = hpc_kwargs.get("job_name", job_name)
        hpc["verbosity"] = hpc_kwargs.get("verbosity", "DEBUG")
        # create script file
        script_lines = [
            "import os, sys, pickle",
            "from chemicalchecker import ChemicalChecker",
            "from chemicalchecker.core import Molset",
            "ChemicalChecker.set_verbosity('%s')" % hpc["verbosity"],
            "cc = ChemicalChecker('%s')" % self.cc.cc_root,
            "molset = Molset.load(sys.argv[1], cc)",
            "args, kwargs = pickle.load(open(sys.argv[2], 'rb'))",
            "molset.%s(*args, **kwargs)" % func_name,
            "molset.save('%s', overwrite=True)" % data_path,
            "print('JOB DONE')"
        ]
        if hpc_kwargs.get("delete_job_path", False):
            script_lines.append("print('DELETING JOB PATH: %s')" % job_path)
            script_lines.append("os.system('rm -rf %s')" % job_path)
        # save the HPC script
        script_name = '%s_%s_hpc.py' % (self.__class__.__name__, func_name)
        script_name = hpc_kwargs.get("script_name", script_name)
        script_path = os.path.join(job_path, script_name)
        with open(script_path, 'w') as fh:
            for line in script_lines:
                fh.write(line + '\n')
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = ("SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}"
                   " singularity exec {} python {} {} {}")
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
            singularity_image, script_name, data_path, pkl_args_path)
        # set environment variables that avoid CPU over-subscription
        env_vars = [
            'OMP_NUM_THREADS',
            'OPENBLAS_NUM_THREADS',
            'MKL_NUM_THREADS',
            'VECLIB_MAXIMUM_THREADS',
            'NUMEXPR_NUM_THREADS'
        ]
        command = ' '.join(["%s=%s" % (v, str(hpc["cpu"]))
                            for v in env_vars] + [command])
        # submit jobs
        hpc_cfg = hpc_kwargs.get("hpc_cfg", cfg)
        cluster = HPC.from_config(hpc_cfg)
        cluster.submitMultiJob(command, **hpc)
        return cluster
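    # HPC sketch (cluster configuration is installation-specific; the data
    # path is hypothetical): run `annotate` remotely and reload the updated
    # Molset once the job is done.
    #
    #   cluster = mols.func_hpc('annotate', ('B4.001',), {}, {'wait': True},
    #                           data_path='/tmp/molset_job.pkl')
    #   mols = Molset.load('/tmp/molset_job.pkl', cc=cc)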
@logged
class Mol(object):
    """Mol class.

    Given a CC instance, provides access to signatures of an input
    molecule.
    """

    def __init__(self, cc, mol_str, str_type=None):
        """Initialize a Mol instance.

        Args:
            cc: Chemical Checker instance.
            mol_str: Compound identifier (e.g. SMILES string).
            str_type: Type of identifier ('inchikey', 'inchi' and 'smiles'
                are accepted); if None we do our best to guess.
        """
        if str_type is None:
            str_type = KeyTypeDetector.type(mol_str)
            if str_type is None:
                raise Exception(
                    "Molecule '%s' not recognized as valid format"
                    " (options are: 'inchikey', 'inchi' and 'smiles')"
                    % mol_str)
        conv = Converter()
        if str_type == "inchikey":
            self.inchikey = mol_str
            self.inchi = conv.inchikey_to_inchi(self.inchikey)
        if str_type == "inchi":
            self.inchi = mol_str
            self.inchikey = conv.inchi_to_inchikey(self.inchi)
        if str_type == "smiles":
            self.inchikey, self.inchi = conv.smiles_to_inchi(mol_str)
        self.smiles = conv.inchi_to_smiles(self.inchi)
        self.mol = conv.inchi_to_mol(self.inchi)
        self.cc = cc
    def isin(self, cctype, dataset_code, molset="full"):
        """Check if the molecule is in the dataset of interest."""
        sign = self.cc.get_signature(cctype, molset, dataset_code)
        try:
            keys = set(sign.keys)
        except Exception:
            keys = set(sign.row_keys)
        return self.inchikey in keys
    def signature(self, cctype, dataset_code, molset="full"):
        """Get the signature of the molecule in the dataset of interest."""
        sign = self.cc.get_signature(cctype, molset, dataset_code)
        return sign[self.inchikey]
    def report_available(self, dataset_code="*", cctype="*", molset="full"):
        """Check in which datasets the molecule is present."""
        available = self.cc.report_available(molset, dataset_code, cctype)
        d0 = {}
        for molset, datasets in available.items():
            d1 = collections.defaultdict(list)
            for dataset, cctypes in datasets.items():
                for cctype in cctypes:
                    if self.isin(cctype, dataset, molset):
                        d1[dataset] += [cctype]
            if d1:
                d0[molset] = dict((k, v) for k, v in d1.items())
        return d0
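# Usage sketch (the SMILES string is illustrative): a Mol resolves
# identifiers on construction and can report where the molecule is
# available in the CC.
#
#   mol = Mol(cc, 'CC(=O)Oc1ccccc1C(=O)O')  # aspirin
#   mol.isin('sign1', 'B4.001')
#   mol.report_available(cctype='sign1')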