Source code for pythresh.thresholds.comb

import numpy as np
import scipy.stats as stats
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import BaggingClassifier, StackingClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.utils import check_array

from .base import BaseThresholder
from .thresh_utility import cut


class COMB(BaseThresholder):
    """COMB class for Combined thresholder.

    Use multiple thresholders as a non-parametric means to threshold
    scores generated by the decision_scores where outliers are set to
    any value beyond the (mean, median, or mode) of the contamination
    from the selected combination of thresholders.

    Parameters
    ----------
    thresholders : list, optional (default='default')
        List of instantiated thresholders, e.g. [DSN(), FILTER()].
        Default is [DSN(random_state=self.random_state), FILTER(),
        OCSVM(random_state=self.random_state)]

    max_contam : float, optional (default=0.5)
        Maximum contamination allowed for each threshold output.
        Thresholded scores above the maximum contamination will not be
        included in the final combined threshold

    method : {'mean', 'median', 'mode', 'bagged', 'stacked'}, optional (default='stacked')
        Evaluation method to apply to contamination levels

        - 'mean':    calculate the mean combined threshold
        - 'median':  calculate the median combined threshold
        - 'mode':    calculate the majority vote or mode of the thresholded labels
        - 'bagged':  use a bagged LaplaceGaussianNB to solve the combined threshold
        - 'stacked': use a stacked Ridge, and LaplaceGaussianNB classifier combined method

    fallback : str ('ignore', 'warn', 'raise'), optional (default='warn')
        The action to take for thresholders when their criterion are not
        met. In these cases when set to 'ignore' on eval and fit all train
        data is set to inliers and the threshold is set to max of the train
        scores + eps. Passing 'warn' will do the same as 'ignore' but also
        produce a warning. If 'raise', the thresholder raises a ValueError.

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    confidence_interval_ : lower and upper confidence interval of the
        contamination level

    dscores_ : 1D array of decomposed decision scores
    """

    def __init__(self, thresholders="default", max_contam=0.5,
                 method="stacked", fallback="warn", random_state=1234):

        super().__init__(fallback=fallback)

        self.thresholders = thresholders
        self.max_contam = max_contam

        # Maps the method name to the callable used to combine the labels.
        # An unknown method name raises KeyError on the lookup below.
        func = {
            "mean": np.mean,
            "median": np.median,
            "mode": stats.mode,
            "bagged": BaggingClassifier,
            "stacked": StackingClassifier,
        }
        self.method = method
        self.method_func = func[method]

        self.random_state = random_state
        # NOTE: this seeds NumPy's *global* random generator.
        np.random.seed(random_state)

        # Private attributes that eval() resets to None before first use.
        self._attrs = ["_clf", "_active_thresholders"]

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """

        if self._is_fitted is None:
            self._set_attributes(self._attrs, None)
            self._active_thresholders = []

        # `scores` is the validated raw input handed to each sub-thresholder;
        # `decision` is the output of the base-class `_data_setup` and is what
        # the combined threshold is finally applied to.
        scores = check_array(decision, ensure_2d=False)
        decision = self._data_setup(decision)

        # Initialize thresholders (imported lazily, only for the default set)
        if self.thresholders == "default":
            from .dsn import DSN
            from .filter import FILTER
            from .ocsvm import OCSVM

            self.thresholders = [
                DSN(random_state=self.random_state),
                FILTER(),
                OCSVM(random_state=self.random_state),
            ]

        # Apply each thresholder
        contam = []
        ratio = []
        counts = len(decision)

        # Reuse the thresholders that were kept on a previous pass when there
        # are any; otherwise fall back to the full configured list.
        thresh_to_use = (self._active_thresholders
                         if self._active_thresholders
                         else self.thresholders)

        for thresholder in thresh_to_use:
            if self._is_fitted is not True:
                thresholder.fit(scores)

            labels = thresholder.predict(scores)
            outlier_ratio = np.sum(labels) / counts
            within_limit = outlier_ratio < self.max_contam

            # While not yet fitted, remember which thresholders stayed below
            # the contamination cap so later calls use only those.
            if not self._is_fitted and within_limit:
                self._active_thresholders.append(thresholder)

            # Once fitted every evaluated thresholder contributes; before
            # that only those under the contamination cap do.
            if self._is_fitted or within_limit:
                contam.append(labels)
                ratio.append(outlier_ratio)

        if not contam:
            # No thresholder produced usable labels. `_check_threshold` is a
            # base-class hook (presumably applies the `fallback` policy --
            # confirm in BaseThresholder); afterwards threshold at the
            # (1 - max_contam) percentile of the scores.
            self._check_threshold(0.0, self.max_contam)
            q = 100 * (1 - self.max_contam)
            limit = np.percentile(decision, q)
            self.thresh_ = limit
            self.confidence_interval_ = [limit, limit]
            return cut(decision, limit)

        contam = np.array(contam)
        ratio = np.array(ratio)

        # Get lower and upper confidence interval
        if self._is_fitted is not True:
            if ratio.size > 1:
                low, high = stats.bootstrap(
                    ratio.reshape(1, -1), np.mean, paired=True,
                    random_state=self.random_state,
                ).confidence_interval
            else:
                # scipy's bootstrap rejects samples with fewer than two
                # observations, so with a single contributing thresholder
                # the interval collapses to that one contamination ratio.
                low = high = ratio[0]
            self.confidence_interval_ = [low, high]

        # Get [mean, median, mode, bagged, or stacked] of inliers
        if self.method in ("bagged", "stacked"):
            if self._clf is None:
                # Build and train the classifier only when none is stored
                # yet: every thresholder's label vector is paired with a
                # copy of the scores to form one stacked training set.
                X = np.tile(decision, len(contam))
                y = np.hstack(contam)

                if self.method == "bagged":
                    model = self.method_func(
                        LaplaceGaussianNB(), n_estimators=12,
                        random_state=self.random_state,
                    )
                else:
                    model = self.method_func(
                        [("Ridge", RidgeClassifier()),
                         ("GNB", LaplaceGaussianNB())]
                    )

                model.fit(X.reshape(-1, 1), y)
                self._clf = model

            lbls = self._clf.predict(decision.reshape(-1, 1))
            # Classifier-based methods have no single scalar threshold.
            self.thresh_ = None
            return lbls

        if self.method == "mode":
            self._clf = True
            self.thresh_ = None
            # Majority vote across thresholders for every sample.
            lbls = self.method_func(contam, axis=0)
            return np.squeeze(lbls[0])

        # 'mean' / 'median': derive one threshold from the combined
        # contamination level, unless a threshold is already stored.
        if self.thresh_ is None:
            contam = np.sum(contam, axis=1) / contam.shape[1]
            inlier_ratio = 1 - self.method_func(contam)

            idx = int(counts * inlier_ratio)
            ordered = np.sort(decision)
            # idx == counts means no outliers at all; 1.0 is used as the
            # threshold in that case.
            limit = ordered[idx] if idx < counts else 1.0
            self.thresh_ = limit

        self._clf = True
        return cut(decision, self.thresh_)
class LaplaceGaussianNB(BaseEstimator, ClassifierMixin):
    """Two-class naive Bayes classifier for a single feature.

    Class 0 is modelled with a Laplace distribution and class 1 with a
    normal distribution. Both are parameterised with the mean and the
    standard deviation of the training samples carrying that label, and
    weighted by the class frequency.

    Attributes
    ----------
    classes_ : np.array
        Always ``[0, 1]``.

    models : list
        Frozen scipy distributions, one per class.

    priors : list of float
        Fraction of training samples belonging to each class.
    """

    def __init__(self):
        pass

    def fit(self, X, y):
        """Fit the per-class distributions and priors.

        Parameters
        ----------
        X : array-like of shape (n_samples,) or (n_samples, 1)
            Training values.

        y : array-like of shape (n_samples,)
            Labels, 0 or 1.

        Returns
        -------
        self : object
        """

        # atleast_1d keeps a single-row input indexable after squeezing.
        X = np.atleast_1d(np.squeeze(X))
        y = np.asarray(y)

        self.models = []
        self.priors = []
        self.classes_ = np.array([0, 1])

        dist = [stats.laplace, stats.norm]
        min_scale = np.finfo(float).eps

        for c in self.classes_:
            subset_x = X[y == c]

            if subset_x.size:
                # A zero spread is not a valid scipy scale (the pdf turns
                # into NaN), so it is floored at machine epsilon.
                scale = max(subset_x.std(), min_scale)
                model = dist[c](subset_x.mean(), scale)
            else:
                # Unseen class: the mean/std of an empty slice would be NaN
                # and NaN wins argmax in predict(). A standard placeholder
                # distribution combined with the zero prior below yields a
                # likelihood of exactly 0 instead.
                model = dist[c]()

            self.models.append(model)
            self.priors.append(len(subset_x) / len(X))

        return self

    def predict(self, X):
        """Return, for every sample, the class with the largest likelihood."""

        likelihoods = self.predict_proba(X)
        return likelihoods.argmax(axis=1)

    def predict_proba(self, X):
        """Return the prior-weighted class densities for every sample.

        The values are ``prior * pdf(x)`` per class and are not normalised
        to sum to one.

        Returns
        -------
        likelihoods : np.array of shape (n_samples, 2)
        """

        X = np.atleast_1d(np.squeeze(X))

        likelihoods = [
            self.priors[c] * self.models[c].pdf(X)
            for c in self.classes_
        ]

        return np.vstack(likelihoods).T