3

etcd-raft (3): Raft的启动与选主

 2 years ago
source link: https://keys961.github.io/2020/10/11/etcd-raft-3/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一篇讲了etcd-raft的日志和消息数据结构及其实现,本文就深入它对于Raft协议的实现,这一次挑选的是选主

2. 节点启动

在看选主前,首先看下etcd-raft是如何启动的。

2.1. 数据结构

这里涉及到的数据结构就是node,它是Node接口的实现。其内部声明了一堆通道,然后加上一个RawNode(而RawNode就包了一层raft结构,它记录了Raft的状态):

type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChangeV2
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	rn *RawNode // 包装了raft,代表一个简单的Raft节点实现
}

type RawNode struct {
	raft       *raft // 记录了raft的状态
	prevSoftSt *SoftState
	prevHardSt pb.HardState
}

而具体raft状态包含很多内容,如下所示,这里用注释列举关键的字段:

type raft struct {
	id uint64 // 节点ID

	Term uint64 // 当前term/epoch
	Vote uint64 // 该节点投票给Leader的节点ID

	readStates []ReadState

	raftLog *raftLog // Raft日志,记录了committed index和applied index

	maxMsgSize         uint64
	maxUncommittedSize uint64

	prs tracker.ProgressTracker // 记录了当前活跃的集群配置,包括已知的节点,并且追踪每个节点的日志索引

	state StateType // 当前节点的角色

	isLearner bool // 该节点是否是Learner

	msgs []pb.Message

	lead uint64 // Leader节点的ID

	leadTransferee uint64
    
	pendingConfIndex uint64

	uncommittedSize uint64

	readOnly *readOnly

    electionElapsed int // 选主超时后/收到当前Leader的有效请求后经过的时间(tick)
    heartbeatElapsed int // 心跳超时后经过的时间(tick),只有Leader使用

	checkQuorum bool
	preVote     bool

    heartbeatTimeout int // 心跳超时时间(tick)
    electionTimeout  int // 选主超时时间(tick)
	randomizedElectionTimeout int // 随机的选主超时时间,当没收到Leader一段时间的有效请求后,Follower会变成Candidate
    							  // 范围:[electiontimeout, 2 * electiontimeout - 1]
	disableProposalForwarding bool 

	tick func()
	step stepFunc

	logger Logger
}

2.2. 启动

a) raft初始化

Raft节点启动首先需要初始化上述数据结构的字段,这里主要还是看 raft的初始化,它主要做:

  • 根据配置初始化字段
  • 恢复之前的集群状态
  • 将自己变成Follower
func newRaft(c *Config) *raft {
	if err := c.validate(); err != nil {
		panic(err.Error())
	}
	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) // 初始化日志
	hs, cs, err := c.Storage.InitialState() // 初始化存储
	if err != nil {
		panic(err) // TODO(bdarnell)
	}

	if len(c.peers) > 0 || len(c.learners) > 0 {
		if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
			panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
		}
		cs.Voters = c.peers
		cs.Learners = c.learners
	}

	r := &raft{
		id:                        c.ID, // 节点ID
		lead:                      None, // 初始Leader为空
		isLearner:                 false, 
		raftLog:                   raftlog, // Raft日志
		maxMsgSize:                c.MaxSizePerMsg,
		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs), // 集群配置
        electionTimeout:           c.ElectionTick, // 选主的超时时间(实际上会基于它生成一个随机数)
		heartbeatTimeout:          c.HeartbeatTick, // 心跳的超时时间
		logger:                    c.Logger,
		checkQuorum:               c.CheckQuorum,
		preVote:                   c.PreVote,
		readOnly:                  newReadOnly(c.ReadOnlyOption),
		disableProposalForwarding: c.DisableProposalForwarding,
	}
	// 恢复集群状态
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: raftlog.lastIndex(),
	}, cs)
	if err != nil {
		panic(err)
	}
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
	if !IsEmptyHardState(hs) {
		r.loadState(hs)
	}
	if c.Applied > 0 {
		raftlog.appliedTo(c.Applied)
	}
    // 初始角色为Follower, Leader设为None
	r.becomeFollower(r.Term, None)

	var nodesStrs []string
	for _, n := range r.prs.VoterNodes() {
		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
	}

	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
	return r
}

b) 启动Raft节点的事件循环

