etcd-raft (6): Raft快照

Posted by keys961 on October 21, 2020

1. 概要

上一篇讲了etcd-raft的集群配置变更。本文顺着论文讲快照。

2. 快照内容

etcd-raft的快照内容会这么选择:

  • 假如unstable中存在了快照,返回它保存的
  • 否则返回Storage中的快照

一般而言会取第二个。

而快照内容包含:

  • 快照数据
  • 快照元数据,包含
    • 集群配置信息
    • 最后一条日志的索引和任期

这部分和Raft论文一模一样。

3. 何时触发快照发送

etcd-raft的快照请求类型是MsgSnap,发送这个消息在maybeSendAppend方法中,即复制日志给Follower的时候。

复制之前,会根据Follower的Next索引(即Follower缺失的第一项日志)捞取所有需要同步的日志,若操作出错,则会触发快照发送。

操作出错的条件为:

  1. 获取prevLogTerm出错,出错可能性为:
    • 传入的索引pr.Next - 1过小,不保存在Raft日志中,数据已被压缩,返回ErrCompacted
    • 传入的索引pr.Next - 1过大,日志项不存在,返回ErrUnavailable
  2. 获取日志项entries(从pr.Next往后的)出错,出错的可能性也和1类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to] // Follower的日志进度追踪
    // ...
    m := pb.Message{}
	m.To = to
    // 根据Follower的Next捞取: 1. 日志entries; 2. prevLogTerm
    term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    
    if errt != nil || erre != nil {
        // 上面操作出错,则发送快照
        // ...
        m.Type = pb.MsgSnap
		snapshot, err := r.raftLog.snapshot()
        // ...
        // 消息内容
        m.Snapshot = snapshot // 快照数据data
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term // 快照的lastIncludedIndex和lastIncludedTerm
        // ...
        pr.BecomeSnapshot(sindex)
        // ...
    } else {
        // ...
    }
    r.send(m) // 发送消息,这里是快照
	return true
}

4. Leader发送快照

从3中可以看到Leader发送快照的内容,包含了:

  • 快照数据,包含:
    • 具体数据,即论文中的data
    • 快照包含的最后的索引,即论文中的lastIncludedIndex
    • 快照包含的最后的任期,即论文中的lastIncludedTerm
    • 配置信息
  • 任期,即论文的Term

这部分可见第2节,具体是SnapshotSnapshotMetadata

此外还会标记该Follower的复制状态为StateSnapshot

在etcd-raft中,并没有实现数据的分块传输(所以没有offset, done等字段),这部分可以由上层实现。

5. Follower处理快照

Follower收到了MsgSnap消息后,首先需要统一做Term的检查:

  • 若消息的Term更大,则降级为Follower,并且更新Term,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2
  • 若消息的Term更小,则忽略快照,直接返回,不响应任何消息(部分实现InstallSnapshot:规则1,因为没有返回自己的任期)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
switch {
	// ...
	case m.Term > r.Term:
		// ...
		switch {
		// ...
		default:
			// ...
            // 若消息携带的Term大,降级为Follower并设置Leader为发送方
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From) 
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		// ...
		} else {
			// 若消息携带的Term小,直接忽略,没有任何响应
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		}
		return nil
	}
	// ...
}

下一步就会进入stepFollower中,它会:

  • 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
  • 然后尝试处理快照

处理快照部分在handleSnapshot中,核心在于restore方法:

  • 若快照包含的最新数据已被Follower提交,直接返回(InstallSnapshot: 规则5
  • 若快照包含的最新数据已被Follower保存(但没提交),直接提交到快照中的索引位置,然后直接返回(InstallSnapshot: 规则5 + 部分InstallSnapshot: 规则6
    • 这里只部分实现了InstallSnapshot: 规则6
      • 原文:需要保存快照,并保留之后的日志,删除之前的日志
      • etcd-raft:没有保留快照,仅做了提交,日志全部保留
  • 应用快照数据,它会清空所有日志,并更新提交索引(InstallSnapshot: 规则5 + InstallSnapshot: 规则7
  • 上层应用快照到状态机可通过轮询Ready实现(InstallSnapshot: 规则8

InstallSnapshot: 规则2~4是用于分段传输快照,etcd-raft内部没有实现,所以忽略

返回的消息直接复用了MsgAppResp,它包含自己已经提交的日志索引和自己的任期,供Leader更新Follower的日志进度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

func (r *raft) restore(s pb.Snapshot) bool {
    // 1. 若快照包含的最新日志索引已被提交,直接返回
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	// ... 进一步校验,这里保证当前是Follower且自己必须包含在快照保存的集群配置中 ...
	// 2. 若快照包含的最新日志已经在Follower保存,则直接提交到快照对应的索引,直接返回
	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		// ...
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}
	// 3. 从快照恢复,它会删除所有的日志,并保存快照数据
	r.raftLog.restore(s)
	// 4. 从给定快照中恢复集群状态
	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: r.raftLog.lastIndex(),
	}, cs)
	// ...
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
	pr := r.prs.Progress[r.id]
	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
    // ...
	return true
}

6. Leader处理快照响应

Follower返回的就是MsgAppResp响应,这部分和复制日志时的处理一模一样,这里不再说明。

7. 总结

etcd-raft对于Raft快照的实现总体也是按照论文的,不过也有不同:

  • 对于InstallSnapshot: 规则1,选择了不响应,而非原文的有响应
  • 对于InstallSnapshot: 规则6:选择了保留旧日志并执行提交操作,而非原文的删除旧日志
  • 复用了MsgAppResp
  • 没有实现分段传输,它交给上层完成

下一篇文章说明论文最后的线性一致读的部分。