Stream Systems-Advanced Windowing

Posted by keys961 on December 7, 2018

1. Processing-Time Windows

对于某些特定的用例,如监控输入的数据流(如QPS等),可以使用,但是若应用于与事件发生时间相关的例子(如用户行为等等)时,不能使用。

实现方式大致2种:

  • Trigger: 忽略Event time,然后通过Trigger来提供Processing time上窗口的快照

    例子:每隔1分钟,对Processing time进行窗口切割(实际上把整个Event time域当成一个大窗口)

    1
    
    Window.triggering(Repeatedly(AlignedDelay(ONE_MIN)))
    

    而基于Event time的切割

    1
    2
    
    Window.into(FixedWindows.of(ONE_MIN))
        .triggering(...) // Repeatedly/Watermark...
    
  • Ingress time: 将Event time赋值为数据输入时的时间,然后通过Event time windowing的方式处理

    例子:需要在输入源进行一步预处理,给数据的Event time赋值为Ingress time

    1
    
    PCollection rawData = IO.read().apply(Pardo.of(() -> context.outputWithTimestamp(new Instant())));
    

不过会略有不同,比如多个Stage下,每个Stage会有各自的窗口,两种实现下,值会有不同。

  • Trigger:实际上把整个Event time域当成了一个大窗口,等效对Processing time做切割
  • Ingress time:实际上会生成Watermark,由于使用Ingress time,这个Watermark往往很接近于Ideal watermark(当然它也是一个Perfect watermark)

不过它们都有一个问题:输入的顺序一旦变化,窗口的内容也会发生变化(而Event time windows是不会发生这个问题的,即顺序无关,只要不遗漏数据)

2. Session Windows

Session: 特殊的变长窗口,根据给定的不活跃的时间长度来决定这个窗口时间上界。它是数据驱动的,且并不是对齐的。

  • 可以通过捎带Session ID实现

  • 大多通过由一定数量的交叠的小窗口合并而成,这些小窗口只包含1个消息,长度为定义的不活跃的时间长度

    代码:Window.into(Sessions.withGapDuration(TIME))

    当Watermark向前进行的时候,窗口初始都是小窗口。Watermark不断前进,Trigger不断物化窗口的值时(不论是Early/On-time/Late),会不断进行合并(可见Figure 4-7

    上图中,也可以看出,Early/Late Triggers是很重要的

3. Custom Windowing

以Apache Beam为例,自定义一个窗口主要有2个方面,包括:

  • Window assignment: 可将每个元素放入一个初始的窗口(极限下,每个元素可独享一个窗口)
  • Window merging (Optional): 允许窗口在分组时合并,使得窗口随着时间推移而变化

3.1. Variation on Fixed Windows

Assignment: 根据元素的时间戳,窗口的大小和偏移来决定放入哪个窗口

Merging: 无

注:

下面实例中(即书上)的3个类是作为“基准”的窗口,它们是Aligned Fixed,因为使用了Window.into(XXFixedWindows.of(TIME))

而实际的窗口是通过这3个类的assignWindow()方法,根据输入的元素来给该元素赋上窗口,这个窗口是实际处理时的窗口。

自定义Fixed Windows,通常对Assignment进行自定义

a) Aligned Fixed Windows

作为基准,符合之前的Fixed Windows的定义。

实例代码:

1
2
3
4
5
6
7
8
9
public class FixedWindows ... {
    private final long size;
    private final long offset;
    // 自定义Assignment
    public Collection<IntervalWindow> assignWindow(AssignContext c) {
        long start = c.timestamp() - c.timestamp().plus(size).minus(offset).mod(size);
        return Array.asList(IntervalWindow.of(new Instant(start), size));
    }
}

这里start永远会是某个FixedWindows的起始时间,且作用于数据集里所有的数据

b) Unaligned Fixed Windows

可以通过自定义实现非对其的Window

一种实例代码:

1
2
3
4
5
6
7
8
9
10
public class UnalignedFixedWindows ... {
    private final long size;
    private final long offset;
    // 自定义Assignment
    public Collection<IntervalWindow> assignWindow(AssignContext c) {
        long shift = hash(c.element().key()) % size; //对于不同的key,可能变更了起始点,从而不对齐
        long start = shift + c.timestamp() - c.timestamp().plus(size).minus(offset).mod(size);
        return Array.asList(IntervalWindow.of(new Instant(start), size));
    }
}

上述实例变更了起始点,从而对于不同的key,窗口的位置可能不一样(但对于相同的key,就是Aligned),总体看来,窗口变得不对称。

c) Per-element/key Fixed Windows

可以当每个element/key定制自己的Fixed Windows,拥有自己的长度和起始位置。可以在每个element中捎带一个自己的windowSizekeyShift,如下例子所示:

1
2
3
4
5
6
7
8
9
//... codes omitted with size & offset
public Collection<IntervalWindow> assignWindow(AssignContext c) {
    long perKeyShift = hash(c.element().key()) % size;
    long perElementSize = c.element().windowSize();
    long start = perKeyShift + perElementSize +
        c.timestamp() - c.timestamp().plus(size).minus(offset).mod(size);
    return Array.asList(IntervalWindow.of(new Instant(start), size));
}

上述例子可以看成b)的拓展版,因为它可以根据输入捎带的信息来确定窗口的大小和位置。

注:

之前定义的Fixed Window:大小固定,首尾对齐。

本小节自定义的Fixed Window:对于某类数据而言,窗口大小固定,首尾对齐,但对于整个数据集整体,不能保证。

3.2. Variations on Session Windows

Assignment: 每个元素最初会放在一个初始的Session Window中(从元素的time-stamp开始,延伸到定义的gap duration结束)

Merging: 分组时,所有有效(没关闭)的窗口进行排序,之后将任何重叠的窗口合并在一起

自定义Session Windows,通常对Merging进行自定义。

Bounded Sessions

这类Session Windows表示在之前的基础上,该窗口的还要满足一些条件(比如Event time长度,窗口内部元素个数等),让这个窗口的长度有界,否则就要割开来变成一个新Session。

通常这类会设置一个上限条件,然后再自定义Merging的时候判断条件,书中是重写了mergeWindows()方法(代码略)。

通常Session Windows的定义是无界的,若要设限,下游需要自己做分割,比较讨厌。而通过自定义Merging可以很好实现这个功能。