1. 概要
上一篇讲了etcd-raft的日志和消息数据结构及其实现,本文就深入它对于Raft协议的实现,这一次挑选的是选主。
2. 节点启动
在看选主前,首先看下etcd-raft是如何启动的。
2.1. 数据结构
这里涉及到的数据结构就是node
,它是Node
接口的实现。其内部声明了一堆通道,然后加上一个RawNode
(而RawNode
就包了一层raft
结构,它记录了Raft的状态):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
rn *RawNode // 包装了raft,代表一个简单的Raft节点实现
}
type RawNode struct {
raft *raft // 记录了raft的状态
prevSoftSt *SoftState
prevHardSt pb.HardState
}
而具体raft
状态包含很多内容,如下所示,这里用注释列举关键的字段:
type raft struct {
id uint64 // 节点ID
Term uint64 // 当前term/epoch
Vote uint64 // 该节点投票给Leader的节点ID
readStates []ReadState
raftLog *raftLog // Raft日志,记录了committed index和applied index
maxMsgSize uint64
maxUncommittedSize uint64
prs tracker.ProgressTracker // 记录了当前活跃的集群配置,包括已知的节点,并且追踪每个节点的日志索引
state StateType // 当前节点的角色
isLearner bool // 该节点是否是Learner
msgs []pb.Message
lead uint64 // Leader节点的ID
leadTransferee uint64
pendingConfIndex uint64
uncommittedSize uint64
readOnly *readOnly
electionElapsed int // 选主超时后/收到当前Leader的有效请求后经过的时间(tick)
heartbeatElapsed int // 心跳超时后经过的时间(tick),只有Leader使用
checkQuorum bool
preVote bool
heartbeatTimeout int // 心跳超时时间(tick)
electionTimeout int // 选主超时时间(tick)
randomizedElectionTimeout int // 随机的选主超时时间,当没收到Leader一段时间的有效请求后,Follower会变成Candidate
// 范围:[electiontimeout, 2 * electiontimeout - 1]
disableProposalForwarding bool
tick func()
step stepFunc
logger Logger
}
2.2. 启动
a) raft
初始化
Raft节点启动首先需要初始化上述数据结构的字段,这里主要还是看 raft
的初始化,它主要做:
- 根据配置初始化字段
- 恢复之前的集群状态
- 将自己变成Follower
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) // 初始化日志
hs, cs, err := c.Storage.InitialState() // 初始化存储
if err != nil {
panic(err) // TODO(bdarnell)
}
if len(c.peers) > 0 || len(c.learners) > 0 {
if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
cs.Voters = c.peers
cs.Learners = c.learners
}
r := &raft{
id: c.ID, // 节点ID
lead: None, // 初始Leader为空
isLearner: false,
raftLog: raftlog, // Raft日志
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), // 集群配置
electionTimeout: c.ElectionTick, // 选主的超时时间(实际上会基于它生成一个随机数)
heartbeatTimeout: c.HeartbeatTick, // 心跳的超时时间
logger: c.Logger,
checkQuorum: c.CheckQuorum,
preVote: c.PreVote,
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
// 恢复集群状态
cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs,
LastIndex: raftlog.lastIndex(),
}, cs)
if err != nil {
panic(err)
}
assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
raftlog.appliedTo(c.Applied)
}
// 初始角色为Follower, Leader设为None
r.becomeFollower(r.Term, None)
var nodesStrs []string
for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
b) 启动Raft节点的事件循环
初始化完成后,会启动Raft事件循环。这部分定义在node.run()
方法中,主要处理:
- 获取新产生的
Ready
,并塞入readyc
通道供上层轮询 - 探测Leader变化
- 监听通道,处理消息(主要通过
Node
接口调用):propc
:处理Propose
的请求消息confc
:处理ProposeConfChange
的配置变更请求消息recvc
:处理响应消息tickc
,advancec
:处理上层的Tick
和Advance
的信号- 其他
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
// 1. 轮询时,首先将新产生的Ready取出来
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
// 2. 探测Leader变化并打日志
if lead != r.lead {
if r.hasLeader() {
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
propc = n.propc
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
lead = r.lead
}
// 3. 监听通道以进行处理
select {
case pm := <-propc: // 外部对Node调用Propose时,这里会收到,这里会处理请求消息(如追加日志、投票等)
m := pm.m
m.From = r.id
err := r.Step(m) // 调用Step处理消息
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc: // 这里会收到并处理响应消息
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc: // 外部对Node调用ProposeConfChange时,这里会收到,处理集群配置变更
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
outer:
for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
if id == r.id {
found = true
break outer
}
}
}
if !found {
propc = nil
}
}
select {
case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc: // 外部对Node调用Tick时,这里会收到,并自增tick计数器
n.rn.Tick()
case readyc <- rd: // 将新取出来的Ready塞入ready通道中,供上层对Node调用Ready使用
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec: // 外部对Node调用Advance时,这里会收到,并对raft调用Advance表示处理外之前弹出的Ready
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status: // 处理状态变更
c <- getStatus(r)
case <-n.stop: // 收到停止信号,关闭Raft节点
close(n.done)
return
}
}
}
3. 选主
3.1. 触发选主
初始状态下,所有的节点都是Follower。这里会调用becomeFollower
方法,主要注意的是:
reset
方法会重置Raft状态,并且随机生成一个选主超时时间- 注意
tickElection
方法,调用Tick
时会调用该方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower // Follower被调用Step/Propose时可能调用的函数
r.reset(term) // 重置状态
r.tick = r.tickElection // Follower被调用Tick时需要调用的函数
r.lead = lead
r.state = StateFollower // 设置角色为Follower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
func (r *raft) reset(term uint64) {
// ...
r.electionElapsed = 0
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout() // 随机生成一个选主超时时间
// ...
}
触发选主必定在调用Node
的Tick
时触发,最终会调用tickElection
,它可能会触发选主:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *raft) tickElection() {
r.electionElapsed++ // 首先给electionElapsed计数+1
// 若节点可以成为Leader,且超时,则会给自己发送MsgHup消息,表示触发选主
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
// 是否可以成为Leader,必须满足:
// 1. 节点自己必须在集群配置中
// 2. 节点自己不是Learner
// 3. 节点不能有没持久化的快照
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}
从上面可知,选主一旦触发,就会给自己发送MsgHup
消息,最后会在下面的代码块处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
func (r *raft) Step(m pb.Message) error {
// ...
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
// ...
}
// ...
}
我们看到这里就会触发选主。不过这里有一个PreVote的分支,这在Raft论文中没有提及,这在下文会说明。
3.2. PreVote
a) 思想
原始选主过程中,状态变化为:Follower -> Candidate -> Leader;
而etcd-raft中加入了PreVote,状态变化为:Follower -> PreCandidate -> Candidate -> Leader。
PreVote中,要成为Candidate,还需要确认自己有可能获得集群中大部分节点的投票,这样才能将自己的Term
加1,发起真正的选主。
这需要其他节点同意发起选主,同意的条件必须满足下面2个:
- 没有收到有效Leader的心跳,至少有一次选主超时
- PreCandidate的日志足够新(
Term
要更大,或LogIndex
更大且Term
相同)
这样有一定好处。
考虑一个场景:有A、B、C、D、E共5个节点;某个时间段A、B和其他节点分区,C、D、E形成新集群,而A、B也触发选主,但没有半数以上同意,所以必须不停选主,且Term
不停增加;后来分区解决了,A、B会重新加入集群,它们的数据很旧,但Term
很大,所以会再触发一次选主,扰乱集群。
有了PreVote后就不会有上面的问题。
b) 实现:发起PreVote
校验
首先校验是否能发起PreVote,需要满足:
- 节点自己必须在集群配置中
- 节点自己不是Learner,也不是Leader
- 节点不能有没持久化的快照
- 节点不能还有配置变更没应用
发起PreVote
然后向所有节点发起PreVote:
- 节点首先会成为PreCandidate
- 然后投票给自己
- 最后向所有节点发送PreVote请求,即
MsgPreVote
消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (r *raft) campaign(t CampaignType) {
// ...
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
// 1. 首先成为PreCandidate
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1 // 发送的消息是r.Term+1
} else {
// ...
}
// 2. 投票给自己,并统计投票次数
// 若返回票数过半,则开始正式选举,不需要发送MsgPreVote,这只会在单节电情况下发生
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
// 3. 向所有节电发送MsgPreVote消息,发起PreVote
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue // 忽略自己
}
// ...
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
// 发送MsgPreVote消息
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
c) 实现:处理PreVote请求
处理PreVote请求流程如下:
-
若
MsgPreVote
消息的Term
小,则返回拒绝的响应:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
func (r *raft) Step(m pb.Message) error { // ... switch { // ... case m.Term < r.Term: // ... else if m.Type == pb.MsgPreVote { // ... // 返回拒绝响应MsgPreVoteResp r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) } // ... return nil } // ... }
-
然后,检查节点是否还认为该任期内Leader还在(即在租约内),若还在,则忽略请求,直接返回。(注意,这里不会强制降级为Follower,其他类型的消息会):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
func (r *raft) Step(m pb.Message) error { switch { case m.Term > r.Term: if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { force := bytes.Equal(m.Context, []byte(campaignTransfer)) // 判断是否在租约内,Leader还存在 inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout // 若还存在则直接忽略请求,直接返回,不输出任何响应,即不投票 if !force && inLease { // ... return nil } } // ... } // ... }
-
当满足下面条件时,返回同意响应,否则返回拒绝响应
- 节点已经投给了这个PreCandidate,或节点还没投票且节点认为Leader不在了,或消息的
Term
更大 - 消息携带的日志
Term
和Index
更加新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
func (r *raft) Step(m pb.Message) error { // ... switch m.Type { // ... case pb.MsgVote, pb.MsgPreVote: canVote := r.Vote == m.From || // 已经投给了该Candidate(对应MsgVote) (r.Vote == None && r.lead == None) || // 还没投票且Leader不在了(对应MsgPreVote) (m.Type == pb.MsgPreVote && m.Term > r.Term) // 下一任期的MsgPreVote // 请求中携带的信息显示日志更加新 if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { // 返回同意响应 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) if m.Type == pb.MsgVote { // ... // 若对应的时MsgVote,即正式的投票,则记录r.Vote记录自己投票给谁,其遵循先来后到原则 r.electionElapsed = 0 r.Vote = m.From } } else { // ... // 否则发送拒绝响应 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } // ... } }
- 节点已经投给了这个PreCandidate,或节点还没投票且节点认为Leader不在了,或消息的
d) 实现:处理PreVote响应
此时发起的节点已经成为PreCandidate。收到响应时,它也会进入c)中的Step
方法。
具体流程如下:
-
和c)中第一步类似,若响应的
Term
小,则直接返回忽略 -
和c)中第二步类似,若响应的
Term
大,则说明本节点又发了下一Term
的PreVote- 若返回同意:直接忽略即可,后面直接会推进状态变化
- 若返回拒绝:则需要降级自己为Follower
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
func (r *raft) Step(m pb.Message) error { switch { // ... case m.Term > r.Term: // ... switch { // ... case m.Type == pb.MsgPreVoteResp && !m.Reject: // 若同意,则直接忽略,继续执行 default: // 否则就必须降级为Follower if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { r.becomeFollower(m.Term, m.From) } else { r.becomeFollower(m.Term, None) // 执行这一步,r.Vote设为空 } } } switch m.Type { // ... default: err := r.step(r, m) // 最后处理PreCandidate/Candidate的状态变化 if err != nil { return err } } return nil }
-
最后,处理PreCandidate的状态变化,这里对应了
stepCandidate
方法,代表作为Candidate/PreCandidate时处理消息的逻辑:- 修改和统计投票数,判断是否过半
- 若过半则推进状态演变,发起正式选主
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
func stepCandidate(r *raft, m pb.Message) error { var myVoteRespType pb.MessageType if r.state == StatePreCandidate { myVoteRespType = pb.MsgPreVoteResp } else { myVoteRespType = pb.MsgVoteResp } switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped case pb.MsgApp: // 若收到了原Leader的日志追加,则降级为Follower,更新Term、Leader等信息 r.becomeFollower(m.Term, m.From) r.handleAppendEntries(m) case pb.MsgHeartbeat: // 若收到了原Leader的心跳,则降级为Follower,更新Term、Leader等信息 r.becomeFollower(m.Term, m.From) r.handleHeartbeat(m) case pb.MsgSnap: // 若收到原Leader的快照,则降级为Follower,更新Term、Leader等信息 r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) case myVoteRespType: // 处理PreVote/Vote响应 // 修改投票统计 gr, rj, res := r.poll(m.From, m.Type, !m.Reject) switch res { // 若过半 case quorum.VoteWon: if r.state == StatePreCandidate { // 若为PreCandidate,则发起正式的选主 r.campaign(campaignElection) } else { // 若已经是Candidate,就直接成为Leader即可 r.becomeLeader() // 成为Leader r.bcastAppend() // 广播本节点的Raft日志 } // 若没有过半,则降级为Follower case quorum.VoteLost: r.becomeFollower(r.Term, None) } // ... } return nil }
e) 实现:处理超时
若PreVote阶段超时,则会触发下一轮的PreVote。这部分在tickElection
方法中实现(和3.1.节一样)。
注意PreVote阶段不会修改Raft的Term
,只是在PreVote请求消息中将Term + 1
(其它消息的Term
依旧是Raft自己的Term
,见3.2.b),因此下一轮PreVote的Term
和之前PreVote的Term
一样。
3.3. 正式选主
PreVote通过后,就可开始正式的选主。
a) 发起Vote请求
这里和PreVote的入口一样,只不过处理的步骤不同:
-
节点成为Candidate,并且自增
Term
1 2 3 4 5 6 7 8
func (r *raft) becomeCandidate() { r.step = stepCandidate r.reset(r.Term + 1) // 自增Term r.tick = r.tickElection r.Vote = r.id // 默认投票给自己 r.state = StateCandidate r.logger.Infof("%x became candidate at term %d", r.id, r.Term) }
-
然后投票给自己
-
最后向所有节点发送Vote请求,即
MsgVote
消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (r *raft) campaign(t CampaignType) {
// ...
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
// ...
} else {
// 1. 成为Candidate,并自增Term
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term // 这里的term是自增后的
}
// 2. 投票给自己
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
// 3. 向其他节点发送MsgVote请求
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
// ...
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
b) 处理Vote请求
这里和PreVote类似,不过这里也再叙述一次:
- 若
MsgVote
消息的Term
小,则返回拒绝的响应,这部分和MsgPreVote
一样 - 然后,检查节点是否还认为该任期内Leader还在(即在租约内),若还在,则忽略请求,直接返回,这部分也和
MsgPreVote
一样 - 当满足下面条件时,返回同意响应,否则返回拒绝响应,这部分和
MsgPreVote
类似- 节点没有认为Leader存在了,且还没有投票(
- 消息携带的日志更加的新
- 若投出同意票,则更新
r.Vote
,并重置r.electionElapsed
(实际上等同于一次心跳了),之后的MsgVote
不会投给其他节点了
c) 处理Vote响应
这部分已经在3.2.d中说明,这里不再说明。(就是统计票数,判断过半,升级Leader,广播日志)
d) 处理超时
这部分和PreVote一样,会重新发起选主,但会回退到PreVote阶段。
4. 总结
可从上面得出,etcd-raft的选主还是按照Raft论文一模一样来的,而关于算法的总结,可参考下图,也可参考这里,包含原理总结和demo源码:
而加了一个PreVote,个人认为只是Vote的预演,判断PreCandidate是否可能成为Leader,但是不会修改类似Term
, Vote
等信息,不过它解决了分区时出现的Term
过大扰乱集群的问题,不得不说是真的强。
整体而言PreVote + Vote的风格很像2PC。