"""Basic train-test splitter."""
import h5py
import numpy as np
from tqdm import tqdm
from chemicalchecker.util import logged
@logged
class Traintest(object):
"""Traintest class."""
def __init__(self, hdf5_file, split, replace_nan=None):
"""Initialize a Traintest instance.
We assume the file is containing diffrent splits.
e.g. "x_train", "y_train", "x_test", ...
"""
self._file = hdf5_file
self._f = None
self.replace_nan = replace_nan
if split is None:
self.x_name = "x"
self.y_name = "y"
self.sw_name = "sw"
else:
self.x_name = "x_%s" % split
self.y_name = "y_%s" % split
self.sw_name = "sw_%s" % split
'''
available_splits = self.get_split_names()
if split not in available_splits:
raise Exception("Split '%s' not found in %s!" %
(split, str(available_splits)))
'''
    def get_x_shapes(self):
        """Return the shape of X."""
self.open()
x_shape = self._f[self.x_name].shape
self.close()
return x_shape
    def get_xy_shapes(self):
        """Return the shapes of X and Y."""
self.open()
x_shape = self._f[self.x_name].shape
y_shape = self._f[self.y_name].shape
self.close()
return x_shape, y_shape
    def get_split_names(self):
        """Return the names of the splits."""
self.open()
if "split_names" in self._f:
split_names = [a.decode() for a in self._f["split_names"]]
else:
split_names = ['train', 'test', 'validation']
self.__log.info("Using default split names %s" % split_names)
self.close()
return split_names
    def open(self):
"""Open the HDF5."""
self._f = h5py.File(self._file, 'r')
self.__log.info("HDF5 open %s", self._file)
    def close(self):
"""Close the HDF5."""
try:
self._f.close()
self.__log.info("HDF5 close %s", self._file)
except AttributeError:
self.__log.error('HDF5 file is not open yet.')
    def get_sw(self, beg_idx, end_idx):
        """Get a batch of sample weights."""
        sw = self._f[self.sw_name][beg_idx: end_idx]
        # handle NaNs
        if self.replace_nan is not None:
            sw[np.where(np.isnan(sw))] = self.replace_nan
        return sw
    def get_xy(self, beg_idx, end_idx):
"""Get a batch of X and Y."""
features = self._f[self.x_name][beg_idx: end_idx]
# handle NaNs
if self.replace_nan is not None:
features[np.where(np.isnan(features))] = self.replace_nan
labels = self._f[self.y_name][beg_idx: end_idx]
return features, labels
    def get_x(self, beg_idx, end_idx):
"""Get a batch of X."""
features = self._f[self.x_name][beg_idx: end_idx]
# handle NaNs
if self.replace_nan is not None:
features[np.where(np.isnan(features))] = self.replace_nan
return features
    def get_y(self, beg_idx, end_idx):
        """Get a batch of Y."""
        labels = self._f[self.y_name][beg_idx: end_idx]
        # handle NaNs
        if self.replace_nan is not None:
            labels[np.where(np.isnan(labels))] = self.replace_nan
        return labels
    def get_all_x(self):
"""Get full X."""
features = self._f[self.x_name][:]
# handle NaNs
if self.replace_nan is not None:
features[np.where(np.isnan(features))] = self.replace_nan
return features
    def get_all_x_columns(self, columns):
        """Get all rows of X for a range of columns.

        Args:
            columns(tuple(int,int)): start, stop indexes.
        """
features = self._f[self.x_name][:, slice(*columns)]
# handle NaNs
if self.replace_nan is not None:
features[np.where(np.isnan(features))] = self.replace_nan
return features
    def get_all_y(self):
"""Get full Y."""
labels = self._f[self.y_name][:]
return labels
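    # Usage sketch (illustrative, not from the original source): stream
    # batches from an existing traintest file. "traintest.h5" and the batch
    # bounds are assumptions; replace_nan=0.0 substitutes NaNs in returned X.
    #
    #     tt = Traintest("traintest.h5", split="train", replace_nan=0.0)
    #     tt.open()
    #     x, y = tt.get_xy(0, 128)   # rows [0, 128) of x_train and y_train
    #     x_all = tt.get_all_x()     # full X for this split
    #     tt.close()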
    @staticmethod
def create_signature_file(sign_from, sign_to, out_filename):
"""Create the HDF5 file with both X and Y, train and test."""
# get type1
with h5py.File(sign_from, 'r') as fh:
X = fh['V'][:]
check_X = fh['keys'][:]
X = np.asarray(X, dtype=np.float32)
# get type2
with h5py.File(sign_to, 'r') as fh:
Y = fh['V'][:]
check_Y = fh['keys'][:]
        assert np.array_equal(check_X, check_Y), "signature keys must match"
# train test validation splits
Traintest.create(X, Y, out_filename)
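    # Usage sketch (hypothetical paths): pair a source and a target signature
    # HDF5 file (both exposing 'V' and identically ordered 'keys') and write
    # the split file.
    #
    #     Traintest.create_signature_file("sign_from.h5", "sign_to.h5",
    #                                     "traintest.h5")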
    @staticmethod
    def get_split_indeces(rows, fractions, random_state=None):
        """Get random indices for the different splits."""
        # tolerate floating-point rounding when checking the fractions
        if not np.isclose(sum(fractions), 1.0):
            raise Exception("Split fractions should sum to 1.0")
        # shuffle indices
idxs = list(range(rows))
np.random.seed(random_state)
np.random.shuffle(idxs)
        # from fractions to cut indices
splits = np.cumsum(fractions)
splits = splits[:-1]
splits *= len(idxs)
        splits = splits.round().astype(int)  # np.int is deprecated in NumPy
return np.split(idxs, splits)
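    # Worked example: with rows=10 and fractions=[.8, .1, .1] the cumulative
    # sums [.8, .9] scale to cut points [8, 9], so the shuffled indices are
    # split into arrays of length 8, 1 and 1.
    #
    #     train, test, val = Traintest.get_split_indeces(
    #         10, [.8, .1, .1], random_state=42)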
    @staticmethod
def create(X, Y, out_file, split_names=['train', 'test', 'validation'],
split_fractions=[.8, .1, .1], x_dtype=np.float32,
y_dtype=np.float32, chunk_size=10000):
"""Create the HDF5 file with validation splits for both X and Y.
Args:
X(numpy.ndarray): features to train from.
Y(numpy.ndarray): labels to predict.
out_file(str): path of the h5 file to write.
split_names(list(str)): names for the split of data.
split_fractions(list(float)): fraction of data in each split.
x_dtype(type): numpy data type for X.
            y_dtype(type): numpy data type for Y (np.float32 for regression,
                np.int32 for classification).
            chunk_size(int): number of rows written per chunk.
"""
# Force number of dimension to 2 (reshape Y)
if Y.ndim == 1:
Traintest.__log.debug("We need Y as a column vector, reshaping.")
Y = np.reshape(Y, (len(Y), 1))
Traintest.__log.debug(
"{:<20} shape: {:>10}".format("input X", str(X.shape)))
Traintest.__log.debug(
"{:<20} shape: {:>10}".format("input Y", str(Y.shape)))
# train test validation splits
if len(split_names) != len(split_fractions):
raise Exception("Split names and fraction should be same amount.")
split_names = [s.encode() for s in split_names]
split_idxs = Traintest.get_split_indeces(
Y.shape[0], split_fractions)
# create dataset
Traintest.__log.info('Traintest saving to %s', out_file)
with h5py.File(out_file, "w") as fh:
fh.create_dataset('split_names', data=split_names)
fh.create_dataset('split_fractions', data=split_fractions)
for name, idxs in zip(split_names, split_idxs):
ds_name = "x_%s" % name.decode() # NS added decode() otherwise--> x_b'train'
fh.create_dataset(ds_name, (len(idxs), X.shape[1]), dtype=x_dtype)
for i in range(0, len(idxs), chunk_size):
chunk = slice(i, i + chunk_size)
fh[ds_name][chunk] = X[idxs[chunk]]
Traintest.__log.debug("Written: {:<20} shape: {:>10}".format(ds_name, str(fh[ds_name].shape)))
ds_name = "y_%s" % name.decode() # NS added decode() otherwise--> y_b'train'
fh.create_dataset(ds_name, (len(idxs), Y.shape[1]), dtype=y_dtype)
for i in range(0, len(idxs), chunk_size):
chunk = slice(i, i + chunk_size)
fh[ds_name][chunk] = Y[idxs[chunk]]
Traintest.__log.debug("Written: {:<20} shape: {:>10}".format(
ds_name, str(fh[ds_name].shape)))
Traintest.__log.info('Traintest saved to %s', out_file)
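    # Usage sketch: X and Y below are illustrative in-memory arrays, not from
    # the original source; integer labels pair with y_dtype=np.int32.
    #
    #     X = np.random.rand(1000, 128).astype(np.float32)
    #     Y = np.random.randint(0, 2, size=1000)
    #     Traintest.create(X, Y, "traintest.h5", y_dtype=np.int32)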
    @staticmethod
def split_h5(in_file, out_file,
split_names=['train', 'test', 'validation'],
split_fractions=[.8, .1, .1], chunk_size=1000):
"""Create the HDF5 file with validation splits from an input file.
Args:
in_file(str): path of the h5 file to read from.
out_file(str): path of the h5 file to write.
split_names(list(str)): names for the split of data.
split_fractions(list(float)): fraction of data in each split.
"""
with h5py.File(in_file, 'r') as hf_in:
            # log input datasets and shapes; all datasets are assumed to
            # share the same number of rows
for k in hf_in.keys():
Traintest.__log.debug(
"{:<20} shape: {:>10}".format(k, str(hf_in[k].shape)))
rows = hf_in[k].shape[0]
# train test validation splits
if len(split_names) != len(split_fractions):
                raise Exception(
                    "Split names and fractions must have the same length.")
split_names = [s.encode() for s in split_names]
split_idxs = Traintest.get_split_indeces(rows, split_fractions)
Traintest.__log.info('Traintest saving to %s', out_file)
with h5py.File(out_file, "w") as hf_out:
# create fixed datasets
hf_out.create_dataset(
'split_names', data=np.array(split_names))
hf_out.create_dataset(
'split_fractions', data=np.array(split_fractions))
for name, idxs in zip(split_names, split_idxs):
# for each original dataset
for k in hf_in.keys():
# create all splits
ds_name = "%s_%s" % (k, name.decode())
hf_out.create_dataset(ds_name,
(len(idxs), hf_in[k].shape[1]),
dtype=hf_in[k].dtype)
# fill-in by chunks
for i in range(0, len(idxs), chunk_size):
chunk = slice(i, i + chunk_size)
sorted_idxs = sorted(list(idxs[chunk]))
hf_out[ds_name][chunk] = hf_in[k][sorted_idxs]
Traintest.__log.debug(
"Written: {:<20} shape: {:>10}".format(
ds_name, str(hf_out[ds_name].shape)))
Traintest.__log.info('Traintest saved to %s', out_file)
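    # Usage sketch (hypothetical file names): split every dataset of an
    # existing HDF5 file row-wise into train/test/validation splits.
    #
    #     Traintest.split_h5("full.h5", "traintest.h5",
    #                        split_fractions=[.8, .1, .1])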
    @staticmethod
def split_h5_blocks(in_file, out_file,
split_names=['train', 'test', 'validation'],
split_fractions=[.8, .1, .1], block_size=1000,
datasets=None):
"""Create the HDF5 file with validation splits from an input file.
Args:
in_file(str): path of the h5 file to read from.
out_file(str): path of the h5 file to write.
split_names(list(str)): names for the split of data.
split_fractions(list(float)): fraction of data in each split.
block_size(int): size of the block to be used.
            datasets(list): only split the given datasets and ignore others.
"""
with h5py.File(in_file, 'r') as hf_in:
            # log input datasets and get shapes; all datasets are assumed to
            # have the same number of rows
for k in hf_in.keys():
Traintest.__log.debug(
"{:<20} shape: {:>10}".format(k, str(hf_in[k].shape)))
rows = hf_in[k].shape[0]
# reduce block size if it is not adequate
while rows / (float(block_size) * 10) <= 1:
block_size = int(block_size / 10)
Traintest.__log.warning(
"Reducing block_size to: %s", block_size)
# train test validation splits
if len(split_names) != len(split_fractions):
                raise Exception(
                    "Split names and fractions must have the same length.")
split_names = [s.encode() for s in split_names]
# get indeces of blocks for each split
split_block_idx = Traintest.get_split_indeces(
int(np.floor(rows / block_size)) + 1,
split_fractions)
if datasets is None:
datasets = hf_in.keys()
for dataset_name in datasets:
if dataset_name not in hf_in.keys():
raise Exception(
"Dataset %s not found in source file." % dataset_name)
# save to output file
Traintest.__log.info('Traintest saving to %s', out_file)
with h5py.File(out_file, "w") as hf_out:
# create fixed datasets
hf_out.create_dataset(
'split_names', data=np.array(split_names))
hf_out.create_dataset(
'split_fractions', data=np.array(split_fractions))
for name, blocks in zip(split_names, split_block_idx):
# for each original dataset
for k in datasets:
# create all splits
ds_name = "%s_%s" % (k, name.decode())
# need total size and mapping of blocks
src_dst = list()
total_size = 0
for dst, src in enumerate(sorted(blocks)):
# source block start-end
src_start = src * block_size
src_end = (src * block_size) + block_size
# check current block size to avoid overflowing
curr_block_size = block_size
if src_end > hf_in[k].shape[0]:
src_end = hf_in[k].shape[0]
curr_block_size = src_end - src_start
# update total size
total_size += curr_block_size
# destination start-end
dst_start = dst * block_size
dst_end = (dst * block_size) + curr_block_size
src_slice = (src_start, src_end)
dst_slice = (dst_start, dst_end)
src_dst.append((src_slice, dst_slice))
# Traintest.__log.debug(
# "src: %s dst: %s" % src_dst[-1])
# Traintest.__log.debug(
# "block_size: %s" % curr_block_size)
# create block matrix
reshape = False
if len(hf_in[k].shape) == 1:
cols = 1
reshape = True
else:
cols = hf_in[k].shape[1]
hf_out.create_dataset(ds_name,
(total_size, cols),
dtype=hf_in[k].dtype)
for src_slice, dst_slice in tqdm(src_dst):
src_chunk = slice(*src_slice)
dst_chunk = slice(*dst_slice)
# Traintest.__log.debug(
# "writing src: %s to dst: %s" %
# (src_slice, dst_slice))
if reshape:
hf_out[ds_name][dst_chunk] = np.expand_dims(
hf_in[k][src_chunk], 1)
else:
hf_out[ds_name][dst_chunk] = hf_in[
k][src_chunk]
# Traintest.__log.debug(
# "Written: {:<20} shape: {:>10}".format(
# ds_name, str(hf_out[ds_name].shape)))
Traintest.__log.info('Traintest saved to %s', out_file)
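    # Usage sketch (hypothetical names): like split_h5, but rows are assigned
    # to splits in contiguous blocks of block_size rows, so data is copied
    # with sequential reads instead of per-row fancy indexing.
    #
    #     Traintest.split_h5_blocks("full.h5", "traintest.h5",
    #                               block_size=1000, datasets=["x"])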
    @staticmethod
    def generator_fn(file_name, split, batch_size=None, only_x=False,
                     sample_weights=False, shuffle=True,
                     return_on_epoch=False):
        """Return shapes, dtypes and a generator function yielding batches."""
reader = Traintest(file_name, split)
reader.open()
if only_x:
x_shape = reader._f[reader.x_name].shape
shapes = x_shape
x_dtype = reader._f[reader.x_name].dtype
dtypes = x_dtype
else:
# read shapes
x_shape = reader._f[reader.x_name].shape
y_shape = reader._f[reader.y_name].shape
shapes = (x_shape, y_shape)
# read data types
x_dtype = reader._f[reader.x_name].dtype
y_dtype = reader._f[reader.y_name].dtype
dtypes = (x_dtype, y_dtype)
# no batch size -> return everything
if not batch_size:
batch_size = x_shape[0]
        # precompute [begin, end) row indices for every batch; the last end
        # may run past the number of rows, which h5py slicing clamps safely
        batch_beg_end = np.zeros((int(np.ceil(x_shape[0] / batch_size)), 2))
last = 0
for row in batch_beg_end:
row[0] = last
row[1] = last + batch_size
last = row[1]
batch_beg_end = batch_beg_end.astype(int)
def example_generator_fn():
# generator function yielding data
epoch = 0
batch_idx = 0
while True:
if batch_idx == len(batch_beg_end):
batch_idx = 0
epoch += 1
if shuffle:
np.random.shuffle(batch_beg_end)
# Traintest.__log.debug('EPOCH %i (caller: %s)', epoch,
# inspect.stack()[1].function)
if return_on_epoch:
return
beg_idx, end_idx = batch_beg_end[batch_idx]
if only_x:
if sample_weights:
yield reader.get_x(beg_idx, end_idx), \
reader.get_sw(beg_idx, end_idx)
else:
yield reader.get_x(beg_idx, end_idx)
else:
if sample_weights:
yield reader.get_xy(beg_idx, end_idx), \
reader.get_sw(beg_idx, end_idx)
else:
yield reader.get_xy(beg_idx, end_idx)
batch_idx += 1
return shapes, dtypes, example_generator_fn
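    # Usage sketch (illustrative names): obtain a batch generator, e.g. for a
    # training loop; `shapes` and `dtypes` describe the whole split, and with
    # return_on_epoch=True the generator stops after one pass over the data.
    #
    #     shapes, dtypes, gen_fn = Traintest.generator_fn(
    #         "traintest.h5", "train", batch_size=32, return_on_epoch=True)
    #     for x, y in gen_fn():
    #         pass  # consume one epoch of (x, y) batches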