etcd-raft (4): Raft的复制和心跳

Posted by keys961 on October 13, 2020

1. 概要

上一篇讲了etcd-raft的选主协议,这次探究一下另一个重要协议:复制和心跳。

2. 日志复制

选主好了后,Leader就可以同步日志到Follower上。这里涉及到的消息类型有:MsgProp, MsgApp, MsgAppResp

2.1. Leader追加与广播日志

Leader上层可通过Node接口的Propose方法追加日志。追加的消息类型是MsgProp,这部分实现在RawNode中,它调用了raftStep函数:

1
2
3
4
5
6
7
8
9
10
func (rn *RawNode) Propose(data []byte) error {
    // 追加的是MsgProp消息
    return rn.raft.Step(pb.Message{
        Type: pb.MsgProp, 
        From: rn.raft.id,
        // 追加的日志项
        Entries: []pb.Entry{
            {Data: data},
        }})
}

进入raftStep函数时,会检查消息的Term,对于Leader而言,这些检查大多数都会通过:

  • 若消息的Term更大,则自己降级为Follower,但这种情况不可能出现,因为是自己追加的消息
  • 若消息的Term更小,消息直接被忽略,因为它更旧

检查的代码前文讲选主的时候贴了,这里就不贴了。

通过检查后,会调用raftstep函数,对于Leader而言,会调用stepLeader方法。对于MsgProp,它会:

  • 对日志数据进行检查
  • 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
    • MsgProp类型可以是配置变更(上层调用NodeProposeConfChange),也可以是日志数据(上层调用NodePropose
    • 本文只讨论后者
  • 追加日志
  • 广播日志变更
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    // ...
    case pb.MsgProp:
        // 1. 日志项检查
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            return ErrProposalDropped
        }
        if r.leadTransferee != None {
            // ...
            return ErrProposalDropped
        }
        // 2. 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                // ...
            }
        }
        // 3. 追加日志
        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        // 4. 广播日志
        r.bcastAppend()
        return nil // 返回
    // ...
    }
    // ...
}

追加日志很简单,主要就做3步,代码也就不贴了:

  • 将日志项追加到raftLog中(内部追加到unstable中)
  • 更新Leader的日志索引进度
    • 内部为Progress数据结构,包含2个重要字段MatchNext,这些在论文中有指明
  • 尝试提交日志(只更新committedIndex,不会持久化日志,仅当单节点时生效)

而广播日志稍微复杂一些,它将所有追加日志的消息广播给其他所有节点,主要做下面几步,代码也不贴了:

  • raftLog中获取Term和日志项
    • etcd-raft追踪了对方节点的日志索引进度,因此通过这个进度截取需要发送的日志项
    • 正常情况下,不会发生错误,若发生错误则会发送快照数据,关于这部分不在本文说明
  • 组装MsgApp消息,包含这些数据:
    • Index:发送的日志项的前一项索引(即Raft论文中prevLogIndex
    • LogTerm:发送的日志项的前一项任期(即Raft论文中prevLogTerm
    • Entries:日志项
    • Commit:Leader目前的提交索引号
    • Term:Leader当前的任期(这在raft.send方法中组装)
  • 更新Follower的追踪数据,包括更新日志提交索引,并标记日志正在传输
  • 将组装好的消息追加到raftmsg数组
    • 上层轮询调用Ready后,raftmsg数组就会被传到返回的ReadyMessages字段,这样上层可将这些消息通过网络传输给对应的节点(这也说明了etcd-raft不依赖网络,需要上层实现)

2.2. Follower接收日志

Follower收到了MsgApp消息后,首先需要统一做Term的检查:

  • 若消息的Term更大,则降级为Follower,并且更新Term,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2
  • 若消息的Term更小,则直接返回空的MsgAppResp消息(部分实现AppendEntries: 规则1,因为消息没指明Rejecttrue
    • 直接返回的原因涉及网络分区导致的Follower触发选主,它可以避免Follower重新回到集群后扰乱集群,详细原因可参考注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
switch {
    // ...
    case m.Term > r.Term:
        // 若消息携带的Term大,则降级为Follower
        // ...
        switch {
        // ...
        default:
            // ...
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From) // 降级Follower,并设置发送方为Leader
            } else {
                // ...
            }
        }
    case m.Term < r.Term:
        // 若消息携带的Term小,则返回一个空的MsgAppResp,对方可能会降级为Follower
        if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
            // 这里对心跳和日志追加的请求,只响应一个空,且没有设置Reject(即接受)
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
        }  
        // ...
        return nil
    }
    // ...
}

下一步就会进入stepFollower中,它会:

  • 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
  • 然后尝试追加日志

追加日志部分在handleAppendEntries中,这部分逻辑如下:

  • prevLogIndex小于自己的committedIndex,说明日志项已经存在,不需要任何操作,返回成功
  • 尝试追加日志:
    • 首先判断消息中prevLogItem对应的日志项term是否和prevLogTerm匹配,若不匹配则返回失败(AppendEntries: 规则2
    • 若匹配,则:
      • 寻找消息和本地的日志冲突,并删除本地冲突日志项及其后面的所有日志项(AppendEntries: 规则3
      • 追加日志(AppendEntries: 规则4
    • 然后本地提交,设置本地committedIndex为Leader提交索引与本地最后一项日志索引的最小值(AppendEntries: 规则5

这里需要注意,一切的MsgAppResp也携带了Term字段,和MsgApp一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (r *raft) handleAppendEntries(m pb.Message) {
    // 1. 若prevLogIndex小于自己的committedIndex
    // 说明日志项已经存在,不需要任何操作,返回成功,并返回自己committedIndex
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }
    // 2. 尝试追加日志
    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        // 成功,则返回成功,并返回自己最新的committedIndex(这里会执行一次提交)
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        // 失败,则返回拒绝,并返回自己最后一项日志的索引
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    // 2.1. 首先根据消息的prevLogIndex,寻找自己存储的对应日志项,读取该日志项的term,判断是否与prevLogTerm匹配
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        // 2.2. 若匹配,则先寻找冲突项,正常情况下,冲突项的索引会大于自己的committed
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
        case ci <= l.committed:
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            // 2.3. 然后追加剩余的日志,这里会把原日志的冲突项之后的一并删除,并用新的日志项追加和替代
            offset := index + 1
            l.append(ents[ci-offset:]...)
        }
        // 2.4. 提交,更新自己的committedIndex
        l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false // 不匹配则返回错误
}

2.3. Leader处理Follower响应

这里再次回到stepLeader中,不过首先要说明一下StateType

  • StateProbe:表示Follower的日志索引未知,需要Leader探测出来
  • StateReplicate:正常情况下的Follower状态
  • StateSnapshot:表示Follower不能从Leader的日志中同步,需要使用快照进行恢复

之后看下Leader对于MsgAppResp响应的处理,当然它也有Term的校验:

  • Term大,则降级为Follower,并设置Leader为未知(Rules of Server (All): 规则2
  • Term

这里贴部分代码,主要逻辑为:

  • 若响应为拒绝:
    • 将Follower的日志索引进度Next回退到RejectHint,即Follower最后一项日志的索引(Rules of Server (Leader): 规则3.2
    • 设置状态为StateProbe
    • 重新再发送日志追加请求MsgApp,追加缺失的日志并探测Follower的日志索引进度
  • 若响应为同意:
    • 更新Follower的日志索引进度(更新Match为Follower最后一项日志的索引,Next为下一项待复制的索引)(Rules of Server (Leader): 规则3.1
    • 更新Follower的状态
    • 尝试更新Leader的committedIndex(尝试提交,但不持久化,持久化留给上层调用Advance后做),这里会检查是否过半数Rules of Server (Leader): 规则4
      • 若提交成功:则将该变更传播给其它节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func stepLeader(r *raft, m pb.Message) error {
    // ...
    pr := r.prs.Progress[m.From] // 获取Follower的日志追踪状态
    if pr == nil {
        r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return nil
    }
    switch m.Type {
    case pb.MsgAppResp:
        pr.RecentActive = true
        if m.Reject {
            // 若返回拒绝
            // 将Follower的日志索引进度Next回退到RejectHint,即Follower最后一项日志的索引
            if pr.MaybeDecrTo(m.Index, m.RejectHint) {
                r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                if pr.State == tracker.StateReplicate {
                    // 设置状态为StateProbe
                    pr.BecomeProbe()
                }
                // 重新再发送日志追加请求MsgApp,以探测Follower的日志索引进度
                r.sendAppend(m.From)
            }
        } else {
            // 若返回成功
            oldPaused := pr.IsPaused()
            // 则更新Follower的日志索引进度
            // a. 更新Match为Follower最后一项日志的索引
            // b. 更新Next为下一项待复制的索引
            if pr.MaybeUpdate(m.Index) {
                // 更新Follower的状态
                switch {
                case pr.State == tracker.StateProbe:
                    pr.BecomeReplicate() // 从StateProbe恢复到StateReplicate
                case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
                    pr.BecomeProbe()
                    pr.BecomeReplicate() // 从StateSnapshot恢复到StateReplicate
                case pr.State == tracker.StateReplicate:
                    pr.Inflights.FreeLE(m.Index) // 取消正在传输的标记
                }
                // 尝试更新Leader的committedIndex,这里会检查是否半数通过
                if r.maybeCommit() {
                    // 若超过半数,则将Leader提交的信息广播给其他节点
                    r.bcastAppend()
                } else if oldPaused {
                    // 若没超过半数,且Follower出现过暂停的状况,则重新再发送一次MsgApp,使其追回日志
                    r.sendAppend(m.From)
                }
                // ...
            }
        }
        // ...
    }
    // ...
}

3. 心跳

选主后,Leader需要周期性对Follower进行心跳,让Follower感知到Leader存在,Leader也需要通过心跳响应检测Follower的状态。

在Raft论文中,上面的日志复制请求就可以充当心跳,不过在etcd-raft中,心跳还是另外抽出实现。

这部分涉及到的消息类型有:MsgBeat, MsgHeartbeat, MsgHeartbeatResp

3.1. Leader发起心跳

由于Leader会周期性发起心跳,因此很容易猜到心跳通过调用NodeTick方法触发。

的确,Leader调用Tick后,会调用tickHeartBeat方法,它会:

  • 增加raftheartbeatElapsed计数

  • heartbeatElapsed超过heartbeatTimeout,则重置heartbeatElapsed并给自己发送MsgBeat消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    func (r *raft) tickHeartbeat() {
        r.heartbeatElapsed++ // 增加heartbeatElapsed
        // ...
        if r.heartbeatElapsed >= r.heartbeatTimeout {
            // 超过heartbeatTimeout时,给自己发送MsgBeat消息
            r.heartbeatElapsed = 0
            r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
        }
    }
    

Leader收到MsgBeat消息后,Leader就开始广播心跳:

1
2
3
4
5
6
7
8
9
func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    case pb.MsgBeat:
        r.bcastHeartbeat() // 广播心跳
        return nil
    // ...
    }
    // ...
}

而广播心跳的消息类型为MsgHeartbeat,包含了:

  • 两者的最小值:

    • Leader已提交的日志索引
    • 对应节点已提交日志的索引

    为什么要这个字段并取这个值,下面3.2.会说

  • 上下文信息

  • Leader的任期(raft.send调用时会添加)

1
2
3
4
5
6
7
8
9
10
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }
    r.send(m) // 这一步会添加任期Term字段
}

3.2. Follower处理心跳

而Follower收到了MsgHeartbeat后,和MsgApp一样:

  • 必要的检查消息的Term,这部分见2.2.节
  • 重置electionElapsed计数器并设置Leader
  • 处理心跳
1
2
3
4
5
6
7
8
9
10
11
func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
        // ...
    case pb.MsgHeartbeat:
        r.electionElapsed = 0 // 重置electionElapsed计数器
        r.lead = m.From // 设置Leader
        r.handleHeartbeat(m)// 具体处理心跳请求
        // ...
    }
    return nil
}