初始化完成后,会启动Raft事件循环。这部分定义在node.run()方法中,主要处理:

  • 获取新产生的Ready,并塞入readyc通道供上层轮询
  • 探测Leader变化
  • 监听通道,处理消息(主要通过Node接口调用):
    • propc:处理Propose的请求消息
    • confc:处理ProposeConfChange的配置变更请求消息
    • recvc:处理响应消息
    • tickc, advancec:处理上层的TickAdvance的信号
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
        // 1. 轮询时,首先将新产生的Ready取出来
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}
		// 2. 探测Leader变化并打日志
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}
		// 3. 监听通道以进行处理
		select {
		case pm := <-propc: // 外部对Node调用Propose时,这里会收到,这里会处理请求消息(如追加日志、投票等)
			m := pm.m
			m.From = r.id
			err := r.Step(m) // 调用Step处理消息
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		case m := <-n.recvc: // 这里会收到并处理响应消息
			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case cc := <-n.confc: // 外部对Node调用ProposeConfChange时,这里会收到,处理集群配置变更
			_, okBefore := r.prs.Progress[r.id]
			cs := r.applyConfChange(cc)
			if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
				var found bool
			outer:
				for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
					for _, id := range sl {
						if id == r.id {
							found = true
							break outer
						}
					}
				}
				if !found {
					propc = nil
				}
			}
			select {
			case n.confstatec <- cs:
			case <-n.done:
			}
		case <-n.tickc: // 外部对Node调用Tick时,这里会收到,并自增tick计数器
			n.rn.Tick()
		case readyc <- rd: // 将新取出来的Ready塞入ready通道中,供上层对Node调用Ready使用
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec: // 外部对Node调用Advance时,这里会收到,并对raft调用Advance表示处理外之前弹出的Ready
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status: // 处理状态变更
			c <- getStatus(r)
		case <-n.stop: // 收到停止信号,关闭Raft节点
			close(n.done)
			return
		}
	}
}

3.1. 触发选主

初始状态下,所有的节点都是Follower。这里会调用becomeFollower方法,主要注意的是:

  • reset方法会重置Raft状态,并且随机生成一个选主超时时间
  • 注意tickElection方法,调用Tick时会调用该方法
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower // Follower被调用Step/Propose时可能调用的函数
	r.reset(term) // 重置状态
	r.tick = r.tickElection // Follower被调用Tick时需要调用的函数
	r.lead = lead
	r.state = StateFollower // 设置角色为Follower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) reset(term uint64) {
	// ...
	r.electionElapsed = 0
	r.heartbeatElapsed = 0
	r.resetRandomizedElectionTimeout() // 随机生成一个选主超时时间
	// ...
}

触发选主必定在调用NodeTick时触发,最终会调用tickElection,它可能会触发选主:

func (r *raft) tickElection() {
	r.electionElapsed++ // 首先给electionElapsed计数+1
	// 若节点可以成为Leader,且超时,则会给自己发送MsgHup消息,表示触发选主
	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

func (r *raft) promotable() bool {
	pr := r.prs.Progress[r.id]
    // 是否可以成为Leader,必须满足:
    // 1. 节点自己必须在集群配置中
    // 2. 节点自己不是Learner
    // 3. 节点不能有没持久化的快照
	return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}

从上面可知,选主一旦触发,就会给自己发送MsgHup消息,最后会在下面的代码块处理:

func (r *raft) Step(m pb.Message) error {
	// ...
    switch m.Type {
	case pb.MsgHup:
		if r.preVote {
			r.hup(campaignPreElection)
		} else {
			r.hup(campaignElection)
		}
        // ...
    }
    // ...
}

我们看到这里就会触发选主。不过这里有一个PreVote的分支,这在Raft论文中没有提及,这在下文会说明。

3.2. PreVote

原始选主过程中,状态变化为:Follower -> Candidate -> Leader;

而etcd-raft中加入了PreVote,状态变化为:Follower -> PreCandidate -> Candidate -> Leader。

PreVote中,要成为Candidate,还需要确认自己有可能获得集群中大部分节点的投票,这样才能将自己的Term加1,发起真正的选主。

这需要其他节点同意发起选主,同意的条件必须满足下面2个:

  1. 没有收到有效Leader的心跳,至少有一次选主超时
  2. PreCandidate的日志足够新(Term要更大,或LogIndex更大且Term相同)

这样有一定好处。

考虑一个场景:有A、B、C、D、E共5个节点;某个时间段A、B和其他节点分区,C、D、E形成新集群,而A、B也触发选主,但没有半数以上同意,所以必须不停选主,且Term不停增加;后来分区解决了,A、B会重新加入集群,它们的数据很旧,但Term很大,所以会再触发一次选主,扰乱集群。

有了PreVote后就不会有上面的问题。

b) 实现:发起PreVote

首先校验是否能发起PreVote,需要满足:

  1. 节点自己必须在集群配置中
  2. 节点自己不是Learner,也不是Leader
  3. 节点不能有没持久化的快照
  4. 节点不能还有配置变更没应用

发起PreVote

