# Source code for daisy.federated_learning.federated_aggregator

# Copyright (C) 2024-2025 DAI-Labor and others
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""To allow the compatibility between different kinds of strategies for the
aggregation of models, gradients, losses, or other parameters/model types and
topologies for the federated distributed system, any strategy has to follow the same
simple interface. This module provides this abstract class and some sample
aggregators to be used.

Author: Fabian Hofmann
Modified: 04.04.24
"""

from abc import ABC, abstractmethod
from collections import deque

import numpy as np


class Aggregator(ABC):
    """Abstract base class for all aggregation strategies.

    Every new aggregation strategy used in the federated system must subclass
    this interface. Implementing :meth:`aggregate` also makes the instance
    directly callable, i.e. ``aggregator(...)`` is equivalent to
    ``aggregator.aggregate(...)``.
    """

    @abstractmethod
    def aggregate(self, *args, **kwargs) -> object:
        """Compute and return the aggregated values.

        The aggregation may combine the given parameters with any internal
        state the concrete aggregator keeps.

        :param args: Positional arguments; meaning depends on the concrete
            aggregator implementation.
        :param kwargs: Keyword arguments; meaning depends on the concrete
            aggregator implementation.
        :return: The aggregated values.
        """
        raise NotImplementedError

    def __call__(self, *args, **kwargs) -> object:
        """Convenience wrapper that forwards directly to :meth:`aggregate`."""
        return self.aggregate(*args, **kwargs)
class ModelAggregator(Aggregator):
    """Abstract aggregator over model parameters.

    Concrete subclasses combine the parameters of several models — possibly
    together with internal state built from earlier aggregation steps — into a
    single aggregated set of model parameters. Any new model aggregation
    strategy used in the federated system must implement this interface.
    """

    @abstractmethod
    def aggregate(self, models_parameters: list[list[np.ndarray]]) -> list[np.ndarray]:
        """Compute the aggregated model parameters.

        :param models_parameters: A list of models' parameters, all of the
            same type/structure (one list of arrays per model).
        :return: The aggregated model parameters.
        """
        raise NotImplementedError
class FedAvgAggregator(ModelAggregator):
    """Federated Averaging (FedAvg) is the simplest way to aggregate any number of
    models into a "global" model version, using only their weights (no internal
    state is kept); simply computing their average and using that as a result.

    Note this could also be done using gradients or losses or any other form of
    parameters from other types of models, however this was implemented with the
    federated_model.TFFederatedModel and its parameters in mind.
    """

    def aggregate(self, models_parameters: list[list[np.ndarray]]) -> list[np.ndarray]:
        """Calculates the average for the given models' parameters and returns it,
        using Welford's algorithm.

        Note this could be further optimized using Kahan–Babuška summation for
        reduction of floating point errors.

        :param models_parameters: A list of models' parameters of the same type.
        :return: Aggregated model parameters (freshly allocated; the input
            models are never modified).
        :raises ValueError: If no model parameters are given.
        """
        if not models_parameters:
            raise ValueError("models_parameters must contain at least one model")
        # Start from a copy of the first model: the original code aliased
        # models_parameters[0] and mutated the caller's arrays in place.
        avg_parameters = [np.copy(layer) for layer in models_parameters[0]]
        n = 0
        for model in models_parameters:
            n += 1
            for i, layer in enumerate(model):
                delta = layer - avg_parameters[i]
                # Out-of-place update: never writes into caller-owned arrays
                # and safely upcasts integer-typed parameters.
                avg_parameters[i] = avg_parameters[i] + delta / n
        return avg_parameters
class CumAggregator(ModelAggregator):
    """Cumulative Averaging (FedCum) is the base version of online averaging for
    models into a "global" version of the model. The cumulative average is computed
    by first computing the FedAvg over the batch of models provided to the
    aggregator in a single step, before using that average to update the average
    (i.e., all models in a batch are treated equally). This aggregator is NOT
    stable for infinite learning aggregation steps (n).

    This kind of aggregation is also probably not compatible with the aggregation
    of gradients or losses, as these averages cannot be computed over various
    epochs overall.
    """

    # Running cumulative average; only set after the first aggregate() call.
    _cum_avg: list[np.ndarray]
    # Number of aggregation steps performed so far.
    _n: int
    _fed_avg: FedAvgAggregator

    def __init__(self):
        """Creates a new cumulative aggregator."""
        self._n = 0
        self._fed_avg = FedAvgAggregator()

    def aggregate(self, models_parameters: list[list[np.ndarray]]) -> list[np.ndarray]:
        """Calculates the cumulative average for the given models' parameters and
        the previous cumulative average and returns it.

        :param models_parameters: A list of models' parameters of the same type.
        :return: Aggregated model parameters.
        """
        models_avg = self._fed_avg.aggregate(models_parameters)
        self._n += 1
        if self._n == 1:
            # Copy defensively: storing models_avg by reference would let later
            # updates leak into (or be corrupted by) caller-visible arrays.
            self._cum_avg = [np.copy(layer) for layer in models_avg]
        else:
            for i, layer in enumerate(models_avg):
                delta = layer - self._cum_avg[i]
                # Out-of-place incremental mean update.
                self._cum_avg[i] = self._cum_avg[i] + delta / self._n
        return self._cum_avg
class SMAggregator(ModelAggregator):
    """Simple Moving Averaging, also called sliding window averaging (FedSMA) is
    the simplest version of online moving averaging for models, as it uses only the
    past k elements of the stream to compute the average. As with other online
    model averaging aggregators, the regular FedAvg is first computed over the
    batch of models provided in a single step, before using that average to update
    the moving average (i.e., all models in a batch are treated equally).

    This kind of aggregation is also probably not compatible with the aggregation
    of gradients or losses, as these averages cannot be computed over various
    epochs overall.
    """

    # Current moving average; only set after the first aggregate() call.
    _sm_avg: list[np.ndarray]
    # Sliding window holding the last (at most _window_size) batch averages.
    _window: deque
    _window_size: int
    _fed_avg: FedAvgAggregator

    def __init__(self, window_size: int = 5):
        """Creates a new simple moving aggregator.

        :param window_size: Size of sliding window.
        """
        self._window = deque()
        self._window_size = window_size
        self._fed_avg = FedAvgAggregator()

    def aggregate(self, models_parameters: list[list[np.ndarray]]) -> list[np.ndarray]:
        """Calculates the simple moving average for the given models' parameters by
        averaging them first and then adding them to the window. The moving average
        is then updated incrementally from the newly added entry (and, once the
        window is full, the entry that falls out of the window).

        Fixes two defects of the previous implementation: the first batch was
        never appended to the window (so the window stayed empty forever and no
        moving average was ever formed), and a partially filled window indexed
        the integer placeholder ``0`` as if it were a parameter list.

        :param models_parameters: A list of models' parameters of the same type.
        :return: Aggregated model parameters.
        """
        models_avg = self._fed_avg.aggregate(models_parameters)
        if not self._window:
            # First batch: the moving average is the batch average itself.
            # Keep independent copies so later updates cannot alias caller data.
            self._window.append([np.copy(layer) for layer in models_avg])
            self._sm_avg = [np.copy(layer) for layer in models_avg]
            return self._sm_avg
        if len(self._window) == self._window_size:
            # Window full: slide it — drop the oldest entry and shift the
            # average by (new - removed) / window_size.
            removed = self._window.popleft()
            self._window.append(models_avg)
            k = len(self._window)
            for i in range(len(models_avg)):
                self._sm_avg[i] = self._sm_avg[i] + (models_avg[i] - removed[i]) / k
        else:
            # Window still growing: standard incremental mean update over the
            # k entries now in the window.
            self._window.append(models_avg)
            k = len(self._window)
            for i in range(len(models_avg)):
                self._sm_avg[i] = self._sm_avg[i] + (models_avg[i] - self._sm_avg[i]) / k
        return self._sm_avg
class EMAggregator(ModelAggregator):
    """Exponential moving averaging (FedEMA) unlike FedSMA takes note of the entire
    model stream to compute the average, but over multiple time steps starts to
    forget past elements; this process is also called exponential smoothing, as the
    past is exponentially less relevant to the current time step the further one
    goes back. As with other online model averaging aggregators, the regular
    FedAvg is first computed over the batch of models provided in a single step,
    before using that average to update the moving average (i.e., all models in a
    batch are treated equally).

    This kind of aggregation is also probably not compatible with the aggregation
    of gradients or losses, as these averages cannot be computed over various
    epochs overall.
    """

    # NOTE: these were previously written with '=' instead of ':', which bound
    # the type objects themselves as class attributes instead of annotating.
    # Current exponential moving average; None until the first aggregate() call.
    _em_avg: list[np.ndarray]
    # Smoothing factor in (0, 1]; weight of each new incoming batch average.
    _alpha: float
    _fed_avg: FedAvgAggregator

    def __init__(self, alpha: float = 0.05):
        """Creates a new exponential moving aggregator.

        :param alpha: Smoothing/weight factor for new incoming values.
        """
        self._em_avg = None
        self._alpha = alpha
        self._fed_avg = FedAvgAggregator()

    def aggregate(self, models_parameters: list[list[np.ndarray]]) -> list[np.ndarray]:
        """Calculates the exponential moving average for the given models'
        parameters and returns it.

        :param models_parameters: A list of models' parameters of the same type.
        :return: Aggregated model parameters.
        """
        models_avg = self._fed_avg.aggregate(models_parameters)
        if self._em_avg is None:
            # Copy defensively so the stored state never aliases caller data.
            # The smoothing loop below then reduces to the identity on the
            # first call (alpha*x + (1-alpha)*x == x).
            self._em_avg = [np.copy(layer) for layer in models_avg]
        for i in range(len(models_avg)):
            self._em_avg[i] = (
                self._alpha * models_avg[i] + (1 - self._alpha) * self._em_avg[i]
            )
        return self._em_avg