1. 概要
Raft论文之前读过(Paxos这个读不懂的),也做过一定的练习实现,不过那个有点naive了,真要看还得看大名鼎鼎的etcd-raft。
不过这里先不看etcd-raft的具体实现,本文先从它提供的raftexample
入手,它基于etcd-raft实现了简单的分布式K-V存储。
2. 预备:数据结构与接口
在看样例前,首先要介绍2个数据结构/接口,它们是暴露给外部使用的,分别为:
Ready
结构Node
接口
2.1. Ready
结构
由于etcd-raft库没有实现网络通信和存储,因此上层应用向Raft写入数据后,需要有个机制获取哪些数据需要持久化,哪些数据需要传输到其他地方。
上面这个问题可用Go的通道实现。在下面的Node
接口的Ready
方法会返回一个chan Ready
,上层可通过轮询这个通道来获取这些数据,这些数据保存在Ready
结构中。
Ready
结构包含以下的信息,解释见注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Ready struct {
// 软状态,可变且不需要写入WAL
// 包含:集群Leader, 节点的当前状态(角色)
*SoftState
// 硬状态,需要持久化到磁盘中
// 包含:节点的Term, Vote(票投给谁), Commit(已提交的日志索引)
pb.HardState
// 用于读一致性的数据,之后会说明
ReadStates []ReadState
// 下面的Messages发送前需要持久化到磁盘的日志项
Entries []pb.Entry
// 需要持久化的快照数据
Snapshot pb.Snapshot
// 已被提交的日志项
// 这些日志需要上层读取并应用到状态机中
CommittedEntries []pb.Entry
// 上面Entries持久化后,需要发送的消息
Messages []pb.Message
// HardState和Entries是否需要同步持久化到磁盘
MustSync bool
}
2.2. Node
接口
Node
接口代表了一个Raft节点(etcd-raft有一个实现叫作node
结构,这里不看其实现),上层对于Raft的交互都通过这个接口进行。
该接口主要有下面的方法,详情见注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Node interface {
// 增加内部的逻辑时钟,用于驱动选举、心跳等。
// 上层需要定时调用该方法(如用Ticker控制)。
Tick()
// 将角色变成Candidate, 参与Leader选举
Campaign(ctx context.Context) error
// 往Raft的日志追加数据,可能返回错误
Propose(ctx context.Context, data []byte) error
// 往Raft写入集群配置变更数据
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
// 将消息灌入状态机
Step(ctx context.Context, msg pb.Message) error
// 返回一个Ready通道
// 如2.1.所述,上层可通过轮询这个通道获取哪些数据需要持久化/应用到状态机/发送到其他节点
Ready() <-chan Ready
// 调用Ready后,需要执行该方法
Advance()
// ...
}
3. 样例说明
这里简要说明样例存储的工作流程。
3.1. 总体流程
首先,样例会创建2个通道:proposeC
和confChangeC
,它们分别用于写入键值数据和集群配置变更数据。
之后,样例会:
-
启动一个HTTP服务器,接受外部请求,将数据写入上面2个通道中
-
启动
raftNode
(Node
接口实现,并使用了node
结构):- 后台监听上面2个通道,收到数据后调用
Node
接口与Raft进行交互 - 后台周期调用
Tick
,Ready
和Advance
,持久化必要的数据,并修改键值存储的状态
- 后台监听上面2个通道,收到数据后调用
3.2. 往Raft追加日志
首先是监听proposeC
和confChangeC
,并向raftNode
写入数据,追加日志,这部分调用的是Node
的Propose
和ProposeConfChange
方法。
具体位置在raftNode
的serveChannels
方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rc *raftNode) serveChannels() {
// ...
// send proposals over raft
go func() {
confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
// 从proposeC和confChangeC读取数据,并调用Propose和ProposeConfChange
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// Propose: 向Raft追加日志
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
// ProposeConfChange: 往Raft写入集群配置变更数据
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
close(rc.stopc)
}()
// ...
}
3.3. 轮询Ready
2.1.和2.2.中提及,上层需要轮询Ready
以获取需要持久化/应用到状态机/发送给其他节点的消息,这部分也在raftNode
的serveChannels
方法中,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (rc *raftNode) serveChannels() {
// ...
// 设置Ticker, 100ms触发一次事件
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// ...
// 轮询Ready
for {
select {
// Ticker就绪时,自增raftNode的逻辑时钟以触发选主, 心跳等
case <-ticker.C:
rc.node.Tick()
// 轮询Ready通道,若就绪,上层则执行必要的事情
case rd := <-rc.node.Ready():
// a. 将硬状态和Entries写入WAL
rc.wal.Save(rd.HardState, rd.Entries)
// b. 持久化快照数据
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
// c. 持久化Entries
rc.raftStorage.Append(rd.Entries)
// d. 发送Messages给其他节点
rc.transport.Send(rd.Messages)
// e. 提取已经提交的日志项,应用到键值存储中(即状态机)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
// f. 可能触发快照
rc.maybeTriggerSnapshot()
// g. 最后需要调用Advance
rc.node.Advance()
// ...
}
}
}
4. 总结
本文通过etcd提供的样例键值存储,总体介绍了外部需要调用的Node
接口和Ready
结构,了解上层如何调用etcd-raft库实现上层应用。
后面的文章会对etcd-raft内部进行探究。