Source code for chemicalchecker.util.network.network

"""Network representations.

Each class provides a means of accessing a network/graph.
"""
import os
import json
import numpy as np
import networkx as nx

from chemicalchecker.util import logged


@logged
class NetworkxNetwork():
    """NetworkxNetwork class.

    Network representation with NetworkX. Simple but heavy in memory.
    Single type of node, single type of edge.
    """

    def __init__(self, network):
        """Initialize a NetworkxNetwork instance.

        Args:
            network: the graph object to wrap; it must expose ``nodes``,
                ``edges``, ``has_edge`` and item access by node.
        """
        self._network = network
        node_count = len(self._network.nodes())
        edge_count = len(self._network.edges())
        self.__log.info("Nodes: %s Edges: %s" % (node_count, edge_count))

    @property
    def nodes(self, data=False):
        """Return the node view of the wrapped graph.

        NOTE(review): being a property, this is always evaluated with the
        default ``data=False``; the parameter cannot be supplied by callers.
        """
        return self._network.nodes(data=data)

    def get_node(self, node):
        """Return the adjacency entry of `node` (``network[node]``)."""
        return self._network[node]

    def edges(self, data=False):
        """Return the edges of the wrapped graph, optionally with data."""
        return self._network.edges(data=data)

    def neighbors(self, node):
        """List the neighbors of `node`.

        Returns:
            list of ``(edge_data, neighbor, neighbor_data)`` tuples, one per
            entry in the adjacency of `node`.
        """
        adjacency = self._network[node]
        return [
            (edge_attrs, neighbor, self._network.nodes[neighbor])
            for neighbor, edge_attrs in adjacency.items()
        ]

    def has_edge(self, node, previous):
        """Tell whether the wrapped graph has an edge `node` -> `previous`."""
        return self._network.has_edge(node, previous)
