1.摘要

联邦学习(FL)允许资源受限的边缘节点在中央服务器的协调下协作学习全局模型,同时保持敏感数据本地化。参与节点间非独立同分布(non-IID)的数据样本降低了模型训练速度,并增加了FL收敛所需的额外通信轮次。本文提出了一种名为联邦自适应加权(FedAdp)的算法,旨在在存在非IID数据集的节点情况下加速模型收敛。我们通过理论和实证分析观察到了全局模型聚合中节点对贡献与本地节点数据分布之间的隐性联系。然后,我们提出通过每个训练轮次动态地为更新全局模型分配不同的权重。首先,通过测量局部梯度向量与全局梯度向量之间的角度来评估参与节点的贡献;随后,通过设计的非线性映射函数来量化权重。这种简单而有效的策略能够动态增强正面(抑制负面)节点贡献,从而大幅度减少通信轮次。与常用的联邦平均(FedAvg)算法相比,其优越性得到了理论和实验验证。在Pytorch和PySyft上进行的广泛实验表明,在MNIST数据集上使用FedAdp可以将通信轮次减少高达54.1%,在FashionMNIST数据集上则可以减少高达45.4%,相较于FedAvg算法。摘要——联邦学习(FL)允许资源受限的边缘节点在中央服务器的协调下协作学习全局模型,同时保持敏感数据本地化。参与节点间非独立同分布(non-IID)的数据样本降低了模型训练速度,并增加了FL收敛所需的额外通信轮次。本文提出了一种名为联邦自适应加权(FedAdp)的算法,旨在在存在非IID数据集的节点情况下加速模型收敛。我们通过理论和实证分析观察到了全局模型聚合中节点对贡献与本地节点数据分布之间的隐性联系。然后,我们提出通过每个训练轮次动态地为更新全局模型分配不同的权重。首先,通过测量局部梯度向量与全局梯度向量之间的角度来评估参与节点的贡献;随后,通过设计的非线性映射函数来量化权重。这种简单而有效的策略能够动态增强正面(抑制负面)节点贡献,从而大幅度减少通信轮次。与常用的联邦平均(FedAvg)算法相比,其优越性得到了理论和实验验证。在Pytorch和PySyft上进行的广泛实验表明,在MNIST数据集上使用FedAdp可以将通信轮次减少高达54.1%,在FashionMNIST数据集上则可以减少高达45.4%,相较于FedAvg算法。

2.现有方法的局限性与不足

现有方法如Federated Averaging(FedAvg)算法在处理non-IID数据时表现出以下局限性:

  • 收敛速度慢:由于不同节点上的数据分布差异大,局部模型更新可能偏离全局最优解方向,导致收敛速度变慢。
  • 通信成本高:为了达到相同的精度,需要更多的通信轮次来进行模型同步,增加了通信负担。
  • 缺乏自适应性:现有的方法通常对所有节点赋予相同权重,未能考虑各节点对全局模型的不同贡献度,无法有效利用节点间的异质性优势。

3.FedAdp算法

定义:Federated Adaptive Weighting(FedAdp)是一种改进的联邦学习算法,旨在通过为不同节点分配自适应权重来加速模型收敛。
具体来说,FedAdp基于各节点对全局模型更新的贡献程度动态调整权重,以减少通信轮次并提高训练效率。贡献度通过计算局部梯度与全局梯度之间的夹角来量化,并通过非线性映射函数进一步转换为权重。

4.理论

这个方法的核心原理在于通过自适应加权策略加速联邦学习的收敛速度,尤其是在存在非独立同分布(non-IID)数据的情况下。其理论基础主要包括以下几个方面:

  • 梯度信息和数据分布对模型收敛的影响:通过对局部梯度和全局梯度之间夹角的测量,可以评估各节点对全局模型更新的贡献。这基于梯度下降的理论,即局部梯度的方向及其节点数据分布对全局目标函数优化的影响。
  • 自适应权重分配:通过设计非线性转换函数,将节点贡献量化为权重,并在每轮通信中动态调整这些权重,以减少训练损失并加速收敛。

4.1 自适应权重分配

