Source code for pythresh.thresholds.comb

import numpy as np
import scipy.stats as stats
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import BaggingClassifier, StackingClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.utils import check_array

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize


class COMB(BaseThresholder):
    """COMB class for Combined thresholder.

    Use multiple thresholders as a non-parametric means to threshold
    scores generated by the decision_scores where outliers are set to
    any value beyond the (mean, median, or mode) of the contamination
    from the selected combination of thresholders.

    Parameters
    ----------
    thresholders : list, optional (default='default')
        List of instantiated thresholders, e.g. [DSN(), FILTER()].
        Default is [DSN(random_state=self.random_state), FILTER(),
        OCSVM(random_state=self.random_state)]

    max_contam : float, optional (default=0.5)
        Maximum contamination allowed for each threshold output.
        Thresholded scores above the maximum contamination will not
        be included in the final combined threshold

    method : {'mean', 'median', 'mode', 'bagged', 'stacked'}, optional (default='stacked')
        evaluation method to apply to contamination levels

        - 'mean':    calculate the mean combined threshold
        - 'median':  calculate the median combined threshold
        - 'mode':    calculate the majority vote or mode of the thresholded labels
        - 'bagged':  use a bagged LaplaceGaussianNB to solve the combined threshold
        - 'stacked': use a stacked Ridge, and LaplaceGaussianNB classifier combined method

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers
        (None for the 'mode', 'bagged' and 'stacked' methods)

    confidence_interval_ : lower and upper confidence interval of the
        contamination level

    dscores_ : 1D array of decomposed decision scores
    """

    def __init__(self, thresholders='default', max_contam=0.5,
                 method='stacked', random_state=1234):

        self.thresholders = thresholders
        self.max_contam = max_contam
        # Dispatch table: the name selects either a numpy/scipy reducer or
        # the scikit-learn ensemble class instantiated later in ``eval``.
        # An unknown ``method`` raises KeyError here.
        func = {'mean': np.mean, 'median': np.median, 'mode': stats.mode,
                'bagged': BaggingClassifier, 'stacked': StackingClassifier}
        self.method = method
        self.method_func = func[method]
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.

        Raises
        ------
        ValueError
            If fewer than two thresholders produce a contamination level
            below ``max_contam``.
        """

        # The raw (validated) scores are what each thresholder evaluates;
        # the checked + normalized copy is what this class cuts/fits on.
        scores = check_array(decision, ensure_2d=False)

        decision = check_scores(decision, random_state=self.random_state)
        decision = normalize(decision)
        self.dscores_ = decision

        # Initialize thresholders. The isinstance guard keeps the sentinel
        # test from turning into an element-wise comparison when an array
        # of thresholders is supplied.
        if isinstance(self.thresholders, str) and self.thresholders == 'default':
            # Imported here rather than at module import time.
            from .dsn import DSN
            from .filter import FILTER
            from .ocsvm import OCSVM

            self.thresholders = [DSN(random_state=self.random_state),
                                 FILTER(),
                                 OCSVM(random_state=self.random_state)]

        # Apply each thresholder, keeping only those whose outlier
        # fraction stays below the allowed maximum
        contam = []
        ratio = []
        counts = len(decision)

        for thresholder in self.thresholders:

            labels = thresholder.eval(scores)
            outlier_ratio = np.sum(labels) / counts

            if outlier_ratio < self.max_contam:
                contam.append(labels)
                ratio.append(outlier_ratio)

        # stats.bootstrap rejects samples with fewer than two observations;
        # fail here with a message that names the actual cause.
        if len(ratio) < 2:
            raise ValueError(
                'COMB needs at least two thresholders with a contamination '
                f'below max_contam={self.max_contam} to build the combined '
                f'threshold, but only {len(ratio)} qualified.'
            )

        contam = np.array(contam)
        ratio = np.array(ratio)

        # Get lower and upper confidence interval
        low, high = stats.bootstrap(
            ratio.reshape(1, -1), np.mean, paired=True,
            random_state=self.random_state).confidence_interval
        self.confidence_interval_ = [low, high]

        # Get [mean, median, mode, bagged, or stacked] of inliers
        if self.method in ('bagged', 'stacked'):

            # One copy of the scores per retained thresholder, paired with
            # that thresholder's labels, forms the training set.
            X = np.tile(decision, len(contam))
            y = np.hstack(contam)

            if self.method == 'bagged':
                model = self.method_func(LaplaceGaussianNB(),
                                         n_estimators=12,
                                         random_state=self.random_state)
            else:
                model = self.method_func([('Ridge', RidgeClassifier()),
                                          ('GNB', LaplaceGaussianNB())])

            model.fit(X.reshape(-1, 1), y)
            lbls = model.predict(decision.reshape(-1, 1))

            # No single cut-off value exists for classifier-based labels
            self.thresh_ = None

            return lbls

        if self.method == 'mode':

            # Majority vote across thresholders; no scalar threshold
            self.thresh_ = None
            lbls = self.method_func(contam, axis=0)

            return np.squeeze(lbls[0])

        # 'mean' / 'median': reduce the per-thresholder outlier fractions
        # to one contamination level and cut the sorted scores there
        per_thresholder = np.sum(contam, axis=1) / contam.shape[1]
        inlier_ratio = 1 - self.method_func(per_thresholder)

        idx = int(counts * inlier_ratio)
        ordered = np.sort(decision)
        # An index past the end means no outliers: use the fallback limit 1.0
        limit = ordered[idx] if idx < counts else 1.0

        self.thresh_ = limit

        return cut(decision, limit)
class LaplaceGaussianNB(BaseEstimator, ClassifierMixin):
    """Two-class naive Bayes on one feature with mixed likelihoods.

    Class 0 is modelled with a Laplace distribution and class 1 with a
    normal distribution. Each is parameterised by the mean and standard
    deviation of the training values carrying that label.
    """

    # Lower bound for the scale handed to scipy. A class whose values are
    # all identical has std() == 0, and scipy evaluates the pdf of a
    # distribution with scale 0 as NaN everywhere.
    _MIN_SCALE = np.finfo(float).eps

    def __init__(self):
        pass

    def fit(self, X, y):
        """Fit one distribution and one prior per class.

        Parameters
        ----------
        X : array-like, squeezable to 1D
            Feature values.

        y : array-like of the same length
            Labels, 0 or 1.

        Returns
        -------
        self
        """

        # atleast_1d keeps a single-sample input indexable by a mask
        X = np.atleast_1d(np.asarray(X).squeeze())
        # A plain list would make ``y == c`` a scalar instead of a mask
        y = np.asarray(y)

        self.models = []
        self.priors = []
        self.classes_ = [0, 1]

        families = (stats.laplace, stats.norm)

        for label, family in zip(self.classes_, families):

            members = X[y == label]

            if members.size == 0:
                # Nothing to estimate from: mark the class as absent so it
                # contributes zero likelihood instead of NaN
                self.models.append(None)
                self.priors.append(0.0)
                continue

            scale = max(float(members.std()), self._MIN_SCALE)
            self.models.append(family(members.mean(), scale))
            self.priors.append(len(members) / len(X))

        return self

    def predict(self, X):
        """Return, per sample, the class index with the largest likelihood."""

        likelihoods = self.predict_proba(X)
        return likelihoods.argmax(axis=1)

    def predict_proba(self, X):
        """Return prior-weighted class densities, one column per class.

        The columns are ``prior * pdf(x)`` and are not normalised to
        sum to one.
        """

        X = np.atleast_1d(np.asarray(X).squeeze())

        likelihoods = []

        for prior, model in zip(self.priors, self.models):
            if model is None:
                likelihoods.append(np.zeros(X.shape))
            else:
                likelihoods.append(prior * model.pdf(X))

        return np.vstack(likelihoods).T