1. Introduction
RDD——弹性分布式数据集,它的特性是:
- 让中间数据的重用更加高效
- 可容错,可并行操作
-
允许用户显式将数据保留到内存,控制分区以优化数据布局,使用丰富的操作符操作它们
-
提供基于粗粒度转换的接口(如
map
,filter
,join
),并通过记录(打日志)这些转换以提高容错若RDD分区丢失,则可从衍生出的RDDs重新计算得到丢失的分区,不需要复制
粗粒度:整个数据集;细粒度:单行/单条数据
2. RDDs
2.1. RDD抽象
RDD是一个只读、分区的记录集合。
RDD可从以下方式创建:
- 存储在稳定存储的数据
- 通过其它RDD转换,如
map
,filter
,join
RDD有很强的容错性,它在任何情况下都不需要被物化(存入稳定存储),因为丢失RDD可从衍生的RDD以及持久化的原始数据恢复。
RDD可被显式被持久化(存入内存)以重用,也可以对数据进行分区(数据可分布到不同结点上)。
2.2. Spark编程接口
Spark通过一个语言集成的API暴露RDD。
用户可从稳定存储中获得最初的RDD,然后:
- 可通过transformation(如
map
,filter
,join
)获得新的RDD - 可通过action(如
count
,collect
,save
等)得到返回值或导出数据 - 可通过persisit持久化RDD,以供重用,默认存入内存
Spark只有第一次调用action时,才会真正计算RDDs
这里:
transformation是一个non-grouping操作:将流转成流
action是一个grouping操作:将流转成表
详见这里
例子:
1
2
3
4
5
6
7
lines = spark.textFile("hdfs://addr:port") // 定义最初的RDD,但不会被缓存
errors = lines.filter(_.startWith("ERROR")) // transformation, 生成新的RDD
errors.persisit() // 持久化该RDD,后面的代码可重用该RDD
cnt = errors.count() // action,此时才开始计算
// 重用errors RDD
errors.filter(_.contains("MySQL")).count()
errors.filter(_.contains("HDFS")).map(_.split("\\s+")(3)).collect()
若errors
RDD丢失数据,可根据血缘关系(lineage,实际上就是根据流水链路关系),从lines
重新计算恢复。
2.3. RDD模型的优势
- 尽管RDD只能通过粗粒度创建/写以及只读,但和DSM(分布式共享内存)比,能有效容错,且不必回滚整个程序(因为可通过血缘关系恢复)
- 由于RDD不可变,因此能够让系统运行备份任务,以缓和慢节点(MapReduce就会这么做,见这里-MapReduce的备用任务),而DSM难以实现
- 批量操作时,RDD运行时可基于数据局部性调度任务,提高性能
- 只在扫描操作时,RDD在内存不足时才会降级,将数据存入磁盘,此时与现有数据并行系统的性能相当
下面是RDD和DSM的对比:
RDDs | 分布式共享内存 | |
---|---|---|
读 | 粗/细粒度 | 细粒度 |
写 | 粗粒度 | 细粒度 |
一致性 | 不重要(不可变) | 取决于应用 |
故障恢复 | 细粒度,使用血缘关系恢复,开销小 | 需要检查点和程序回滚 |
落后任务降灾 | 可以使用任务备份 | 难以实现 |
任务安排 | 基于数据本地化自动分配 | 取决于应用 |
内存不足时行为 | 仅在扫描时,自动降级,类似于已有数据流系统 | 性能差(可能造成颠簸) |
2.4. 不适合RDD的应用
RDD最适合批处理。
而对共享状态进行细粒度的异步更新操作,如Web应用存储、Web增量爬虫,不太适合。
3. Spark编程模型
Spark给RDD提供了一个Scala的API。
集群分为2个角色:
- 用户的Driver:
- 连接Worker集群
- 定义RDD,并在RDD上调用操作
- 跟踪记录RDD的血缘关系
- 执行任务的Worker:
- 长期运行的进程
- 存储RDD分区数据于内存中
而传给RDD的操作是通过传递函数闭包实现,函数闭包通过Java对象表示,可被序列号,因此可通过网络将其传入到另一个节点上。
3.1. Spark的RDD操作
3.2. 应用
Logistic Regression:
1
2
3
4
5
6
7
8
val points = spark.textFile(...).map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map { p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1) * p.y
}.reduce((a, b) => a + b)
w -= gradient
}
PageRank:
1
2
3
4
5
6
7
8
9
10
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
val contribs = links.join(ranks).flatMap {
(url, (link, rank)) =>
links.map(dest => (dest, rank/links.size))
}
ranks = contribs.reduceByKey((x, y) => x + y)
.mapValues(sum => a/N + (1-a)*sum)
}
此外,
partitionBy(func)
可定义数据的分区,以优化通信
4. RDD的表示
使用一个通用接口,以表示每个RDD,接口表达下面的信息:
- 一组分区:数据集的原子片段
- 一组依赖:表示一组父RDD
- 数据分区函数
- 分区方案的元数据
- 数据存储的位置
4.1. RDD依赖
接口中,依赖是个重要的概念,包括2类:
- 窄依赖:父RDD的每个分区至多被一个子RDD的分区使用
- 宽依赖:父RDD的每个分区,可能被多个子RDD的分区使用
这种区分是有用的:
- 若是窄依赖,可以考虑将流水在一个节点上运行
- 窄依赖让节点恢复更加有效,而宽依赖的一个故障节点可能导致所有祖先需要重新计算
4.2. 部分RDD的分区实现
- HDFS files:
partitions()
返回文件的每个块的分区的列表,而块的偏移量存于每个分区对象中 map
:返回一个MappedRDD
对象,和父RDD拥有一样的分区和首选位置union
:返回一个RDD对象,它的分区是各自父RDD的分区,满足窄依赖sample
:和map
类似,除了RDD会为每个分区保存一个随机数种子,以确定性地采样父RDD的记录join
:可能产生2个窄依赖(若它们有相同的分区),也可能产生2个宽依赖,也可能都存在
5. Spark实现
5.1. 任务调度
Spark任务调度使用了第4节中RDD表示的接口信息,且考虑了持久化的RDD哪些分区在内存中可用。
当用户执行action后,调度器会跟踪RDD的血缘关系,建立由stage组成的DAG,然后根据拓扑排序执行。
每个stage会尽量包含多个窄依赖,而每个stage的边界是宽依赖带来的shuffle操作(和MapReduce类似)
调度器根据数据局部性,使用延迟调度,分配任务。若一个节点有足够的内存处理一个分区的数据,就把任务分配给它;若一个RDD拥有preferredLocations()
,则分配给这些对应的节点。
对于宽依赖,中间的结果会被物化(保存在内存中)在拥有父RDD分区的节点上,这与MapReduce将map
结果物化类似。
对于错误处理,若父stage可用,则挑选新节点进行恢复,否则重新提交任务并行计算丢失的分区(不必全部计算)。调度器容错可通过复制RDD血缘关系实现。
5.2. 解释器集成
Scala也包含了一个解释器,而它会把用户的输入编译成一个类,加载到JVM中,之后调用类的函数。
Spark中,对解释器进行了2个改变:
- 解释器创建的类的字节码,通过HTTP协议传输给Worker
- 修改代码生成逻辑,能直接引用每行生成的类的实例,以实现闭包
5.3. 内存管理
Spark提供3种方式持久化数据:
- 反序列化成Java对象到堆内内存(最快,JVM能访问本机每个RDD元素)
- 保存序列化的数据到堆外内存(稍差,内存空间有限时,一种更有效的存储方式)
- 存储到磁盘(最慢,对象太大无法存入内存,每次使用都要重新计算)
当内存有限时,对RDD分区实行LRU替换策略,除非新创建的RDD分区和即将淘汰的分区是相同的。此外,使用RDD持久化优先级,让用户进一步控制RDD。
5.4. Checkpoint的支持
血缘关系能用于恢复RDD,但血缘关系链过长时,恢复会很耗时。
因此可对定期对RDD执行checkpoint,将快照存入稳定存储,以提高恢复速度。这在长链中有用,但对于短链以及窄依赖RDD就不划算。
Spark提供一个API(给persist()
传入REPLICATE
标识),把checkpoint哪些数据的决定权交给了用户。
由于RDD是只读的,因此checkpoint更加简单。因为没有一致性问题,RDD的数据可以直接后台写出,不需要程序暂停或者达成分布式快照的共识。