论文阅读-利用Parameter Server扩展分布式机器学习

Posted by keys961 on April 23, 2019

1. Introduction

真实世界的训练数据量很大,需要创建很多参数和复杂的模型,它们需要被共享,且频繁存取它们,执行运算以优化它们,这造成:

  • 大量网络带宽占用
  • 许多算法是串行化的,这引入了“同步”,造成高延迟
  • 集群节点会不稳定,被其它任务抢占

Parameter Server是一个服务框架,它有下面的特性:

  • 高效通信:通信模型是异步,不阻塞计算过程
  • 灵活的一致性:允许自定义一致性,权衡同步的开销和延迟
  • 弹性扩容:无需重启框架,即可添加节点
  • 可容错,可持久:基于复制,故障恢复时间短且不影响计算工作,使用向量时钟保证操作正确触发
  • 易用:共享参数使用向量和稀疏矩阵表示,易于机器学习的开发

通过修改底层系统,并修改ML算法,以构建一个适合的分布式ML系统。

2. 机器学习

分为3个部分:特征提取,目标函数,学习。特征提取即预处理,可通过现有系统(如MapReduce完成),这里略过。

学习通常基于一个初始模型,然后通过不停的迭代,得到一个收敛的结果。而迭代的数据量很大,但还需要支持一定的实时性。

下面介绍2个广泛使用的机器学习技术,并在此基础上展示Parameter Server的效果。

2.1. 风险最小化

2.1.1. 算法描述

训练含$n$个实例,$x_{i}$表示第$i$个样例,是一个$d$维向量,$n$和$d$数量级会很大。通常$x_i$会与标签$y_i$绑定。

训练集的数量与模型精细度有几个重要的关系:

  • 训练集太小,模型过于精细会造成过拟合
  • 过于简化的模型难以提取出相关属性

正规风险最小化模型:使得下面的值最小化:

  • $F(w)=\sum_{i=1}^{n}{l(x_i,y_i,w)} + \Omega (w)$
    • $l(x_i,y_i,w)$代表预测的误差损失
    • $\Omega(w)$代表训练数据模型正则化的复杂处罚
  • 这里$w$是参数(parameter)

2.1.2. Parameter Server算法实现

模型使用简单的分布式梯度下降模型,算法如下:

dsd

实现的架构:训练数据被分区到不同的worker上,然后各自产生参数向量$w$,并计算出更新$g$(即梯度)。worker得到更新后,汇集到server上,推算出向量$w$最终更新的方向。下一轮迭代时,worker只要拉取server上最新的$w$即可。

sg

2.2. 生成模型

对于非监督学习算法,训练样例的标签未知。通常可抽象成一个通用问题:主题建模

一种流行的方式是LDA。它的统计模型(参数更新逻辑)和2.1.不同,但是实现的架构上和之前的是类似的

LDA更新不急于递归下降,而是描述当前模型是否足够好的描述文档,因此需要存取每个文档的附加元数据(即文档中的单词和主题的对应关系),且每次读取时会被更新。

3. 架构

架构包含:

  • server group
    • server node:维护全局参数的一个分区,并通过相互通信进行复制和迁移
    • server manager:维护一致的server元数据视图,如节点是否存活、参数分区的分配
  • worker group:一个应用对应一个group

    • task scheduler:分配任务,并跟踪进度,若worker有加入/退出,它会重新调度未完成的任务

    • worker node:保存全局训练数据的一部分,以计算本地的统计结果,且它只和server node通信以获取共享的参数

  • resource manager

archps

共享的参数:使用键值向量表示。它们被分布到server group的多个server node上。server node的本地参数可被推出去,且也可从其它节点拉取。

通常情况下,任务在worker node上运行,但也可以通过UDF(用户定义函数)在server node上运行。任务都是并行且异步的。

3.1. 键值向量

共享模型可被表示成键值对的集合(如最小损失下,可表示成<feature_id, weight>;如LDA下,可表示成<word_id, topic_id>),操作可通过键来获取值。

而机器学习算法通常使用向量运算,因此可让键值向量支持高维向量的运算:

  • 键按顺序排列,不存在的键和0相关联
  • 将这些键值对对象视为线性代数的对象,赋予向量和矩阵语义

3.2. Range Push & Pull

节点间的数据传送可通过pushpull进行。

为了优化,这些操作支持“范围”。假定r为范围,则:

  • w.push(r, dest):将$w$下所有已存在的条目中,所有的键属于范围r的条目发给dest
  • w.pull(r, dest):将$w$下所有已存在的条目中,所有的键属于范围r的条目从dest拉取过来

3.3. Server上的UDF

server node可执行UDF以聚合数据,该节点上更加完整的共享参数数据会带来更精确的结果。

3.4. 异步任务和依赖

任务被异步执行。

