Source code for chemicalchecker.tool.targetmate.nonconformist.nc

#!/usr/bin/env python

"""
Nonconformity functions.
"""

# Authors: Henrik Linusson

from __future__ import division

import abc
import numpy as np
import sklearn.base
from .base import ClassifierAdapter, RegressorAdapter
from .base import OobClassifierAdapter, OobRegressorAdapter

# -----------------------------------------------------------------------------
# Error functions
# -----------------------------------------------------------------------------

class ClassificationErrFunc(object):
    """Base class for classification model error functions.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        super(ClassificationErrFunc, self).__init__()

    @abc.abstractmethod
    def apply(self, prediction, y):
        """Apply the nonconformity function.

        Parameters
        ----------
        prediction : numpy array of shape [n_samples, n_classes]
            Class probability estimates for each sample.

        y : numpy array of shape [n_samples]
            True output labels of each sample.

        Returns
        -------
        nc : numpy array of shape [n_samples]
            Nonconformity scores of the samples.
        """
        pass


class RegressionErrFunc(object):
    """Base class for regression model error functions.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        super(RegressionErrFunc, self).__init__()

    @abc.abstractmethod
    def apply(self, prediction, y):
        """Apply the nonconformity function.

        Parameters
        ----------
        prediction : numpy array of shape [n_samples]
            Predicted outputs of each sample.

        y : numpy array of shape [n_samples]
            True output values of each sample.

        Returns
        -------
        nc : numpy array of shape [n_samples]
            Nonconformity scores of the samples.
        """
        pass

    @abc.abstractmethod
    def apply_inverse(self, nc, significance):
        """Apply the inverse of the nonconformity function (i.e.,
        calculate prediction interval).

        Parameters
        ----------
        nc : numpy array of shape [n_calibration_samples]
            Nonconformity scores obtained for conformal predictor.

        significance : float
            Significance level (0, 1).

        Returns
        -------
        interval : numpy array of shape [n_samples, 2]
            Minimum and maximum interval boundaries for each prediction.
        """
        pass


class InverseProbabilityErrFunc(ClassificationErrFunc):
    r"""Calculates the probability of not predicting the correct class.

    For each correct output in ``y``, nonconformity is defined as

    .. math::
        1 - \hat{P}(y_i | x) \, .
    """

    def __init__(self):
        super(InverseProbabilityErrFunc, self).__init__()

    def apply(self, prediction, y):
        prob = np.zeros(y.size, dtype=np.float32)
        for i, y_ in enumerate(y):
            if y_ >= prediction.shape[1]:
                prob[i] = 0
            else:
                prob[i] = prediction[i, int(y_)]
        return 1 - prob


class MarginErrFunc(ClassificationErrFunc):
    r"""Calculates the margin error.

    For each correct output in ``y``, nonconformity is defined as

    .. math::
        0.5 - \dfrac{\hat{P}(y_i | x) - \max_{y \neq y_i} \hat{P}(y | x)}{2}
    """

    def __init__(self):
        super(MarginErrFunc, self).__init__()

    def apply(self, prediction, y):
        prob = np.zeros(y.size, dtype=np.float32)
        for i, y_ in enumerate(y):
            if y_ >= prediction.shape[1]:
                prob[i] = 0
            else:
                prob[i] = prediction[i, int(y_)]
                prediction[i, int(y_)] = -np.inf
        return 0.5 - ((prob - prediction.max(axis=1)) / 2)


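# Illustrative sketch (not part of the original module): margin nonconformity
# on a toy probability matrix. Note that apply() overwrites the true-class
# probability in its input array, so pass a copy if the array is reused.
def _demo_margin_err_func():
    probs = np.array([[0.7, 0.2, 0.1],
                      [0.1, 0.3, 0.6]])
    y = np.array([0, 1])
    # Sample 0: 0.5 - (0.7 - 0.2) / 2 = 0.25
    # Sample 1: 0.5 - (0.3 - 0.6) / 2 = 0.65
    return MarginErrFunc().apply(probs.copy(), y)

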
class AbsErrorErrFunc(RegressionErrFunc):
    r"""Calculates absolute error nonconformity for regression problems.

    For each correct output in ``y``, nonconformity is defined as

    .. math::
        | y_i - \hat{y}_i |
    """

    def __init__(self):
        super(AbsErrorErrFunc, self).__init__()

    def apply(self, prediction, y):
        return np.abs(prediction - y)

    def apply_inverse(self, nc, significance):
        nc = np.sort(nc)[::-1]
        border = int(np.floor(significance * (nc.size + 1))) - 1
        # TODO: should probably warn against too few calibration examples
        border = min(max(border, 0), nc.size - 1)
        return np.vstack([nc[border], nc[border]])


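# Illustrative sketch (not part of the original module): apply_inverse() picks
# the calibration score at the floor(significance * (n + 1))-th largest
# position and returns it as a symmetric interval half-width. The numbers
# below are made up.
def _demo_abs_error_err_func():
    err_func = AbsErrorErrFunc()
    cal_scores = err_func.apply(np.array([1.0, 2.0, 3.0, 4.0]),
                                np.array([1.2, 1.5, 3.3, 5.0]))
    # cal_scores == [0.2, 0.5, 0.3, 1.0]; sorted descending: [1.0, 0.5, 0.3, 0.2]
    # At significance 0.2: border = floor(0.2 * 5) - 1 = 0, so both interval
    # boundaries equal 1.0 (the largest calibration score).
    return err_func.apply_inverse(cal_scores, significance=0.2)

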
class SignErrorErrFunc(RegressionErrFunc):
    r"""Calculates signed error nonconformity for regression problems.

    For each correct output in ``y``, nonconformity is defined as

    .. math::
        y_i - \hat{y}_i

    References
    ----------
    .. [1] Linusson, Henrik, Ulf Johansson, and Tuve Lofstrom.
        Signed-error conformal regression. Pacific-Asia Conference on
        Knowledge Discovery and Data Mining. Springer International
        Publishing, 2014.
    """

    def __init__(self):
        super(SignErrorErrFunc, self).__init__()

    def apply(self, prediction, y):
        return prediction - y

    def apply_inverse(self, nc, significance):
        nc = np.sort(nc)[::-1]
        upper = int(np.floor((significance / 2) * (nc.size + 1)))
        lower = int(np.floor((1 - significance / 2) * (nc.size + 1)))
        # TODO: should probably warn against too few calibration examples
        upper = min(max(upper, 0), nc.size - 1)
        lower = max(min(lower, nc.size - 1), 0)
        return np.vstack([-nc[lower], nc[upper]])


# -----------------------------------------------------------------------------
# Base nonconformity scorer
# -----------------------------------------------------------------------------

class BaseScorer(sklearn.base.BaseEstimator):
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        super(BaseScorer, self).__init__()

    @abc.abstractmethod
    def fit(self, x, y):
        pass

    @abc.abstractmethod
    def score(self, x, y=None):
        pass


class RegressorNormalizer(BaseScorer):
    def __init__(self, base_model, normalizer_model, err_func):
        super(RegressorNormalizer, self).__init__()
        self.base_model = base_model
        self.normalizer_model = normalizer_model
        self.err_func = err_func

    def fit(self, x, y):
        residual_prediction = self.base_model.predict(x)
        residual_error = np.abs(self.err_func.apply(residual_prediction, y))
        residual_error += 0.00001  # Add small term to avoid log(0)
        log_err = np.log(residual_error)
        self.normalizer_model.fit(x, log_err)

    def score(self, x, y=None):
        norm = np.exp(self.normalizer_model.predict(x))
        return norm


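# Illustrative sketch (not part of the original module): RegressorNormalizer
# fits a second model to the log of the base model's absolute residuals, so
# score() returns larger values where the base model tends to be less
# accurate. DecisionTreeRegressor and KNeighborsRegressor are arbitrary
# example choices; x_train and y_train are placeholder arrays from the caller.
def _demo_regressor_normalizer(x_train, y_train):
    from sklearn.neighbors import KNeighborsRegressor
    from sklearn.tree import DecisionTreeRegressor
    base = RegressorAdapter(DecisionTreeRegressor(min_samples_leaf=5))
    base.fit(x_train, y_train)
    normalizer = RegressorNormalizer(
        base, RegressorAdapter(KNeighborsRegressor()), AbsErrorErrFunc())
    normalizer.fit(x_train, y_train)
    return normalizer.score(x_train)  # per-sample difficulty estimates

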
class NcFactory(object):
    @staticmethod
    def create_nc(model, err_func=None, normalizer_model=None, oob=False):
        if normalizer_model is not None:
            normalizer_adapter = RegressorAdapter(normalizer_model)
        else:
            normalizer_adapter = None

        if isinstance(model, sklearn.base.ClassifierMixin):
            err_func = MarginErrFunc() if err_func is None else err_func
            if oob:
                c = sklearn.base.clone(model)
                c.fit([[0], [1]], [0, 1])
                if hasattr(c, 'oob_decision_function_'):
                    adapter = OobClassifierAdapter(model)
                else:
                    raise AttributeError('Cannot use out-of-bag '
                                         'calibration with {}'.format(
                                             model.__class__.__name__))
            else:
                adapter = ClassifierAdapter(model)

            if normalizer_adapter is not None:
                normalizer = RegressorNormalizer(adapter,
                                                 normalizer_adapter,
                                                 err_func)
                return ClassifierNc(adapter, err_func, normalizer)
            else:
                return ClassifierNc(adapter, err_func)

        elif isinstance(model, sklearn.base.RegressorMixin):
            err_func = AbsErrorErrFunc() if err_func is None else err_func
            if oob:
                c = sklearn.base.clone(model)
                c.fit([[0], [1]], [0, 1])
                if hasattr(c, 'oob_prediction_'):
                    adapter = OobRegressorAdapter(model)
                else:
                    raise AttributeError('Cannot use out-of-bag '
                                         'calibration with {}'.format(
                                             model.__class__.__name__))
            else:
                adapter = RegressorAdapter(model)

            if normalizer_adapter is not None:
                normalizer = RegressorNormalizer(adapter,
                                                 normalizer_adapter,
                                                 err_func)
                return RegressorNc(adapter, err_func, normalizer)
            else:
                return RegressorNc(adapter, err_func)


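# Illustrative sketch (not part of the original module): NcFactory inspects
# the model type to pick the adapter and default error function. The sklearn
# estimators below are arbitrary examples.
def _demo_nc_factory():
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.neighbors import KNeighborsRegressor
    from sklearn.tree import DecisionTreeRegressor
    clf_nc = NcFactory.create_nc(RandomForestClassifier())  # ClassifierNc + MarginErrFunc
    reg_nc = NcFactory.create_nc(DecisionTreeRegressor())   # RegressorNc + AbsErrorErrFunc
    # A normalizer_model is wrapped in a RegressorNormalizer automatically:
    norm_nc = NcFactory.create_nc(DecisionTreeRegressor(),
                                  normalizer_model=KNeighborsRegressor())
    return clf_nc, reg_nc, norm_nc

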
class BaseModelNc(BaseScorer):
    """Base class for nonconformity scorers based on an underlying model.

    Parameters
    ----------
    model : ClassifierAdapter or RegressorAdapter
        Underlying classification or regression model used for calculating
        nonconformity scores.

    err_func : ClassificationErrFunc or RegressionErrFunc
        Error function object.

    normalizer : BaseScorer
        Normalization model.

    beta : float
        Normalization smoothing parameter. As the beta-value increases,
        the normalized nonconformity function approaches a non-normalized
        equivalent.
    """

    def __init__(self, model, err_func, normalizer=None, beta=0):
        super(BaseModelNc, self).__init__()
        self.err_func = err_func
        self.model = model
        self.normalizer = normalizer
        self.beta = beta

        # If we use sklearn.base.clone (e.g., during cross-validation),
        # object references get jumbled, so we need to make sure that the
        # normalizer has a reference to the proper model adapter, if
        # applicable.
        if (self.normalizer is not None and
                hasattr(self.normalizer, 'base_model')):
            self.normalizer.base_model = self.model

        self.last_x, self.last_y = None, None
        self.last_prediction = None
        self.clean = False

    def fit(self, x, y):
        """Fits the underlying model of the nonconformity scorer.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of examples for fitting the underlying model.

        y : numpy array of shape [n_samples]
            Outputs of examples for fitting the underlying model.

        Returns
        -------
        None
        """
        self.model.fit(x, y)
        if self.normalizer is not None:
            self.normalizer.fit(x, y)
        self.clean = False

    def score(self, x, y=None):
        """Calculates the nonconformity score of a set of samples.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of examples for which to calculate a nonconformity score.

        y : numpy array of shape [n_samples]
            Outputs of examples for which to calculate a nonconformity score.

        Returns
        -------
        nc : numpy array of shape [n_samples]
            Nonconformity scores of samples.
        """
        prediction = self.model.predict(x)
        n_test = x.shape[0]
        if self.normalizer is not None:
            norm = self.normalizer.score(x) + self.beta
        else:
            norm = np.ones(n_test)
        return self.err_func.apply(prediction, y) / norm


# -----------------------------------------------------------------------------
# Classification nonconformity scorers
# -----------------------------------------------------------------------------

class ClassifierNc(BaseModelNc):
    """Nonconformity scorer using an underlying class probability estimating
    model.

    Parameters
    ----------
    model : ClassifierAdapter
        Underlying classification model used for calculating nonconformity
        scores.

    err_func : ClassificationErrFunc
        Error function object.

    normalizer : BaseScorer
        Normalization model.

    beta : float
        Normalization smoothing parameter. As the beta-value increases,
        the normalized nonconformity function approaches a non-normalized
        equivalent.

    Attributes
    ----------
    model : ClassifierAdapter
        Underlying model object.

    err_func : ClassificationErrFunc
        Scorer function used to calculate nonconformity scores.

    See also
    --------
    RegressorNc
    """

    def __init__(self, model, err_func=MarginErrFunc(), normalizer=None,
                 beta=0):
        super(ClassifierNc, self).__init__(model, err_func, normalizer, beta)


# -----------------------------------------------------------------------------
# Regression nonconformity scorers
# -----------------------------------------------------------------------------

class RegressorNc(BaseModelNc):
    """Nonconformity scorer using an underlying regression model.

    Parameters
    ----------
    model : RegressorAdapter
        Underlying regression model used for calculating nonconformity
        scores.

    err_func : RegressionErrFunc
        Error function object.

    normalizer : BaseScorer
        Normalization model.

    beta : float
        Normalization smoothing parameter. As the beta-value increases,
        the normalized nonconformity function approaches a non-normalized
        equivalent.

    Attributes
    ----------
    model : RegressorAdapter
        Underlying model object.

    err_func : RegressionErrFunc
        Scorer function used to calculate nonconformity scores.

    See also
    --------
    ClassifierNc
    """

    def __init__(self, model, err_func=AbsErrorErrFunc(), normalizer=None,
                 beta=0):
        super(RegressorNc, self).__init__(model, err_func, normalizer, beta)

    def predict(self, x, nc, significance=None):
        """Constructs prediction intervals for a set of test examples.

        Predicts the output of each test pattern using the underlying model,
        and applies the (partial) inverse nonconformity function to each
        prediction, resulting in a prediction interval for each test pattern.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of patterns for which to predict output values.

        nc : numpy array of shape [n_calibration_samples]
            Nonconformity scores obtained for the calibration set.

        significance : float
            Significance level (maximum allowed error rate) of predictions.
            Should be a float between 0 and 1. If ``None``, then intervals for
            all significance levels (0.01, 0.02, ..., 0.99) are output in a
            3d-matrix.

        Returns
        -------
        p : numpy array of shape [n_samples, 2] or [n_samples, 2, 99]
            If significance is ``None``, then p contains the interval (minimum
            and maximum boundaries) for each test pattern, and each
            significance level (0.01, 0.02, ..., 0.99). If significance is a
            float between 0 and 1, then p contains the prediction intervals
            (minimum and maximum boundaries) for the set of test patterns at
            the chosen significance level.
        """
        n_test = x.shape[0]
        prediction = self.model.predict(x)
        if self.normalizer is not None:
            norm = self.normalizer.score(x) + self.beta
        else:
            norm = np.ones(n_test)

        if significance:
            intervals = np.zeros((x.shape[0], 2))
            err_dist = self.err_func.apply_inverse(nc, significance)
            err_dist = np.hstack([err_dist] * n_test)
            err_dist *= norm

            intervals[:, 0] = prediction - err_dist[0, :]
            intervals[:, 1] = prediction + err_dist[1, :]

            return intervals
        else:
            significance = np.arange(0.01, 1.0, 0.01)
            intervals = np.zeros((x.shape[0], 2, significance.size))

            for i, s in enumerate(significance):
                err_dist = self.err_func.apply_inverse(nc, s)
                err_dist = np.hstack([err_dist] * n_test)
                err_dist *= norm

                intervals[:, 0, i] = prediction - err_dist[0, :]
                intervals[:, 1, i] = prediction + err_dist[1, :]

            return intervals

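
# Illustrative sketch (not part of the original module): a minimal inductive
# conformal regression loop built directly on RegressorNc. The synthetic data,
# split sizes and underlying model are arbitrary.
def _demo_regressor_nc():
    from sklearn.tree import DecisionTreeRegressor
    rng = np.random.RandomState(0)
    x = rng.rand(200, 3)
    y = x.sum(axis=1) + rng.normal(scale=0.1, size=200)
    nc = RegressorNc(RegressorAdapter(DecisionTreeRegressor(min_samples_leaf=5)))
    nc.fit(x[:100], y[:100])                       # proper training set
    cal_scores = nc.score(x[100:150], y[100:150])  # calibration scores
    # Intervals with ~90% expected coverage; shape [50, 2] (lower, upper).
    return nc.predict(x[150:], cal_scores, significance=0.1)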