"""
Created on Dec. 19, 2024
@author: wangc, mandd
Base Class for Anomaly Detection
"""
import abc
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import RobustScaler
import pandas as pd
import numpy as np
import logging
# Module-level logger used by the anomaly-detection base class below.
logger = logging.getLogger(__name__)
class AnomalyBase(BaseEstimator):
    """Anomaly detection base class.

    Validates user data, scales the features and keeps track of the original
    index/column labels. The detection algorithm itself is supplied by
    subclasses through ``_fit``, ``_evaluate`` and ``get_anomalies``.

    Args:
        norm (str or None, optional): type of normalization applied to the
            features. ``'robust'`` (default) uses ``RobustScaler``;
            ``'standard'``, ``'z-score'``, ``'zscore'`` or ``None`` use
            ``StandardScaler``. Any other value logs a warning and falls back
            to ``RobustScaler``.
    """

    # Spellings of ``norm`` that select the z-score (StandardScaler) transform.
    _STANDARD_NORMS = ('standard', 'z-score', 'zscore')

    def __init__(self, norm='robust'):
        """Constructor

        Args:
            norm (str or None, optional): type of normalization, see the class
                docstring. Defaults to 'robust'.
        """
        self.print_tag = type(self).__name__  # print Class name
        self.is_fitted = False  # True if the model is already fitted
        self._features = None  # User provided input data, reformatted into numpy array
        # User provided output data, reformatted into numpy array; how it is
        # used depends on the algorithm
        self._targets = None
        # BaseEstimator.get_params/set_params/clone read every constructor
        # argument back from an attribute of the same name, so the raw value
        # must be stored as ``self.norm``.
        self.norm = norm
        self._norm = norm  # type of normalization actually requested
        self._scalar = self._build_scalar(norm)
        self._xindex = None  # store index for provided input data
        self._yindex = None  # store index for provided output data
        self._xcolumns = None  # store column names for provided x data
        self._ycolumns = None  # store column names for provided y data
        self._meta = {}  # free-form metadata, cleared again by reset()

    def _build_scalar(self, norm):
        """Create the (unfitted) scaler that corresponds to ``norm``.

        Args:
            norm (str or None): requested normalization.

        Returns:
            StandardScaler or RobustScaler: new scaler instance.
        """
        if norm is None:
            logger.info('Use standard scalar (z-score) to transform input data.')
            return StandardScaler()
        # Non-string values cannot be lower-cased; treat them as unrecognized
        # instead of failing with AttributeError.
        key = norm.lower() if isinstance(norm, str) else None
        if key == 'robust':
            logger.info('Use robust scalar to transform input data.')
            return RobustScaler()
        if key in self._STANDARD_NORMS:
            logger.info('Use standard scalar (z-score) to transform input data.')
            return StandardScaler()
        logger.warning('Unrecognized value for param "norm", using default "RobustScalar"')
        return RobustScaler()

    def reset(self):
        """Reset the model to its unfitted default state.

        Stored data, labels and metadata are cleared and the normalization is
        set back to the default 'robust' scaler, regardless of the value that
        was passed to the constructor.
        """
        self.is_fitted = False
        self._features = None
        self._targets = None
        # Drop the bookkeeping of the previous fit as well, so no stale labels
        # survive the reset.
        self._xindex = None
        self._yindex = None
        self._xcolumns = None
        self._ycolumns = None
        # Keep the public parameter in sync with the scaler that is installed.
        self.norm = 'robust'
        self._norm = 'robust'
        self._scalar = RobustScaler()
        self._meta = {}

    def get_params(self, deep=True):
        """Get parameters for this estimator.

        Args:
            deep (bool, optional): forwarded to ``BaseEstimator.get_params``;
                scikit-learn utilities such as ``clone`` and ``set_params``
                pass this argument. Defaults to True.

        Returns:
            dict: parameter names mapped to their values.
        """
        return super().get_params(deep=deep)

    def set_params(self, **params):
        """Set the parameters of this estimator.

        Args:
            **params (dict): estimator parameters.

        Returns:
            AnomalyBase: the estimator instance itself.
        """
        super().set_params(**params)
        if 'norm' in params:
            # A different normalization needs a new scaler, and that scaler
            # has not seen any data yet.
            self._norm = self.norm
            self._scalar = self._build_scalar(self.norm)
            self.is_fitted = False
        return self

    def fit(self, X, y=None):
        """Perform fitting.

        Args:
            X (array-like): (n_samples, n_features)
            y (array-like, optional): (n_samples, n_features). Defaults to None.

        Returns:
            AnomalyBase: the fitted estimator.
        """
        logger.info('Train model.')
        self.is_fitted = True
        try:
            self._features, self._xindex, self._xcolumns = self.check_data(X)
            if y is not None:
                self._targets, self._yindex, self._ycolumns = self.check_data(y)
            else:
                # Do not keep targets of an earlier fit around.
                self._targets, self._yindex, self._ycolumns = None, None, None
            X_transform = self._scalar.fit_transform(self._features)
            # The subclass hook receives y exactly as provided by the caller.
            self._fit(X_transform, y)
        except Exception:
            # A failed fit must not leave the model flagged as fitted.
            self.is_fitted = False
            raise
        return self

    def evaluate(self, X):
        """Perform evaluation on new data and append it to the stored data.

        Args:
            X (array-like): (n_samples, n_features)

        Raises:
            AssertionError: if the model has not been fitted or the column
                labels of ``X`` differ from those seen during ``fit``.
        """
        logger.info('Perform model forecast.')
        X, index, columns = self.check_data(X)
        assert self._xcolumns is not None, 'The model should be fitted before evaluation!'
        # Compare the labels as plain lists: "==" on numpy arrays / pandas
        # Index objects is element-wise and has no single truth value.
        assert list(columns) == list(self._xcolumns), 'Evaluated data should have the same number of columns!'
        X_transform = self._scalar.transform(X)
        self._xindex = np.hstack((self._xindex, index))
        self._features = np.vstack((self._features, X))
        self._evaluate(X_transform)

    def plot(self):
        """Plot data (no-op in the base class)."""

    #################################################################################
    # To be implemented in subclasses
    # NOTE(review): abc.abstractmethod is only enforced when the class uses an
    # ABCMeta metaclass, which is not declared here - confirm whether that is
    # intended.
    @abc.abstractmethod
    def get_anomalies(self):
        """Get the anomalies."""

    @abc.abstractmethod
    def _fit(self, X, y=None):
        """Perform fitting.

        Args:
            X (array-like): (n_samples, n_features), already scaled by ``fit``.
            y (array-like, optional): (n_samples, n_features). Defaults to None.
        """

    @abc.abstractmethod
    def _evaluate(self, X):
        """Perform evaluation.

        Args:
            X (array-like): (n_samples, n_features), already scaled by ``evaluate``.
        """

    @staticmethod
    def check_data(data):
        """Check the format of data and convert it to a 2-D style numpy array.

        Args:
            data (list, numpy.ndarray, pandas.Series or pandas.DataFrame):
                data to convert. One-dimensional input becomes a single column.

        Returns:
            tuple: ``(values, index, columns)`` - the data as numpy array, the
            row labels and the column labels. For list/ndarray input the
            labels are positional (``numpy.arange``).

        Raises:
            IOError: if ``data`` has an unsupported type.
        """
        if isinstance(data, (list, np.ndarray)):
            # atleast_1d also promotes a 0-d array, which has no shape[0].
            values = np.atleast_1d(data)
            if values.ndim == 1:
                values = values.reshape(values.shape[0], 1)
            return values, np.arange(values.shape[0]), np.arange(values.shape[1])
        if isinstance(data, pd.Series):
            columns = [data.name] if data.name is not None else [0]
            return data.to_numpy().reshape(len(data), 1), data.index, columns
        if isinstance(data, pd.DataFrame):
            return data.to_numpy(), data.index, data.columns
        raise IOError(f'The data with type {type(data)} cannot be accepted, please try to provide data with type of "list, numpy.array, pandas.Series or pandas.DataFrame"!')