# Source code for daisy.federated_learning.threshold_models

# Copyright (C) 2024-2025 DAI-Labor and others
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""A collection of extensions to the FederatedModel class to support a wide array of
threshold models, which are used for anomaly detection, mostly for the mapping from
numerical scalar values to binary class labels. For this the most common models
currently are the statistical ones, of which most use the mean (combined with std.
dev.) to compute a singular threshold value for simple classification in an online manner.

Author: Fabian Hofmann, Seraphin Zunzer
Modified: 04.04.24
"""

import typing
from abc import ABC, abstractmethod
from collections import deque
from typing import Callable, cast

import numpy as np
import tensorflow as tf
from tensorflow import Tensor

from daisy.federated_learning.federated_model import FederatedModel


class FederatedTM(FederatedModel, ABC):
    """Abstract base class for federated threshold models.

    To perform predictions, these models simply compare the current samples
    (which must be in order) against a dynamic threshold that is updated from
    internal parameters; how those parameters evolve varies between the
    concrete implementations.

    Note that this kind of model *absolutely requires* a time series in most
    cases, so the data passed to it must be in order! Also note that the
    threshold always starts at zero, i.e. initially every point is considered
    an anomaly during detection.
    """

    _threshold: float | Tensor
    _reduce_fn: Callable

    def __init__(
        self, threshold: float = 0, reduce_fn: Callable[[Tensor], Tensor] = lambda o: o
    ):
        """Creates a new threshold model.

        :param threshold: Init for actual threshold value.
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        self._threshold = threshold
        self._reduce_fn = reduce_fn
        # Let the concrete implementation derive the initial threshold from
        # its internal parameters straight away.
        self.update_threshold()

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the internal parameters of the threshold model, before
        re-computing the actual threshold value based on them.

        Important Note: Must be overridden (and called afterward) when the
        class is implemented (see implementation examples below).

        :param parameters: Parameters to update the threshold model with.
        """
        self.update_threshold()

    @abstractmethod
    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the internal parameters of the threshold model, actual
        threshold excluded.

        :return: Parameters of threshold model.
        """
        raise NotImplementedError

    def fit(self, x_data, y_data=None):
        """Trains the threshold model with the given data, which must be
        compatible with the tensorflow API (see:
        https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit).

        If the input data contains more than one sample, it is reduced to a
        singular scalar using the configured function (see: __init__) --- an
        additional layer of window-based smoothing/averaging that also speeds
        up the update process.

        :param x_data: Input data.
        :param y_data: Expected output, optional since most (simple)
            threshold models are unsupervised.
        """
        reduced = self._reduce_fn(x_data)
        self.update_threshold(reduced)

    def predict(self, x_data) -> Tensor:
        """Makes a prediction on the given data and returns it, which must be
        compatible with the tensorflow API (see:
        https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict).

        :param x_data: Input data.
        :return: Predicted output tensor consisting of bools (0: normal,
            1: abnormal).
        """
        return tf.math.greater(x_data, self._threshold)

    @abstractmethod
    def update_threshold(self, x_data=None):
        """Updates the internal parameters of the threshold model using a
        batch of new samples to compute the new threshold value. If none is
        provided, simply re-computes the threshold.

        :param x_data: Batch of input data. Optional.
        """
        raise NotImplementedError
