Source code for pythresh.thresholds.gamgmm

import warnings

import numpy as np
from scipy.optimize import least_squares
from scipy.stats import beta, dirichlet, multivariate_normal, wishart
from sklearn.decomposition import TruncatedSVD
from sklearn.mixture import BayesianGaussianMixture
from sklearn.utils import check_array

from .base import BaseThresholder
from .thresh_utility import normalize


class GAMGMM(BaseThresholder):
    r"""GAMGMM class for gammaGMM thresholder.

    Use a Bayesian method for estimating the posterior distribution
    of the contamination factor (i.e., the proportion of anomalies)
    for a given unlabeled dataset. The threshold is set such that the
    proportion of predicted anomalies equals the contamination factor.
    See :cite:`perini2023gamgmm` for details.

    Parameters
    ----------

    n_contaminations : int, optional (default=1000)
        number of samples to draw from the contamination posterior distribution

    n_draws : int, optional (default=50)
        number of samples simultaneously drawn from each DPGMM component

    p0 : float, optional (default=0.01)
        probability that no anomalies are in the data

    phigh : float, optional (default=0.01)
        probability that there are more than high_gamma anomalies

    high_gamma : float, optional (default=0.15)
        sensibly high proportion of anomalies that has a low probability of occurring

    gamma_lim : float, optional (default=0.5)
        upper limit on gamma, the proportion of anomalies

    K : int, optional (default=100)
        number of components for the DPGMM used to approximate the Dirichlet Process

    skip : bool, optional (default=False)
        skip the optimal hyperparameter test (this may return a sub-optimal solution)

    steps : int, optional (default=100)
        number of iterations to test for optimal hyperparameters

    random_state : int, optional (default=1234)
        random seed for the random number generators of the thresholder. Can also
        be set to None.

    verbose : bool, optional (default=False)
        print the progress of the DPGMM process every 20 iterations

    Attributes
    ----------

    thresh_ : threshold value that separates inliers from outliers

    dscores_ : 1D array of decomposed decision scores

    Notes
    -----

    This implementation deviates from that in :cite:`perini2023gamgmm` only in
    the post-processing steps:

    - if a single set of outlier detector likelihood scores is passed, a dummy
      score set of zeros is added so that the GAMGMM method can function
      correctly;
    - if multiple sets of outlier detector likelihood scores are passed, a 1D
      TruncatedSVD decomposition of the scores is thresholded, but it is not
      used to determine the gamma contamination.

    However, if you wish to follow the original implementation please go to
    `GammaGMM <https://github.com/Lorenzo-Perini/GammaGMM>`_

    """

    def __init__(self, n_contaminations=1000, n_draws=50, p0=0.01, phigh=0.01,
                 high_gamma=0.15, gamma_lim=0.5, K=100, skip=False, steps=100,
                 random_state=1234, verbose=False):

        self.n_contaminations = n_contaminations
        self.n_draws = n_draws
        self.p0 = p0
        self.phigh = phigh
        self.high_gamma = high_gamma
        self.gamma_lim = gamma_lim
        self.K = K
        self.skip = skip
        self.steps = steps
        self.random_state = random_state
        self.verbose = verbose

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples) or
                   np.array of shape (n_samples, n_detectors)
            which are the decision scores from an outlier detection

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether it should be considered
            as an outlier according to the fitted model. 0 stands for
            inliers and 1 for outliers.
        """

        if (np.asarray(decision).ndim == 2) & (np.atleast_2d(decision).shape[1] > 1):

            decision = check_array(decision, ensure_2d=True)

            score_space = self._augment_space(decision)

            # Decompose decision scores to 1D for thresholding
            decomp = TruncatedSVD(n_components=1,
                                  random_state=self.random_state)
            decision = decomp.fit_transform(normalize(decision))

        else:

            decision = check_array(decision, ensure_2d=False).squeeze()
            decision = np.atleast_2d(decision).T

            score_space = self._augment_space(decision).squeeze()
            score_space = np.vstack(
                [score_space, np.zeros_like(score_space)]).T

            warnings.warn('Using one set of outlier detection likelihood scores '
                          'may lead to a suboptimal or no solution. Please consider '
                          'increasing the number of outlier detection score sets.')

        decision = normalize(decision.squeeze())

        self.dscores_ = decision.copy()

        # Compute the gamma posterior and threshold
        gamma_posterior_sample = self._compute_gamma_posterior(score_space)

        gamma_mean = np.mean(gamma_posterior_sample)
        self.thresh_ = np.percentile(decision, 100 * (1 - gamma_mean))

        labels = (decision > self.thresh_).astype('int').ravel()

        return labels

    def _compute_gamma_posterior(self, decision):

        # Handle overflow of components versus samples
        self.K = decision.shape[0] - 1 if np.shape(decision)[0] < self.K else self.K

        itv = 0
        random_start = 1234 if not self.random_state else self.random_state

        # Repeat the loop until a valid gamma sample is found, else return unassigned gammas
        while True:

            whileseed = random_start + 100*itv
            np.random.seed(whileseed)

            # Fit the DPGMM
            bgm = BayesianGaussianMixture(weight_concentration_prior_type='dirichlet_process',
                                          n_components=self.K,
                                          weight_concentration_prior=0.01,
                                          max_iter=1500, random_state=whileseed,
                                          verbose=self.verbose, verbose_interval=20,
                                          reg_covar=1e-4).fit(decision)

            # Drop components with less than 2 instances assigned
            filter_idx = np.where(bgm.weight_concentration_[0] >= 2)[0]
            tot_concentration = np.sum(bgm.weight_concentration_[0])
            partial_concentration = np.sum(
                bgm.weight_concentration_[0][filter_idx])

            means = bgm.means_[filter_idx]
            covariances = bgm.covariances_[filter_idx, :, :]
            alphas = bgm.weight_concentration_[0][filter_idx]
            mean_precs = bgm.mean_precision_[filter_idx]
            dgf = bgm.degrees_of_freedom_[filter_idx]

            idx_sortcomponents, meanstd = self._order_components(
                means, mean_precs, covariances, dgf)

            # Redistribute the lost mass (after cutting off some components)
            alphas = alphas[idx_sortcomponents] + \
                (tot_concentration - partial_concentration)/len(filter_idx)

            # Solve the optimization problem to find the hyperparameters delta and tau
            res = least_squares(self._find_delta_tau, x0=(-2, 1),
                                args=(meanstd, alphas),
                                bounds=((-50, -1), (0, 50)))
            delta, tau = res.x

            # Check that delta and tau are properly found, allowing for a 10% error
            p0Est, phighEst = self._check_delta_tau(
                delta, tau, meanstd, alphas)

            if (((p0Est < self.p0*1.1) & (p0Est > self.p0*0.9) &
                 (phighEst < self.phigh*1.1) & (phighEst > self.phigh*0.9)) or (self.skip)):

                # If hyperparameters are OK, break the loop. Otherwise repeat it with different seeds
                if self.verbose:
                    print('Optimal hyperparameters were found')
                break

            elif self.verbose:
                print('Optimal hyperparameters were not found. '
                      'Rerunning the model on a new seed.')

            itv += 1
            if itv > self.steps:
                print('No solution found! Returning unassigned gammas')
                return np.zeros(self.n_contaminations, np.float32)

        # Sort the component values and extract the parameter posteriors
        means = means[idx_sortcomponents]
        covariances = covariances[idx_sortcomponents, :, :]
        mean_precs = bgm.mean_precision_[idx_sortcomponents]
        dgf = bgm.degrees_of_freedom_[idx_sortcomponents]

        # Compute the cumulative sum of the mixing proportions (GMM weights)
        gmm_weights = np.cumsum(dirichlet(alphas).rvs(
            self.n_contaminations), axis=1)
        w = {}
        for k in range(len(filter_idx)):
            w[k+1] = gmm_weights[:, k]

        # Sample from gamma's posterior by computing the probabilities
        gamma = self._sample_withexactprobs(
            means, mean_precs, covariances, dgf, delta, tau, w)

        # Clip gammas to the upper limit
        gamma = gamma[gamma < self.gamma_lim]

        if np.all(gamma == 0):
            return np.zeros(self.n_contaminations, np.float32)

        if len(gamma) < self.n_contaminations:
            gamma = np.concatenate((gamma, np.random.choice(
                gamma[gamma > 0.0], self.n_contaminations - len(gamma), replace=True)))

        # Return the sample from gamma's posterior
        return gamma

    def _order_components(self, means, mean_precs, covariances, dgf):

        K, M = np.shape(means)
        meanstd = np.zeros(K, np.float32)
        mean_std = np.sqrt(1/mean_precs)

        for k in range(K):

            sample_mean_component = multivariate_normal.rvs(mean=means[k, :],
                                                            cov=mean_std[k]**2,
                                                            size=1000,
                                                            random_state=self.random_state)
            sample_covariance = wishart.rvs(df=dgf[k], scale=covariances[k]/dgf[k],
                                            size=1000, random_state=self.random_state)
            var = np.array([np.diag(sample_covariance[i])
                            for i in range(1000)])

            meanstd[k] = np.mean([np.mean(sample_mean_component[:, m].reshape(-1) /
                                          (1 + np.sqrt(var[:, m].reshape(-1))))
                                  for m in range(M)])

        idx_components = np.argsort(-meanstd)
        meanstd = meanstd[idx_components]

        return idx_components, np.array(meanstd)

    def _find_delta_tau(self, params, *args):

        delta, tau = params
        meanstd, alphas = args

        first_eq = delta - (np.log(self.p0/(1 - self.p0)) - tau)/meanstd[0]

        prob_ck = self._sigmoid(delta, tau, meanstd)
        prob_c1ck = self._derive_jointprobs(prob_ck)

        a = np.cumsum(alphas)
        b = sum(alphas) - np.cumsum(alphas)

        probBetaGreaterT = np.nan_to_num(
            beta.sf(self.high_gamma, a, b), nan=1.0)
        second_eq = np.sum(probBetaGreaterT * prob_c1ck) - self.phigh

        return (first_eq, second_eq)

    def _check_delta_tau(self, delta, tau, meanstd, alphas):
        """Check that delta and tau are properly set.

        Return the p_0 and p_high estimated using the given delta and tau.
        """

        prob_ck = self._sigmoid(delta, tau, meanstd)
        p0Est = 1 - prob_ck[0]

        prob_c1ck = self._derive_jointprobs(prob_ck)

        a = np.cumsum(alphas)
        b = sum(alphas) - np.cumsum(alphas)

        probBetaGreaterT = np.nan_to_num(
            beta.sf(self.high_gamma, a, b), nan=1.0)
        phighEst = np.sum(probBetaGreaterT * prob_c1ck)

        return p0Est, phighEst

    def _sigmoid(self, delta, tau, x):
        """Transform scores into probabilities using a sigmoid function."""

        return 1/(1 + np.exp(tau + delta*x))

    def _derive_jointprobs(self, prob_ck):
        """Obtain the joint probabilities given the conditional probabilities."""

        cumprobs = np.cumprod(prob_ck)
        negprobs = np.roll(1 - prob_ck, -1)
        negprobs[-1] = 1
        prob_c1ck = cumprobs*negprobs

        return prob_c1ck

    def _sample_withexactprobs(self, means, mean_precs, covariances,
                               dgf, delta, tau, w):
        """Compute the joint probabilities and use them to draw a sample from
        gamma's posterior."""

        K = np.shape(means)[0]
        mean_std = np.sqrt(1/mean_precs)
        samples = np.array([])
        i = 0
        random_start = 1234 if not self.random_state else self.random_state

        while len(samples) < self.n_contaminations * (1 - self.p0):

            prob_ck = np.zeros(K, np.float32)
            for k in range(K):

                rnd = (i+1) * (k+1)
                sample_mean_component = multivariate_normal.rvs(mean=means[k, :],
                                                                cov=mean_std[k]**2,
                                                                size=1,
                                                                random_state=10*random_start + rnd)
                sample_covariance = wishart.rvs(df=dgf[k],
                                                scale=covariances[k]/dgf[k],
                                                size=1,
                                                random_state=10*random_start + rnd)
                var = np.diag(sample_covariance)

                meanstd = np.mean((sample_mean_component)/(1 + np.sqrt(var)))
                prob_ck[k] = self._sigmoid(delta, tau, meanstd)

            prob_c1ck = self._derive_jointprobs(prob_ck)

            for k in range(K):
                ns = int(np.round(self.n_draws * prob_c1ck[k], 0))
                if ns > 0:
                    samples = np.concatenate(
                        (samples, np.random.choice(w[k+1], ns, replace=False)))
            i += 1

        if len(samples) > self.n_contaminations*(1 - self.p0):
            samples = np.random.choice(samples, int(
                self.n_contaminations*(1 - self.p0)), replace=False)

        samples = np.concatenate(
            (samples, np.zeros(self.n_contaminations - len(samples), np.float32)))

        return samples

    def _augment_space(self, decision):
        """Map outlier likelihood scores onto the positive axis, take the log,
        and z-normalize the final values."""

        norm_scores = np.zeros_like(decision)

        for i, scores in enumerate(decision.T):

            minx = np.min(scores)
            x = np.log(scores - minx + 0.01)
            meanx = np.mean(x)
            stdx = np.std(x)
            x = (x - meanx)/stdx if stdx > 0 else x

            norm_scores[:, i] = x

        return norm_scores
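

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): shows how
# eval() above might be fed likelihood scores stacked from several detectors.
# It assumes the PyOD package and scikit-learn's make_blobs are available;
# the detector choices and parameter values are arbitrary examples.
# ---------------------------------------------------------------------------
if __name__ == '__main__':

    from pyod.models.iforest import IForest
    from pyod.models.knn import KNN
    from pyod.models.pca import PCA
    from sklearn.datasets import make_blobs

    # Synthetic single-cluster data; points far from the cluster act as outliers
    X, _ = make_blobs(n_samples=500, centers=1, cluster_std=1.0,
                      random_state=42)

    # Stack the decision scores of several detectors column-wise, giving an
    # array of shape (n_samples, n_detectors) as expected by eval()
    detectors = [KNN(), IForest(random_state=42), PCA()]
    scores = np.vstack([det.fit(X).decision_scores_
                        for det in detectors]).T

    # Threshold the scores; labels is a (n_samples,) array of 0/1 flags
    thres = GAMGMM(n_contaminations=500, random_state=42)
    labels = thres.eval(scores)

    print('Estimated threshold:', thres.thresh_)
    print('Number of flagged outliers:', labels.sum())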