Source code for daisy.federated_learning.federated_model

# Copyright (C) 2024-2025 DAI-Labor and others
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""A collection of various types of model wrappers, implementing the same interface
for each federated model type, thus enabling their inter-compatibility for different
aggregation strategies and federated system components.

Author: Fabian Hofmann
Modified: 04.04.24
"""

from abc import ABC, abstractmethod
from typing import Callable, Optional, Self

import numpy as np
import tensorflow as tf
from tensorflow import Tensor
from tensorflow import keras


class FederatedModel(ABC):
    """An abstract model wrapper that offers the same methods, no matter the type of
    underlying model. Must always be implemented if a new model type is to be used in
    the federated system.
    """

    @abstractmethod
    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the internal parameters of the model.

        :param parameters: Parameters to update the model with.
        """
        raise NotImplementedError

    @abstractmethod
    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the internal parameters of the model.

        :return: Parameters of the model.
        """
        raise NotImplementedError

    @abstractmethod
    def fit(self, x_data, y_data):
        """Trains the model with the given data, which must be compatible with the
        tensorflow API
        (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit).

        :param x_data: Input data.
        :param y_data: Expected output.
        """
        raise NotImplementedError

    @abstractmethod
    def predict(self, x_data) -> Tensor:
        """Makes a prediction on the given data and returns it, which must be
        compatible with the tensorflow API
        (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict).

        :param x_data: Input data.
        :return: Predicted output tensor.
        """
        raise NotImplementedError


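# Illustrative sketch (not part of the original module): because every wrapper exposes
# the same get_parameters()/set_parameters() interface, aggregation strategies such as
# simple federated averaging can be written against FederatedModel without knowing the
# concrete model type. The helper name `_example_federated_average` is hypothetical.
def _example_federated_average(models: list[FederatedModel]) -> list[np.ndarray]:
    """Averages the parameters of several federated models layer by layer."""
    all_params = [model.get_parameters() for model in models]
    # zip(*...) groups the i-th parameter array of every model; average each group.
    return [np.mean(np.stack(layer), axis=0) for layer in zip(*all_params)]

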
class TFFederatedModel(FederatedModel):
    """The standard federated model wrapper for tensorflow models. Can be used for
    both online and offline training/prediction; online by default, however.
    """

    _model: keras.Model
    _batch_size: int
    _epochs: int

    def __init__(
        self,
        model: keras.Model,
        optimizer: str | keras.optimizers.Optimizer,
        loss: str | keras.losses.Loss,
        metrics: Optional[list[str | Callable | keras.metrics.Metric]] = None,
        batch_size: int = 32,
        epochs: int = 1,
    ):
        """Creates a new tensorflow federated model from a given model. This also
        compiles the given model, requiring a set of additional arguments
        (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#compile).

        :param model: Underlying model to be wrapped around.
        :param optimizer: Optimizer to use during training.
        :param loss: Loss function to use during training.
        :param metrics: Evaluation metrics to be displayed during training and testing.
        :param batch_size: Batch size during training and prediction.
        :param epochs: Number of epochs (rounds) during training.
        """
        self._model = model
        self._model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
        self._batch_size = batch_size
        self._epochs = epochs

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the weights of the underlying model with new ones.

        :param parameters: Weights to update the model with.
        """
        self._model.set_weights(parameters)

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the weights of the underlying model.

        :return: Weights of the model.
        """
        return self._model.get_weights()

    def fit(self, x_data, y_data):
        """Trains the model with the given data by calling the wrapped model, which
        must be compatible with the tensorflow API
        (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit).

        :param x_data: Input data.
        :param y_data: Expected output.
        """
        self._model.fit(
            x=x_data, y=y_data, batch_size=self._batch_size, epochs=self._epochs
        )

    def predict(self, x_data) -> Tensor:
        """Makes a prediction on the given data and returns it by calling the wrapped
        model; uses the call() tensorflow model interface for small numbers of data
        points. The data must be compatible with the tensorflow API
        (see: https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict).

        :param x_data: Input data.
        :return: Predicted output tensor.
        """
        if len(x_data) > self._batch_size:
            return self._model.predict(x=x_data, batch_size=self._batch_size)
        return self._model(x_data, training=False).numpy()

    @classmethod
    def get_fae(
        cls,
        input_size: int,
        optimizer: str | keras.optimizers.Optimizer = "Adam",
        loss: str | keras.losses.Loss = "mse",
        metrics: Optional[list[str | Callable | keras.metrics.Metric]] = None,
        batch_size: int = 32,
        epochs: int = 1,
    ) -> Self:
        """Factory class method to create a simple federated autoencoder model of a
        fixed depth but with variable input size. Should only serve as a quick and
        basic setup for a model.

        :param input_size: Dimensionality of input/output of autoencoder.
        :param optimizer: Optimizer to use during training.
        :param loss: Loss function to use during training.
        :param metrics: Evaluation metrics to be displayed during training and testing.
        :param batch_size: Batch size during training and prediction.
        :param epochs: Number of epochs (rounds) during training.
        :return: Initialized federated autoencoder model.
        """
        enc_inputs = keras.layers.Input(shape=(input_size,))
        x = keras.layers.Dense(input_size)(enc_inputs)
        x = keras.layers.Dense(35)(x)
        enc_outputs = keras.layers.Dense(18)(x)
        encoder = keras.Model(inputs=enc_inputs, outputs=enc_outputs)

        dec_inputs = keras.layers.Input(shape=(18,))
        y = keras.layers.Dense(35)(dec_inputs)
        y = keras.layers.Dense(input_size)(y)
        dec_outputs = keras.layers.Activation("sigmoid")(y)
        decoder = keras.Model(inputs=dec_inputs, outputs=dec_outputs)

        fae_inputs = keras.Input(shape=(input_size,))
        encoded = encoder(fae_inputs)
        fae_outputs = decoder(encoded)
        fae = keras.models.Model(inputs=fae_inputs, outputs=fae_outputs)

        return TFFederatedModel(fae, optimizer, loss, metrics, batch_size, epochs)


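# Illustrative usage sketch (not part of the original module): one local training and
# prediction round driven through the federated interface. The input dimensionality
# and the random data below are arbitrary assumptions for demonstration purposes.
def _example_tf_federated_model_usage() -> list[np.ndarray]:
    """Hypothetical helper showing how the wrapper is driven from the outside."""
    model = TFFederatedModel.get_fae(input_size=10)
    samples = np.random.rand(64, 10).astype(np.float32)

    # Autoencoder setup: input and expected output are the same samples.
    model.fit(samples, samples)
    _reconstructions = model.predict(samples)

    # Parameters are exchanged with aggregation components as plain numpy arrays.
    weights = model.get_parameters()
    model.set_parameters(weights)
    return weights

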
class FederatedIFTM(FederatedModel):
    """A union of two federated models, following the IFTM hybrid model approach ---
    the identity function threshold model principle by Schmidt et al.
    (https://ieeexplore.ieee.org/document/8456348): one computes the identity of a
    given sample (alternatively a prediction of the next sample), while the other maps
    the error/loss to binary class labels for anomaly detection using a
    threshold(-model). Both are generic federated models, so any approach can be used
    for them, as long as they abide by the required properties:

    * Identity Function: Computes the identities of given data points. Can be replaced
      with a prediction function.
    * Error Function: Computes the reconstruction/prediction error of one or multiple
      samples to a scalar (each).
    * Threshold Model: Maps the scalar to binary class labels.

    Note this kind of model *absolutely requires* a time series in most cases, and
    therefore the data passed to it must be in order!
    """

    _if: FederatedModel
    _tm: FederatedModel
    _ef: Callable[[Tensor, Tensor], Tensor]
    _param_split: int

    _pf_mode: bool
    _prev_fit_sample: Optional[Tensor]
    _prev_pred_sample: Optional[Tensor]

    def __init__(
        self,
        identify_fn: FederatedModel,
        threshold_m: FederatedModel,
        error_fn: Callable[[Tensor, Tensor], Tensor],
        pf_mode: bool = False,
    ):
        """Creates a new federated IFTM anomaly detection model.

        :param identify_fn: Federated identity function model.
        :param threshold_m: Federated threshold model.
        :param error_fn: Reconstruction/prediction error function that must compute
            the error in a sample-wise manner (for example, if using a loss function,
            the reduction should be set to NONE).
        :param pf_mode: Whether IFTM uses an identity function or a prediction
            function.
        """
        self._if = identify_fn
        self._tm = threshold_m
        self._ef = error_fn
        self._param_split = len(identify_fn.get_parameters())

        self._pf_mode = pf_mode
        self._prev_fit_sample = None
        self._prev_pred_sample = None

    def set_parameters(self, parameters: list[np.ndarray]):
        """Updates the internal parameters of the two underlying models by splitting
        the parameter list as previously defined.

        :param parameters: Parameters to update the IFTM model with.
        """
        self._if.set_parameters(parameters[: self._param_split])
        self._tm.set_parameters(parameters[self._param_split :])

    def get_parameters(self) -> list[np.ndarray]:
        """Retrieves the weights of the underlying models.

        :return: Concatenated weight lists of the two models.
        """
        params = self._if.get_parameters()
        params.extend(self._tm.get_parameters())
        return params

    def fit(self, x_data, y_data=None):
        """Trains the IFTM model with the given data by calling the wrapped models;
        first the IF to make a prediction, after which the error can be computed for
        the fitting of the TM. Afterward, the IF is fitted.

        Note that one can run IFTM in supervised mode by providing the true classes of
        each sample --- however, most TMs require only the input data.

        Note that in case of an underlying prediction function (instead of a regular
        IF), the window is shifted by one step into the past, i.e. the final sample is
        only used to compute a prediction error, not to make a prediction, and it is
        stored for the next fitting step.

        Note this kind of model *absolutely requires* a time series in most cases, and
        therefore the data passed to it must be in order! Also, for the first step in
        a time series, it is impossible to compute a prediction error, since there is
        no previous sample to compare it to.

        :param x_data: Input data.
        :param y_data: Expected output, optional since default IFTM is fully
            unsupervised.
        """
        # Adjust input data depending on mode
        if self._pf_mode:
            x_data, y_true = self._shift_batch_window(x_data, fit=True)
            if x_data is None:
                return
        else:
            y_true = x_data

        # Train TM
        y_pred = self._if.predict(x_data)
        pred_errs = self._ef(y_true, y_pred)
        self._tm.fit(pred_errs, y_data)

        # Train IF
        self._if.fit(x_data, y_true)

    def predict(self, x_data) -> Optional[Tensor]:
        """Makes a prediction on the given data and returns it by calling the wrapped
        models; first the IF to make a prediction, after which the error can be
        computed for the final prediction step using the TM.

        Note that in case of an underlying prediction function (instead of a regular
        IF), the window is shifted by one step into the past, i.e. the final sample is
        only used to compute a prediction error, not to make a prediction, and it is
        stored for the next prediction step.

        Note this kind of model *absolutely requires* a time series in most cases, and
        therefore the data passed to it must be in order! Also, for the first step in
        a time series, there is no previous sample to compare it to; the sample is
        therefore automatically classified as the default class (0, i.e. normal).

        :param x_data: Input data.
        :return: Predicted output tensor consisting of bools (0: normal, 1: abnormal).
        """
        # Adjust input data depending on mode
        if self._pf_mode:
            x_data, y_true = self._shift_batch_window(x_data, fit=False)
            if x_data is None:
                return tf.zeros(1)
        else:
            y_true = x_data

        # Make predictions
        y_pred = self._if.predict(x_data)
        pred_errs = self._ef(y_true, y_pred)
        return self._tm.predict(pred_errs)

    def _shift_batch_window(
        self, x_data, fit: bool
    ) -> tuple[Optional[Tensor], Optional[Tensor]]:
        """Shifts a given input batch one step into the past, discarding the last
        sample from the batch and storing it for later use, while adding the last
        sample from the previous batch to the beginning of the batch. This is
        necessary for fitting and prediction of prediction-based IFs (see fit and
        predict methods).

        :param x_data: Input data.
        :param fit: Whether the window is shifted for fitting or prediction purposes.
        :return: Shifted input data and adjusted test data.
        """
        if fit:
            prev_sample = self._prev_fit_sample
            # keep the batch dimension so the sample can be concatenated to the next batch
            self._prev_fit_sample = x_data[-1:]
        else:
            prev_sample = self._prev_pred_sample
            self._prev_pred_sample = x_data[-1:]

        if prev_sample is None:
            # first step of time series
            if len(x_data) == 1:
                # sample will be used later
                return None, None
            else:
                # decrease window to allow computation of errors
                y_true = x_data[1:]
                x_data = x_data[:-1]
        else:
            y_true = x_data
            x_data = tf.concat([prev_sample, x_data[:-1]], 0)
        return x_data, y_true

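
# Illustrative wiring sketch (not part of the original module): combining the federated
# autoencoder as identity function with a threshold model through FederatedIFTM. The
# `SimpleThresholdModel` below is a hypothetical stand-in; the framework's actual
# threshold models live elsewhere and only need to implement the FederatedModel
# interface.
def _example_iftm_usage() -> FederatedIFTM:
    """Hypothetical helper that assembles an unsupervised IFTM detector."""

    class SimpleThresholdModel(FederatedModel):
        """Minimal mean-plus-3-std threshold over scalar errors (illustrative only)."""

        def __init__(self):
            self._threshold = np.zeros(1)

        def set_parameters(self, parameters: list[np.ndarray]):
            self._threshold = parameters[0]

        def get_parameters(self) -> list[np.ndarray]:
            return [self._threshold]

        def fit(self, x_data, y_data=None):
            errs = np.asarray(x_data)
            self._threshold = np.array([errs.mean() + 3 * errs.std()])

        def predict(self, x_data) -> Tensor:
            return tf.cast(np.asarray(x_data) > self._threshold[0], tf.int32)

    identity_fn = TFFederatedModel.get_fae(input_size=10)
    # Sample-wise reconstruction error: reduction disabled, as required by FederatedIFTM.
    error_fn = keras.losses.MeanSquaredError(reduction="none")
    return FederatedIFTM(identity_fn, SimpleThresholdModel(), error_fn)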