@logged
class MultiEdgeNetwork():
    """MultiEdgeNetwork class.

    Multimodal network representation with SNAP.
    Multiple type of node, Multiple type of edge.
    """

    def __init__(self, network):
        """Initialize a MultiEdgeNetwork instance.

        Walks every mode (node type) and cross-net (edge type) of `network`,
        logging their sizes and recording their names in ``node_types`` and
        ``edge_types``.

        Args:
            network: the SNAP multimodal network to wrap.

        Raises:
            ImportError: if the ``snap`` module is not installed.
        """
        try:
            import snap
            self.snap = snap
        except ImportError:
            raise ImportError("requires snap " +
                              "http://snap.stanford.edu/")
        self._network = network

        self.__log.info("Node types: %s" % network.GetModeNets())
        self.node_types = list()
        # (name, modenet) of the last mode visited; `neighbors` reuses it to
        # skip a lookup by name. Pre-initialised so that a network without
        # any mode does not leave the attribute undefined.
        self._current_modenet = (None, None)
        modeneti = network.BegModeNetI()
        while modeneti < network.EndModeNetI():
            modenet_name = network.GetModeName(modeneti.GetModeId())
            self.__log.info("Nodes in '{}': {:>12}".format(
                modenet_name, modeneti.GetModeNet().GetNodes()))
            self.node_types.append(modenet_name)
            self._current_modenet = (
                modenet_name, network.GetModeNetByName(modenet_name))
            modeneti.Next()

        self.__log.info("Link types: %s" % network.GetCrossNets())
        self.edge_types = list()
        # Same caching scheme as above, for cross-nets (edge types).
        self._current_crossnet = (None, None)
        crossneti = network.BegCrossNetI()
        while crossneti < network.EndCrossNetI():
            crossnet_name = network.GetCrossName(crossneti.GetCrossId())
            self.__log.info("Edges in '{}': {:>12}".format(
                crossnet_name, crossneti.GetCrossNet().GetEdges()))
            self.edge_types.append(crossnet_name)
            self._current_crossnet = (
                crossnet_name, network.GetCrossNetByName(crossnet_name))
            crossneti.Next()
        self._neighbors = dict()

    def nodes(self, node_type=None, data=False):
        """Iterate over the nodes of one node type.

        Args:
            node_type: name of the mode to iterate; defaults to the first
                entry of ``node_types``.
            data: if True yield ``(node_id, first_string_attribute)``,
                otherwise yield the node id only.
        """
        if not node_type:
            node_type = self.node_types[0]
        modenet = self._network.GetModeNetByName(node_type)
        node = modenet.BegMMNI()
        while node < modenet.EndMMNI():
            if data:
                attr = self.snap.TStrV()
                node.GetStrAttrVal(attr)
                yield (node.GetId(), attr[0])
            else:
                yield node.GetId()
            node.Next()

    def edges(self, edge_type=None, data=False):
        """Iterate over the edges of one edge type.

        Args:
            edge_type: name of the cross-net to iterate; defaults to the
                first entry of ``edge_types``.
            data: if True yield ``(src, dst, weight)``, otherwise yield
                ``(src, dst)``.
        """
        if not edge_type:
            edge_type = self.edge_types[0]
        crossnet = self._network.GetCrossNetByName(edge_type)
        edge = crossnet.BegEI()
        while edge < crossnet.EndEI():
            src = edge.GetSrcNId()
            dst = edge.GetDstNId()
            if data:
                weight = self.snap.TFltV()
                crossnet.FltAttrValueEI(edge.GetId(), weight)
                yield (src, dst, weight[0])
            else:
                yield (src, dst)
            edge.Next()

    def neighbors(self, nodeid, edge_type, node_type, data=True):
        """Iterate over the neighbors of a node through one edge type.

        Args:
            nodeid: id of the node within the `node_type` mode.
            edge_type: name of the cross-net to follow.
            node_type: name of the mode `nodeid` belongs to.
            data: if True yield ``(dst_node_id, weight)``, otherwise yield
                the destination node id only.
        """
        if self._current_modenet[0] == node_type:
            modenet = self._current_modenet[1]
        else:
            modenet = self._network.GetModeNetByName(node_type)
        if self._current_crossnet[0] == edge_type:
            crossnet = self._current_crossnet[1]
        else:
            crossnet = self._network.GetCrossNetByName(edge_type)
        edgeids = self.snap.TIntV()
        # NOTE(review): the trailing True presumably selects outgoing
        # edges -- confirm against the SNAP API.
        modenet.GetNeighborsByCrossNet(nodeid, edge_type, edgeids, True)
        for edgeid in edgeids:
            ei = crossnet.GetEdgeI(edgeid)
            edge = self.snap.TCrossNetEdgeI(ei)
            # NOTE(review): presumably hands ownership of the iterator to the
            # copy above to avoid a double free -- confirm.
            ei.disown()
            if data:
                weight = self.snap.TFltV()
                crossnet.FltAttrValueEI(edgeid, weight)
                yield (edge.GetDstNId(), weight[0])
            else:
                yield edge.GetDstNId()

    def has_edge(self, src, dst, edge_type, node_type):
        """Tell whether `dst` is among the `edge_type` neighbors of `src`."""
        return any(
            dst == nodeid
            for nodeid in self.neighbors(src, edge_type, node_type,
                                         data=False))

    def out_degree(self, nodeid, edge_type, node_type):
        """Return the number of `edge_type` neighbors of `nodeid`.

        `neighbors` is a generator, so its items are counted rather than
        passed to ``len()`` (which raises TypeError on generators).
        """
        return sum(
            1 for _ in self.neighbors(nodeid, edge_type, node_type,
                                      data=False))

    def print_nodes(self, node_type):
        """Print every node of `node_type` together with its attribute."""
        for node in self.nodes(node_type, data=True):
            print(node)

    def print_edges(self, edge_type):
        """Print every edge of `edge_type` together with its weight."""
        for edge in self.edges(edge_type, data=True):
            print(edge)
