"""Source code for chemicalchecker.tool.siamese.siamese."""

import os
import pickle
import numpy as np
from time import time
from functools import partial

from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping, Callback
from tensorflow.keras.layers import Input, Dropout, Lambda, Dense

from chemicalchecker.util import logged
from chemicalchecker.util.splitter import NeighborPairTraintest


@logged
class Siamese(object):
    """Siamese class.

    This class implements a simple siamese neural network based on Keras
    that allows metric learning.
    """

    def __init__(self, model_dir, traintest_file=None, evaluate=False,
                 **kwargs):
        """Initialize the Siamese class.

        Args:
            model_dir(str): Directory where models will be stored.
            traintest_file(str): Path to the traintest file.
            evaluate(bool): Whether to run evaluation.
        """
        from chemicalchecker.core.signature_data import DataSignature
        # read parameters
        self.epochs = int(kwargs.get("epochs", 10))
        self.batch_size = int(kwargs.get("batch_size", 100))
        self.learning_rate = float(kwargs.get("learning_rate", 1e-3))
        self.replace_nan = float(kwargs.get("replace_nan", 0.0))
        self.dropout = float(kwargs.get("dropout", 0.2))
        self.suffix = str(kwargs.get("suffix", 'eval'))
        self.split = str(kwargs.get("split", 'train'))
        self.layers = kwargs.get("layers", [128])
        self.augment_fn = kwargs.get("augment_fn", None)
        self.augment_kwargs = kwargs.get("augment_kwargs", None)
        self.augment_scale = int(kwargs.get("augment_scale", 1))
        # internal variables
        self.name = '%s_%s' % (self.__class__.__name__.lower(), self.suffix)
        self.time = 0
        self.output_dim = None
        self.model_dir = os.path.abspath(model_dir)
        self.model_file = os.path.join(self.model_dir, "%s.h5" % self.name)
        self.model = None
        self.evaluate = evaluate
        # check output path
        if not os.path.exists(model_dir):
            self.__log.warning("Creating model directory: %s", self.model_dir)
            os.mkdir(self.model_dir)
        # check if a scaler is available
        scaler_file = os.path.join(self.model_dir, 'scaler.pkl')
        if os.path.isfile(scaler_file):
            scaler = pickle.load(open(scaler_file, 'rb'))
            self.set_predict_scaler(scaler)
        else:
            self.__log.warning('No scaler available: %s' % scaler_file)
        # check input path
        self.traintest_file = traintest_file
        if self.traintest_file is not None:
            self.traintest_file = os.path.abspath(traintest_file)
            if not os.path.exists(traintest_file):
                raise Exception('Input data file does not exist!')
            # initialize train generator
            self.sharedx = DataSignature(traintest_file).get_h5_dataset('x')
            tr_shape_type_gen = NeighborPairTraintest.generator_fn(
                self.traintest_file,
                'train_train',
                batch_size=int(self.batch_size / self.augment_scale),
                replace_nan=self.replace_nan,
                sharedx=self.sharedx,
                augment_fn=self.augment_fn,
                augment_kwargs=self.augment_kwargs,
                augment_scale=self.augment_scale)
            self.tr_shapes = tr_shape_type_gen[0]
            self.tr_gen = tr_shape_type_gen[2]()
            self.steps_per_epoch = np.ceil(
                self.tr_shapes[0][0] / self.batch_size)
            self.output_dim = tr_shape_type_gen[0][1][1]
            # initialize validation/test generator
            if evaluate:
                val_shape_type_gen = NeighborPairTraintest.generator_fn(
                    self.traintest_file,
                    'test_test',
                    batch_size=self.batch_size,
                    replace_nan=self.replace_nan,
                    sharedx=self.sharedx,
                    shuffle=False)
                self.val_shapes = val_shape_type_gen[0]
                self.val_gen = val_shape_type_gen[2]()
                self.validation_steps = np.ceil(
                    self.val_shapes[0][0] / self.batch_size)
            else:
                self.val_shapes = None
                self.val_gen = None
                self.validation_steps = None
        # log parameters
        self.__log.info("**** %s Parameters: ***" % self.__class__.__name__)
        self.__log.info("{:<22}: {:>12}".format("model_dir", self.model_dir))
        if self.traintest_file is not None:
            self.__log.info("{:<22}: {:>12}".format(
                "traintest_file", self.traintest_file))
            tmp = NeighborPairTraintest(self.traintest_file, 'train_train')
            self.__log.info("{:<22}: {:>12}".format(
                'train_train', str(tmp.get_py_shapes())))
            if evaluate:
                tmp = NeighborPairTraintest(self.traintest_file, 'train_test')
                self.__log.info("{:<22}: {:>12}".format(
                    'train_test', str(tmp.get_py_shapes())))
                tmp = NeighborPairTraintest(self.traintest_file, 'test_test')
                self.__log.info("{:<22}: {:>12}".format(
                    'test_test', str(tmp.get_py_shapes())))
        self.__log.info("{:<22}: {:>12}".format(
            "learning_rate", self.learning_rate))
        self.__log.info("{:<22}: {:>12}".format(
            "epochs", self.epochs))
        self.__log.info("{:<22}: {:>12}".format(
            "output_dim", self.output_dim))
        self.__log.info("{:<22}: {:>12}".format(
            "batch_size", self.batch_size))
        self.__log.info("{:<22}: {:>12}".format(
            "layers", str(self.layers)))
        self.__log.info("{:<22}: {:>12}".format(
            "dropout", str(self.dropout)))
        self.__log.info("{:<22}: {:>12}".format(
            "augment_fn", str(self.augment_fn)))
        self.__log.info("{:<22}: {:>12}".format(
            "augment_scale", self.augment_scale))
        self.__log.info("{:<22}: {:>12}".format(
            "augment_kwargs", str(self.augment_kwargs)))
        self.__log.info("**** %s Parameters: ***" % self.__class__.__name__)

    def build_model(self, input_shape, load=False):
        """Compile the Keras model.

        Args:
            input_shape(tuple): X dimensions (only the nr of features is needed).
            load(bool): Whether to load the pretrained model.
        """
        def euclidean_distance(vects):
            x, y = vects
            sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
            return K.sqrt(K.maximum(sum_square, K.epsilon()))

        def dist_output_shape(shapes):
            shape1, shape2 = shapes
            return (shape1[0], 1)

        # we have two inputs
        input_a = Input(shape=input_shape)
        input_b = Input(shape=input_shape)
        # each goes through a network with the same architecture
        model_layers = list()
        # first layer
        model_layers.append(
            Dense(self.layers[0], activation='tanh', input_shape=input_shape))
        if self.dropout is not None:
            model_layers.append(Dropout(self.dropout))
        # other layers
        for layer in self.layers[1:-1]:
            model_layers.append(Dense(layer, activation='tanh'))
            if self.dropout is not None:
                model_layers.append(Dropout(self.dropout))
        # last layer
        model_layers.append(
            Dense(self.layers[-1], activation='tanh'))
        basenet = Sequential(model_layers)
        basenet.summary()
        encoded_a = basenet(input_a)
        encoded_b = basenet(input_b)
        # layer to merge the two encoded inputs with the distance between them
        distance = Lambda(euclidean_distance,
                          output_shape=dist_output_shape)
        # call this layer on the list of two input tensors
        prediction = distance([encoded_a, encoded_b])
        model = Model([input_a, input_b], prediction)

        # define monitored metrics
        def accuracy(y_true, y_pred, threshold=0.5):
            y_pred = K.cast(y_pred < threshold, y_pred.dtype)
            return K.mean(K.equal(y_true, y_pred))

        metrics = [accuracy]

        def contrastive_loss(y_true, y_pred):
            '''Contrastive loss from Hadsell-et-al.'06

            http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
            '''
            margin = 1
            square_pred = K.square(y_pred)
            margin_square = K.square(K.maximum(margin - y_pred, 0))
            return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

        # compile and print summary
        model.compile(
            optimizer=keras.optimizers.RMSprop(lr=self.learning_rate),
            loss=contrastive_loss,
            metrics=metrics)
        model.summary()
        # if a pre-trained model is specified, load its weights
        self.model = model
        if load:
            self.model.load_weights(self.model_file)
        # this will be the encoder/transformer
        self.transformer = self.model.layers[2]

    def fit(self, monitor='val_accuracy'):
        """Fit the model.

        Args:
            monitor(str): Variable to monitor for early stopping.
        """
        # build model
        input_shape = (self.tr_shapes[0][1],)
        self.build_model(input_shape)

        # prepare callbacks
        callbacks = list()

        def mask_keep(idxs, x1_data, x2_data, y_data):
            # we will fill an array of NaN with values we want to keep
            x1_data_transf = np.zeros_like(x1_data, dtype=np.float32) * np.nan
            for idx in idxs:
                # copy column block from original data
                col_slice = slice(idx * 128, (idx + 1) * 128)
                x1_data_transf[:, col_slice] = x1_data[:, col_slice]
            x2_data_transf = np.zeros_like(x2_data, dtype=np.float32) * np.nan
            for idx in idxs:
                # copy column block from original data
                col_slice = slice(idx * 128, (idx + 1) * 128)
                x2_data_transf[:, col_slice] = x2_data[:, col_slice]
            # keep rows containing at least one not-NaN value
            not_nan1 = np.isfinite(x1_data_transf).any(axis=1)
            not_nan2 = np.isfinite(x2_data_transf).any(axis=1)
            not_nan = np.logical_and(not_nan1, not_nan2)
            x1_data_transf = x1_data_transf[not_nan]
            x2_data_transf = x2_data_transf[not_nan]
            y_data_transf = y_data[not_nan]
            return x1_data_transf, x2_data_transf, y_data_transf

        def mask_exclude(idxs, x1_data, x2_data, y_data):
            x1_data_transf = np.copy(x1_data)
            for idx in idxs:
                # set current space to NaN
                col_slice = slice(idx * 128, (idx + 1) * 128)
                x1_data_transf[:, col_slice] = np.nan
            x2_data_transf = np.copy(x2_data)
            for idx in idxs:
                # set current space to NaN
                col_slice = slice(idx * 128, (idx + 1) * 128)
                x2_data_transf[:, col_slice] = np.nan
            # drop rows that only contain NaNs
            not_nan1 = np.isfinite(x1_data_transf).any(axis=1)
            not_nan2 = np.isfinite(x2_data_transf).any(axis=1)
            not_nan = np.logical_and(not_nan1, not_nan2)
            x1_data_transf = x1_data_transf[not_nan]
            x2_data_transf = x2_data_transf[not_nan]
            y_data_transf = y_data[not_nan]
            return x1_data_transf, x2_data_transf, y_data_transf

        # additional validation sets
        space_idx = self.augment_kwargs['dataset_idx']
        mask_fns = {
            'ALL': None,
            'NOT-SELF': partial(mask_exclude, space_idx),
            'ONLY-SELF': partial(mask_keep, space_idx),
        }
        validation_sets = list()
        if self.evaluate:
            vsets = ['train_test', 'test_test']
            for split in vsets:
                for set_name, mask_fn in mask_fns.items():
                    name = '_'.join([split, set_name])
                    shapes, dtypes, gen = NeighborPairTraintest.generator_fn(
                        self.traintest_file,
                        split,
                        batch_size=self.batch_size,
                        replace_nan=self.replace_nan,
                        mask_fn=mask_fn,
                        sharedx=self.sharedx,
                        shuffle=False)
                    validation_sets.append((gen, shapes, name))
            additional_vals = AdditionalValidationSets(
                validation_sets, self.model, batch_size=self.batch_size)
            callbacks.append(additional_vals)
        patience = 10
        early_stopping = EarlyStopping(
            monitor=monitor,
            verbose=1,
            patience=patience,
            mode='max',
            restore_best_weights=True)
        if monitor or not self.evaluate:
            callbacks.append(early_stopping)
        # call fit and save model
        t0 = time()
        self.history = self.model.fit_generator(
            generator=self.tr_gen,
            steps_per_epoch=self.steps_per_epoch,
            epochs=self.epochs,
            callbacks=callbacks,
            validation_data=self.val_gen,
            validation_steps=self.validation_steps)
        self.time = time() - t0
        self.model.save(self.model_file)
        if self.evaluate:
            self.history.history.update(additional_vals.history)
        # check early stopping
        if early_stopping.stopped_epoch != 0:
            self.last_epoch = early_stopping.stopped_epoch - patience
        else:
            self.last_epoch = self.epochs
        # save and plot history
        history_file = os.path.join(
            self.model_dir, "%s_history.pkl" % self.name)
        pickle.dump(self.history.history, open(history_file, 'wb'))
        plot_file = os.path.join(self.model_dir, "%s.png" % self.name)
        self._plot_history(self.history.history, vsets, plot_file)

    def set_predict_scaler(self, scaler):
        self.scaler = scaler

    def predict(self, input_mat):
        """Do predictions.

        Args:
            input_mat(numpy.ndarray): Matrix of input features (Xs) to embed.
        """
        # load model if not already there
        if self.model is None:
            self.build_model((input_mat.shape[1],), load=True)
        no_nans = np.nan_to_num(input_mat)
        if hasattr(self, 'scaler'):
            # apply the already-fitted scaler; refitting at predict time
            # would discard the scaling learned during training
            scaled = self.scaler.transform(no_nans)
        else:
            scaled = no_nans
        return self.transformer.predict(scaled)

    def _plot_history(self, history, vsets, destination):
        """Plot history.

        Args:
            history(dict): History dictionary from the Keras fit method.
            vsets(list): Names of the additional validation splits.
            destination(str): Path to the output file.
        """
        import matplotlib.pyplot as plt
        metrics = list({k.split('_')[-1] for k in history})
        rows = len(metrics)
        cols = len(vsets)
        plt.figure(figsize=(cols * 5, rows * 5), dpi=100)
        c = 1
        for metric in sorted(metrics):
            for vset in vsets:
                plt.subplot(rows, cols, c)
                plt.title(metric.capitalize())
                plt.plot(history[metric], label="Train", lw=2, ls='--')
                plt.plot(history['val_' + metric], label="Val", lw=2, ls='--')
                vset_met = [k for k in history if vset in k and metric in k]
                for valset in vset_met:
                    plt.plot(history[valset], label=valset, lw=2)
                plt.ylim(0, 1)
                plt.legend()
                c += 1
        plt.tight_layout()
        if destination is not None:
            plt.savefig(destination)
        plt.close('all')
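

# Illustrative sketch (not part of the original chemicalchecker API): the
# contrastive loss defined in `Siamese.build_model` pulls similar pairs
# (y_true == 1) toward a distance of 0 and pushes dissimilar pairs
# (y_true == 0) beyond the margin. The helper below re-evaluates the same
# formula on a few hypothetical distances so its behaviour can be checked in
# isolation; the toy values are made up purely for illustration.
def _contrastive_loss_example(margin=1.0):
    """Evaluate the contrastive loss on a toy batch of pair distances."""
    y_true = K.constant([1., 1., 0., 0.])      # 1 = similar pair, 0 = dissimilar
    y_pred = K.constant([0.1, 0.9, 0.1, 0.9])  # predicted euclidean distances
    square_pred = K.square(y_pred)                           # penalizes distant similar pairs
    margin_square = K.square(K.maximum(margin - y_pred, 0))  # penalizes close dissimilar pairs
    loss = K.mean(y_true * square_pred + (1 - y_true) * margin_square)
    return K.eval(loss)  # ~0.41, dominated by the (1, 0.9) and (0, 0.1) pairs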


class AdditionalValidationSets(Callback):
    """Keras callback that evaluates the model on extra validation sets."""

    def __init__(self, validation_sets, model, verbose=1, batch_size=None):
        """Initialize the callback.

        Args:
            validation_sets(list): List of 3-tuples
                (val_generator_fn, val_shapes, val_set_name), as produced in
                `Siamese.fit` from `NeighborPairTraintest.generator_fn`.
            model: Keras model to evaluate.
            verbose(int): Verbosity mode, 1 or 0.
            batch_size(int): Batch size to be used when evaluating on the
                additional datasets.
        """
        super(AdditionalValidationSets, self).__init__()
        self.validation_sets = validation_sets
        self.epoch = []
        self.history = {}
        self.verbose = verbose
        self.batch_size = batch_size
        self.model = model

    def on_train_begin(self, logs=None):
        self.epoch = []
        self.history = {}

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        self.epoch.append(epoch)
        # record the same values as History() as well
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)
        # evaluate on the additional validation sets
        for val_gen, val_shapes, val_set_name in self.validation_sets:
            results = self.model.evaluate_generator(
                val_gen(),
                steps=np.ceil(val_shapes[0][0] / self.batch_size),
                verbose=self.verbose)
            for i, result in enumerate(results):
                name = '_'.join([val_set_name, self.model.metrics_names[i]])
                self.history.setdefault(name, []).append(result)
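

# Illustrative sketch (not part of the original chemicalchecker API):
# `mask_keep` and `mask_exclude` inside `Siamese.fit` treat the input matrix
# as a concatenation of 128-column blocks, one per signature space, blank
# out whole blocks with NaN, and then drop rows left without any finite
# value. The helper below reproduces that block-masking logic on a toy
# matrix, assuming a hypothetical block width of 4 instead of 128.
def _mask_exclude_example():
    """Blank out one feature block and drop rows left without finite values."""
    x = np.arange(16, dtype=np.float32).reshape(2, 8)  # 2 samples x 2 blocks of 4 features
    idx, width = 1, 4                                  # block to exclude and its width
    masked = np.copy(x)
    masked[:, idx * width:(idx + 1) * width] = np.nan  # set the chosen block to NaN
    keep = np.isfinite(masked).any(axis=1)             # rows that still hold finite values
    return masked[keep]                                # both rows keep block 0 here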