3

etcd-raft (4): Raft的复制和心跳

 2 years ago
source link: https://keys961.github.io/2020/10/13/etcd-raft-4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一篇讲了etcd-raft的选主协议,这次探究一下另一个重要协议:复制和心跳。

2. 日志复制

选主好了后,Leader就可以同步日志到Follower上。这里涉及到的消息类型有:MsgProp, MsgApp, MsgAppResp

2.1. Leader追加与广播日志

Leader上层可通过Node接口的Propose方法追加日志。追加的消息类型是MsgProp,这部分实现在RawNode中,它调用了raftStep函数:

func (rn *RawNode) Propose(data []byte) error {
    // 追加的是MsgProp消息
    return rn.raft.Step(pb.Message{
        Type: pb.MsgProp, 
        From: rn.raft.id,
        // 追加的日志项
        Entries: []pb.Entry{
            {Data: data},
        }})
}

进入raftStep函数时,会检查消息的Term,对于Leader而言,这些检查大多数都会通过:

  • 若消息的Term更大,则自己降级为Follower,但这种情况不可能出现,因为是自己追加的消息
  • 若消息的Term更小,消息直接被忽略,因为它更旧

检查的代码前文讲选主的时候贴了,这里就不贴了。

通过检查后,会调用raftstep函数,对于Leader而言,会调用stepLeader方法。对于MsgProp,它会:

  • 对日志数据进行检查
  • 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
    • MsgProp类型可以是配置变更(上层调用NodeProposeConfChange),也可以是日志数据(上层调用NodePropose
    • 本文只讨论后者
  • 广播日志变更
func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    // ...
    case pb.MsgProp:
        // 1. 日志项检查
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            return ErrProposalDropped
        }
        if r.leadTransferee != None {
            // ...
            return ErrProposalDropped
        }
        // 2. 判断是否有配置变更的日志,并处理相关逻辑,这部分不在本文讨论之中
        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                // ...
            }
        }
        // 3. 追加日志
        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        // 4. 广播日志
        r.bcastAppend()
        return nil // 返回
    // ...
    }
    // ...
}

追加日志很简单,主要就做3步,代码也就不贴了:

  • 将日志项追加到raftLog中(内部追加到unstable中)
  • 更新Leader的日志索引进度
    • 内部为Progress数据结构,包含2个重要字段MatchNext,这些在论文中有指明
  • 尝试提交日志(只更新committedIndex,不会持久化日志,仅当单节点时生效)

而广播日志稍微复杂一些,它将所有追加日志的消息广播给其他所有节点,主要做下面几步,代码也不贴了:

  • raftLog中获取Term和日志项
    • etcd-raft追踪了对方节点的日志索引进度,因此通过这个进度截取需要发送的日志项
    • 正常情况下,不会发生错误,若发生错误则会发送快照数据,关于这部分不在本文说明
  • 组装MsgApp消息,包含这些数据:
    • Index:发送的日志项的前一项索引(即Raft论文中prevLogIndex
    • LogTerm:发送的日志项的前一项任期(即Raft论文中prevLogTerm
    • Entries:日志项
    • Commit:Leader目前的提交索引号
    • Term:Leader当前的任期(这在raft.send方法中组装)
  • 更新Follower的追踪数据,包括更新日志提交索引,并标记日志正在传输
  • 将组装好的消息追加到raftmsg数组
    • 上层轮询调用Ready后,raftmsg数组就会被传到返回的ReadyMessages字段,这样上层可将这些消息通过网络传输给对应的节点(这也说明了etcd-raft不依赖网络,需要上层实现)

2.2. Follower接收日志

Follower收到了MsgApp消息后,首先需要统一做Term的检查:

  • 若消息的Term更大,则降级为Follower,并且更新Term,设置消息发送方为Leader,然后执行下一步(Rules of Server (All): 规则2
  • 若消息的Term更小,则直接返回空的MsgAppResp消息(部分实现AppendEntries: 规则1,因为消息没指明Rejecttrue
    • 直接返回的原因涉及网络分区导致的Follower触发选主,它可以避免Follower重新回到集群后扰乱集群,详细原因可参考注释
switch {
    // ...
    case m.Term > r.Term:
        // 若消息携带的Term大,则降级为Follower
        // ...
        switch {
        // ...
        default:
            // ...
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From) // 降级Follower,并设置发送方为Leader
            } else {
                // ...
            }
        }
    case m.Term < r.Term:
        // 若消息携带的Term小,则返回一个空的MsgAppResp,对方可能会降级为Follower
        if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
            // 这里对心跳和日志追加的请求,只响应一个空,且没有设置Reject(即接受)
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
        }  
        // ...
        return nil
    }
    // ...
}

下一步就会进入stepFollower中,它会:

  • 将选主计时器归零,并设置发送方为Leader,即将其视作心跳
  • 然后尝试追加日志

