Source code for pythresh.thresholds.meta

from importlib.resources import files

import joblib
import numpy as np
import pandas as pd
import scipy.stats as stats
import sklearn
from numba import njit, prange
from sklearn.linear_model import RidgeClassifierCV
from sklearn.preprocessing import MinMaxScaler

from .base import BaseThresholder

# True when the installed scikit-learn is version 1.8 or newer (major.minor
# compared as an integer tuple). META._patch_ridge consults this flag before
# adding a ``classes_`` attribute to RidgeClassifierCV estimators that lack one.
_NEEDS_CLASSES = tuple(map(int, sklearn.__version__.split(".")[:2])) >= (1, 8)


class META(BaseThresholder):
    r"""META class for Meta-modelling thresholder.

    Use a trained meta-model to evaluate a non-parametric means
    to threshold scores generated by the decision_scores where outliers
    are set based on the trained meta-model classifier.
    See :cite:`zhao2022meta` for details.

    Parameters
    ----------

    method : {'LIN', 'GNB', 'GNBC', 'GNBM'}, optional (default='GNBM')
        select

        - 'LIN':  RidgeCV trained linear classifier meta-model on true labels
        - 'GNB':  Gaussian Naive Bayes trained classifier meta-model on true labels
        - 'GNBC': Gaussian Naive Bayes trained classifier meta-model on best contamination
        - 'GNBM': Gaussian Naive Bayes multivariate trained classifier meta-model

        Any other value falls back to the 'GNBM' model file.

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------

    thresh_ : threshold value that separates inliers from outliers
        (META sets this to ``None``; labels come from the model votes)

    dscores_ : 1D array of decomposed decision scores

    Notes
    -----

    Meta-modelling is the creation of a model of models. If a dataset
    contains only the explanatory variables (X), yet no response variable
    (y), the response can still be predicted by using a meta-model. This is
    done by modelling datasets with known response variables that are
    similar to the dataset that is missing the response variable.

    The META thresholder was trained using the ``PyOD`` outlier detection
    methods ``LODA, QMCD, CD, MCD, GMM, KNN, KDE, PCA, Sampling`` and
    ``IForest`` on the AD benchmark datasets: ``ALOI, annthyroid, breastw,
    campaign, cardio, Cardiotocography, fault, glass, Hepatitis, Ionosphere,
    landsat, letter, Lymphography, magic.gamma, mammography, mnist, musk,
    optdigits, PageBlocks, pendigits, Pima, satellite, satimage-2, shuttle,
    smtp, SpamBase, speech, Stamps, thyroid, vertebral, vowels, Waveform,
    WBC, WDBC, Wilt, wine, WPBC, yeast`` available at `ADBench dataset
    <https://github.com/Minqi824/ADBench/tree/main/adbench/datasets/Classical>`_.

    META uses a majority vote of all the trained models to determine the
    inlier/outlier labels.

    Update: the latest GNBC model was further trained on the ``backdoor,
    celeba, census, cover, donors, fraud, http, InternetAds,`` and ``skin``
    datasets and additionally using the ``AutoEncoder, LUNAR, OCSVM, HBOS,
    KPCA,`` and ``DIF`` outlier detection methods.
    """

    def __init__(self, method="GNBM", random_state=1234):
        super().__init__()
        self.method = method
        self.random_state = random_state

        # NOTE: this seeds NumPy's *global* random number generator.
        np.random.seed(random_state)

        # Names of the cached state attributes; eval() passes this list to
        # ``_set_attributes(..., None)`` whenever ``_is_fitted`` is None.
        self._attrs = ["_kde", "_scaler", "_knorm", "_pnorm", "_qnorm", "_is_flipped"]

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from an
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
            If no model group produces an outlier ratio strictly between
            0 and 0.5, every observation is labelled 0.
        """
        # Not fitted: clear all cached state so it is rebuilt from
        # ``decision`` (``_set_attributes`` comes from the base class).
        if self._is_fitted is None:
            self._set_attributes(self._attrs, None)

        decision = self._data_setup(decision)

        # Pick the bundled model file; unknown methods fall back to GNBM.
        if self.method == "LIN":
            clf = "meta_model_LIN.pkl"
        elif self.method == "GNB":
            clf = "meta_model_GNB.pkl"
        elif self.method == "GNBC":
            clf = "meta_model_GNBC.pkl"
        else:
            clf = "meta_model_GNBM.pkl"

        contam = []
        counts = len(decision)

        # joblib.load unpickles the file; only model files shipped inside
        # the ``pythresh.models`` package are loaded here.
        model_path = files("pythresh.models").joinpath(clf)
        model = joblib.load(model_path)

        # Sklearn 1.8.0 API patch
        for e in getattr(model, "estimators_", {}).values():
            self._patch_ridge(e)
        self._patch_ridge(getattr(model, "estimator", None))

        if self.method == "GNBM":
            # The multivariate model needs two extra features: a QMC
            # discrepancy of the min-max scaled scores and a KDE density.
            if self._scaler is None:
                scaler = MinMaxScaler()
                scaler.fit(decision.reshape(-1, 1))
                self._scaler = scaler
                # Reference sample the discrepancy is measured against.
                self._norm = scaler.transform(decision.reshape(-1, 1))

            norm = self._scaler.transform(decision.reshape(-1, 1))
            qmcd = self._wrap_around_discrepancy(self._norm, norm)
            qmcd = self._set_norm(qmcd, "_qnorm")

            # Get criterion for inverting scores
            if self._is_flipped is None:
                skew = stats.skew(qmcd)
                kurt = stats.kurtosis(qmcd)

                # Invert score order based on criterion. The decision is
                # stored whether it is True or False: leaving it as None
                # for the "do not flip" case would let a later evaluation
                # that reuses the cached scaler/KDE re-derive the flip from
                # different data and invert inconsistently.
                self._is_flipped = bool((skew < 0) or ((skew >= 0) & (kurt < 0)))

            if self._is_flipped:
                qmcd = qmcd.max() + qmcd.min() - qmcd

            if self._kde is None:
                kde = stats.gaussian_kde(decision)
                self._kde = kde

            pdf = self._kde.pdf(decision)
            pdf = self._set_norm(pdf, "_knorm")
            # Clamp negatives so the fractional power below stays real.
            pdf[pdf < 0] = 0

        # Query the model once per group and keep only plausible votes.
        for i in range(len(model.groups_)):
            df = pd.DataFrame()
            df["scores"] = decision
            df["groups"] = i
            if self.method == "GNBM":
                df["qmcd"] = qmcd
                df["kdes"] = pdf ** (1 / 10)

            labels = model.predict(df)

            outlier_ratio = np.sum(labels) / counts

            # Discard degenerate votes: no outliers at all, or at least
            # half of the samples flagged as outliers.
            if (outlier_ratio < 0.5) & (outlier_ratio > 0):
                contam.append(labels)

        if contam:
            # Majority vote across the retained label vectors.
            lbls = stats.mode(np.array(contam), axis=0)
            lbls = np.squeeze(lbls[0])
        else:
            # No group produced a usable vote; taking the mode of an empty
            # array would not give a per-sample label array, so report
            # every sample as an inlier with the documented shape.
            lbls = np.zeros(counts, dtype=int)

        # META labels directly from model votes; there is no scalar threshold.
        self.thresh_ = None

        return lbls

    @staticmethod
    def _patch_ridge(est):
        """RidgeClassifierCV classes_ attribute patch.

        When ``_NEEDS_CLASSES`` is true and ``est`` is a
        ``RidgeClassifierCV`` without a ``classes_`` attribute, set
        ``classes_`` to ``[0, 1]`` in place. Any other object (including
        ``None``) is left untouched.
        """
        if (
            _NEEDS_CLASSES
            and isinstance(est, RidgeClassifierCV)
            and not hasattr(est, "classes_")
        ):
            est.classes_ = np.array([0, 1])

    @staticmethod
    @njit(fastmath=True, parallel=True)
    def _wrap_around_discrepancy(data, check):  # pragma: no cover
        """Wrap-around Quasi-Monte Carlo discrepancy method.

        For every row ``i`` of ``check`` (2D, same number of columns as
        ``data``), sum over all rows ``j`` of ``data`` the product over
        columns ``k`` of ``3/2 - |x| + x**2`` with
        ``x = check[i, k] - data[j, k]``. The result, one value per row of
        ``check``, is ``-(4/3)**d + sum / n**2`` where ``n`` and ``d`` are
        the row and column counts of ``data``.
        """
        n = data.shape[0]
        d = data.shape[1]
        p = check.shape[0]

        disc = np.zeros(p)

        for i in prange(p):
            dc = 0.0
            for j in prange(n):
                prod = 1.0
                for k in prange(d):
                    x_kikj = abs(check[i, k] - data[j, k])
                    prod *= 3.0 / 2.0 - x_kikj + x_kikj**2

                dc += prod
            disc[i] = dc

        return -((4.0 / 3.0) ** d) + 1.0 / (n**2) * disc