Source code for src.dackar.anomalies.t_score

# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED

"""
Created on July, 2025

@author: wangc, mandd
"""

import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.metrics import pairwise_distances

import logging
[docs] logger = logging.getLogger(__name__)
from .kernel_two_sample_test import kernel_two_sample_test
[docs] def getL(loc,window,array,type): """ Method designed to extract a portion of a time series Args: loc: int, reference location in the time series for the selected portion window: np.array, size of the selected portion of the time series array: np.array, original time series type: string, type of the selected portion of the time series (rear or front) Return: timeseriesPortion: np array, size of the selected portion of the time series """ if type not in ['rear','front']: logger.error('getL method: Specificed type to select portion of time series (' + str (type) + ') is not allowed. Type can be either rear or front.') if isinstance(array, np.ndarray) or isinstance(array, pd.Series): if type=='rear': l = array[loc:loc+window] elif type=='front': l = array[loc-window:loc] else: logger.error('error') else: logger.error('getL method: Provided time series has incorrect data type.') timeseriesPortion = np.asarray(l) return timeseriesPortion
[docs] def omega(window,array,Nsamples): """ Method designed to extract set of portions of a time series Args: window: np.array, size of the selected portion of the time series array: np.array, original time series Nsamples: int, number of portions to be selected Return: omegaSet: np.array, set of Nsamples portions of a time series """ size = len(array) if isinstance(array, np.ndarray) or isinstance(array, list): omegaSet=np.zeros([Nsamples,window]) start = np.random.randint(low=0, high=(size-window), size=Nsamples) for i in range(Nsamples): omegaSet[i,:] = array[start[i]:start[i]+window] elif isinstance(array, pd.Series): omegaSet = [] # pd.Series with Date as index, the length with window is not consistent # omegaSet=np.zeros([Nsamples,len(array[:array.index[0]+window])]) high = len(array[:array.index[-1]-window]) start = np.random.randint(low=0, high=high, size=Nsamples) for i in range(Nsamples): loc = array.index[start[i]] sample = array[loc:loc+window] omegaSet.append(np.asarray(sample)) return omegaSet
[docs] def choice(array,Nsamples): """ Method designed to randomly choose Nsamples out of an array (replace is set to True) Args: array: np.array, array of values to be sampled Nsamples: int, number of elements of array to be selected Return: sampled: np.array, set of Nsamples elements randomly chosen from array """ sampled = np.random.choice(array, size=Nsamples, replace=True) return sampled
[docs] def t_score(array_front, array_rear): """ Method designed to calculate the statistical difference between two arrays using t-test Args: array_front: np.array, first array array_rear: np.array, second array Return: tscore: float, outcome of t-test """ if array_front.size == array_rear.size: n = float(array_front.size) mean_front = np.mean(array_front) var_front = np.var(array_front) mean_rear = np.mean(array_rear) var_rear = np.var(array_rear) tscore = (mean_front-mean_rear)/math.sqrt((var_front+var_rear)/n) else: logger.error("t_score error: the two provided array have different sizes.") tscore = None return tscore
[docs] def MMD_test(test_array, omegaSet, iterations, alphaTest, alphaOmegaset, printFlag=True): """ Method designed to calculate the statistical difference between an array and a set of arrays using the Maximum Mean Discrepancy testing method Args: test_array: np.array, array of values to be tested against omegaSet omegaSet: np.ndarray or list, population of arrays iterations: int, number of iterartions required to compute MMD^2_u alphaTest: float, acceptance value for hypothesis testing (single array testing) alphaOmegaset: float, acceptance value for hypothesis testing (omegaSet testing) printFlag: bool, flag to plot MMD distribution Return: p_value: float, p-value obtained from MMD testing testOutput: bool, logical outcome of MMD testing """ if isinstance(omegaSet, np.ndarray): omegaSetDim = omegaSet.ndim elif isinstance(omegaSet, list): if isinstance(omegaSet[0], np.ndarray): omegaSetDim = len(omegaSet) else: omegaSetDim = 1 if omegaSetDim==1: sigma2 = np.median(pairwise_distances(test_array.reshape(-1, 1), omegaSet.reshape(-1, 1), metric='euclidean'))**2 mmd2u, mmd2u_null, p_value = kernel_two_sample_test(test_array, omegaSet, kernel_function='rbf', iterations=iterations, gamma=1.0/sigma2, verbose=False) # print("Null hypothesis: the two sub-series are generated from the same distribution") if p_value < alphaTest: nullHypothesis = False testOutput = True logger.info(f'The p-value is: {round(p_value, 5)}, which is less than the significant level: {alphaTest}') # print("The null hypothesis got rejected") # print('The two sub-series are generated from two different distributions') else: nullHypothesis = True testOutput = False logger.info(f'The p-value is: {round(p_value, 5)}, which is larger than the significant level: {alphaTest}') # print("The null hypothesis got accepted") # print('The two sub-series are generated from the same distributions') return p_value, testOutput else: omegaSetSize = len(omegaSet) mmd2u_arr = np.zeros(omegaSetSize) mmd2u_null_agg = [] p_value_arr = np.zeros(omegaSetSize) if isinstance(omegaSet, np.ndarray): sigma2 = np.std(omegaSet.ravel())**2 elif isinstance(omegaSet, list): ravelData = [] for data in omegaSet: ravelData.extend(data) sigma2 = np.std(ravelData)**2 for i in range(omegaSetSize): # sigma2 = np.median(pairwise_distances(test_array.reshape(-1, 1), omegaSet[i,:].reshape(-1, 1), metric='euclidean'))**2 if isinstance(omegaSet, np.ndarray): data = omegaSet[i, :] else: data = omegaSet[i] mmd2u_arr[i], mmd2u_null, p_value_arr[i] = kernel_two_sample_test(test_array, data, kernel_function='rbf', iterations=iterations, gamma=1.0/sigma2, verbose=False) mmd2u_null_agg.extend(list(mmd2u_null)) # if printFlag: # fig, (ax1, ax2, ax3) = plt.subplots(nrows=1, ncols=3, figsize=(20, 6)) # ax1.hist(mmd2u_arr, bins=30, alpha=0.35, density=True, color='green', label='mmd2u') # ax1.legend() # ax2.hist(p_value_arr, bins=30, alpha=0.35, density=True, color='red', label='p_value') # ax2.legend() # ax3.scatter(mmd2u_arr, p_value_arr, marker='.') # ax3.set_xlabel('mmd2u') # ax3.set_ylabel('p_value') # plt.show() # the number of rejections for the null hypothesis in two sample testing count = (p_value_arr<alphaTest).sum() # print('Number of rejection ratio for the null hypothesis in two sample testing for two given sub-series is:', count/omegaSetSize) # probability two signals are from the same distribution p_value = 1. - count/omegaSetSize # Null hypothesis: the sub-series are generated from the normal conditions of full time-series # if the probability lower than alphaOmegaset, reject the null hypothesis # print("Null hypothesis: the sub-series are generated from the normal conditions of full time-series") if p_value < alphaOmegaset: nullHypothesis = False testOutput = True logger.info(f"The probability of the sub-series are generated from the normal conditions of full time-series is {p_value}, which is smaller than given test value {alphaOmegaset}") # print("The null hypothesis got rejected") # print('The sub-series is correlated to the anomaly events') else: nullHypothesis = True testOutput = False logger.info(f"The probability of the sub-series are generated from the normal conditions of full time-series is {p_value}, which is greater than given test value {alphaOmegaset}") # print("The null hypothesis got accepted") # print('The sub-series is not correlated to the anomaly events') if printFlag: fig, ax = plt.subplots() bin1 =30 ax.hist(mmd2u_null_agg, bins=bin1, alpha=1.0, density=True, color='red', label='$MMD^2_u$ Null Distribution', stacked=True) bin2 = int((max(mmd2u_arr) - min(mmd2u_arr))/(max(mmd2u_null_agg) - min(mmd2u_null_agg))*bin1) ax.hist(mmd2u_arr, bins=bin2, alpha=0.5, density=True, color='blue' , label='$MMD^2_u$ True Distribution', stacked=True) ax.legend() ax.set_xlabel('$MMD^2_u$') ax.set_ylabel('$p(MMD^2_u)$') ax.set_title('$MMD^2_u$: null-distribution and true distribution: with $p$-value=%s' % round(p_value,5)) plt.show() return p_value, testOutput
[docs] def event2TStest(E_loc, TS, iterations, alphaTest, alphaOmegaset, windowSize, omegaSize, returnMinPval=False): """ Method designed to assess temporal correlation of an event E and a time series TS Args: E_loc: int, temporal location of event E TS: np.array, univariate time series iterations: int, number of iterartions required to compute MMD^2_u alphaTest: float, acceptance value for hypothesis testing (single array testing) alphaOmegaset: float, acceptance value for hypothesis testing (omegaSet testing) windowSize: int, size of time window prior and after event occurence omegaSize: int, number of samples from time series TS returnMinPval: bool, flag that indictae whether to return p-value of MMD assessment Return: relation: string, outcome of the event to timeseries temporal analysis minPval: float, p-value of MMD assessment """ omegaSet = omega(windowSize,TS,omegaSize) L_front = getL(E_loc,windowSize,TS,'front') L_rear = getL(E_loc,windowSize,TS,'rear') L_front_rear = np.concatenate((L_front, L_rear), axis=None) minPval = 1.0 logger.info('=============================================') # print('MMD Testing') logger.info("Computing statitics for Lfront vs. omega") p_val_f , test_out_f = MMD_test(L_front, omegaSet, iterations, alphaTest, alphaOmegaset, False) logger.info('Correlated?', test_out_f, 'p value:', p_val_f) if p_val_f < minPval: minPval = p_val_f logger.info("\nComputing statistics for Lrear vs. omega") p_val_r , test_out_r = MMD_test(L_rear, omegaSet, iterations, alphaTest, alphaOmegaset, False) logger.info('Correlated?', test_out_r, 'p value:', p_val_r) if p_val_r < minPval: minPval = p_val_r logger.info("\nComputing statitics for Lfront vs. Lrear") p_val_3 , test_out_3 = MMD_test(L_rear, L_front, iterations, alphaTest, alphaOmegaset, False) logger.info('Correlated?', test_out_3, 'p value:', p_val_3) logger.info("\nComputing statitics for Lfront u Lrear vs. omega") p_val_4, test_out_4 = MMD_test(L_front_rear, omegaSet, iterations, alphaTest, alphaOmegaset, False) logger.info('Correlated?', test_out_4, 'p value:', p_val_4) if p_val_4 < minPval: minPval = p_val_4 # tscore = t_score(L_front, L_rear) relation = None ############################################### # Old detection logic # # if test_out_3==False or test_out_4==False: # # add more output info here [congjian] # print('S and E are uncorrelated') # relation = 'S and E are uncorrelated' # elif test_out_r==True and test_out_f==False: # print('E --> S') # relation = 'E --> S' # elif test_out_r==False and test_out_f==True: # print('S --> E') # relation = 'S --> E' # elif test_out_r==True and test_out_f==True: # print('S;E') # relation = 'S;E' # ''' # if p_val_f>p_val_r: # print('S -->* E') # else: # print('E -->* S') # ''' # else: # print('S and E are undecided') # relation = 'S and E are undecided' ##################################### # New Detection Logic # if test_out_r==True and test_out_f==False: relation = 'E --> S' elif test_out_r==False and test_out_f==True: relation = 'S --> E' elif test_out_r==True and test_out_f==True: #relation = 'S;E' if p_val_f<p_val_r: relation = 'S -->* E' else: relation = 'E -->* S' else: if test_out_4: relation = 'S;E' else: if test_out_3: # S and E may be correlated relation = 'S?E' else: # S and E are uncorrelated relation = 'S!E' logger.info('Identified relation: ', relation) if returnMinPval: return relation, minPval else: return relation