import numpy as np
import scipy.stats as stats
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import BaggingClassifier, StackingClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.utils import check_array
from .base import BaseThresholder
from .thresh_utility import cut
[docs]
class COMB(BaseThresholder):
    """COMB class for Combined thresholder.

    Use multiple thresholders as a non-parametric means
    to threshold scores generated by the decision_scores where outliers
    are set to any value beyond the (mean, median, or mode) of the
    contamination from the selected combination of thresholders.

    Parameters
    ----------
    thresholders : list, optional (default='default')
        List of instantiated thresholders, e.g. [DSN(), FILTER()].
        Default is [DSN(random_state=self.random_state), FILTER(),
        OCSVM(random_state=self.random_state)]

    max_contam : float, optional (default=0.5)
        Maximum contamination allowed for each threshold output. Thresholded
        scores above the maximum contamination will not be included in the
        final combined threshold

    method : {'mean', 'median', 'mode', 'bagged', 'stacked'}, optional (default='stacked')
        evaluation method to apply to contamination levels

        - 'mean': calculate the mean combined threshold
        - 'median': calculate the median combined threshold
        - 'mode': calculate the majority vote or mode of the thresholded labels
        - 'bagged': use a bagged LaplaceGaussianNB to solve the combined threshold
        - 'stacked': use a stacked Ridge, and LaplaceGaussianNB classifier combined method

    fallback : str ('ignore', 'warn', 'raise'), optional (default='warn')
        The action to take for thresholders when their criterion are
        not met. In these cases when set to 'ignore' on eval and fit
        all train data is set to inliers and the threshold is set to
        max of the train scores + eps. Passing 'warn' will do the same as
        'ignore' but also produce a warning. If 'raise', the thresholder
        raises a ValueError.

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    confidence_interval_ : lower and upper confidence interval of the
        contamination level

    dscores_ : 1D array of decomposed decision scores
    """

    def __init__(
        self,
        thresholders="default",
        max_contam=0.5,
        method="stacked",
        fallback="warn",
        random_state=1234,
    ):
        super().__init__(fallback=fallback)

        self.thresholders = thresholders
        self.max_contam = max_contam

        # Callable behind each combination method; an unknown ``method``
        # surfaces as a KeyError from the lookup below.
        func = {
            "mean": np.mean,
            "median": np.median,
            "mode": stats.mode,
            "bagged": BaggingClassifier,
            "stacked": StackingClassifier,
        }
        self.method = method
        self.method_func = func[method]

        self.random_state = random_state
        # NOTE: this seeds NumPy's *global* random generator.
        np.random.seed(random_state)

        # Private state that is reset to None through ``_set_attributes``.
        self._attrs = ["_clf", "_active_thresholders"]

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        # Not fitted through the estimator API: start from a clean state.
        if self._is_fitted is None:
            self._set_attributes(self._attrs, None)
            self._active_thresholders = []

        # ``scores`` is what the individual thresholders consume, while
        # ``decision`` is the base-class prepared version used for cutting.
        scores = check_array(decision, ensure_2d=False)
        decision = self._data_setup(decision)

        # A fitted instance with no active thresholders and a stored
        # threshold can only come from the percentile fallback further down.
        # Reuse that threshold rather than letting thresholders that were
        # rejected for exceeding ``max_contam`` vote, or letting the
        # classifier be fitted on the data it is asked to predict.
        # NOTE(review): relies on ``_is_fitted is True`` meaning "fitted",
        # as the checks below imply - confirm against BaseThresholder.
        if (
            self._is_fitted is True
            and not self._active_thresholders
            and getattr(self, "thresh_", None) is not None
        ):
            return cut(decision, self.thresh_)

        # Initialize thresholders (imported lazily, only for the default set).
        # The isinstance guard keeps array-like input from being compared
        # elementwise against the string.
        if isinstance(self.thresholders, str) and self.thresholders == "default":
            from .dsn import DSN
            from .filter import FILTER
            from .ocsvm import OCSVM

            self.thresholders = [
                DSN(random_state=self.random_state),
                FILTER(),
                OCSVM(random_state=self.random_state),
            ]

        # Apply each thresholder
        contam = []
        ratio = []
        counts = len(decision)

        thresh_to_use = self._active_thresholders if self._active_thresholders else self.thresholders

        for thresholder in thresh_to_use:
            if self._is_fitted is not True:
                thresholder.fit(scores)

            labels = thresholder.predict(scores)
            outlier_ratio = np.sum(labels) / counts

            # While fitting, remember which thresholders stayed below the
            # contamination cap so later calls only use those.
            if not self._is_fitted and outlier_ratio < self.max_contam:
                self._active_thresholders.append(thresholder)

            if self._is_fitted or outlier_ratio < self.max_contam:
                contam.append(labels)
                ratio.append(outlier_ratio)

        # Every thresholder exceeded ``max_contam``: fall back to the
        # percentile of the scores that matches the contamination cap.
        if not contam:
            self._check_threshold(0.0, self.max_contam)
            q = 100 * (1 - self.max_contam)
            limit = np.percentile(decision, q)
            self.thresh_ = limit
            self.confidence_interval_ = [limit, limit]
            return cut(decision, limit)

        contam = np.array(contam)
        ratio = np.array(ratio)

        # Get lower and upper confidence interval
        if self._is_fitted is not True:
            low, high = stats.bootstrap(
                ratio.reshape(1, -1),
                np.mean,
                paired=True,
                random_state=self.random_state,
            ).confidence_interval
            self.confidence_interval_ = [low, high]

        # Get [mean, median, mode, bagged, or stacked] of inliers
        if self.method in ("bagged", "stacked"):
            # Build the training set and the ensemble only when a classifier
            # still has to be fitted; an already fitted one never uses them.
            if self._clf is None:
                # One copy of the scores per thresholder, paired with that
                # thresholder's labels.
                X = np.tile(decision, len(contam))
                y = np.hstack(contam)

                if self.method == "bagged":
                    model = self.method_func(
                        LaplaceGaussianNB(),
                        n_estimators=12,
                        random_state=self.random_state,
                    )
                else:
                    model = self.method_func(
                        [("Ridge", RidgeClassifier()), ("GNB", LaplaceGaussianNB())]
                    )

                model.fit(X.reshape(-1, 1), y)
                self._clf = model

            lbls = self._clf.predict(decision.reshape(-1, 1))
            # Classifier-based labels have no single cut-off value.
            self.thresh_ = None
            return lbls

        if self.method == "mode":
            self._clf = True
            self.thresh_ = None
            # Majority vote across thresholders for every sample.
            lbls = self.method_func(contam, axis=0)
            return np.squeeze(lbls[0])

        # 'mean' / 'median': derive the threshold once and reuse it afterwards.
        if self.thresh_ is None:
            contam = np.sum(contam, axis=1) / contam.shape[1]
            inlier_ratio = 1 - self.method_func(contam)

            idx = int(counts * inlier_ratio)
            ordered = np.sort(decision)

            # An index past the end means no sample is flagged as an outlier.
            limit = ordered[idx] if idx < counts else 1.0
            self.thresh_ = limit
            self._clf = True

        return cut(decision, self.thresh_)
