import numpy as np
import scipy.signal as signal
from scipy import integrate
from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize
# https://github.com/geomdata/gda-public/blob/master/timeseries/curve_geometry.pyx
[docs]
class MOLL(BaseThresholder):
r"""MOLL class for Friedrichs' mollifier thresholder.
Use the Friedrichs' mollifier to evaluate a non-parametric means
to threshold scores generated by the decision_scores where outliers
are set to any value beyond one minus the maximum of the smoothed
dataset via convolution. See :cite:`keyzer1997moll` for details.
Parameters
----------
Attributes
----------
thresh_ : threshold value that separates inliers from outliers
dscores_ : 1D array of decomposed decision scores
Notes
-----
Friedrichs' mollifier is a smoothing function that is applied to create sequences
of smooth functions. These functions can be used to approximate generalized functions
that may be non-smooth. The decision scores are assumed to be a part of a generalized
function with a non-smooth nature in terms of the interval space between the scores
respectively. Friedrichs' mollifier is defined by:
.. math::
\varphi(x) = \begin{cases}
Ce^{\frac{1}{\lvert x \rvert^2-1}} & \text{if } \lvert x \rvert < 1 \\
0 & \text{if } \lvert x \rvert \geq 1
\end{cases} \mathrm{,}
where :math:`C` is a normalization constant and :math:`x` is the z-scores of pseudo
scores generated over the same range as the scores but with a smaller step size. The
normalization constant is calculated by:
.. math::
C = \left(\int_{-1}^{1} e^{\frac{1}{(x^2-1)}}\right)^{-1} \mathrm{.}
The mollifier is inserted into a discrete convolution operator and the smoothed
scores are returned. The threshold is set at one minus the maximum of the smoothed
scores.
"""
def __init__(self, random_state=1234):
self.random_state = random_state
[docs]
def eval(self, decision):
"""Outlier/inlier evaluation process for decision scores.
Parameters
----------
decision : np.array or list of shape (n_samples)
or np.array of shape (n_samples, n_detectors)
which are the decision scores from a
outlier detection.
Returns
-------
outlier_labels : numpy array of shape (n_samples,)
For each observation, tells whether or not
it should be considered as an outlier according to the
fitted model. 0 stands for inliers and 1 for outliers.
"""
decision = check_scores(decision, random_state=self.random_state)
decision = normalize(decision)
self.dscores_ = decision
dat_range = np.linspace(0, 1, len(decision))
# Set the inliers to be where the 1-max(smoothed scores)
limit = 1-np.max(self._mollifier(dat_range, np.sort(decision)))
self.thresh_ = limit
return cut(decision, limit)
def _mollifier(self, time, position, refinement=5, width=1.0):
N = len(position)
delta = (time[-1]-time[0])/(N-1)
# compute boundary space padding
left_pad = np.arange(time[0], time[0]-(width+delta), step=-delta)
left_pad = np.flipud(left_pad)[:-1]
left_pad_num = left_pad.shape[0]
right_pad = np.arange(time[-1], time[-1]+(width+delta), step=delta)[1:]
right_pad_num = right_pad.shape[0]
time_pad = np.concatenate((left_pad, time, right_pad))
# compute boundary score padding
position_pad = np.pad(position, (left_pad_num, right_pad_num), 'edge')
# Define a new smaller space scale s, ds (here we a evenly spaced)
s, ds = np.linspace(time_pad[0], time_pad[-1],
(refinement)*time_pad.shape[0],
retstep=True)
right_pad_num = (refinement)*right_pad_num
left_pad_num = (refinement)*left_pad_num
position_interp = np.interp(s, time_pad, position_pad)
# Compute the mollifier kernel
norm_const, err = integrate.quad(
lambda x: np.exp(1.0/(x**2-1.0)), -1.0, 1.0)
norm_const = 1.0/norm_const
# Compute the mollifier rho
p = np.abs((s - (s[0]+s[-1])/2.0)/width)
r = np.zeros_like(s)
q = p[p < 1.0]
r[p < 1.0] = np.exp(1.0/(q**2-1.0))
rho = (norm_const/width)*r
# Perform convolution to make smooth reconstruction
conv_func = signal.fftconvolve if s.shape[0] > 500 else np.convolve
smooth = conv_func(ds*position_interp, rho, mode='same')
# remove padding
s = s[left_pad_num:-right_pad_num]
smooth = smooth[left_pad_num:-(right_pad_num)]
return np.asarray(smooth)