class AvgTM(FederatedTM, ABC):
    """Base class for average-based threshold models.

    All of them compute their internal threshold by maintaining the mean and
    (a running second moment of the) standard deviation of the data stream
    and adding the two together for the (absolute) threshold of the model.
    This method is very similar to the one employed by Schmidt et al. for the
    original version of IFTM (https://ieeexplore.ieee.org/document/8456348)
    and was therefore implemented here; it follows simple statistical
    assumptions for normal distributions (i.e., a sample is considered
    anomalous if it is further than x-times the std. dev. from the mean of
    the total population), making it similar to average absolute deviation
    (AAD) methods, but with a dynamic threshold value.

    Implementations only need to provide a way to update the mean with new
    incoming samples; everything else is handled by this base class. Note
    that many of the implementations closely mirror the ModelAggregator
    implementations, as both treat the aggregated values as a timeseries.
    """

    _mean: float | Tensor
    _var: float | Tensor
    _var_weight: float

    def __init__(
        self,
        mean: float = 0,
        var: float = 0,
        var_weight: float = 1.0,
        reduce_fn: Callable[[Tensor], Tensor] = lambda o: o,
    ):
        """Creates a new average-based threshold model.

        :param mean: Init mean value.
        :param var: Init variance value.
        :param var_weight: Weight of the variance compared to the mean value
            for the threshold value.
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        self._mean = mean
        self._var = var
        self._var_weight = var_weight
        super().__init__(threshold=0, reduce_fn=reduce_fn)

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the mean and variance of the threshold model, then updates
        the model (adjusting the threshold).

        Note the nested method arguments --- the first position of the list
        is allocated for this base class' statistical values, while extending
        classes use the positions that come after.

        :param parameters: Mean and variance to update threshold model with.
        """
        stats = parameters[0]
        self._mean = cast(float, stats[0])
        self._var = cast(float, stats[1])
        super().set_parameters(parameters)

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the mean and variance of the threshold model, actual
        threshold excluded.

        :return: Mean and variance of threshold model.
        """
        return [np.array([self._mean, self._var], dtype=np.float32)]

    def update_threshold(self, x_data=None):
        """Updates the mean and variance of the threshold model using a batch
        of new samples to compute the new threshold value. If none is
        provided, simply re-computes the threshold from the current values.

        :param x_data: Batch of input data. Optional.
        """
        if x_data is not None:
            for sample in x_data:
                # Welford-style update: deltas around the mean before and
                # after the mean update accumulate the second moment.
                delta_before = sample - self._mean
                self.update_mean(sample)
                delta_after = sample - self._mean
                self._var += delta_before * delta_after
        self._threshold = self._mean + self._var_weight * self._var

    @abstractmethod
    def update_mean(self, new_sample: float):
        """Updates the internal mean value with new sample.

        :param new_sample: New sample of time series.
        """
        raise NotImplementedError
class CumAvgTM(AvgTM):
    """Cumulative Averaging is the base version of online averaging, which is
    equal to the offline average after seeing every instance of the
    population. Consequently, this aggregator is NOT stable for infinite
    learning aggregation steps (n).
    """

    # Number of samples observed so far; 0 until the first update.
    _n: int

    def __init__(
        self,
        mean: float = 0,
        var: float = 0,
        var_weight: float = 1.0,
        reduce_fn: Callable[[Tensor], Tensor] = lambda o: o,
    ):
        """Creates a new cumulative averaging threshold model.

        :param mean: Init mean value.
        :param var: Init variance value.
        :param var_weight: Weight of the variance compared to the mean value
            for the threshold value.
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        self._n = 0
        super().__init__(mean=mean, var=var, var_weight=var_weight, reduce_fn=reduce_fn)

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the parameters of the cumulative averaging threshold
        model, which includes the size of the current population. The rest of
        the updating is done as described in the superclass' method.

        :param parameters: Parameters to update threshold model with, with
            the population number in second place.
        """
        # get_parameters() serializes _n as float32; int() actually converts
        # it back (typing.cast() was a runtime no-op and left a numpy float).
        self._n = int(parameters[1][0])
        super().set_parameters(parameters)

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the parameters of the cumulative averaging threshold
        model, actual threshold excluded.

        :return: Mean and variance (1st) and size of population (2nd) of
            threshold model.
        """
        return super().get_parameters() + [np.array([self._n], dtype=np.float32)]

    def update_mean(self, new_sample: float):
        """Updates the cumulative mean value with new sample.

        :param new_sample: New sample of time series.
        """
        self._n += 1
        if self._n == 1:
            # First sample seeds the mean directly.
            self._mean = new_sample
        else:
            # Incremental cumulative mean: mean += (x - mean) / n.
            self._mean += (new_sample - self._mean) / self._n
