"""Do TFIDF-LSI."""
import os
import h5py
import random
import tempfile
import numpy as np
from gensim import corpora, models
from scipy.sparse import lil_matrix
from sklearn.utils.sparsefuncs import mean_variance_axis
from .base import BaseTransform
from chemicalchecker.util import Config, logged
from chemicalchecker.core.signature_data import DataSignature
class Corpus(object):
"""Corpus class."""
def __init__(self, plain_corpus, dictionary):
self.plain_corpus = plain_corpus
self.dictionary = dictionary
    def __iter__(self):
        with open(self.plain_corpus, "r") as fh:
            for line in fh:
                tokens = line.rstrip("\n").split(" ")[1].split(",")
                bow = self.dictionary.doc2bow(tokens)
                # skip documents with no token left in the dictionary
                if not bow:
                    continue
                yield bow
    def __len__(self):
        return sum(1 for _ in self.keys())
    def keys(self):
        """Yield the key of each document with a non-empty bag-of-words."""
        with open(self.plain_corpus, "r") as fh:
            for line in fh:
                key = line.split(" ")[0]
                tokens = line.rstrip("\n").split(" ")[1].split(",")
                bow = self.dictionary.doc2bow(tokens)
                if not bow:
                    continue
                yield key
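
# Minimal usage sketch for Corpus (hypothetical file name; each line of the
# plain corpus looks like "KEY001 featA,featB,featB"):
#
#   dictionary = corpora.Dictionary(
#       line.rstrip("\n").split(" ")[1].split(",")
#       for line in open("corpus.plain.txt"))
#   corpus = Corpus("corpus.plain.txt", dictionary)
#   for bow in corpus:
#       pass  # bow is a list of (term_id, count) pairs
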
@logged
class Lsi(BaseTransform):
"""Lsi class."""
def __init__(self, sign1, *args, tmp=False, variance_explained=0.9,
num_topics=None, B_val=10, N_val=1000, multipass=True,
min_freq=5, max_freq=0.25,
max_keys=100000, tmp_path=None, **kwargs):
"""Initialize a Lsi instance."""
BaseTransform.__init__(self, sign1, "lsi", max_keys, tmp)
self.variance_explained = variance_explained
self.min_freq = min_freq
self.max_freq = max_freq
self.multipass = multipass
self.num_topics = num_topics
self.B_val = B_val
self.N_val = N_val
if tmp_path is None:
tmp_path = Config().PATH.CC_TMP
self.tmp_path = tmp_path
    def _lsi_variance_explained(self, tfidf_corpus, lsi, num_topics):
        """Bootstrap an estimate of the variance explained per LSI topic.

        Draws `B_val` samples of `N_val` documents, projects them with the
        LSI model and compares the per-topic variance to the total variance
        of the original TF-IDF matrix.
        """
        mm = corpora.MmCorpus(tfidf_corpus)
        exp_var_ratios = []
        for _ in range(self.B_val):
            xt = []
            sm = lil_matrix((self.N_val, mm.num_terms))
            for i in range(self.N_val):
                io = random.randint(0, mm.num_docs - 1)
                terms = mm[io]
                # transformed (LSI-projected) document
                tops = np.zeros(num_topics)
                for x in lsi[terms]:
                    if x[0] >= num_topics:
                        continue
                    tops[x[0]] = x[1]
                xt += [tops]
                # sparse original document (gensim term ids are 0-based)
                for t in terms:
                    sm[i, t[0]] = t[1]
            xt = np.array(xt)
            sm = sm.tocsr()
            full_var = mean_variance_axis(sm, 0)[1].sum()
            try:
                exp_var = np.var(xt, axis=0)
                exp_var_ratios += [exp_var / full_var]
            except Exception as ex:
                self.__log.warning(str(ex))
                continue
        exp_var_ratios = np.mean(np.array(exp_var_ratios), axis=0)
        return exp_var_ratios
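
    # A vectorized sketch of how fit() below uses these ratios: take the
    # smallest index whose cumulative explained variance exceeds the
    # threshold (clamped to the last index when it is never reached):
    #
    #   cum = np.cumsum(exp_var_ratios)
    #   cut_i = min(int(np.searchsorted(cum, variance_explained,
    #                                   side="right")),
    #               len(cum) - 1)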
    def fit(self):
        """Fit the TF-IDF/LSI model and transform the signature."""
        if not self.categorical:
            raise Exception("TFIDF-LSI only allowed for categorical matrices")
V, keys, features = self.subsample()
self.features = features
self.plain_corpus = os.path.join(
self.model_path, self.name + ".plain.txt")
self.tfidf_corpus = os.path.join(
self.model_path, self.name + ".tfidf.mm")
        # write plain corpus: one line per key, each feature repeated once
        # per count ("KEY featA,featB,featB,...")
with open(self.plain_corpus, "w") as f:
for chunk in self.chunker(V.shape[0]):
vs = V[chunk]
ks = keys[chunk]
for key, row in zip(ks, vs):
mask = np.where(row > 0)
val = ",".join([",".join([features[x]] * int(row[x]))
for x in mask[0]])
f.write("%s %s\n" % (key, val))
del V
        # get dictionary
        self.__log.info('Generating dictionary.')
        self.__log.info('min_freq: %s', self.min_freq)
        self.__log.info('max_freq: %s', self.max_freq)
        dictionary = corpora.Dictionary(
            line.rstrip("\n").split(" ")[1].split(",")
            for line in open(self.plain_corpus, "r"))
# filter extremes
dictionary.filter_extremes(
no_below=self.min_freq, no_above=self.max_freq)
# save
dictionary.compactify()
dictionary.save(os.path.join(self.model_path, self.name + ".dict.pkl"))
# corpus
c = Corpus(self.plain_corpus, dictionary)
# tfidf model
tfidf = models.TfidfModel(c)
tfidf.save(os.path.join(self.model_path, self.name + ".tfidf.pkl"))
c_tfidf = tfidf[c]
corpora.MmCorpus.serialize(self.tfidf_corpus, c_tfidf)
# getting ready for lsi
if self.num_topics is None:
self.num_topics = int(0.67 * len(dictionary))
        onepass = not self.multipass
# lsi
self.__log.info('Fitting LSI model.')
only_zeros = 1
while only_zeros > 0:
self.__log.info('num_topics: %s', self.num_topics)
lsi = models.LsiModel(c_tfidf, id2word=dictionary,
num_topics=self.num_topics, onepass=onepass,
chunksize=2500)
lsi.save(os.path.join(self.model_path, self.name + ".lsi.pkl"))
# variance explained
exp_var_ratios = self._lsi_variance_explained(
self.tfidf_corpus, lsi, self.num_topics)
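            # keep the smallest number of components that explains the
            # requested fraction of variance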
for cut_i, cum_var in enumerate(np.cumsum(exp_var_ratios)):
if cum_var > self.variance_explained:
break
self.cut_i = cut_i
c_lsi = lsi[c_tfidf]
# get keys
keys = np.array([k for k in c.keys()])
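            # count documents that project onto an all-zero LSI vector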
only_zeros = 0
for line in c_lsi:
v = np.zeros(self.cut_i + 1)
for x in line[:self.cut_i + 1]:
if x[0] > self.cut_i:
continue
v[x[0]] = x[1]
if np.sum(v) == 0:
only_zeros += 1
# in some corner cases we might get full zero rows after LSI
if only_zeros > 0:
self.__log.warning(
'Getting only zero rows: %s', str(only_zeros))
self.num_topics += 50
self.variance_explained = min(
self.variance_explained + 0.05, 1)
self.__log.warning(
'Repeating LSI with: variance_explained: %.2f num_topics: %s',
self.variance_explained, str(self.num_topics))
self.predict(self.sign_ref)
self.predict(self.sign)
self.save()
    def predict(self, sign1):
        """Project a signature onto the fitted TF-IDF/LSI space."""
        self.predict_check(sign1)
        # corpus files for the prediction go to a temporary directory
        tmp_dir = tempfile.mkdtemp(prefix="lsi_", dir=self.tmp_path)
        plain_corpus = os.path.join(tmp_dir, self.name + ".plain.txt")
        tfidf_corpus = os.path.join(tmp_dir, self.name + ".tfidf.mm")
        # write corpus (dense feature)
        with open(plain_corpus, "w") as f:
            # read the provided sign1 in chunks of signatures
            for chunk in sign1.chunker():
                # take a chunk of signatures, together with their keys
                vs = sign1[chunk].astype(int)
                ks = sign1.keys[chunk]
                for i in range(len(ks)):
                    # save dense representation (features with value 1 only)
                    row = vs[i]
                    mask = np.argwhere(row > 0).ravel()
                    val = ",".join(self.features[mask])
                    f.write("%s %s\n" % (ks[i], val))
sign1.close_hdf5()
# load dictionary
dictionary = corpora.Dictionary.load(
os.path.join(self.model_path, self.name + ".dict.pkl"))
# init corpus object
c = Corpus(plain_corpus, dictionary)
tfidf = models.TfidfModel.load(os.path.join(
self.model_path, self.name + ".tfidf.pkl"))
c_tfidf = tfidf[c]
corpora.MmCorpus.serialize(tfidf_corpus, c_tfidf)
lsi = models.LsiModel.load(os.path.join(
self.model_path, self.name + ".lsi.pkl"))
c_lsi = lsi[c_tfidf]
# get keys
keys = np.array([k for k in c.keys()])
if len(keys) < len(sign1.keys):
drop = len(sign1.keys) - len(keys)
self.__log.warning('Dropped %s molecules (only zeros).' % drop)
# instead of creating V we need to write iteratively to the H5
# we run here the operations of the overwrite function
with h5py.File(sign1.data_path, "r+") as hf:
if self.tmp:
del hf["V_tmp"]
del hf["V"]
else:
del hf["V"]
if "V_tmp" in hf.keys():
self.__log.debug("Overwriting tmp with the actual dataset")
del hf["V_tmp"]
del hf["keys"]
hf.create_dataset("keys", data=np.array(
keys, DataSignature.string_dtype()))
hf.create_dataset("V", (len(keys), self.cut_i + 1),
dtype=np.float32)
hf.create_dataset("V_tmp", (len(keys), self.cut_i + 1),
dtype=np.float32)
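            # write LSI projections row by row, counting all-zero rows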
only_zeros = 0
for idx, line in enumerate(c_lsi):
v = np.zeros(self.cut_i + 1)
for x in line[:self.cut_i + 1]:
if x[0] > self.cut_i:
continue
v[x[0]] = x[1]
if np.sum(v) == 0:
only_zeros += 1
hf["V"][idx] = v
hf["V_tmp"][idx] = v
# in some corner cases we might get full zero rows after LSI
if only_zeros > 0:
self.__log.warning(
'Getting only zero rows: %s', str(only_zeros))
sign1.refresh()
self.reindex_triplets(sign1, keys)
self.remap(sign1)
# self.overwrite(sign1=sign1, V=V, keys=keys)
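
# Minimal usage sketch (hypothetical signature objects; `sign1` must expose
# the BaseTransform/DataSignature API assumed above):
#
#   lsi = Lsi(sign1, variance_explained=0.9, min_freq=5, max_freq=0.25)
#   lsi.fit()              # fits TF-IDF + LSI and rewrites the signature
#   lsi.predict(sign_new)  # projects another signature onto the LSI space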