对于调用者:当对方返回结果(可以是UDF函数、pull获取的键值对、空回复等)时,才会被标记为“结束”

对于被调用者:当调用被返回,且子任务都被触发时,标记该任务结束

任务通常并行执行,也可被串行化执行(通过在任务间添加execute-after-finished依赖)。串行化执行(即依赖)利于帮助实现算法逻辑,且可用于支持多种一致性模型。

3.5. 灵活的一致性

并行执行会带来数据的不一致,从而减慢收敛速度。需要根据算法,考虑系统效率和收敛速度,选择适合的一致性模型。

通过任务依赖(3.4.),可实现3种不同的一致性:

  • 串行化:所有任务一个一个执行,即任务间插入execute-after-finished依赖
  • 最终一致:所有任务都可以同时执行
  • 有限延迟:设置最大延迟t,令当前时间为c,所有在c - t之前的任务完成后,该任务才能继续进行。t == 0时为串行化,t == +INF时为最终一致

通过任务依赖,可生成依赖DAG,它可能是动态的。若DAG是静态的,则调用者可发送所有DAG上的任务,以减少同步开销。

3.6. 用户定义的过滤器

作为基于调度程序的流控制补充,Parameter Server还支持用户定义的过滤器,选择性同步各个键值对,从而允许在任务内,对数据一致性进行细粒度的控制

4. 实现

参数(键值对)分片:利用一致性散列分片

容错:使用链式复制

区间操作的优化:对数据和基于范围的向量时钟进行了压缩

4.1. 向量时钟/向量时间戳

每个键值对都附带一个向量时钟,用于跟踪进度以及请求去重。

普通实现需要$O(nm)$的空间($n$为节点数,$m$为键值对个数),很占空间,因此有这样的优化——范围操作的键值对往往时间戳相同,可通过范围进行压缩,一个范围内所有键值对的时间戳只有1份

  • 对于键$k$,节点$i$和范围$R$,$vc_{i}(R)=t \Rightarrow \forall k \in R, vc_{i}(k)=t$

  • 初始化时,每个节点只有1个时间戳(覆盖所有键值对),根据下面算法,每个范围操作至多产生3个新时间戳

vecclk

因此,若有$k$个不同范围,则至多有$O(mk)$个时间戳,$m$为节点个数,这比原有的实现节省了很多空间。

4.2. 消息传输

消息格式:$[vc(R), (k_1, v_1), …, (k_p, v_p)], k_j \in R \and j \in { 1, …, p}$

  • 对于shared parameter,如上即可
  • 对于task,键值对可为(task_id, args/return_res)

若要向整个server group发送消息或者接收的节点分配的键范围发生变化时,消息携带的”范围“会被切割,算法类似于上面的。

此外,由于ML要求高带宽,因此消息会被压缩,使用fast Snappy压缩库。

而每次迭代会传输相同的键,因此接收方可缓存成列表,发送方只需要传散列值即可。而值往往会有很多0,因此只发送非0键值对。

4.3. 一致散列

Parameter Server通过一致散列将键值对(参数)分区。且通过虚拟节点优化负载均衡和恢复。散列环由server manager管理。

dht

4.4. 复制与一致性

可配置复制因子$k$,复制按照环的时钟顺序决定。

worker node通过pullpush,和master节点通信,然后同步复制到slave节点上。

复制通过链式实现,且复制会被合并/聚合,以减少带宽占用。

rep

4.5. Server管理

节点加入时

  • server manager分配新节点一个key range,作为master
  • 节点获取主分区以及$k$个从分区的数据
  • server manager广播节点变化,接收者缩小自己的管理范围,并将未完成的任务重新提交给新节点

从$S$获取数据,可分为2个阶段:

  • $S$预先拷贝键值对,附带向量时钟(会调用算法2,产生分裂),若该步骤失败则$S$保持不变
  • $S$不再接收受影响”范围“的请求,并将这些变化发给新节点

而对于其它节点,检查不属于自己范围的数据,然后删除;然后检查没收到回复的请求,若请求附带的范围一部分不属于自己管理的,则分裂范围并重新发送。

对于消息重传,可通过向量时钟进行比较并拒绝重复和过期消息。

节点离开时

  • server manager将任务重新分配给另一个节点(根据离开节点的key range决定)
  • server manager通过心跳检查集群节点是否存活

4.6. Worker管理

节点加入时: 对于节点$W$,和4.5.节类似,但更简单:

  • task scheduler分配$W$训练数据的范围
  • 新结点$W$加载训练数据,由于数据只读,因此没有两阶段获取
  • $W$从server node获取共享参数数据
  • task scheduler广播变更,让其它节点释放部分的训练数据集

节点离开时,有下面几种选项

  • 若训练集过大,或者损失数据集对算法影响很小,则可以不恢复,直接继续进行
  • 恢复数据集到其它worker node上,并开始调度任务,以替换失效的worker node