from itertools import combinations
import numpy as np
import scipy.optimize as opt
import scipy.stats as stats
from scipy.special import digamma
from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize
class MIXMOD(BaseThresholder):
    r"""MIXMOD class for the Normal & Non-Normal Mixture Models thresholder.

    Use normal & non-normal mixture models to find a non-parametric means
    to threshold scores generated by the decision_scores, where outliers
    are set to any value beyond the posterior probability threshold
    for equal posteriors of a two distribution mixture model.

    See :cite:`veluw2023mixmod` for details

    Parameters
    ----------
    method : str, optional (default='mean')
        Method to evaluate selecting the best fit mixture model. Default
        'mean' sets this as the closest mixture models to the mean of the posterior
        probability threshold for equal posteriors of a two distribution mixture model
        for all fits. Setting 'ks' uses the two-sample Kolmogorov-Smirnov test for
        goodness of fit.

    tol : float, optional (default=1e-5)
        Tolerance for convergence of the EM fit

    max_iter : int, optional (default=250)
        Max number of iterations to run EM during fit

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders. Can also
        be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    dscores_ : 1D array of decomposed decision scores

    mixture_ : fitted mixture model class of the selected model used for thresholding
        (``None`` when no candidate mixture produced a valid fit)

    Notes
    -----
    The Normal & Non-Normal Mixture Models thresholder is constructed by searching
    all possible two component combinations of the following distributions (normal,
    lognormal, uniform, student's t, pareto, laplace, gamma, fisk, and exponential).
    Each two component combination mixture is fit to the data using
    expectation-maximization (EM) using the corresponding maximum likelihood estimation
    functions (MLE) for each distribution. From this the posterior probability threshold
    is obtained as the point where equal posteriors of a two distribution mixture model
    exists.
    """

    def __init__(self, method='mean', tol=1e-5, max_iter=250, random_state=1234):
        # Candidate component distributions; every unordered pair is tried.
        dists = [stats.expon, stats.fisk, stats.gamma, stats.laplace, stats.t,
                 stats.lognorm, stats.norm, stats.uniform, stats.pareto]
        self.combs = list(combinations(dists, 2))
        self.method = method
        self.tol = tol
        self.max_iter = max_iter
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        decision = check_scores(decision, random_state=self.random_state)
        decision = normalize(decision)
        self.dscores_ = decision

        # Shift normalized scores into [1, 2] so distributions with strictly
        # positive support (lognorm, gamma, pareto, ...) can be fitted
        mix_scores = decision + 1

        # Evaluation grid over the shifted score range
        points = max(len(decision) * 3, 1000)
        x = np.linspace(1, 2, points)

        if self.method == 'ks':
            # KDE of the scores serves as the reference density for the KS test
            kde = stats.gaussian_kde(mix_scores, bw_method=0.1)

        mixtures = []
        scores = []
        crossing = []

        # Fit all possible combinations of dists to the scores
        for comb in self.combs:
            mix = MixtureModel([comb[0], comb[1]], self.tol, self.max_iter)
            try:
                mix.fit(mix_scores)
            except Exception:
                # Some pairs fail to converge or hit numerical issues; skip them
                continue

            # Get the posterior probability threshold for equal posteriors
            # (the point where the sign of the posterior difference flips)
            y = mix.posterior(x)
            y_diff = np.sign(y[1] - y[0])
            crossings = np.where(np.diff(y_diff) != 0)[0]
            if len(crossings) == 0:
                continue

            # Evaluate the fit
            if self.method == 'ks':
                stat, _ = stats.ks_2samp(kde(x), mix.pdf(x))
            else:
                stat = x[crossings[-1]]

            mixtures.append(mix)
            scores.append(stat)
            crossing.append(crossings[-1])

        # BUGFIX: previously an empty candidate list crashed on argmax/indexing
        # before the `limit = 2` fallback could apply; guard explicitly instead.
        if mixtures:
            if self.method == 'ks':
                best = np.argmax(scores)
            else:
                # Crossing point closest to the mean crossing over all fits
                diff = np.mean(scores) - np.array(scores)
                best = np.argmin(np.abs(diff))
            mixture = mixtures[best]
            limit = x[crossing[best]]
        else:
            # No valid fit/crossing found: place the limit at the top of the
            # score range so effectively nothing is flagged
            mixture = None
            limit = 2

        self.thresh_ = limit - 1
        self.mixture_ = mixture
        return cut(mix_scores, limit)
