
#!/usr/bin/env python

"""
Inductive conformal predictors.
"""

# Authors: Henrik Linusson

from __future__ import division

from collections import defaultdict
from functools import partial

import numpy as np
from sklearn.base import BaseEstimator

from .base import RegressorMixin, ClassifierMixin
from .util import calc_p


# -----------------------------------------------------------------------------
# Base inductive conformal predictor
# -----------------------------------------------------------------------------
class BaseIcp(BaseEstimator):
    """Base class for inductive conformal predictors.
    """

    def __init__(self, nc_function, condition=None):
        self.cal_x, self.cal_y = None, None
        self.nc_function = nc_function

        # Check if condition-parameter is the default function (i.e.,
        # lambda x: 0). This is so we can safely clone the object without
        # the clone accidentally having self.conditional = True.
        default_condition = lambda x: 0
        is_default = (callable(condition) and
                      (condition.__code__.co_code ==
                       default_condition.__code__.co_code))

        if is_default:
            self.condition = condition
            self.conditional = False
        elif callable(condition):
            self.condition = condition
            self.conditional = True
        else:
            self.condition = lambda x: 0
            self.conditional = False
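    # Editor's sketch (not part of the original module): the ``condition``
    # callable receives an ``(x_i, y_i)`` tuple and must return a hashable
    # category, turning this into a Mondrian (e.g. class-conditional)
    # conformal predictor. A minimal class-conditional condition, assuming
    # hashable labels:
    #
    #     class_condition = lambda instance: instance[1]  # category = label
    #     icp = IcpClassifier(nc, condition=class_condition)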
    def fit(self, x, y):
        """Fit underlying nonconformity scorer.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of examples for fitting the nonconformity scorer.

        y : numpy array of shape [n_samples]
            Outputs of examples for fitting the nonconformity scorer.

        Returns
        -------
        None
        """
        # TODO: incremental?
        self.nc_function.fit(x, y)
    def calibrate(self, x, y, increment=False):
        """Calibrate conformal predictor based on underlying nonconformity
        scorer.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of examples for calibrating the conformal predictor.

        y : numpy array of shape [n_samples]
            Outputs of examples for calibrating the conformal predictor.

        increment : boolean
            If ``True``, performs an incremental recalibration of the
            conformal predictor. The supplied ``x`` and ``y`` are added to
            the set of previously existing calibration examples, and the
            conformal predictor is then calibrated on both the old and new
            calibration examples.

        Returns
        -------
        None
        """
        self._calibrate_hook(x, y, increment)
        self._update_calibration_set(x, y, increment)

        if self.conditional:
            # Compute categories over the full (possibly incremented)
            # calibration set, so incremental recalibration stays consistent
            # with the stored examples.
            category_map = np.array([self.condition((self.cal_x[i, :],
                                                     self.cal_y[i]))
                                     for i in range(self.cal_y.size)])
            self.categories = np.unique(category_map)
            self.cal_scores = defaultdict(partial(np.ndarray, 0))

            for cond in self.categories:
                idx = category_map == cond
                cal_scores = self.nc_function.score(self.cal_x[idx, :],
                                                    self.cal_y[idx])
                self.cal_scores[cond] = np.sort(cal_scores)[::-1]
        else:
            self.categories = np.array([0])
            cal_scores = self.nc_function.score(self.cal_x, self.cal_y)
            self.cal_scores = {0: np.sort(cal_scores)[::-1]}
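    # Editor's usage sketch (assumed names, not original code): incremental
    # recalibration appends the new examples to the stored calibration set
    # and re-scores the union:
    #
    #     icp.calibrate(x_cal, y_cal)                  # initial calibration
    #     icp.calibrate(x_new, y_new, increment=True)  # extend with new data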
    def _calibrate_hook(self, x, y, increment):
        pass

    def _update_calibration_set(self, x, y, increment):
        if increment and self.cal_x is not None and self.cal_y is not None:
            self.cal_x = np.vstack([self.cal_x, x])
            self.cal_y = np.hstack([self.cal_y, y])
        else:
            self.cal_x, self.cal_y = x, y
# -----------------------------------------------------------------------------
# Inductive conformal classifier
# -----------------------------------------------------------------------------
class IcpClassifier(BaseIcp, ClassifierMixin):
    """Inductive conformal classifier.

    Parameters
    ----------
    nc_function : BaseScorer
        Nonconformity scorer object used to calculate nonconformity of
        calibration examples and test patterns. Should implement ``fit(x, y)``
        and ``calc_nc(x, y)``.

    smoothing : boolean
        Decides whether to use stochastic smoothing of p-values.

    Attributes
    ----------
    cal_x : numpy array of shape [n_cal_examples, n_features]
        Inputs of calibration set.

    cal_y : numpy array of shape [n_cal_examples]
        Outputs of calibration set.

    nc_function : BaseScorer
        Nonconformity scorer object used to calculate nonconformity scores.

    classes : numpy array of shape [n_classes]
        List of class labels, with indices corresponding to output columns
        of IcpClassifier.predict()

    See also
    --------
    IcpRegressor

    References
    ----------
    .. [1] Papadopoulos, H., & Haralambous, H. (2011). Reliable prediction
        intervals with regression neural networks. Neural Networks, 24(8),
        842-851.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import load_iris
    >>> from sklearn.tree import DecisionTreeClassifier
    >>> from nonconformist.base import ClassifierAdapter
    >>> from nonconformist.icp import IcpClassifier
    >>> from nonconformist.nc import ClassifierNc, MarginErrFunc
    >>> iris = load_iris()
    >>> idx = np.random.permutation(iris.target.size)
    >>> train = idx[:int(idx.size / 3)]
    >>> cal = idx[int(idx.size / 3):int(2 * idx.size / 3)]
    >>> test = idx[int(2 * idx.size / 3):]
    >>> model = ClassifierAdapter(DecisionTreeClassifier())
    >>> nc = ClassifierNc(model, MarginErrFunc())
    >>> icp = IcpClassifier(nc)
    >>> icp.fit(iris.data[train, :], iris.target[train])
    >>> icp.calibrate(iris.data[cal, :], iris.target[cal])
    >>> icp.predict(iris.data[test, :], significance=0.10)
    ...     # doctest: +SKIP
    array([[ True, False, False],
           [False,  True, False],
           ...,
           [False,  True, False],
           [False,  True, False]], dtype=bool)
    """

    def __init__(self, nc_function, condition=None, smoothing=True):
        super(IcpClassifier, self).__init__(nc_function, condition)
        self.classes = None
        self.smoothing = smoothing

    def _calibrate_hook(self, x, y, increment=False):
        self._update_classes(y, increment)

    def _update_classes(self, y, increment):
        if self.classes is None or not increment:
            self.classes = np.unique(y)
        else:
            self.classes = np.unique(np.hstack([self.classes, y]))
    def predict(self, x, significance=None):
        """Predict the output values for a set of input patterns.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of patterns for which to predict output values.

        significance : float or None
            Significance level (maximum allowed error rate) of predictions.
            Should be a float between 0 and 1. If ``None``, then the p-values
            are output rather than the predictions.

        Returns
        -------
        p : numpy array of shape [n_samples, n_classes]
            If significance is ``None``, then p contains the p-values for
            each sample-class pair; if significance is a float between 0 and
            1, then p is a boolean array denoting which labels are included
            in the prediction sets.
        """
        # TODO: if x == self.last_x ...
        n_test_objects = x.shape[0]
        p = np.zeros((n_test_objects, self.classes.size))

        ncal_ngt_neq = self._get_stats(x)

        for i in range(len(self.classes)):
            for j in range(n_test_objects):
                p[j, i] = calc_p(ncal_ngt_neq[j, i, 0],
                                 ncal_ngt_neq[j, i, 1],
                                 ncal_ngt_neq[j, i, 2],
                                 self.smoothing)

        if significance is not None:
            return p > significance
        else:
            return p
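    # Editor's note: ``calc_p`` (imported from ``.util``) is assumed to
    # compute the standard (optionally smoothed) conformal p-value from the
    # statistics gathered by ``_get_stats``. A minimal sketch of that
    # definition, under the usual convention:
    #
    #     def calc_p_sketch(ncal, ngt, neq, smoothing=False):
    #         # ncal: total calibration scores; ngt: scores strictly greater
    #         # than the test score; neq: scores equal to the test score
    #         if smoothing:
    #             return (ngt + (neq + 1) * np.random.uniform(0, 1)) / (ncal + 1)
    #         return (ngt + neq + 1) / (ncal + 1)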
    def _get_stats(self, x):
        n_test_objects = x.shape[0]
        ncal_ngt_neq = np.zeros((n_test_objects, self.classes.size, 3))
        for i, c in enumerate(self.classes):
            test_class = np.zeros(x.shape[0], dtype=self.classes.dtype)
            test_class.fill(c)

            # TODO: maybe calculate p-values using cython or similar
            # TODO: interpolated p-values
            # TODO: nc_function.calc_nc should take X * {y1, y2, ... ,yn}
            test_nc_scores = self.nc_function.score(x, test_class)
            for j, nc in enumerate(test_nc_scores):
                cal_scores = self.cal_scores[self.condition((x[j, :], c))][::-1]
                n_cal = cal_scores.size

                idx_left = np.searchsorted(cal_scores, nc, 'left')
                idx_right = np.searchsorted(cal_scores, nc, 'right')

                ncal_ngt_neq[j, i, 0] = n_cal
                ncal_ngt_neq[j, i, 1] = n_cal - idx_right
                ncal_ngt_neq[j, i, 2] = idx_right - idx_left

        return ncal_ngt_neq
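    # Editor's worked example (not original code): ``cal_scores`` is reversed
    # into ascending order so ``np.searchsorted`` can count comparisons. With
    # cal_scores = [1, 2, 2, 5] and a test score nc = 2:
    #
    #     idx_left  = np.searchsorted([1, 2, 2, 5], 2, 'left')   # -> 1
    #     idx_right = np.searchsorted([1, 2, 2, 5], 2, 'right')  # -> 3
    #     # n_cal - idx_right    = 1  (calibration scores strictly > nc)
    #     # idx_right - idx_left = 2  (calibration scores equal to nc)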
    def predict_conf(self, x):
        """Predict the output values for a set of input patterns, using
        the confidence-and-credibility output scheme.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of patterns for which to predict output values.

        Returns
        -------
        p : numpy array of shape [n_samples, 3]
            p contains three columns: the first column contains the most
            likely class for each test pattern; the second column contains
            the confidence in the predicted class label, and the third column
            contains the credibility of the prediction.
        """
        p = self.predict(x, significance=None)
        label = p.argmax(axis=1)
        credibility = p.max(axis=1)
        for i, idx in enumerate(label):
            p[i, idx] = -np.inf
        confidence = 1 - p.max(axis=1)

        return np.array([label, confidence, credibility]).T
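    # Editor's worked example (not original code): for a p-value row
    # p = [0.05, 0.60, 0.20], the predicted label is 1, credibility is the
    # largest p-value (0.60), and confidence is one minus the second-largest
    # p-value (1 - 0.20 = 0.80); masking the argmax entry with -inf above is
    # what exposes that second-largest value.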
# -----------------------------------------------------------------------------
# Inductive conformal regressor
# -----------------------------------------------------------------------------
class IcpRegressor(BaseIcp, RegressorMixin):
    """Inductive conformal regressor.

    Parameters
    ----------
    nc_function : BaseScorer
        Nonconformity scorer object used to calculate nonconformity of
        calibration examples and test patterns. Should implement ``fit(x, y)``,
        ``calc_nc(x, y)`` and ``predict(x, nc_scores, significance)``.

    Attributes
    ----------
    cal_x : numpy array of shape [n_cal_examples, n_features]
        Inputs of calibration set.

    cal_y : numpy array of shape [n_cal_examples]
        Outputs of calibration set.

    nc_function : BaseScorer
        Nonconformity scorer object used to calculate nonconformity scores.

    See also
    --------
    IcpClassifier

    References
    ----------
    .. [1] Papadopoulos, H., Proedrou, K., Vovk, V., & Gammerman, A. (2002).
        Inductive confidence machines for regression. In Machine Learning: ECML
        2002 (pp. 345-356). Springer Berlin Heidelberg.

    .. [2] Papadopoulos, H., & Haralambous, H. (2011). Reliable prediction
        intervals with regression neural networks. Neural Networks, 24(8),
        842-851.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import load_boston
    >>> from sklearn.tree import DecisionTreeRegressor
    >>> from nonconformist.base import RegressorAdapter
    >>> from nonconformist.icp import IcpRegressor
    >>> from nonconformist.nc import RegressorNc, AbsErrorErrFunc
    >>> boston = load_boston()
    >>> idx = np.random.permutation(boston.target.size)
    >>> train = idx[:int(idx.size / 3)]
    >>> cal = idx[int(idx.size / 3):int(2 * idx.size / 3)]
    >>> test = idx[int(2 * idx.size / 3):]
    >>> model = RegressorAdapter(DecisionTreeRegressor())
    >>> nc = RegressorNc(model, AbsErrorErrFunc())
    >>> icp = IcpRegressor(nc)
    >>> icp.fit(boston.data[train, :], boston.target[train])
    >>> icp.calibrate(boston.data[cal, :], boston.target[cal])
    >>> icp.predict(boston.data[test, :], significance=0.10)
    ...     # doctest: +SKIP
    array([[  5. ,  20.6],
           [ 15.5,  31.1],
           ...,
           [ 14.2,  29.8],
           [ 11.6,  27.2]])
    """

    def __init__(self, nc_function, condition=None):
        super(IcpRegressor, self).__init__(nc_function, condition)
    def predict(self, x, significance=None):
        """Predict the output values for a set of input patterns.

        Parameters
        ----------
        x : numpy array of shape [n_samples, n_features]
            Inputs of patterns for which to predict output values.

        significance : float or None
            Significance level (maximum allowed error rate) of predictions.
            Should be a float between 0 and 1. If ``None``, then intervals for
            all significance levels (0.01, 0.02, ..., 0.99) are output in a
            3d-matrix.

        Returns
        -------
        p : numpy array of shape [n_samples, 2] or [n_samples, 2, 99]
            If significance is ``None``, then p contains the interval (minimum
            and maximum boundaries) for each test pattern, and each
            significance level (0.01, 0.02, ..., 0.99). If significance is
            a float between 0 and 1, then p contains the prediction intervals
            (minimum and maximum boundaries) for the set of test patterns at
            the chosen significance level.
        """
        # TODO: interpolated p-values

        n_significance = (99 if significance is None
                          else np.array(significance).size)

        if n_significance > 1:
            prediction = np.zeros((x.shape[0], 2, n_significance))
        else:
            prediction = np.zeros((x.shape[0], 2))

        condition_map = np.array([self.condition((x[i, :], None))
                                  for i in range(x.shape[0])])

        for condition in self.categories:
            idx = condition_map == condition
            if np.sum(idx) > 0:
                p = self.nc_function.predict(x[idx, :],
                                             self.cal_scores[condition],
                                             significance)
                if n_significance > 1:
                    prediction[idx, :, :] = p
                else:
                    prediction[idx, :] = p

        return prediction
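    # Editor's usage sketch (assumed names, not original code): with
    # significance=None the regressor returns one interval per significance
    # level 0.01, 0.02, ..., 0.99, stacked along the third axis:
    #
    #     intervals = icp.predict(x_test)  # shape (n_samples, 2, 99)
    #     # assuming the documented ordering, index 9 is significance 0.10
    #     lo, hi = intervals[:, 0, 9], intervals[:, 1, 9]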
class OobCpClassifier(IcpClassifier):
    def __init__(self, nc_function, condition=None, smoothing=True):
        super(OobCpClassifier, self).__init__(nc_function,
                                              condition,
                                              smoothing)

    def fit(self, x, y):
        super(OobCpClassifier, self).fit(x, y)
        super(OobCpClassifier, self).calibrate(x, y, False)

    def calibrate(self, x, y, increment=False):
        # Should throw exception (or really not be implemented for oob)
        pass
class OobCpRegressor(IcpRegressor):
    def __init__(self, nc_function, condition=None):
        super(OobCpRegressor, self).__init__(nc_function, condition)

    def fit(self, x, y):
        super(OobCpRegressor, self).fit(x, y)
        super(OobCpRegressor, self).calibrate(x, y, False)

    def calibrate(self, x, y, increment=False):
        # Should throw exception (or really not be implemented for oob)
        pass
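# Editor's usage sketch (semantics inferred, not original code): the Oob*
# variants fit and calibrate on the same data, relying on the underlying
# scorer to produce out-of-bag nonconformity scores (e.g. from a bagged
# model) so that calibration remains valid; ``calibrate`` is intentionally
# a no-op.
#
#     oob_icp = OobCpRegressor(nc)   # nc assumed to wrap an OOB-capable model
#     oob_icp.fit(x_train, y_train)  # fits AND calibrates on the training set
#     intervals = oob_icp.predict(x_test, significance=0.10)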