定义: 自适应权重分配是一种在联邦学习过程中,根据节点对全局模型贡献的不同动态调整各节点权重的方法。它通过量化每个节点的局部梯度与全局梯度之间的相关性,并据此设计非线性转换函数来确定节点权重,从而实现对节点贡献的差异化处理。

在论文中的应用: 该方法旨在打破联邦学习在非独立同分布(non-IID)数据下的收敛速度,通过减少通信轮次来提高训练效率。具体而言,论文提出了一种称为Federated Adaptive Weighting (FedAdp)的算法,该算法在每次通信轮次中根据节点贡献自适应地分配不同权重,以优化全局模型更新。

4.2 节点贡献度测量公式

$$
\theta_i(t) = \arccos \left( \frac{\langle \nabla F(w(t)), \nabla F_i(w(t)) \rangle}{|\nabla F(w(t))| |\nabla F_i(w(t))|} \right)
$$

: 此公式用于测量每个节点在联邦学习中的贡献度。

  • $$( \theta_i(t) ) $$表示第$$ ( i ) $$个节点在第$$ ( t ) $$轮合集中中的平滑角度,
  • $$( \nabla F(w(t)) )$$ 表示全局模型在第 $$( t )$$ 轮迭代时的梯度,
  • $$( \nabla F_i(w(t)) )$$ 表示第 $$( i )$$ 个节点在第$$ ( t ) $$轮迭代时的局部梯度,
  • $$( \langle \cdot, \cdot \rangle )$$ 表示向量的内积运算,
  • $$(|\cdot| )$$ 表示向量的2范数。

此公式通过计算节点梯度与全局梯度之间的夹角来衡量节点对全局模型的贡献。当夹角较小时,表示局部梯度方向与全局梯度方向相似,节点对全局模型聚合有正向贡献;反之,夹角较大时,节点可能对全局模型聚合产生负向影响。这有助于动态调整各节点的权重,从而加速联邦学习的收敛速度。

5.算法步骤

  • 计算局部更新

    • 每个参与节点执行本地更新,计算局部梯度并获得新的局部模型参数。
    • 计算模型差异 $$( A_i(t) )$$ 并返回给服务器。
  • 计算全局梯度

    • 服务器端汇总所有节点的局部梯度,计算全局梯度 ( \nabla F(w(t)) )。
  • 计算瞬时角度和光滑角度

    • 根据公式
      $$
      \theta_i(t) = \arccos \left( \frac{\langle \nabla F(w(t)), \nabla F_i(w(t)) \rangle}{|\nabla F(w(t))| |\nabla F_i(w(t))|} \right)
      $$
      计算每个节点的瞬时角度。
    • 更新光滑角度 $$( \hat{\theta}_i(t) )$$,以平滑瞬时角度的随机波动。
  • 计算权重

    • 使用非线性映射函數
      $$
      f(\hat{\theta}_i(t)) = \alpha \left(1 - e^{-c (\hat{\theta}_i(t) - 1)}\right)
      $$
      将光滑角度映射为贡献值。
    • 使用 Softmax 函数计算最终的权重 $$( \psi_i(t) )$$。
  • 更新全局模型

    • 根据计算得到的权重,聚合各节点的局部模型更新,更新全局模型 $$( w(t) )$$。

6.代码框架实现

fedadp.py

1
2
3
4
5
6
7
8
9
10
11
import fedadp_server


def main():
"""A federated learning training session using the FedAdp algorithm."""
server = fedadp_server.Server()
server.run()


if __name__ == "__main__":
main()

fedadp_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import math

import numpy as np
from plato.config import Config
from plato.servers import fedavg


class Server(fedavg.Server):
"""A federated learning server using the FedAdp algorithm."""

def __init__(
self, model=None, datasource=None, algorithm=None, trainer=None, callbacks=None
):
super().__init__(
model=model,
datasource=datasource,
algorithm=algorithm,
trainer=trainer,
callbacks=callbacks,
)

self.local_angles = {}
self.last_global_grads = None
self.adaptive_weighting = None
self.global_grads = None

async def aggregate_deltas(self, updates, deltas_received):
"""Aggregate weight updates and deltas updates from the clients."""
num_samples = [update.report.num_samples for update in updates]
total_samples = sum(num_samples)

self.global_grads = {
name: self.trainer.zeros(weights.shape)
for name, weights in deltas_received[0].items()
}

