"""Signature type 2.
Signatures type 2 are network embeddings of the similarity matrix derived
from signatures type 1. They have a fixed length, which is convenient for
machine learning, and they capture both explicit and implicit similarity
relationships in the data.
Signatures type 2 are the result of a two-step process:
1. Transform the nearest-neighbor matrix of signatures type 1 into a graph.
2. Perform network embedding with Node2Vec.
Network embeddings cannot be produced for out-of-sample (out-of-vocabulary)
nodes, so a multi-output regression (from signatures type 1 to signatures
type 2) is trained a posteriori to enable predict() capabilities.
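
Example:
    A minimal usage sketch, assuming a configured ChemicalChecker
    instance and the hypothetical dataset code 'A1.001'::

        from chemicalchecker.core import ChemicalChecker

        cc = ChemicalChecker('/path/to/cc_root')
        s1 = cc.get_signature('sign1', 'reference', 'A1.001')
        n1 = cc.get_signature('neig1', 'reference', 'A1.001')
        s2 = cc.get_signature('sign2', 'reference', 'A1.001')
        s2.fit(s1, n1)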
"""
import os
import h5py
import shutil
import numpy as np
from time import time
from .signature_base import BaseSignature
from .signature_data import DataSignature
from chemicalchecker.util.plot import Plot
from chemicalchecker.util import logged, Config
from chemicalchecker.util.splitter import Traintest
@logged
class sign2(BaseSignature, DataSignature):
"""Signature type 2 class."""
def __init__(self, signature_path, dataset, **params):
"""Initialize a Signature.
Args:
signature_path(str): The signature root directory.
dataset(`Dataset`): `chemicalchecker.database.Dataset` object.
            params(dict): Parameters; expected keys are 'graph',
                'node2vec', and 'adanet'.
"""
        # Calling init on the base class to trigger file existence checks
BaseSignature.__init__(self, signature_path, dataset, **params)
self.data_path = os.path.join(self.signature_path, 'sign2.h5')
DataSignature.__init__(self, self.data_path)
# assign dataset
self.dataset = dataset
    def fit(self, sign1=None, neig1=None, reuse=True, compare_nn=False,
oos_predictor=True, graph_kwargs={}, node2vec_kwargs={},
adanet_kwargs={'cpu': 1}, **kwargs):
"""Fit signature 2 given signature 1 and its nearest neighbors.
Node2vec embeddings are computed using the graph derived from sign1.
The predictive model is learned with AdaNet.
Args:
sign1(sign1): Signature type 1.
            neig1(neig1): Nearest neighbors of signature type 1.
reuse(bool): Reuse already generated intermediate files. Set to
False to re-train from scratch.
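
        Example:
            A minimal sketch; keys of `node2vec_kwargs` follow the
            Node2Vec executable parameters (e.g. 'd', the embedding
            dimension, as in `grid_search_node2vec`)::

                s2.fit(sign1, neig1, reuse=False,
                       node2vec_kwargs={'d': 128})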
"""
try:
from chemicalchecker.util.network import SNAPNetwork
from chemicalchecker.util.performance import LinkPrediction
from chemicalchecker.tool.adanet import AdaNet
from chemicalchecker.tool.node2vec import Node2Vec
except ImportError as err:
raise err
BaseSignature.fit(self, **kwargs)
# signature specific checks
if self.molset != "reference":
self.__log.debug("Fit will be done with the reference sign2")
self = self.get_molset("reference")
if sign1 is None:
sign1 = self.get_sign('sign1').get_molset("reference")
if neig1 is None:
neig1 = sign1.get_neig().get_molset("reference")
if sign1.cctype != "sign1":
raise Exception("A signature type 1 is expected!")
if sign1.molset != "reference":
self.__log.debug("Fit will be done with the reference sign1")
sign1 = self.get_sign('sign1').get_molset("reference")
if neig1.cctype != "neig1":
raise Exception("A neighbors of signature type 1 is expected!")
if neig1.molset != "reference":
self.__log.debug("Fit will be done with the reference neig1")
neig1 = sign1.get_neig().get_molset("reference")
#########
# step 1: Node2Vec (learn graph embedding) input is neig1
#########
self.update_status("Node2Vec")
self.__log.debug('Node2Vec on %s' % sign1)
n2v = Node2Vec(executable=Config().TOOLS.node2vec_exec)
# use neig1 to generate the Node2Vec input graph (as edgelist)
node2vec_path = os.path.join(self.model_path, 'node2vec')
if not os.path.isdir(node2vec_path):
os.makedirs(node2vec_path)
graph_file = os.path.join(node2vec_path, 'graph.edgelist')
if not reuse or not os.path.isfile(graph_file):
n2v.to_edgelist(sign1, neig1, graph_file, **graph_kwargs)
# check that all molecules are considered in the graph
with open(graph_file, 'r') as fh:
lines = fh.readlines()
graph_mol = set(l.split()[0] for l in lines)
        # comparing the total number of molecules is sufficient
        if len(graph_mol) != len(sign1.unique_keys):
raise Exception("Graph %s is missing nodes." % graph_file)
# save graph stats
graph_stat_file = os.path.join(self.stats_path, 'graph_stats.json')
graph = None
if not reuse or not os.path.isfile(graph_stat_file):
graph = SNAPNetwork.from_file(graph_file)
graph.stats_toJSON(graph_stat_file)
# run Node2Vec to generate embeddings
emb_file = os.path.join(node2vec_path, 'n2v.emb')
if not reuse or not os.path.isfile(emb_file):
n2v.run(graph_file, emb_file, **node2vec_kwargs)
# convert to signature h5 format
if not reuse or not os.path.isfile(self.data_path):
n2v.emb_to_h5(sign1.keys, emb_file, self.data_path)
# save link prediction stats
linkpred_file = os.path.join(self.stats_path, 'linkpred.json')
if not reuse or not os.path.isfile(linkpred_file):
if not graph:
graph = SNAPNetwork.from_file(graph_file)
try:
linkpred = LinkPrediction(self, graph)
linkpred.performance.toJSON(linkpred_file)
except Exception as ex:
self.__log.error('Problem with LinkPrediction: %s' % str(ex))
        # copy reduced-full mappings from sign1
        if "mappings" not in self.info_h5:
            if "mappings" in sign1.info_h5:
                self.copy_from(sign1, "mappings")
            else:
                self.__log.warning("Cannot copy 'mappings' from sign1.")
sign2_plot = Plot(self.dataset, self.stats_path)
sign2_plot.sign_feature_distribution_plot(self)
#########
# step 2: AdaNet (learn to predict sign2 from sign1 without Node2Vec)
#########
if oos_predictor:
self.update_status("Training out-of-sample predictor")
self.__log.debug('AdaNet fit %s with Node2Vec output' % sign1)
# get params and set folder
adanet_path = os.path.join(self.model_path, 'adanet')
adanet_path = adanet_kwargs.get('model_dir', adanet_path)
if not reuse or not os.path.isdir(adanet_path):
os.makedirs(adanet_path, exist_ok=True)
# prepare train-test file
traintest_file = os.path.join(adanet_path, 'traintest.h5')
traintest_file = adanet_kwargs.get(
'traintest_file', traintest_file)
if not reuse or not os.path.isfile(traintest_file):
Traintest.create_signature_file(
sign1.data_path, self.data_path, traintest_file)
ada = AdaNet(model_dir=adanet_path,
traintest_file=traintest_file, **adanet_kwargs)
# learn NN with AdaNet
self.__log.debug('AdaNet training on %s' % traintest_file)
ada.train_and_evaluate()
# save AdaNet performances and plots
sign2_plot = Plot(self.dataset, adanet_path)
            extra_predictors = dict()
            if compare_nn:
                nearest_neighbor_pred = sign2.predict_nearest_neighbor(
                    self.model_path, traintest_file)
                extra_predictors['NearestNeighbor'] = nearest_neighbor_pred
            ada.save_performances(adanet_path, sign2_plot,
                                  extra_predictors=extra_predictors)
self.__log.debug('model saved to %s' % adanet_path)
self.update_status("Generating `full` molset")
cc_tmp = self.get_cc()
sign1_full = cc_tmp.get_signature('sign1', 'full', self.dataset)
sign2_full = cc_tmp.get_signature('sign2', 'full', self.dataset)
# we want agreement between reference and full
# so we overwrite the original embeddings using the predictor
sign1_ref = cc_tmp.get_signature('sign1', 'reference', self.dataset)
sign2_ref = cc_tmp.get_signature('sign2', 'reference', self.dataset)
if oos_predictor:
self.predict(sign1_full, sign2_full.data_path)
self.predict(sign1_ref, sign2_ref.data_path)
else:
self.save_full(sign2_full.data_path)
# finalize signature
BaseSignature.fit_end(self, **kwargs)
    def predict(self, sign1, destination=None):
"""Use the learned model to predict the signature.
Args:
            sign1(signature): A valid signature type 1.
            destination(None|str|signature): If None, the prediction
                results are returned as a dictionary; if a str, it is
                used as the path of the output H5 file; if a signature
                type 2, its data_path is used as destination.
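
        Example:
            A minimal sketch, mirroring how `fit` produces the `full`
            molset (dataset code and path are hypothetical)::

                s1_full = cc.get_signature('sign1', 'full', 'A1.001')
                res = s2.predict(s1_full)             # in-memory dict
                s2.predict(s1_full, '/tmp/sign2_full.h5')  # write H5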
"""
try:
from chemicalchecker.tool.adanet import AdaNet
except ImportError as err:
raise err
if isinstance(destination, BaseSignature):
destination = destination.data_path
# load AdaNet model
adanet_path = os.path.join(self.model_path, 'adanet', 'savedmodel')
self.__log.debug('loading model from %s' % adanet_path)
predict_fn = AdaNet.predict_fn(adanet_path)
tot_inks = len(sign1.keys)
if destination is None:
results = {
"V": AdaNet.predict(sign1[:], predict_fn),
"keys": sign1.keys
}
return results
else:
with h5py.File(destination, "w") as results:
# initialize V and keys datasets
results.create_dataset('V', (tot_inks, 128), dtype=np.float32)
results.create_dataset(
'keys', data=np.array(sign1.keys,
DataSignature.string_dtype()))
results.create_dataset("shape", data=(tot_inks, 128))
# predict signature 2
for chunk in sign1.chunker():
results['V'][chunk] = AdaNet.predict(
sign1[chunk], predict_fn)
    @staticmethod
def predict_nearest_neighbor(destination_path, traintest_file):
"""Prediction with nearest neighbor.
        Find the nearest neighbors in sign1 and map them to the known sign2.
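
        Example:
            A minimal sketch; `traintest_file` is an H5 split produced
            by `Traintest.create_signature_file` (paths are
            hypothetical)::

                nn_pred = sign2.predict_nearest_neighbor(
                    '/tmp/models', '/tmp/models/traintest.h5')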
"""
from .data import DataFactory
from .neig import neig
sign2.__log.info('Performing Nearest Neighbor prediction.')
# create directory to save neig and sign (delete if exists)
nn_path = os.path.join(destination_path, "nearest_neighbor")
if os.path.isdir(nn_path):
shutil.rmtree(nn_path)
# evaluate all data splits
datasets = ['train', 'test', 'validation']
nn_pred = dict()
nn_pred_start = time()
for ds in datasets:
# get dataset split
traintest = Traintest(traintest_file, ds)
traintest.open()
x_data = traintest.get_all_x()
y_data = traintest.get_all_y()
traintest.close()
sign2.__log.info('Nearest Neighbor %s X:%s Y:%s.',
ds, x_data.shape, y_data.shape)
# check that there are samples left
if x_data.shape[0] == 0:
sign2.__log.warning("No samples available, skipping.")
return None
# fit on train set
if ds == "train":
# signaturize dataset
sign1_dest = os.path.join(nn_path, "sign1")
os.makedirs(sign1_dest)
nn_sign1 = DataFactory.signaturize(
"sign1", sign1_dest, x_data)
                # sign2 is needed just to get the default keys,
                # since neig1.get_kth_nearest returns keys of sign1
sign2_dest = os.path.join(nn_path, "sign2")
os.makedirs(sign2_dest)
nn_sign2 = DataFactory.signaturize(
"sign2", sign2_dest, y_data)
# create temporary neig1 and call fit
neig1_dest = os.path.join(nn_path, "neig1")
os.makedirs(neig1_dest)
nn_neig1 = neig(neig1_dest, "NN.001")
nn_neig1.fit(nn_sign1)
# save nearest neighbor signatures as predictions
nn_pred[ds] = dict()
# get nearest neighbor indices and keys
nn_neig1_idxs, _ = nn_neig1.get_kth_nearest(x_data, 1)
nn_idxs = nn_neig1_idxs[:, 0]
nn_pred[ds]['true'] = y_data
nn_pred[ds]['pred'] = list()
for idx in nn_idxs:
nn_pred[ds]['pred'].append(nn_sign2[idx])
nn_pred[ds]['pred'] = np.vstack(nn_pred[ds]['pred'])
nn_pred_end = time()
nn_pred['time'] = nn_pred_end - nn_pred_start
nn_pred['name'] = "NearestNeighbor"
return nn_pred
    @staticmethod
def predict_adanet(destination_path, traintest_file, params):
"""Prediction with adanet."""
from chemicalchecker.tool.adanet import AdaNet
sign2.__log.info('Performing AdaNet prediction.')
# create directory to save neig and sign (delete if exists)
ada_path = os.path.join(destination_path, "adanet")
if os.path.isdir(ada_path):
shutil.rmtree(ada_path)
# evaluate all data splits
datasets = ['train', 'test', 'validation']
ada_pred = dict()
ada_pred_start = time()
ada = AdaNet(model_dir=ada_path, traintest_file=traintest_file,
**params)
ada.train_and_evaluate()
ada_pred_end = time()
for ds in datasets:
# get dataset split
traintest = Traintest(traintest_file, ds)
traintest.open()
x_data = traintest.get_all_x()
y_data = traintest.get_all_y()
traintest.close()
sign2.__log.info('AdaNet %s X:%s Y:%s.',
ds, x_data.shape, y_data.shape)
# check that there are samples left
if x_data.shape[0] == 0:
sign2.__log.warning("No samples available, skipping.")
return None
ada_pred[ds] = dict()
            # save true values and AdaNet predictions
ada_pred[ds]['true'] = y_data
ada_pred[ds]['pred'] = AdaNet.predict(ada.save_dir, x_data)
ada_pred['time'] = ada_pred_end - ada_pred_start
ada_pred['name'] = "AdaNet"
return ada_pred
    @staticmethod
def predict_linear_regression(destination_path, traintest_file):
"""Prediction with adanet."""
from sklearn.linear_model import LinearRegression
sign2.__log.info('Performing LinearRegression prediction.')
# create directory to save neig and sign (delete if exists)
lr_path = os.path.join(destination_path, "linear")
if os.path.isdir(lr_path):
shutil.rmtree(lr_path)
# evaluate all data splits
datasets = ['train', 'test', 'validation']
lr_pred = dict()
for ds in datasets:
# get dataset split
traintest = Traintest(traintest_file, ds)
traintest.open()
x_data = traintest.get_all_x()
y_data = traintest.get_all_y()
traintest.close()
sign2.__log.info('LinearRegression %s X:%s Y:%s.',
ds, x_data.shape, y_data.shape)
# check that there are samples left
if x_data.shape[0] == 0:
sign2.__log.warning("No samples available, skipping.")
return None
lr_pred[ds] = dict()
if ds == 'train':
lr_pred_start = time()
linreg = LinearRegression().fit(x_data, y_data)
lr_pred_end = time()
            # save true values and linear-regression predictions
lr_pred[ds]['true'] = y_data
lr_pred[ds]['pred'] = linreg.predict(x_data)
lr_pred['time'] = lr_pred_end - lr_pred_start
lr_pred['name'] = "LinearRegression"
return lr_pred
    def eval_node2vec(self, sign1, neig1, reuse=True, graph_kwargs={},
node2vec_kwargs={}):
"""Evaluate node2vec performances.
Node2vec embeddings are computed using the graph derived from sign1.
We split edges in a train and test set so we can compute the ROC
of link prediction.
Args:
sign1(sign1): Signature type 1.
            neig1(neig1): Nearest neighbors of signature type 1.
reuse(bool): Reuse already generated intermediate files. Set to
False to re-train from scratch.
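
        Example:
            A minimal sketch; link-prediction performance JSONs are
            written under the 'node2vec_eval' model directory::

                s2.eval_node2vec(sign1, neig1, reuse=False)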
"""
#########
# step 1: Node2Vec (learn graph embedding) input is neig1
#########
try:
from chemicalchecker.util.network import SNAPNetwork
from chemicalchecker.util.performance import LinkPrediction
from chemicalchecker.tool.node2vec import Node2Vec
except ImportError as err:
raise err
self.__log.debug('Node2Vec on %s' % sign1)
n2v = Node2Vec(executable=Config().TOOLS.node2vec_exec)
# define the n2v model path
node2vec_path = os.path.join(self.model_path, 'node2vec_eval')
node2vec_path = node2vec_kwargs.get('model_dir', node2vec_path)
if not reuse or not os.path.isdir(node2vec_path):
os.makedirs(node2vec_path)
# use neig1 to generate the Node2Vec input graph (as edgelist)
graph_file = os.path.join(
self.model_path, 'node2vec', 'graph.edgelist')
if not reuse or not os.path.isfile(graph_file):
n2v.to_edgelist(sign1, neig1, graph_file, **graph_kwargs)
# split graph in train and test
graph_train = graph_file + ".train"
graph_test = graph_file + ".test"
if not reuse or not os.path.isfile(graph_train) \
or not os.path.isfile(graph_test):
graph = SNAPNetwork.from_file(graph_file)
n2v.split_edgelist(graph, graph_train, graph_test)
# run Node2Vec to generate embeddings based on train
emb_file = os.path.join(node2vec_path, 'n2v.emb')
if not reuse or not os.path.isfile(emb_file):
n2v.run(graph_train, emb_file, **node2vec_kwargs)
# create evaluation sign2
eval_s2 = sign2(node2vec_path, self.dataset)
# convert to signature h5 format
if not reuse or not os.path.isfile(eval_s2.data_path):
n2v.emb_to_h5(sign1.keys, emb_file, eval_s2.data_path)
# save link prediction stats
linkpred = LinkPrediction(eval_s2, SNAPNetwork.from_file(graph_train))
        perf_train_file = os.path.join(node2vec_path, 'linkpred.train.json')
        linkpred.performance.toJSON(perf_train_file)
linkpred = LinkPrediction(eval_s2, SNAPNetwork.from_file(graph_test))
perf_test_file = os.path.join(node2vec_path, 'linkpred.test.json')
linkpred.performance.toJSON(perf_test_file)
    def grid_search_adanet(self, sign1, cc_root, job_path, parameters,
dir_suffix="", traintest_file=None):
"""Perform a grid search.
parameters = {
'boosting_iterations': [10, 25, 50],
'adanet_lambda': [1e-3, 5 * 1e-3, 1e-2],
'layer_size': [8, 128, 512, 1024]
}
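
        Example:
            A minimal sketch; jobs are submitted through the configured
            HPC backend (paths are hypothetical)::

                cluster = s2.grid_search_adanet(
                    sign1, '/path/to/cc_root', '/tmp/jobs', parameters)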
"""
import chemicalchecker
from chemicalchecker.util.hpc import HPC
from sklearn.model_selection import ParameterGrid
gridsearch_path = os.path.join(
self.model_path, 'grid_search_%s' % dir_suffix)
if not os.path.isdir(gridsearch_path):
os.makedirs(gridsearch_path)
# prepare train-test file
if traintest_file is None:
traintest_file = os.path.join(gridsearch_path, 'traintest.h5')
if not os.path.isfile(traintest_file):
Traintest.create_signature_file(
sign1.data_path, self.data_path, traintest_file)
elements = list()
for params in ParameterGrid(parameters):
model_dir = '-'.join("%s_%s" % kv for kv in params.items())
params.update(
{'model_dir': os.path.join(gridsearch_path, model_dir)})
params.update({'traintest_file': traintest_file})
elements.append({'adanet': params})
# create job directory if not available
if not os.path.isdir(job_path):
os.mkdir(job_path)
# create script file
cc_config = os.environ['CC_CONFIG']
cc_package = os.path.join(chemicalchecker.__path__[0], '../')
script_lines = [
"import sys, os",
"import pickle",
"os.environ['CC_CONFIG'] = '%s'" % cc_config, # cc_config location
"sys.path.append('%s')" % cc_package, # allow package import
"from chemicalchecker.core import ChemicalChecker",
"cc = ChemicalChecker('%s')" % cc_root,
"task_id = sys.argv[1]", # <TASK_ID>
"filename = sys.argv[2]", # <FILE>
"inputs = pickle.load(open(filename, 'rb'))", # load pickled data
"data = inputs[task_id]", # elements for current job
"for params in data:", # elements are indexes
" ds = '%s'" % self.dataset,
" s1 = cc.get_signature('sign1', 'reference', ds)",
" n1 = cc.get_signature('neig1', 'reference', ds)",
" s2 = cc.get_signature('sign2', 'reference', ds, **params)",
" s2.fit(s1, n1)",
"print('JOB DONE')"
]
script_name = os.path.join(job_path, 'sign2_grid_search_adanet.py')
with open(script_name, 'w') as fh:
for line in script_lines:
fh.write(line + '\n')
# hpc parameters
params = {}
params["num_jobs"] = len(elements)
params["jobdir"] = job_path
params["job_name"] = "CC_SIGN2_GRID_SEARCH_ADANET"
params["elements"] = elements
params["wait"] = False
params["memory"] = 32
# job command
singularity_image = Config().PATH.SINGULARITY_IMAGE
command = "singularity exec {} python {} <TASK_ID> <FILE>".format(
singularity_image, script_name)
# submit jobs
cluster = HPC.from_config(Config())
cluster.submitMultiJob(command, **params)
return cluster
    def grid_search_node2vec(self, cc_root, job_path, parameters,
                             dir_suffix=""):
"""Perform a grid search.
parameters = {
'd': [2**i for i in range(1,11)]
}
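
        Example:
            A minimal sketch; each grid point evaluates Node2Vec as an
            HPC job via `eval_node2vec` (paths are hypothetical)::

                cluster = s2.grid_search_node2vec(
                    '/path/to/cc_root', '/tmp/jobs', parameters)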
"""
import chemicalchecker
from chemicalchecker.util.hpc import HPC
from sklearn.model_selection import ParameterGrid
from chemicalchecker.util.network import SNAPNetwork
from chemicalchecker.tool.node2vec import Node2Vec
n2v = Node2Vec(executable=Config().TOOLS.node2vec_exec)
gridsearch_path = os.path.join(
self.model_path, 'grid_search_%s' % dir_suffix)
if not os.path.isdir(gridsearch_path):
os.makedirs(gridsearch_path)
elements = list()
graph_file = os.path.join(
self.model_path, 'node2vec', 'graph.edgelist')
graph_train = graph_file + ".train"
graph_test = graph_file + ".test"
if not os.path.isfile(graph_train) \
or not os.path.isfile(graph_test):
graph = SNAPNetwork.from_file(graph_file)
n2v.split_edgelist(graph, graph_train, graph_test)
for params in ParameterGrid(parameters):
model_dir = '-'.join("%s_%s" % kv for kv in params.items())
params.update(
{'model_dir': os.path.join(gridsearch_path, model_dir)})
elements.append({'node2vec': params})
# create job directory if not available
if not os.path.isdir(job_path):
os.mkdir(job_path)
# create script file
cc_config = os.environ['CC_CONFIG']
cc_package = os.path.join(chemicalchecker.__path__[0], '../')
script_lines = [
"import sys, os",
"import pickle",
"os.environ['CC_CONFIG'] = '%s'" % cc_config, # cc_config location
"sys.path.append('%s')" % cc_package, # allow package import
"from chemicalchecker.core import ChemicalChecker",
"cc = ChemicalChecker('%s')" % cc_root,
"task_id = sys.argv[1]", # <TASK_ID>
"filename = sys.argv[2]", # <FILE>
"inputs = pickle.load(open(filename, 'rb'))", # load pickled data
"data = inputs[task_id]", # elements for current job
"for params in data:", # elements are indexes
" ds = '%s'" % self.dataset,
" s1 = cc.get_signature('sign1', 'reference', ds)",
" n1 = cc.get_signature('neig1', 'reference', ds)",
" s2 = cc.get_signature('sign2', 'reference', ds, **params)",
" s2.eval_node2vec(s1, n1)",
"print('JOB DONE')"
]
        script_name = os.path.join(job_path, 'sign2_grid_search_node2vec.py')
with open(script_name, 'w') as fh:
for line in script_lines:
fh.write(line + '\n')
# hpc parameters
params = {}
params["num_jobs"] = len(elements)
params["jobdir"] = job_path
params["job_name"] = "CC_SIGN2_GRID_SEARCH_NODE2VEC"
params["elements"] = elements
params["wait"] = False
params["memory"] = 6
# job command
singularity_image = Config().PATH.SINGULARITY_IMAGE
command = "singularity exec {} python {} <TASK_ID> <FILE>".format(
singularity_image, script_name)
# submit jobs
cluster = HPC.from_config(Config())
cluster.submitMultiJob(command, **params)
return cluster