论文阅读-Naiad:A Timely Dataflow System

Posted by keys961 on April 19, 2019

1. Introduction

已有的3类系统:

  • 批处理:高吞吐但高延迟,支持同步迭代
  • 流式处理:低延迟,但缺乏对迭代算法的支持
  • 基于触发器:支持迭代算法,但一致性弱

Timely dataflow支持:

  • 循环结构化,允许在数据流中进行反馈
  • 有状态数据流的数据顶点,支持消费和生产记录,不需要全局协调
  • 一旦收到某个轮次的所有输入或者一次循环迭代,就会收到顶点的通知

前两点保证迭代和增量计算的低延迟;第三点,可以在流式数据或者迭代存在的情况下,中间阶段和最终输出的结果保持一致。

而Naiad是timely dataflow的一个原型实现,是一个并行分布式计算框架,保证高吞吐和低延迟。

naiadstack

2. Timely Dataflow

Timely dataflow基于有向图

  • 顶点是有状态的,可沿着边发送/接收有逻辑时间的消息
  • 图可包含环,且可嵌套,时间戳反映不同的输入时期(epoch)和循环迭代

生成的模型:

  • 支持并发执行不同时期和循环迭代的数据
  • 当所有的消息都被传输后(根据某个时间戳),顶点支持显式的通知

2.1. 图结构

2.1.1. 图结构的顶点

  • 输入顶点:接收外部生产者的消息
    • 输入的消息会标记epoch,外部生产者可通知顶点该epoch不会有消息进来,且当所有epoch消息都不会进来时可以“关闭”顶点
  • 输出顶点:将消息输出给外部的消费者
    • 输出的消息也会标记epoch,顶点可通知外部消费者该epoch的消息不会出来,且当所有epoch消息都不会出来时可以“关闭”顶点

2.1.2. 嵌套循环上下文

包含3个系统相关的顶点

  • 入口顶点:进入该上下文必须经过该顶点
  • 出口顶点:离开该上下文必须经过该顶点
  • 反馈顶点:每个循环至少有1个,它不会嵌套在任何内部循环上下文中

loopctx

2.1.3. 时间戳

消息时间戳$t$可定义为下面2个元素的元组:

  • $e \in \mathbb{N}​$:Epoch号
  • $\vec{c} = \langle c_1, c_2, …, c_k \rangle \in \mathbb{N}^k​$:Loop counter向量,每个维度对应对应循环上下文的循环次数,可用于区分循环上下文,且可跟踪循环进度

上述3个节点处理时间戳的结果如下(对于第$k$个循环上下文):

顶点 输入时间戳 输出时间戳
入口顶点 $(e, \langle c_1, c_2, …, c_k \rangle )$ $(e, \langle c_1, c_2, …, c_k, 0 \rangle )$
出口顶点 $(e, \langle c_1, c_2, …, c_k, c_{k+1} \rangle )$ $(e, \langle c_1, c_2, …, c_k \rangle )$
反馈顶点 $(e, \langle c_1, c_2, …, c_k \rangle )$ $(e, \langle c_1, c_2, …, c_k+1 \rangle )$

时间戳的比较:当且仅当$e_1 \le e_2$且$\vec{c_1} \le \vec{c_2}$(根据字典序比较)时,$t_1 \le t_2$

2.2. 顶点计算

每个顶点v实现下面2个回调:

  • v.OnRecv(edge, msg, timestamp):收到来自edge的消息
  • v.OnNotify(timestamp):没有更多<= timestamp的消息到来

回调可被自定义(重写)

回调上下文中,可能会调用下面2个系统提供的方法:

  • this.SendBy(edge, msg, timestamp):向一条边发送消息
  • this.NotifyAt(timestamp):在timestamp时候,进行调用通知

调用关系是:

  • u.SendBy(e, m, t) -> v.OnRecv(e, m, t)eu->v的边
  • v.NotifyAt(t) -> v.OnNotify(t)
  • 每个顶点的回调都会被排队,但必须保证:
    • t' <= t时,v.OnNotify(t)发生在v.OnRecv(e, m, t')后,因为前者是作为t之前所有消息都已收到,不会再来的信号。重写的回调也得满足这个要求。

2.3. 达成Timely Dataflow

本节主要讲系统如何推断具有给定时间戳的未来消息的不可能性(即推断“通知”的安全性),并提供单线程的实现。

2.3.1. Pointstamp

任何时刻,未来消息的时间戳受到当前未处理事件(消息和通知请求)以及图结构的限制:

  • 消息只会沿边传输,且时间戳仅会被2.1.3.小节的三种顶点修改。

  • 由于事件的发送不能产生时间回溯,因此可以计算事件产生的消息时间戳的下界。将这种算法应用到未处理事件上,则可判断顶点通知是否正确

