etcd-raft (7): Raft线性一致读

Posted by keys961 on November 6, 2020

1. 概要

这里最后说明一下论文的最后部分“线性一致读”在etcd的实现。

2. 线性一致

CAP中的C即线性一致,它指的是:系统写操作提交成功后,之后的读取都会得到最新的数据。

即:在分布式系统上实现寄存器语义

2.1. Raft只能作为线性一致读的基础

有个误区:Raft和Paxos系统是线性一致的。其实不然,这些协议需要上层做些事情,才能达到。

在Raft中,写提交成功只会达成日志的一致(并且落盘),而状态机并不能达成一致,需要将提交的日志应用到状态机。而应用日志的操作的实现基本都是异步的,所以还是可能读到旧数据。

2.2. ReadIndex算法

实现线性一致读有一个很简单的算法:

  1. 获取集群已提交的日志索引
  2. 当状态机的日志应用索引大于等于日志提交索引时,读取才能返回

这里会有2个问题:

  1. 如何获取集群的日志提交索引?
  2. 如何确保Leader有效?

a) 获取集群的日志提交索引

这部分可以通过路由保证:

  • 若Follower收到读请求,直接路由到Leader
  • 从Leader获取日志提交索引
  • 等待该索引的日志应用到状态机后,执行读请求,返回给客户端

b) 确保Leader有效

Leader可以发起一个广播请求,若能收到大部分节点的应答,说明Leader有效。

这步很重要,没有这一步,在网络分区的时候,Leader不知道自己是否有效,可能会读取到旧数据。

例如(A, B, C, D, E),其中A为Leader,之后网络分区为(A, B)和(C, D, E),C成为新Leader,而A不知道自己已经不是Leader了:

  • 首先往C写入,成功
  • 然后往A读取,由于A认为自己还是Leader,所以也会返回结果,但这是旧数据

这部分在etcd-raft就是check-quorum机制。

2.3. LeaderRead算法

这部分和ReaderIndex类似,不过它利用了时钟特性:Leader设置租约(比election timeout小),在租约内就可以保证Leader有效,从而不需要执行2.2.b)的广播。

这极大提高了吞吐,降低了延时。但时钟飘移严重,则正确性也有问题。

这部分PingCAP的文章说的非常好。

3. etcd实现

上层部分的逻辑参考了这篇文章,主要的逻辑就是:

  1. 客户端会发起一个Range请求

  2. 服务端会执行ReadIndex算法,尝试获取日志提交索引

    1. 向etcd-raft模块发起ReadIndex请求,以获取索引

    2. 等待日志提交索引的结果:
      • Follower会路由ReadIndex请求到Leader
      • Leader收到后将其随着自己的日志提交索引进行缓存,并广播心跳
      • Leader收到足够心跳,则弹出缓存的消息,返回FollowerReadIndex响应,响应包含自己的日志提交索引
      • Follower/Leader收到日志提交索引后,将其包装到readStates中,并通过Ready轮询返回给上层
    3. 等待日志应用索引大于等于日志提交索引,满足条件后,即可读取数据

这里主要看etcd-raft模块,对应的就是上面2.2.中的步骤。入口就是ReadIndex请求,这里着重讲ReadIndex算法。

3.1. 处理ReadIndex请求

a) Follower处理

Follower比较简单,它会将ReadIndex请求路由到Leader:

1
2
3
4
5
6
7
8
9
10
11
12
13
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
        // ...
        case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead // 目的就是Leader
		r.send(m) // 将请求路由到Leader
    }
    return nil
}

b) Leader处理

Leader通过2种方式获取ReadIndex请求:上层调用、Follower路由。

它的执行逻辑很简单:

  1. 将请求追加到readOnly队列
  2. 广播心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
    	// ...
        case pb.MsgReadIndex:
        // 若只有1个节点(即自己),则直接返回MsgReadIndexResp响应(包含提交索引)
		if r.prs.IsSingleton() {
			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
				r.send(resp)
			}
			return nil
		}
		// ...
        // 若节点多于1个,会走到这里
		switch r.readOnly.option {
		case ReadOnlySafe:
            // ReadIndex
            // 1. 将ReadIndex请求追加到readOnly队列,用于ack计数
            // 队列中的消息包含了对应的提交日志索引
			r.readOnly.addRequest(r.raftLog.committed, m)
			// 2. 自己先对自己ack
			r.readOnly.recvAck(r.id, m.Entries[0].Data)
            // 3. 广播心跳以确认Leader状态
			r.bcastHeartbeatWithCtx(m.Entries[0].Data)
		case ReadOnlyLeaseBased:
            // LeaseRead,
            // 直接返回日志提交索引的MsgReadIndexResp响应,这里略
            // ...
		}
		return nil
	}
}

这里看下readOnly相关数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type readOnly struct {
    // 读取选项: ReadIndex和LeaseRead
	option           ReadOnlyOption
    // 待处理的读请求队列
    // 键为请求ID, 值是该读请求的状态
	pendingReadIndex map[string]*readIndexStatus
	// 读请求队列
    readIndexQueue   []string
}

type readIndexStatus struct {
    // 读请求消息体
	req   pb.Message
	// 提交日志的索引
    index uint64
	// ACK记录,用于quorum计数
	acks map[uint64]bool
}

3.2. Leader处理心跳的响应

Leader处理ReadIndex请求会广播一次心跳,当收到响应后:

  1. 更新readOnly中的quorum
  2. 若quorum过半,则会清理readOnly中队列的消息,并:
    • 若队列中消息来自自己,则将其追加到readStates中,它会包装成Ready返回给应用层
    • 若队列中消息来自Follower,则返回MsgReadIndexResp响应(包含对应的日志提交索引)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func stepLeader(r *raft, m pb.Message) error {
 	// ...
    switch m.Type {
     	// ...
    case pb.MsgHeartbeatResp:
        // ...
        // 统计quorm,若没到要求则直接返回
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}
		// quorum通过,清理readOnly中队列的消息
		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
				r.send(resp)
			}
		}
        // ...
    }
}

func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
    // 若消息来自自己,则将消息追加到readStates中,它会包装成Ready返回给应用层
	if req.From == None || req.From == r.id {
		r.readStates = append(r.readStates, ReadState{
			Index:      readIndex,
			RequestCtx: req.Entries[0].Data,
		})
		return pb.Message{}
	}
    // 若消息来自Follower,则返回MsgReadIndexResp消息,并返回给Follower
	return pb.Message{
		Type:    pb.MsgReadIndexResp,
		To:      req.From,
		Index:   readIndex,
		Entries: req.Entries,
	}
}

3.3. Follower处理ReadIndex响应

这里很简单,和Leader一样,将对应的ReadIndex消息随同日志提交索引,缓存到readStates中,供上层的Ready轮询使用。

1
2
3
4
5
6
7
8
9
func stepFollower(r *raft, m pb.Message) error {
 	// ...
    case pb.MsgReadIndexResp:
		// ...
    	// 将消息追加到readStates中,它会包装成Ready返回给应用层
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	// ...
}

上层进行Ready轮询时,会获取上面缓存的readStates,从而知道了集群日志提交的索引。下一步只要等待日志应用索引超过日志提交索引,就可以对状态机进行读取,把结果返回给客户端了。

3.4. LeaseRead实现

这部分在ReadIndex上进行了简化,只需要在3.1.b)中,收到MsgReadIndex请求后直接向Follower返回MsgReadIndexResp响应,然后跳到3.3.,就可以实现。

4. 总结

至此,对照Raft论文,etcd-raft基本看的差不多了。

一个感想:Go语言的确非常简单,但是其CSP风格的确比较难适应,很tricky。