import numpy as np
from sklearn.decomposition import NMF, PCA
from sklearn.random_projection import GaussianRandomProjection, SparseRandomProjection
from .base import BaseThresholder
from .thresh_utility import cut, gen_cdf, normalize
[docs]
class DECOMP(BaseThresholder):
    """DECOMP class for decomposition-based thresholders.

    Thresholds decision scores non-parametrically: the cumulative
    distribution function (CDF) of the scores is decomposed, and the
    maximum of the resulting decomposed matrix is used as the limit
    beyond which scores are treated as outliers.

    See :cite:`boente2002decomp` for details.

    Parameters
    ----------
    method : {'NMF', 'PCA', 'GRP', 'SRP'}, optional (default='PCA')
        Method to use for decomposition

        - 'NMF': Non-Negative Matrix Factorization
        - 'PCA': Principal Component Analysis
        - 'GRP': Gaussian Random Projection
        - 'SRP': Sparse Random Projection

    random_state : int, optional (default=1234)
        Random seed handed to every decomposition estimator. Can also be
        set to None. The same value is passed to ``np.random.seed`` when
        the thresholder is constructed, which reseeds NumPy's global
        random generator.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    dscores_ : 1D array of decomposed decision scores
        (not assigned in this class; presumably populated by the base
        class during ``_data_setup`` -- verify against BaseThresholder)

    Examples
    --------
    The effects of randomness can affect the thresholder's output
    performance significantly. Therefore, to alleviate the effects of
    randomness on the thresholder a combined model can be used with
    different random_state values. E.g.

    .. code:: python

        # train the KNN detector
        from pyod.models.knn import KNN
        from pythresh.thresholds.comb import COMB
        from pythresh.thresholds.decomp import DECOMP

        clf = KNN()
        clf.fit(X_train)

        # get outlier scores
        decision_scores = clf.decision_scores_  # raw outlier scores

        # get outlier labels with combined model
        thres = COMB(thresholders = [DECOMP(random_state=1234),
        DECOMP(random_state=42), DECOMP(random_state=9685),
        DECOMP(random_state=111222)])

        labels = thres.eval(decision_scores)
    """

    def __init__(self, method="PCA", random_state=1234):
        """Store the configuration and build one estimator per method.

        Parameters
        ----------
        method : str, optional (default='PCA')
            Key selecting which decomposition ``eval`` will apply.

        random_state : int or None, optional (default=1234)
            Seed forwarded to each estimator and to ``np.random.seed``.
        """
        super().__init__()

        self.method = method

        # (lookup key, estimator class, number of output components)
        estimator_specs = (
            ("NMF", NMF, 1),
            ("PCA", PCA, 1),
            ("GRP", GaussianRandomProjection, 2),
            ("SRP", SparseRandomProjection, 3),
        )
        # Every estimator is created up front; eval picks one by key.
        self.method_funcs = {
            key: estimator(n_components=width, random_state=random_state)
            for key, estimator, width in estimator_specs
        }

        self.random_state = random_state
        # Also seeds NumPy's process-wide generator, not only the estimators.
        np.random.seed(random_state)

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from an
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.

        Raises
        ------
        KeyError
            If ``str(self.method)`` is not a key of ``self.method_funcs``.
        """
        # Base-class preparation of the scores (implementation not in this file).
        decision = self._data_setup(decision)

        # Generate a CDF of the decision scores; the helper is asked for
        # three times as many points as there are scores.
        # NOTE(review): the 0 and 1 arguments look like the evaluation
        # range -- confirm against gen_cdf.
        n_points = 3 * len(decision)
        cdf_values, _ = gen_cdf(decision, 0, 1, n_points)
        cdf_values = normalize(cdf_values)

        # Apply the selected decomposition to the CDF as a single column.
        decomposer = self.method_funcs[str(self.method)]
        decomposed = decomposer.fit_transform(cdf_values.reshape(-1, 1))

        # Set limit to max value from decomposition matrix, reflecting it
        # about 0.5 whenever it lands above 0.5.
        limit = np.max(decomposed)
        if limit > 0.5:
            limit = 1 - limit

        self.thresh_ = limit

        return cut(decision, limit)