for i, update in enumerate(deltas_received):
for name, delta in update.items():
self.global_grads[name] += delta * (num_samples[i] / total_samples)

# Get adaptive weighting based on both node contribution and date size
self.adaptive_weighting = self.calc_adaptive_weighting(
deltas_received, num_samples
)

# Perform weighted averaging
avg_update = {
name: self.trainer.zeros(weights.shape)
for name, weights in deltas_received[0].items()
}

# Use adaptive weighted average
for i, update in enumerate(deltas_received):
for name, delta in update.items():
avg_update[name] += delta * self.adaptive_weighting[i]

return avg_update

def calc_adaptive_weighting(self, updates, num_samples):
"""Compute the weights for model aggregation considering both node contribution
and data size."""
# Get the node contribution
contribs = self.calc_contribution(updates)

# Calculate the weighting of each participating client for aggregation
adaptive_weighting = [None] * len(updates)
total_weight = 0.0
for i, contrib in enumerate(contribs):
total_weight += num_samples[i] * math.exp(contrib)
for i, contrib in enumerate(contribs):
adaptive_weighting[i] = (num_samples[i] * math.exp(contrib)) / total_weight

return adaptive_weighting

def calc_contribution(self, updates):
"""Calculate the node contribution based on the angle between the local
and global gradients."""
angles, contribs = [None] * len(updates), [None] * len(updates)

# Compute the global gradient which is surrogated by using local gradients
self.global_grads = self.process_grad(self.global_grads)

# Compute angles in radian between local and global gradients
for i, update in enumerate(updates):
local_grads = self.process_grad(update)
inner = np.inner(self.global_grads, local_grads)
norms = np.linalg.norm(self.global_grads) * np.linalg.norm(local_grads)
angles[i] = np.arccos(np.clip(inner / norms, -1.0, 1.0))

for i, angle in enumerate(angles):
client_id = self.selected_clients[i]

# Update the smoothed angle for all clients
if client_id not in self.local_angles:
self.local_angles[client_id] = angle
self.local_angles[client_id] = (
(self.current_round - 1) / self.current_round
) * self.local_angles[client_id] + (1 / self.current_round) * angle

# Non-linear mapping to node contribution
alpha = (
Config().algorithm.alpha if hasattr(Config().algorithm, "alpha") else 5
)

contribs[i] = alpha * (
1 - math.exp(-math.exp(-alpha * (self.local_angles[client_id] - 1)))
)

return contribs

@staticmethod
def process_grad(grads):
"""Convert gradients to a flattened 1-D array."""
grads = list(dict(sorted(grads.items(), key=lambda x: x[0].lower())).values())

flattened = grads[0]
for i in range(1, len(grads)):
flattened = np.append(
flattened, -grads[i] / Config().parameters.optimizer.lr
)

return flattened

关于fedadp_server.py的说明:

  • 类 Server:
    init: 初始化服务器对象,继承父类的属性和方法,并初始化与自适应加权相关的属性。
    aggregate_deltas: 异步方法,用于从客户端聚合权重更新和增量更新。
    计算每个客户端的样本数和总样本数。
    初始化全局梯度,并逐项累加每个客户端的梯度更新。
    调用 calc_adaptive_weighting 方法计算自适应权重,根据权重计算加权平均更新。
    calc_adaptive_weighting: 计算每个客户端在模型聚合中的权重。
    调用 calc_contribution 方法计算每个客户端的贡献值。
    计算每个客户端的权重,考虑到客户端贡献值和数据规模。
    calc_contribution: 计算每个客户端的贡献值,基于局部梯度和全局梯度之间的夹角。
    计算全局梯度,并用局部梯度替代。
    计算每个客户端的局部梯度和全局梯度之间的夹角,并进行平滑。
    使用非线性函数将夹角映射为贡献值。
    process_grad: 将梯度转换为一维阵列,方便计算。

  • 关键流程:
    聚合更新:
    接收客户端更新,计算全局梯度。
    计算自适应权重并进行加权平均。
    计算权重:
    基于节点贡献和数据规模计算权重。
    计算节点贡献:
    计算局部梯度和全局梯度之间的夹角。
    将夹角映射为节点贡献值。