etcd-raft (1): 基于Raft的K-V存储样例

Posted by keys961 on September 29, 2020

1. 概要

Raft论文之前读过(Paxos这个读不懂的),也做过一定的练习实现,不过那个有点naive了,真要看还得看大名鼎鼎的etcd-raft。

不过这里先不看etcd-raft的具体实现,本文先从它提供的raftexample入手,它基于etcd-raft实现了简单的分布式K-V存储。

2. 预备:数据结构与接口

在看样例前,首先要介绍2个数据结构/接口,它们是暴露给外部使用的,分别为:

  • Ready结构
  • Node接口

2.1. Ready结构

由于etcd-raft库没有实现网络通信和存储,因此上层应用向Raft写入数据后,需要有个机制获取哪些数据需要持久化,哪些数据需要传输到其他地方。

上面这个问题可用Go的通道实现。在下面的Node接口的Ready方法会返回一个chan Ready,上层可通过轮询这个通道来获取这些数据,这些数据保存在Ready结构中。

Ready结构包含以下的信息,解释见注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Ready struct {
	// 软状态,可变且不需要写入WAL
    // 包含:集群Leader, 节点的当前状态(角色)
	*SoftState	
    // 硬状态,需要持久化到磁盘中
    // 包含:节点的Term, Vote(票投给谁), Commit(已提交的日志索引)
	pb.HardState
	// 用于读一致性的数据,之后会说明
	ReadStates []ReadState
    // 下面的Messages发送前需要持久化到磁盘的日志项
	Entries []pb.Entry
	// 需要持久化的快照数据
	Snapshot pb.Snapshot
	// 已被提交的日志项
    // 这些日志需要上层读取并应用到状态机中
	CommittedEntries []pb.Entry
	// 上面Entries持久化后,需要发送的消息
	Messages []pb.Message
    // HardState和Entries是否需要同步持久化到磁盘
	MustSync bool
}

2.2. Node接口

Node接口代表了一个Raft节点(etcd-raft有一个实现叫作node结构,这里不看其实现),上层对于Raft的交互都通过这个接口进行。

该接口主要有下面的方法,详情见注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Node interface {
    // 增加内部的逻辑时钟,用于驱动选举、心跳等。
    // 上层需要定时调用该方法(如用Ticker控制)。
    Tick()
    // 将角色变成Candidate, 参与Leader选举
    Campaign(ctx context.Context) error
    // 往Raft的日志追加数据,可能返回错误 
    Propose(ctx context.Context, data []byte) error
    // 往Raft写入集群配置变更数据
    ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
    // 将消息灌入状态机
    Step(ctx context.Context, msg pb.Message) error
    // 返回一个Ready通道
    // 如2.1.所述,上层可通过轮询这个通道获取哪些数据需要持久化/应用到状态机/发送到其他节点
    Ready() <-chan Ready
    // 调用Ready后,需要执行该方法
    Advance()
    // ...
}

3. 样例说明

这里简要说明样例存储的工作流程。

3.1. 总体流程

首先,样例会创建2个通道:proposeCconfChangeC,它们分别用于写入键值数据和集群配置变更数据。

之后,样例会:

  • 启动一个HTTP服务器,接受外部请求,将数据写入上面2个通道中

  • 启动raftNodeNode接口实现,并使用了node结构):

    • 后台监听上面2个通道,收到数据后调用Node接口与Raft进行交互
    • 后台周期调用Tick, ReadyAdvance,持久化必要的数据,并修改键值存储的状态

3.2. 往Raft追加日志

首先是监听proposeCconfChangeC,并向raftNode写入数据,追加日志,这部分调用的是NodeProposeProposeConfChange方法。

具体位置在raftNodeserveChannels方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rc *raftNode) serveChannels() {
	// ...
	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)
		for rc.proposeC != nil && rc.confChangeC != nil {
            // 从proposeC和confChangeC读取数据,并调用Propose和ProposeConfChange
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
                    // Propose: 向Raft追加日志
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
                    // ProposeConfChange: 往Raft写入集群配置变更数据
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		close(rc.stopc)
	}()
    // ...
}

3.3. 轮询Ready

2.1.和2.2.中提及,上层需要轮询Ready以获取需要持久化/应用到状态机/发送给其他节点的消息,这部分也在raftNodeserveChannels方法中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (rc *raftNode) serveChannels() {
	// ...
    // 设置Ticker, 100ms触发一次事件
    ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
    // ...
    // 轮询Ready
    for {
		select {
        // Ticker就绪时,自增raftNode的逻辑时钟以触发选主, 心跳等
		case <-ticker.C:
			rc.node.Tick()

		// 轮询Ready通道,若就绪,上层则执行必要的事情
		case rd := <-rc.node.Ready():
            // a. 将硬状态和Entries写入WAL
			rc.wal.Save(rd.HardState, rd.Entries) 
            // b. 持久化快照数据
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
            // c. 持久化Entries
			rc.raftStorage.Append(rd.Entries)
            // d. 发送Messages给其他节点
			rc.transport.Send(rd.Messages)
            // e. 提取已经提交的日志项,应用到键值存储中(即状态机)
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
            // f. 可能触发快照
			rc.maybeTriggerSnapshot()
            // g. 最后需要调用Advance
			rc.node.Advance()

		// ...
		}
	}
}

4. 总结

本文通过etcd提供的样例键值存储,总体介绍了外部需要调用的Node接口和Ready结构,了解上层如何调用etcd-raft库实现上层应用。

后面的文章会对etcd-raft内部进行探究。