这里定义Pointstamp,对应每个事件:$(t \in Timestamp, l \in Edge \cup Vertex)$

  • 对于v.SendBy(e, m, t),对应的是(t, e)
  • 对于v.NotifyAt(t),对应的是(t, v)

一些结论和定义

  • 当且仅当满足下面条件时,$(t_1, l_1)$ could-result-in $(t_2, l_2)$:
    • 存在一条路径$\psi = \langle l_1, …, l_2 \rangle$,根据这条路径,时间戳$\psi(t_1) \le t_2$,左边表示$t_1$仅被路径上的3类节点修改的时间戳
  • Path Summary:$l_1$到$l_2$的时间戳变化的函数
    • 可以保证若两位置存在多条路径,它们的summary必然不一样,其中一条总会比另一台更早产生调整后的时间戳
    • 可以找到最小的path summary,将路径记为$\psi [l_1, l_2]$
    • 因此,检测could-result-in,只需检测$\psil_1, l_2 \le t_2$即可

2.3.2. 单线程实现

调度器维护一组活跃的pointstamp,每个元素包含2个计数器:

  • Occurrence countOC):未完成的事件发生个数
  • Precursor countPC):could-result-in顺序下,前面的pointstamp个数

当顶点产生和撤销事件时,OC根据下面更新

  • v.SendBy(e,m,t)前,v.NotifyAt(t)前:OC[(t,e/v)] += 1
  • v.OnRecv(e,m,t)后,v.OnNotify(t)后:OC[(t,e/v)] -= 1

当pointstamp活跃时,PC根据下面初始化

  • 置为已有could-result-in的活跃pointstamp个数
  • 同时,增加当前pointstamp之后的pointstamp PC

当pointstamp不活跃时:

  • OC值为0,移除活跃pointstamp集合

  • 递减之后的pointstamp PC

    PC值为0,则该pointstamp为frontier,调度器可将任何通知发给它

系统初始化时,在下面位置初始化一个pointstamp:

  • 位置为每个输入顶点
  • 时间戳为:第一个epoch,以及全为0的loop count
  • OC为1,PC为0

当输入节点的输入完毕时:

  • 若epoch e完毕,则创建e+1的pointstamp,删除原有的pointstamp
  • 通知下游epoch e的消息已经输入完毕
  • 若输入节点关闭时,删除当前位置的所有pointstamp,允许输入到下游的事件最终可从计算中消失

3. 分布式实现

架构图如下:

arch

架构中包含一组Worker线程:

  • 管理一个timely dataflow顶点的分区
  • Worker间会通信(本地使用共享内存,远程使用TCP协议)
  • 参与分布式的进度跟踪协议,以协调通知的发送

3.1. 数据并行化

如上图:

  • 程序定义一个数据流处理的逻辑图,每个顶点代表一个阶段。顶点之间由连接器相连,连接器上可包含分区函数(即上面的H(m)
  • 执行时,将逻辑图展开成物理图,每个阶段展开成多个顶点,每个连接器展开成多条边

当顶点发送消息给连接器时,通过分区函数,可路由到正确的目的地;若没有分区函数,则路由到本地的顶点。

计算could-result-in关系时,Naiad从物理图上的每个pointstamp $p$进行投影,得到逻辑图的pointstamp $\hat{p}$。虽然投影会造成信息损失,但是根据逻辑图运算可限制计算带来的空间开销。关于pointstamp的投影在后文描述。

3.2. Workers

每个Worker传输消息和通知到timely dataflow graph当前分区的节点上,并优先传输消息(优先级可自定义)。

Worker间使用共享队列通信,没有其它共享状态,保证只有单个线程控制整个执行。

这种情况下,大部分SendBy后可马上接上OnRecv,不需要将执行压入队列,因为它们在同一个Worker上,而只有远程节点的调用会入队,因此队列是小的,且延迟低。

由于允许图有环,顶点支持重入,即一个顶点的OnRecv可调用另一个顶点的SendBy。可重入的次数有上限。可重入顶点,可合并消息到OnRecv,然后调用SendBy,不需要入队,因此减少内存占用。

3.3. 分布式进度跟踪

3.3.1. 广播“进度更新”

只有一个阶段(所有物理顶点)的pointstamp could-result-in下一个阶段的pointstamp,才能进行通知。因此需要跟踪进度。

跟踪基于单个全局frontier(见2.3.2.),协议实现通过广播OC更新,并在这个基础上应用了2个优化。

对于每个活跃的pointstamp,每个Worker维护:

  • 本地OC:全局OC的本地视图
  • 本地PC:由本地OC计算而来
  • 本地frontier:使用could-result-in关系算出的本地的pointstamp

当Worker分发事件时,先广播“进度更新”(为$\langle pointstamp, \delta \in \mathbb{Z} \rangle $,第二个根据2.3.2.中的规则产生)。广播必须是FIFO序。当“进度更新”收到时,更新本地OC。

协议包含一个重要的特性:任何本地frontier不会移到全局frontier之前,证明在这里

3.3.2. 优化

a) 投影Pointstamp

