from itertools import combinations
import numpy as np
import scipy.spatial.distance as distance
import scipy.special as special
import scipy.stats as stats
from scipy import interpolate
from scipy.integrate import simpson
from sklearn.covariance import MinCovDet
from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_cdf, gen_kde, normalize
[docs]
class DSN(BaseThresholder):
"""DSN class for Distance Shift from Normal thresholder.
Use the distance shift from normal to evaluate a non-parametric means
to threshold scores generated by the decision_scores where outliers
are set to any value beyond the distance calculated by the selected
metric. See :cite:`amagata2021dsn` for details.
Parameters
----------
metric : {'JS', 'WS', 'ENG', 'BHT', 'HLL', 'HI', 'LK', 'LP', 'MAH', 'TMT', 'RES', 'KS', 'INT', 'MMD'}, optional (default='MAH')
Metric to use for distance computation
- 'JS': Jensen-Shannon distance
- 'WS': Wasserstein or Earth Movers distance
- 'ENG': Energy distance
- 'BHT': Bhattacharyya distance
- 'HLL': Hellinger distance
- 'HI': Histogram intersection distance
- 'LK': Lukaszyk-Karmowski metric for normal distributions
- 'LP': Levy-Prokhorov metric
- 'MAH': Mahalanobis distance
- 'TMT': Tanimoto distance
- 'RES': Studentized residual distance
- 'KS': Kolmogorov-Smirnov distance
- 'INT': Weighted spline interpolated distance
- 'MMD': Maximum Mean Discrepancy distance
random_state : int, optional (default=1234)
Random seed for the normal distribution. Can also be set to None.
Attributes
----------
thresh_ : threshold value that separates inliers from outliers
dscores_ : 1D array of decomposed decision scores
Examples
--------
The effects of randomness can affect the thresholder's output performance
significantly. Therefore, to alleviate the effects of randomness on the
thresholder a combined model can be used with different random_state values.
E.g.
.. code:: python
# train the KNN detector
from pyod.models.knn import KNN
from pythresh.thresholds.comb import COMB
from pythresh.thresholds.dsn import DSN
clf = KNN()
clf.fit(X_train)
# get outlier scores
decision_scores = clf.decision_scores_ # raw outlier scores
# get outlier labels with combined model
thres = COMB(thresholders = [DSN(random_state=1234),
DSN(random_state=42), DSN(random_state=9685),
DSN(random_state=111222)])
labels = thres.eval(decision_scores)
"""
def __init__(self, metric='MAH', random_state=1234):
super().__init__()
self.metric = metric
self.metric_funcs = {'JS': self._JS_metric, 'WS': self._WS_metric,
'ENG': self._ENG_metric, 'BHT': self._BHT_metric,
'HLL': self._HLL_metric, 'HI': self._HI_metric,
'LK': self._LK_metric, 'LP': self._LP_metric,
'MAH': self._MAH_metric, 'TMT': self._TMT_metric,
'RES': self._RES_metric, 'KS': self._KS_metric,
'INT': self._INTER_metric, 'MMD': self._MMD_metric}
self.random_state = random_state
[docs]
def eval(self, decision):
"""Outlier/inlier evaluation process for decision scores.
Parameters
----------
decision : np.array or list of shape (n_samples)
or np.array of shape (n_samples, n_detectors)
which are the decision scores from a
outlier detection.
Returns
-------
outlier_labels : numpy array of shape (n_samples,)
For each observation, tells whether or not
it should be considered as an outlier according to the
fitted model. 0 stands for inliers and 1 for outliers.
"""
decision = check_scores(decision, random_state=self.random_state)
decision = normalize(decision)
self.dscores_ = decision
# Create a normal distribution and normalize
size = min(len(decision), 1500)
norm = stats.norm.rvs(size=size, loc=0.0, scale=1.0,
random_state=self.random_state)
self.norm = normalize(norm)
n = 3 if self.metric != 'LP' else 1
if self.metric in ['JS', 'BHT', 'INT', 'MMD']:
# Create a KDE of the decision scores and the normal distribution
# Generate KDE
self.val_data, self.data_range = gen_kde(
decision, 0, 1, len(decision)*n)
self.val_norm, self.norm_range = gen_kde(
self.norm, 0, 1, len(decision)*n)
else:
# Create a KDE of the decision scores and the normal distribution
# Generate CDF
self.val_data, self.data_range = gen_cdf(
decision, 0, 1, len(decision)*n)
self.val_norm, self.norm_range = gen_cdf(
self.norm, 0, 1, len(decision)*n)
limit = self.metric_funcs[str(self.metric)]()
self.thresh_ = limit
return cut(decision, limit)
def _JS_metric(self):
"""Calculate the Jensen-Shannon distance."""
return 1-distance.jensenshannon(self.val_data, self.val_norm)
def _WS_metric(self):
"""Calculate the Wasserstein or Earth Movers distance."""
return stats.wasserstein_distance(self.val_data, self.val_norm)
def _ENG_metric(self):
"""Calculate the Energy distance."""
return stats.energy_distance(self.val_data, self.val_norm)
def _BHT_metric(self):
"""Calculate the Bhattacharyya distance."""
bht = simpson(np.sqrt(self.val_data*self.val_norm),
dx=1/len(self.val_data))
return np.log1p(bht)
def _HLL_metric(self):
"""Calculate the Hellinger distance."""
val_data = self.val_data/np.sum(self.val_data)
val_norm = self.val_norm/np.sum(self.val_norm)
return (distance.euclidean(np.sqrt(val_data), np.sqrt(val_norm))
/ np.sqrt(2))
def _HI_metric(self):
"""Calculate the Histogram intersection distance."""
val_data = self.val_data/np.sum(self.val_data)
val_norm = self.val_norm/np.sum(self.val_norm)
return 1-np.sum(np.minimum(val_data, val_norm))
def _LK_metric(self):
"""Calculate the Lukaszyk-Karmowski metric for normal distributions."""
# Get expected values for both distributions
rng = np.linspace(0, 1, len(self.val_data))
exp_data = (rng*self.val_data).sum()/self.val_data.sum()
exp_norm = (rng*self.val_norm).sum()/self.val_norm.sum()
nu_xy = np.abs(exp_data-exp_norm)
# STD is same for both distributions
std = np.std(rng)
# Get the LK distance
return (nu_xy + 2*std/np.sqrt(np.pi)*np.exp(-nu_xy**2/(4*std**2))
- nu_xy*special.erfc(nu_xy/(2*std)))
def _LP_metric(self):
"""Calculate the Levy-Prokhorov metric."""
# Get the edges for the complete graphs of the datasets
f1 = np.array(list(combinations(self.val_data.tolist(), 2)))
f2 = np.array(list(combinations(self.val_norm.tolist(), 2)))
return (distance.directed_hausdorff(f1, f2)[0] /
distance.correlation(self.val_data, self.val_norm))
def _MAH_metric(self):
"""Calculate the Mahalanobis distance."""
# fit a Minimum Covariance Determinant (MCD) robust estimator to data
robust_cov = MinCovDet(random_state=self.random_state).fit(
np.array([self.val_norm]).T)
# Get the Mahalanobis distance
dist = robust_cov.mahalanobis(np.array([self.val_data]).T)
return 1-np.mean(dist)/np.max(dist)
def _TMT_metric(self):
"""Calculate the Tanimoto distance."""
val_data = self.val_data/np.sum(self.val_data)
val_norm = self.val_norm/np.sum(self.val_norm)
p = np.sum(val_data)
q = np.sum(val_norm)
m = np.sum(np.minimum(val_data, val_norm))
return (p+q-2*m)/(p+q-m)
def _RES_metric(self):
"""Calculate the studentized residual distance."""
mean_X = np.mean(self.val_data)
mean_Y = np.mean(self.val_norm)
n = len(self.val_data)
diff_mean_sqr = np.dot((self.val_data - mean_X),
(self.val_data - mean_X))
beta1 = (np.dot((self.val_data - mean_X),
(self.val_norm - mean_Y)) /
diff_mean_sqr)
beta0 = mean_Y - beta1 * mean_X
y_hat = beta0 + beta1 * self.val_data
residuals = self.val_norm - y_hat
h_ii = (self.val_data - mean_X) ** 2 / diff_mean_sqr + (1 / n)
Var_e = np.sqrt(np.sum((self.val_norm - y_hat) ** 2)/(n-2))
SE_regression = Var_e*((1-h_ii) ** 0.5)
studentized_residuals = residuals/SE_regression
return np.abs(np.sum(studentized_residuals))
def _KS_metric(self):
"""Calculate the Kolmogorov-Smirnov distance."""
return np.max(np.abs(self.val_data-self.val_norm))
def _INTER_metric(self):
"""Calculate the weighted spline interpolation distance."""
# Get splines
data_spl = self._interp(self.data_range, self.val_data)
norm_spl = self._interp(self.norm_range, self.val_norm)
# Get distance of new weight shifted spline
dist = []
for i in range(99):
weight = (i+1)/99
spline = (data_spl[0]*(1.0-weight) +
norm_spl[0]*weight)
dist.append(np.max(spline))
return np.mean(dist)-np.mean(self.norm)
def _interp(self, x, y):
"""Spline interpolation."""
# Smooth approximating B-spline coefficients
tck, _ = interpolate.splprep([x, y], s=0)
# Resample points
points = np.arange(0.0, 1.0, 1e-2)
# Generate spline
spline = interpolate.splev(points, tck)
return np.array(spline)
def _MMD_metric(self):
"""Calculate the Maximum Mean Discrepancy distance using a linear kernel."""
delta = self.val_data - self.val_norm
delta = normalize(delta)
delta = delta.dot(delta.T)
return delta/np.sum(self.data_range)