Source code for pythresh.thresholds.filter

import numpy as np
from scipy import signal
from scipy.ndimage import gaussian_filter

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize


class FILTER(BaseThresholder):
    """FILTER class for Filtering based thresholders.

    Use the filtering based methods to evaluate a non-parametric means
    to threshold scores generated by the decision_scores where outliers
    are set to any value beyond the maximum filter value.
    See :cite:`hashemi2019filter` for details.

    Parameters
    ----------

    method : {'gaussian', 'savgol', 'hilbert', 'wiener', 'medfilt', 'decimate', 'detrend', 'resample'}, optional (default='savgol')
        Method to filter the scores

        - 'gaussian': use a gaussian based filter
        - 'savgol':   use the savgol based filter
        - 'hilbert':  use the hilbert based filter
        - 'wiener':   use the wiener based filter
        - 'medfilt':  use a median based filter
        - 'decimate': use a decimate based filter
        - 'detrend':  use a detrend based filter
        - 'resample': use a resampling based filter

    sigma : int, optional (default='auto')
        Variable specific to each filter type, default sets sigma to
        len(scores)*np.std(scores) (the savgol filter halves this value
        and the resample filter takes its square root)

        - 'gaussian': standard deviation for Gaussian kernel
        - 'savgol':   savgol filter window size (rounded, forced to be
          odd and at least 3)
        - 'hilbert':  number of Fourier components
        - 'wiener':   unused, the window always spans all of the scores
        - 'medfilt':  kernel size (rounded and forced to be odd)
        - 'decimate': downsampling factor
        - 'detrend':  number of break points
        - 'resample': value passed as the ``window`` argument of
          ``scipy.signal.resample``

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------

    thresh_ : threshold value that separates inliers from outliers

    dscores_ : 1D array of the decision scores after ``check_scores``
        and ``normalize`` have been applied
    """

    def __init__(self, method='savgol', sigma='auto', random_state=1234):

        super().__init__()
        self.method = method
        # Dispatch table mapping each method name to its filter routine
        self.method_funcs = {'gaussian': self._GAU_fltr,
                             'savgol': self._SAV_fltr,
                             'hilbert': self._HIL_fltr,
                             'wiener': self._WIE_fltr,
                             'medfilt': self._MED_fltr,
                             'decimate': self._DEC_fltr,
                             'detrend': self._DET_fltr,
                             'resample': self._RES_fltr}
        self.sigma = sigma
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from an
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.

        Raises
        ------
        KeyError
            If ``method`` is not one of the supported filter names.
        """

        decision = check_scores(decision, random_state=self.random_state)

        decision = normalize(decision)

        self.dscores_ = decision

        # Get sigma variables for various applications for each filter
        sig = (len(decision)*np.std(decision) if self.sigma == 'auto'
               else self.sigma)

        # Filter scores
        fltr = self.method_funcs[str(self.method)](decision, sig)

        # The threshold is the largest value of the filtered scores
        limit = np.max(fltr)

        self.thresh_ = limit

        return cut(decision, limit)

    def _GAU_fltr(self, decision, sig):
        """Gaussian filter scores."""

        return gaussian_filter(decision, sigma=sig)

    def _SAV_fltr(self, decision, sig):
        """Savgol filter scores."""

        # The automatic sigma is halved before being used as a window size
        sig = 0.5*sig if self.sigma == 'auto' else sig

        # Round before the parity check so that a fractional sigma
        # cannot be rounded back onto an even window afterwards
        window = round(sig)
        window = window+1 if window % 2 == 0 else window

        # polyorder=1 requires a window strictly larger than 1, which a
        # small sigma (e.g. low variance scores) would otherwise violate
        window = max(window, 3)

        return signal.savgol_filter(decision, window_length=window,
                                    polyorder=1)

    def _HIL_fltr(self, decision, sig):
        """Hilbert filter scores."""

        # NOTE(review): scipy.signal.hilbert returns a complex analytic
        # signal, so the maximum taken in ``eval`` is a complex value for
        # this method -- confirm that this is intended
        return signal.hilbert(decision, N=round(sig))

    def _WIE_fltr(self, decision, sig):
        """Wiener filter scores."""

        # sig is not used here, the window spans the full set of scores
        return signal.wiener(decision, mysize=len(decision))

    def _MED_fltr(self, decision, sig):
        """Medfilt filter scores."""

        # The kernel size is rounded and bumped to the next odd value
        sig = round(sig)
        sig = sig+1 if sig % 2 == 0 else sig

        return signal.medfilt(decision, kernel_size=[sig])

    def _DEC_fltr(self, decision, sig):
        """Decimate filter scores."""

        return signal.decimate(decision, q=round(sig), ftype='fir')

    def _DET_fltr(self, decision, sig):
        """Detrend filter scores."""

        # Evenly spaced integer break points across the score indices
        breaks = np.linspace(0, len(decision)-1, round(sig)).astype(int)

        return signal.detrend(decision, bp=breaks)

    def _RES_fltr(self, decision, sig):
        """Resampling filter scores."""

        # The automatic sigma is square rooted before being used
        sig = np.sqrt(sig) if self.sigma == 'auto' else sig

        # Resample down to sqrt(n_samples) points
        return signal.resample(decision,
                               num=round(np.sqrt(len(decision))),
                               window=round(sig))