协议维护逻辑图的顶点和边的OC和PC,而不是物理图每个顶点和边。

虽然降低了并发度,但显著降低了广播的数据量。

b) 广播前缓冲

将更新推入缓冲区以累计更新,同一个pointstamp的更新会被合并。

累积更新的条件为(满足下列之一):

  • 更新的pointstamp,可由本地frontier could-result-in
  • pointstamp对应的顶点净更新数(本地OC、缓冲的更新值和当前Worker已经广播但被收到的更新值之和)严格为正

当更新产生时,先检查是否能累计,若不行,则先清空缓冲区,并发送更新,正数更新必在负数更新之前。

3.4. 容错和可用性

容错实现简单但可扩展:

  • 每个有状态顶点实现Checkpoint()Restore()接口
  • 系统会协调,保证所有Worker生成一致的checkpoint

当系统周期性做checkpoint时:

  • 先暂停Worker和消息传递线线程
  • 传输未完成的OnRecv事件以清空消息队列
  • 调用每个顶点的Checkpoint()

当系统恢复时:

  • 所有存活线程回滚到上一个持久化的checkpoint
  • 失效进程的顶点被重新分配到存活的进程
  • 调用Restore(),根据checkpoint文件,重建节点状态

3.5. 防止产生微小的落后者(Micro-stragglers)

Micro-straggles和粗粒度批处理系统的落后者有相似之处,但处理方式不同(因为批处理系统无状态,Naiad worker有状态)。

Naiad并不是被动地处理micro-straggler现象,而是主动去避免它。

3.5.1. 网络

Naiad使用TCP协议,而该系统流量往往是突发性的(迭代算法执行时,刚开始是大数据量,但后面的消息大小会较小),默认的TCP配置会有问题:

  • Nagle算法造成传输小数据量时,延迟变大(因为小包会被缓冲,直到收到ACK包才会被发出)。因此系统禁用Nagle算法。
  • 包丢失时,超时重传的时间默认300ms,过长,因此设置成20ms

3.5.2. 数据结构带来的竞争

大部分数据结构(如顶点状态),只有1个线程访问,因此没有竞争。

协调时,需要传输消息,Naiad使用.NET并发队列和轻量自旋锁,当竞争检测到时,回退并睡眠1ms(相比于默认配置而言,显著降低了停顿的影响)。

3.5.3. GC

使用.NET并发的mark-and-sweep GC进行回收,并且让系统更少触发GC,且降低停顿的时间:

  • Naiad runtime和library尽量避免对象的创建,而使用对象池获取和回收
  • 尽量使用值类型(而非引用类型/指针),因为GC的开销和指针的数量成正比

4. 编写Naiad程序

4.1. 实例:迭代MapReduce

// Input stages of the dataflow
var input = controller.NewInput<string>();
// Use LINQ to implement MR
var result = input.SeletMany(y => map(y))
    .GroupBy(y => key(y), (k, vs) => reduce(k, vs)); 
// Output callbacks for each epoch
result.Subscribe(result => {...});
// Supply input data to the query
input.OnNext(/*1st epoch data*/);
input.OnNext(/*2nd epoch data*/);
...
input.OnCompleted();    

4.2. Naiad的数据并行模式

上层可使用LINQ构建增量式的并行数据流,隐藏底层的接口。LINQ的操作符也有一定的优化,不需要协调即可运行(如Concat, Select, Distinct等),以提高性能。

此外,Naiad实现了Bloom框架的自己,用于异步计算。LINQ的Where, Concat, Distinct, Join不会调用NotifyAt(),只使用这些运算符的子图才会异步运行。增量式的Aggregate操作符也有实现,用于实现Bloom-style的聚合。Naiad只会在顶点显式要求时,才会进行协调。

最后,Naiad也实现了Pregel bulk sync parallel模型,用于图算法。

4.3. 构建timely dataflow图

Naiad基于timely dataflow,提供一个简单的图构建接口,它是所有Naiad库的基础,但是对上层应用实现特定功能也有用。

图构建包含2步:

  • 定义dataflow顶点的行为
  • 定义dataflow的拓扑

Naiad stage:

  • 一组顶点的集合(由一个顶点工厂定义)
  • 包含多个输入和输出,和C#类型联系在一起,以有类型的流进行连接
  • 输入可指定分区要求,输入可指定分区保证,这样系统可插入连接器(边)以满足分区要求
  • 顶点必须提供OnRecv()回调,若要支持stage的通知,必须提供OnNotify()回调

一般而言,stage的输入必须在其输出之前连接,以防止无效循环。

系统提供LoopContext对象允许上层定义多个入口、出口和反馈stage,并将其连接到其它计算stage;而只有反馈stage可以在输入之前,将输出连接起来(与上面相反),以保证所有的环都符合dataflow图的约束。