1. Programming Model
Map
:将输入转化为一个中间的键值对,之后MapReduce库将这些键值对根据键聚合在一起,给Reduce
输入;
Reduce
:输入聚合好的键值对,输出一个更小的结果(结果可为空)。
流程:input kv --map--> (k,v) --groupByKey/shuffle--> (k, list(v)) --reduce--> list(v')
2. 实现
2.1. 执行过程
-
将输入分片成$M$份(给
Map
节点),然后启动集群上的节点 -
有一个节点是特殊的——Master节点,其它的节点是Worker节点,任务分配(
Map
&Reduce
)受Master领导 -
Map
阶段,Worker节点读取输入分片,将输入的键值对传入map(k, v)
中输出中间键值对,保存在内存 -
中间的键值对会周期性地写入本地磁盘,而存储会被分区成$R$份。
键值对存储的位置也会传回给Master,用于传给执行
Reduce
的Worker节点 -
Reduce
阶段,Worker节点根据Master提供的位置,读取中间键值对,读取完毕后,进行排序和聚合(group-by-key) -
Reduce worker节点遍历排序且聚合过的键值对数据(
(k, list(v))
),传给reduce(k, list(v))
函数,在对应分区上(不是本地,实际是全局对应的分区)输出对应的结果 -
任务完成后,Master节点唤醒用户程序,继续执行下面的代码
输出的$R$份文件通常不需合并
2.2. Master的数据结构
- 存储节点任务状态(
idle
,in-progress
,complete
)以及非空闲机器的标识 - 存储
Map
阶段产生的$R$个中间数据的大小和位置,以推给Reduce
的Worker(可实现增量运行,不必等待所有的Map
都运行结束)
2.3. 容错
2.3.1. Worker失效
Master心跳检测Worker是否存活,若超时则标记它为失效。
进行中的节点:失效后,状态重置为idle
(因此可被调度),Master会重试这些任务(若为进行中的Map
)
已完成任务的节点:
Map
必须重新执行(因为结果存储在本地),重新执行要被通知到所有Reduce
的Worker(任何没从之前Worker读取的Reduce
任务将从新Worker读取)Reduce
不用重新执行(因为结果存储在全局文件系统)
2.3.2. Master失效
将Master保存的数据结构周期性写入磁盘,然后设置checkpoint,恢复时从checkpoint后恢复。
然而Master失效是集群的单点故障,因此常常得终止MapReduce运算。
2.3.3. 失效的处理机制
通常情况下,若map
&reduce
是确定的,则分布式的输出和程序顺序输出是一样的(满足顺序一致)。MapReduce使用原子提交完成这个特性:
Map
:先生成$R$个临时文件,完成后,给Master发送这$R$个文件的信息Reduce
:先生成一个临时文件(因为同一分区),完成时,利用底层文件系统重命名的原子性保证这个特性
若map
和reduce
不确定,输出的只能保证和某一个顺序执行序列相同。
2.4. 存储位置
尽量把输入数据存储在机器的本地磁盘上(GFS管理),多节点会有多份拷贝(通常为3份)。输入数据尽量从本地获取,若失败则从临近副本中拷贝。
2.5. 任务粒度
$M$和$R$应该需要比物理节点要多,这样节点可执行不同任务,提高集群动态负载均衡能力,加快恢复速度。此外,$R$通常要比$M$更小。
时空复杂度上,Master要执行$O(M+R)$次调度,存储$O(MR)$个状态 。
2.6. 备用任务
对于“落伍者”(执行过慢者),当MapReduce接近完成时,Master会调度备用的任务进程执行剩下正在处理中的任务。
无论落伍者还是备用进程完成,都可以标记任务完成。
3. 使用技巧
3.1. 分区函数
默认是hash(key) mod R
,然而有时候其它的一些分区函数更加有效。(如URL分区,使用hash(hostname(url)) mod R
将来自同一主机的URL分到一个分区上)。
3.2. 顺序保证
中间键值对的处理顺序按照键增量顺序处理,它保证每个输出文件都是有序的(按照键排序)。
3.3. Combiner
函数
Combiner
函数本质和Reduce
之前的预处理(排序和聚合)类似。
它应用在每个Map
节点,将中间键值对先做一次排序和聚合,以减小Reduce
节点的负担和网络开销。
3.4. 输入/输出类型
输入输出的数据格式(类型)可通过接口自定义(如Reader
,Writer
),输出的位置也可以变化(如存储在数据库中)。
3.5. 副作用
MapReduce允许用户额外的数据,产生多个文件,但这种情况下并不提供多个文件的原子操作支持(类似2PC)。在这种场景下,若对一致性有要求,则需要注意。
3.6. 跳过损坏记录
MapReduce会检测哪些记录导致确定性的崩溃,然后可跳过记录不处理。
实现:
- 每个Worker检测设置了信号处理函数捕获异常(如段错误,总线错误)
- MapReduce库通过全局遍历保存记录序号,若触发错误,则通过UDP包向Master发送最后一条记录的序号
- 若Master看到记录不止失败一次,则标识它要跳过,下次重新执行时跳过这条记录
3.7. 本地执行
MapReduce提供本地实现版本,用于调试。
3.8. 状态信息
Master使用嵌入式HTTP服务器,显示集群的状态。
3.9. 计数器
MapReduce库提供(命名)计数器统计不同事件的发生次数。
计数器的值周期性上传给Master(通过心跳捎带),Master将计数器的值累计。当MapReduce操作完成后,将其返回给用户代码。
有些计数器MapReduce库自动维持。计数器对完整性检查很有用。
注意计数器计数需要考虑幂等性。
4. MIT-6.824 Lab 1
4.1. Part 1
只需查库就行,用JSON格式序列化/反序列化。
对于一个mapper
,生成$R$个文件供reducer
使用;对于一个reducer
,从$M$个文件(由mapper
向该reducer
分区生成的)中读入。
我在执行reduceF()
前,先进行了排序,这是可选的。
4.2. Part 2
抄论文即可。
4.3. Part 3 & 4
创建goroutine
。由一个死循环构成,总流程是:
- 从
registerChan
读取地址 - 创建RPC请求参数,
reduce
时,文件属性不要赋值 call()
调用RPC,获取是否成功ok
- 若成功,则把地址重新传回
registerChan
,供下一次使用,跳出循环 - 若失败,则把地址丢弃,重试(什么都不做,进行下一步循环)
- 若成功,则把地址重新传回
可见,流程不会感知Worker
恢复的,挂了就永远不可用了。
此外,还有卡死的问题,下面的两个指令不能对调,否则会卡死:
waitGroup.Done()
registerChan <- addr
设有2个
Worker
,若Worker1
和Worker2
赋上最后2个任务task98
,task99
。
Worker1
完成,将地址传入registerChan
,然后执行waitGroup.Done()
退出Worker2
完成,传地址传不过去,卡死,调不到waitGroup.Done()
- 主线程就僵死了,因为在
waitGroup.Wait()
解决办法:
- 调换位置,主线程可以退出,但仍有
goroutine
会僵住- 增加
registerChan
的缓冲大小
4.4. Inverted Index
主要注意的是document
的去重,放在mapper
和reducer
都可以。