Source code for pythresh.thresholds.meta

from importlib.resources import as_file, files

import joblib
import numpy as np
import pandas as pd
import scipy.stats as stats
import sklearn
from numba import njit, prange
from sklearn.linear_model import RidgeClassifierCV
from sklearn.preprocessing import MinMaxScaler

from .base import BaseThresholder

# True when the installed scikit-learn reports version 1.8 or newer; only the
# major and minor components are compared. Consulted by META._patch_ridge.
_NEEDS_CLASSES = tuple(int(part) for part in sklearn.__version__.split(".")[:2]) >= (1, 8)


class META(BaseThresholder):
    r"""META class for Meta-modelling thresholder.

    Use a trained meta-model to evaluate a non-parametric means
    to threshold scores generated by the decision_scores where outliers
    are set based on the trained meta-model classifier.
    See :cite:`zhao2022meta` for details.

    Parameters
    ----------

    method : {'LIN', 'GNB', 'GNBC', 'GNBM'}, optional (default='GNBM')
        select

        - 'LIN':  RidgeCV trained linear classifier meta-model on true labels
        - 'GNB':  Gaussian Naive Bayes trained classifier meta-model on true labels
        - 'GNBC': Gaussian Naive Bayes trained classifier meta-model on best contamination
        - 'GNBM': Gaussian Naive Bayes multivariate trained classifier meta-model

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders. Can
        also be set to None.

    Attributes
    ----------

    thresh_ : threshold value that separates inliers from outliers. This
        thresholder always sets it to None because the labels come straight
        from the meta-model vote rather than from a score cut-off.

    dscores_ : 1D array of decomposed decision scores

    Notes
    -----

    Meta-modelling is the creation of a model of models. If a dataset
    contains only the explanatory variables (X), yet no response variable
    (y), the response can still be predicted by using a meta-model. This is
    done by modelling datasets with known response variables that are similar
    to the dataset that is missing the response variable. The META thresholder
    was trained using the ``PyOD`` outlier detection methods ``LODA, QMCD, CD,
    MCD, GMM, KNN, KDE, PCA, Sampling`` and ``IForest`` on the AD benchmark
    datasets: ``ALOI, annthyroid, breastw, campaign, cardio, Cardiotocography,
    fault, glass, Hepatitis, Ionosphere, landsat, letter, Lymphography,
    magic.gamma, mammography, mnist, musk, optdigits, PageBlocks, pendigits,
    Pima, satellite, satimage-2, shuttle, smtp, SpamBase, speech, Stamps,
    thyroid, vertebral, vowels, Waveform, WBC, WDBC, Wilt, wine, WPBC, yeast``
    available at `ADBench dataset
    <https://github.com/Minqi824/ADBench/tree/main/adbench/datasets/Classical>`_.
    META uses a majority vote of all the trained models to determine the
    inlier/outlier labels.

    Update: the latest GNBC model was further trained on the ``backdoor, celeba,
    census, cover, donors, fraud, http, InternetAds,`` and ``skin`` datasets and
    additionally using the ``AutoEncoder, LUNAR, OCSVM, HBOS, KPCA,`` and ``DIF``
    outlier detection methods.

    """

    # Packaged model file for each documented ``method``. Any other value
    # falls back to the GNBM file, as the original if/elif chain did.
    _MODEL_FILES = {
        "LIN": "meta_model_LIN.pkl",
        "GNB": "meta_model_GNB.pkl",
        "GNBC": "meta_model_GNBC.pkl",
        "GNBM": "meta_model_GNBM.pkl",
    }

    def __init__(self, method="GNBM", random_state=1234):

        super().__init__()
        self.method = method
        self.random_state = random_state
        # Seeds NumPy's global random number generator.
        np.random.seed(random_state)
        # Names of the cached attributes that eval() resets to None while the
        # thresholder is not yet fitted.
        self._attrs = ["_kde", "_scaler", "_knorm", "_pnorm", "_qnorm", "_is_flipped"]

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from an
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
            If none of the meta-model predictions has an outlier ratio
            strictly between 0 and 0.5, every observation is labelled 0.
        """

        # While unfitted, clear every cached attribute so it is rebuilt from
        # the scores passed to this call.
        if self._is_fitted is None:
            self._set_attributes(self._attrs, None)

        decision = self._data_setup(decision)

        # NOTE(review): an unrecognised ``method`` loads the GNBM model but
        # skips the GNBM-only "qmcd" feature below - confirm this is intended.
        clf = self._MODEL_FILES.get(self.method, "meta_model_GNBM.pkl")

        contam = []
        counts = len(decision)

        # The model is a file shipped inside the ``pythresh.models`` package.
        with as_file(files("pythresh.models").joinpath(clf)) as model_path:
            model = joblib.load(model_path)

        # Sklearn 1.8.0 API patch
        for e in getattr(model, "estimators_", {}).values():
            self._patch_ridge(e)
        self._patch_ridge(getattr(model, "estimator", None))

        if self.method == "GNBM":

            if self._scaler is None:
                scaler = MinMaxScaler()
                scaler.fit(decision.reshape(-1, 1))
                self._scaler = scaler
                # Scaled scores of the first call, kept as the reference set
                # for the discrepancy computation.
                self._norm = scaler.transform(decision.reshape(-1, 1))

            norm = self._scaler.transform(decision.reshape(-1, 1))

            qmcd = self._wrap_around_discrepancy(self._norm, norm)
            qmcd = self._set_norm(qmcd, "_qnorm")

            # Get criterion for inverting scores
            if self._is_flipped is None:
                skew = stats.skew(qmcd)
                kurt = stats.kurtosis(qmcd)

                # Invert score order based on criterion. Store the outcome
                # even when it is False, so that a later call on a fitted
                # instance reuses this decision instead of re-deriving it
                # from different data.
                self._is_flipped = bool(
                    (skew < 0) or ((skew >= 0) and (kurt < 0))
                )

            if self._is_flipped:
                qmcd = qmcd.max() + qmcd.min() - qmcd

        if self._kde is None:
            self._kde = stats.gaussian_kde(decision)

        pdf = self._kde.pdf(decision)
        pdf = self._set_norm(pdf, "_knorm")
        pdf[pdf < 0] = 0

        # Loop-invariant feature column, computed once.
        kdes = pdf ** (1 / 10)

        for group in range(len(model.groups_)):

            # Column order matches the original: scores, groups, [qmcd], kdes.
            features = {"scores": decision, "groups": group}
            if self.method == "GNBM":
                features["qmcd"] = qmcd
            features["kdes"] = kdes

            labels = model.predict(pd.DataFrame(features))
            outlier_ratio = np.sum(labels) / counts

            # Keep only predictions that flag some, but fewer than half,
            # of the observations.
            if (outlier_ratio < 0.5) & (outlier_ratio > 0):
                contam.append(labels)

        if contam:
            # Majority vote across the retained predictions.
            lbls = stats.mode(np.array(contam), axis=0)
            lbls = np.squeeze(lbls[0])
        else:
            # No prediction was retained: taking the mode of an empty array
            # would not yield per-sample 0/1 labels, so report all inliers.
            lbls = np.zeros(counts, dtype=int)

        self.thresh_ = None

        return lbls

    @staticmethod
    def _patch_ridge(est):
        """RidgeClassifierCV classes_ attribute patch.

        Sets ``classes_`` to ``[0, 1]`` on a ``RidgeClassifierCV`` instance
        that lacks it when ``_NEEDS_CLASSES`` is true; anything else
        (including None) is left untouched.
        """
        if _NEEDS_CLASSES and isinstance(est, RidgeClassifierCV) and not hasattr(est, "classes_"):
            est.classes_ = np.array([0, 1])

    @staticmethod
    @njit(fastmath=True, parallel=True)
    def _wrap_around_discrepancy(data, check):  # pragma: no cover
        """Wrap-around Quasi-Monte Carlo discrepancy method.

        ``data`` and ``check`` are indexed as 2D arrays (rows, features).
        Returns one value per row of ``check``, computed against all rows
        of ``data``.
        """

        n = data.shape[0]
        d = data.shape[1]
        p = check.shape[0]

        disc = np.zeros(p)

        # Numba only parallelises the outermost prange; the inner loops are
        # therefore written as plain ranges.
        for i in prange(p):
            dc = 0.0
            for j in range(n):
                prod = 1.0
                for k in range(d):
                    x_kikj = abs(check[i, k] - data[j, k])
                    prod *= 3.0 / 2.0 - x_kikj + x_kikj**2

                dc += prod
            disc[i] = dc

        return -((4.0 / 3.0) ** d) + 1.0 / (n**2) * disc