Federated Learning

Aggregator

So that different aggregation strategies (for models, gradients, losses, or other parameters and model types) and different topologies of the federated distributed system remain compatible, every strategy has to follow the same simple interface. This module provides the corresponding abstract class and some sample aggregators.

class daisy.federated_learning.federated_aggregator.Aggregator[source]

Bases: ABC

Abstract aggregator class. Must always be implemented if a new aggregation strategy is used in the federated system.

Implementing the aggregate function allows values to be aggregated simply by calling the object directly, i.e. object().

abstractmethod aggregate(*args, **kwargs) object[source]

Calculates the aggregated values from the underlying object attributes and potential given parameters and returns them.

Parameters:
  • args – Arguments. Depends on the type of aggregator to be implemented.

  • kwargs – Keyword arguments. Depends on the type of aggregator to be implemented.

Returns:

Aggregated values.
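A minimal sketch of how such an interface could look, assuming the object call simply forwards to aggregate. SketchAggregator and SumAggregator are hypothetical names used for illustration only; they are not part of daisy.

```python
from abc import ABC, abstractmethod


class SketchAggregator(ABC):
    """Hypothetical stand-in for the abstract aggregator interface."""

    @abstractmethod
    def aggregate(self, *args, **kwargs) -> object:
        """Compute and return the aggregated values."""
        raise NotImplementedError

    def __call__(self, *args, **kwargs) -> object:
        # Assumption: calling the object forwards to aggregate().
        return self.aggregate(*args, **kwargs)


class SumAggregator(SketchAggregator):
    """Toy strategy that sums all positional arguments."""

    def aggregate(self, *args, **kwargs) -> object:
        return sum(args)


# The concrete strategy can be used through the object call.
result = SumAggregator()(1, 2, 3)
```

Because aggregate is abstract, the base class itself cannot be instantiated; only concrete strategies can.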

class daisy.federated_learning.federated_aggregator.CumAggregator[source]

Bases: ModelAggregator

Cumulative Averaging (FedCum) is the base version of online averaging for models into a “global” version of the model. The cumulative average is computed by first computing the FedAvg over the batch of models provided to the aggregator in a single step, and then using that batch average to update the cumulative average (i.e., all models in a batch are treated equally).

This aggregator is NOT stable for an unbounded number of aggregation steps (n).

This kind of aggregation is also probably not compatible with gradients or losses, as their averages cannot be computed across multiple epochs.

aggregate(models_parameters: list[list[ndarray]]) list[ndarray][source]

Calculates the cumulative average for the given models’ parameters and the previous cumulative average and returns it.

Parameters:

models_parameters – A list of models’ parameters of the same type.

Returns:

Aggregated model parameters.
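The cumulative update described above can be sketched as follows. This is an illustration under assumptions, not the daisy implementation: CumAvgSketch and fed_avg are hypothetical names, flat lists of floats stand in for the per-layer ndarrays, and n is assumed to count aggregation steps rather than individual models.

```python
def fed_avg(models):
    """Element-wise mean over a batch of models (each a flat list of floats)."""
    return [sum(values) / len(models) for values in zip(*models)]


class CumAvgSketch:
    """Hypothetical cumulative averaging over aggregation steps."""

    def __init__(self):
        self.avg = None  # running cumulative average
        self.n = 0       # number of aggregation steps seen so far

    def aggregate(self, models):
        batch_avg = fed_avg(models)  # all models in a batch count equally
        self.n += 1
        if self.avg is None:
            self.avg = batch_avg
        else:
            # avg_n = avg_(n-1) + (batch_avg - avg_(n-1)) / n
            self.avg = [a + (b - a) / self.n for a, b in zip(self.avg, batch_avg)]
        return self.avg


agg = CumAvgSketch()
first = agg.aggregate([[1.0, 2.0], [3.0, 4.0]])  # batch average is [2.0, 3.0]
second = agg.aggregate([[4.0, 5.0]])             # batch average is [4.0, 5.0]
```

In this sketch the weight of each new batch is 1/n, so the influence of new models shrinks as n grows.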

class daisy.federated_learning.federated_aggregator.EMAggregator(alpha: float = 0.05)[source]

Bases: ModelAggregator

Exponential moving averaging (FedEMA), unlike FedSMA, takes note of the entire model stream to compute the average, but gradually forgets past elements over multiple time steps; this process is also called exponential smoothing, as the past becomes exponentially less relevant to the current time step the further one goes back. As with other online model averaging aggregators, the regular FedAvg is first computed over the batch of models provided in a single step, before using that average to update the moving average (i.e., all models in a batch are treated equally).

This kind of aggregation is also probably not compatible with gradients or losses, as their averages cannot be computed across multiple epochs.

aggregate(models_parameters: list[list[ndarray]]) list[ndarray][source]

Calculates the exponential moving average for the given models’ parameters and returns it.

Parameters:

models_parameters – A list of models’ parameters of the same type.

Returns:

Aggregated model parameters.
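A minimal sketch of exponential smoothing over batch averages. It assumes that alpha weights the newest batch average and that the first batch initializes the moving average; both are assumptions of this example, and EmaSketch is a hypothetical name. Flat lists of floats stand in for the per-layer ndarrays.

```python
class EmaSketch:
    """Hypothetical exponential moving average over aggregation steps."""

    def __init__(self, alpha=0.05):
        self.alpha = alpha  # assumed weight of the newest batch average
        self.avg = None     # current moving average

    def aggregate(self, models):
        # Plain average over the batch, so all models count equally.
        batch_avg = [sum(values) / len(models) for values in zip(*models)]
        if self.avg is None:
            # Assumption: the first batch initializes the moving average.
            self.avg = batch_avg
        else:
            self.avg = [
                self.alpha * new + (1 - self.alpha) * old
                for old, new in zip(self.avg, batch_avg)
            ]
        return self.avg


ema = EmaSketch(alpha=0.5)
step_one = ema.aggregate([[2.0], [4.0]])  # batch average is [3.0]
step_two = ema.aggregate([[5.0]])         # 0.5 * 5.0 + 0.5 * 3.0
```

With a small alpha such as the default 0.05, older averages dominate and the global model changes slowly; a larger alpha forgets the past faster.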

class daisy.federated_learning.federated_aggregator.FedAvgAggregator[source]

Bases: ModelAggregator

Federated Averaging (FedAvg) is the simplest way to aggregate any number of models into a “global” model version, using only their weights (no internal state is kept); simply computing their average and using that as a result.

Note that this could also be done using gradients, losses, or any other form of parameters from other types of models; however, it was implemented with the federated_model.TFFederatedModel and its parameters in mind.

aggregate(models_parameters: list[list[ndarray]]) list[ndarray][source]

Calculates the average for the given models’ parameters and returns it, using Welford’s algorithm. Note this could be further optimized using Kahan–Babuška summation for reduction of floating point errors.

Parameters:

models_parameters – A list of models’ parameters of the same type.

Returns:

Aggregated model parameters.
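The incremental mean update used in Welford’s algorithm can be sketched as below. running_mean is a hypothetical helper, and flat lists of floats stand in for the per-layer ndarrays; the actual implementation may differ.

```python
def running_mean(models):
    """Average a batch of models one at a time with an incremental update."""
    mean = [0.0] * len(models[0])
    for k, model in enumerate(models, start=1):
        # mean_k = mean_(k-1) + (x_k - mean_(k-1)) / k
        mean = [m + (x - m) / k for m, x in zip(mean, model)]
    return mean


averaged = running_mean([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
```

The update never forms the full sum of all parameters, which keeps intermediate values in the same range as the parameters themselves.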

class daisy.federated_learning.federated_aggregator.ModelAggregator[source]

Bases: Aggregator

Abstract model aggregator class, that uses the parameters of the given models and some internal states (of those parameters) to compute the aggregated state of the model. Must always be implemented if a new model aggregation strategy is used in the federated system.

abstractmethod aggregate(models_parameters: list[list[ndarray]]) list[ndarray][source]

Calculates the aggregated values from the underlying object attributes and given model parameters and returns them.

Parameters:

models_parameters – A list of models’ parameters of the same type.

Returns:

Aggregated model parameters.

class daisy.federated_learning.federated_aggregator.SMAggregator(window_size: int = 5)[source]

Bases: ModelAggregator

Simple Moving Averaging, also called sliding window averaging (FedSMA), is the simplest version of online moving averaging for models, as it uses only the past k elements of the stream to compute the average. As with other online model averaging aggregators, the regular FedAvg is first computed over the batch of models provided in a single step, before using that average to update the moving average (i.e., all models in a batch are treated equally).

This kind of aggregation is also probably not compatible with gradients or losses, as their averages cannot be computed across multiple epochs.

aggregate(models_parameters: list[list[ndarray]]) list[ndarray][source]

Calculates the simple moving average for the given models’ parameters by averaging them first and then adding the result to the window. Afterward, the delta between the newly added values and the oldest (to be removed) values in the window is used to compute the new moving average.

Parameters:

models_parameters – A list of models’ parameters of the same type.

Returns:

Aggregated model parameters.
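The delta-based window update can be sketched as follows. SmaSketch is a hypothetical name, flat lists of floats stand in for the per-layer ndarrays, and the behavior while the window is still filling (a plain average over the stored entries) is an assumption of this example.

```python
from collections import deque


class SmaSketch:
    """Hypothetical sliding-window average over aggregation steps."""

    def __init__(self, window_size=5):
        self.window_size = window_size
        self.window = deque()  # past batch averages, oldest first
        self.avg = None

    def aggregate(self, models):
        batch_avg = [sum(values) / len(models) for values in zip(*models)]
        self.window.append(batch_avg)
        if len(self.window) <= self.window_size:
            # Assumption: while the window fills, average what is stored.
            k = len(self.window)
            self.avg = [sum(values) / k for values in zip(*self.window)]
        else:
            # Window full: shift the average by the delta between the
            # newest entry and the oldest one, which is dropped.
            oldest = self.window.popleft()
            self.avg = [
                a + (new - old) / self.window_size
                for a, new, old in zip(self.avg, batch_avg, oldest)
            ]
        return self.avg


sma = SmaSketch(window_size=2)
sma.aggregate([[2.0]])
filled = sma.aggregate([[4.0]])   # average of 2.0 and 4.0
shifted = sma.aggregate([[8.0]])  # average of 4.0 and 8.0
```

The delta update touches only the newest and oldest entries, so the cost per step does not depend on the window size.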

Federated Models

A collection of various types of model wrappers, implementing the same interface for each federated model type, thus enabling their inter-compatibility for different aggregation strategies and federated system components.

class daisy.federated_learning.federated_model.FederatedIFTM(identify_fn: FederatedModel, threshold_m: FederatedModel, error_fn: Callable[[Tensor, Tensor], Tensor], pf_mode: bool = False)[source]

Bases: FederatedModel

Combination of two federated models, following the IFTM (identity function, threshold model) hybrid approach by Schmidt et al. (https://ieeexplore.ieee.org/document/8456348): one model computes the identity of a given sample (alternatively, a prediction of the next sample), while the other uses a threshold (model) to map the error/loss to binary class labels for anomaly detection. Both are generic federated models, so any approach can be used for them, as long as they satisfy the required properties:

  • Identity Function: Computes the identities of given data points. Can be replaced with a prediction function.

  • Error Function: Computes the reconstruction/prediction error of one or multiple samples to a scalar (each).

  • Threshold Model: Maps the scalar to binary class labels.

Note that this kind of model requires a time series in most cases, so data passed to it must be in order!
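How the three components interact can be sketched with plain Python functions. All names below (clipped_identity, mean_squared_error, iftm_predict) are hypothetical and only illustrate the data flow; a fixed threshold stands in for the threshold model, and lists of floats stand in for tensors.

```python
def clipped_identity(sample):
    """Hypothetical identity function that can only reproduce values up to 1.0."""
    return [min(value, 1.0) for value in sample]


def mean_squared_error(sample, reconstruction):
    """Error function: reduces one sample to a single scalar."""
    return sum((a - b) ** 2 for a, b in zip(sample, reconstruction)) / len(sample)


def iftm_predict(samples, identity_fn, error_fn, threshold):
    """Label each sample as anomalous if its error exceeds the threshold."""
    labels = []
    for sample in samples:
        reconstruction = identity_fn(sample)      # identity function step
        error = error_fn(sample, reconstruction)  # scalar error per sample
        labels.append(error > threshold)          # threshold model step
    return labels


labels = iftm_predict(
    [[0.5, 0.5], [0.9, 3.0]],
    clipped_identity,
    mean_squared_error,
    threshold=0.1,
)
```

The second sample contains a value the toy identity function cannot reproduce, so its error is large and it is labeled abnormal.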

fit(x_data, y_data=None)[source]

Trains the IFTM model with the given data by calling the wrapped models; first the IF to make a prediction, after which the error can be computed for the fitting of the TM. Afterward, the IF is fitted. Note that one can run IFTM in supervised mode by providing the true classes of each sample; however, most TMs require only the input data.

Note that in case of an underlying prediction function (instead of a regular IF), the window is shifted by one step into the past, i.e. the final sample is only used to compute a prediction error, but not to make a prediction, and it is stored for the next fitting step.

Note that this kind of model requires a time series in most cases, so data passed to it must be in order! Also, for the first step in a time series, it is impossible to compute a prediction error, since there is no previous sample to compare it to.

Parameters:
  • x_data – Input data.

  • y_data – Expected output, optional since default IFTM is fully unsupervised.

get_parameters() list[ndarray][source]

Retrieves the weights of the underlying models.

Returns:

Concatenated weight lists of the two models.

predict(x_data) Tensor | None[source]

Makes a prediction on the given data and returns it by calling the wrapped models; first the IF to make a prediction, after which the error can be computed for the final prediction step using the TM.

Note that in case of an underlying prediction function (instead of a regular IF), the window is shifted by one step into the past, i.e. the final sample is only used to compute a prediction error, but not make a prediction, and it is stored for the next prediction step.

Note that this kind of model requires a time series in most cases, so data passed to it must be in order! Also, for the first step in a time series, there is no previous sample to compare it to; therefore the sample is automatically classified as the default class (0, i.e. normal).

Parameters:

x_data – Input data.

Returns:

Predicted output tensor consisting of bools (0: normal, 1: abnormal).

set_parameters(parameters: list[ndarray])[source]

Updates the internal parameters of the two underlying models by splitting the parameter lists as previously defined.

Parameters:

parameters – Parameters to update the IFTM model with.
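One way to concatenate and later split the two parameter lists is sketched below. It assumes that the split position is simply the number of parameter entries of the first model; join_parameters and split_parameters are hypothetical helpers, and nested lists of floats stand in for ndarrays.

```python
def join_parameters(if_params, tm_params):
    """Concatenate both parameter lists and remember where to split them."""
    split_index = len(if_params)
    return list(if_params) + list(tm_params), split_index


def split_parameters(parameters, split_index):
    """Recover the two parameter lists from the concatenated one."""
    return parameters[:split_index], parameters[split_index:]


if_params = [[0.1, 0.2], [0.3]]  # e.g. two weight arrays of the IF
tm_params = [[1.5, 0.25]]        # e.g. statistics of the TM
joined, split_index = join_parameters(if_params, tm_params)
restored_if, restored_tm = split_parameters(joined, split_index)
```

Because the aggregator only sees one flat list, both sides of the federation need to agree on the same split position.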

class daisy.federated_learning.federated_model.FederatedModel[source]

Bases: ABC

An abstract model wrapper that offers the same methods, no matter the type of underlying model. Must always be implemented if a new model type is to be used in the federated system.

abstractmethod fit(x_data, y_data)[source]

Trains the model with the given data, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit).

Parameters:
  • x_data – Input data.

  • y_data – Expected output.

abstractmethod get_parameters() list[ndarray][source]

Retrieves the internal parameters of the model.

Returns:

Parameters of model.

abstractmethod predict(x_data) Tensor[source]

Makes a prediction on the given data and returns it, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict).

Parameters:

x_data – Input data.

Returns:

Predicted output tensor.

abstractmethod set_parameters(parameters: list[ndarray])[source]

Updates the internal parameters of the model.

Parameters:

parameters – Parameters to update the model with.
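A sketch of how get_parameters and set_parameters could be used in a single aggregation round. ToyModel and federated_round are hypothetical names; the toy class mirrors only the two parameter methods of the interface, and flat lists of floats stand in for ndarrays.

```python
class ToyModel:
    """Hypothetical model exposing only the parameter interface."""

    def __init__(self, weights):
        self._weights = list(weights)

    def get_parameters(self):
        return list(self._weights)

    def set_parameters(self, parameters):
        self._weights = list(parameters)


def federated_round(models):
    """Collect parameters, average them, and push the result back."""
    collected = [model.get_parameters() for model in models]
    averaged = [sum(values) / len(collected) for values in zip(*collected)]
    for model in models:
        model.set_parameters(averaged)
    return averaged


clients = [ToyModel([1.0, 2.0]), ToyModel([3.0, 6.0])]
global_weights = federated_round(clients)
```

Since the round only relies on the two parameter methods, any model type that implements them could take part in it.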

class daisy.federated_learning.federated_model.TFFederatedModel(model: Model, optimizer: str | Optimizer, loss: str | Loss, metrics: list[str | Callable | Metric] = None, batch_size: int = 32, epochs: int = 1)[source]

Bases: FederatedModel

The standard federated model wrapper for tensorflow models. Can be used for both online and offline training/prediction; online is the default.

fit(x_data, y_data)[source]

Trains the model with the given data by calling the wrapped model, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit).

Parameters:
  • x_data – Input data.

  • y_data – Expected output.

classmethod get_fae(input_size: int, optimizer: str | Optimizer = 'Adam', loss: str | Loss = 'mse', metrics: list[str | Callable | Metric] = None, batch_size: int = 32, epochs: int = 1) Self[source]

Factory class method to create a simple federated autoencoder model of a fixed depth but with variable input size.

Should only serve as a quick and basic setup for a model.

Parameters:
  • input_size – Dimensionality of input/output of autoencoder.

  • optimizer – Optimizer to use during training.

  • loss – Loss function to use during training.

  • metrics – Evaluation metrics to be displayed during training and testing.

  • batch_size – Batch size during training and prediction.

  • epochs – Number of epochs (rounds) during training.

Returns:

Initialized federated autoencoder model.

get_parameters() list[ndarray][source]

Retrieves the weights of the underlying model.

Returns:

Weights of the model.

predict(x_data) Tensor[source]

Makes a prediction on the given data and returns it by calling the wrapped model. Uses the call() tensorflow model interface for small numbers of data points, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict).

Parameters:

x_data – Input data.

Returns:

Predicted output tensor.

set_parameters(parameters: list[ndarray])[source]

Updates the weights of the underlying model with new ones.

Parameters:

parameters – Weights to update the model with.

Threshold Models

A collection of extensions to the FederatedModel class to support a wide array of threshold models, which are used for anomaly detection, mostly for the mapping from numerical scalar values to binary class labels. The most common models for this purpose are currently the statistical ones, most of which use the mean (combined with the std. dev.) to compute a single threshold value for simple classification in an online manner.

class daisy.federated_learning.threshold_models.AvgTM(mean: float = 0, var: float = 0, var_weight: float = 1.0, reduce_fn: Callable[[Tensor], Tensor] = <function AvgTM.<lambda>>)[source]

Bases: FederatedTM, ABC

Base class for average-based threshold models, all of which compute their internal threshold value by first computing the mean and standard deviation of the data stream and then adding them together to obtain the (absolute) threshold of the model. This method is very similar to the one employed by Schmidt et al. for the original version of IFTM (https://ieeexplore.ieee.org/document/8456348), which is why it was implemented here. It is not unique among error-based anomaly detection approaches, however, since it follows simple statistical assumptions for normal distributions (i.e., a sample is considered anomalous if it lies further than x times the std. dev. from the mean of the total population). It is similar to average absolute deviation (AAD) methods, but uses a dynamic threshold value.

Any implementation of this class must provide a way to update the mean using new incoming samples; anything else is already taken care of by this base class.

Note that many of the implementations are very similar to the ModelAggregator implementations, as both treat the aggregated values as a timeseries.
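The threshold computation described above can be sketched as follows. How var_weight enters the formula is an assumption of this example (it is taken to scale the standard deviation), as is the strict greater-than comparison; threshold_from_stats and classify are hypothetical helpers.

```python
import math


def threshold_from_stats(mean, var, var_weight=1.0):
    """Assumed form: threshold = mean + var_weight * standard deviation."""
    return mean + var_weight * math.sqrt(var)


def classify(errors, threshold):
    """Label a value as abnormal (True) if it exceeds the threshold."""
    return [error > threshold for error in errors]


threshold = threshold_from_stats(mean=2.0, var=4.0, var_weight=1.5)
labels = classify([1.0, 5.5], threshold)

# With the default statistics (mean 0, variance 0) the threshold is zero,
# so any positive error is flagged until the statistics are updated.
initial_labels = classify([0.1], threshold_from_stats(0.0, 0.0))
```

A larger var_weight widens the band of values that are treated as normal.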

get_parameters() list[ndarray][source]

Retrieves the mean and variance of the threshold model, actual threshold excluded.

Returns:

Mean and variance of threshold model.

set_parameters(parameters: list[ndarray])[source]

Updates the mean and variance of the threshold model, then updates the model (adjusting threshold).

Note the nested method arguments: for the base class, the first position of the list is allocated for the statistical values, while extending classes use the list’s positions that come after.

Parameters:

parameters – Mean and variance to update threshold model with.

abstractmethod update_mean(new_sample: float)[source]

Updates the internal mean value with new sample.

Parameters:

new_sample – New sample of time series.

update_threshold(x_data=None)[source]

Updates the mean and variance of the threshold model using a batch of new samples to compute the new threshold value. If none is provided, the threshold is simply re-computed based on the current values.

Parameters:

x_data – Batch of input data. Optional.

class daisy.federated_learning.threshold_models.CumAvgTM(mean: float = 0, var: float = 0, var_weight: float = 1.0, reduce_fn: Callable[[Tensor], Tensor] = <function CumAvgTM.<lambda>>)[source]

Bases: AvgTM

Cumulative Averaging is the base version of online averaging, which is equal to the offline average after seeing every instance of the population.

Consequently, this threshold model is NOT stable for an unbounded number of update steps (n).

get_parameters() list[ndarray][source]

Retrieves the parameters of the cumulative averaging threshold model, actual threshold excluded.

Returns:

Mean and variance (1st) and size of population (2nd) of threshold model.

set_parameters(parameters: list[ndarray])[source]

Updates the parameters of the cumulative averaging threshold model, which includes the size of the current population. The rest of the updating is done as described in the superclass’ method.

Parameters:

parameters – Parameters to update threshold model with, with the population number in second place.

update_mean(new_sample: float)[source]

Updates the cumulative mean value with new sample.

Parameters:

new_sample – New sample of time series.

class daisy.federated_learning.threshold_models.EMAvgTM(alpha: float = 0.05, mean: float = 0, var: float = 0, var_weight: float = 1.0, reduce_fn: Callable[[Tensor], Tensor] = <function EMAvgTM.<lambda>>)[source]

Bases: AvgTM

Exponential moving averaging takes note of the entire sample stream to compute the average, but weights samples exponentially less the further they lie in the past; this process is also called exponential smoothing.

update_mean(new_sample: float)[source]

Updates the exponential moving average value with new sample, weighting past and present as defined.

Parameters:

new_sample – New sample of time series.

class daisy.federated_learning.threshold_models.FederatedTM(threshold: float = 0, reduce_fn: Callable[[Tensor], Tensor] = <function FederatedTM.<lambda>>)[source]

Bases: FederatedModel, ABC

Abstract base class for federated threshold models, all of which, to perform predictions, simply compare the current samples (that are in order) to a dynamic threshold that is updated using internal parameters, varying between approaches/implementations.

Note that this kind of model requires a time series in most cases, so data passed to it must be in order!

Note that for the initial value, the threshold is always zero (any point is considered an anomaly during detection).

fit(x_data, y_data=None)[source]

Trains the threshold model with the given data, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit). If the input data contains more than one sample, it is reduced to a singular scalar using a set function, if defined (see: __init__). This is akin to an additional layer of window-based smoothing/averaging, also speeding up the update process.

Parameters:
  • x_data – Input data.

  • y_data – Expected output, optional since most (simple) threshold models are unsupervised.

abstractmethod get_parameters() list[ndarray][source]

Retrieves the internal parameters of the threshold model, actual threshold excluded.

Returns:

Parameters of threshold model.

predict(x_data) Tensor[source]

Makes a prediction on the given data and returns it, which must be compatible with the tensorflow API (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict ).

Parameters:

x_data – Input data.

Returns:

Predicted output tensor consisting of bools (0: normal, 1: abnormal).

set_parameters(parameters: list[ndarray])[source]

Updates the internal parameters of the threshold model, before re-computing the actual threshold value based on the internal parameters.

Important Note: Must be overridden (and called afterward) when class is implemented (see implementation examples below).

Parameters:

parameters – Parameters to update the threshold model with.

abstractmethod update_threshold(x_data=None)[source]

Updates the internal parameters of the threshold model using a batch of new samples to compute the new threshold value. If none is provided, the threshold is simply re-computed.

Parameters:

x_data – Batch of input data. Optional.

class daisy.federated_learning.threshold_models.MadTM(window_size: int = 5, threshold: float = 3.5, reduce_fn: Callable[[Tensor], Tensor] = <function MadTM.<lambda>>)[source]

Bases: FederatedTM

Median absolute deviation (MAD)-based threshold models, similar to AAD-based models, assume a symmetric distribution of the time series’ samples, of which a certain percentage are considered anomalous. However, unlike average-based approaches, the median can only be computed using a subset of the population when computed online (it cannot be computed online in fact, as this is a property of the median). Furthermore, as with all absolute deviation based models, the actual threshold value is NOT dynamic; it has to be set manually. Instead, the input is transformed into its modified z-scores, which are then compared to the threshold for binary classification.
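A sketch of the modified z-score computation over a sample window, using the commonly cited scaling constant 0.6745. Whether the actual implementation uses this constant, and how it handles a MAD of zero, is not stated here; both are assumptions of this example, and modified_z_scores is a hypothetical helper.

```python
from statistics import median


def modified_z_scores(window):
    """Modified z-scores of all samples in the window."""
    med = median(window)
    mad = median(abs(x - med) for x in window)
    if mad == 0:
        # Assumption: without any spread, no sample is treated as deviating.
        return [0.0 for _ in window]
    return [0.6745 * (x - med) / mad for x in window]


window = [1.0, 2.0, 3.0, 4.0, 100.0]
scores = modified_z_scores(window)
# Compare the absolute scores to the fixed, manually set threshold.
labels = [abs(score) > 3.5 for score in scores]
```

Because median and MAD are robust statistics, the single extreme value barely shifts them and is flagged clearly.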

fit(x_data, y_data=None)[source]

As it is set upon the creation of the threshold, nothing is to be fitted.

get_parameters() list[ndarray][source]

Retrieves the entire sample window of the MAD threshold model, actual threshold excluded.

Returns:

Current sample window of time stream.

predict(x_data) Tensor[source]

Adds the input data to the current sample window of the time stream, removing older samples accordingly, before computing the modified z-scores over all of them. Note that only the z-scores of the input are compared to the threshold and returned accordingly.

Parameters:

x_data – Input data.

Returns:

Predicted output tensor consisting of bools (0: normal, 1: abnormal).

set_parameters(parameters: list[ndarray])[source]

Updates the parameters of the MAD threshold model, which includes the current content of the sliding window, before updating the actual model (adjusting threshold).

Note: Should only be used with threshold models that support the same window size; otherwise the results are unpredictable.

Parameters:

parameters – New sample window of time stream.

update_threshold(x_data=None)[source]

As it is set upon the creation of the threshold, nothing is to be updated.

class daisy.federated_learning.threshold_models.SMAvgTM(window_size: int = 5, mean: float = 0, var: float = 0, var_weight: float = 1.0, reduce_fn: Callable[[Tensor], Tensor] = <function SMAvgTM.<lambda>>)[source]

Bases: AvgTM

Simple Moving Averaging, also called sliding window averaging, is the simplest version of online moving averaging, as it uses only the past k elements of the population to compute the average. Note that this really needs a proper datastream/timeseries, as the order of samples influences the average at every step.

get_parameters() list[ndarray][source]

Retrieves the parameters of the simple moving averaging threshold model, actual threshold excluded.

Returns:

Mean and variance (1st) and sliding window (2nd) of threshold model.

set_parameters(parameters: list[ndarray])[source]

Updates the parameters of the simple moving averaging threshold model, which includes the current content of the sliding window. The rest of the updating is done as described in the superclass’ method.

Note: Should only be used with threshold models that support the same window size; otherwise the results are unpredictable.

Parameters:

parameters – Parameters to update threshold model with, with the sliding window’s samples in second place.

update_mean(new_sample: float)[source]

Updates the simple moving average value with new sample, removing the oldest sample from the sliding window and storing the new one in it, adjusting the mean accordingly.

Parameters:

new_sample – New sample of time series.