"""
  Created on Dec. 19, 2024
  @author: wangc, mandd
  Base Class for Anomaly Detection
"""
import abc
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import RobustScaler
import pandas as pd
import numpy as np
import logging
[docs]
logger = logging.getLogger(__name__) 
[docs]
class AnomalyBase(BaseEstimator):
  """Anomaly detection base class
  """
  def __init__(self, norm='robust'):
    """Constructor
    """
[docs]
    self.print_tag = type(self).__name__ # print Class name 
[docs]
    self.is_fitted = False # True if the model is already fitted 
[docs]
    self._features = None # User provided input data, reformatted into numpy array 
[docs]
    self._targets = None # User provided output data, reformatted into numpy array, the purpose of this data is depended on the algorithm 
[docs]
    self._norm = norm # type of normalization, either 'robust' or 'standard (z-score)' 
    if norm is None:
      logger.info('Use standard scalar (z-score) to transform input data.')
      self._scalar = StandardScaler()
    elif norm.lower() == 'robust':
      logger.info('Use robust scalar to transform input data.')
      self._scalar = RobustScaler()
    else:
      logger.warning('Unrecognized value for param "norm", using default "RobustScalar"')
      self._scalar = RobustScaler()
[docs]
    self._xindex = None # store index for provided input data 
[docs]
    self._yindex = None # store index for provided output data 
[docs]
    self._xcolumns = None # store column names for provided x data 
[docs]
    self._ycolumns = None # store columns names for provided y data 
[docs]
  def reset(self):
    """reset
    """
    self.is_fitted = False
    self._features = None
    self._targets = None
    self._norm = 'robust'
    self._scalar = RobustScaler()
    self._meta = {} 
[docs]
  def get_params(self):
    """
    Get parameters for this estimator.
    Returns
    -------
    params : dict
        Parameter names mapped to their values.
    """
    params = super().get_params()
    return params 
[docs]
  def set_params(self, **params):
    """Set the parameters of this estimator.
    Parameters
    ----------
    **params : dict
        Estimator parameters.
    Returns
    -------
    self : estimator instance
        Estimator instance.
    """
    super().set_params(**params) 
[docs]
  def fit(self, X, y=None):
    """perform fitting
    Args:
        X (array-like): (n_samples, n_features)
        y (array-like, optional): (n_samples, n_features). Defaults to None.
    """
    logger.info('Train model.')
    self.is_fitted = True
    self._features, self._xindex, self._xcolumns = self.check_data(X)
    if y is not None:
      self._targets, self._yindex, self._ycolumns = self.check_data(y)
    X_transform = self._scalar.fit_transform(self._features)
    self._fit(X_transform, y) 
[docs]
  def evaluate(self, X):
    """perform evaluation
    Args:
        X (array-like): (n_samples, n_features)
    """
    logger.info('Perform model forecast.')
    X, index, columns = self.check_data(X)
    assert columns == self._xcolumns, 'Evaluated data should have the same number of columns!'
    X_transform = self._scalar.transform(X)
    self._xindex = np.hstack((self._xindex, index))
    self._features = np.vstack((self._features, X))
    self._evaluate(X_transform) 
[docs]
  def plot(self):
    """plot data
    """ 
  #################################################################################
  # To be implemented in subclasses
  @abc.abstractmethod
[docs]
  def get_anomalies(self):
    """get the anomalies
    """ 
  @abc.abstractmethod
[docs]
  def _fit(self, X, y=None):
    """perform fitting
    Args:
        X (array-like): (n_samples, n_features)
        y (array-like, optional): (n_samples, n_features). Defaults to None.
    """ 
  @abc.abstractmethod
[docs]
  def _evaluate(self, X):
    """perform evaluation
    Args:
        X (array-like): (n_samples, n_features)
    """ 
  @staticmethod
[docs]
  def check_data(data):
    """Check the format of data
    Args:
        data (_type_): list, numpy.ndarray or pandas.DataFrame
    """
    index = None
    columns = None
    data_ = None
    if isinstance(data, list):
      data_ = np.atleast_1d(data)
      if len(data_.shape) == 1:
        data_ = data_.reshape(data_.shape[0], 1)
      index = np.arange(data_.shape[0])
      columns = np.arange(data_.shape[1])
    elif isinstance(data, np.ndarray):
      data_ = data
      if len(data_.shape) == 1:
        data_ = data_.reshape(data_.shape[0], 1)
      index = np.arange(data_.shape[0])
      columns = np.arange(data_.shape[1])
    elif isinstance(data, pd.Series):
      data_ = data.to_numpy().reshape(len(data),1)
      index = data.index
      columns = [data.name] if data.name is not None else [0]
    elif isinstance(data, pd.DataFrame):
      data_ = data.to_numpy()
      index = data.index
      columns = data.columns
    else:
      raise IOError(f'The data with type {type(data)} cannot be accepted, please try to provide data with type of "list, numpy.array, pandas.Series or pandas.DataFrame"!')
    return data_, index, columns