"""Data preprocessing.
Given the diversity of formats and data sources, the signaturization
process starts in tailored pre-process scripts (available in the package
``scripts`` folder).
The `fit` method invokes the pre-process script with a `fit` argument,
where we essentially `learn` the features to consider.
The `predict` method allows deriving signatures without altering the
feature set. This can also be used when mapping different entities
(i.e. not only compounds) to a bioactive space.
E.g.
categorical: "C0015230,C0016436..." which translates into an array of
0s and 1s.
discrete: "GO:0006897(8),GO:0006796(3),..." which translates into an
array of integers.
continuous: "0.515,1.690,0.996" which is an array of floats.
"""
import os
import h5py
import argparse
import importlib
import numpy as np
from .signature_data import DataSignature
from chemicalchecker.util import logged
from chemicalchecker.database import Dataset
features_file = "features.h5"
@logged
class Preprocess():
"""Preprocess class."""
def __init__(self, signature_path, dataset, *args, **kwargs):
"""Initialize a Preprocess instance.
This class handles calling the external run.py for each dataset and
provides shared methods.
Args:
signature_path(str): the path to the signature directory.
dataset(str): the dataset code identifying the preprocess script.
"""
# TODO: check if kwargs are needed (D1)
self.__log.info('Preprocess signature: %s', signature_path)
self.raw_path = os.path.join(signature_path, "raw")
self.raw_model_path = os.path.join(signature_path, "raw", "models")
if not os.path.isdir(self.raw_path):
Preprocess.__log.debug(
"Initializing raw path: %s" % self.raw_path)
original_umask = os.umask(0)
os.makedirs(self.raw_path, 0o775)
os.umask(original_umask)
if not os.path.isdir(self.raw_model_path):
original_umask = os.umask(0)
os.makedirs(self.raw_model_path, 0o775)
os.umask(original_umask)
# where preprocess data will be saved if fit is called
self.data_path = os.path.join(self.raw_path, "preprocess.h5")
dir_path = os.path.dirname(os.path.realpath(__file__))
self.preprocess_script = os.path.join(
dir_path,
'../..',
"scripts/preprocess",
dataset,
"run.py")
if not os.path.isfile(self.preprocess_script):
self.__log.warning(
"Preprocess script not found! %s", self.preprocess_script)
def is_fit(self):
"""Return True if preprocessed data is already available."""
return os.path.exists(self.data_path)
def call_preprocess(self, output, method, infile=None, entry=None):
"""Call the external pre-process script in 'fit' or 'predict' mode."""
# create argument list
arglist = ["-o", output, "-mp", self.raw_model_path, "-m", method]
if infile:
arglist.extend(['-i', infile])
if entry:
arglist.extend(['-ep', entry])
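# e.g. for 'fit' the resulting list is (paths are illustrative):
# ['-o', 'raw/preprocess.h5', '-mp', 'raw/models', '-m', 'fit']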
# import and run the run.py
loader = importlib.machinery.SourceFileLoader('preprocess_script',
self.preprocess_script)
preprocess = loader.load_module()
# self.__log.debug('ARGS: %s' % str(arglist))
preprocess.main(arglist)
def fit(self):
"""Call the external preprocess script to generate H5 data.
The preprocess script is invoked with the `fit` argument, which means
features are extracted from the datasource and saved.
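A minimal usage sketch, assuming `sign0` was obtained through
`cc.get_signature` (the dataset code is illustrative)::
prepro = Preprocess(sign0.signature_path, 'A1.001')
prepro.fit()  # writes <signature_path>/raw/preprocess.h5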
"""
# check that preprocess script is available and call it
self.__log.info('Calling preprocess script: %s',
self.preprocess_script)
if not os.path.isfile(self.preprocess_script):
raise Exception("Preprocess script not found! %s" %
self.preprocess_script)
self.call_preprocess(self.data_path, "fit")
def predict(self, input_data_file, destination, entry_point):
"""Call the external preprocess script to generate H5 data."""
"""
Args:
input_data_file(str): Path to the file with the raw to generate
the signature 0.
destination(str): Path to a H5 file where the predicted signature
will be saved.
entry_point(str): Entry point of the input data for the
signaturization process. It depends on the type of data passed
at the input_data_file.
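Example (file name and entry point are illustrative)::
prepro = Preprocess(sign0.signature_path, 'A1.001')
prepro.predict('new_data.tsv', 'sign0_raw_pred.h5', 'smiles')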
"""
# check that preprocess script is available and call it
self.__log.info('Calling preprocess script: %s',
self.preprocess_script)
if not os.path.isfile(self.preprocess_script):
raise Exception("Pre-process script not found! %s" %
self.preprocess_script)
self.call_preprocess(destination, "predict", infile=input_data_file,
entry=entry_point)
def to_features(self, signatures):
"""Convert signature to explicit feature names.
Args:
signatures(array): signature 0 array(s) for one or more molecules.
Returns:
list of dict: one dictionary per signature, mapping feature names
to their values.
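Example, assuming a signature with three hypothetical features
GO:0006796, GO:0006897 and GO:0016049::
prepro.to_features(np.array([3, 8, 0]))
# -> [{'GO:0006796': 3, 'GO:0006897': 8}]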
"""
# handle single signature
if len(signatures.shape) == 1:
signatures = [signatures]
# if no features file is available then the signature is just an array
feature_file = os.path.join(self.raw_model_path, features_file)
if not os.path.isfile(feature_file):
features = np.arange(len(signatures[0]))
else:
# read features names from file
with h5py.File(feature_file, 'r') as hf:
features = hf["features"][:]
# return list of dicts with feature_name as key and value as value
result = list()
for sign in signatures:
keys = features[sign != 0]
values = sign[sign != 0]
result.append(dict(zip(keys, values)))
return result
@classmethod
def preprocess(cls, sign, **params):
"""Return the file with the raw data preprocessed.
Args:
sign: signature object (e.g. obtained from cc.get_signature)
params: specific parameters for a given preprocess script
Returns:
datafile(str): The name of the file where the data is saved.
ex:
os.path.join(self.raw_path, "preprocess.h5")
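A minimal sketch, assuming a ChemicalChecker instance `cc` is
available and the dataset code is illustrative::
sign0 = cc.get_signature('sign0', 'full', 'A1.001')
raw_file = Preprocess.preprocess(sign0)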
"""
prepro = cls(sign.signature_path, sign.dataset, **params)
if not prepro.is_fit():
cls.__log.info(
"No raw data file found, calling the preprocessing script")
prepro.fit()
else:
cls.__log.info("Found {}".format(prepro.data_path))
return prepro.data_path
@classmethod
def preprocess_predict(cls, sign, input_file, destination, entry_point):
"""Runs the preprocessing script 'predict'.
Run on an input file of raw data formatted correctly for the space of
interest
Args:
sign: signature object (e.g. obtained from cc.get_signature)
input_file(str): path to the H5 file containing the data on which
to apply 'predict'
destination(str): Path to a H5 file where the predicted signature
will be saved.
entry_point(str): Entry point of the input data for the
signaturization process. It depends on the type of data passed
at the input_data_file.
Returns:
datafile(str): The H5 file containing the predicted data after
preprocess
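Example, wrapping `predict` in a single call (file name and entry
point are illustrative)::
pred_file = Preprocess.preprocess_predict(
sign0, 'new_data.tsv', 'sign0_raw_pred.h5', 'smiles')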
"""
input_file = os.path.abspath(input_file)
destination = os.path.abspath(destination)
# Checking the provided paths
if not os.path.exists(input_file):
raise Exception("Error, {} does not exist!".format(input_file))
prepro = cls(sign.signature_path, sign.dataset)
prepro.predict(input_file, destination, entry_point)
return destination
@staticmethod
def get_parser():
"""Return the argument parser shared by the preprocess scripts."""
description = 'Run preprocess script.'
parser = argparse.ArgumentParser(description=description)
parser.add_argument('-i', '--input_file', type=str,
required=False, default='.',
help='Input file only for predict method')
parser.add_argument('-o', '--output_file', type=str,
required=False, default='.', help='Output file')
parser.add_argument('-m', '--method', type=str,
required=False, default='fit',
help='Method: fit or predict')
parser.add_argument('-mp', '--models_path', type=str,
required=False, default='',
help='The models path')
parser.add_argument('-ep', '--entry_point', type=str,
required=False, default=None,
help='The predict entry point')
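# An illustrative command line handled by this parser:
#   run.py -i input.tsv -o out.h5 -m predict -mp raw/models -ep <entry>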
return parser
@staticmethod
def get_datasources(dataset_code):
"""Map datasource names to data paths for a given dataset code."""
dataset = Dataset.get(dataset_code)
map_files = {}
for ds in dataset.datasources:
map_files[ds.name] = ds.data_path
return map_files
@staticmethod
def save_output(output_file, inchikey_raw, method, models_path, discrete,
features, features_int=False, chunk=2000):
"""Save raw data produced by the preprocess script as matrix.
The result of preprocess scripts are usually in compact format (e.g.
binary data only list features with value of 1) since data might be
sparse and memory intensive to handle. This method convert it to a
signature like (explicit, extended) format. The produced H5 will
contain 3 dataset:
* 'keys': identifier (usually inchikey),
* 'features': features names,
* 'X': the data matrix
Args:
output_file(str): Path to output H5 file.
inchikey_raw(dict): inchikey -> list of values (dense format).
method(str): Same as used in the preprocess script.
models_path(str): Path to signature models directory.
discrete(bool): True if data is binary/discrete, False for
continuous data.
features(list): List of feature names from original sign0,
None when method is 'fit'.
features_int(bool): If True, features have no names and integers
are used as feature names.
chunk(int): Chunk size for loading data.
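Example, with a hypothetical discrete input (assuming the './models'
directory exists)::
inchikey_raw = {'IK1': ['GO:0006796', 'GO:0006897'],
'IK2': ['GO:0006897']}
Preprocess.save_output('raw.h5', inchikey_raw, 'fit', './models',
discrete=True, features=None)
# 'X' is then [[1, 1], [0, 1]] and 'features' is
# ['GO:0006796', 'GO:0006897']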
"""
keys = []
if discrete:
# check if categorical
categ = False
for k, v in inchikey_raw.items():
if len(v) > 0:
if isinstance(v[0], tuple):
categ = True
break
# words are all possible features
words = set()
for k in sorted(inchikey_raw.keys()):
keys.append(str(k))
if categ:
for word in inchikey_raw[k]:
words.add(word[0])
else:
words.update(inchikey_raw[k])
# if we have features available ('predict' method) check overlap
if features is not None:
orderwords = features
Preprocess.__log.debug(
"Predict entries have a total of %s features,"
" %s overlap with trainset and will be considered.",
len(words), len(set(features) & words))
# otherwise deduce features from data provided and sort them
else:
orderwords = list(words)
del words
if features_int:
orderwords.sort(key=int)
else:
orderwords.sort()
# prepare output file
Preprocess.__log.info("Output file will be of shape: "
"%s" % [len(keys), len(orderwords)])
with h5py.File(output_file, "w") as hf:
hf.create_dataset("keys", data=np.array(
keys, DataSignature.string_dtype()))
hf.create_dataset(
"X", (len(keys), len(orderwords)), dtype=np.int8)
hf.create_dataset("features", data=np.array(
orderwords, DataSignature.string_dtype()))
# write data in H5
raws = np.zeros((chunk, len(orderwords)), dtype=np.int8)
wordspos = {k: v for v, k in enumerate(orderwords)}
index = 0
for i, k in enumerate(keys):
# prepare chunk
# compare on feature names (first tuple element when categorical)
if categ:
entry_feats = set(w[0] for w in inchikey_raw[k])
else:
entry_feats = set(inchikey_raw[k])
if not entry_feats & set(wordspos):
Preprocess.__log.warning(
"%s has no shared features with trainset.", k)
for word in inchikey_raw[k]:
feat = word[0] if categ else word
# skip features unseen at fit time ('predict' method)
if feat not in wordspos:
continue
raws[index][wordspos[feat]] = word[1] if categ else 1
index += 1
# when chunk is complete or molecules are over, write to file
if index == chunk or i == len(keys) - 1:
end = i + 1
if index != chunk:
chunk = index
with h5py.File(output_file, "r+") as hf:
dataset = hf["X"]
dataset[end - chunk:end] = raws[:chunk]
raws = np.zeros((chunk, len(orderwords)), dtype=np.int8)
index = 0
saving_features = orderwords
# continuous
else:
# get molecules inchikeys
for k in inchikey_raw.keys():
keys.append(str(k))
keys = np.array(keys)
inds = keys.argsort()
data = []
# sorted data
for i in inds:
data.append(inchikey_raw[keys[i]])
# define features if not available
if features is None:
features = [str(i) for i in range(1, len(data[0]) + 1)]
# save data
with h5py.File(output_file, "w") as hf:
hf.create_dataset("keys", data=np.array(
keys[inds], DataSignature.string_dtype()))
hf.create_dataset("X", data=np.array(data))
hf.create_dataset("features", data=np.array(
features, DataSignature.string_dtype()))
saving_features = features
# if fitting, we also save the features
if method == "fit":
fn = os.path.join(models_path, features_file)
with h5py.File(fn, "w") as hf:
hf.create_dataset("features", data=np.array(saving_features, DataSignature.string_dtype()))
def to_feature_string(self, signatures, string_func):
"""Convert signatures to strings with feature names.
Args:
signatures(array): Signature array(s).
string_func(func): A function taking a dictionary as input and
returning a single string.
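Example, reusing the hypothetical features from `to_features`::
prepro.to_feature_string(np.array([3, 8, 0]),
Preprocess._feat_key_values)
# -> ['GO:0006796(3),GO:0006897(8)']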
"""
result_dicts = self.to_features(signatures)
result_strings = list()
for res_dict in result_dicts:
result_strings.append(string_func(res_dict))
return result_strings
@staticmethod
def _feat_key_only(res_dict):
"""Suited for discrete spaces."""
strings = list()
for k in sorted(res_dict.keys()):
strings.append("%s" % k)
return ','.join(strings)
@staticmethod
def _feat_value_only(res_dict):
"""Suited for continuous spaces."""
strings = list()
for k in sorted(res_dict.keys()):
strings.append("%.3f" % res_dict[k])
return ','.join(strings)
@staticmethod
def _feat_key_values(res_dict):
"""Suited for discrete spaces with values."""
strings = list()
for k in sorted(res_dict.keys()):
strings.append("%s(%s)" % (k, res_dict[k]))
return ','.join(strings)
# def _compare_to_old(self, old_dbname, to_sample=1000):
# """Compare current signature 0 to previous format.
# Args:
# old_dbname(str): the name of the old db (e.g. 'mosaic').
# to_sample(int): Number of signatures to compare in the set of
# shared molecules.
# """
# try:
# from chemicalchecker.util import psql
# except ImportError as err:
# raise err
# old_table_names = {
# 'A1': 'fp2d',
# 'A2': 'fp3d',
# 'A3': 'scaffolds',
# 'A4': 'subskeys',
# 'A5': 'physchem',
# 'B1': 'moa',
# 'B2': 'metabgenes',
# 'B3': 'crystals',
# 'B4': 'binding',
# 'B5': 'htsbioass',
# 'C1': 'molroles',
# 'C2': 'molpathways',
# 'C3': 'pathways',
# 'C4': 'bps',
# 'C5': 'networks',
# 'D1': 'transcript',
# 'D2': 'cellpanel',
# 'D3': 'chemgenet',
# 'D4': 'morphology',
# 'D5': 'cellbioass',
# 'E1': 'therapareas',
# 'E2': 'indications',
# 'E3': 'sideeffects',
# 'E4': 'phenotypes',
# 'E5': 'ddis'
# }
# table_name = old_table_names[self.dataset[:2]]
# string_funcs = {
# 'A1': sign0._feat_key_only,
# 'A2': sign0._feat_key_only,
# 'A3': sign0._feat_key_only,
# 'A4': sign0._feat_key_only,
# 'A5': sign0._feat_value_only,
# 'B1': sign0._feat_key_only,
# 'B2': sign0._feat_key_only,
# 'B3': sign0._feat_key_only,
# 'B4': sign0._feat_key_values,
# 'B5': sign0._feat_key_only,
# 'C1': sign0._feat_key_only,
# 'C2': sign0._feat_key_values,
# 'C3': sign0._feat_key_values,
# 'C4': sign0._feat_key_values,
# 'C5': sign0._feat_key_values,
# 'D1': sign0._feat_key_values,
# 'D2': sign0._feat_value_only,
# 'D3': sign0._feat_key_values,
# 'D4': sign0._feat_value_only,
# 'D5': sign0._feat_key_only,
# 'E1': sign0._feat_key_only,
# 'E2': sign0._feat_key_only,
# 'E3': sign0._feat_key_only,
# 'E4': sign0._feat_key_only,
# 'E5': sign0._feat_key_only
# }
# continuous = ["A5", "D2", "D4"]
# string_func = string_funcs[self.dataset[:2]]
# if not self.dataset.startswith("A"):
# # get old keys
# res = psql.qstring('SELECT inchikey FROM %s;' %
# table_name, old_dbname)
# old_keys = set(r[0] for r in res)
# # compare to new
# old_only_keys = old_keys - self.unique_keys
# new_only_keys = self.unique_keys - old_keys
# shared_keys = self.unique_keys & old_keys
# frac_present = len(shared_keys) / float(len(old_keys))
# self.__log.info(
# "Among %s OLD molecules %.2f%% are still present:",
# len(old_keys),
# 100 * frac_present)
# self.__log.info("Old keys: %s", len(old_keys))
# self.__log.info("New keys: %s", len(self.unique_keys))
# self.__log.info("Shared keys: %s", len(shared_keys))
# self.__log.info("Old only keys: %s", len(old_only_keys))
# self.__log.info("New only keys: %s", len(new_only_keys))
# else:
# shared_keys = self.keys
# frac_present = 1.0
# # randomly check sample entries
# total = 0.0
# shared = 0.0
# changed = 0
# not_changed = 0
# most_diff = {
# 'shared': 99999,
# 'key': None,
# 'old_sign': None,
# 'new_sign': None
# }
# to_sample = min(len(shared_keys), to_sample)
# sample = np.random.choice(list(shared_keys), to_sample,
# replace=False)
# res = psql.qstring(
# "SELECT inchikey,raw FROM %s WHERE inchikey = ANY('{%s}');" %
# (table_name, ','.join(sample)), old_dbname)
# res = dict(res)
# for ink in tqdm(sample):
# feat_old = set(res[ink].split(','))
# if self.dataset[:2] in continuous:
# feat_old = set(["%.3f" % float(x)
# for x in res[ink].split(',')])
# feat_new = set(self.to_feature_string(
# self[ink.encode()], string_func)[0].split(','))
# if feat_new == feat_old:
# not_changed += 1
# else:
# changed += 1
# curr_shared = len(feat_new & feat_old)
# shared += curr_shared
# if curr_shared < most_diff['shared']:
# most_diff['shared'] = curr_shared
# most_diff['key'] = ink
# most_diff['old_sign'] = feat_old
# most_diff['new_sign'] = feat_new
# total += len(feat_old)
# frac_equal = not_changed / float(to_sample)
# self.__log.info(
# "Among %s shared sampled signatures %.2f%% are equal:",
# to_sample, 100 * frac_equal)
# self.__log.info("Equal: %s Changed: %s", not_changed, changed)
# if changed == 0:
# return frac_present, frac_equal, 1.0
# if total == 0.:
# frac_equal_feat = 0.0
# else:
# frac_equal_feat = shared / float(total)
# self.__log.info("Among changed %.2f%% of features are equal to old",
# 100 * frac_equal_feat)
# self.__log.info("Most different signature %s" % most_diff['key'])
# self.__log.info("OLD: %s" % sorted(list(most_diff['old_sign'])))
# self.__log.info("NEW: %s" % sorted(list(most_diff['new_sign'])))
# return frac_present, frac_equal, frac_equal_feat