import numpy as np
import scipy.stats as stats
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import BaggingClassifier, StackingClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.utils import check_array
from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize
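# [docs] -- documentation-link marker carried over from the rendered HTML source view; not part of the module.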
class COMB(BaseThresholder):
    """COMB class for Combined thresholder.

    Use multiple thresholders as a non-parametric means
    to threshold scores generated by the decision_scores where outliers
    are set to any value beyond the (mean, median, or mode) of the
    contamination from the selected combination of thresholders.

    Parameters
    ----------

    thresholders : list, optional (default='default')
        List of instantiated thresholders, e.g. [DSN(), FILTER()].
        Default is [DSN(random_state=self.random_state), FILTER(),
        OCSVM(random_state=self.random_state)]

    max_contam : float, optional (default=0.5)
        Maximum contamination allowed for each threshold output. A
        thresholder is only included in the final combined threshold when
        its outlier ratio is strictly below this value.

    method : {'mean', 'median', 'mode', 'bagged', 'stacked'}, optional (default='stacked')
        evaluation method to apply to contamination levels

        - 'mean':    calculate the mean combined threshold
        - 'median':  calculate the median combined threshold
        - 'mode':    calculate the majority vote or mode of the thresholded labels
        - 'bagged':  use a bagged LaplaceGaussianNB to solve the combined threshold
        - 'stacked': use a stacked Ridge, and LaplaceGaussianNB classifier combined method

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders. Can also
        be set to None.

    Attributes
    ----------

    thresh_ : threshold value that separates inliers from outliers
        (None for the 'mode', 'bagged' and 'stacked' methods)

    confidence_interval_ : lower and upper confidence interval of the contamination level
        ([nan, nan] when fewer than two thresholders were retained)

    dscores_ : 1D array of decomposed decision scores
    """

    def __init__(self, thresholders='default', max_contam=0.5,
                 method='stacked', random_state=1234):

        self.thresholders = thresholders
        self.max_contam = max_contam
        # Maps the method name to the callable used in ``eval``; an unknown
        # method name raises KeyError here.
        func = {'mean': np.mean, 'median': np.median,
                'mode': stats.mode, 'bagged': BaggingClassifier,
                'stacked': StackingClassifier}
        self.method = method
        self.method_func = func[method]
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.

        Raises
        ------
        ValueError
            If no thresholder yields an outlier ratio below ``max_contam``.
        """

        # The validated raw input is what every individual thresholder
        # receives; the checked + normalized copy is what this class
        # thresholds itself and exposes as ``dscores_``.
        scores = check_array(decision, ensure_2d=False)

        decision = check_scores(decision, random_state=self.random_state)
        decision = normalize(decision)

        self.dscores_ = decision

        # Initialize thresholders. The isinstance guard keeps the comparison
        # scalar even if an array-like of thresholders was supplied.
        # NOTE: as before, the 'default' placeholder is replaced on the
        # instance by the instantiated list on the first call.
        if isinstance(self.thresholders, str) and self.thresholders == 'default':
            from .dsn import DSN
            from .filter import FILTER
            from .ocsvm import OCSVM

            self.thresholders = [DSN(random_state=self.random_state),
                                 FILTER(),
                                 OCSVM(random_state=self.random_state)]

        # Apply each thresholder, keeping only those whose outlier ratio is
        # strictly below ``max_contam``
        contam = []
        ratio = []
        counts = len(decision)

        for thresholder in self.thresholders:

            labels = thresholder.eval(scores)
            outlier_ratio = np.sum(labels)/counts

            if outlier_ratio < self.max_contam:

                contam.append(labels)
                ratio.append(outlier_ratio)

        if not contam:
            raise ValueError(
                'No thresholder produced an outlier ratio below '
                f'max_contam={self.max_contam}; cannot compute a '
                'combined threshold.')

        contam = np.array(contam)
        ratio = np.array(ratio)

        # Get lower and upper confidence interval
        self.confidence_interval_ = self._contamination_interval(ratio)

        # Get [mean, median, mode, bagged, or stacked] of inliers
        if (self.method == 'bagged') or (self.method == 'stacked'):

            # Training set: the normalized scores repeated once per retained
            # thresholder, paired with that thresholder's labels.
            X = np.tile(decision, len(contam))
            y = np.hstack(contam)

            if self.method == 'bagged':
                model = self.method_func(LaplaceGaussianNB(),
                                         n_estimators=12,
                                         random_state=self.random_state)
            else:
                model = self.method_func([('Ridge', RidgeClassifier()),
                                          ('GNB', LaplaceGaussianNB())])

            model.fit(X.reshape(-1, 1), y)
            lbls = model.predict(decision.reshape(-1, 1))

            # Labels come from a classifier, so no single cut-off exists
            self.thresh_ = None

            return lbls

        elif self.method == 'mode':

            # Per-sample majority vote across the retained thresholders
            self.thresh_ = None
            lbls = self.method_func(contam, axis=0)

            return np.squeeze(lbls[0])

        else:

            # Combine the per-thresholder contamination levels and convert
            # the result into a cut-off taken from the sorted scores.
            contam = np.sum(contam, axis=1)/contam.shape[1]
            inlier_ratio = 1-self.method_func(contam)

            idx = int(counts*inlier_ratio)
            ordered = np.sort(decision)
            # idx == counts happens when the combined contamination is 0;
            # fall back to 1.0 in that case.
            limit = ordered[idx] if idx < counts else 1.0

            self.thresh_ = limit

            return cut(decision, limit)

    def _contamination_interval(self, ratio):
        """Return [low, high] bootstrap bounds for the mean contamination."""

        # stats.bootstrap needs at least two observations; with a single
        # retained thresholder no interval can be estimated, so report NaN
        # bounds instead of aborting the whole evaluation.
        if ratio.size < 2:
            return [np.nan, np.nan]

        low, high = stats.bootstrap(ratio.reshape(1, -1),
                                    np.mean, paired=True,
                                    random_state=self.random_state).confidence_interval

        return [low, high]
class LaplaceGaussianNB(BaseEstimator, ClassifierMixin):
    """Two-class naive Bayes classifier with per-class density families.

    Class 0 is modelled with a Laplace distribution and class 1 with a normal
    distribution. Each is parameterised by the mean and standard deviation of
    the training values carrying that label.
    """

    def __init__(self):
        pass

    @staticmethod
    def _as_vector(X):
        """Squeeze X as before, but never below one dimension."""

        # A single sample would otherwise squeeze to a 0-d array, which
        # supports neither boolean masking nor len().
        return np.atleast_1d(np.squeeze(np.asarray(X)))

    def fit(self, X, y):
        """Fit one density per class and store the class priors.

        Parameters
        ----------
        X : array-like whose squeezed form holds the training values

        y : array-like of 0/1 labels matching X

        Returns
        -------
        self
        """

        X = self._as_vector(X)
        y = np.asarray(y).ravel()

        self.models = []
        self.priors = []
        self.classes_ = [0, 1]

        dist = [stats.laplace, stats.norm]
        # Smallest scale handed to scipy: a scale of 0 makes the pdf NaN,
        # and argmax in ``predict`` treats NaN as the maximum.
        min_scale = np.finfo(float).eps

        for c in self.classes_:

            subset_x = X[y == c]

            # A label that never occurs gets no density and a zero prior
            # rather than a NaN-parameterised distribution.
            if subset_x.size == 0:
                self.models.append(None)
                self.priors.append(0.0)
                continue

            scale = max(subset_x.std(), min_scale)

            self.models.append(dist[c](subset_x.mean(), scale))
            self.priors.append(len(subset_x)/len(X))

        return self

    def predict(self, X):
        """Return the label with the larger prior-weighted likelihood."""

        likelihoods = self.predict_proba(X)
        return likelihoods.argmax(axis=1)

    def predict_proba(self, X):
        """Return prior-weighted likelihoods, one column per class.

        The values are ``prior * pdf(x)`` and are not normalised to sum to
        one across the columns.
        """

        X = self._as_vector(X)
        likelihoods = []

        for c in self.classes_:

            model = self.models[c]

            if model is None:
                # Class was absent during fit: zero likelihood everywhere
                probs = np.zeros(X.shape)
            else:
                probs = self.priors[c] * model.pdf(X)

            likelihoods.append(probs)

        return np.vstack(likelihoods).T