"""Data aggregation classes."""
import collections
import numpy as np
from chemicalchecker.util import logged
@logged
class AggregateAsMatrix(object):
    """AggregateAsMatrix class.

    Given a matrix with keys with potential duplicates, aggregate them.
    """

    def __init__(self, method):
        """Initialize an AggregateAsMatrix instance.

        Args:
            method(str): The aggregation method to be used. Must be one of:
                -first: The first occurrence of the signature is kept.
                -last: The last occurrence of the signature is kept.
                -average: The average of the signature is kept.
        """
        self.method = method

    def get_agg_func(self):
        """Get aggregation function.

        Returns:
            function: A callable ``f(V, idxs)`` where ``V`` is indexed as
                ``V[row, :]`` and ``idxs`` is a non-empty sequence of row
                indices that belong together. It returns a single row.

        Raises:
            Exception: If ``self.method`` is not one of 'first', 'last',
                'average'.
        """
        # NOTE(review): `__log` is not defined in this class; presumably it
        # is injected by the `@logged` class decorator -- confirm.
        self.__log.debug("Aggregation method = %s", self.method)
        if self.method not in ['first', 'last', 'average']:
            raise Exception("Aggregate 'method' must be one of: "
                            "'first', 'last', 'average'.")

        def first(V, idxs):
            """Return the row at the first index."""
            return V[idxs[0], :]

        def last(V, idxs):
            """Return the row at the last index."""
            return V[idxs[-1], :]

        def average(V, idxs):
            """Return the column-wise mean of the selected rows."""
            if len(idxs) == 1:
                # Nothing to average: hand back the row untouched.
                return V[idxs[0], :]
            return np.mean(V[idxs, :], axis=0)

        # Explicit dispatch table instead of eval() on the method name.
        agg_funcs = {'first': first, 'last': last, 'average': average}
        return agg_funcs[self.method]
@logged
class AggregateAsPairs(object):
    """AggregateAsPairs class.

    Given a matrix with potential duplicates, aggregate them. Zero entries
    are handled specially: 'first' and 'last' fill them in from the other
    duplicated rows, and 'average' leaves them out of the mean.
    """

    def __init__(self, method):
        """Initialize an AggregateAsPairs instance.

        Args:
            method(str): The aggregation method to be used. Must be one of:
                -first: The first occurrence of the signature is kept.
                -last: The last occurrence of the signature is kept.
                -average: The average of the signature is kept.
        """
        self.method = method

    def get_agg_func(self):
        """Get aggregation function.

        Returns:
            function: A callable ``f(V, idxs)`` where ``V`` is indexed as
                ``V[row, :]`` and ``idxs`` is a non-empty sequence of row
                indices that belong together. It returns a single row.

        Raises:
            Exception: If ``self.method`` is not one of 'first', 'last',
                'average'.
        """
        # NOTE(review): `__log` is not defined in this class; presumably it
        # is injected by the `@logged` class decorator -- confirm.
        self.__log.debug("Aggregation method = %s", self.method)
        if self.method not in ['first', 'last', 'average']:
            raise Exception("Aggregate 'method' must be one of: "
                            "'first', 'last', 'average'.")

        def fill_zeros(V, idxs):
            """Return row idxs[0] with its zeros filled from later rows.

            Rows are visited in the order given by ``idxs``; a zero entry
            takes the first non-zero value found at that position.
            """
            # Work on a copy: V[i, :] is a view for NumPy arrays, so writing
            # into it directly would silently modify the caller's matrix.
            merged = np.array(V[idxs[0], :])
            for idx in idxs[1:]:
                candidate = np.asarray(V[idx, :])
                fillable = np.logical_and(merged == 0, candidate != 0)
                merged[fillable] = candidate[fillable]
            return merged

        def first(V, idxs):
            """Keep the first row, filling its zeros from the later ones."""
            if len(idxs) == 1:
                return V[idxs[0], :]
            return fill_zeros(V, idxs)

        def last(V, idxs):
            """Keep the last row, filling its zeros from the earlier ones."""
            if len(idxs) == 1:
                return V[idxs[0], :]
            return fill_zeros(V, np.array(idxs)[::-1])

        def average(V, idxs):
            """Return the column-wise mean over the non-zero entries.

            Columns where every selected row is zero stay at zero.
            """
            if len(idxs) == 1:
                return V[idxs[0], :]
            # Read the selected rows once; the sum and the non-zero count
            # are both taken over all selected rows.
            rows = np.asarray(V[np.asarray(idxs), :])
            num = np.sum(rows, axis=0)
            den = np.sum(rows != 0, axis=0)
            mask = den > 0
            v = np.zeros(len(num))
            v[mask] = num[mask] / den[mask]
            return v

        # Explicit dispatch table instead of eval() on the method name.
        agg_funcs = {'first': first, 'last': last, 'average': average}
        return agg_funcs[self.method]
@logged
class Aggregate(object):
"""Aggregate class.
Aggregate samples.
"""
def __init__(self, method, input_type):
    """Initialize an Aggregate instance.

    Builds the aggregator matching ``input_type`` and stores it in
    ``self.agg``.

    Args:
        method(str): The aggregation method to be used. Must be one of:
            -first: The first occurrence of the signature is kept.
            -last: The last occurrence of the signature is kept.
            -average: The average of the signature is kept.
        input_type(str): One of 'pairs' or 'matrix'.

    Raises:
        Exception: If ``input_type`` is neither 'pairs' nor 'matrix'.
    """
    if input_type == 'pairs':
        aggregator = AggregateAsPairs(method=method)
    elif input_type == 'matrix':
        aggregator = AggregateAsMatrix(method=method)
    else:
        # Anything else is rejected before an aggregator is created.
        raise Exception("Input type must be 'pairs' or 'matrix'")
    self.agg = aggregator