class LaplaceGaussianNB(BaseEstimator, ClassifierMixin):
    """Two-class naive Bayes classifier on a single feature.

    Class 0 is modelled with a Laplace density and class 1 with a Gaussian
    density. Each density is parameterised by the mean and standard deviation
    of the training values carrying that label.

    Attributes
    ----------
    classes_ : numpy array holding the labels ``[0, 1]``

    models : list with one frozen scipy distribution per class, or None for
        a class that had no training samples

    priors : list with the fraction of training samples in each class
    """

    # Lower bound for the fitted scale. scipy densities evaluate to NaN for a
    # scale that is not strictly positive, and argmax treats NaN as the
    # maximum, so a zero-spread class would otherwise win every prediction.
    _MIN_SCALE = 1e-9

    def __init__(self):
        pass

    def fit(self, X, y):
        """Estimate the per-class density and prior.

        Parameters
        ----------
        X : array-like of shape (n_samples, 1) or (n_samples,)
            Training values.

        y : array-like of shape (n_samples,)
            Labels, 0 or 1.

        Returns
        -------
        self : LaplaceGaussianNB
        """
        # atleast_1d keeps a single-sample input indexable after squeezing.
        X = np.atleast_1d(np.squeeze(X))
        y = np.asarray(y)

        self.models = []
        self.priors = []
        self.classes_ = np.array([0, 1])

        # Indexed by class label: Laplace for 0, Gaussian for 1.
        dist = [stats.laplace, stats.norm]

        for c in self.classes_:
            subset_x = X[y == c]

            # A class without samples has no density to estimate; give it a
            # zero prior so it can never be predicted.
            if subset_x.size == 0:
                self.models.append(None)
                self.priors.append(0.0)
                continue

            scale = max(float(subset_x.std()), self._MIN_SCALE)
            self.models.append(dist[c](subset_x.mean(), scale))
            self.priors.append(len(subset_x) / len(X))

        return self

    def predict(self, X):
        """Return, for each sample, the class with the largest weighted likelihood."""
        likelihoods = self.predict_proba(X)
        return likelihoods.argmax(axis=1)

    def predict_proba(self, X):
        """Return the prior-weighted class likelihoods.

        The values are not normalised to sum to one per sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, 1) or (n_samples,)

        Returns
        -------
        likelihoods : numpy array of shape (n_samples, 2)
        """
        X = np.atleast_1d(np.squeeze(X))

        likelihoods = []
        for c in self.classes_:
            model = self.models[c]
            if model is None:
                # Class absent from the training data.
                probs = np.zeros(X.shape, dtype=float)
            else:
                probs = self.priors[c] * model.pdf(X)
            likelihoods.append(probs)

        return np.vstack(likelihoods).T