class SMAvgTM(AvgTM):
    """Simple Moving Averaging, also called sliding window averaging, is the
    simplest version of online moving averaging, as it uses only the past k
    elements of the population to compute the average. Note that this really
    needs a proper datastream/timeseries, as the order of samples influences
    the average at every step.
    """

    # Sliding window holding the most recent samples (at most _window_size).
    _window: deque
    _window_size: int

    def __init__(
        self,
        window_size: int = 5,
        mean: float = 0,
        var: float = 0,
        var_weight: float = 1.0,
        reduce_fn: Callable[[Tensor], Tensor] = lambda o: o,
    ):
        """Creates a new simple moving averaging threshold model.

        :param window_size: Size of sliding window.
        :param mean: Init mean value.
        :param var: Init variance value.
        :param var_weight: Weight of the variance compared to the mean value
            for the threshold value.
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        self._window = deque()
        self._window_size = window_size
        super().__init__(mean=mean, var=var, var_weight=var_weight, reduce_fn=reduce_fn)

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the parameters of the simple moving averaging threshold
        model, which includes the current content of the sliding window. The
        rest of the updating is done as described in the superclass' method.

        Note: Should only be used with threshold models that support the same
        window size, otherwise random things might happen.

        :param parameters: Parameters to update threshold model with, sliding
            window's samples in second place.
        """
        # Bound the rebuilt window so an oversized incoming window cannot
        # permanently inflate the local one (only the newest samples kept).
        self._window = deque(parameters[1], maxlen=self._window_size)
        super().set_parameters(parameters)

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the parameters of the simple moving averaging threshold
        model, actual threshold excluded.

        :return: Mean and variance (1st) and sliding window (2nd) of
            threshold model.
        """
        return super().get_parameters() + [np.array(self._window, dtype=np.float32)]

    def update_mean(self, new_sample: float):
        """Updates the simple moving average value with new sample, removing
        the oldest sample from the sliding window once it is full and storing
        the new one, adjusting the mean accordingly.

        Bug fixes vs. the previous version: the first sample is now actually
        stored in the window (it used to return early without appending,
        leaving the window empty forever), and while the window is still
        filling up the mean is updated with (new - mean) / n rather than
        new / n, which previously skewed the average.

        :param new_sample: New sample of time series.
        """
        if len(self._window) >= self._window_size:
            # Window full: classic sliding-window update with evicted sample.
            rm_sample = self._window.popleft()
            self._window.append(new_sample)
            self._mean += (new_sample - rm_sample) / self._window_size
        else:
            # Window still filling: incremental (cumulative) mean update.
            self._window.append(new_sample)
            self._mean += (new_sample - self._mean) / len(self._window)
class EMAvgTM(AvgTM):
    """Exponential moving averaging takes note of the entire model stream to
    compute the average, but weights them exponentially less the greater
    their distance to the present is; this process is also called exponential
    smoothing.
    """

    # Weight of a new sample in the smoothing update. (Previously written as
    # `_alpha = float`, a typo that assigned the *type object* as a class
    # attribute instead of declaring an annotation.)
    _alpha: float

    def __init__(
        self,
        alpha: float = 0.05,
        mean: float = 0,
        var: float = 0,
        var_weight: float = 1.0,
        reduce_fn: Callable[[Tensor], Tensor] = lambda o: o,
    ):
        """Creates a new exponential moving averaging threshold model.

        :param alpha: Smoothing/weight factor for new incoming values.
        :param mean: Init mean value.
        :param var: Init variance value.
        :param var_weight: Weight of the variance compared to the mean value
            for the threshold value.
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        self._alpha = alpha
        super().__init__(mean=mean, var=var, var_weight=var_weight, reduce_fn=reduce_fn)

    def update_mean(self, new_sample: float):
        """Updates the exponential moving average value with new sample,
        weighting past and present as defined.

        :param new_sample: New sample of time series.
        """
        if self._mean is None:
            # No history yet: seed directly and return (mathematically
            # equivalent to applying the smoothing formula below with
            # mean == new_sample, as the original code did).
            self._mean = new_sample
            return
        self._mean = self._alpha * new_sample + (1 - self._alpha) * self._mean
