1. 概要
上一篇讲了etcd-raft的集群配置变更。本文顺着论文讲快照。
2. 快照内容
etcd-raft的快照内容会这么选择:
- 假如
unstable
中存在了快照,返回它保存的 - 否则返回
Storage
中的快照
一般而言会取第二个。
而快照内容包含:
- 快照数据
- 快照元数据,包含
- 集群配置信息
- 最后一条日志的索引和任期
这部分和Raft论文一模一样。
3. 何时触发快照发送
etcd-raft的快照请求类型是MsgSnap
,发送这个消息在maybeSendAppend
方法中,即复制日志给Follower的时候。
复制之前,会根据Follower的Next
索引(即Follower缺失的第一项日志)捞取所有需要同步的日志,若操作出错,则会触发快照发送。
操作出错的条件为:
- 获取
prevLogTerm
出错,出错可能性为:- 传入的索引
pr.Next - 1
过小,不保存在Raft日志中,数据已被压缩,返回ErrCompacted
- 传入的索引
pr.Next - 1
过大,日志项不存在,返回ErrUnavailable
- 传入的索引
- 获取日志项
entries
(从pr.Next
往后的)出错,出错的可能性也和1类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.Progress[to] // Follower的日志进度追踪
// ...
m := pb.Message{}
m.To = to
// 根据Follower的Next捞取: 1. 日志entries; 2. prevLogTerm
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if errt != nil || erre != nil {
// 上面操作出错,则发送快照
// ...
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
// ...
// 消息内容
m.Snapshot = snapshot // 快照数据data
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term // 快照的lastIncludedIndex和lastIncludedTerm
// ...
pr.BecomeSnapshot(sindex)
// ...
} else {
// ...
}
r.send(m) // 发送消息,这里是快照
return true
}
4. Leader发送快照
从3中可以看到Leader发送快照的内容,包含了:
- 快照数据,包含:
- 具体数据,即论文中的
data
- 快照包含的最后的索引,即论文中的
lastIncludedIndex
- 快照包含的最后的任期,即论文中的
lastIncludedTerm
- 配置信息
- 具体数据,即论文中的
- 任期,即论文的
Term
这部分可见第2节,具体是Snapshot
和SnapshotMetadata
。
此外还会标记该Follower的复制状态为StateSnapshot
。
在etcd-raft中,并没有实现数据的分块传输(所以没有offset
, done
等字段),这部分可以由上层实现。
5. Follower处理快照
Follower收到了MsgSnap
消息后,首先需要统一做Term
的检查:
- 若消息的
Term
更大,则降级为Follower,并且更新Term
,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2) - 若消息的
Term
更小,则忽略快照,直接返回,不响应任何消息(部分实现InstallSnapshot:规则1,因为没有返回自己的任期)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
switch {
// ...
case m.Term > r.Term:
// ...
switch {
// ...
default:
// ...
// 若消息携带的Term大,降级为Follower并设置Leader为发送方
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
// ...
} else {
// 若消息携带的Term小,直接忽略,没有任何响应
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}
// ...
}
下一步就会进入stepFollower
中,它会:
- 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
- 然后尝试处理快照
处理快照部分在handleSnapshot
中,核心在于restore
方法:
- 若快照包含的最新数据已被Follower提交,直接返回(InstallSnapshot: 规则5)
- 若快照包含的最新数据已被Follower保存(但没提交),直接提交到快照中的索引位置,然后直接返回(InstallSnapshot: 规则5 + 部分InstallSnapshot: 规则6)
- 这里只部分实现了InstallSnapshot: 规则6:
- 原文:需要保存快照,并保留之后的日志,删除之前的日志
- etcd-raft:没有保留快照,仅做了提交,日志全部保留
- 这里只部分实现了InstallSnapshot: 规则6:
- 应用快照数据,它会清空所有日志,并更新提交索引(InstallSnapshot: 规则5 + InstallSnapshot: 规则7)
- 上层应用快照到状态机可通过轮询
Ready
实现(InstallSnapshot: 规则8)
InstallSnapshot: 规则2~4是用于分段传输快照,etcd-raft内部没有实现,所以忽略
返回的消息直接复用了MsgAppResp
,它包含自己已经提交的日志索引和自己的任期,供Leader更新Follower的日志进度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}
func (r *raft) restore(s pb.Snapshot) bool {
// 1. 若快照包含的最新日志索引已被提交,直接返回
if s.Metadata.Index <= r.raftLog.committed {
return false
}
// ... 进一步校验,这里保证当前是Follower且自己必须包含在快照保存的集群配置中 ...
// 2. 若快照包含的最新日志已经在Follower保存,则直接提交到快照对应的索引,直接返回
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
// ...
r.raftLog.commitTo(s.Metadata.Index)
return false
}
// 3. 从快照恢复,它会删除所有的日志,并保存快照数据
r.raftLog.restore(s)
// 4. 从给定快照中恢复集群状态
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
}, cs)
// ...
assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
pr := r.prs.Progress[r.id]
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
// ...
return true
}
6. Leader处理快照响应
Follower返回的就是MsgAppResp
响应,这部分和复制日志时的处理一模一样,这里不再说明。
7. 总结
etcd-raft对于Raft快照的实现总体也是按照论文的,不过也有不同:
- 对于InstallSnapshot: 规则1,选择了不响应,而非原文的有响应
- 对于InstallSnapshot: 规则6:选择了保留旧日志并执行提交操作,而非原文的删除旧日志
- 复用了
MsgAppResp
- 没有实现分段传输,它交给上层完成
下一篇文章说明论文最后的线性一致读的部分。