Source code for pythresh.thresholds.moll

import numpy as np
import scipy.signal as signal
from scipy import integrate

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize

# https://github.com/geomdata/gda-public/blob/master/timeseries/curve_geometry.pyx


[docs] class MOLL(BaseThresholder): r"""MOLL class for Friedrichs' mollifier thresholder. Use the Friedrichs' mollifier to evaluate a non-parametric means to threshold scores generated by the decision_scores where outliers are set to any value beyond one minus the maximum of the smoothed dataset via convolution. See :cite:`keyzer1997moll` for details. Parameters ---------- Attributes ---------- thresh_ : threshold value that separates inliers from outliers dscores_ : 1D array of decomposed decision scores Notes ----- Friedrichs' mollifier is a smoothing function that is applied to create sequences of smooth functions. These functions can be used to approximate generalized functions that may be non-smooth. The decision scores are assumed to be a part of a generalized function with a non-smooth nature in terms of the interval space between the scores respectively. Friedrichs' mollifier is defined by: .. math:: \varphi(x) = \begin{cases} Ce^{\frac{1}{\lvert x \rvert^2-1}} & \text{if } \lvert x \rvert < 1 \\ 0 & \text{if } \lvert x \rvert \geq 1 \end{cases} \mathrm{,} where :math:`C` is a normalization constant and :math:`x` is the z-scores of pseudo scores generated over the same range as the scores but with a smaller step size. The normalization constant is calculated by: .. math:: C = \left(\int_{-1}^{1} e^{\frac{1}{(x^2-1)}}\right)^{-1} \mathrm{.} The mollifier is inserted into a discrete convolution operator and the smoothed scores are returned. The threshold is set at one minus the maximum of the smoothed scores. """ def __init__(self, random_state=1234): self.random_state = random_state
[docs] def eval(self, decision): """Outlier/inlier evaluation process for decision scores. Parameters ---------- decision : np.array or list of shape (n_samples) or np.array of shape (n_samples, n_detectors) which are the decision scores from a outlier detection. Returns ------- outlier_labels : numpy array of shape (n_samples,) For each observation, tells whether or not it should be considered as an outlier according to the fitted model. 0 stands for inliers and 1 for outliers. """ decision = check_scores(decision, random_state=self.random_state) decision = normalize(decision) self.dscores_ = decision dat_range = np.linspace(0, 1, len(decision)) # Set the inliers to be where the 1-max(smoothed scores) limit = 1-np.max(self._mollifier(dat_range, np.sort(decision))) self.thresh_ = limit return cut(decision, limit)
def _mollifier(self, time, position, refinement=5, width=1.0): N = len(position) delta = (time[-1]-time[0])/(N-1) # compute boundary space padding left_pad = np.arange(time[0], time[0]-(width+delta), step=-delta) left_pad = np.flipud(left_pad)[:-1] left_pad_num = left_pad.shape[0] right_pad = np.arange(time[-1], time[-1]+(width+delta), step=delta)[1:] right_pad_num = right_pad.shape[0] time_pad = np.concatenate((left_pad, time, right_pad)) # compute boundary score padding position_pad = np.pad(position, (left_pad_num, right_pad_num), 'edge') # Define a new smaller space scale s, ds (here we a evenly spaced) s, ds = np.linspace(time_pad[0], time_pad[-1], (refinement)*time_pad.shape[0], retstep=True) right_pad_num = (refinement)*right_pad_num left_pad_num = (refinement)*left_pad_num position_interp = np.interp(s, time_pad, position_pad) # Compute the mollifier kernel norm_const, err = integrate.quad( lambda x: np.exp(1.0/(x**2-1.0)), -1.0, 1.0) norm_const = 1.0/norm_const # Compute the mollifier rho p = np.abs((s - (s[0]+s[-1])/2.0)/width) r = np.zeros_like(s) q = p[p < 1.0] r[p < 1.0] = np.exp(1.0/(q**2-1.0)) rho = (norm_const/width)*r # Perform convolution to make smooth reconstruction conv_func = signal.fftconvolve if s.shape[0] > 500 else np.convolve smooth = conv_func(ds*position_interp, rho, mode='same') # remove padding s = s[left_pad_num:-right_pad_num] smooth = smooth[left_pad_num:-(right_pad_num)] return np.asarray(smooth)