# This portion of code is derived from the GitHub repository marcsingleton/mixmod
# which is licensed under the MIT License. Copyright (c) 2022 Marc Singleton
class MixtureModel:
    """Mixture model fitted by expectation-maximization.

    Holds a list of scipy distribution objects (``components``), one dict of
    free parameters per component (``params``), and the mixing ``weights``.
    """

    def __init__(self, components, tol, max_iter):
        # Start with no fitted parameters and uniform mixing weights.
        self.components = components
        self.params = [{} for _ in components]
        self.weights = [1 / len(components) for _ in components]
        self.tol = tol
        self.max_iter = max_iter
        self.converged = False

    def fit(self, data):
        """Fit the free parameters of the mixture model with the EM algorithm."""
        weights = self.weights.copy()
        # Initialize each component from its closed-form estimator, keeping
        # any parameters that were already set.
        params = [{**MLES().cfes[comp.name](data), **fixed}
                  for comp, fixed in zip(self.components, self.params)]
        prev_ll = self._get_loglikelihood(data, self.components, params, weights)

        for iteration in range(1, self.max_iter + 1):
            # E-step: responsibility of each component for each observation.
            resp = self._get_posterior(data, self.components, params, weights)
            # M-step: re-estimate weights and component parameters via MLE.
            weights = resp.sum(axis=1) / resp.sum()
            for comp, comp_params, comp_resp in zip(self.components, params, resp):
                update = MLES().mles[comp.name](data, expt=comp_resp,
                                                initial=comp_params)
                comp_params.update(update)

            ll = self._get_loglikelihood(data, self.components, params, weights)
            # Stop on numerical breakdown, then check convergence.
            if np.isnan(ll) or np.isinf(ll):
                break
            if abs(ll - prev_ll) < self.tol:
                self.converged = True
                break
            prev_ll = ll

        self.params = params
        self.weights = weights.tolist()
        return iteration, ll

    def loglikelihood(self, data):
        """Total log-likelihood of *data* under the current mixture."""
        return self._get_loglikelihood(data, self.components, self.params, self.weights)

    def posterior(self, data):
        """Posterior probability of each component evaluated at *data*."""
        return self._get_posterior(data, self.components, self.params, self.weights)

    def pdf(self, x, component='sum'):
        """Mixture pdf (weighted sum over components) evaluated at *x*."""
        return self._get_pdfstack(x, self.components, self.params, self.weights).sum(axis=0)

    def _get_loglikelihood(self, data, components, params, weights):
        """Log-likelihood of *data* for explicit components/params/weights."""
        density = 0
        for comp, comp_params, weight in zip(components, params, weights):
            density += weight * comp.pdf(data, **comp_params)
        return np.log(density).sum()

    def _get_posterior(self, data, components, params, weights):
        """Per-component posterior probabilities (columns sum to one)."""
        stack = self._get_pdfstack(data, components, params, weights)
        return stack / stack.sum(axis=0)

    def _get_pdfstack(self, data, components, params, weights):
        """Stack of weighted component pdfs, one row per component."""
        rows = [weight * comp.pdf(data, **comp_params)
                for comp, comp_params, weight in zip(components, params, weights)]
        return np.stack(rows, axis=0)