追加日志部分在handleAppendEntries中,这部分逻辑如下:

  • prevLogIndex小于自己的committedIndex,说明日志项已经存在,不需要任何操作,返回成功
  • 尝试追加日志:
    • 首先判断消息中prevLogItem对应的日志项term是否和prevLogTerm匹配,若不匹配则返回失败(AppendEntries: 规则2
    • 若匹配,则:
      • 寻找消息和本地的日志冲突,并删除本地冲突日志项及其后面的所有日志项(AppendEntries: 规则3
      • 追加日志(AppendEntries: 规则4
    • 然后本地提交,设置本地committedIndex为Leader提交索引与本地最后一项日志索引的最小值(AppendEntries: 规则5

这里需要注意,一切的MsgAppResp也携带了Term字段,和MsgApp一样。

func (r *raft) handleAppendEntries(m pb.Message) {
    // 1. 若prevLogIndex小于自己的committedIndex
    // 说明日志项已经存在,不需要任何操作,返回成功,并返回自己committedIndex
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }
    // 2. 尝试追加日志
    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        // 成功,则返回成功,并返回自己最新的committedIndex(这里会执行一次提交)
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        // 失败,则返回拒绝,并返回自己最后一项日志的索引
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    // 2.1. 首先根据消息的prevLogIndex,寻找自己存储的对应日志项,读取该日志项的term,判断是否与prevLogTerm匹配
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        // 2.2. 若匹配,则先寻找冲突项,正常情况下,冲突项的索引会大于自己的committed
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
        case ci <= l.committed:
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            // 2.3. 然后追加剩余的日志,这里会把原日志的冲突项之后的一并删除,并用新的日志项追加和替代
            offset := index + 1
            l.append(ents[ci-offset:]...)
        }
        // 2.4. 提交,更新自己的committedIndex
        l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false // 不匹配则返回错误
}

2.3. Leader处理Follower响应

这里再次回到stepLeader中,不过首先要说明一下StateType

  • StateProbe:表示Follower的日志索引未知,需要Leader探测出来
  • StateReplicate:正常情况下的Follower状态
  • StateSnapshot:表示Follower不能从Leader的日志中同步,需要使用快照进行恢复

之后看下Leader对于MsgAppResp响应的处理,当然它也有Term的校验:

  • Term大,则降级为Follower,并设置Leader为未知(Rules of Server (All): 规则2
  • Term

这里贴部分代码,主要逻辑为:

  • 若响应为拒绝:
    • 将Follower的日志索引进度Next回退到RejectHint,即Follower最后一项日志的索引(Rules of Server (Leader): 规则3.2
    • 设置状态为StateProbe
    • 重新再发送日志追加请求MsgApp,追加缺失的日志并探测Follower的日志索引进度
  • 若响应为同意:
    • 更新Follower的日志索引进度(更新Match为Follower最后一项日志的索引,Next为下一项待复制的索引)(Rules of Server (Leader): 规则3.1
    • 更新Follower的状态
    • 尝试更新Leader的committedIndex(尝试提交,但不持久化,持久化留给上层调用Advance后做),这里会检查是否过半数Rules of Server (Leader): 规则4
      • 若提交成功:则将该变更传播给其它节点
func stepLeader(r *raft, m pb.Message) error {
    // ...
    pr := r.prs.Progress[m.From] // 获取Follower的日志追踪状态
    if pr == nil {
        r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return nil
    }
    switch m.Type {
    case pb.MsgAppResp:
        pr.RecentActive = true
        if m.Reject {
            // 若返回拒绝
            // 将Follower的日志索引进度Next回退到RejectHint,即Follower最后一项日志的索引
            if pr.MaybeDecrTo(m.Index, m.RejectHint) {
                r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                if pr.State == tracker.StateReplicate {
                    // 设置状态为StateProbe
                    pr.BecomeProbe()
                }
                // 重新再发送日志追加请求MsgApp,以探测Follower的日志索引进度
                r.sendAppend(m.From)
            }
        } else {
            // 若返回成功
            oldPaused := pr.IsPaused()
            // 则更新Follower的日志索引进度
            // a. 更新Match为Follower最后一项日志的索引
            // b. 更新Next为下一项待复制的索引
            if pr.MaybeUpdate(m.Index) {
                // 更新Follower的状态
                switch {
                case pr.State == tracker.StateProbe:
                    pr.BecomeReplicate() // 从StateProbe恢复到StateReplicate
                case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
                    pr.BecomeProbe()
                    pr.BecomeReplicate() // 从StateSnapshot恢复到StateReplicate
                case pr.State == tracker.StateReplicate:
                    pr.Inflights.FreeLE(m.Index) // 取消正在传输的标记
                }
                // 尝试更新Leader的committedIndex,这里会检查是否半数通过
                if r.maybeCommit() {
                    // 若超过半数,则将Leader提交的信息广播给其他节点
                    r.bcastAppend()
                } else if oldPaused {
                    // 若没超过半数,且Follower出现过暂停的状况,则重新再发送一次MsgApp,使其追回日志
                    r.sendAppend(m.From)
                }
                // ...
            }
        }
        // ...
    }
    // ...
}

选主后,Leader需要周期性对Follower进行心跳,让Follower感知到Leader存在,Leader也需要通过心跳响应检测Follower的状态。

在Raft论文中,上面的日志复制请求就可以充当心跳,不过在etcd-raft中,心跳还是另外抽出实现。

这部分涉及到的消息类型有:MsgBeat, MsgHeartbeat, MsgHeartbeatResp

3.1. Leader发起心跳

由于Leader会周期性发起心跳,因此很容易猜到心跳通过调用NodeTick方法触发。

的确,Leader调用Tick后,会调用tickHeartBeat方法,它会:

  • 增加raftheartbeatElapsed计数

  • heartbeatElapsed超过heartbeatTimeout,则重置heartbeatElapsed并给自己发送MsgBeat消息

    func (r *raft) tickHeartbeat() {
        r.heartbeatElapsed++ // 增加heartbeatElapsed
        // ...
        if r.heartbeatElapsed >= r.heartbeatTimeout {
            // 超过heartbeatTimeout时,给自己发送MsgBeat消息
            r.heartbeatElapsed = 0
            r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
        }
    }
    

Leader收到MsgBeat消息后,Leader就开始广播心跳:

func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    case pb.MsgBeat:
        r.bcastHeartbeat() // 广播心跳
        return nil
    // ...
    }
    // ...
}

而广播心跳的消息类型为MsgHeartbeat,包含了:

  • 两者的最小值:

    • Leader已提交的日志索引
    • 对应节点已提交日志的索引

    为什么要这个字段并取这个值,下面3.2.会说

  • 上下文信息

  • Leader的任期(raft.send调用时会添加)

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }
    r.send(m) // 这一步会添加任期Term字段
}

3.2. Follower处理心跳

而Follower收到了MsgHeartbeat后,和MsgApp一样:

  • 必要的检查消息的Term,这部分见2.2.节
  • 重置electionElapsed计数器并设置Leader
func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
        // ...
    case pb.MsgHeartbeat:
        r.electionElapsed = 0 // 重置electionElapsed计数器
        r.lead = m.From // 设置Leader
        r.handleHeartbeat(m)// 具体处理心跳请求
        // ...
    }
    return nil
}

