Stream Systems-Exactly-Once and Side Effects

Posted by keys961 on December 10, 2018

1. Why Exactly-Once

Why must at-least-once:Pipeline中数据丢失是不可接受的,会导致计算错误,因此必须保证Pipeline中的数据一定被处理了

Why at-least-once not enough:At-least-once不能保证去重,而重复的数据会对结果造成不可知的影响,造成结果不准确,所以要Exactly-once(实际上最终效果可看成幂等

因此会有Lambda Architecture出现——Stream & Batch混合

但是其正确性只能适用于可重放的数据流,而现实中不太常见

此外,Lambda Architecture还有一些问题:

  • 不准确:数据有较大比例数据丢失或重复
  • 不一致:流处理和批处理的数据的语义往往不同,不易区分
  • 太复杂:该架构要维护两套系统(批和流)
  • 不可预测:往往流处理得到的结果和批处理的结果不同,且流处理的结果有很强的随机性,导致其不可信
  • 延迟高:不适用于一些需要低延迟且高正确性的应用

2. 明确问题

2.1. Accuracy vs. Completeness

导致系统计算不准确的因素有很多,不过由于延迟数据导致的计算不准确并不在范围内,它是完整性问题,而非准确性问题。

2.2. 问题特征描述

当用户将自己的代码注入到Pipeline中时,往往只能保证这段代码对于某个消息而言At-least-once执行,即当错误发生时,可能被重复执行,即不能保证操作的幂等性

Pipeline中对外发送结果(非幂等),能够添加额外的操作首先将其变成幂等的

2.3. 问题定义

以一个例子说明:

1
2
3
4
5
6
7
8
9
10
11
Pipeline p = Pipeline.create(props);
PCollecion perUserCounts = p.apply(Source.read())
    .apply(new KeyByUser())
    .apply(Window.into(...))
    // 1. Shuffle (before aggregation)
    .apply(Count.perKey()); // 2. Aggregation: Count per key
perUserCounts.apply(new PerUserCountSink()); // 3. Sink: Per user count    
perUserCounts.apply(Values.create())
    .apply(Count.globally()) // 4. Aggregation: Count total
    .apply(new GlobalUserCountSink()); // 5. Sink: Total count
p.run();

这个Pipeline有

  • Shuffle (每个Stage之间都有)
  • 2个Windowed aggregation(即2个Stage)
  • 2个Sink用于输出结果

因此要保证Exactly-once,则有下面3个问题:

  • 如何保证Shuffle阶段每个Record处理有且仅有1次
  • 如何保证数据处理阶段(如做Aggregation)每个Record处理有且仅有1次
  • 如何保证每次Sink都会输出准确的结果(并且处理有且仅有1次)

3. Ensuring Exactly-Once in Shuffle

3.1. RPC Exactly-Once

Shuffle即把某个范围的数据传到特定的Worker上(往往是不同的机器),因此需要使用RPC。所以问题变成了RPC exactly-once。

实现的方式如下(以Google Dataflow为例):

  • 上游实现Upstream backup,若没受到回应消息,数据会被保留并重试(但只能保证at-least-once)
  • 消息回应要捎带状态信息(即成功与否,多数RPC框架都已经实现)
  • 每个消息都要携带一个全局唯一的ID,接收端要记录哪些ID已经被处理,当消息到来时,会检查是否已经处理,以保证这个RPC调用不会重复

3.2. Dealing with Determinism

在Beam Model下,用户注入的代码可能会有非确定的输出(即输入相同,输出可能不同,重试的时候会出现),因而保证不了幂等性。

但是保证用户注入代码输出的确定性也是不可控的。

因此,为了保证exactly once,有下面的策略:

  • Google Dataflow对Shuffle传播操作进行checkpoint记录和检查,并捎带一个唯一的ID(即3.1.所述的),在进入下一个阶段之前(被checkpoint前),这个ID要保证存储到一致性存储中,这保证每一个小步骤的确定性。

    重试过程中,checkpoint后的数据才会被重放,因此用户的不确定性输出的代码不会再次执行。

    这类似于数据库系统的事务(日志)。

  • Google Dataflow使用一致性存储,防止ID被重复写到稳定存储中

3.3. Performance

如前2节所述,保证Shuffle传播需要:

  • 发送端Upstream backup,接收端存储ID并对每个请求进行的检查是否重复
  • ID生成和输出需要被checkpoint并存储到稳定存储中,以保证每个ID都是稳定的

但是这样的策略往往带来巨大性能损失,通常是损失在I/O端,因为上面的操作会带来额外的读写。

因此Google Dataflow带来几种不同的优化方法:Graph Optimization, Bloom Filter, Garbage Collection。

a) Graph Optimization