而在具体处理心跳请求中:

  • Follower会将消息中的Commit字段提取出来,并根据这个索引将本地日志提交(更新committedIndex
    • 这个索引必须是Leader已经提交的,并且也必须是Follower拥有的,这也是3.1.中的问题答案
  • 返回Leader MsgHeartbeatResp响应
1
2
3
4
func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit) // 提交日志(更新本地的committedIndex)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) // 返回心跳响应
}

3.3. Leader处理Follower响应

Leader收到响应后,除了必要的检查消息的Term之外,主要还是追踪Follower的状态(标记为存活),并且若Follower需要追赶日志则发送日志复制的请求过去。另外其他的关于readOnly的处理,这里不进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func stepLeader(r *raft, m pb.Message) error {
    // ...
    pr := r.prs.Progress[m.From] // 获取Follower的Progress
    if pr == nil {
        r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return nil
    }
    switch m.Type {
        // ...
    case pb.MsgHeartbeatResp:
        // 更新Follower的状态,标记为存活
        pr.RecentActive = true
        pr.ProbeSent = false

        // 腾开窗口以供Leader能够向Follower继续发送请求
        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
            pr.Inflights.FreeFirstOne()
        }
        // 若Follower需要追赶日志,则这里再发送一次日志复制请求
        if pr.Match < r.raftLog.lastIndex() {
            r.sendAppend(m.From)
        }
        // 处理readOnly,这里忽略
        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
            return nil
        }
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
                r.send(resp)
            }
        }
    // ...
    }
    return nil
}

4. 总结

上面根据源码标识出了Raft原文中的规则,总体而言还是符合Raft论文中的规则,不过略有修改。

一个比较大的改动就是没有完整实现Raft论文中的AppendEntries: 规则1(即没有指明Rejecttrue),它也没有在心跳中实现。

不过,etcd-raft还有一个check-quorum机制:

  • 通过下面的心跳,Leader可以追踪Follower是否存活
  • 每隔electionTimeout,Leader会定期执行check-quorum,给自己发送MsgCheckQurum消息,检查是否过半数的Follower存活,若没有,自己就会降级为Follower,从而触发下一轮选主

此外还有一些优化和其他方面的集成(如快照、配置变更、节点暂停、只读等),这些不在本文讨论范围内。

下文继续按照论文的顺序,说明集群变化在etcd-raft中的实现。