# Source code for src.dackar.anomalies.kernel_two_sample_test

# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created on July, 2025

@author: wangc, mandd
"""

# Code modified from https://github.com/emanuele/kernel_two_sample_test
import numpy as np
from sys import stdout
from sklearn.metrics import pairwise_kernels

import logging
# Module-level logger, following the standard one-logger-per-module convention.
logger = logging.getLogger(__name__)
def MMD2u(K, m, n):
  """
    Method designed to perform MMD^2_u unbiased statistic using U-statistics.

    The diagonal (self-similarity) entries of the within-sample blocks are
    excluded, which is what makes the estimator unbiased.

    Args:
      K: np.array, 2-D matrix
      m: int, size of first vector
      n: int, size of second vector

    Return:
      val: float, MMD^2_u unbiased statistic using U-statistics
  """
  # Partition the joint kernel matrix into within- and between-sample blocks.
  Kxx = K[:m, :m]
  Kyy = K[m:, m:]
  Kxy = K[:m, m:]
  withinX = 1.0 / (m * (m - 1.0)) * (Kxx.sum() - Kxx.diagonal().sum())
  withinY = 1.0 / (n * (n - 1.0)) * (Kyy.sum() - Kyy.diagonal().sum())
  between = 2.0 / (m * n) * Kxy.sum()
  return withinX + withinY - between
def MMD2b(K, m, n):
  """
    Method designed to perform MMD^2 biased statistics using V-statistics.

    Unlike MMD2u, the diagonal entries of the within-sample blocks are kept,
    which biases the estimate upward.

    Args:
      K: np.array, 2-D matrix
      m: int, size of first vector
      n: int, size of second vector

    Return:
      out: float, MMD^2 biased statistics using V-statistics
  """
  Kxx = K[:m, :m]
  Kyy = K[m:, m:]
  Kxy = K[:m, m:]
  # Same arithmetic as the unbiased estimator, but normalized by the full
  # block sizes m^2 and n^2 (diagonals included).
  out = 1.0/(m**2) * Kxx.sum() + 1.0/(n**2) * Kyy.sum() - 2.0/(m*n) * Kxy.sum()
  return out
def MMD2u_UCB(K, m, alpha=0.05):
  """
    Method designed to calculate the uniform convergence bound for MMD2u.

    Args:
      K: np.array, 2-D matrix
      m: int, sample size
      alpha: float, acceptance value for hypothesis testing; must lie in the
        open interval (0, 1)

    Return:
      ucb: float, uniform convergence bound for MMD2u

    Raises:
      ValueError: if alpha is not strictly between 0 and 1
  """
  # Explicit exception instead of `assert`: assertions are stripped under
  # `python -O`, so validation must not rely on them. The original message
  # also claimed the closed interval [0,1] while the check is strict.
  if not (0 < alpha < 1):
    raise ValueError(f'alpha should be within the open interval (0, 1), but got {alpha}')
  Kxy = K[:m, m:]
  # Clamp at 0 so a kernel with all-negative cross terms yields a zero bound.
  maxKxy = np.max((0, np.max(Kxy)))
  ucb = 4.0*maxKxy/np.sqrt(m) * np.sqrt(np.log(1.0/alpha))
  return ucb
def MMD2b_UCB(K, m, alpha=0.05):
  """
    Method designed to calculate the uniform convergence bound for MMD2b.

    Args:
      K: np.array, 2-D matrix
      m: int, sample size
      alpha: float, acceptance value for hypothesis testing; must lie in the
        open interval (0, 1)

    Return:
      ucb: float, uniform convergence bound for MMD2b

    Raises:
      ValueError: if alpha is not strictly between 0 and 1
  """
  # Explicit exception instead of `assert`: assertions are stripped under
  # `python -O`. The original message also claimed the closed interval [0,1]
  # while the check is strict.
  if not (0 < alpha < 1):
    raise ValueError(f'alpha should be within the open interval (0, 1), but got {alpha}')
  Kxy = K[:m, m:]
  # Clamp at 0 so the square root below never receives a negative argument.
  maxKxy = np.max((0, np.max(Kxy)))
  ucb = np.sqrt(2.0*maxKxy/m) * (1.0+np.sqrt(2.0*np.log(1.0/alpha)))
  return ucb
def compute_null_distribution(K, m, n, iterations=1000, verbose=False, random_state=None, marker_interval=500):
  """
    Method designed to calculate the bootstrap null-distribution of MMD2u.

    Args:
      K: np.array, 2-D matrix
      m: int, size of first vector
      n: int, size of second vector
      iterations: int, number of bootstrap iterations
      verbose: bool, flag to provide calculation details
      random_state: np class, numpy random number generator class (a
        np.random.RandomState instance or a seed accepted by its constructor)
      marker_interval: int, interval where calculation details are displayed

    Return:
      mmd2u_null: np.array, null-distribution of MMD2u
  """
  # Use isinstance instead of the original `type(x) == type(...)` comparison:
  # it is the idiomatic check and also accepts RandomState subclasses.
  if isinstance(random_state, np.random.RandomState):
    rng = random_state
  else:
    rng = np.random.RandomState(random_state)
  mmd2u_null = np.zeros(iterations)
  for i in range(iterations):
    if verbose and (i % marker_interval) == 0:
      # Split the original `logger.info(i), stdout.flush()` tuple expression
      # into two plain statements.
      logger.info(i)
      stdout.flush()
    idx = rng.permutation(m+n)
    # Symmetric permutation of both rows and columns of the kernel matrix.
    K_i = K[idx, idx[:, None]]
    mmd2u_null[i] = MMD2u(K_i, m, n)
  return mmd2u_null
def compute_null_distribution_given_permutations(K, m, n, permutation, iterations=None):
  """
    Method designed to calculate the bootstrap null-distribution of MMD2u
    given predefined permutations.

    Args:
      K: np.array, 2-D matrix
      m: int, size of first vector
      n: int, size of second vector
      permutation: np.array, array of permutations
      iterations: int, number of bootstrap iterations; defaults to the number
        of supplied permutations

    Return:
      mmd2u_null: np.array, null-distribution of MMD2u given predefined permutations
  """
  count = len(permutation) if iterations is None else iterations
  mmd2u_null = np.zeros(count)
  for j in range(count):
    order = permutation[j]
    # Apply the same permutation to rows and columns of K.
    permuted = K[order, order[:, None]]
    mmd2u_null[j] = MMD2u(permuted, m, n)
  return mmd2u_null
def kernel_two_sample_test(X, Y, kernel_function='rbf', iterations=2000, verbose=False, random_state=None, alpha=0.05, thin=None, **kwargs):
  """
    Method designed to calculate MMD^2_u, its null distribution and the
    p-value of the kernel two-sample test.

    Args:
      X: np.array, first vector
      Y: np.array, second vector
      kernel_function: string, type of kernel function. Valid values are:
        additive_chi2, chi2, linear, poly, polynomial, rbf, laplacian,
        sigmoid, cosine
      iterations: int, number of iterations
      verbose: bool, flag to provide calculation details
      random_state: np class, numpy random number generator class
      alpha: float, acceptance value for hypothesis testing
      thin: int, sample size for thinning calculation
      **kwargs: dict, dictionary of parameters that are passed to
        pairwise_kernels() as kernel parameters. E.g. if
        kernel_two_sample_test(..., kernel_function='rbf', gamma=0.1), then
        this will result in getting the kernel through
        kernel_function(metric='rbf', gamma=0.1).

    Return:
      mmd2u: float, MMD^2_u unbiased statistic using U-statistics
      mmd2u_null: np.array, null-distribution of MMD2u
      p_value: float, calculated p-value for hypothesis testing
  """
  # Promote 1-D inputs to column vectors so vstack/pairwise_kernels work.
  if len(X.shape) == 1:
    X = X.reshape((-1, 1))
  if len(Y.shape) == 1:
    Y = Y.reshape((-1, 1))
  if thin is not None:
    try:
      thin = int(thin)
    # Also catch TypeError: int(None)-like inputs raise TypeError, not ValueError.
    except (ValueError, TypeError):
      logger.error(f'Try to perform thin calculation, but get non-integer value for "thin: {thin}"')
      thin = None
  if thin is not None:
    # Keep every thin-th sample of both populations.
    X = X[0::thin, :]
    Y = Y[0::thin, :]
  m = len(X)
  n = len(Y)
  XY = np.vstack([X, Y])
  K = pairwise_kernels(XY, metric=kernel_function, **kwargs)
  mmd2u = MMD2u(K, m, n)
  if verbose:
    logger.info("MMD^2_u = %s" % mmd2u)
    logger.info("Computing the null distribution.")
  # BUG FIX: pass the caller's alpha through instead of hard-coding 0.05,
  # so the reported UCB matches the requested test level.
  ucb = MMD2u_UCB(K, m, alpha=alpha)
  if verbose:
    logger.info(f"UCB bound for MMD^2_u is {ucb}")
  mmd2u_null = compute_null_distribution(K, m, n, iterations, verbose=verbose, random_state=random_state)
  # Floor the p-value at the bootstrap resolution 1/iterations.
  p_value = max(1.0/iterations, (mmd2u_null > mmd2u).sum() / float(iterations))
  if verbose:
    logger.info("p-value ~= %s \t (resolution : %s)" % (p_value, 1.0/iterations))
  if p_value < alpha:
    # Fixed typos in the original message ("Deviaition", "hyothesis").
    logger.info(f"Deviation from null hypothesis is statistically significant at test level {alpha}, reject null hypothesis, \n \
      i.e., the two samples are from different distributions!")
  return mmd2u, mmd2u_null, p_value
def chebyshevTesting(X, Y, kernel_function='rbf', iterations=2000, verbose=False, random_state=None, alpha=0.01, **kwargs):
  """
    Method designed to perform Chebyshev testing using Chebyshev's inequality.

    Runs the kernel two-sample test to obtain MMD^2_u and its bootstrap null
    distribution, then delegates the acceptance decision to
    chebyshevTesting_precomputed_mmd.

    Args:
      X: np.array, first vector
      Y: np.array, second vector
      kernel_function: string, type of kernel function
      iterations: int, number of bootstrap iterations
      verbose: bool, flag to provide calculation details
      random_state: np class, numpy random number generator class
      alpha: float, acceptance value for hypothesis testing
      **kwargs: dict, dictionary of parameters that are passed to
        pairwise_kernels() as kernel parameters.

    Return:
      accept: bool, outcome of Chebyshev testing
  """
  mmd2u, mmd2u_null, _ = kernel_two_sample_test(X, Y,
                                                kernel_function=kernel_function,
                                                iterations=iterations,
                                                verbose=verbose,
                                                random_state=random_state,
                                                alpha=alpha,
                                                **kwargs)
  return chebyshevTesting_precomputed_mmd(mmd2u, mmd2u_null, alpha)
def chebyshevTesting_precomputed_mmd(mmd2u, mmd2u_null, alpha=0.01):
  """
    Method designed to perform MMD Chebyshev testing.

    Accepts when |mmd2u - mean(null)| < sigma / sqrt(alpha), i.e. when the
    observed statistic is within the Chebyshev bound of the null distribution.

    Args:
      mmd2u: float, MMD^2_u unbiased statistic using U-statistics
      mmd2u_null: np.array, null-distribution of MMD2u
      alpha: float, acceptance value for hypothesis testing

    Return:
      accept: bool, outcome of Chebyshev testing
  """
  mu = mmd2u_null.mean()
  sigma = np.std(mmd2u_null)
  # Chebyshev multiplier: P(|X - mu| >= nu*sigma) <= 1/nu^2 = alpha.
  nu = 1.0/(np.sqrt(alpha))
  diff = abs(mmd2u-mu)
  if diff < nu * sigma:
    logger.info(f"Chebyshev Inequality holds: {diff} < {nu*sigma}")
    return True
  logger.info(f"Chebyshev Inequality does not hold: {diff} > {nu*sigma}")
  return False