Source code for pythresh.thresholds.cpd

import ruptures as rpt

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_cdf, gen_kde, normalize


class CPD(BaseThresholder):
    r"""CPD class for Change Point Detection thresholder.

    Use change point detection to find a non-parametric means to
    threshold scores generated by the decision_scores, where outliers
    are set to any value beyond the detected change point.
    See :cite:`fearnhead2016cpd` for details.

    Parameters
    ----------
    method : {'Dynp', 'KernelCPD', 'Binseg', 'BottomUp'}, optional (default='Dynp')
        Method for change point detection

        - 'Dynp':      Dynamic programming (optimal minimum sum of errors per partition)
        - 'KernelCPD': RBF kernel function (optimal minimum sum of errors per partition)
        - 'Binseg':    Binary segmentation
        - 'BottomUp':  Bottom-up segmentation

    transform : {'cdf', 'kde'}, optional (default='cdf')
        Data transformation method prior to fit

        - 'cdf': Use the cumulative distribution function
        - 'kde': Use the kernel density estimation

    random_state : int, optional (default=1234)
        Random seed for the random number generators of the thresholders.
        Can also be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    dscores_ : 1D array of decomposed decision scores
    """

    def __init__(self, method='Dynp', transform='cdf', random_state=1234):
        self.method = method
        self.transform = transform
        # One ruptures estimator is created up front for every supported
        # method name; ``eval`` looks up the entry keyed by ``self.method``.
        self.method_func = {
            'Dynp': rpt.Dynp(),
            'KernelCPD': rpt.KernelCPD(kernel='rbf'),
            'Binseg': rpt.Binseg(),
            'BottomUp': rpt.BottomUp(),
        }
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from an
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not it should be
            considered as an outlier according to the fitted model.
            0 stands for inliers and 1 for outliers.
        """
        # Validate the raw scores first, then normalize them; the result is
        # what gets stored and thresholded.
        scores = normalize(
            check_scores(decision, random_state=self.random_state)
        )
        self.dscores_ = scores

        # Transform data prior to fit. Any ``transform`` value other than
        # 'cdf' is routed to the KDE transform.
        transformer = gen_cdf if self.transform == 'cdf' else gen_kde
        n_points = len(scores) * 3
        val_data, data_range = transformer(scores, 0, 1, n_points)

        # Change point detection: fit the selected estimator and ask for a
        # single breakpoint. An unknown ``method`` raises KeyError here.
        detector = self.method_func[self.method].fit(val_data)
        breakpoints = detector.predict(n_bkps=1)

        # The first returned index is taken as the change point; the matching
        # entry of ``data_range`` becomes the threshold.
        threshold = data_range[breakpoints[0]]
        self.thresh_ = threshold

        return cut(scores, threshold)