@logged
class SNAPNetwork():
    """SNAPNetwork class.

    Network representation with SNAP.
    Single type of node, single type of edge.
    """

    def __init__(self, network):
        """Initialize a SNAPNetwork instance.

        Args:
            network: the SNAP network to wrap.

        Raises:
            ImportError: if the ``snap`` module is not installed.
        """
        try:
            import snap
            self.snap = snap
        except ImportError:
            raise ImportError("requires snap " +
                              "http://snap.stanford.edu/")
        self._network = network
        self.__log.info("Nodes : {:>12}".format(network.GetNodes()))
        self.__log.info("Edges : {:>12}".format(network.GetEdges()))

    @classmethod
    def from_file(cls, filename, delimiter=' ', read_weights=True):
        """Build a SNAPNetwork from an edge-list file.

        Args:
            filename: path of the edge list; source and destination ids are
                read from columns 0 and 1.
            delimiter: column separator handed to ``snap.LoadEdgeList``.
            read_weights: if True, the third column of every line is stored
                as the float edge attribute ``'weight'``.

        Raises:
            ImportError: if the ``snap`` module is not installed.
        """
        try:
            import snap
        except ImportError:
            raise ImportError("requires snap " +
                              "http://snap.stanford.edu/")
        filename = os.path.abspath(filename)
        network = snap.LoadEdgeList(snap.PNEANet, filename, 0, 1, delimiter)
        # add weights
        if read_weights:
            # NOTE(review): assumes edge ids were assigned sequentially from
            # 0 in file order, and that columns are whitespace-separated
            # whatever `delimiter` is -- confirm.
            with open(filename, 'r') as fh:
                for eid, line in enumerate(fh):
                    network.AddFltAttrDatE(
                        eid, float(line.split()[2]), 'weight')
        snapnet = cls(network)
        return snapnet

    def nodes(self, data=False):
        """Iterate over node ids, or ``(id, first_string_attribute)`` pairs
        when `data` is True."""
        node = self._network.BegNI()
        while node < self._network.EndNI():
            if data:
                attr = self.snap.TStrV()
                node.GetStrAttrVal(attr)
                yield (node.GetId(), attr[0])
            else:
                yield node.GetId()
            node.Next()

    def edges(self, data=False):
        """Iterate over ``(src, dst)`` pairs, or ``(src, dst, weight)``
        triplets when `data` is True."""
        edge = self._network.BegEI()
        while edge < self._network.EndEI():
            src = edge.GetSrcNId()
            dst = edge.GetDstNId()
            if data:
                weight = self.snap.TFltV()
                self._network.FltAttrValueEI(edge.GetId(), weight)
                yield (src, dst, weight[0])
            else:
                yield (src, dst)
            edge.Next()

    def neighbors(self, nodeid, data=False):
        """Iterate over the first ``GetOutDeg()`` neighbors of `nodeid`.

        Yields the neighbor id, or ``(neighbor_id, weight)`` when `data` is
        True.
        """
        node = self._network.GetNI(nodeid)
        for nid in range(node.GetOutDeg()):
            neig = node.GetNbrNId(nid)
            if data:
                edgeid = node.GetNbrEId(nid)
                weight = self.snap.TFltV()
                self._network.FltAttrValueEI(edgeid, weight)
                yield (neig, weight[0])
            else:
                yield neig

    def has_edge(self, src, dst):
        """Tell whether `dst` is among the neighbors of `src`."""
        return any(dst == nodeid for nodeid in self.neighbors(src))

    def out_degree(self, nodeid):
        """Return the out-degree of `nodeid`."""
        node = self._network.GetNI(nodeid)
        return node.GetOutDeg()

    def print_nodes(self):
        """Print every node id."""
        for node in self.nodes():
            print(node)

    def print_edges(self):
        """Print every ``(src, dst)`` pair."""
        for edge in self.edges():
            print(edge)

    def save(self, filename):
        """Save the network to `filename` through a SNAP output stream."""
        FOut = self.snap.TFOut(filename)
        self._network.Save(FOut)
        FOut.Flush()

    def stats_toJSON(self, filename):
        """Compute summary statistics of the network and dump them as JSON.

        The statistics are: node and edge counts, counts of nodes with zero
        total/in/out degree and with both in and out degree non-zero,
        min/max/quartiles of degrees and of edge weights, and the sizes
        reported by SNAP for the largest weakly and strongly connected
        components.

        Args:
            filename: path of the JSON file to write.
        """
        stats = dict()
        stats["nodes"] = self._network.GetNodes()
        stats["edges"] = self._network.GetEdges()

        zeroNodes = 0
        zeroInNodes = 0
        zeroOutNodes = 0
        nonZIODegNodes = 0
        degrees = list()
        node = self._network.BegNI()
        while node < self._network.EndNI():
            deg = node.GetDeg()
            in_deg = node.GetInDeg()
            out_deg = node.GetOutDeg()
            degrees.append(deg)
            if deg == 0:
                zeroNodes += 1
            if in_deg == 0:
                zeroInNodes += 1
            if out_deg == 0:
                zeroOutNodes += 1
            # Logical `and` is required here: with the bitwise `&` the
            # expression parsed as `in != (0 & out) != 0`, which is always
            # False, so this counter never moved.
            if in_deg != 0 and out_deg != 0:
                nonZIODegNodes += 1
            node.Next()

        weights = list()
        edge = self._network.BegEI()
        while edge < self._network.EndEI():
            weight = self.snap.TFltV()
            self._network.FltAttrValueEI(edge.GetId(), weight)
            weights.append(weight[0])
            edge.Next()

        # nodes without edges?
        stats["zeroNodes"] = zeroNodes
        stats["zeroInNodes"] = zeroInNodes
        stats["zeroOutNodes"] = zeroOutNodes
        stats["nonZIODegNodes"] = nonZIODegNodes
        # degree distribution
        stats["Degree_min"] = min(degrees)
        stats["Degree_max"] = max(degrees)
        stats["Degree_25"] = np.percentile(degrees, 25)
        stats["Degree_50"] = np.percentile(degrees, 50)
        stats["Degree_75"] = np.percentile(degrees, 75)
        # weights distribution
        stats["Weight_min"] = min(weights)
        stats["Weight_max"] = max(weights)
        stats["Weight_25"] = np.percentile(weights, 25)
        stats["Weight_50"] = np.percentile(weights, 50)
        stats["Weight_75"] = np.percentile(weights, 75)
        # fraction of nodes in largest weakly connected component
        stats["WccSz"] = self.snap.GetMxWccSz(self._network)
        # fraction of nodes in largest strongly connected component
        stats["SccSz"] = self.snap.GetMxSccSz(self._network)
        with open(filename, 'w') as fh:
            json.dump(stats, fh)
