论文阅读-MapReduce

Posted by keys961 on March 18, 2019

1. Programming Model

Map:将输入转化为一个中间的键值对,之后MapReduce库将这些键值对根据键聚合在一起,给Reduce输入;

Reduce:输入聚合好的键值对,输出一个更小的结果(结果可为空)。

流程:input kv --map--> (k,v) --groupByKey/shuffle--> (k, list(v)) --reduce--> list(v')

2. 实现

2.1. 执行过程

  1. 将输入分片成$M$份(给Map节点),然后启动集群上的节点

  2. 有一个节点是特殊的——Master节点,其它的节点是Worker节点,任务分配(Map&Reduce受Master领导

  3. Map阶段,Worker节点读取输入分片,将输入的键值对传入map(k, v)中输出中间键值对,保存在内存

  4. 中间的键值对会周期性地写入本地磁盘,而存储会被分区成$R$份

    键值对存储的位置也会传回给Master,用于传给执行Reduce的Worker节点

  5. Reduce阶段,Worker节点根据Master提供的位置,读取中间键值对,读取完毕后,进行排序和聚合(group-by-key)

  6. Reduce worker节点遍历排序且聚合过的键值对数据((k, list(v))),传给reduce(k, list(v))函数,在对应分区上(不是本地,实际是全局对应的分区)输出对应的结果

  7. 任务完成后,Master节点唤醒用户程序,继续执行下面的代码

输出的$R$份文件通常不需合并

2.2. Master的数据结构

  • 存储节点任务状态(idle, in-progress,complete)以及非空闲机器的标识
  • 存储Map阶段产生的$R$个中间数据的大小和位置,以推给Reduce的Worker(可实现增量运行,不必等待所有的Map都运行结束)

2.3. 容错

2.3.1. Worker失效

Master心跳检测Worker是否存活,若超时则标记它为失效。

进行中的节点:失效后,状态重置为idle(因此可被调度),Master会重试这些任务(若为进行中的Map

已完成任务的节点

  • Map必须重新执行(因为结果存储在本地),重新执行要被通知到所有Reduce的Worker(任何没从之前Worker读取的Reduce任务将从新Worker读取)
  • Reduce不用重新执行(因为结果存储在全局文件系统)

2.3.2. Master失效

将Master保存的数据结构周期性写入磁盘,然后设置checkpoint,恢复时从checkpoint后恢复。

然而Master失效是集群的单点故障,因此常常得终止MapReduce运算。

2.3.3. 失效的处理机制

通常情况下,若map&reduce是确定的,则分布式的输出和程序顺序输出是一样的(满足顺序一致)。MapReduce使用原子提交完成这个特性:

  • Map:先生成$R$个临时文件,完成后,给Master发送这$R$个文件的信息
  • Reduce:先生成一个临时文件(因为同一分区),完成时,利用底层文件系统重命名的原子性保证这个特性

mapreduce不确定,输出的只能保证和某一个顺序执行序列相同。

2.4. 存储位置

尽量把输入数据存储在机器的本地磁盘上(GFS管理),多节点会有多份拷贝(通常为3份)。输入数据尽量从本地获取,若失败则从临近副本中拷贝。

2.5. 任务粒度

$M$和$R$应该需要比物理节点要多,这样节点可执行不同任务,提高集群动态负载均衡能力,加快恢复速度。此外,$R$通常要比$M$更小。

时空复杂度上,Master要执行$O(M+R)$次调度,存储$O(MR)$个状态 。

2.6. 备用任务

对于“落伍者”(执行过慢者),当MapReduce接近完成时,Master会调度备用的任务进程执行剩下正在处理中的任务。

无论落伍者还是备用进程完成,都可以标记任务完成。

3. 使用技巧

3.1. 分区函数

默认是hash(key) mod R,然而有时候其它的一些分区函数更加有效。(如URL分区,使用hash(hostname(url)) mod R将来自同一主机的URL分到一个分区上)。

3.2. 顺序保证

中间键值对的处理顺序按照键增量顺序处理,它保证每个输出文件都是有序的(按照键排序)。

3.3. Combiner函数

Combiner函数本质和Reduce之前的预处理(排序和聚合)类似。

它应用在每个Map节点,将中间键值对先做一次排序和聚合,以减小Reduce节点的负担和网络开销。

3.4. 输入/输出类型

输入输出的数据格式(类型)可通过接口自定义(如Reader,Writer),输出的位置也可以变化(如存储在数据库中)。

3.5. 副作用

MapReduce允许用户额外的数据,产生多个文件,但这种情况下并不提供多个文件的原子操作支持(类似2PC)。在这种场景下,若对一致性有要求,则需要注意。

3.6. 跳过损坏记录

MapReduce会检测哪些记录导致确定性的崩溃,然后可跳过记录不处理。

实现

  • 每个Worker检测设置了信号处理函数捕获异常(如段错误,总线错误)
  • MapReduce库通过全局遍历保存记录序号,若触发错误,则通过UDP包向Master发送最后一条记录的序号
  • 若Master看到记录不止失败一次,则标识它要跳过,下次重新执行时跳过这条记录

3.7. 本地执行

MapReduce提供本地实现版本,用于调试。

3.8. 状态信息

Master使用嵌入式HTTP服务器,显示集群的状态。

3.9. 计数器

MapReduce库提供(命名)计数器统计不同事件的发生次数。

计数器的值周期性上传给Master(通过心跳捎带),Master将计数器的值累计。当MapReduce操作完成后,将其返回给用户代码。

有些计数器MapReduce库自动维持。计数器对完整性检查很有用。

注意计数器计数需要考虑幂等性

4. MIT-6.824 Lab 1

4.1. Part 1

只需查库就行,用JSON格式序列化/反序列化。

对于一个mapper,生成$R$个文件供reducer使用;对于一个reducer,从$M$个文件(由mapper向该reducer分区生成的)中读入。

我在执行reduceF()前,先进行了排序,这是可选的。

4.2. Part 2

抄论文即可。

4.3. Part 3 & 4

创建goroutine。由一个死循环构成,总流程是:

  • registerChan读取地址
  • 创建RPC请求参数,reduce时,文件属性不要赋值
  • call()调用RPC,获取是否成功ok
    • 若成功,则把地址重新传回registerChan,供下一次使用,跳出循环
    • 若失败,则把地址丢弃,重试(什么都不做,进行下一步循环)

可见,流程不会感知Worker恢复的,挂了就永远不可用了。

此外,还有卡死的问题,下面的两个指令不能对调,否则会卡死:

waitGroup.Done()
registerChan <- addr

设有2个Worker,若Worker1Worker2赋上最后2个任务task98,task99

  • Worker1完成,将地址传入registerChan,然后执行waitGroup.Done()退出
  • Worker2完成,传地址传不过去,卡死,调不到waitGroup.Done()
  • 主线程就僵死了,因为在waitGroup.Wait()

解决办法:

  • 调换位置,主线程可以退出,但仍有goroutine会僵住
  • 增加registerChan的缓冲大小

4.4. Inverted Index

主要注意的是document的去重,放在mapperreducer都可以。