一条Pipeline可通过流图表示。当Pipeline执行之前,这个图是可以优化/简化,减少I/O对整个Pipeline性能的损失影响。

  • Fusion: 合并操作,将多个逻辑步骤进行合并成一个Stage,减少Stage个数,从而减少Shuffle传播次数,进而减少I/O次数
  • Partial Combining: 对于可结合和可交换的Combine操作(如count()sum()),在Shuffle之前(即传入Group By Key操作前),预先地部分进行这个操作,可显著减少传输的消息数量,从而减少I/O压力

b) Bloom Filters

Bloom Filter: 实际上为一个很长的二进制向量$V$,和一系列随机映射函数$F$,用于检索一个元素是否在一个集合中。

原理:当一个元素加入集合中时,通过$K$个散列函数将元素映射到二进制向量的$V$中的$K$个点,将它们全部置为1。查询时,先通过$K$次散列得到$K$个点的位置:

  • 若这些位置上的点在二进制向量$V$上全为1,则元素很可能存在
  • 若存在一个位置上的点为0,则元素必定不存在

优点:时空效率很高;缺点:删除困难,有误识别率

在Google Dataflow中,每个Worker都有自己Bloom filter来监控ID,查询时:

  • 若返回false,则说明没重复,不需要去查询下一级的存储
  • 若返回true,则说明可能重复,查询下一级存储是否重复

此外,由于数据量上升带来的误报率问题,也可以:

  • 对每个请求捎带时间戳,并对Bloom filter进行窗口化处理(实质上是Processing-time window,如每隔十分钟创建一个新的Bloom filter,实现常使用Ingress timestamp),查询和插入时对某个窗口的Bloom filter进行操作,减少单个Bloom filter的数据量
  • 窗口化的Bloom filter可被系统垃圾回收,使系统资源的占用可控

c) Garbage Collection

ID的存储不能无限存,因此要有垃圾回收。

可有下面几个策略:

  • 发送者除了要发送一个自身的ID之外,还需要捎带一个“最小的没被确认“的ID,这些ID都必须是单调递增的。这样,接收方可以回收哪些小于”最小没被确认“的ID。
  • 窗口化Bloom filter,根据请求捎带的时间戳计算Watermark(并且这是一个Processing time watermark),小于这个Watermark值的数据是可以回收的,因为这些ID已经被处理完(并且其提供的信息可以监测Pipeline的处理速度)

意外情形

假如由于网络缘故,如Network remnants,老的时间戳的请求在网络中卡住很长时间然后突然出现,请求中的ID在系统中已经被垃圾回收了

处理

在系统中,Watermark直到请求成功应答后才能前进,因此上述请求只要忽略即可,因为根据Watermark,这个ID已经被处理过了。

4. Exactly-Once in Sources

Pipeline会将源数据读入,并且通过重试机制,保证每个record被处理exactly-once。

而源数据有确定性和非确定性之分:

  • 确定性:数据的顺序、位置不会变,不管读多少次。比如文件、Apache Kafka Topic等。这类数据源,系统可以自动为每个Record生成唯一ID,以保证Exactly-once。
  • 非确定性:数据的顺序、位置不可预计。比如Google Pub/Sub。因此,需要我们显式声明ID,以保证Exactly-once,否则去重很难实现。

联系第3节,有了唯一ID后,就能保证Shuffle以及后续的操作是Exactly-once,因为其它的对数据本身的操作可以基于Shuffle来时间Exactly-once

5. Exactly-Once in Sinks

Google Dataflow的对外输出不能保证Exactly-once。因此有其它的解决方式:

  • Build-in sinks in Beam SDK: 一个Build-in的Sinks,保证输出一定是Exactly-once的。

  • Reshuffle with side effect operations: 通过Reshuffle和一个side effect操作保证输出是Exactly-once的,前提是side effect是幂等/确定的。(即输出操作出现了不幂等的情况,那么会触发Reshuffle,保证输入是稳定的,并执行side effect操作)

    输入稳定:对于某次操作,多次重试的输入是一样的

    不过这个操作已经不推荐了,可在side effect操作中添加@RequiresStableInput来保证输入的稳定性

6. Other Systems

6.1. Apache Spark

Spark的数据抽象的RDD,作为micro-batch的一个最小单位,并依靠批处理的exactly-once来保证正确性。

Spark假定所有的操作都是幂等的,并且可以沿着图中的某点重放操作链。

它提供Checkpoint原语,不过导致了RDD被物化,保证该RDD之前的不会被重放。

Flink也保证Pipeline processing的exactly-once。

但是它周期性去计算分布式的一致性快照(对于整个Pipeline),并且是传递式的(即先从上游计算快照,然后传递到下游,最后得到整个快照,这样延迟更小)。这个快照是Lamport分布式快照的变体。

Flink使用这个快照来保证exactly-once,若任意的一个操作有错误,就会回滚到上一个快照,然后重放。

快照也可以作为一个类似的Checkpoint,任何不在快照内的都表示”没有完成“

局限性有:

  • 假定错误是很少的
  • 快照计算比较快
  • Worker的分配是静态的