然后向所有节点发起PreVote:

  • 节点首先会成为PreCandidate
  • 然后投票给自己
  • 最后向所有节点发送PreVote请求,即MsgPreVote消息
func (r *raft) campaign(t CampaignType) {
	// ...
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
        // 1. 首先成为PreCandidate
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		term = r.Term + 1 // 发送的消息是r.Term+1
	} else {
		// ...
	}
    // 2. 投票给自己,并统计投票次数
    // 若返回票数过半,则开始正式选举,不需要发送MsgPreVote,这只会在单节电情况下发生
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
    // 3. 向所有节电发送MsgPreVote消息,发起PreVote
	var ids []uint64
	{
		idMap := r.prs.Voters.IDs()
		ids = make([]uint64, 0, len(idMap))
		for id := range idMap {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	for _, id := range ids {
		if id == r.id {
			continue // 忽略自己
		}
		// ...
		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
        // 发送MsgPreVote消息
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

c) 实现:处理PreVote请求

处理PreVote请求流程如下:

  • MsgPreVote消息的Term小,则返回拒绝的响应:

    func (r *raft) Step(m pb.Message) error {
    	// ...
        switch {
        	// ...
            case m.Term < r.Term:
            // ...
            else if m.Type == pb.MsgPreVote {
    			// ...
                // 返回拒绝响应MsgPreVoteResp
    			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
    		}
            // ...
            return nil
        }
        // ...
    }
    
  • 然后,检查节点是否还认为该任期内Leader还在(即在租约内),若还在,则忽略请求,直接返回。(注意,这里不会强制降级为Follower,其他类型的消息会):

    func (r *raft) Step(m pb.Message) error {
    	switch {
    	case m.Term > r.Term:
    		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
    			force := bytes.Equal(m.Context, []byte(campaignTransfer))
                // 判断是否在租约内,Leader还存在
    			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
                // 若还存在则直接忽略请求,直接返回,不输出任何响应,即不投票
    			if !force && inLease {
    				// ...
    				return nil
    			}
    		}
            // ...
        }
        // ...
    }
    
  • 当满足下面条件时,返回同意响应,否则返回拒绝响应

    • 节点已经投给了这个PreCandidate,或节点还没投票且节点认为Leader不在了,或消息的Term更大
    • 消息携带的日志TermIndex更加新
    func (r *raft) Step(m pb.Message) error {
        // ...
        switch m.Type {
    	// ...
    	case pb.MsgVote, pb.MsgPreVote:
    		canVote := r.Vote == m.From || // 已经投给了该Candidate(对应MsgVote)
    			(r.Vote == None && r.lead == None) || // 还没投票且Leader不在了(对应MsgPreVote)
    			(m.Type == pb.MsgPreVote && m.Term > r.Term) // 下一任期的MsgPreVote
    		// 请求中携带的信息显示日志更加新
    		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                // 返回同意响应
            	r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
    			if m.Type == pb.MsgVote {
                    // ...
    				// 若对应的时MsgVote,即正式的投票,则记录r.Vote记录自己投票给谁,其遵循先来后到原则
    				r.electionElapsed = 0
    				r.Vote = m.From
                }
            } else {
                // ...
                // 否则发送拒绝响应
                r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
            }
        // ...
        }
    }
    

d) 实现:处理PreVote响应

此时发起的节点已经成为PreCandidate。收到响应时,它也会进入c)中的Step方法。

