1. 概要
这里最后说明一下论文的最后部分“线性一致读”在etcd的实现。
2. 线性一致
CAP中的C即线性一致,它指的是:系统写操作提交成功后,之后的读取都会得到最新的数据。
即:在分布式系统上实现寄存器语义
2.1. Raft只能作为线性一致读的基础
有个误区:Raft和Paxos系统是线性一致的。其实不然,这些协议需要上层做些事情,才能达到。
在Raft中,写提交成功只会达成日志的一致(并且落盘),而状态机并不能达成一致,需要将提交的日志应用到状态机。而应用日志的操作的实现基本都是异步的,所以还是可能读到旧数据。
2.2. ReadIndex算法
实现线性一致读有一个很简单的算法:
- 获取集群已提交的日志索引
- 当状态机的日志应用索引大于等于日志提交索引时,读取才能返回
这里会有2个问题:
- 如何获取集群的日志提交索引?
- 如何确保Leader有效?
a) 获取集群的日志提交索引
这部分可以通过路由保证:
- 若Follower收到读请求,直接路由到Leader
- 从Leader获取日志提交索引
- 等待该索引的日志应用到状态机后,执行读请求,返回给客户端
b) 确保Leader有效
Leader可以发起一个广播请求,若能收到大部分节点的应答,说明Leader有效。
这步很重要,没有这一步,在网络分区的时候,Leader不知道自己是否有效,可能会读取到旧数据。
例如(A, B, C, D, E),其中A为Leader,之后网络分区为(A, B)和(C, D, E),C成为新Leader,而A不知道自己已经不是Leader了:
- 首先往C写入,成功
- 然后往A读取,由于A认为自己还是Leader,所以也会返回结果,但这是旧数据
这部分在etcd-raft就是check-quorum机制。
2.3. LeaderRead算法
这部分和ReaderIndex类似,不过它利用了时钟特性:Leader设置租约(比election timeout小),在租约内就可以保证Leader有效,从而不需要执行2.2.b)的广播。
这极大提高了吞吐,降低了延时。但时钟飘移严重,则正确性也有问题。
这部分PingCAP的文章说的非常好。
3. etcd实现
上层部分的逻辑参考了这篇文章,主要的逻辑就是:
-
客户端会发起一个
Range
请求 -
服务端会执行ReadIndex算法,尝试获取日志提交索引
-
向etcd-raft模块发起
ReadIndex
请求,以获取索引 - 等待日志提交索引的结果:
- Follower会路由
ReadIndex
请求到Leader - Leader收到后将其随着自己的日志提交索引进行缓存,并广播心跳
- Leader收到足够心跳,则弹出缓存的消息,返回Follower
ReadIndex
响应,响应包含自己的日志提交索引 - Follower/Leader收到日志提交索引后,将其包装到
readStates
中,并通过Ready
轮询返回给上层
- Follower会路由
- 等待日志应用索引大于等于日志提交索引,满足条件后,即可读取数据
-
这里主要看etcd-raft模块,对应的就是上面2.2.中的步骤。入口就是ReadIndex
请求,这里着重讲ReadIndex算法。
3.1. 处理ReadIndex
请求
a) Follower处理
Follower比较简单,它会将ReadIndex
请求路由到Leader:
1
2
3
4
5
6
7
8
9
10
11
12
13
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
m.To = r.lead // 目的就是Leader
r.send(m) // 将请求路由到Leader
}
return nil
}
b) Leader处理
Leader通过2种方式获取ReadIndex
请求:上层调用、Follower路由。
它的执行逻辑很简单:
- 将请求追加到
readOnly
队列 - 广播心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgReadIndex:
// 若只有1个节点(即自己),则直接返回MsgReadIndexResp响应(包含提交索引)
if r.prs.IsSingleton() {
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
return nil
}
// ...
// 若节点多于1个,会走到这里
switch r.readOnly.option {
case ReadOnlySafe:
// ReadIndex
// 1. 将ReadIndex请求追加到readOnly队列,用于ack计数
// 队列中的消息包含了对应的提交日志索引
r.readOnly.addRequest(r.raftLog.committed, m)
// 2. 自己先对自己ack
r.readOnly.recvAck(r.id, m.Entries[0].Data)
// 3. 广播心跳以确认Leader状态
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
// LeaseRead,
// 直接返回日志提交索引的MsgReadIndexResp响应,这里略
// ...
}
return nil
}
}
这里看下readOnly
相关数据结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type readOnly struct {
// 读取选项: ReadIndex和LeaseRead
option ReadOnlyOption
// 待处理的读请求队列
// 键为请求ID, 值是该读请求的状态
pendingReadIndex map[string]*readIndexStatus
// 读请求队列
readIndexQueue []string
}
type readIndexStatus struct {
// 读请求消息体
req pb.Message
// 提交日志的索引
index uint64
// ACK记录,用于quorum计数
acks map[uint64]bool
}
3.2. Leader处理心跳的响应
Leader处理ReadIndex
请求会广播一次心跳,当收到响应后:
- 更新
readOnly
中的quorum - 若quorum过半,则会清理
readOnly
中队列的消息,并:- 若队列中消息来自自己,则将其追加到
readStates
中,它会包装成Ready
返回给应用层 - 若队列中消息来自Follower,则返回
MsgReadIndexResp
响应(包含对应的日志提交索引)
- 若队列中消息来自自己,则将其追加到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func stepLeader(r *raft, m pb.Message) error {
// ...
switch m.Type {
// ...
case pb.MsgHeartbeatResp:
// ...
// 统计quorm,若没到要求则直接返回
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
// quorum通过,清理readOnly中队列的消息
rss := r.readOnly.advance(m)
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
// ...
}
}
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
// 若消息来自自己,则将消息追加到readStates中,它会包装成Ready返回给应用层
if req.From == None || req.From == r.id {
r.readStates = append(r.readStates, ReadState{
Index: readIndex,
RequestCtx: req.Entries[0].Data,
})
return pb.Message{}
}
// 若消息来自Follower,则返回MsgReadIndexResp消息,并返回给Follower
return pb.Message{
Type: pb.MsgReadIndexResp,
To: req.From,
Index: readIndex,
Entries: req.Entries,
}
}
3.3. Follower处理ReadIndex
响应
这里很简单,和Leader一样,将对应的ReadIndex
消息随同日志提交索引,缓存到readStates
中,供上层的Ready
轮询使用。
1
2
3
4
5
6
7
8
9
func stepFollower(r *raft, m pb.Message) error {
// ...
case pb.MsgReadIndexResp:
// ...
// 将消息追加到readStates中,它会包装成Ready返回给应用层
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
// ...
}
上层进行Ready
轮询时,会获取上面缓存的readStates
,从而知道了集群日志提交的索引。下一步只要等待日志应用索引超过日志提交索引,就可以对状态机进行读取,把结果返回给客户端了。
3.4. LeaseRead实现
这部分在ReadIndex上进行了简化,只需要在3.1.b)中,收到MsgReadIndex
请求后直接向Follower返回MsgReadIndexResp
响应,然后跳到3.3.,就可以实现。
4. 总结
至此,对照Raft论文,etcd-raft基本看的差不多了。
一个感想:Go语言的确非常简单,但是其CSP风格的确比较难适应,很tricky。