class MadTM(FederatedTM):
    """Median absolute deviation (MAD)-based threshold models, similar to
    AAD-based models, assume a symmetric distribution of the time series'
    samples, of which a certain percentage are considered anomalous. However,
    unlike average-based approaches, the median can only be computed over a
    subset of the population when computed online (in fact, it cannot be
    computed online at all, as this is a property of the median).

    Furthermore, as with all absolute-deviation-based models, the actual
    threshold value is NOT dynamic --- it has to be set manually. Instead,
    the input is transformed into its modified z-scores, which are then
    compared to the threshold for binary classification.
    """

    # Sliding window of the most recent samples, bounded to _window_size.
    _window: deque
    _window_size: int

    def __init__(
        self,
        window_size: int = 5,
        threshold: float = 3.5,
        reduce_fn: Callable[[Tensor], Tensor] = lambda o: o,
    ):
        """Creates a new MAD threshold model.

        :param window_size: Size of sliding window.
        :param threshold: Threshold value, for which +-3.5 is often used
            (Boris Iglewicz and David Hoaglin (1993) in "Volume 16: How to
            Detect and Handle Outliers")
        :param reduce_fn: Function to reduce a batch of samples into a scalar
            for training. Defaults to NOP.
        """
        # _window_size was declared but never assigned before; it is stored
        # now so set_parameters() can rebuild a properly bounded window.
        self._window_size = window_size
        self._window = deque(maxlen=window_size)
        super().__init__(threshold=threshold, reduce_fn=reduce_fn)

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the parameters of the MAD threshold model, which includes
        the current content of the sliding window, before updating the actual
        model (adjusting threshold).

        Note: Should only be used with threshold models that support the same
        window size, otherwise random things might happen.

        :param parameters: Threshold (1st) and new sample window (2nd) of
            time stream.
        """
        # Bug fix: typing.cast(parameters[0][0], float) had its arguments
        # reversed, so _threshold became the `float` type object itself and
        # predict() would crash; float() performs an actual conversion.
        self._threshold = float(parameters[0][0])
        # Bug fix: rebuild the window with its maxlen bound intact (a plain
        # deque() silently dropped the bound and grew without limit).
        self._window = deque(parameters[1], maxlen=self._window_size)
        super().set_parameters(parameters)

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the threshold and the entire sample window of the MAD
        threshold model.

        :return: Threshold (1st) and current sample window (2nd) of time
            stream.
        """
        # The threshold is wrapped in an ndarray so the structure matches the
        # declared return type and round-trips through set_parameters(),
        # which indexes parameters[0][0] (a bare float was returned before).
        return [
            np.array([self._threshold], dtype=np.float32),
            np.array(self._window, dtype=np.float32),
        ]

    def fit(self, x_data, y_data=None):
        """As it is set upon the creation of the threshold, nothing is to be
        fitted."""
        pass

    def predict(self, x_data) -> Tensor:
        """Adds the input data to the current sample window of the time
        stream, removing older samples accordingly, before computing the
        modified z-scores over all of them. Note that only the z-scores of
        the input are compared to the threshold and returned accordingly.

        :param x_data: Input data.
        :return: Predicted output tensor consisting of bools (0: normal,
            1: abnormal).
        """
        for sample in x_data:
            self._window.append(sample)
        samples = np.array(self._window)
        m = np.median(samples)
        ad = np.abs(samples - m)
        mad = np.median(ad)
        # NOTE(review): if at least half the window equals its median,
        # mad == 0 and the division yields inf for deviating samples
        # (flagged anomalous) and nan for median-equal ones (not flagged) —
        # confirm this degenerate-window behavior is intended.
        z_scores = 0.6745 * ad / mad
        return tf.math.greater(z_scores[-len(x_data) :], self._threshold)

    def update_threshold(self, x_data=None):
        """As it is set upon the creation of the threshold, nothing is to be
        updated."""
        pass