Source code for chemicalchecker.tool.node2vec.node2vec

"""Wrapper for SNAP C++ implementation of node2vec.

An algorithmic framework for representational learning on graphs. [Aug 27 2018]
usage: node2vec
   -i:Input graph path (default:'graph/karate.edgelist')
   -o:Output graph path (default:'emb/karate.emb')
   -d:Number of dimensions. Default is 128 (default:128)
   -l:Length of walk per source. Default is 80 (default:80)
   -r:Number of walks per source. Default is 10 (default:10)
   -k:Context size for optimization. Default is 10 (default:10)
   -e:Number of epochs in SGD. Default is 1 (default:1)
   -p:Return hyperparameter. Default is 1 (default:1)
   -q:Inout hyperparameter. Default is 1 (default:1)
   -v Verbose output.
   -dr Graph is directed.
   -w Graph is weighted.
   -ow Output random walks instead of embeddings.


"""
import os
import h5py
import subprocess
import operator
import pickle
import numpy as np
import gc
from tqdm import tqdm
import collections
import distutils.spawn
from datetime import datetime
from bisect import bisect_left
from chemicalchecker.util import logged

from chemicalchecker.core.signature_data import DataSignature


@logged
class Node2Vec():
    """Wrapper to run SNAP's node2vec."""

    def __init__(self, executable="node2vec", **kwargs):
        """Check if executable is found."""
        exec_file = distutils.spawn.find_executable(executable)
        if not exec_file:
            raise Exception("node2vec executable not found.")
        self.executable = exec_file

    @staticmethod
    def heuristic_max_degree(nr_nodes, max_mbytes=6000.):
        """Return the maximum degree.

        The heuristic is based on the limiting factor that is node2vec's
        memory footprint. To precompute transition probabilities, node2vec
        needs 12 bytes for each node-neighbor-neighbor triplet.

        Args:
            nr_nodes(int): Number of nodes the network will contain.
            max_mbytes(float): Maximum RAM consumption to allow (in MBs).
        """
        max_degree = 10
        for max_deg in reversed(range(10, 1000, 5)):
            mbytes = (nr_nodes * max_deg * max_deg * 12) / 1000000.
            if mbytes < max_mbytes:
                max_degree = max_deg
                break
        return max_degree
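
    # Worked example (illustrative numbers): for a network of 1,000,000
    # nodes and the default 6000 MB budget, degree 20 needs
    # 1e6 * 20 * 20 * 12 / 1e6 = 4800 MB (fits) while degree 25 needs
    # 7500 MB (does not), so the heuristic returns 20.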

    def _pvalue_to_weigth(self, pvalues):
        """Scale p-values linearly to weights.

        The transformation, applied to -log10 p-values:
            shift the minimum to zero:     w = p - min_pvalue
            scale the range to one:        w *= 1 / range_pvalue
            scale to the weight range:     w *= range_weight
            shift to the weight minimum:   w += min_weight

        which combines to:
            weight = (pval_log - min_pvalue) * range_weight / range_pvalue
                     + min_weight
        """
        min_w = self.min_weight
        max_w = self.max_weight
        min_p = -np.log10(self.min_pvalue)
        max_p = -np.log10(self.max_pvalue)
        range_p = max_p - min_p
        range_w = max_w - min_w
        # compute weights
        return (-np.log10(pvalues) - min_p) * range_w / range_p + min_w
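
    # Worked example with the defaults from to_edgelist (min_weight=0.01,
    # max_weight=1.0, min_pvalue=1e-2, max_pvalue=1e-6): -log10 p-values
    # span [2, 6], so a p-value of 1e-2 maps to weight 0.01, 1e-6 maps to
    # 1.0, and 1e-4 maps to (4 - 2) * 0.99 / 4 + 0.01 = 0.505.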

    def emb_to_h5(self, keys, in_file, out_file):
        """Convert from native node2vec format to familiar h5.

        We need to map back from sign1 ids to inchikeys and sort.
        """
        self.__log.info("Converting %s to %s" % (in_file, out_file))
        inchikeys = keys
        with open(in_file, 'r') as fh:
            words = list()
            vectors = list()
            header = fh.readline()  # skip first row
            nr_words = int(header.split()[0])
            nr_components = int(header.split()[1])
            self.__log.info("words: %s features: %s",
                            nr_words, nr_components)
            idx = 0
            try:
                for idx, line in enumerate(fh):
                    fields = line.split()
                    # first column is the id
                    word = inchikeys[int(fields[0])]
                    # the rest is the embedding
                    vector = np.fromiter((float(x) for x in fields[1:]),
                                         dtype=np.float32)
                    words.append(word)
                    vectors.append(vector)
            except Exception as ex:
                self.__log.info("Error at line %s: %s", idx, str(ex))
        # consistency check
        assert len(words) == len(inchikeys)
        assert len(words) == len(vectors)
        # to numpy arrays
        words = np.array(words)
        matrix = np.array(vectors)
        # get them sorted
        sorted_idx = np.argsort(words)
        with h5py.File(out_file, "w") as hf:
            hf.create_dataset('keys',
                              data=np.array(words[sorted_idx],
                                            DataSignature.string_dtype()))
            hf.create_dataset('V', data=matrix[sorted_idx], dtype=np.float32)
            hf.create_dataset("shape", data=matrix.shape)
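
    # Reading the converted file back (a sketch; 'space.h5' is a
    # hypothetical output path):
    #
    #   with h5py.File('space.h5', 'r') as hf:
    #       keys = hf['keys'][:]   # sorted inchikeys
    #       V = hf['V'][:]         # embedding matrix, float32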

    def to_edgelist(self, sign1, neig1, out_file, **params):
        """Convert nearest-neighbor data to an edgelist.

        Args:
            sign1: Signature 1 (a DataSignature-like object exposing
                `background_distances` and `shape`).
            neig1: Nearest neighbors 1 (iterable yielding, per molecule,
                a tuple of neighbor indices and distances).
            out_file(str): Destination file.
            params(dict): Parameters defining the similarity network.
        """
        self.min_weight = params.get("min_weight", 1e-2)
        self.max_weight = params.get("max_weight", 1.0)
        self.min_pvalue = params.get("min_pvalue", 1e-2)
        self.max_pvalue = params.get("max_pvalue", 1e-6)
        self.min_degree = params.get("min_degree", 3)
        self.range_pvalue = self.max_pvalue - self.min_pvalue
        self.range_weight = self.max_weight - self.min_weight
        # get sign1 background distances thresholds
        thresholds = sign1.background_distances('cosine')
        thr_pvals = thresholds['pvalue']
        thr_dists = thresholds['distance']
        # derive max_degree
        mem_max_degree = Node2Vec.heuristic_max_degree(sign1.shape[0])
        self.max_degree = params.get("max_degree", mem_max_degree)
        if self.max_degree > mem_max_degree:
            self.__log.warn('user max_degree too large: %s', self.max_degree)
            self.__log.warn('using memory safe degree: %s', mem_max_degree)
            self.max_degree = mem_max_degree
        # write edgelist
        with open(out_file, 'w') as fh:
            for src_idx, neighbors in enumerate(neig1):
                # self.__log.debug('molecule: %s', src_idx)
                neig_idxs, neig_dists = neighbors
                # exclude self
                self_idx = np.argwhere(neig_idxs == src_idx)
                neig_idxs = np.delete(neig_idxs, self_idx)
                neig_dists = np.delete(neig_dists, self_idx)
                edges = self._get_edges(
                    neig_idxs, neig_dists, thr_pvals, thr_dists)
                for dst_idx, weight in edges:
                    fh.write("%i %i %.4f\n" % (src_idx, dst_idx, weight))
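
    # The destination file is a whitespace-separated weighted edgelist,
    # the format the node2vec binary expects with the -w flag, e.g.:
    #
    #   0 37 0.5050
    #   0 41 1.0000
    #   1 12 0.0100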

    def merge_edgelists(self, signs, edgefiles, out_file, **params):
        """Merge several edgelists into a single one.

        Edges appearing in more than one edgelist keep their maximum
        weight.

        Args:
            signs(list): List of signature objects.
            edgefiles(list): List of edge files.
            out_file(str): Destination file.
            params(dict): Parameters defining the similarity network.
        """
        edges_weights = collections.defaultdict(dict)
        all_keys = set()
        for i, edgefile in enumerate(edgefiles):
            keys = signs[i].keys
            all_keys.update(keys)
            with open(edgefile, 'r') as fh:
                for line in fh:
                    elems = line.rstrip().split(' ')
                    key1 = keys[int(elems[0])]
                    key2 = keys[int(elems[1])]
                    if key2 not in edges_weights[key1]:
                        edges_weights[key1][key2] = 0.0
                    edges_weights[key1][key2] = max(
                        edges_weights[key1][key2], float(elems[2]))
        mem_max_degree = Node2Vec.heuristic_max_degree(len(edges_weights))
        self.max_degree = params.get("max_degree", mem_max_degree)
        if self.max_degree > mem_max_degree:
            self.__log.warn('user max_degree too large: %s', self.max_degree)
            self.__log.warn('using memory safe degree: %s', mem_max_degree)
            self.max_degree = mem_max_degree
        all_keys_list = sorted(all_keys)
        dictOfkeys = {key: idx for idx, key in enumerate(all_keys_list)}
        # write edgelist keeping at most max_degree edges per node
        with open(out_file, 'w') as fh:
            for node, map_nodes in edges_weights.items():
                # self.__log.debug('molecule: %s', node)
                mols = list(map_nodes.items())
                mols.sort(key=lambda tup: tup[1])
                mols.reverse()
                for dst_key, weight in mols[:self.max_degree]:
                    fh.write("%s %s %.4f\n" % (dictOfkeys[node],
                                               dictOfkeys[dst_key],
                                               min(weight, 1.0)))
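
    # Merge semantics in brief: if one dataset assigns edge (k1, k2)
    # weight 0.3 and another assigns the same pair 0.8, the merged
    # edgelist keeps the pair once with weight 0.8 (capped at 1.0 on
    # output), and node ids are re-assigned by sorting the union of keys.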

    def split_edgelist(self, full_graph, train, test, train_fraction=0.8):
        """Split a graph into train and test edgelists.

        Given a Network object, split its edges into two sets, train and
        test, so that train keeps `train_fraction` of the edges of each
        node.
        """
        train_out = open(train, 'w')
        test_out = open(test, 'w')
        # we want each node to keep edges, so the split is done per node
        for node in tqdm(full_graph.nodes()):
            neighbors = list(full_graph.neighbors(node, data=True))
            np.random.shuffle(neighbors)
            split_id = int(len(neighbors) * train_fraction)
            for neig, weight in neighbors[:split_id]:
                train_out.write("%i %i %.4f\n" % (node, neig, weight))
            for neig, weight in neighbors[split_id:]:
                test_out.write("%i %i %.4f\n" % (node, neig, weight))
        train_out.close()
        test_out.close()
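
    # For example, a node with 10 neighbors and train_fraction=0.8 ends up
    # with 8 randomly chosen edges in the train file and the remaining 2
    # in the test file.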

    def _get_edges(self, neig_idxs, neig_dists, thr_pvals, thr_dists):
        """Get a molecule's neighbors and edge weights.

        We have a list of all possible p-values and the p-value id for
        each similar molecule ('thresholds'). We want as many significant
        p-values as possible (< MIN_PVALUE, >= MAX_PVALUE). We want at
        least MIN_DEGREE similar molecules (even if less significant than
        MIN_PVALUE) and at most MAX_DEGREE (even if that means dropping
        significant ones).

        Args:
            neig_idxs(array): The indices of the neighbor molecules.
            neig_dists(array): Corresponding distances.
            thr_pvals(array): Array with p-value thresholds.
            thr_dists(array): Corresponding threshold distances.
        Returns:
            edges(list(tuple)): List of (destination, weight) tuples.
        """
        curr_pval_idx = bisect_left(thr_pvals, self.min_pvalue) + 1
        curr_pval = thr_pvals[curr_pval_idx]
        neig_mask = neig_dists <= thr_dists[curr_pval_idx]
        degree = np.count_nonzero(neig_mask)
        # self.__log.debug("curr_pval[%i] %s - degree %s",
        #                  curr_pval_idx, curr_pval, degree)
        # we want more than MIN_DEGREE and less than MAX_DEGREE
        if self.min_degree <= degree <= self.max_degree:
            considered_mol_ids = neig_idxs[neig_mask]
        # if we have too few
        elif degree < self.min_degree:
            # iterate increasing threshold pvalue
            for incr_pval_idx in range(curr_pval_idx + 1, len(thr_dists)):
                # check rank degree
                incr_pval = thr_pvals[incr_pval_idx]
                neig_mask = neig_dists <= thr_dists[incr_pval_idx]
                degree = np.count_nonzero(neig_mask)
                # self.__log.debug("incr_pval[%i] %s - degree %s",
                #                  incr_pval_idx, incr_pval, degree)
                # increasing the threshold we have too many
                if degree >= self.min_degree:
                    # sample neighbors from previous threshold
                    prev_neig_mask = np.logical_and(
                        neig_dists <= thr_dists[incr_pval_idx],
                        neig_dists > thr_dists[incr_pval_idx - 1])
                    already_in = neig_dists <= thr_dists[incr_pval_idx - 1]
                    to_add = self.min_degree - np.count_nonzero(already_in)
                    # self.__log.debug(
                    #     "sampling %s from pval %s", to_add, incr_pval)
                    sampled_idxs = np.random.choice(
                        np.argwhere(prev_neig_mask).flatten(), to_add,
                        replace=False)
                    sampled_neig_mask = np.full(neig_mask.shape, False)
                    sampled_neig_mask[sampled_idxs] = True
                    # combine neighbors already within threshold with the
                    # sampled ones
                    neig_mask = np.logical_or(already_in, sampled_neig_mask)
                    break
                # corner case where the highest threshold has to be sampled
                # (the last index the loop reaches is len(thr_dists) - 1)
                elif incr_pval_idx == len(thr_dists) - 1:
                    # sample from highest threshold
                    already_in = neig_dists <= thr_dists[incr_pval_idx - 1]
                    to_add = self.min_degree - np.count_nonzero(already_in)
                    # self.__log.debug("corner sampling %s", to_add)
                    curr_neig_mask = neig_dists == thr_dists[incr_pval_idx]
                    sampled_idxs = np.random.choice(
                        np.argwhere(curr_neig_mask).flatten(), to_add,
                        replace=False)
                    sampled_neig_mask = np.full(neig_mask.shape, False)
                    sampled_neig_mask[sampled_idxs] = True
                    # add neighbors within previous threshold
                    prev_neig_mask = neig_dists <= thr_dists[incr_pval_idx - 1]
                    neig_mask = np.logical_or(
                        prev_neig_mask, sampled_neig_mask)
                    break
            # at this point the mask contains the right amount of neighbors
            considered_mol_ids = neig_idxs[neig_mask]
            assert len(considered_mol_ids) == self.min_degree
        # else too many
        else:
            # iterate decreasing threshold pvalue
            for decr_pval_idx in range(curr_pval_idx - 1, -1, -1):
                # check degree
                decr_pval = thr_pvals[decr_pval_idx]
                neig_mask = neig_dists <= thr_dists[decr_pval_idx]
                degree = np.count_nonzero(neig_mask)
                # self.__log.debug("decr_pval[%i] %s - degree %s",
                #                  decr_pval_idx, decr_pval, degree)
                # reducing the threshold we have too few
                if degree <= self.max_degree:
                    # sample neighbors from previous threshold
                    to_add = self.max_degree - degree
                    # self.__log.debug(
                    #     "sampling %s from pval %s", to_add, decr_pval)
                    prev_neig_mask = np.logical_and(
                        neig_dists <= thr_dists[decr_pval_idx + 1],
                        neig_dists > thr_dists[decr_pval_idx])
                    sampled_idxs = np.random.choice(
                        np.argwhere(prev_neig_mask).flatten(), to_add,
                        replace=False)
                    sampled_neig_mask = np.full(neig_mask.shape, False)
                    sampled_neig_mask[sampled_idxs] = True
                    # add sampled neighbors to those within the current
                    # threshold
                    neig_mask = np.logical_or(neig_mask, sampled_neig_mask)
                    break
                # corner case where the lowest threshold has to be sampled
                elif decr_pval_idx == 0:
                    to_add = self.max_degree
                    # self.__log.debug("corner sampling %s", to_add)
                    sampled_idxs = np.random.choice(
                        np.argwhere(neig_mask).flatten(), to_add,
                        replace=False)
                    neig_mask = np.full(neig_mask.shape, False)
                    neig_mask[sampled_idxs] = True
                    break
            # at this point the mask contains the right amount of neighbors
            considered_mol_ids = neig_idxs[neig_mask]
            assert len(considered_mol_ids) == self.max_degree
        # convert distances to p-value indexes
        pvalues_idx = np.zeros(neig_dists.shape, dtype=int)
        for dist in thr_dists[:-1]:  # skip last distance for corner cases
            pvalues_idx += neig_dists > dist
        # convert p-value indexes to p-values
        pvalues = thr_pvals[pvalues_idx]
        # replace p-value 0.0 with max_pvalue (default 1e-6) to avoid -inf
        pvalues[np.argwhere(pvalues == 0.0).flatten()] = self.max_pvalue
        # scale p-values linearly to weights
        weights = self._pvalue_to_weigth(pvalues[neig_mask])
        # we force a minimal degree, so we might get weights below
        # MIN_WEIGHT; we cap those to a default value one order of
        # magnitude below MIN_WEIGHT
        weights[weights < self.min_weight] = self.min_weight / 10.
        # self.__log.debug("similar molecules considered: %s",
        #                  len(considered_mol_ids))
        return zip(considered_mol_ids, weights)
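
    # Behavior in brief (illustrative, with the default min_degree=3): a
    # molecule with only 1 neighbor passing the significance threshold
    # gets 2 more sampled at random from the next, less significant
    # p-value bin; a molecule exceeding max_degree keeps everything below
    # a tighter threshold and samples the remainder from the loosest
    # retained bin.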

    def run(self, i, o, **kwargs):
        """Call the external executable with the given parameters.

        Args:
            i: Input graph path (default:'graph/karate.edgelist')
            o: Output graph path (default:'emb/karate.emb')
            d: Number of dimensions. Default is 128 (default:128)
            l: Length of walk per source. Default is 80 (default:80)
            r: Number of walks per source. Default is 10 (default:10)
            k: Context size for optimization. Default is 10 (default:10)
            e: Number of epochs in SGD. Default is 1 (default:1)
            p: Return hyperparameter. Default is 1 (default:1)
            q: Inout hyperparameter. Default is 1 (default:1)
            v: Verbose output.
            dr: Graph is directed.
            w: Graph is weighted.
            ow: Output random walks instead of embeddings.
        """
        # check input
        if not os.path.isfile(i):
            raise Exception("Input file not found.")
        # get arguments or default values
        d = kwargs.get("d", 128)
        l = kwargs.get("l", 80)
        r = kwargs.get("r", 10)
        k = kwargs.get("k", 10)
        e = kwargs.get("e", 1)
        p = kwargs.get("p", 1)
        q = kwargs.get("q", 1)
        v = kwargs.get("v", True)
        ow = kwargs.get("ow", False)
        dr = kwargs.get("dr", False)
        w = kwargs.get("w", True)
        cpu = kwargs.get("cpu", 1)
        # prepare arguments
        args = [
            "-i:%s" % i,
            "-o:%s" % o,
            "-d:%s" % d,
            "-l:%s" % l,
            "-r:%s" % r,
            "-k:%s" % k,
            "-e:%s" % e,
            "-p:%s" % p,
            "-q:%s" % q,
        ]
        if v:
            args.append("-v")
        if dr:
            args.append("-dr")
        if w:
            args.append("-w")
        if ow:
            args.append("-ow")
        # this enables using as many CPUs as required
        os.environ['OMP_NUM_THREADS'] = str(cpu)
        # log command
        self.__log.info(' '.join(args))
        self.__log.info("cpu: %s" % cpu)
        # run the process
        process = subprocess.Popen([self.executable] + args,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        # stream output as it gets generated
        while True:
            line = process.stdout.readline().decode("utf-8").strip()
            self.__log.debug(line)
            if line == '' and process.poll() is not None:
                break
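
# A minimal end-to-end sketch (hypothetical paths; `sign1` and `neig1`
# stand for signature and nearest-neighbor objects obtained elsewhere in
# chemicalchecker, and the SNAP node2vec binary must be on the PATH):
#
#   n2v = Node2Vec()
#   n2v.to_edgelist(sign1, neig1, 'graph/space.edgelist')
#   n2v.run('graph/space.edgelist', 'emb/space.emb', d=128, cpu=4)
#   n2v.emb_to_h5(sign1.keys, 'emb/space.emb', 'emb/space.h5')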