Source code for pythresh.thresholds.vae

import numpy as np
import scipy.stats as stats
import torch
import torch.optim as opt
from torch import nn
from torch.distributions import Normal, kl_divergence
from torch.nn.functional import softplus
from tqdm import tqdm

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize


[docs]
class VAE(BaseThresholder):
    r"""VAE class for Variational AutoEncoder thresholder.

    Use a Variational AutoEncoder to evaluate a non-parametric means to
    threshold the decision scores. Outliers are set to any value beyond
    the maximum minus the minimum of the reconstructed distribution
    probabilities after encoding. See :cite:`xiao2020vae` for details.

    Parameters
    ----------
    verbose : bool, optional (default=False)
        display training progress

    device : str, optional (default='cpu')
        device for pytorch

    latent_dims : int, optional (default='auto')
        number of latent dimensions the encoder will map the scores to.
        Default 'auto' applies automatic dimensionality selection using
        a profile likelihood.

    random_state : int, optional (default=1234)
        random seed for the normal distribution. Can also be set to None

    epochs : int, optional (default=100)
        number of epochs to train the VAE

    batch_size : int, optional (default=64)
        batch size for the dataloader during training

    loss : str, optional (default='kl')
        Loss function during training

        - 'kl' : use the combined negative log likelihood and
          Kullback-Leibler divergence
        - 'mmd': use the combined negative log likelihood and
          maximum mean discrepancy

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    Examples
    --------
    The effects of randomness can affect the thresholder's output
    performance significantly. Therefore, to alleviate the effects of
    randomness on the thresholder, a combined model can be used with
    different random_state values. E.g.

    .. code:: python

        # train the KNN detector
        from pyod.models.knn import KNN
        from pythresh.thresholds.comb import COMB
        from pythresh.thresholds.vae import VAE

        clf = KNN()
        clf.fit(X_train)

        # get outlier scores
        decision_scores = clf.decision_scores_  # raw outlier scores

        # get outlier labels with combined model
        thres = COMB(thresholders=[VAE(random_state=1234),
                                   VAE(random_state=42),
                                   VAE(random_state=9685),
                                   VAE(random_state=111222)])
        labels = thres.eval(decision_scores)
    """

    def __init__(self, verbose=False, device='cpu', latent_dims='auto',
                 random_state=1234, epochs=100, batch_size=64, loss='kl'):

        self.verbose = verbose
        self.device = device
        self.latent_dims = latent_dims
        self.dist = Normal
        self.epochs = epochs
        self.loss = loss
        self.batch_size = batch_size
        self.random_state = random_state

        if random_state:
            torch.manual_seed(random_state)
[docs]
    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
            which are the decision scores from an outlier detector

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """

        decision = check_scores(decision, random_state=self.random_state)

        scores = normalize(decision.copy())

        self.dscores_ = scores

        if self.latent_dims == 'auto':
            self.latent_dims = self._autodim(scores)

        decision = normalize(decision).astype(np.float32).reshape(-1, 1)

        self.model = VAE_model(1, self.latent_dims, self.random_state,
                               self.dist, self.loss).to(self.device)

        self.data = torch.utils.data.DataLoader(
            torch.from_numpy(decision),
            batch_size=self.batch_size, shuffle=True)

        self._train()

        # reconstructed probabilities of the (normalized) scores
        with torch.no_grad():
            z = self.model.reconstructed_probability(
                torch.from_numpy(decision).to(self.device)
                ).to('cpu').detach().numpy()

        limit = np.max(z) - np.min(z)

        self.thresh_ = limit

        return cut(scores, limit)
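    # Illustrative note (numbers assumed, not taken from the library): if the
    # reconstructed probabilities above span z.min() = 0.05 and z.max() = 0.40,
    # then limit = 0.40 - 0.05 = 0.35, and `cut` labels every normalized score
    # above that limit as an outlier (1) and everything else as an inlier (0).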
    def _autodim(self, vals):
        """Estimate the latent dimension size using the profile likelihood
           method of Zhu and Ghodsi (2006)."""

        vals = np.sort(vals)[::-1]
        m = len(vals)

        profile_lik = []

        for i in range(1, m):

            mu1 = np.mean(vals[:i])
            mu2 = np.mean(vals[i:])

            sigma = np.sqrt((np.sum((vals[:i] - mu1) ** 2)
                             + np.sum((vals[i:] - mu2) ** 2)) / (m - 2))

            profile_lik += [np.sum(stats.norm.logpdf(vals[:i], loc=mu1, scale=sigma))
                            + np.sum(stats.norm.logpdf(vals[i:], loc=mu2, scale=sigma))]

        # index of the split with the highest profile likelihood
        prof = np.argsort(profile_lik)[-1]
        valen = len(vals)**0.7

        # fall back to a size-based heuristic if the elbow sits at zero
        prof = prof if prof > 0 else valen

        return round(np.log(prof))

    def _train(self):

        optimizer = torch.optim.Adam(self.model.parameters(),
                                     weight_decay=1e-4, lr=1e-2)
        scheduler = opt.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

        for _ in (tqdm(range(self.epochs), ascii=True, desc='Training')
                  if self.verbose else range(self.epochs)):

            for x in self.data:

                x = x.to(self.device)

                optimizer.zero_grad()

                # forward pass returns a dict containing the training loss;
                # backpropagate it before the optimizer step
                loss = self.model.forward(x)['loss']
                loss.backward()

                optimizer.step()

            scheduler.step()
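# Illustrative sketch (not part of this module): the Zhu & Ghodsi (2006)
# profile-likelihood split used by VAE._autodim, shown on synthetic scores.
# Each candidate split of the descending-sorted scores is modelled as two
# Gaussian groups with a pooled variance; the split with the largest joint
# log-likelihood marks the "elbow".
#
#   import numpy as np
#   import scipy.stats as stats
#
#   rng = np.random.default_rng(0)
#   vals = np.sort(np.r_[rng.normal(5, 1, 5), rng.normal(0, 1, 200)])[::-1]
#
#   lik = []
#   for i in range(1, len(vals)):
#       mu1, mu2 = vals[:i].mean(), vals[i:].mean()
#       sigma = np.sqrt((((vals[:i] - mu1) ** 2).sum()
#                        + ((vals[i:] - mu2) ** 2).sum()) / (len(vals) - 2))
#       lik.append(stats.norm.logpdf(vals[:i], mu1, sigma).sum()
#                  + stats.norm.logpdf(vals[i:], mu2, sigma).sum())
#
#   print(np.argmax(lik) + 1)   # elbow index, expected near the 5 planted outliers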
class VAE_model(nn.Module):
    def __init__(self, input_size, latent_size, random_state,
                 dist, loss, L=16):
        super().__init__()

        self.L = L
        self.input_size = input_size
        self.latent_size = latent_size

        self.encoder = self.data_encoder(input_size, latent_size)
        self.decoder = self.data_decoder(latent_size, input_size)

        self.dist = dist
        self.loss = loss

        if random_state:
            torch.manual_seed(random_state)

        self.prior = self.dist(0, 1)

    def data_encoder(self, input_size, latent_size):

        return nn.Sequential(
            nn.Linear(input_size, 128),
            nn.LeakyReLU(),
            nn.Linear(128, 64),
            nn.LeakyReLU(),
            nn.Linear(64, latent_size * 2)
        )

    def data_decoder(self, latent_size, output_size):

        return nn.Sequential(
            nn.Linear(latent_size, 64),
            nn.LeakyReLU(),
            nn.Linear(64, 128),
            nn.LeakyReLU(),
            nn.Linear(128, output_size * 2)
        )

    def forward(self, x):

        pred_result = self.predict(x)

        x = x.unsqueeze(0)

        # average over sample dimension
        log_lik = self.dist(pred_result['recon_mu'],
                            pred_result['recon_sigma']).log_prob(x).mean(dim=0)

        log_lik = log_lik.mean(dim=0).sum()

        # calculate the kl divergence and the forward loss
        if self.loss == 'kl':

            kl = kl_divergence(pred_result['latent_dist'],
                               self.prior).mean(dim=0).sum()

            loss = kl - log_lik

            return dict(loss=loss, kl=kl, recon_loss=log_lik, **pred_result)

        # calculate the mmd and the forward loss
        else:

            x = x.squeeze()
            batch_size = len(x)

            z = pred_result['latent_dist'].rsample([self.L])
            z = z.view(self.L * batch_size, self.latent_size)

            x = self.prior.rsample([self.L * batch_size * self.latent_size])
            x = x.view(self.L * batch_size, self.latent_size)

            mmd = self.compute_mmd(z, x)

            loss = mmd - log_lik

            return dict(loss=loss, kl=mmd, recon_loss=log_lik, **pred_result)

    def predict(self, x):

        batch_size = len(x)

        # encoder output is split into the latent mean and (softplus) scale
        latent_mu, latent_sigma = self.encoder(x).chunk(2, dim=1)
        latent_sigma = softplus(latent_sigma)

        dist = self.dist(latent_mu, latent_sigma)

        z = dist.rsample([self.L])
        z = z.view(self.L * batch_size, self.latent_size)

        recon_mu, recon_sigma = self.decoder(z).chunk(2, dim=1)

        recon_mu = recon_mu.view(self.L, *x.shape)
        recon_sigma = softplus(recon_sigma)
        recon_sigma = recon_sigma.view(self.L, *x.shape)

        return dict(latent_dist=dist, latent_mu=latent_mu,
                    latent_sigma=latent_sigma, recon_mu=recon_mu,
                    recon_sigma=recon_sigma, z=z)

    def reconstructed_probability(self, x):

        with torch.no_grad():
            pred = self.predict(x)

        recon_dist = self.dist(pred['recon_mu'], pred['recon_sigma'])

        x = x.unsqueeze(0)

        return recon_dist.log_prob(x).exp().mean(dim=0).mean(dim=-1)

    def compute_kernel(self, x, y):

        x_size = x.shape[0]
        y_size = y.shape[0]
        dim = x.shape[1]

        x = x.view(x_size, 1, dim)
        y = y.view(1, y_size, dim)

        tx = x.expand(x_size, y_size, dim)
        ty = y.expand(x_size, y_size, dim)

        # RBF kernel on the mean squared distance between samples
        kernel_input = (tx - ty).pow(2).mean(2) / float(dim)

        return torch.exp(-kernel_input)

    def compute_mmd(self, x, y):

        x_kernel = self.compute_kernel(x, x)
        y_kernel = self.compute_kernel(y, y)
        xy_kernel = self.compute_kernel(x, y)

        return x_kernel.mean() + y_kernel.mean() - 2 * xy_kernel.mean()
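# Usage sketch (illustrative only, not part of this module): thresholding the
# scores of a single detector, without the COMB ensemble shown in the class
# docstring. Assumes pyod's KNN detector and training data `X_train`, as in
# that docstring example.
#
#   from pyod.models.knn import KNN
#   from pythresh.thresholds.vae import VAE
#
#   clf = KNN()
#   clf.fit(X_train)
#
#   thres = VAE(random_state=1234, verbose=True)
#   labels = thres.eval(clf.decision_scores_)   # 0 = inlier, 1 = outlier
#   print(thres.thresh_)                        # fitted threshold value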