Source code for chemicalchecker.util.sanitize.sanitizer
"""Simple sanitization of input matrices.
Garbage in, garbage out: before data can be used it needs to be cleaned.
This includes:
* Removing rarely occurring features (columns)
* Removing molecules with little data (rows)
* Handling missing data (NaNs or infs)
* Trimming the least informative features if too many are provided
"""
import os
import h5py
import uuid
import numpy as np
from scipy.stats import entropy
from chemicalchecker.core.signature_data import DataSignature
from chemicalchecker.util import Config, logged
@logged
class Sanitizer(object):
"""Sanitizer class."""
def __init__(self, *args, impute_missing=True, trim=True,
max_features=10000,
check_features=True, min_feature_abs=5, max_feature_freq=0.8,
check_keys=True, min_keys_abs=1, max_keys_freq=0.8,
sample_size=1000, max_categories=20, zero_as_missing=True,
chunk_size=10000, tmp_path=None, **kwargs):
"""Initialize a Sanitizer instance.
Args:
impute_missing (bool): If True, NaNs and -inf/+inf are imputed.
NaNs are replaced by the column median, -inf/+inf by the column
min/max.
trim (bool): Trim the dataset to a maximum number of features.
max_features (int): Maximum number of features to keep
(default=10000).
check_features (bool): If True, drop features based on the frequency
arguments below. For categorical data, 0 is considered `missing`;
for continuous data, any non-finite value is.
min_feature_abs (int): Minimum number (count) of occurrences of a
feature, column-wise (default=5).
max_feature_freq (float): Maximum proportion of occurrences of a
feature, column-wise (default=0.8).
check_keys (bool): If True, drop keys based on the frequency
arguments below. For categorical data, 0 is considered `missing`;
for continuous data, any non-finite value is.
min_keys_abs (int): Minimum number (count) of occurrences of a
feature, row-wise (default=1).
max_keys_freq (float): Maximum proportion of occurrences of a
feature, row-wise (default=0.8).
sample_size (int): Number of rows sampled to determine the data type
(default=1000).
max_categories (int): Maximum number of categories to expect
(default=20).
zero_as_missing (bool): Only applied to categorical (usually binary)
data, where 0 denotes missing information. Used when filtering
rows or columns by frequency.
chunk_size (int): Number of rows/columns processed per chunk
(default=10000).
tmp_path (str): Directory for temporary H5 files; defaults to the
`CC_TMP` path from the configuration.
"""
self.__dict__.update(locals())
if self.tmp_path is None:
self.tmp_path = Config().PATH.CC_TMP
for k, v in self.__dict__.items():
if k == 'self':
continue
self.__log.debug("{:<22}: {:>12}".format(str(k), str(v)))
def transform(self, data=None, V=None, keys=None, keys_raw=None,
features=None, sign=None):
"""Sanitize data
Args:
data (str or DataSignature): Path to an H5 file or a DataSignature
instance (default=None).
V (matrix): Input matrix (default=None).
keys (array): Keys (default=None).
keys_raw (array): Keys raw (default=None).
features (array): Features (default=None).
sign (DataSignature): Auxiliary signature used as a feature reference
and for imputing missing values (default=None).
"""
if data is not None and V is not None:
raise Exception("Too many inputs! Either provide `data` or `V`.")
if data is None:
# create temporary h5 data structure
was_data = False
if V is None or keys is None or features is None:
raise Exception("`data` not provided so "
"`V`, `keys`, `features` are expected.")
if keys_raw is None:
keys_raw = keys
tag = str(uuid.uuid4())
data_path = os.path.join(self.tmp_path, "%s.h5" % tag)
data = DataSignature(data_path)
datasets = {'V': V, 'keys': keys,
'keys_raw': keys_raw, 'features': features}
data.add_datasets(datasets)
self.__log.debug("Saving temporary data to %s" % data_path)
V = None
keys = None
keys_raw = None
features = None
else:
was_data = True
if isinstance(data, str):
data = DataSignature(data)
req_ds = ['V', 'keys', 'keys_raw', 'features']
if any([k not in data.info_h5 for k in req_ds]):
raise Exception("`data` must contain %s." % str(req_ds))
self.data = data
# check data type
self.__log.debug("Data type: %s" % str(self.data[0].dtype))
self.__log.debug("Data shape: %s" % str(self.data.shape))
self.__log.debug("Data size: %s" % str(self.data.size))
if self.data.size > 1e9:
self.__log.debug("Data size exceeds 1e9, reducing `chunk_size`.")
self.chunk_size = 1000
cs = self.chunk_size
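# infer the data type from a sample of rows: if the number of distinct
# finite values is at most `max_categories` the data is treated as
# categorical, otherwise as continuous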
vals = data[:self.sample_size].ravel()
unique_vals, unique_counts = np.unique(vals[np.isfinite(vals)],
return_counts=True)
if len(unique_vals) <= self.max_categories:
self.is_categorical = True
self.categories = unique_vals
unique_freqs = {k: v/vals.size for k,
v in zip(unique_vals, unique_counts)}
self.__log.debug("Data is categorical: %s" % str(unique_vals))
freq_str = ', '.join(['%s : %.3f' % (k, v)
for k, v in unique_freqs.items()])
self.__log.debug("Category frequency: [%s]" % freq_str)
else:
self.is_categorical = False
self.__log.debug("Data is continuous.")
# if a signature is specified make sure the new input has equal columns
if sign is not None:
try:
ref_features = sign.get_h5_dataset('features')
except Exception:
raise Exception("`sign` should have the `features` dataset.")
# reload features from the H5, as the local variable may have been reset
features = data.get_h5_dataset('features')
if len(set(features) & set(ref_features)) != len(set(ref_features)):
raise Exception("`data` must contain at least all features "
"present in `sign`.")
add_features = sorted(list(set(features) - set(ref_features)))
if add_features:
self.__log.info("Some input features are skipped as they are not "
"present in the reference: %s" % str(add_features))
# we assume that features are in the same order
mask = np.isin(list(features), list(ref_features))
data.filter_h5_dataset('V', mask, axis=1)
data.filter_h5_dataset('features', mask, axis=1)
# check features frequencies
if self.check_features:
self.__log.debug('Checking features:')
features = data.get_h5_dataset('features')
drop_abs = np.full((data.shape[1],), False, dtype=bool)
drop_freq = np.full((data.shape[1],), False, dtype=bool)
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
missing = np.sum(~np.isfinite(cols), axis=0)
if self.is_categorical and self.zero_as_missing:
missing += np.sum(cols == 0, axis=0)
present = data.shape[0] - missing
present_freq = present / data.shape[0]
drop_abs[chunk] = present < self.min_feature_abs
if self.is_categorical:
drop_freq[chunk] = present_freq > self.max_feature_freq
self.__log.info('Filter %s features (min_feature_abs): %s'
% (np.sum(drop_abs), str(features[drop_abs])))
self.__log.info('Filter %s features (max_feature_freq): %s'
% (np.sum(drop_freq), str(features[drop_freq])))
keep = ~np.logical_or(drop_abs, drop_freq)
if np.any(~keep):
data.filter_h5_dataset('V', keep, axis=1)
data.filter_h5_dataset('features', keep, axis=1)
# check keys frequencies
if self.check_keys:
self.__log.debug('Checking keys:')
keys = data.get_h5_dataset('keys')
drop_abs = np.full((data.shape[0],), False, dtype=bool)
drop_freq = np.full((data.shape[0],), False, dtype=bool)
for chunk, rows in data.chunk_iter('V', cs, axis=0, chunk=True):
missing = np.sum(~np.isfinite(rows), axis=1)
if self.is_categorical and self.zero_as_missing:
missing += np.sum(rows == 0, axis=1)
present = data.shape[1] - missing
present_freq = present / data.shape[1]
drop_abs[chunk] = present < self.min_keys_abs
if self.is_categorical:
drop_freq[chunk] = present_freq > self.max_keys_freq
self.__log.info('Filter %s keys (min_keys_abs): %s'
% (np.sum(drop_abs), str(keys[drop_abs])))
self.__log.info('Filter %s keys (max_keys_freq): %s'
% (np.sum(drop_freq), str(keys[drop_freq])))
keep = ~np.logical_or(drop_abs, drop_freq)
if np.any(~keep):
data.filter_h5_dataset('V', keep, axis=0)
data.filter_h5_dataset('keys', keep, axis=0)
data.filter_h5_dataset('keys_raw', keep, axis=0)
# count NaN & infs
self.__log.debug('Missing values:')
nan_counts = dict()
nan_counts['NaN'] = 0
nan_counts['+inf'] = 0
nan_counts['-inf'] = 0
nan_counts['all'] = 0
for chunk in data.chunk_iter('V', cs):
nan_counts['NaN'] += np.sum(np.isnan(chunk))
nan_counts['+inf'] += np.sum(np.isposinf(chunk))
nan_counts['-inf'] += np.sum(np.isneginf(chunk))
# total is computed once all chunks have been counted
nan_counts['all'] = nan_counts['NaN'] + nan_counts['+inf'] + nan_counts['-inf']
for k, v in nan_counts.items():
if k == 'all':
continue
self.__log.debug("{:<5}: {:>12}".format(str(k), str(v)))
# impute NaN & infs
if self.impute_missing and nan_counts['all'] != 0:
self.__log.info('Imputing missing values.')
hf = h5py.File(data.data_path, 'a')
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
# get values for replacements
ref_cols = cols
if sign is not None:
ref_cols = sign[:, chunk]
nan_vals = np.nanmedian(ref_cols, axis=0)
if self.is_categorical:
# if categorical replace NaNs with most frequent instead
count = np.zeros((cols.shape[1], len(self.categories)))
for idx, cat in enumerate(self.categories):
count[:, idx] = np.sum(ref_cols == cat, axis=0)
nan_vals = np.argmax(count, axis=1)
cols_masked = np.ma.masked_array(
ref_cols, mask=~np.isfinite(ref_cols))
posinf_vals = np.max(cols_masked, axis=0).data
neginf_vals = np.min(cols_masked, axis=0).data
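# NaNs are replaced by the column median (or most frequent category),
# +inf/-inf by the finite column max/min computed above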
# replace
idxs = np.where(np.isnan(cols))
cols[idxs] = np.take(nan_vals, idxs[1])
idxs = np.where(np.isposinf(cols))
cols[idxs] = np.take(posinf_vals, idxs[1])
idxs = np.where(np.isneginf(cols))
cols[idxs] = np.take(neginf_vals, idxs[1])
hf['V'][:, chunk] = cols
hf.close()
# re-count NaN & infs after imputation
if nan_counts['all'] != 0:
self.__log.debug('Missing values:')
nan_counts = dict()
nan_counts['NaN'] = 0
nan_counts['+inf'] = 0
nan_counts['-inf'] = 0
cs = self.chunk_size
for chunk in data.chunk_iter('V', cs):
nan_counts['NaN'] += np.sum(np.isnan(chunk))
nan_counts['+inf'] += np.sum(np.isposinf(chunk))
nan_counts['-inf'] += np.sum(np.isneginf(chunk))
for k, v in nan_counts.items():
if k == 'all':
continue
self.__log.debug("{:<5}: {:>12}".format(str(k), str(v)))
# trim if there are too many features
if self.trim and data.shape[1] > self.max_features:
self.__log.debug("More than %d features, trimming the "
"least informative ones." % self.max_features)
if self.is_categorical:
entropy_vals = np.zeros((data.shape[1],))
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
entropy_vals[chunk] = entropy(cols, axis=0)
# keep the highest-entropy (most informative) features
features_rank = np.argsort(entropy_vals)[::-1]
else:
std_vals = np.zeros((data.shape[1],))
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
std_vals[chunk] = np.std(cols, axis=0)
features_rank = np.argsort(std_vals)[::-1]
keep = np.full((data.shape[1], ), False, dtype=bool)
keep[features_rank[: self.max_features]] = True
filtered = data.get_h5_dataset('features', mask=~keep)
data.filter_h5_dataset('V', keep, axis=1)
data.filter_h5_dataset('features', keep, axis=1)
self.__log.info("Removed %i features (max_features): %s"
% (len(filtered), str(filtered)))
self.__log.info("Sanitized data shape: %s" % str(self.data.shape))
# return if input was raw data
if not was_data:
with h5py.File(data.data_path, "r") as hf:
V = hf["V"][:]
keys = hf["keys"][:].astype(str)
keys_raw = hf["keys_raw"][:].astype(str)
features = hf["features"][:].astype(str)
os.remove(data.data_path)
return V, keys, keys_raw, features
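# Minimal end-to-end sketch (illustrative only, not part of the module); it
# assumes the module import path shown above, a valid chemicalchecker
# configuration for the temporary path (or pass `tmp_path` explicitly), and
# a small random matrix, so `trim` is disabled:
#
#   import numpy as np
#   from chemicalchecker.util.sanitize.sanitizer import Sanitizer
#
#   V = np.random.rand(100, 50)
#   V[0, 0] = np.nan  # a missing value that will be imputed
#   keys = np.array(['mol_%03d' % i for i in range(100)])
#   features = np.array(['feat_%03d' % i for i in range(50)])
#   san = Sanitizer(trim=False)
#   V_clean, keys, keys_raw, features = san.transform(
#       V=V, keys=keys, features=features)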