@logged
class HotnetNetwork():
    """HotnetNetwork class.

    Network tools for hotnet. Read network and create files.
    """

    def __init__(self, network):
        """Initialize a HotnetNetwork instance.

        Args:
            network: the graph object to wrap; it must expose ``nodes()``
                and ``edges()``.
        """
        self._network = network
        self.__log.info("Nodes: %s Edges: %s" % (
            len(self._network.nodes()),
            len(self._network.edges())))

    @staticmethod
    def prepare(interactions, out_path, hotnet, all_nodes=False):
        """Write the hotnet input files for an interaction network.

        Creates, inside `out_path`: ``idx2node.tsv`` (1-based index to node
        name), ``edgelist.tsv`` (edges as index pairs), ``beta.txt`` and
        ``similarity_matrix.h5`` (the last two produced by `hotnet`).

        Args:
            interactions: path of a tab-separated file whose first two
                columns are the endpoints of each edge.
            out_path: existing directory where the files are written.
            hotnet: object providing ``choose_beta`` and
                ``create_similarity_matrix``.
            all_nodes: if False only the largest connected component is
                kept, otherwise the whole graph is used.
        """
        HotnetNetwork.__log.info("Reading network")
        G = nx.Graph()
        with open(interactions, "r") as f:
            for line in f:
                fields = line.rstrip("\n").split("\t")
                G.add_edge(fields[0], fields[1])
        if not all_nodes:
            G = max((G.subgraph(c) for c in nx.connected_components(G)),
                    key=len)

        idx2node_path = os.path.join(out_path, "idx2node.tsv")
        edgelist_path = os.path.join(out_path, "edgelist.tsv")
        beta_path = os.path.join(out_path, "beta.txt")
        simmat_path = os.path.join(out_path, "similarity_matrix.h5")

        # Writing files
        # Index-to-gene file
        node_idx = {}
        with open(idx2node_path, "w") as f:
            for i, n in enumerate(G.nodes(), start=1):
                f.write("%d\t%s\n" % (i, n))
                node_idx[n] = i
        # Edge-list file
        with open(edgelist_path, "w") as f:
            for e in G.edges():
                f.write("%d\t%d\n" % (node_idx[e[0]], node_idx[e[1]]))

        # Calculate beta
        HotnetNetwork.__log.info("Computing beta")
        hotnet.choose_beta(edgelist_path, beta_path)

        # Calculate similarity matrix
        HotnetNetwork.__log.info("Calculate Similarity matrix")
        with open(beta_path, "r") as f:
            b = float(f.read())
        hotnet.create_similarity_matrix(edgelist_path, simmat_path, b=b)