1. Introduction
已有的3类系统:
- 批处理:高吞吐但高延迟,支持同步迭代
- 流式处理:低延迟,但缺乏对迭代算法的支持
- 基于触发器:支持迭代算法,但一致性弱
Timely dataflow支持:
- 循环结构化,允许在数据流中进行反馈
- 有状态数据流的数据顶点,支持消费和生产记录,不需要全局协调
- 一旦收到某个轮次的所有输入或者一次循环迭代,就会收到顶点的通知
前两点保证迭代和增量计算的低延迟;第三点,可以在流式数据或者迭代存在的情况下,中间阶段和最终输出的结果保持一致。
而Naiad是timely dataflow的一个原型实现,是一个并行分布式计算框架,保证高吞吐和低延迟。
2. Timely Dataflow
Timely dataflow基于有向图:
- 顶点是有状态的,可沿着边发送/接收有逻辑时间的消息
- 图可包含环,且可嵌套,时间戳反映不同的输入时期(epoch)和循环迭代
生成的模型:
- 支持并发执行不同时期和循环迭代的数据
- 当所有的消息都被传输后(根据某个时间戳),顶点支持显式的通知
2.1. 图结构
2.1.1. 图结构的顶点
- 输入顶点:接收外部生产者的消息
- 输入的消息会标记epoch,外部生产者可通知顶点该epoch不会有消息进来,且当所有epoch消息都不会进来时可以“关闭”顶点
- 输出顶点:将消息输出给外部的消费者
- 输出的消息也会标记epoch,顶点可通知外部消费者该epoch的消息不会出来,且当所有epoch消息都不会出来时可以“关闭”顶点
2.1.2. 嵌套循环上下文
包含3个系统相关的顶点
- 入口顶点:进入该上下文必须经过该顶点
- 出口顶点:离开该上下文必须经过该顶点
- 反馈顶点:每个循环至少有1个,它不会嵌套在任何内部循环上下文中
2.1.3. 时间戳
消息时间戳$t$可定义为下面2个元素的元组:
- $e \in \mathbb{N}$:Epoch号
- $\vec{c} = \langle c_1, c_2, …, c_k \rangle \in \mathbb{N}^k$:Loop counter向量,每个维度对应对应循环上下文的循环次数,可用于区分循环上下文,且可跟踪循环进度
上述3个节点处理时间戳的结果如下(对于第$k$个循环上下文):
顶点 | 输入时间戳 | 输出时间戳 |
---|---|---|
入口顶点 | $(e, \langle c_1, c_2, …, c_k \rangle )$ | $(e, \langle c_1, c_2, …, c_k, 0 \rangle )$ |
出口顶点 | $(e, \langle c_1, c_2, …, c_k, c_{k+1} \rangle )$ | $(e, \langle c_1, c_2, …, c_k \rangle )$ |
反馈顶点 | $(e, \langle c_1, c_2, …, c_k \rangle )$ | $(e, \langle c_1, c_2, …, c_k+1 \rangle )$ |
时间戳的比较:当且仅当$e_1 \le e_2$且$\vec{c_1} \le \vec{c_2}$(根据字典序比较)时,$t_1 \le t_2$
2.2. 顶点计算
每个顶点v
实现下面2个回调:
v.OnRecv(edge, msg, timestamp)
:收到来自edge
的消息v.OnNotify(timestamp)
:没有更多<= timestamp
的消息到来
回调可被自定义(重写)
回调上下文中,可能会调用下面2个系统提供的方法:
this.SendBy(edge, msg, timestamp)
:向一条边发送消息this.NotifyAt(timestamp)
:在timestamp
时候,进行调用通知
调用关系是:
u.SendBy(e, m, t) -> v.OnRecv(e, m, t)
,e
为u->v
的边v.NotifyAt(t) -> v.OnNotify(t)
- 每个顶点的回调都会被排队,但必须保证:
- 当
t' <= t
时,v.OnNotify(t)
发生在v.OnRecv(e, m, t')
后,因为前者是作为t
之前所有消息都已收到,不会再来的信号。重写的回调也得满足这个要求。
- 当
2.3. 达成Timely Dataflow
本节主要讲系统如何推断具有给定时间戳的未来消息的不可能性(即推断“通知”的安全性),并提供单线程的实现。
2.3.1. Pointstamp
任何时刻,未来消息的时间戳受到当前未处理事件(消息和通知请求)以及图结构的限制:
-
消息只会沿边传输,且时间戳仅会被2.1.3.小节的三种顶点修改。
-
由于事件的发送不能产生时间回溯,因此可以计算事件产生的消息时间戳的下界。将这种算法应用到未处理事件上,则可判断顶点通知是否正确
这里定义Pointstamp,对应每个事件:$(t \in Timestamp, l \in Edge \cup Vertex)$
- 对于
v.SendBy(e, m, t)
,对应的是(t, e)
- 对于
v.NotifyAt(t)
,对应的是(t, v)
一些结论和定义:
- 当且仅当满足下面条件时,$(t_1, l_1)$ could-result-in $(t_2, l_2)$:
- 存在一条路径$\psi = \langle l_1, …, l_2 \rangle$,根据这条路径,时间戳$\psi(t_1) \le t_2$,左边表示$t_1$仅被路径上的3类节点修改的时间戳
- Path Summary:$l_1$到$l_2$的时间戳变化的函数
- 可以保证若两位置存在多条路径,它们的summary必然不一样,其中一条总会比另一台更早产生调整后的时间戳
- 可以找到最小的path summary,将路径记为$\psi [l_1, l_2]$
- 因此,检测could-result-in,只需检测$\psil_1, l_2 \le t_2$即可
2.3.2. 单线程实现
调度器维护一组活跃的pointstamp,每个元素包含2个计数器:
- Occurrence count(
OC
):未完成的事件发生个数 - Precursor count(
PC
):could-result-in顺序下,前面的pointstamp个数
当顶点产生和撤销事件时,OC
根据下面更新:
v.SendBy(e,m,t)
前,v.NotifyAt(t)
前:OC[(t,e/v)] += 1
v.OnRecv(e,m,t)
后,v.OnNotify(t)
后:OC[(t,e/v)] -= 1
当pointstamp活跃时,PC
根据下面初始化:
- 置为已有could-result-in的活跃pointstamp个数
- 同时,增加当前pointstamp之后的pointstamp
PC
值
当pointstamp不活跃时:
-
OC
值为0,移除活跃pointstamp集合 -
递减之后的pointstamp
PC
值当
PC
值为0,则该pointstamp为frontier,调度器可将任何通知发给它
系统初始化时,在下面位置初始化一个pointstamp:
- 位置为每个输入顶点
- 时间戳为:第一个epoch,以及全为0的loop count
OC
为1,PC
为0
当输入节点的输入完毕时:
- 若epoch
e
完毕,则创建e+1
的pointstamp,删除原有的pointstamp - 通知下游epoch
e
的消息已经输入完毕 - 若输入节点关闭时,删除当前位置的所有pointstamp,允许输入到下游的事件最终可从计算中消失
3. 分布式实现
架构图如下:
架构中包含一组Worker线程:
- 管理一个timely dataflow顶点的分区
- Worker间会通信(本地使用共享内存,远程使用TCP协议)
- 参与分布式的进度跟踪协议,以协调通知的发送
3.1. 数据并行化
如上图:
- 程序定义一个数据流处理的逻辑图,每个顶点代表一个阶段。顶点之间由连接器相连,连接器上可包含分区函数(即上面的
H(m)
) - 执行时,将逻辑图展开成物理图,每个阶段展开成多个顶点,每个连接器展开成多条边
当顶点发送消息给连接器时,通过分区函数,可路由到正确的目的地;若没有分区函数,则路由到本地的顶点。
计算could-result-in关系时,Naiad从物理图上的每个pointstamp $p$进行投影,得到逻辑图的pointstamp $\hat{p}$。虽然投影会造成信息损失,但是根据逻辑图运算可限制计算带来的空间开销。关于pointstamp的投影在后文描述。
3.2. Workers
每个Worker传输消息和通知到timely dataflow graph当前分区的节点上,并优先传输消息(优先级可自定义)。
Worker间使用共享队列通信,没有其它共享状态,保证只有单个线程控制整个执行。
这种情况下,大部分
SendBy
后可马上接上OnRecv
,不需要将执行压入队列,因为它们在同一个Worker上,而只有远程节点的调用会入队,因此队列是小的,且延迟低。
由于允许图有环,顶点支持重入,即一个顶点的OnRecv
可调用另一个顶点的SendBy
。可重入的次数有上限。可重入顶点,可合并消息到OnRecv
,然后调用SendBy
,不需要入队,因此减少内存占用。
3.3. 分布式进度跟踪
3.3.1. 广播“进度更新”
只有一个阶段(所有物理顶点)的pointstamp could-result-in下一个阶段的pointstamp,才能进行通知。因此需要跟踪进度。
跟踪基于单个全局frontier(见2.3.2.),协议实现通过广播OC
更新,并在这个基础上应用了2个优化。
对于每个活跃的pointstamp,每个Worker维护:
- 本地OC:全局OC的本地视图
- 本地PC:由本地OC计算而来
- 本地frontier:使用could-result-in关系算出的本地的pointstamp
当Worker分发事件时,先广播“进度更新”(为$\langle pointstamp, \delta \in \mathbb{Z} \rangle $,第二个根据2.3.2.中的规则产生)。广播必须是FIFO序。当“进度更新”收到时,更新本地OC。
协议包含一个重要的特性:任何本地frontier不会移到全局frontier之前,证明在这里。
3.3.2. 优化
a) 投影Pointstamp
协议维护逻辑图的顶点和边的OC和PC,而不是物理图每个顶点和边。
虽然降低了并发度,但显著降低了广播的数据量。
b) 广播前缓冲
将更新推入缓冲区以累计更新,同一个pointstamp的更新会被合并。
累积更新的条件为(满足下列之一):
- 更新的pointstamp,可由本地frontier could-result-in
- pointstamp对应的顶点净更新数(本地OC、缓冲的更新值和当前Worker已经广播但被收到的更新值之和)严格为正
当更新产生时,先检查是否能累计,若不行,则先清空缓冲区,并发送更新,正数更新必在负数更新之前。
3.4. 容错和可用性
容错实现简单但可扩展:
- 每个有状态顶点实现
Checkpoint()
和Restore()
接口 - 系统会协调,保证所有Worker生成一致的checkpoint
当系统周期性做checkpoint时:
- 先暂停Worker和消息传递线线程
- 传输未完成的
OnRecv
事件以清空消息队列 - 调用每个顶点的
Checkpoint()
当系统恢复时:
- 所有存活线程回滚到上一个持久化的checkpoint
- 失效进程的顶点被重新分配到存活的进程
- 调用
Restore()
,根据checkpoint文件,重建节点状态
3.5. 防止产生微小的落后者(Micro-stragglers)
Micro-straggles和粗粒度批处理系统的落后者有相似之处,但处理方式不同(因为批处理系统无状态,Naiad worker有状态)。
Naiad并不是被动地处理micro-straggler现象,而是主动去避免它。
3.5.1. 网络
Naiad使用TCP协议,而该系统流量往往是突发性的(迭代算法执行时,刚开始是大数据量,但后面的消息大小会较小),默认的TCP配置会有问题:
- Nagle算法造成传输小数据量时,延迟变大(因为小包会被缓冲,直到收到ACK包才会被发出)。因此系统禁用Nagle算法。
- 包丢失时,超时重传的时间默认300ms,过长,因此设置成20ms
3.5.2. 数据结构带来的竞争
大部分数据结构(如顶点状态),只有1个线程访问,因此没有竞争。
协调时,需要传输消息,Naiad使用.NET并发队列和轻量自旋锁,当竞争检测到时,回退并睡眠1ms(相比于默认配置而言,显著降低了停顿的影响)。
3.5.3. GC
使用.NET并发的mark-and-sweep GC进行回收,并且让系统更少触发GC,且降低停顿的时间:
- Naiad runtime和library尽量避免对象的创建,而使用对象池获取和回收
- 尽量使用值类型(而非引用类型/指针),因为GC的开销和指针的数量成正比
4. 编写Naiad程序
4.1. 实例:迭代MapReduce
// Input stages of the dataflow
var input = controller.NewInput<string>();
// Use LINQ to implement MR
var result = input.SeletMany(y => map(y))
.GroupBy(y => key(y), (k, vs) => reduce(k, vs));
// Output callbacks for each epoch
result.Subscribe(result => {...});
// Supply input data to the query
input.OnNext(/*1st epoch data*/);
input.OnNext(/*2nd epoch data*/);
...
input.OnCompleted();
4.2. Naiad的数据并行模式
上层可使用LINQ构建增量式的并行数据流,隐藏底层的接口。LINQ的操作符也有一定的优化,不需要协调即可运行(如Concat
, Select
, Distinct
等),以提高性能。
此外,Naiad实现了Bloom框架的自己,用于异步计算。LINQ的Where
, Concat
, Distinct
, Join
不会调用NotifyAt()
,只使用这些运算符的子图才会异步运行。增量式的Aggregate
操作符也有实现,用于实现Bloom-style的聚合。Naiad只会在顶点显式要求时,才会进行协调。
最后,Naiad也实现了Pregel bulk sync parallel模型,用于图算法。
4.3. 构建timely dataflow图
Naiad基于timely dataflow,提供一个简单的图构建接口,它是所有Naiad库的基础,但是对上层应用实现特定功能也有用。
图构建包含2步:
- 定义dataflow顶点的行为
- 定义dataflow的拓扑
Naiad stage:
- 一组顶点的集合(由一个顶点工厂定义)
- 包含多个输入和输出,和C#类型联系在一起,以有类型的流进行连接
- 输入可指定分区要求,输入可指定分区保证,这样系统可插入连接器(边)以满足分区要求
- 顶点必须提供
OnRecv()
回调,若要支持stage的通知,必须提供OnNotify()
回调
一般而言,stage的输入必须在其输出之前连接,以防止无效循环。
系统提供LoopContext
对象允许上层定义多个入口、出口和反馈stage,并将其连接到其它计算stage;而只有反馈stage可以在输入之前,将输出连接起来(与上面相反),以保证所有的环都符合dataflow图的约束。