Source code for pythresh.thresholds.gesd

import numpy as np
import scipy.stats as stats

from .base import BaseThresholder
from .thresh_utility import cut

# https://github.com/bhattbhavesh91/outlier-detection-grubbs-test-and-generalized-esd-test-python/blob/master/generalized-esd-test-for-outliers.ipynb


[docs] class GESD(BaseThresholder): r"""GESD class for Generalized Extreme Studentized Deviate thresholder. Use the generalized extreme studentized deviate to evaluate a non-parametric means to threshold scores generated by the decision_scores where outliers are set to any less than the smallest detected outlier. See :cite:`alrawashdeh2021gesd` for details. Parameters ---------- max_outliers : int, optional (default='auto') maximum number of outliers that the dataset may have. Default sets max_outliers to be half the size of the dataset alpha : float, optional (default=0.05) significance level fallback : str ('ignore', 'warn', 'raise'), optional (default='warn') The action to take for thresholders when their criterion are not met. In these cases when set to 'ignore' on eval and fit all train data is set to inliers and the threshold is set to max of the train scores + eps. Passing 'warn' will do the same as 'ignore' but also produce a warning. If 'raise', the thresholder raises a ValueError. random_state : int, optional (default=1234) Random seed for the random number generators of the thresholders. Can also be set to None. Attributes ---------- thresh_ : threshold value that separates inliers from outliers dscores_ : 1D array of decomposed decision scores Notes ----- The generalized extreme studentized deviate is defined for the hypothesis: H0: There are no outliers in the decision scores Ha: There are up to r amount of outliers in the decision scores The test statistic is given as, .. math:: R_i = \frac{\mathrm{MAX} \left(x_i - \bar{x}\right)}{\sigma} \mathrm{,} where :math:`\bar{x}` and :math:`\sigma` are the mean and standard deviation of the decision scores respectively. The GESD maximized :math:`R_i` value is removed from the decision scores and the r test statistic is recomputed and is tested against the r critical values: .. math:: \lambda_i = \frac{(n-i)t_{p,n-i-1}}{\sqrt{\left(n-i-1+t_{p,n-i-1}^2\right)(n-i+1)}} \,\, i = 1,2,...,r \mathrm{,} where :math:`t_{p,v}` is the :math:`p_{100}` for a t-distribution with :math:`v` degrees of freedom such that for a selected significance level :math:`\alpha`: .. math:: p = 1-\frac{\alpha}{2(n-i+1)} \mathrm{.} The threshold for the decision scores is set to the smallest score that fulfills the condition :math:`R_i>\lambda_i`. """ def __init__(self, max_outliers="auto", alpha=0.05, fallback="warn", random_state=1234): super().__init__(fallback=fallback) self.max_outliers = max_outliers self.alpha = alpha self.random_state = random_state np.random.seed(random_state)
[docs] def eval(self, decision): """Outlier/inlier evaluation process for decision scores. Parameters ---------- decision : np.array or list of shape (n_samples) or np.array of shape (n_samples, n_detectors) which are the decision scores from a outlier detection. Returns ------- outlier_labels : numpy array of shape (n_samples,) For each observation, tells whether or not it should be considered as an outlier according to the fitted model. 0 stands for inliers and 1 for outliers. """ decision = self._data_setup(decision) arr = decision.copy() eps = np.finfo(decision.dtype).eps limit = 1.0 + eps if self.max_outliers == "auto": self.max_outliers = len(decision) // 2 for _ in range(self.max_outliers): Gc = self._calc_crit(len(arr), self.alpha) Gs, max_index = self._grubbs_stat(arr) if (Gs > Gc) & (arr[max_index] < limit): limit = arr[max_index] arr = np.delete(arr, max_index) self._check_threshold(limit) self.thresh_ = limit return cut(decision, limit)
def _grubbs_stat(self, y): dev = np.abs(y - y.mean()) max_ind = np.argmax(dev) Gcal = np.max(dev) / y.std() return Gcal, max_ind def _calc_crit(self, size, alpha): t_dist = stats.t.ppf(1 - alpha / (2 * size), size - 2) numerator = (size - 1) * np.sqrt(np.square(t_dist)) denominator = np.sqrt(size) * np.sqrt(size - 2 + np.square(t_dist)) return numerator / denominator