而在具体处理心跳请求中:

  • Follower会将消息中的Commit字段提取出来,并根据这个索引将本地日志提交(更新committedIndex
    • 这个索引必须是Leader已经提交的,并且也必须是Follower拥有的,这也是3.1.中的问题答案
  • 返回Leader MsgHeartbeatResp响应
func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit) // 提交日志(更新本地的committedIndex)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) // 返回心跳响应
}

3.3. Leader处理Follower响应

Leader收到响应后,除了必要的检查消息的Term之外,主要还是追踪Follower的状态(标记为存活),并且若Follower需要追赶日志则发送日志复制的请求过去。另外其他的关于readOnly的处理,这里不进行说明。

func stepLeader(r *raft, m pb.Message) error {
    // ...
    pr := r.prs.Progress[m.From] // 获取Follower的Progress
    if pr == nil {
        r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return nil
    }
    switch m.Type {
        // ...
    case pb.MsgHeartbeatResp:
        // 更新Follower的状态,标记为存活
        pr.RecentActive = true
        pr.ProbeSent = false

        // 腾开窗口以供Leader能够向Follower继续发送请求
        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
            pr.Inflights.FreeFirstOne()
        }
        // 若Follower需要追赶日志,则这里再发送一次日志复制请求
        if pr.Match < r.raftLog.lastIndex() {
            r.sendAppend(m.From)
        }
        // 处理readOnly,这里忽略
        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
            return nil
        }
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
                r.send(resp)
            }
        }
    // ...
    }
    return nil
}

上面根据源码标识出了Raft原文中的规则,总体而言还是符合Raft论文中的规则,不过略有修改。

一个比较大的改动就是没有完整实现Raft论文中的AppendEntries: 规则1(即没有指明Rejecttrue),它也没有在心跳中实现。

不过,etcd-raft还有一个check-quorum机制:

  • 通过下面的心跳,Leader可以追踪Follower是否存活
  • 每隔electionTimeout,Leader会定期执行check-quorum,给自己发送MsgCheckQurum消息,检查是否过半数的Follower存活,若没有,自己就会降级为Follower,从而触发下一轮选主

此外还有一些优化和其他方面的集成(如快照、配置变更、节点暂停、只读等),这些不在本文讨论范围内。

下文继续按照论文的顺序,说明集群变化在etcd-raft中的实现。


Related Issues not found

Please contact @keys961 to initialize the comment


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK