Source code for pythresh.thresholds.wind

import numpy as np
from scipy import integrate, stats

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_kde, normalize


[docs] class WIND(BaseThresholder): r"""WIND class for topological Winding number thresholder. Use the topological winding number (with respect to the origin) to evaluate a non-parametric means to threshold scores generated by the decision_scores where outliers are set to any value beyond the mean intersection point calculated from the winding number. See :cite:`jacobson2013wind` for details. Parameters ---------- random_state : int, optional (default=1234) Random seed for the normal distribution. Can also be set to None. Attributes ---------- thresh_ : threshold value that separates inliers from outliers dscores_ : 1D array of decomposed decision scores Notes ----- The topological winding number or the degree of a continuous mapping. It is an integer sum of the number of completed/closed counterclockwise rotations in a plane around a point. And is given by, .. math:: \mathrm{d}\theta = \frac{1}{r^2} \left(x\mathrm{d}y - y\mathrm{d}x \right) \mathrm{,} where :math:`r^2 = x^2 + y^2` .. math:: wind(\gamma,0) = \frac{1}{2\pi} \oint_\gamma \mathrm{d}\theta The winding number intuitively captures self-intersections/contours, with a change in the distribution of the dataset or shift from inliers to outliers relating to these intersections. With this, it is assumed that if an intersection exists, then adjacent/incident regions must have different region labels. Since multiple intersection regions may exist. The threshold between inliers and outliers is taken as the mean intersection point. Examples -------- The effects of randomness can affect the thresholder's output performance significantly. Therefore, to alleviate the effects of randomness on the thresholder a combined model can be used with different random_state values. E.g. .. code:: python # train the KNN detector from pyod.models.knn import KNN from pythresh.thresholds.comb import COMB from pythresh.thresholds.wind import WIND clf = KNN() clf.fit(X_train) # get outlier scores decision_scores = clf.decision_scores_ # raw outlier scores # get outlier labels with combined model thres = COMB(thresholders = [WIND(random_state=1234), WIND(random_state=42), WIND(random_state=9685), WIND(random_state=111222)]) labels = thres.eval(decision_scores) """ def __init__(self, random_state=1234): super().__init__() self.random_state = random_state
[docs] def eval(self, decision): """Outlier/inlier evaluation process for decision scores. Parameters ---------- decision : np.array or list of shape (n_samples) or np.array of shape (n_samples, n_detectors) which are the decision scores from a outlier detection. Returns ------- outlier_labels : numpy array of shape (n_samples,) For each observation, tells whether or not it should be considered as an outlier according to the fitted model. 0 stands for inliers and 1 for outliers. """ decision = check_scores(decision, random_state=self.random_state) decision = normalize(decision) self.dscores_ = decision # Create a normal distribution and normalize size = min(len(decision), 1500) norm = stats.norm.rvs(size=size, loc=0.0, scale=1.0, random_state=self.random_state) norm = normalize(norm) # Create a KDE of the labels and the normal distribution # Generate KDE val_data, dat_range = gen_kde(decision, 0, 1, len(decision)*3) val_norm, _ = gen_kde(norm, 0, 1, len(decision)*3) # Get the rsquared value r2 = val_data**2 + val_norm**2 val_data = val_data/np.max(val_data) val_norm = val_norm/np.max(val_norm) # Find the first derivatives of the decision and norm kdes # with respect to the decision scores deriv_data = np.gradient(val_data, dat_range[1]-dat_range[0]) deriv_norm = np.gradient(val_norm, dat_range[1]-dat_range[0]) # Compute integrand integrand = self._dtheta( val_data, val_norm, deriv_data, deriv_norm, r2) # Integrate to find winding numbers mean intersection point limit = integrate.simpson(integrand)/np.sum((val_data+val_norm)/2) self.thresh_ = limit return cut(decision, limit)
def _dtheta(self, x, y, dx, dy, r2): """Calculate dtheta for the integrand.""" return (x*dy - y*dx)/r2