Source code for pythresh.thresholds.dsn

from itertools import combinations

import numpy as np
import scipy.spatial.distance as distance
import scipy.special as special
import scipy.stats as stats
from scipy import interpolate
from scipy.integrate import simpson
from sklearn.covariance import MinCovDet

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_cdf, gen_kde, normalize


class DSN(BaseThresholder):
    """DSN class for Distance Shift from Normal thresholder.

       Use the distance shift from a normal distribution as a
       non-parametric means of thresholding scores generated by the
       decision_scores, where outliers are set to any value beyond the
       distance calculated by the selected metric. See
       :cite:`amagata2021dsn` for details.

       Parameters
       ----------

       metric : {'JS', 'WS', 'ENG', 'BHT', 'HLL', 'HI', 'LK', 'LP', 'MAH', 'TMT', 'RES', 'KS', 'INT', 'MMD'}, optional (default='MAH')
            Metric to use for distance computation

            - 'JS':  Jensen-Shannon distance
            - 'WS':  Wasserstein or Earth Movers distance
            - 'ENG': Energy distance
            - 'BHT': Bhattacharyya distance
            - 'HLL': Hellinger distance
            - 'HI':  Histogram intersection distance
            - 'LK':  Lukaszyk-Karmowski metric for normal distributions
            - 'LP':  Levy-Prokhorov metric
            - 'MAH': Mahalanobis distance
            - 'TMT': Tanimoto distance
            - 'RES': Studentized residual distance
            - 'KS':  Kolmogorov-Smirnov distance
            - 'INT': Weighted spline interpolated distance
            - 'MMD': Maximum Mean Discrepancy distance

       random_state : int, optional (default=1234)
            Random seed for the normal distribution. Can also be set to None.

       Attributes
       ----------

       thresh_ : threshold value that separates inliers from outliers

       dscores_ : 1D array of decomposed decision scores

       Examples
       --------
       Randomness can significantly affect the thresholder's output.
       Therefore, to alleviate its effects, a combined model can be used
       with different random_state values. E.g.

       .. code:: python

            # train the KNN detector
            from pyod.models.knn import KNN
            from pythresh.thresholds.comb import COMB
            from pythresh.thresholds.dsn import DSN

            clf = KNN()
            clf.fit(X_train)

            # get outlier scores
            decision_scores = clf.decision_scores_  # raw outlier scores

            # get outlier labels with combined model
            thres = COMB(thresholders=[DSN(random_state=1234),
                                       DSN(random_state=42),
                                       DSN(random_state=9685),
                                       DSN(random_state=111222)])
            labels = thres.eval(decision_scores)
    """

    def __init__(self, metric='MAH', random_state=1234):

        super().__init__()
        self.metric = metric
        self.metric_funcs = {'JS': self._JS_metric,
                             'WS': self._WS_metric,
                             'ENG': self._ENG_metric,
                             'BHT': self._BHT_metric,
                             'HLL': self._HLL_metric,
                             'HI': self._HI_metric,
                             'LK': self._LK_metric,
                             'LP': self._LP_metric,
                             'MAH': self._MAH_metric,
                             'TMT': self._TMT_metric,
                             'RES': self._RES_metric,
                             'KS': self._KS_metric,
                             'INT': self._INTER_metric,
                             'MMD': self._MMD_metric}
        self.random_state = random_state
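    # Design note: `eval` dispatches through the mapping above
    # (self.metric_funcs[self.metric]) rather than an if/elif chain, so
    # adding a new metric only requires a `_X_metric` method plus a
    # dictionary entry. A hypothetical sketch of the lookup:
    #
    #     fn = self.metric_funcs['KS']   # -> self._KS_metric
    #     limit = fn()                   # valid only after eval() has
    #                                    # built val_data / val_norm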
    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

           Parameters
           ----------
           decision : np.array or list of shape (n_samples)
                      or np.array of shape (n_samples, n_detectors)
                      which are the decision scores from an outlier
                      detection.

           Returns
           -------
           outlier_labels : numpy array of shape (n_samples,)
                For each observation, tells whether it should be
                considered an outlier according to the fitted model.
                0 stands for inliers and 1 for outliers.
        """

        decision = check_scores(decision, random_state=self.random_state)

        decision = normalize(decision)

        self.dscores_ = decision

        # Create a normal distribution and normalize
        size = min(len(decision), 1500)
        norm = stats.norm.rvs(size=size, loc=0.0, scale=1.0,
                              random_state=self.random_state)
        self.norm = normalize(norm)

        n = 3 if self.metric != 'LP' else 1

        if self.metric in ['JS', 'BHT', 'INT', 'MMD']:

            # Generate a KDE of the decision scores and the normal distribution
            self.val_data, self.data_range = gen_kde(
                decision, 0, 1, len(decision)*n)
            self.val_norm, self.norm_range = gen_kde(
                self.norm, 0, 1, len(decision)*n)
        else:

            # Generate a CDF of the decision scores and the normal distribution
            self.val_data, self.data_range = gen_cdf(
                decision, 0, 1, len(decision)*n)
            self.val_norm, self.norm_range = gen_cdf(
                self.norm, 0, 1, len(decision)*n)

        limit = self.metric_funcs[str(self.metric)]()

        self.thresh_ = limit

        return cut(decision, limit)
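    # `cut` (from thresh_utility) is assumed here to binarize the
    # normalized scores against the computed limit, roughly:
    #
    #     labels = (decision > limit).astype(int)
    #
    # i.e. any score beyond the metric-derived distance is labelled an
    # outlier (1) and everything else an inlier (0), matching the
    # contract described in the eval docstring.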
    def _JS_metric(self):
        """Calculate the Jensen-Shannon distance."""

        return 1-distance.jensenshannon(self.val_data, self.val_norm)

    def _WS_metric(self):
        """Calculate the Wasserstein or Earth Movers distance."""

        return stats.wasserstein_distance(self.val_data, self.val_norm)

    def _ENG_metric(self):
        """Calculate the Energy distance."""

        return stats.energy_distance(self.val_data, self.val_norm)

    def _BHT_metric(self):
        """Calculate the Bhattacharyya distance."""

        bht = simpson(np.sqrt(self.val_data*self.val_norm),
                      dx=1/len(self.val_data))

        return np.log1p(bht)

    def _HLL_metric(self):
        """Calculate the Hellinger distance."""

        val_data = self.val_data/np.sum(self.val_data)
        val_norm = self.val_norm/np.sum(self.val_norm)

        return (distance.euclidean(np.sqrt(val_data), np.sqrt(val_norm))
                / np.sqrt(2))

    def _HI_metric(self):
        """Calculate the Histogram intersection distance."""

        val_data = self.val_data/np.sum(self.val_data)
        val_norm = self.val_norm/np.sum(self.val_norm)

        return 1-np.sum(np.minimum(val_data, val_norm))

    def _LK_metric(self):
        """Calculate the Lukaszyk-Karmowski metric for normal distributions."""

        # Get expected values for both distributions
        rng = np.linspace(0, 1, len(self.val_data))
        exp_data = (rng*self.val_data).sum()/self.val_data.sum()
        exp_norm = (rng*self.val_norm).sum()/self.val_norm.sum()

        nu_xy = np.abs(exp_data-exp_norm)

        # STD is the same for both distributions
        std = np.std(rng)

        # Get the LK distance
        return (nu_xy + 2*std/np.sqrt(np.pi)*np.exp(-nu_xy**2/(4*std**2))
                - nu_xy*special.erfc(nu_xy/(2*std)))

    def _LP_metric(self):
        """Calculate the Levy-Prokhorov metric."""

        # Get the edges for the complete graphs of the datasets
        f1 = np.array(list(combinations(self.val_data.tolist(), 2)))
        f2 = np.array(list(combinations(self.val_norm.tolist(), 2)))

        return (distance.directed_hausdorff(f1, f2)[0]
                / distance.correlation(self.val_data, self.val_norm))

    def _MAH_metric(self):
        """Calculate the Mahalanobis distance."""

        # Fit a Minimum Covariance Determinant (MCD) robust estimator to the data
        robust_cov = MinCovDet(random_state=self.random_state).fit(
            np.array([self.val_norm]).T)

        # Get the Mahalanobis distance
        dist = robust_cov.mahalanobis(np.array([self.val_data]).T)

        return 1-np.mean(dist)/np.max(dist)

    def _TMT_metric(self):
        """Calculate the Tanimoto distance."""

        val_data = self.val_data/np.sum(self.val_data)
        val_norm = self.val_norm/np.sum(self.val_norm)

        p = np.sum(val_data)
        q = np.sum(val_norm)
        m = np.sum(np.minimum(val_data, val_norm))

        return (p+q-2*m)/(p+q-m)

    def _RES_metric(self):
        """Calculate the studentized residual distance."""

        mean_X = np.mean(self.val_data)
        mean_Y = np.mean(self.val_norm)
        n = len(self.val_data)

        diff_mean_sqr = np.dot((self.val_data - mean_X),
                               (self.val_data - mean_X))

        beta1 = (np.dot((self.val_data - mean_X), (self.val_norm - mean_Y))
                 / diff_mean_sqr)
        beta0 = mean_Y - beta1 * mean_X

        y_hat = beta0 + beta1 * self.val_data
        residuals = self.val_norm - y_hat

        h_ii = (self.val_data - mean_X) ** 2 / diff_mean_sqr + (1 / n)
        Var_e = np.sqrt(np.sum((self.val_norm - y_hat) ** 2)/(n-2))

        SE_regression = Var_e*((1-h_ii) ** 0.5)
        studentized_residuals = residuals/SE_regression

        return np.abs(np.sum(studentized_residuals))

    def _KS_metric(self):
        """Calculate the Kolmogorov-Smirnov distance."""

        return np.max(np.abs(self.val_data-self.val_norm))

    def _INTER_metric(self):
        """Calculate the weighted spline interpolation distance."""

        # Get splines
        data_spl = self._interp(self.data_range, self.val_data)
        norm_spl = self._interp(self.norm_range, self.val_norm)

        # Get distance of new weight shifted spline
        dist = []
        for i in range(99):

            weight = (i+1)/99
            spline = (data_spl[0]*(1.0-weight) + norm_spl[0]*weight)

            dist.append(np.max(spline))

        return np.mean(dist)-np.mean(self.norm)

    def _interp(self, x, y):
        """Spline interpolation."""

        # Smooth approximating B-spline coefficients
        tck, _ = interpolate.splprep([x, y], s=0)

        # Resample points
        points = np.arange(0.0, 1.0, 1e-2)

        # Generate spline
        spline = interpolate.splev(points, tck)

        return np.array(spline)

    def _MMD_metric(self):
        """Calculate the Maximum Mean Discrepancy distance using a linear kernel."""

        delta = self.val_data - self.val_norm
        delta = normalize(delta)
        delta = delta.dot(delta.T)

        return delta/np.sum(self.data_range)
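
# Minimal usage sketch, assuming PyOD's KNN detector and its
# `decision_scores_` attribute (the same API as in the class docstring);
# the synthetic data below is illustrative only.
if __name__ == '__main__':
    from pyod.models.knn import KNN

    rng = np.random.default_rng(0)

    # 200 inliers around the origin plus 10 far-away points
    X_train = np.vstack([rng.normal(0.0, 1.0, size=(200, 2)),
                         rng.normal(6.0, 1.0, size=(10, 2))])

    clf = KNN()
    clf.fit(X_train)
    scores = clf.decision_scores_

    thres = DSN(metric='MAH', random_state=1234)
    labels = thres.eval(scores)

    print('threshold:', thres.thresh_)        # limit on normalized scores
    print('flagged outliers:', int(labels.sum()))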