class MLES:
    """Class of maximum likelihood estimation functions.

    NOTE(review): the functions below intentionally omit ``self``.  They are
    never called as bound methods -- callers fetch plain function objects from
    the class-level ``mles``/``cfes`` dicts, or access them through the class
    itself (``MLES.mm_fisk``), so no instance binding ever happens.

    In every estimator ``expt`` is the per-observation weight vector (the EM
    responsibilities); ``None`` means unweighted (all ones).  Each estimator
    returns a dict of scipy.stats shape/loc/scale keyword arguments.
    """
    def __init__(self):
        pass
    def create_fisk_scale(data, expt=None):
        """Return the weighted score function in the fisk ``scale`` parameter.

        The returned function's root is the MLE of ``scale``; it is solved
        numerically (``opt.newton``) in ``mle_fisk``.
        """
        # Unweighted case: every observation gets weight 1
        expt = np.full(len(data), 1) if expt is None else expt
        def fisk_scale(scale):
            # Compute sums
            e = expt.sum()
            q = ((expt * data) / (scale + data)).sum()
            return 2 * q - e
        return fisk_scale
    def create_fisk_shape(data, expt=None, scale=1):
        """Return the weighted score function in the fisk shape ``c`` for a fixed ``scale``."""
        expt = np.full(len(data), 1) if expt is None else expt
        def fisk_shape(c):
            # Compute summands
            r = data / scale
            s = 1 / c + np.log(r) - 2 * np.log(r) * r ** c / (1 + r ** c)
            return (expt * s).sum()
        return fisk_shape
    def create_gamma_shape(data, expt=None):
        """Return the weighted profile score function in the gamma shape ``a``.

        The scale is profiled out (see ``mle_gamma``: scale = weighted mean / a),
        leaving a one-dimensional root-finding problem in ``a``.
        """
        expt = np.full(len(data), 1) if expt is None else expt
        def gamma_shape(a):
            # Compute sums
            e = expt.sum()
            ed = (expt * data).sum()
            elogd = (expt * np.log(data)).sum()
            return elogd - e * np.log(ed / e) + e * (np.log(a) - digamma(a))
        return gamma_shape
    def mm_fisk(data, expt=None, **kwargs):
        """Method of moment estimator for a fisk distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Moments
        # Weighted first and second moments taken on the log scale
        logdata = np.log(data)
        m1 = (logdata * expt).sum() / expt.sum()
        m2 = (logdata ** 2 * expt).sum() / expt.sum()
        # Estimators
        # Shape from the log-scale variance, scale from the log-scale mean
        ests['c'] = np.pi / np.sqrt(3 * (m2 - m1 ** 2))
        ests['scale'] = np.exp(m1)
        return ests
    def mm_gamma(data, expt=None, **kwargs):
        """Method of moment estimator for a gamma distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Moments
        m1 = (data * expt).sum() / expt.sum()
        m2 = (data ** 2 * expt).sum() / expt.sum()
        # Estimators
        # a = mean^2 / variance, scale = variance / mean
        ests['a'] = m1 ** 2 / (m2 - m1 ** 2)
        ests['scale'] = (m2 - m1 ** 2) / m1
        return ests
    def mle_expon(data, expt=None, **kwargs):
        """MLE for an exponential distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Scale parameter estimation: weighted sample mean
        e = expt.sum()
        ed = (expt * data).sum()
        scale = ed / e
        ests['scale'] = scale
        return ests
    def mle_fisk(data, expt=None, initial=None):
        """MLE for a fisk distribution.

        Scale and shape are solved sequentially by Newton's method, starting
        from the method-of-moments estimates when no ``initial`` is supplied.
        """
        expt = np.full(len(data), 1) if expt is None else expt
        initial = MLES.mm_fisk(data) if initial is None else initial
        ests = {}
        # Scale parameter estimation
        fisk_scale = MLES.create_fisk_scale(data, expt)
        scale = opt.newton(fisk_scale, initial['scale'])
        ests['scale'] = scale
        # Shape parameter estimation (conditional on the scale just found)
        fisk_shape = MLES.create_fisk_shape(data, expt, scale)
        c = opt.newton(fisk_shape, initial['c'])
        ests['c'] = c
        return ests
    def mle_gamma(data, expt=None, initial=None):
        """MLE for a gamma distribution.

        The shape ``a`` is found by root-finding on the profile score; the
        scale then follows in closed form.
        """
        expt = np.full(len(data), 1) if expt is None else expt
        initial = MLES.mm_gamma(data) if initial is None else initial
        ests = {}
        # Shape parameter estimation
        gamma_shape = MLES.create_gamma_shape(data, expt)
        try:
            a = opt.newton(gamma_shape, initial['a'])
        except ValueError:
            # Newton failed: expand a bracket geometrically around the initial
            # guess until the score changes sign, then bisect with brentq.
            # NOTE(review): loops forever if no sign change exists -- relies on
            # the caller's try/except around fit() to be the safety net.
            lower = initial['a'] / 2
            upper = initial['a'] * 2
            while np.sign(gamma_shape(lower)) == np.sign(gamma_shape(upper)):
                lower /= 2
                upper *= 2
            a = opt.brentq(gamma_shape, lower, upper)
        ests['a'] = a
        # Scale parameter estimation (closed form given a)
        scale = (expt * data).sum() / (a * expt.sum())
        ests['scale'] = scale
        return ests
    def mle_laplace(data, expt=None, **kwargs):
        """MLE for a laplace distribution.

        Location is the weighted median (interpolated from the cumulative
        weights of the sorted data); scale is the weighted mean absolute
        deviation from it.
        """
        # Reorder the weights to match the sorted data below
        expt = np.full(len(data), 1) if expt is None else expt[data.argsort()]
        data = np.sort(data)
        ests = {}
        # Location parameter estimation
        cm = expt.sum() / 2
        e_cum = expt.cumsum()
        idx = np.argmax(e_cum > cm)
        if data[idx] == data[idx - 1]:
            # Tied values: no interpolation possible (or needed)
            loc = data[idx]
        else:
            # Linear interpolation of the cumulative weight curve at cm
            m = (e_cum[idx] - e_cum[idx - 1]) / (data[idx] - data[idx - 1])
            b = e_cum[idx] - m * data[idx]
            loc = (cm - b) / m
        ests['loc'] = loc
        # Scale parameter estimation
        e = expt.sum()
        d_abserr = abs(data - loc)
        scale = (expt * d_abserr).sum() / e
        ests['scale'] = scale
        return ests
    def mle_lognorm(data, expt=None, **kwargs):
        """MLE for a log-normal distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Scale parameter estimation: weighted geometric mean
        e = expt.sum()
        elogd = (expt * np.log(data)).sum()
        scale = np.exp(elogd / e)
        ests['scale'] = scale
        # Shape parameter estimation: weighted std dev of log(data)
        e = expt.sum()
        logd_sqerr = (np.log(data) - np.log(scale)) ** 2
        s = np.sqrt((expt * logd_sqerr).sum() / e)
        ests['s'] = s
        return ests
    def mle_norm(data, expt=None, **kwargs):
        """MLE for a normal distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Location parameter estimation: weighted mean
        e = expt.sum()
        ed = (expt * data).sum()
        loc = ed / e
        ests['loc'] = loc
        # Scale parameter estimation: weighted (biased) std dev
        e = expt.sum()
        d_sqerr = (data - loc) ** 2
        scale = np.sqrt((expt * d_sqerr).sum() / e)
        ests['scale'] = scale
        return ests
    def mle_pareto(data, expt=None, **kwargs):
        """MLE for a pareto distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Scale parameter estimation: sample minimum (lower support bound)
        scale = min(data)
        ests['scale'] = scale
        # Shape parameter estimation
        e = expt.sum()
        elogd = (expt * np.log(data)).sum()
        b = e / (elogd - e * np.log(scale))
        ests['b'] = b
        return ests
    def mle_uniform(data, **kwargs):
        """MLE for a uniform distribution.

        Note: no ``expt`` parameter -- the min/max bounds do not depend on the
        weights, so any EM responsibilities passed via **kwargs are ignored.
        """
        ests = {}
        # Location parameter estimation
        loc = min(data)
        ests['loc'] = loc
        # Scale parameter estimation (scipy.stats.uniform spans [loc, loc+scale])
        scale = max(data) - loc
        ests['scale'] = scale
        return ests
    def mle_t(data, expt=None, **kwargs):
        """MLE for an student-t distribution."""
        expt = np.full(len(data), 1) if expt is None else expt
        ests = {}
        # Location parameter estimation: weighted mean
        e = expt.sum()
        ed = (expt * data).sum()
        loc = ed / e
        ests['loc'] = loc
        # Scale parameter estimation: weighted std dev about loc
        e = expt.sum()
        w_data = data - loc
        scale = np.sqrt((expt * w_data**2).sum() / e)
        ests['scale'] = scale
        # Effective degrees of freedom estimation:
        # df = (sum of weights)^2 / (sum of squared weights)
        w_sum_squares = (expt**2).sum()
        w_sum = expt.sum()
        df = w_sum**2 / w_sum_squares
        ests['df'] = df
        return ests
    # Dispatch table: scipy distribution name -> weighted MLE function,
    # used by MixtureModel.fit for the M-step.
    mles = {'expon': mle_expon,
            'fisk': mle_fisk,
            'gamma': mle_gamma,
            'laplace': mle_laplace,
            'lognorm': mle_lognorm,
            'norm': mle_norm,
            'pareto': mle_pareto,
            'uniform': mle_uniform,
            't': mle_t}
    # Closed-form estimators for EM initialization: same as mles except fisk
    # and gamma, whose MLEs need iterative root-finding and so start from
    # method-of-moments estimates instead.
    cfes = {**mles,
            'fisk': mm_fisk,
            'gamma': mm_gamma}