Source code for pythresh.thresholds.mcst

import numpy as np
import scipy.stats as stats

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize


[docs] class MCST(BaseThresholder): r"""MCST class for Monte Carlo Shapiro Tests thresholder. Use uniform random sampling and statistical testing to evaluate a non-parametric means to threshold scores generated by the decision_scores where outliers are set to any value beyond the minimum value left after iterative Shapiro-Wilk tests have occurred. Note** accuracy decreases with array size. For good results the should be array<1000. However still this threshold method may fail at any array size. See :cite:`coin2008mcst` for details. Parameters ---------- random_state : int, optional (default=1234) Random seed for the uniform distribution. Can also be set to None. Attributes ---------- thresh_ : threshold value that separates inliers from outliers dscores_ : 1D array of decomposed decision scores Notes ----- The Shapiro-Wilk test is a frequentist statistical test for normality. It is used to test the null-hypothesis that the decision scores came from a normal distribution. This test statistic is defined as: .. math:: W = \frac{\left(\sum_{i=1}^n a_i x_{(i)} \right)^2}{\sum_{i=1}^n \left(x_i - \bar{x} \right)^2} \mathrm{,} where :math:`\bar{x}` is the mean of the scores and :math:`x_{(i)}` is the ith-smallest number in the sample (kth order statistic). The coefficients :math:`a_i` is given by: .. math:: (a_1,...,a_n) = \frac{m^{\top}V^{-1}}{\sqrt{m^{\top}V^{-1}V^{-1}m}} \mathrm{,} where the vector :math:`m=\lvert(m_1,...,m_n \rvert)^{\top}` and :math:`V` is the covariance matrix of the order statistics. The threshold is set by first calculating an initial Shapiro-Wilk test p-value on the decision scores. Using Monte Carlo simulations, random values between 0-1 are inserted into the normalized decision scores and p-values are calculated. if the p-value is higher than the initial p-value, the initial p-value is set to this value and the random value is stored. The minimum stored random value is set as the threshold as it is the minimum found outlier. Examples -------- The effects of randomness can affect the thresholder's output performance significantly. Therefore, to alleviate the effects of randomness on the thresholder a combined model can be used with different random_state values. E.g. .. code:: python # train the KNN detector from pyod.models.knn import KNN from pythresh.thresholds.comb import COMB from pythresh.thresholds.mcst import MCST clf = KNN() clf.fit(X_train) # get outlier scores decision_scores = clf.decision_scores_ # raw outlier scores # get outlier labels with combined model thres = COMB(thresholders = [MCST(random_state=1234), MCST(random_state=42), MCST(random_state=9685), MCST(random_state=111222)]) labels = thres.eval(decision_scores) """ def __init__(self, random_state=1234): self.random_state = random_state
[docs] def eval(self, decision): """Outlier/inlier evaluation process for decision scores. Parameters ---------- decision : np.array or list of shape (n_samples) or np.array of shape (n_samples, n_detectors) which are the decision scores from a outlier detection. Returns ------- outlier_labels : numpy array of shape (n_samples,) For each observation, tells whether or not it should be considered as an outlier according to the fitted model. 0 stands for inliers and 1 for outliers. """ decision = check_scores(decision, random_state=self.random_state) decision = normalize(decision) self.dscores_ = decision # Get Baseline Shapiro-Wilk test p-value p_std = stats.shapiro(decision).pvalue # Create random dataset to insert and test p-values rnd = stats.uniform.rvs(loc=0, scale=1, size=len( decision), random_state=self.random_state) rnd = normalize(rnd) povr = [] # Iterate and add a new random variable # Perform a Shapiro-Wilk test and see if the new # distribution has a lower or higher p-value # If higher record these potential outlier values for i in range(len(rnd)): arr = np.append(decision, rnd[i]) p_check = stats.shapiro(arr).pvalue if p_check > p_std: p_std = p_check povr.append(rnd[i]) limit = np.min(povr) if povr else 1.1 self.thresh_ = limit return cut(decision, limit)