# Source code for pythresh.thresholds.decomp

import numpy as np
from sklearn.decomposition import NMF, PCA
from sklearn.random_projection import (
    GaussianRandomProjection,
    SparseRandomProjection
)

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_cdf, normalize


class DECOMP(BaseThresholder):
    """Decomposition based thresholder.

    Thresholds outlier decision scores in a non-parametric way: the
    cumulative distribution function (CDF) of the normalized decision
    scores is decomposed with the selected method, and the maximum of the
    decomposed matrix (mirrored to ``1 - max`` when it exceeds 0.5) is
    used as the threshold passed to ``cut``.
    See :cite:`boente2002decomp` for details.

    Parameters
    ----------
    method : {'NMF', 'PCA', 'GRP', 'SRP'}, optional (default='PCA')
        Method to use for decomposition

        - 'NMF': Non-Negative Matrix Factorization
        - 'PCA': Principal Component Analysis
        - 'GRP': Gaussian Random Projection
        - 'SRP': Sparse Random Projection

    random_state : int, optional (default=1234)
        Random seed for the decomposition algorithm. Can also be set to
        None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers,
        set by ``eval``

    dscores_ : array of decision scores after ``check_scores`` and
        ``normalize`` have been applied, set by ``eval``

    method_funcs : dict mapping each supported method name to the
        estimator instance used for the decomposition

    Examples
    --------
    The effects of randomness can affect the thresholder's output
    performance significantly. Therefore, to alleviate the effects of
    randomness on the thresholder a combined model can be used with
    different random_state values. E.g.

    .. code:: python

        # train the KNN detector
        from pyod.models.knn import KNN
        from pythresh.thresholds.comb import COMB
        from pythresh.thresholds.decomp import DECOMP

        clf = KNN()
        clf.fit(X_train)

        # get outlier scores
        decision_scores = clf.decision_scores_  # raw outlier scores

        # get outlier labels with combined model
        thres = COMB(thresholders = [DECOMP(random_state=1234),
                                     DECOMP(random_state=42),
                                     DECOMP(random_state=9685),
                                     DECOMP(random_state=111222)])
        labels = thres.eval(decision_scores)
    """

    def __init__(self, method='PCA', random_state=1234):
        self.method = method

        # One pre-built estimator per supported method name; ``eval``
        # looks up the one selected by ``self.method`` at call time.
        seed = random_state
        self.method_funcs = dict(
            NMF=NMF(n_components=1, random_state=seed),
            PCA=PCA(n_components=1, random_state=seed),
            GRP=GaussianRandomProjection(n_components=2, random_state=seed),
            SRP=SparseRandomProjection(n_components=3, random_state=seed),
        )

        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.

        Raises
        ------
        KeyError
            If ``str(self.method)`` is not a key of ``self.method_funcs``.
        """
        scores = check_scores(decision, random_state=self.random_state)
        scores = normalize(scores)
        self.dscores_ = scores

        # Build the CDF of the normalized scores; the second value
        # returned by gen_cdf is not needed here.
        n_points = 3 * len(scores)
        cdf, _ = gen_cdf(scores, 0, 1, n_points)
        cdf = normalize(cdf)

        # Decompose the CDF, fed to the estimator as a single column.
        decomposer = self.method_funcs[str(self.method)]
        decomposed = decomposer.fit_transform(cdf.reshape(-1, 1))

        # The threshold is the largest decomposed value, mirrored around
        # 0.5 so that it never lies in the upper half.
        threshold = np.max(decomposed)
        if threshold > 0.5:
            threshold = 1 - threshold

        self.thresh_ = threshold

        return cut(scores, threshold)