7

etcd-raft (6): Raft快照

 2 years ago
source link: https://keys961.github.io/2020/10/21/etcd-raft-6/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一篇讲了etcd-raft的集群配置变更。本文顺着论文讲快照。

2. 快照内容

etcd-raft的快照内容会这么选择:

  • 假如unstable中存在了快照,返回它保存的
  • 否则返回Storage中的快照

一般而言会取第二个。

而快照内容包含:

  • 快照元数据,包含
    • 集群配置信息
    • 最后一条日志的索引和任期

这部分和Raft论文一模一样。

3. 何时触发快照发送

etcd-raft的快照请求类型是MsgSnap,发送这个消息在maybeSendAppend方法中,即复制日志给Follower的时候。

复制之前,会根据Follower的Next索引(即Follower缺失的第一项日志)捞取所有需要同步的日志,若操作出错,则会触发快照发送。

操作出错的条件为:

  1. 获取prevLogTerm出错,出错可能性为:
    • 传入的索引pr.Next - 1过小,不保存在Raft日志中,数据已被压缩,返回ErrCompacted
    • 传入的索引pr.Next - 1过大,日志项不存在,返回ErrUnavailable
  2. 获取日志项entries(从pr.Next往后的)出错,出错的可能性也和1类似
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to] // Follower的日志进度追踪
    // ...
    m := pb.Message{}
	m.To = to
    // 根据Follower的Next捞取: 1. 日志entries; 2. prevLogTerm
    term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    
    if errt != nil || erre != nil {
        // 上面操作出错,则发送快照
        // ...
        m.Type = pb.MsgSnap
		snapshot, err := r.raftLog.snapshot()
        // ...
        // 消息内容
        m.Snapshot = snapshot // 快照数据data
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term // 快照的lastIncludedIndex和lastIncludedTerm
        // ...
        pr.BecomeSnapshot(sindex)
        // ...
    } else {
        // ...
    }
    r.send(m) // 发送消息,这里是快照
	return true
}

4. Leader发送快照

从3中可以看到Leader发送快照的内容,包含了:

  • 快照数据,包含:
    • 具体数据,即论文中的data
    • 快照包含的最后的索引,即论文中的lastIncludedIndex
    • 快照包含的最后的任期,即论文中的lastIncludedTerm
  • 任期,即论文的Term

这部分可见第2节,具体是SnapshotSnapshotMetadata

此外还会标记该Follower的复制状态为StateSnapshot

在etcd-raft中,并没有实现数据的分块传输(所以没有offset, done等字段),这部分可以由上层实现。

5. Follower处理快照

Follower收到了MsgSnap消息后,首先需要统一做Term的检查:

  • 若消息的Term更大,则降级为Follower,并且更新Term,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2
  • 若消息的Term更小,则忽略快照,直接返回,不响应任何消息(部分实现InstallSnapshot:规则1,因为没有返回自己的任期)
switch {
	// ...
	case m.Term > r.Term:
		// ...
		switch {
		// ...
		default:
			// ...
            // 若消息携带的Term大,降级为Follower并设置Leader为发送方
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From) 
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		// ...
		} else {
			// 若消息携带的Term小,直接忽略,没有任何响应
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		}
		return nil
	}
	// ...
}

下一步就会进入stepFollower中,它会:

  • 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
  • 然后尝试处理快照

处理快照部分在handleSnapshot中,核心在于restore方法:

  • 若快照包含的最新数据已被Follower提交,直接返回(InstallSnapshot: 规则5
  • 若快照包含的最新数据已被Follower保存(但没提交),直接提交到快照中的索引位置,然后直接返回(InstallSnapshot: 规则5 + 部分InstallSnapshot: 规则6
    • 这里只部分实现了InstallSnapshot: 规则6
      • 原文:需要保存快照,并保留之后的日志,删除之前的日志
      • etcd-raft:没有保留快照,仅做了提交,日志全部保留
  • 应用快照数据,它会清空所有日志,并更新提交索引(InstallSnapshot: 规则5 + InstallSnapshot: 规则7
  • 上层应用快照到状态机可通过轮询Ready实现(InstallSnapshot: 规则8

InstallSnapshot: 规则2~4是用于分段传输快照,etcd-raft内部没有实现,所以忽略

返回的消息直接复用了MsgAppResp,它包含自己已经提交的日志索引和自己的任期,供Leader更新Follower的日志进度。

func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

func (r *raft) restore(s pb.Snapshot) bool {
    // 1. 若快照包含的最新日志索引已被提交,直接返回
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	// ... 进一步校验,这里保证当前是Follower且自己必须包含在快照保存的集群配置中 ...
	// 2. 若快照包含的最新日志已经在Follower保存,则直接提交到快照对应的索引,直接返回
	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		// ...
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}
	// 3. 从快照恢复,它会删除所有的日志,并保存快照数据
	r.raftLog.restore(s)
	// 4. 从给定快照中恢复集群状态
	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: r.raftLog.lastIndex(),
	}, cs)
	// ...
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
	pr := r.prs.Progress[r.id]
	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
    // ...
	return true
}

6. Leader处理快照响应

Follower返回的就是MsgAppResp响应,这部分和复制日志时的处理一模一样,这里不再说明。

etcd-raft对于Raft快照的实现总体也是按照论文的,不过也有不同:

  • 对于InstallSnapshot: 规则1,选择了不响应,而非原文的有响应
  • 对于InstallSnapshot: 规则6:选择了保留旧日志并执行提交操作,而非原文的删除旧日志
  • 复用了MsgAppResp
  • 没有实现分段传输,它交给上层完成

下一篇文章说明论文最后的线性一致读的部分。


Related Issues not found

Please contact @keys961 to initialize the comment


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK