import ruptures as rpt
from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_cdf, gen_kde, normalize
class CPD(BaseThresholder):
    r"""Threshold outlier scores with change point detection.

    The decision scores are transformed into a curve (CDF or KDE) and a
    single change point is located on that curve, giving a
    non-parametric threshold. Scores beyond the value found at the
    change point are labelled as outliers.

    See :cite:`fearnhead2016cpd` for details.

    Parameters
    ----------
    method : {'Dynp', 'KernelCPD', 'Binseg', 'BottomUp'}, optional (default='Dynp')
        Change point detection algorithm

        - 'Dynp':      Dynamic programming (optimal minimum sum of errors per partition)
        - 'KernelCPD': RBF kernel function (optimal minimum sum of errors per partition)
        - 'Binseg':    Binary segmentation
        - 'BottomUp':  Bottom-up segmentation

        The value is used as a key into ``method_func`` when ``eval`` is
        called, so an unknown name surfaces as a ``KeyError`` at that
        point rather than at construction time.

    transform : {'cdf', 'kde'}, optional (default='cdf')
        Transformation applied to the scores before the detector is fit

        - 'cdf': Use the cumulative distribution function
        - 'kde': Use the kernel density estimation

        Any value other than 'cdf' selects the KDE transform.

    random_state : int, optional (default=1234)
        Random seed forwarded to ``check_scores``. Can also be set to
        None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    dscores_ : decision scores after ``check_scores`` and ``normalize``
        have been applied

    method_func : dict mapping each supported ``method`` name to the
        ``ruptures`` estimator instance created for it in ``__init__``
    """

    def __init__(self, method='Dynp', transform='cdf', random_state=1234):

        self.method = method
        self.transform = transform

        # One estimator per supported method name, all built up front;
        # eval() picks the one matching self.method.
        self.method_func = {
            'Dynp': rpt.Dynp(),
            'KernelCPD': rpt.KernelCPD(kernel='rbf'),
            'Binseg': rpt.Binseg(),
            'BottomUp': rpt.BottomUp(),
        }

        self.random_state = random_state

    def eval(self, decision):
        """Label each decision score as inlier or outlier.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """

        scores = check_scores(decision, random_state=self.random_state)
        scores = normalize(scores)
        self.dscores_ = scores

        # Pick the transform: 'cdf' explicitly, everything else is KDE.
        to_curve = gen_cdf if self.transform == 'cdf' else gen_kde

        # NOTE(review): the arguments look like a lower bound of 0, an
        # upper bound of 1 and a grid three times the number of scores;
        # confirm against the helper definitions in thresh_utility.
        val_data, data_range = to_curve(scores, 0, 1, len(scores)*3)

        # Fit the selected detector on the curve and request exactly
        # one breakpoint.
        detector = self.method_func[self.method]
        fitted = detector.fit(val_data)
        breakpoints = fitted.predict(n_bkps=1)

        # The first reported index is mapped back onto the evaluation
        # grid to obtain the threshold value.
        threshold = data_range[breakpoints[0]]
        self.thresh_ = threshold

        return cut(scores, threshold)