论文阅读-RDD(Resilient Distributed Datasets):一种内存式集群计算的可容错的数据抽象

Posted by keys961 on April 15, 2019

1. Introduction

RDD——弹性分布式数据集,它的特性是:

  • 让中间数据的重用更加高效
  • 可容错,可并行操作
  • 允许用户显式将数据保留到内存,控制分区以优化数据布局,使用丰富的操作符操作它们

  • 提供基于粗粒度转换的接口(如map,filter,join),并通过记录(打日志)这些转换以提高容错

    若RDD分区丢失,则可从衍生出的RDDs重新计算得到丢失的分区,不需要复制

    粗粒度:整个数据集;细粒度:单行/单条数据

2. RDDs

2.1. RDD抽象

RDD是一个只读、分区的记录集合

RDD可从以下方式创建:

  • 存储在稳定存储的数据
  • 通过其它RDD转换,如map,filter,join

RDD有很强的容错性,它在任何情况下都不需要被物化(存入稳定存储),因为丢失RDD可从衍生的RDD以及持久化的原始数据恢复。

RDD可被显式被持久化(存入内存)以重用,也可以对数据进行分区(数据可分布到不同结点上)。

2.2. Spark编程接口

Spark通过一个语言集成的API暴露RDD。

用户可从稳定存储中获得最初的RDD,然后:

  • 可通过transformation(如map, filter, join)获得新的RDD
  • 可通过action(如count, collect, save等)得到返回值或导出数据
  • 可通过persisit持久化RDD,以供重用,默认存入内存

Spark只有第一次调用action时,才会真正计算RDDs

这里:

  • transformation是一个non-grouping操作:将流转成流

  • action是一个grouping操作:将流转成表

详见这里

例子

1
2
3
4
5
6
7
lines = spark.textFile("hdfs://addr:port") // 定义最初的RDD,但不会被缓存
errors = lines.filter(_.startWith("ERROR")) // transformation, 生成新的RDD
errors.persisit() // 持久化该RDD,后面的代码可重用该RDD
cnt = errors.count() // action,此时才开始计算
// 重用errors RDD
errors.filter(_.contains("MySQL")).count()
errors.filter(_.contains("HDFS")).map(_.split("\\s+")(3)).collect()

errorsRDD丢失数据,可根据血缘关系(lineage,实际上就是根据流水链路关系),从lines重新计算恢复。

2.3. RDD模型的优势

  • 尽管RDD只能通过粗粒度创建/写以及只读,但和DSM(分布式共享内存)比,能有效容错,且不必回滚整个程序(因为可通过血缘关系恢复)
  • 由于RDD不可变,因此能够让系统运行备份任务,以缓和慢节点(MapReduce就会这么做,见这里-MapReduce的备用任务),而DSM难以实现
  • 批量操作时,RDD运行时可基于数据局部性调度任务,提高性能
  • 只在扫描操作时,RDD在内存不足时才会降级,将数据存入磁盘,此时与现有数据并行系统的性能相当

下面是RDD和DSM的对比:

  RDDs 分布式共享内存
粗/细粒度 细粒度
粗粒度 细粒度
一致性 不重要(不可变) 取决于应用
故障恢复 细粒度,使用血缘关系恢复,开销小 需要检查点和程序回滚
落后任务降灾 可以使用任务备份 难以实现
任务安排 基于数据本地化自动分配 取决于应用
内存不足时行为 仅在扫描时,自动降级,类似于已有数据流系统 性能差(可能造成颠簸)

2.4. 不适合RDD的应用

RDD最适合批处理

而对共享状态进行细粒度的异步更新操作,如Web应用存储、Web增量爬虫,不太适合。

3. Spark编程模型

Spark给RDD提供了一个Scala的API。

集群分为2个角色:

  • 用户的Driver:
    • 连接Worker集群
    • 定义RDD,并在RDD上调用操作
    • 跟踪记录RDD的血缘关系
  • 执行任务的Worker:
    • 长期运行的进程
    • 存储RDD分区数据于内存中

cluster

而传给RDD的操作是通过传递函数闭包实现,函数闭包通过Java对象表示,可被序列号,因此可通过网络将其传入到另一个节点上。

3.1. Spark的RDD操作

op

3.2. 应用

Logistic Regression:

