1. 概要
上一篇讲了etcd-raft的选主协议,这次探究一下另一个重要协议:复制和心跳。
2. 日志复制
选主好了后,Leader就可以同步日志到Follower上。这里涉及到的消息类型有:MsgProp
, MsgApp
, MsgAppResp
。
2.1. Leader追加与广播日志
Leader上层可通过Node
接口的Propose
方法追加日志。追加的消息类型是MsgProp
,这部分实现在RawNode
中,它调用了raft
的Step
函数:
1
2
3
4
5
6
7
8
9
10
func (rn *RawNode) Propose(data []byte) error {
// 追加的是MsgProp消息
return rn.raft.Step(pb.Message{
Type: pb.MsgProp,
From: rn.raft.id,
// 追加的日志项
Entries: []pb.Entry{
{Data: data},
}})
}
进入raft
的Step
函数时,会检查消息的Term
,对于Leader而言,这些检查大多数都会通过:
- 若消息的
Term
更大,则自己降级为Follower,但这种情况不可能出现,因为是自己追加的消息 - 若消息的
Term
更小,消息直接被忽略,因为它更旧
检查的代码前文讲选主的时候贴了,这里就不贴了。
通过检查后,会调用raft
的step
函数,对于Leader而言,会调用stepLeader
方法。对于MsgProp
,它会:
- 对日志数据进行检查
- 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
MsgProp
类型可以是配置变更(上层调用Node
的ProposeConfChange
),也可以是日志数据(上层调用Node
的Propose
)- 本文只讨论后者
- 追加日志
- 广播日志变更
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgProp:
// 1. 日志项检查
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.prs.Progress[r.id] == nil {
return ErrProposalDropped
}
if r.leadTransferee != None {
// ...
return ErrProposalDropped
}
// 2. 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
if cc != nil {
// ...
}
}
// 3. 追加日志
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
// 4. 广播日志
r.bcastAppend()
return nil // 返回
// ...
}
// ...
}
追加日志很简单,主要就做3步,代码也就不贴了:
- 将日志项追加到
raftLog
中(内部追加到unstable
中) - 更新Leader的日志索引进度
- 内部为
Progress
数据结构,包含2个重要字段Match
和Next
,这些在论文中有指明
- 内部为
- 尝试提交日志(只更新
committedIndex
,不会持久化日志,仅当单节点时生效)
而广播日志稍微复杂一些,它将所有追加日志的消息广播给其他所有节点,主要做下面几步,代码也不贴了:
- 从
raftLog
中获取Term
和日志项- etcd-raft追踪了对方节点的日志索引进度,因此通过这个进度截取需要发送的日志项
- 正常情况下,不会发生错误,若发生错误则会发送快照数据,关于这部分不在本文说明
- 组装
MsgApp
消息,包含这些数据:Index
:发送的日志项的前一项索引(即Raft论文中prevLogIndex
)LogTerm
:发送的日志项的前一项任期(即Raft论文中prevLogTerm
)Entries
:日志项Commit
:Leader目前的提交索引号Term
:Leader当前的任期(这在raft.send
方法中组装)
- 更新Follower的追踪数据,包括更新日志提交索引,并标记日志正在传输
- 将组装好的消息追加到
raft
的msg
数组- 上层轮询调用
Ready
后,raft
的msg
数组就会被传到返回的Ready
的Messages
字段,这样上层可将这些消息通过网络传输给对应的节点(这也说明了etcd-raft不依赖网络,需要上层实现)
- 上层轮询调用
2.2. Follower接收日志
Follower收到了MsgApp
消息后,首先需要统一做Term
的检查:
- 若消息的
Term
更大,则降级为Follower,并且更新Term
,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2) - 若消息的
Term
更小,则直接返回空的MsgAppResp
消息(部分实现AppendEntries: 规则1,因为消息没指明Reject
为true
)- 直接返回的原因涉及网络分区导致的Follower触发选主,它可以避免Follower重新回到集群后扰乱集群,详细原因可参考注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
switch {
// ...
case m.Term > r.Term:
// 若消息携带的Term大,则降级为Follower
// ...
switch {
// ...
default:
// ...
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From) // 降级Follower,并设置发送方为Leader
} else {
// ...
}
}
case m.Term < r.Term:
// 若消息携带的Term小,则返回一个空的MsgAppResp,对方可能会降级为Follower
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// 这里对心跳和日志追加的请求,只响应一个空,且没有设置Reject(即接受)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
}
// ...
return nil
}
// ...
}
下一步就会进入stepFollower
中,它会:
- 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
- 然后尝试追加日志
追加日志部分在handleAppendEntries
中,这部分逻辑如下:
- 若
prevLogIndex
小于自己的committedIndex
,说明日志项已经存在,不需要任何操作,返回成功 - 尝试追加日志:
- 首先判断消息中
prevLogItem
对应的日志项term
是否和prevLogTerm
匹配,若不匹配则返回失败(AppendEntries: 规则2) - 若匹配,则:
- 寻找消息和本地的日志冲突,并删除本地冲突日志项及其后面的所有日志项(AppendEntries: 规则3)
- 追加日志(AppendEntries: 规则4)
- 然后本地提交,设置本地
committedIndex
为Leader提交索引与本地最后一项日志索引的最小值(AppendEntries: 规则5)
- 首先判断消息中
这里需要注意,一切的MsgAppResp
也携带了Term
字段,和MsgApp
一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (r *raft) handleAppendEntries(m pb.Message) {
// 1. 若prevLogIndex小于自己的committedIndex
// 说明日志项已经存在,不需要任何操作,返回成功,并返回自己committedIndex
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
// 2. 尝试追加日志
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
// 成功,则返回成功,并返回自己最新的committedIndex(这里会执行一次提交)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
// 失败,则返回拒绝,并返回自己最后一项日志的索引
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
// 2.1. 首先根据消息的prevLogIndex,寻找自己存储的对应日志项,读取该日志项的term,判断是否与prevLogTerm匹配
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
// 2.2. 若匹配,则先寻找冲突项,正常情况下,冲突项的索引会大于自己的committed
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
// 2.3. 然后追加剩余的日志,这里会把原日志的冲突项之后的一并删除,并用新的日志项追加和替代
offset := index + 1
l.append(ents[ci-offset:]...)
}
// 2.4. 提交,更新自己的committedIndex
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false // 不匹配则返回错误
}
2.3. Leader处理Follower响应
这里再次回到stepLeader
中,不过首先要说明一下StateType
:
StateProbe
:表示Follower的日志索引未知,需要Leader探测出来StateReplicate
:正常情况下的Follower状态StateSnapshot
:表示Follower不能从Leader的日志中同步,需要使用快照进行恢复
之后看下Leader对于MsgAppResp
响应的处理,当然它也有Term
的校验:
- 若
Term
大,则降级为Follower,并设置Leader为未知(Rules of Server (All): 规则2) - 若
Term
小
这里贴部分代码,主要逻辑为:
- 若响应为拒绝:
- 将Follower的日志索引进度
Next
回退到RejectHint
,即Follower最后一项日志的索引(Rules of Server (Leader): 规则3.2) - 设置状态为
StateProbe
- 重新再发送日志追加请求
MsgApp
,追加缺失的日志并探测Follower的日志索引进度
- 将Follower的日志索引进度
- 若响应为同意:
- 更新Follower的日志索引进度(更新
Match
为Follower最后一项日志的索引,Next
为下一项待复制的索引)(Rules of Server (Leader): 规则3.1) - 更新Follower的状态
- 尝试更新Leader的
committedIndex
(尝试提交,但不持久化,持久化留给上层调用Advance
后做),这里会检查是否过半数(Rules of Server (Leader): 规则4)- 若提交成功:则将该变更传播给其它节点
- 更新Follower的日志索引进度(更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func stepLeader(r *raft, m pb.Message) error {
// ...
pr := r.prs.Progress[m.From] // 获取Follower的日志追踪状态
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
// 若返回拒绝
// 将Follower的日志索引进度Next回退到RejectHint,即Follower最后一项日志的索引
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == tracker.StateReplicate {
// 设置状态为StateProbe
pr.BecomeProbe()
}
// 重新再发送日志追加请求MsgApp,以探测Follower的日志索引进度
r.sendAppend(m.From)
}
} else {
// 若返回成功
oldPaused := pr.IsPaused()
// 则更新Follower的日志索引进度
// a. 更新Match为Follower最后一项日志的索引
// b. 更新Next为下一项待复制的索引
if pr.MaybeUpdate(m.Index) {
// 更新Follower的状态
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate() // 从StateProbe恢复到StateReplicate
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
pr.BecomeProbe()
pr.BecomeReplicate() // 从StateSnapshot恢复到StateReplicate
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index) // 取消正在传输的标记
}
// 尝试更新Leader的committedIndex,这里会检查是否半数通过
if r.maybeCommit() {
// 若超过半数,则将Leader提交的信息广播给其他节点
r.bcastAppend()
} else if oldPaused {
// 若没超过半数,且Follower出现过暂停的状况,则重新再发送一次MsgApp,使其追回日志
r.sendAppend(m.From)
}
// ...
}
}
// ...
}
// ...
}
3. 心跳
选主后,Leader需要周期性对Follower进行心跳,让Follower感知到Leader存在,Leader也需要通过心跳响应检测Follower的状态。
在Raft论文中,上面的日志复制请求就可以充当心跳,不过在etcd-raft中,心跳还是另外抽出实现。
这部分涉及到的消息类型有:MsgBeat
, MsgHeartbeat
, MsgHeartbeatResp
。
3.1. Leader发起心跳
由于Leader会周期性发起心跳,因此很容易猜到心跳通过调用Node
的Tick
方法触发。
的确,Leader调用Tick
后,会调用tickHeartBeat
方法,它会:
-
增加
raft
的heartbeatElapsed
计数 -
若
heartbeatElapsed
超过heartbeatTimeout
,则重置heartbeatElapsed
并给自己发送MsgBeat
消息1 2 3 4 5 6 7 8 9
func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ // 增加heartbeatElapsed // ... if r.heartbeatElapsed >= r.heartbeatTimeout { // 超过heartbeatTimeout时,给自己发送MsgBeat消息 r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } }
Leader收到MsgBeat
消息后,Leader就开始广播心跳:
1
2
3
4
5
6
7
8
9
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat() // 广播心跳
return nil
// ...
}
// ...
}
而广播心跳的消息类型为MsgHeartbeat
,包含了:
-
两者的最小值:
- Leader已提交的日志索引
- 对应节点已提交日志的索引
为什么要这个字段并取这个值,下面3.2.会说
-
上下文信息
-
Leader的任期(
raft.send
调用时会添加)
1
2
3
4
5
6
7
8
9
10
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
r.send(m) // 这一步会添加任期Term字段
}
3.2. Follower处理心跳
而Follower收到了MsgHeartbeat
后,和MsgApp
一样:
- 必要的检查消息的
Term
,这部分见2.2.节 - 重置
electionElapsed
计数器并设置Leader - 处理心跳
1
2
3
4
5
6
7
8
9
10
11
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgHeartbeat:
r.electionElapsed = 0 // 重置electionElapsed计数器
r.lead = m.From // 设置Leader
r.handleHeartbeat(m)// 具体处理心跳请求
// ...
}
return nil
}
而在具体处理心跳请求中:
- Follower会将消息中的
Commit
字段提取出来,并根据这个索引将本地日志提交(更新committedIndex
)- 这个索引必须是Leader已经提交的,并且也必须是Follower拥有的,这也是3.1.中的问题答案
- 返回Leader
MsgHeartbeatResp
响应
1
2
3
4
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit) // 提交日志(更新本地的committedIndex)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) // 返回心跳响应
}
3.3. Leader处理Follower响应
Leader收到响应后,除了必要的检查消息的Term
之外,主要还是追踪Follower的状态(标记为存活),并且若Follower需要追赶日志则发送日志复制的请求过去。另外其他的关于readOnly
的处理,这里不进行说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func stepLeader(r *raft, m pb.Message) error {
// ...
pr := r.prs.Progress[m.From] // 获取Follower的Progress
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
// ...
case pb.MsgHeartbeatResp:
// 更新Follower的状态,标记为存活
pr.RecentActive = true
pr.ProbeSent = false
// 腾开窗口以供Leader能够向Follower继续发送请求
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
// 若Follower需要追赶日志,则这里再发送一次日志复制请求
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
// 处理readOnly,这里忽略
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
rss := r.readOnly.advance(m)
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
// ...
}
return nil
}
4. 总结
上面根据源码标识出了Raft原文中的规则,总体而言还是符合Raft论文中的规则,不过略有修改。
一个比较大的改动就是没有完整实现Raft论文中的AppendEntries: 规则1(即没有指明Reject
为true
),它也没有在心跳中实现。
不过,etcd-raft还有一个check-quorum机制:
- 通过下面的心跳,Leader可以追踪Follower是否存活
- 每隔
electionTimeout
,Leader会定期执行check-quorum,给自己发送MsgCheckQurum
消息,检查是否过半数的Follower存活,若没有,自己就会降级为Follower,从而触发下一轮选主
此外还有一些优化和其他方面的集成(如快照、配置变更、节点暂停、只读等),这些不在本文讨论范围内。
下文继续按照论文的顺序,说明集群变化在etcd-raft中的实现。