Source code for src.dackar.anomalies.MatrixProfile

import numpy as np
import stumpy
from stumpy import config
import pandas as pd

from .AnomalyBase import AnomalyBase
from .plotUtils import plot_data, plot_kdp

import logging

logger = logging.getLogger(__name__)

DASK_CLIENT_AVAIL = False
try:
  from dask.distributed import Client
  DASK_CLIENT_AVAIL = True
except ImportError:
  logger.error('Importing dask.distributed.Client failed. Parallel calculation will not work!')
  logger.info('Try "pip install dask distributed"')

class MatrixProfile(AnomalyBase):
  """Matrix profile based anomaly identification for time series, computed with stumpy.
  """

  def __init__(self, m, normalize='robust', method='normal', kdp=False, approx_percentage=0.1, sub_sequence_normalize=False, excl_zone_denom=4):
    """Constructor
    """
    super().__init__(norm=normalize)
    self._m = m                              # size of the sliding window
    self._norm = normalize                   # perform normalization on time series
    self._sub_norm = sub_sequence_normalize  # perform normalization on subsequences when calculating matrix profile distances
    # setup exclusion zone for stumpy: i +/- int(np.ceil(m/excl_zone_denom)), default excl_zone_denom = 4
    # this avoids nearby subsequences, since they are likely highly similar
    # the distance computed in the exclusion zone is set to np.inf before the matrix profile value is extracted
    if excl_zone_denom != 4 and isinstance(excl_zone_denom, int):
      config.STUMPY_EXCL_ZONE_DENOM = excl_zone_denom
    self._mp = {}                            # store calculated matrix profile values
    self._avail_method = ['normal', 'parallel', 'approx', 'incremental', 'gpu']
    if method.lower() not in self._avail_method:
      raise IOError(f'Unrecognized calculation method: {method}, please choose from {self._avail_method}')
    self._method = method.lower()
    self._scrump_percentage = approx_percentage
    if self._method == 'gpu':
      raise NotImplementedError('Method "gpu" is not implemented yet!')
    self._current_idx = []                   # index of the last entry of the current matrix profile
    self._norm_plot = True                   # plot normalized data if True
    self._compute_kdp = kdp                  # compute KDP profile if True
    self._kdp = {}                           # dictionary to store KDP profile data
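
  # Illustrative note (not part of the original constructor): with m=100 and the
  # default excl_zone_denom=4, stumpy excludes indices i +/- int(np.ceil(100/4)) = i +/- 25
  # around each subsequence i from the nearest-neighbor search, so trivially similar
  # neighboring subsequences are never reported as matches.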

  def _fit(self, X, y=None):
    """perform fitting

    Args:
        X (pandas.DataFrame): (n_time_steps, n_features)
        y (pandas.DataFrame, optional): annotated time series with shape (n_time_steps, n_features) or
            (n_time_steps, 1), used as the second time series for an AB-join. Defaults to None.
    """
    n_T = X.shape[0]
    for i, var in enumerate(self._xcolumns):
      X_ = X[:,i]
      if y is not None:
        if y.shape[1] == 1:
          y_ = y
        elif y.shape[1] == X.shape[1]:
          y_ = y[:,i]
        else:
          raise IOError('Provided data and annotated data are not aligned! Please check your data.')
      else:
        y_ = None
      self._mp[var] = self._compute_mp(X_, y_)
    self._current_idx.append(n_T-1)
    if self._compute_kdp:
      keys = list(self._mp)
      mps = [self._mp[key].P_ for key in keys]
      mps = np.asarray(mps).T
      kdps = np.sort(mps, axis=1)
      kdps_idx = np.argsort(mps, axis=1)
      kdps = np.flip(kdps, axis=1)          # flip, sort from max to min
      kdps_idx = np.flip(kdps_idx, axis=1)
      colm = [f'{k+1}-DP' for k in range(len(keys))]
      idx = self._xindex[:kdps.shape[0]]
      self._kdp['kdp'] = pd.DataFrame(kdps, index=idx, columns=colm)      # with dimension (time, variable)
      self._kdp['idx'] = pd.DataFrame(kdps_idx, index=idx, columns=colm)  # with dimension (time, variable)
      self._kdp['var'] = keys

  def _compute_mp(self, X_, y_=None):
    """compute matrix profile for a single variable

    Args:
        X_ (array-like): single time series with shape (n_time_steps,)
        y_ (array-like, optional): annotated time series used as T_B for an AB-join. Defaults to None.
    """
    _mp = None
    if self._method == 'normal':
      _mp = stumpy.stump(T_A=X_, m=self._m, T_B=y_, normalize=self._sub_norm)
    elif self._method == 'parallel':
      if not DASK_CLIENT_AVAIL:
        raise IOError('Dask is not available, please install Dask before using the distributed and parallel implementation')
      with Client() as dask_client:
        _mp = stumpy.stumped(dask_client, T_A=X_, m=self._m, T_B=y_, normalize=self._sub_norm)
    elif self._method == 'approx':
      _mp = stumpy.scrump(T_A=X_, m=self._m, T_B=y_, percentage=self._scrump_percentage, pre_scrump=True, normalize=self._sub_norm)
    elif self._method == 'incremental':
      if y_ is not None:
        logger.warning('The annotated time series will not be used for incremental calculation!')
      _mp = stumpy.stumpi(T=X_, m=self._m, egress=False, normalize=self._sub_norm)
    return _mp

  def _evaluate(self, X):
    """perform evaluation

    Args:
        X (pandas.DataFrame): (n_time_steps, n_features)
    """
    self._current_idx.append(X.shape[0]+self._current_idx[-1])
    if self._method == 'incremental':
      for j, var in enumerate(self._xcolumns):
        for i in range(X.shape[0]):
          self._mp[var].update(X[i,j])
    else:
      raise NotImplementedError('Evaluate method is not implemented yet!')
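
  # Illustrative note (not part of the original method): with method='incremental',
  # each per-variable stumpy.stumpi object is updated one sample at a time, so a new
  # batch X of shape (10, n_features) triggers 10 update() calls per variable and,
  # because egress=False, extends each stored matrix profile by 10 entries.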

  def plot(self):
    """plot data
    """
    X_transform = self._features.copy()
    if self._norm_plot:
      X_transform = self._scalar.fit_transform(self._features)
    X_transform = pd.DataFrame(X_transform, index=self._xindex, columns=self._xcolumns)
    return plot_data(X_transform, self._mp)  # input time series data should be stored in a pandas.DataFrame

  def plot_kdp(self):
    """plot k-dimensional profile (KDP) data
    """
    return plot_kdp(self._kdp['kdp'])

  def get_mp(self):
    """get matrix profile values
    """
    data = {}
    for var, _mp in self._mp.items():
      data[var] = _mp.P_
    return data

  def get_mp_index(self):
    """get matrix profile indices
    """
    data = {}
    for var, _mp in self._mp.items():
      data[var] = _mp.I_
    return data

  def get_mp_left_index(self):
    """get left matrix profile indices
    """
    data = {}
    for var, _mp in self._mp.items():
      data[var] = _mp.left_I_
    return data

  def get_mp_right_index(self):
    """get right matrix profile indices
    """
    data = {}
    for var, _mp in self._mp.items():
      data[var] = _mp.right_I_
    return data
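

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original module. It assumes that
# AnomalyBase exposes a public fit() wrapper around _fit() and that the input
# is a pandas.DataFrame with one column per monitored variable; the data and
# variable names below are synthetic and hypothetical.
if __name__ == '__main__':
  rng = np.random.default_rng(42)
  idx = pd.date_range('2024-01-01', periods=500, freq='min')
  df = pd.DataFrame({'temperature': rng.normal(size=500).cumsum(),
                     'pressure': rng.normal(size=500).cumsum()},
                    index=idx)

  detector = MatrixProfile(m=50, normalize='robust', method='normal', kdp=True)
  detector.fit(df)                  # assumed public wrapper provided by AnomalyBase
  mp = detector.get_mp()            # dict: variable name -> matrix profile values
  mp_idx = detector.get_mp_index()  # dict: variable name -> nearest-neighbor indices
  detector.plot()                   # plot the (normalized) data together with the matrix profiles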