1
2
3
4
5
6
7
8
val points = spark.textFile(...).map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
    val gradient = points.map { p =>
        p.x * (1/(1+exp(-p.y*(w dot p.x)))-1) * p.y
    }.reduce((a, b) => a + b)
    w -= gradient
}

PageRank:

1
2
3
4
5
6
7
8
9
10
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
    val contribs = links.join(ranks).flatMap {
        (url, (link, rank)) => 
        links.map(dest => (dest, rank/links.size))
    }
    ranks = contribs.reduceByKey((x, y) => x + y)
    	.mapValues(sum => a/N + (1-a)*sum)
}

pagerank

此外,partitionBy(func)可定义数据的分区,以优化通信

4. RDD的表示

使用一个通用接口,以表示每个RDD,接口表达下面的信息:

  • 一组分区:数据集的原子片段
  • 一组依赖:表示一组父RDD
  • 数据分区函数
  • 分区方案的元数据
  • 数据存储的位置

repRdds

4.1. RDD依赖

接口中,依赖是个重要的概念,包括2类:

  • 窄依赖:父RDD的每个分区至多被一个子RDD的分区使用
  • 宽依赖:父RDD的每个分区,可能被多个子RDD的分区使用

这种区分是有用的:

  • 若是窄依赖,可以考虑将流水在一个节点上运行
  • 窄依赖让节点恢复更加有效,而宽依赖的一个故障节点可能导致所有祖先需要重新计算

4.2. 部分RDD的分区实现

  • HDFS files:partitions()返回文件的每个块的分区的列表,而块的偏移量存于每个分区对象中
  • map:返回一个MappedRDD对象,和父RDD拥有一样的分区和首选位置
  • union:返回一个RDD对象,它的分区是各自父RDD的分区,满足窄依赖
  • sample:和map类似,除了RDD会为每个分区保存一个随机数种子,以确定性地采样父RDD的记录
  • join:可能产生2个窄依赖(若它们有相同的分区),也可能产生2个宽依赖,也可能都存在

dependency

5. Spark实现

5.1. 任务调度

Spark任务调度使用了第4节中RDD表示的接口信息,且考虑了持久化的RDD哪些分区在内存中可用。

当用户执行action后,调度器会跟踪RDD的血缘关系,建立由stage组成的DAG,然后根据拓扑排序执行。

每个stage会尽量包含多个窄依赖,而每个stage的边界是宽依赖带来的shuffle操作(和MapReduce类似)

调度器根据数据局部性,使用延迟调度,分配任务。若一个节点有足够的内存处理一个分区的数据,就把任务分配给它;若一个RDD拥有preferredLocations(),则分配给这些对应的节点。

对于宽依赖,中间的结果会被物化(保存在内存中)在拥有父RDD分区的节点上,这与MapReduce将map结果物化类似。

对于错误处理,若父stage可用,则挑选新节点进行恢复,否则重新提交任务并行计算丢失的分区(不必全部计算)。调度器容错可通过复制RDD血缘关系实现。

5.2. 解释器集成

Scala也包含了一个解释器,而它会把用户的输入编译成一个类,加载到JVM中,之后调用类的函数。

Spark中,对解释器进行了2个改变:

  • 解释器创建的类的字节码,通过HTTP协议传输给Worker
  • 修改代码生成逻辑,能直接引用每行生成的类的实例,以实现闭包

5.3. 内存管理

Spark提供3种方式持久化数据:

  • 反序列化成Java对象到堆内内存(最快,JVM能访问本机每个RDD元素)
  • 保存序列化的数据到堆外内存(稍差,内存空间有限时,一种更有效的存储方式)
  • 存储到磁盘(最慢,对象太大无法存入内存,每次使用都要重新计算)

当内存有限时,对RDD分区实行LRU替换策略,除非新创建的RDD分区和即将淘汰的分区是相同的。此外,使用RDD持久化优先级,让用户进一步控制RDD。

5.4. Checkpoint的支持

血缘关系能用于恢复RDD,但血缘关系链过长时,恢复会很耗时。

因此可对定期对RDD执行checkpoint,将快照存入稳定存储,以提高恢复速度。这在长链中有用,但对于短链以及窄依赖RDD就不划算。

Spark提供一个API(给persist()传入REPLICATE标识),把checkpoint哪些数据的决定权交给了用户。

由于RDD是只读的,因此checkpoint更加简单。因为没有一致性问题,RDD的数据可以直接后台写出,不需要程序暂停或者达成分布式快照的共识。