具体流程如下:

  • 和c)中第一步类似,若响应的Term小,则直接返回忽略

  • 和c)中第二步类似,若响应的Term大,则说明本节点又发了下一Term的PreVote

    • 若返回同意:直接忽略即可,后面直接会推进状态变化
    • 若返回拒绝:则需要降级自己为Follower
    func (r *raft) Step(m pb.Message) error {
    	switch {
    	// ...
    	case m.Term > r.Term:
    		// ...
    		switch {
    		// ...
    		case m.Type == pb.MsgPreVoteResp && !m.Reject:
                // 若同意,则直接忽略,继续执行
    		default:
                // 否则就必须降级为Follower
    			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
    				r.becomeFollower(m.Term, m.From)
    			} else {
    				r.becomeFollower(m.Term, None) // 执行这一步,r.Vote设为空
    			}
    		}
        }
        switch m.Type {
        	// ...
        default:
    		err := r.step(r, m) // 最后处理PreCandidate/Candidate的状态变化
    		if err != nil {
    			return err
    		}
        }
        return nil
    }
    
  • 最后,处理PreCandidate的状态变化,这里对应了stepCandidate方法,代表作为Candidate/PreCandidate时处理消息的逻辑:

    • 修改和统计投票数,判断是否过半
    • 若过半则推进状态演变,发起正式选主
    func stepCandidate(r *raft, m pb.Message) error {
    	var myVoteRespType pb.MessageType
    	if r.state == StatePreCandidate {
    		myVoteRespType = pb.MsgPreVoteResp
    	} else {
    		myVoteRespType = pb.MsgVoteResp
    	}
    	switch m.Type {
    	case pb.MsgProp:
    		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
    		return ErrProposalDropped
    	case pb.MsgApp:
            // 若收到了原Leader的日志追加,则降级为Follower,更新Term、Leader等信息
    		r.becomeFollower(m.Term, m.From)
    		r.handleAppendEntries(m)
    	case pb.MsgHeartbeat:
            // 若收到了原Leader的心跳,则降级为Follower,更新Term、Leader等信息
    		r.becomeFollower(m.Term, m.From) 
    		r.handleHeartbeat(m)
    	case pb.MsgSnap:
            // 若收到原Leader的快照,则降级为Follower,更新Term、Leader等信息
    		r.becomeFollower(m.Term, m.From) 
    		r.handleSnapshot(m)
    	case myVoteRespType:
            // 处理PreVote/Vote响应
            // 修改投票统计
    		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
    		switch res {
                // 若过半
    		case quorum.VoteWon:
    			if r.state == StatePreCandidate {
                    // 若为PreCandidate,则发起正式的选主
    				r.campaign(campaignElection)
    			} else {
                    // 若已经是Candidate,就直接成为Leader即可
    				r.becomeLeader() // 成为Leader
    				r.bcastAppend() // 广播本节点的Raft日志
    			}
                // 若没有过半,则降级为Follower
    		case quorum.VoteLost:
    			r.becomeFollower(r.Term, None)
    		}
    		// ...
    	}
    	return nil
    }
    

e) 实现:处理超时

若PreVote阶段超时,则会触发下一轮的PreVote。这部分在tickElection方法中实现(和3.1.节一样)。

注意PreVote阶段不会修改Raft的Term,只是在PreVote请求消息中将Term + 1(其它消息的Term依旧是Raft自己的Term,见3.2.b),因此下一轮PreVote的Term和之前PreVote的Term一样。

3.3. 正式选主

PreVote通过后,就可开始正式的选主。

a) 发起Vote请求

这里和PreVote的入口一样,只不过处理的步骤不同:

  • 节点成为Candidate,并且自增Term

    func (r *raft) becomeCandidate() {
    	r.step = stepCandidate
    	r.reset(r.Term + 1) // 自增Term
    	r.tick = r.tickElection
    	r.Vote = r.id // 默认投票给自己
    	r.state = StateCandidate
    	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
    }
    
  • 然后投票给自己

  • 最后向所有节点发送Vote请求,即MsgVote消息

func (r *raft) campaign(t CampaignType) {
	// ...
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		// ...
	} else {
        // 1. 成为Candidate,并自增Term
		r.becomeCandidate()
		voteMsg = pb.MsgVote 
		term = r.Term // 这里的term是自增后的
	}
    // 2. 投票给自己
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
    // 3. 向其他节点发送MsgVote请求
	var ids []uint64
	{
		idMap := r.prs.Voters.IDs()
		ids = make([]uint64, 0, len(idMap))
		for id := range idMap {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	for _, id := range ids {
		if id == r.id {
			continue
		}
        // ...
		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

b) 处理Vote请求

这里和PreVote类似,不过这里也再叙述一次:

  • MsgVote消息的Term小,则返回拒绝的响应,这部分和MsgPreVote一样
  • 然后,检查节点是否还认为该任期内Leader还在(即在租约内),若还在,则忽略请求,直接返回,这部分也和MsgPreVote一样
  • 当满足下面条件时,返回同意响应,否则返回拒绝响应,这部分和MsgPreVote类似
    • 节点没有认为Leader存在了,且还没有投票(
    • 消息携带的日志更加的新
  • 若投出同意票,则更新r.Vote,并重置r.electionElapsed(实际上等同于一次心跳了),之后的MsgVote不会投给其他节点了

c) 处理Vote响应

这部分已经在3.2.d中说明,这里不再说明。(就是统计票数,判断过半,升级Leader,广播日志)

d) 处理超时

这部分和PreVote一样,会重新发起选主,但会回退到PreVote阶段。

可从上面得出,etcd-raft的选主还是按照Raft论文一模一样来的,而关于算法的总结,可参考下图,也可参考这里,包含原理总结和demo源码:

pic

而加了一个PreVote,个人认为只是Vote的预演,判断PreCandidate是否可能成为Leader,但是不会修改类似Term, Vote等信息,不过它解决了分区时出现的Term过大扰乱集群的问题,不得不说是真的强。

整体而言PreVote + Vote的风格很像2PC。


Related Issues not found

Please contact @keys961 to initialize the comment


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK