"""Molecule representation.
A simple class to inspect small molecules, easily convert between different
identifiers, and ultimately find out whether signatures are available.
"""
import os
import pickle
import tempfile
import numpy as np
import collections
import pandas as pd
import networkx as nx
from tqdm import tqdm
from functools import partial
from collections import defaultdict
from chemicalchecker.util.hpc import HPC
from .signature_data import DataSignature
from chemicalchecker.util.psql import psql
from chemicalchecker.util import logged, Config
from chemicalchecker.util.keytype import KeyTypeDetector
from chemicalchecker.util.parser.converter import Converter
@logged
class Molset(object):
"""Molset class.
    Given a CC instance, provides access to features of an input set of
    molecules for one or more datasets of interest.
The data is organized in a DataFrame which will be annotated with
observed and/or predicted features by simple kNN inference.
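
    Example:
        A minimal usage sketch; the CC root path and dataset code are
        hypothetical::

            from chemicalchecker import ChemicalChecker
            cc = ChemicalChecker('/path/to/cc')
            mols = Molset(cc, ['CCO', 'c1ccccc1'], mol_type='smiles')
            mols.annotate('B4.001')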
"""
def __init__(self, cc, molecules, mol_type=None, add_image=True,
generic_scaffold=False, molecules_ids=None):
"""Initialize a Mol instance.
Args:
cc (Chemicalchecker): Chemical Checker instance.
            molecules (list): A list of molecules in the homogeneous format
                'mol_type'. This can also be a DataFrame, but we expect it to
                be generated by this same class, so complete and coherent in
                terms of identifiers.
            mol_type (str): Type of identifier; options are 'inchi',
                'inchikey', 'smiles' or 'name'. If 'name' is used we query
                external services (cactus.nci and PubChem) and some molecules
                might be missing. If 'inchikey' is used, we first query our
                local db and then external resources (as a bonus, what we
                retrieve is added to our local db). The DataFrame is
                completed with the other identifiers if not yet present.
            add_image (bool): If True a molecule image column is added.
            generic_scaffold (bool): If True the generic Bemis-Murcko
                scaffold is used for the 'Scaffold' column.
            molecules_ids (list): Optional identifiers for the input
                molecules, stored in the 'ID' column; defaults to the
                positional index.
"""
self.cc = cc
self.conv = Converter()
if isinstance(molecules, pd.DataFrame):
self.df = molecules.copy()
all_inks = self.df[self.df['InChIKey'] != '']['InChIKey'].tolist()
self.inchikeys = sorted(list(set(all_inks)))
else:
if len(molecules) != len(set(molecules)):
self.__log.warning('molecules provided are not unique'
', IDs might be off')
if mol_type is None:
mol_type = KeyTypeDetector.type(molecules[0])
if mol_type is None:
self.__log.warning(
"Molecule '%s' not recognized as either"
" 'inchikey', 'inchi' or 'smiles'. "
"Considering type as 'name'." %
molecules[0])
mol_type = 'name'
if molecules_ids is None:
molecules_ids = list(range(len(molecules)))
molecules = [x.strip() for x in molecules]
            self.__log.info('%s molecules provided' % len(molecules))
self.df = pd.DataFrame()
if mol_type.lower() == 'inchi':
mol_col = 'InChI'
elif mol_type.lower() == 'smiles':
mol_col = 'SMILES'
elif mol_type.lower() == 'name':
mol_col = 'Name'
elif mol_type.lower() == 'inchikey':
mol_col = 'InChIKey'
else:
raise Exception("Molecule type '%' not supported" % mol_type)
self.df[mol_col] = molecules
self.df['ID'] = molecules_ids
if mol_type.lower() == 'name':
name_inchi = self.get_name_inchi_map(molecules)
self.df['InChI'] = self.df[mol_col].map(name_inchi)
self._add_inchikeys()
self._add_smiles()
self.df = self.df[['Name', 'InChIKey', 'InChI', 'SMILES', 'ID']]
elif mol_type.lower() == 'inchikey':
ink_inchi = self.get_inchikey_inchi_map(molecules)
self.df['InChI'] = self.df[mol_col].map(ink_inchi)
self._add_smiles()
self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
elif mol_type.lower() == 'inchi':
self._add_inchikeys()
self._add_smiles()
self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
elif mol_type.lower() == 'smiles':
self._add_inchi(mol_col)
self._add_inchikeys()
self.df = self.df[['InChIKey', 'InChI', 'SMILES', 'ID']]
self._add_scaffold(generic=generic_scaffold)
self.inchikeys = self.df[self.df['InChIKey']
!= '']['InChIKey'].tolist()
self.df = self.df.sort_values('InChIKey')
self.df.dropna(inplace=True)
if len(self.df) == 0:
            raise Exception('No valid molecules passed.')
self.df.reset_index(inplace=True, drop=True)
if add_image:
Molset.add_image(self.df)
    def save(self, destination, overwrite=False):
"""Save a Molset instance.
Args:
destination (str): The destination path
overwrite (bool): Whether to overwrite in case the file exists.
"""
self.__log.debug("saving to {}".format(destination))
persist = (self.cc.cc_root, self.df)
if os.path.isfile(destination):
if overwrite:
pickle.dump(persist, open(destination, 'wb'))
else:
self.__log.warning('file %s already present!' % destination)
else:
pickle.dump(persist, open(destination, 'wb'))
    def __getitem__(self, keys):
        """Forward access to DataFrame columns.
        This allows accessing DataFrame columns while keeping the molecule
        structure visualization.
        Args:
            keys (list): a list of columns of the DataFrame.
"""
from rdkit.Chem import PandasTools
df = self.df[keys]
PandasTools.ChangeMoleculeRendering(df)
return df
    @classmethod
def load(cls, filename, cc=None, cc_root=None, add_image=False):
"""Load a Molset instance.
Args:
filename (str): The path of the file to load.
cc (ChemicalChecker): An already initialized CC instance.
cc_root (str): Path to the root of a Chemical Checker. If None
we try using the saved path. If that is not reachable we use
the default (default is cc_config dependent) CC.
add_image (bool): If True a molecule image is added.
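
        Example:
            A minimal save/load round trip for an existing Molset `mols`;
            the path is hypothetical::

                mols.save('/tmp/molset.pkl')
                mols = Molset.load('/tmp/molset.pkl', cc=cc)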
"""
from .chemcheck import ChemicalChecker
cls.__log.debug("loading from {}".format(filename))
cc_root_pkl, df = pickle.load(open(filename, 'rb'))
        if cc is not None:
if cc.cc_root != cc_root_pkl:
cls.__log.warning(
('The CC instance used for generating the Molset'
' (%s) is different from the currently used (%s)'
% (cc_root_pkl, cc.cc_root)))
return cls(cc, df, add_image=add_image)
        if cc_root is None:
try:
cc_root = cc_root_pkl
cc = ChemicalChecker(cc_root)
except Exception:
cls.__log.warning(
('The CC instance used for generating the Molset cannot be'
' reached. The default CC will be used'))
cc = ChemicalChecker()
else:
try:
cc = ChemicalChecker(cc_root)
except Exception:
cls.__log.warning(
('The `cc_root` provided does not point to a valid CC '
'instance. The default CC will be used'))
cc = ChemicalChecker()
if cc.cc_root != cc_root:
cls.__log.warning(
('The CC instance used for generating the Molset'
' (%s) is different from the currently used (%s)'
% (cc_root, cc.cc_root)))
return cls(cc, df, add_image=add_image)
@staticmethod
def add_image(df):
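        """Add a 'Structure' column rendering each molecule from its SMILES."""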
from rdkit.Chem import PandasTools
df['SMILES'].fillna('', inplace=True)
PandasTools.AddMoleculeColumnToFrame(
df, smilesCol='SMILES', molCol='Structure')
@classmethod
def combine(cls, cc, molset1, molset2, add_image=False,
generic_scaffold=False):
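        """Combine two Molset instances into a single one.
        The DataFrames are merged (outer join) on 'InChIKey' and 'InChI'.
        Annotation columns shared by both Molsets are concatenated as lists,
        while disjoint annotation columns are kept as single-element lists.
        Args:
            cc (Chemicalchecker): Chemical Checker instance.
            molset1 (Molset): First Molset to combine.
            molset2 (Molset): Second Molset to combine.
            add_image (bool): If True a molecule image column is added.
            generic_scaffold (bool): If True the generic Bemis-Murcko
                scaffold is used.
        """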
# we merge the dataframes on inchi
mol_cols = ['SMILES', 'Scaffold', 'Structure']
cols1 = [i for i in molset1.df.columns if i not in mol_cols]
cols2 = [i for i in molset2.df.columns if i not in mol_cols]
df = pd.merge(molset1.df[cols1], molset2.df[cols2],
on=['InChIKey', 'InChI'], how='outer')
# add the inchikeys and smiles
mix = cls(cc, df, add_image=False)
mix._add_smiles()
mix._add_inchikeys()
mix._add_scaffold(generic=generic_scaffold)
if add_image:
Molset.add_image(mix.df)
df = mix.df
# take care of other annotation columns
all_mol_cols = mol_cols + ['InChI']
anno_cols1 = [i for i in molset1.df.columns if i not in all_mol_cols]
anno_cols2 = [i for i in molset2.df.columns if i not in all_mol_cols]
# handle disjoint annotation columns
disjoint = set(anno_cols1) ^ set(anno_cols2)
for col in disjoint:
if col == 'Structure':
continue
            # if the column does not contain lists, wrap each value in a
            # single-element list
if not isinstance(df[col].dropna().iloc[0], list):
df[col] = df[col].apply(lambda x: x if pd.isnull(x) else [x])
# we replace NaNs with empty lists
df[col] = df[col].apply(lambda x: x if isinstance(x, list) else [])
# combine shared annotation columns as a list
shared = (set(anno_cols1) & set(anno_cols2)) - {'InChIKey'}
for col in shared:
if col == 'Structure':
continue
colx = col + '_x'
coly = col + '_y'
if not isinstance(df[colx].dropna().iloc[0], list):
df[colx] = df[colx].apply(lambda x: x if pd.isnull(x) else [x])
df[colx] = df[colx].apply(
lambda x: x if isinstance(x, list) else [])
if not isinstance(df[coly].dropna().iloc[0], list):
df[coly] = df[coly].apply(lambda x: x if pd.isnull(x) else [x])
df[coly] = df[coly].apply(
lambda x: x if isinstance(x, list) else [])
df[col] = df[colx] + df[coly]
df[col] = df[col].apply(set)
df[col] = df[col].apply(list)
if all(df[col].apply(len) == 1):
df[col] = df[col].apply(lambda x: x[0])
mols_col = ['InChIKey', 'InChI', 'SMILES', 'Scaffold']
if add_image:
mols_col.append('Structure')
mix.df = df[mols_col + sorted(list(shared)) + sorted(list(disjoint))]
mix.df = mix.df.sort_values('InChIKey')
mix.df.reset_index(inplace=True, drop=True)
return mix
def get_name_inchi_map(self, names):
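        """Return a Name-to-InChI map, skipping unresolvable names."""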
cpd_inchi = dict()
for cpd in tqdm(names, desc='getting Name-InChI map'):
try:
inchi = self.conv.chemical_name_to_inchi(cpd)
cpd_inchi[cpd] = inchi
except Exception as ex:
self.__log.warning(str(ex))
return cpd_inchi
def get_inchikey_inchi_map(self, inks):
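        """Return an InChIKey-to-InChI map, skipping unresolvable keys."""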
ink_inchi = dict()
for ink in tqdm(inks, desc='getting InChIKey-InChI map'):
try:
inchi = self.conv.inchikey_to_inchi(ink)
except Exception:
self.__log.warning('%s has no InChI' % ink)
continue
ink_inchi[ink] = inchi
return ink_inchi
def _safe_func(self, fn, arg, default=np.nan):
try:
return fn(arg)
        except Exception:
fn_name = 'None'
if hasattr(fn, '__name__'):
fn_name = fn.__name__
self.__log.warning(f'Problems calling `{fn_name}` for mol {arg}')
return default
def _add_inchikeys(self):
if 'InChIKey' not in self.df.columns:
fn = partial(self._safe_func, self.conv.inchi_to_inchikey)
self.df['InChIKey'] = self.df['InChI'].dropna().apply(fn)
def _add_smiles(self):
if 'SMILES' not in self.df.columns:
fn = partial(self._safe_func, self.conv.inchi_to_smiles)
self.df['SMILES'] = self.df['InChI'].dropna().apply(fn)
def _add_inchi(self, mol_col):
if 'InChI' not in self.df.columns:
fn = partial(self._safe_func,
lambda x: self.conv.smiles_to_inchi(x)[1])
self.df['InChI'] = self.df[mol_col].dropna().apply(fn)
def _add_scaffold(self, generic=False):
"""Bemis-Murcko scaffold"""
if 'Scaffold' not in self.df.columns:
conv_fn = partial(self.conv.smiles_to_scaffold, generic=generic)
fn = partial(self._safe_func, conv_fn)
self.df['Scaffold'] = self.df['SMILES'].dropna().apply(fn)
    def annotate(self, dataset_code, shorten_dscode=True,
                 include_features=False, feature_map=None,
                 include_values=False, features_from_raw=False,
                 filter_values=True, include_sign0=False):
"""Annotate the DataFrame with features fetched CC spaces.
The minimal annotation if whether the molecule is present or not in the
dataset of interest. The features, values and predictions are
optionally available. Features and values are fetched from the raw
preprocess file of a given space (before dropping any data).
The prediction is based on NN at the sign4 level and depends on
several parameters: 1) the applicability threshold for considering the
sign4 as reliable, 2) the p-value threshold (on sign4 distance) to
define the neighbors 3) the number of NN to consider.
Args:
dataset_code (str): the CC dataset code: e.g. B4.001.
include_features (bool): Include features of the molecules from
their raw preprocess sign0.
feature_map (dict): A dictionary that will be used to map features
to a human interpretable format.
include_values (bool): if True an additional column is added with
a list of (feature, value) pairs.
shorten_dscode (bool): If True get rid of the .001 part of the
dataset code
features_from_raw (bool): If True the features are fetched from the
raw sign0 which includes all features for all molecules. If
False sign0 features are used, so after the Sanitizer step
which possibly removes features or molecules.
            filter_values (bool): If True values == 0 are filtered out. False
                is recommended when the dataset contains continuous data.
include_sign0 (bool): If True the full sign0 for each molecule is
included in the dataframe.
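
        Example:
            A minimal sketch, assuming `mols` is an existing Molset; the
            dataset code and feature map are hypothetical::

                mols.annotate('B4.001', include_features=True,
                              include_values=True,
                              feature_map={'Class:1': 'Enzyme'})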
"""
# get raw preprocess data
s0 = self.cc.signature(dataset_code, 'sign0')
s0_raw = DataSignature(os.path.join(
s0.signature_path, 'raw', 'preprocess.h5'))
incc = np.isin(self.inchikeys, s0_raw.keys)
mol_ds = sum(incc)
tot_mol = len(self.inchikeys)
self.__log.info(
f'{mol_ds}/{tot_mol} molecules found in {dataset_code}')
dscode = dataset_code
if shorten_dscode:
dscode = dataset_code[:2]
# get presence in space
self.df[dscode] = 0
available_inks = np.array(self.inchikeys)[incc]
idx_df = self.df[self.df['InChIKey'].isin(available_inks)].index
        self.df.loc[idx_df, dscode] = 1
sorted_inks, values = None, None
# get the features if requested
if include_features:
if features_from_raw:
sorted_inks, values = s0_raw.get_vectors(available_inks,
dataset_name='X')
feat_ref = s0_raw
else:
sorted_inks, values = s0.get_vectors(available_inks)
feat_ref = s0
if values is None or len(values) == 0:
self.__log.warning(f'No features found for {dataset_code}!')
else:
ink_feat = defaultdict(list)
for ink, val in zip(sorted_inks, values):
feat_idxs = np.argwhere(val).flatten()
ink_feat[ink] = feat_ref.features[feat_idxs].tolist()
feat_name = dscode + '_features'
self.df[feat_name] = self.df['InChIKey'].map(ink_feat)
if feature_map is not None:
self.df[feat_name] = self.df[feat_name].apply(
lambda x: [feature_map[i] for i in x if isinstance(
x, list)]).tolist()
self.df[feat_name] = self.df[feat_name].apply(
lambda x: [i for i in x if i is not None]).tolist()
# add the corresponding feature value if required
if include_values:
if sorted_inks is None:
if features_from_raw:
sorted_inks, values = s0_raw.get_vectors(available_inks,
dataset_name='X')
feat_ref = s0_raw
else:
sorted_inks, values = s0.get_vectors(available_inks)
feat_ref = s0
ink_val = defaultdict(list)
for ink, val in zip(sorted_inks, values):
feat_idxs = np.arange(len(val))
if filter_values:
feat_idxs = np.argwhere(val != 0).flatten()
ink_val[ink] = list(zip(
feat_ref.features[feat_idxs].tolist(),
val[feat_idxs]))
val_name = dscode + '_values'
self.df[val_name] = self.df['InChIKey'].map(ink_val)
if feature_map is not None:
self.df[val_name] = self.df[val_name].apply(
lambda x: [(feature_map[k], v) for k, v in x if
isinstance(x, list)]).tolist()
# add sign0
if include_sign0:
sorted_inks, values = s0.get_vectors(available_inks)
feat_ref = s0
s0_name = dscode + '_sign0'
s0_dict = dict(zip(sorted_inks, values))
self.df[s0_name] = self.df['InChIKey'].map(s0_dict)
    def predict(self, dataset_code, shorten_dscode=True,
                applicability_thr_query=0, applicability_thr_nn=0,
                max_nr_nn=1000, pvalue_thr_nn=1e-4, limit_top_nn=1000,
                return_stats=False, return_sign0=False, blacklist=[],
                aggregation_thr=0.0, return_probas=False):
"""Annotate the DataFrame with predicted features based on neighbors.
In this case we can potentially get annotation for every molecules.
The molecules are first signaturized (sign4) filtered by applicability
and then searched against molecules within the CC sign4.
Nearest neighbors are selected for each molecule based on user
specified parameters. Only NN that are found in the sign0 of the space
are preserved. The annotations of these NN are aggregated (multiple
strategies are possible) and finally assigned to the molecule.
Args:
dataset_code (str): the CC dataset code: e.g. B4.001.
shorten_dscode (bool): If True get rid of the .001 part of the
dataset code
            applicability_thr_query (float): Only queries with a sign4
                applicability above this threshold will be searched for
                neighbors.
applicability_thr_nn (float): Only neighbors with a sign4
applicability above this threshold will be used for features
inference.
max_nr_nn (int): Maximum number of neighbors to possibly consider.
            pvalue_thr_nn (float): Filter neighbors based on distance.
limit_top_nn (int): Only keep topN neighbors for feature
predictions.
return_stats (bool): if True return a dataframe with statistics
on the NN search filtering steps.
return_sign0 (bool): if True return sign0 format prediction (only
for binary spaces).
            blacklist (list): List of InChIKeys of molecules to disregard
                during the neighbors search. Used for comparison to other
                prediction approaches where these molecules are the test set.
            aggregation_thr (float): Minimum fraction of neighbors sharing a
                feature for that feature to be predicted.
            return_probas (bool): if True the probabilities for each
                predicted class are also returned.
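
        Example:
            A minimal sketch with a stricter applicability filter, assuming
            `mols` is an existing Molset; the dataset code and thresholds
            are hypothetical::

                stats_df, nn_molset = mols.predict(
                    'B4.001', applicability_thr_query=0.5,
                    limit_top_nn=100, return_stats=True)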
"""
dscode = dataset_code
if shorten_dscode:
dscode = dataset_code[:2]
        # signaturize: get sign4 predictions for the queries
s4 = self.cc.signature(dataset_code, 'sign4')
s4_app_keys = s4.keys
s4_app = s4.get_h5_dataset('applicability').ravel()
tmp_pred_file = tempfile.NamedTemporaryFile().name
query_inks = self.df['InChIKey'].tolist()
query_inchies = self.df['InChI'].tolist()
pred = s4.predict_from_string(query_inchies, tmp_pred_file,
keytype='InChI', keys=query_inks)
pred_sign = pred[:]
pred_keys = pred.keys
pred_appl = pred.get_h5_dataset('applicability').ravel()
os.remove(tmp_pred_file)
# filter queries by applicability
appl_mask = pred_appl > applicability_thr_query
self.__log.info(
'Filtering %i queries for applicability thr' % sum(~appl_mask))
query_inks = pred_keys[appl_mask]
pred_sign = pred_sign[appl_mask]
# faiss sign4 NN search
neig4 = self.cc.get_signature('neig4', 'reference', dataset_code)
nn = neig4.get_kth_nearest(pred_sign, k=max_nr_nn)
self.__log.info("nn distances shape %s" % (str(nn['distances'].shape)))
# identify distance threshold based on requested pvalue
metric = neig4.get_h5_dataset('metric')[0]
        # this always returns the distances based on the non-redundant set
        s0 = self.cc.signature(dataset_code, 'sign0')
        bg_dist = s4.background_distances(
            metric, limit_inks=s0.keys, name='s0')
dst_thr_idx = np.argwhere(bg_dist['pvalue'] == pvalue_thr_nn).ravel()
dst_thr = bg_dist['distance'][dst_thr_idx][0]
self.__log.info("bg_distance '%s' thr %.4f" % (metric, dst_thr))
# precomputing dictionary with the list of significant neighbors for
# each query speeds up things
query_nn_dict = defaultdict(list)
for query_idx, hit_idx in np.argwhere(nn['distances'] < dst_thr):
query_ink = query_inks[query_idx]
query_nn_dict[query_ink].append(nn['keys'][query_idx, hit_idx])
# analyze each query individually
        s4_ref = self.cc.get_signature('sign4', 'reference', dataset_code)
query_nn_dict_map = defaultdict(list)
stats = list()
        for query_ink, nn_inks in tqdm(query_nn_dict.items(), desc='get NN'):
            # distance filtering was already applied when precomputing
            # `query_nn_dict`
            len_dist = len(nn_inks)
# expand neighbors list to include redundant molecules
# this is important because things that are redundant in sign4
# might not be redundant in sign0
# FIXME: how to vectorize this kind of operation?
nn_inks_mapped_idx = np.isin(s4_ref.mappings[:, 1], nn_inks)
nn_inks = s4_ref.mappings[:, 0][nn_inks_mapped_idx]
len_mapp = len(nn_inks)
# remove the query itself and filter blacklisted neighbors
nn_inks = nn_inks[~np.isin(nn_inks, np.hstack([blacklist,
query_ink]))]
len_blacklist = len(nn_inks)
# limit to neighbors with good applicability
nn_apps = s4_app[np.isin(s4_app_keys, nn_inks)]
nn_inks = nn_inks[nn_apps > applicability_thr_nn]
len_app = len(nn_inks)
# limit to neighbors with sign0
s0_mask = np.isin(nn_inks, s0.keys)
nn_inks = nn_inks[s0_mask]
len_s0 = len(nn_inks)
# limit to top N neighbors
nn_inks = nn_inks[:limit_top_nn]
query_nn_dict_map[query_ink] = nn_inks.tolist()
len_topn = len(nn_inks)
stats.append({'query_ink': query_ink, 'distance': len_dist,
'blacklist': len_blacklist, 'applicability': len_app,
'in_sign0': len_s0, 'limit_topN': len_topn})
stats_df = pd.DataFrame(stats)
self.df['%s_NN' % dscode] = self.df['InChIKey'].map(query_nn_dict_map)
# generate table with sign0 annotation for each neighbor found
# we can recursively use a molset for this purpose!
all_nn_inks = set.union(*[set(n) for n in query_nn_dict_map.values()])
all_nn_inks = sorted(list(all_nn_inks))
conv = Converter()
inchies = list()
for ink in tqdm(all_nn_inks, desc='get NN InChI'):
try:
inchi = conv.inchikey_to_inchi(ink)
except Exception:
self.__log.warning('%s has no InChI' % ink)
continue
inchies.append(inchi)
nn_molset = Molset(self.cc, inchies, mol_type='inchi')
nn_molset.annotate(dataset_code, include_features=True,
shorten_dscode=shorten_dscode,
features_from_raw=False, include_sign0=True)
# aggregate NN features
probas = defaultdict(list)
preds = defaultdict(list)
for ink, nn_inks in tqdm(query_nn_dict_map.items(), desc='preds'):
if len(nn_inks) == 0:
continue
neighs = nn_molset.df[nn_molset.df['InChIKey'].isin(nn_inks)]
if len(neighs) == 0:
continue
nn_s0 = np.vstack(neighs['%s_sign0' % dscode].values)
# binarize (e.g. B4 is multiclass)
nn_s0 = (nn_s0 > 0).astype(int)
if np.sum(nn_s0) > 0:
s0_probas = np.mean(nn_s0, axis=0)
probas[ink] = s0_probas
preds[ink] = (s0_probas > aggregation_thr).astype(int)
self.df['%s_probas_sign0' % dscode] = self.df['InChIKey'].map(probas)
self.df['%s_predicted_sign0' % dscode] = self.df['InChIKey'].map(preds)
# get readable list of features
ink_feat = defaultdict(list)
for idx, row in self.df.iterrows():
ink = row['InChIKey']
val = row['%s_predicted_sign0' % dscode]
feat_idxs = np.argwhere(val).flatten()
ink_feat[ink] = s0.features[feat_idxs].tolist()
feat_name = dscode + '_predicted_features'
self.df[feat_name] = self.df['InChIKey'].map(ink_feat)
if not return_sign0:
del self.df['%s_predicted_sign0' % dscode]
if not return_probas:
del self.df['%s_probas_sign0' % dscode]
if return_stats:
return stats_df, nn_molset
else:
return nn_molset
    @staticmethod
def get_uniprot_annotation(entries):
"""Get information on Uniprot entries."""
from bioservices import UniProt
uniprot = UniProt()
df = list()
for up in tqdm(entries):
try:
df.append(uniprot.get_df(f'accession:{up}'))
except Exception:
print(up, 'NOT FOUND!')
df = pd.concat(df)
df = df.reset_index(drop=True)
return df
    @staticmethod
def get_chembl_protein_classes(chembldb='chembl_27', prefix_level=False):
"""Get protein class id to name dictionary."""
R = psql.qstring(
"SELECT protein_class_id, parent_id, pref_name "
"FROM protein_classification", chembldb)
class_names = defaultdict(lambda: None)
if prefix_level:
edgelist = [(n[0], n[1]) for n in R[1:]]
nodes = [n[0] for n in R]
G = nx.Graph(edgelist)
level = {n: len(nx.shortest_path(G, n, 0))-1 for n in nodes}
class_names.update(
{'Class:%i' % r[0]: 'L%s:%s' % (level[r[0]], r[2]) for r in R})
else:
class_names.update({'Class:%i' % r[0]: r[2] for r in R})
return class_names
    @staticmethod
def get_chembl_hierarchy(chembldb='chembl_27'):
"""Get protein class and proteins in tree format."""
pc_query = ("SELECT protein_class_id, parent_id, pref_name"
"FROM protein_classification")
R = psql.qstring(pc_query, chembldb)
class_names = defaultdict(lambda: None)
edges = [(0, 1)] + [(p, c) for c, p, _ in R[1:]]
nodes = [(n[0], {'id': 'Class:%s' % n[0], 'name':n[2]}) for n in R]
G = nx.DiGraph()
G.update(nodes=nodes, edges=edges)
trg_query = (
"SELECT c.accession, pc.protein_class_id "
"FROM component_sequences c "
"JOIN component_class cc ON c.component_id = cc.component_id "
"JOIN protein_classification pc "
"ON cc.protein_class_id = pc.protein_class_id;")
R = psql.qstring(trg_query, chembldb)
trg_label = set(ac for ac, pc in R)
id_offset = max([n[0] for n in nodes])+1
tid_label = {i+id_offset: {'id': t}
for i, t in enumerate(sorted([x for x in trg_label if x]))}
label_tid = {t: i+id_offset for i,
t in enumerate(sorted([x for x in trg_label if x]))}
trg_parent = [(pc, label_tid[ac]) for ac, pc in R if ac in label_tid]
G.update(nodes=tuple(tid_label.items()), edges=trg_parent)
return G
    @staticmethod
def get_chebi(chebi_obo):
"""Get CHEBI id to name dictionary."""
        chebi_dict = defaultdict(str)
        with open(chebi_obo, "r") as f:
            terms = f.read().split("[Term]\n")
        for term in terms[1:]:
            term = term.split("\n")
            chebi_id = term[0].split("id: ")[1]
            chebi_name = term[1].split("name: ")[1]
            chebi_dict[chebi_id] = chebi_name
return chebi_dict
    def signaturize(self, datasets='exemplary'):
"""Annotate the DataFrame with predicted sign4 signatures.
Args:
datasets (list): the CC datasets code: e.g. ['B4.001'].
Use 'exemplary' to get signatures for all exemplary CC spaces.
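
        Example:
            A minimal sketch, assuming `mols` is an existing Molset; the
            dataset code is hypothetical::

                mols.signaturize(datasets=['B4.001'])
                mols.df['B4.001_sign']  # one sign4 vector per molecule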
"""
query_inks = self.df['InChIKey'].tolist()
query_inchies = self.df['InChI'].tolist()
        tmp_pred_file = tempfile.NamedTemporaryFile().name
if datasets == 'exemplary':
datasets = list(self.cc.datasets_exemplary())
self.__log.debug("signaturizing datasets: {}".format(str(datasets)))
for ds in tqdm(datasets, disable=len(datasets) == 1):
s4 = self.cc.signature(ds, 'sign4')
pred = s4.predict_from_string(query_inchies, tmp_pred_file,
keytype='InChI', keys=query_inks)
pred_sign = pred[:]
pred_appl = pred.get_h5_dataset('applicability').ravel()
self.df['%s_sign' % ds] = [r for r in pred_sign]
self.df['%s_appl' % ds] = pred_appl
os.remove(tmp_pred_file)
    def project(self, projector, projector_name=None, datasets='exemplary'):
"""Annotate the DataFrame with projection of the signatures.
Args:
            projector (object): Any preinitialized projector exposing a
                'fit_transform' method (e.g. from scikit-learn).
projector_name (str): The name of the projector, used to define the
new DataFrame columns.
datasets (list): the CC datasets code: e.g. ['B4.001'].
Use 'exemplary' to get signatures for all exemplary CC spaces.
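
        Example:
            A minimal sketch using scikit-learn's PCA as the projector,
            assuming `mols` is an existing Molset; the dataset code is
            hypothetical::

                from sklearn.decomposition import PCA
                mols.project(PCA(n_components=2), datasets=['B4.001'])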
"""
if datasets == 'exemplary':
datasets = list(self.cc.datasets_exemplary())
if projector_name is None:
projector_name = type(projector).__name__
self.__log.debug("projecting datasets {} with {}".format(
str(datasets), projector_name))
for ds in tqdm(datasets, disable=len(datasets) == 1):
if '%s_sign' % ds not in self.df.columns:
self.signaturize(datasets=[ds])
sign = np.vstack(self.df['%s_sign' % ds].values)
proj = projector.fit_transform(sign)
x_name = '%s_%s1' % (ds, projector_name)
self.df = pd.concat(
[self.df, pd.Series(proj[:, 0], name=x_name)], axis=1)
y_name = '%s_%s2' % (ds, projector_name)
self.df = pd.concat(
[self.df, pd.Series(proj[:, 1], name=y_name)], axis=1)
    def func_hpc(self, func_name, func_args, func_kwargs, hpc_kwargs,
data_path=None):
"""Execute the *any* object method on the configured HPC.
Args:
func_name(str): the name of the function.
            func_args(tuple): the arguments of the function to be called.
            func_kwargs(dict): the keyword arguments of the function to be
                called.
hpc_kwargs(dict): arguments for the HPC class.
data_path(str): the path to the Molset object, if None a copy of
the data in the job folder will be made.
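
        Example:
            A minimal sketch submitting `annotate` to the HPC, assuming
            `mols` is an existing Molset; the HPC parameters are
            hypothetical::

                cluster = mols.func_hpc('annotate', ('B4.001',),
                                        {'include_features': True},
                                        {'cpu': 4, 'wait': True})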
"""
# qualified name
qual_name = '_'.join([self.__class__.__name__, func_name])
# get cc config
cc_config = hpc_kwargs.get("cc_config", os.environ['CC_CONFIG'])
self.__log.debug("cc_config used: {}".format(cc_config))
cfg = Config(cc_config)
# create job directory
job_base_path = cfg.PATH.CC_TMP
job_name = "_".join(["CC", qual_name])
tmp_dir = tempfile.mktemp(prefix=job_name, dir=job_base_path)
job_path = hpc_kwargs.get("job_path", tmp_dir)
if not os.path.isdir(job_path):
os.mkdir(job_path)
# pickle function args and kwargs
pkl_args_fn = '%s_args.pkl' % qual_name
pkl_args_path = os.path.join(job_path, pkl_args_fn)
pickle.dump((func_args, func_kwargs), open(pkl_args_path, 'wb'))
# pickle the data
if data_path is None:
pkl_data_fn = '%s_data.pkl' % qual_name
data_path = os.path.join(job_path, pkl_data_fn)
self.save(data_path)
self.__log.info('data will be saved to: %s' % data_path)
# hpc parameters
hpc = dict()
hpc["cpu"] = hpc_kwargs.get("cpu", 1)
hpc["wait"] = hpc_kwargs.get("wait", False)
hpc["jobdir"] = hpc_kwargs.get("job_path", job_path)
hpc["num_jobs"] = hpc_kwargs.get("num_jobs", 1)
hpc["job_name"] = hpc_kwargs.get("job_name", job_name)
hpc["verbosity"] = hpc_kwargs.get("verbosity", "DEBUG")
# create script file
script_lines = [
"import os, sys, pickle",
"from chemicalchecker import ChemicalChecker",
"from chemicalchecker.core import Molset",
"ChemicalChecker.set_verbosity('%s')" % hpc["verbosity"],
"cc = ChemicalChecker('%s')" % self.cc.cc_root,
"molset = Molset.load(cc, sys.argv[1])",
"args, kwargs = pickle.load(open(sys.argv[2], 'rb'))",
"molset.%s(*args, **kwargs)" % func_name,
"molset.save('%s', overwrite=True)" % data_path,
"print('JOB DONE')"
]
if hpc_kwargs.get("delete_job_path", False):
script_lines.append("print('DELETING JOB PATH: %s')" % job_path)
script_lines.append("os.system('rm -rf %s')" % job_path)
# save the HPC script
script_name = '%s_%s_hpc.py' % (self.__class__.__name__, func_name)
script_name = hpc_kwargs.get("script_name", script_name)
script_path = os.path.join(job_path, script_name)
with open(script_path, 'w') as fh:
for line in script_lines:
fh.write(line + '\n')
# job command
singularity_image = cfg.PATH.SINGULARITY_IMAGE
command = ("SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}"
" singularity exec {} python {} {} {}")
command = command.format(
os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
singularity_image, script_name, data_path, pkl_args_path)
# set environment variable that avoid cpu over-subscription
env_vars = [
'OMP_NUM_THREADS',
'OPENBLAS_NUM_THREADS',
'MKL_NUM_THREADS',
'VECLIB_MAXIMUM_THREADS',
'NUMEXPR_NUM_THREADS'
]
command = ' '.join(["%s=%s" % (v, str(hpc["cpu"]))
for v in env_vars] + [command])
# submit jobs
hpc_cfg = hpc_kwargs.get("hpc_cfg", cfg)
cluster = HPC.from_config(hpc_cfg)
cluster.submitMultiJob(command, **hpc)
return cluster
@logged
class Mol(object):
"""Mol class.
    Given a CC instance, provides access to signatures of an input molecule.
"""
def __init__(self, cc, mol_str, str_type=None):
"""Initialize a Mol instance.
Args:
cc: Chemical Checker instance.
mol_str: Compound identifier (e.g. SMILES string)
            str_type: Type of identifier ('inchikey', 'inchi' and 'smiles'
                are accepted); if None we do our best to guess.
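
        Example:
            A minimal sketch; the CC root path is hypothetical::

                from chemicalchecker import ChemicalChecker
                cc = ChemicalChecker('/path/to/cc')
                mol = Mol(cc, 'CCO', str_type='smiles')
                mol.isin('sign1', 'A1.001')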
"""
if str_type is None:
str_type = KeyTypeDetector.type(mol_str)
if str_type is None:
raise Exception(
"Molecule '%s' not recognized as valid format"
" (options are: 'inchikey', 'inchi' and 'smiles')" %
mol_str)
conv = Converter()
if str_type == "inchikey":
self.inchikey = mol_str
self.inchi = conv.inchikey_to_inchi(self.inchikey)
if str_type == "inchi":
self.inchi = mol_str
self.inchikey = conv.inchi_to_inchikey(self.inchi)
if str_type == "smiles":
self.inchikey, self.inchi = conv.smiles_to_inchi(mol_str)
self.smiles = conv.inchi_to_smiles(self.inchi)
self.mol = conv.inchi_to_mol(self.inchi)
self.cc = cc
    def isin(self, cctype, dataset_code, molset="full"):
"""Check if the molecule is in the dataset of interest"""
sign = self.cc.get_signature(cctype, molset, dataset_code)
try:
keys = set(sign.keys)
except Exception:
keys = set(sign.row_keys)
return self.inchikey in keys
    def signature(self, cctype, dataset_code, molset="full"):
        """Return the signature of the molecule for the dataset of interest."""
sign = self.cc.get_signature(cctype, molset, dataset_code)
return sign[self.inchikey]
    def report_available(self, dataset_code="*", cctype="*", molset="full"):
"""Check in what datasets the molecule is present"""
available = self.cc.report_available(molset, dataset_code, cctype)
d0 = {}
for molset, datasets in available.items():
d1 = collections.defaultdict(list)
for dataset, cctypes in datasets.items():
for cctype in cctypes:
if self.isin(cctype, dataset, molset):
d1[dataset] += [cctype]
if d1:
d0[molset] = dict((k, v) for k, v in d1.items())
return d0