4

etcd-raft (7): Raft线性一致读

 2 years ago
source link: https://keys961.github.io/2020/11/06/etcd-raft-7/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

这里最后说明一下论文的最后部分“线性一致读”在etcd的实现。

2. 线性一致

CAP中的C即线性一致,它指的是:系统写操作提交成功后,之后的读取都会得到最新的数据。

即:在分布式系统上实现寄存器语义

2.1. Raft只能作为线性一致读的基础

有个误区:Raft和Paxos系统是线性一致的。其实不然,这些协议需要上层做些事情,才能达到。

在Raft中,写提交成功只会达成日志的一致(并且落盘),而状态机并不能达成一致,需要将提交的日志应用到状态机。而应用日志的操作的实现基本都是异步的,所以还是可能读到旧数据。

2.2. ReadIndex算法

实现线性一致读有一个很简单的算法:

  1. 获取集群已提交的日志索引
  2. 当状态机的日志应用索引大于等于日志提交索引时,读取才能返回

这里会有2个问题:

  1. 如何获取集群的日志提交索引?
  2. 如何确保Leader有效?

a) 获取集群的日志提交索引

这部分可以通过路由保证:

  • 若Follower收到读请求,直接路由到Leader
  • 从Leader获取日志提交索引
  • 等待该索引的日志应用到状态机后,执行读请求,返回给客户端

b) 确保Leader有效

Leader可以发起一个广播请求,若能收到大部分节点的应答,说明Leader有效。

这步很重要,没有这一步,在网络分区的时候,Leader不知道自己是否有效,可能会读取到旧数据。

例如(A, B, C, D, E),其中A为Leader,之后网络分区为(A, B)和(C, D, E),C成为新Leader,而A不知道自己已经不是Leader了:

  • 首先往C写入,成功
  • 然后往A读取,由于A认为自己还是Leader,所以也会返回结果,但这是旧数据

这部分在etcd-raft就是check-quorum机制。

2.3. LeaderRead算法

这部分和ReaderIndex类似,不过它利用了时钟特性:Leader设置租约(比election timeout小),在租约内就可以保证Leader有效,从而不需要执行2.2.b)的广播。

这极大提高了吞吐,降低了延时。但时钟飘移严重,则正确性也有问题。

这部分PingCAP的文章说的非常好。

3. etcd实现

上层部分的逻辑参考了这篇文章,主要的逻辑就是:

  1. 客户端会发起一个Range请求

  2. 服务端会执行ReadIndex算法,尝试获取日志提交索引

    1. 向etcd-raft模块发起ReadIndex请求,以获取索引

    2. 等待日志提交索引的结果:
      • Follower会路由ReadIndex请求到Leader
      • Leader收到后将其随着自己的日志提交索引进行缓存,并广播心跳
      • Leader收到足够心跳,则弹出缓存的消息,返回FollowerReadIndex响应,响应包含自己的日志提交索引
      • Follower/Leader收到日志提交索引后,将其包装到readStates中,并通过Ready轮询返回给上层
    3. 等待日志应用索引大于等于日志提交索引,满足条件后,即可读取数据

这里主要看etcd-raft模块,对应的就是上面2.2.中的步骤。入口就是ReadIndex请求,这里着重讲ReadIndex算法。

3.1. 处理ReadIndex请求

a) Follower处理

Follower比较简单,它会将ReadIndex请求路由到Leader:

func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
        // ...
        case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead // 目的就是Leader
		r.send(m) // 将请求路由到Leader
    }
    return nil
}

b) Leader处理

Leader通过2种方式获取ReadIndex请求:上层调用、Follower路由。

它的执行逻辑很简单:

  1. 将请求追加到readOnly队列
func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
    	// ...
        case pb.MsgReadIndex:
        // 若只有1个节点(即自己),则直接返回MsgReadIndexResp响应(包含提交索引)
		if r.prs.IsSingleton() {
			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
				r.send(resp)
			}
			return nil
		}
		// ...
        // 若节点多于1个,会走到这里
		switch r.readOnly.option {
		case ReadOnlySafe:
            // ReadIndex
            // 1. 将ReadIndex请求追加到readOnly队列,用于ack计数
            // 队列中的消息包含了对应的提交日志索引
			r.readOnly.addRequest(r.raftLog.committed, m)
			// 2. 自己先对自己ack
			r.readOnly.recvAck(r.id, m.Entries[0].Data)
            // 3. 广播心跳以确认Leader状态
			r.bcastHeartbeatWithCtx(m.Entries[0].Data)
		case ReadOnlyLeaseBased:
            // LeaseRead,
            // 直接返回日志提交索引的MsgReadIndexResp响应,这里略
            // ...
		}
		return nil
	}
}

这里看下readOnly相关数据结构:

type readOnly struct {
    // 读取选项: ReadIndex和LeaseRead
	option           ReadOnlyOption
    // 待处理的读请求队列
    // 键为请求ID, 值是该读请求的状态
	pendingReadIndex map[string]*readIndexStatus
	// 读请求队列
    readIndexQueue   []string
}

type readIndexStatus struct {
    // 读请求消息体
	req   pb.Message
	// 提交日志的索引
    index uint64
	// ACK记录,用于quorum计数
	acks map[uint64]bool
}

3.2. Leader处理心跳的响应

Leader处理ReadIndex请求会广播一次心跳,当收到响应后:

  1. 更新readOnly中的quorum
  2. 若quorum过半,则会清理readOnly中队列的消息,并:
    • 若队列中消息来自自己,则将其追加到readStates中,它会包装成Ready返回给应用层
    • 若队列中消息来自Follower,则返回MsgReadIndexResp响应(包含对应的日志提交索引)
func stepLeader(r *raft, m pb.Message) error {
 	// ...
    switch m.Type {
     	// ...
    case pb.MsgHeartbeatResp:
        // ...
        // 统计quorm,若没到要求则直接返回
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}
		// quorum通过,清理readOnly中队列的消息
		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
				r.send(resp)
			}
		}
        // ...
    }
}

func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
    // 若消息来自自己,则将消息追加到readStates中,它会包装成Ready返回给应用层
	if req.From == None || req.From == r.id {
		r.readStates = append(r.readStates, ReadState{
			Index:      readIndex,
			RequestCtx: req.Entries[0].Data,
		})
		return pb.Message{}
	}
    // 若消息来自Follower,则返回MsgReadIndexResp消息,并返回给Follower
	return pb.Message{
		Type:    pb.MsgReadIndexResp,
		To:      req.From,
		Index:   readIndex,
		Entries: req.Entries,
	}
}

3.3. Follower处理ReadIndex响应

这里很简单,和Leader一样,将对应的ReadIndex消息随同日志提交索引,缓存到readStates中,供上层的Ready轮询使用。

func stepFollower(r *raft, m pb.Message) error {
 	// ...
    case pb.MsgReadIndexResp:
		// ...
    	// 将消息追加到readStates中,它会包装成Ready返回给应用层
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	// ...
}

上层进行Ready轮询时,会获取上面缓存的readStates,从而知道了集群日志提交的索引。下一步只要等待日志应用索引超过日志提交索引,就可以对状态机进行读取,把结果返回给客户端了。

3.4. LeaseRead实现

这部分在ReadIndex上进行了简化,只需要在3.1.b)中,收到MsgReadIndex请求后直接向Follower返回MsgReadIndexResp响应,然后跳到3.3.,就可以实现。

至此,对照Raft论文,etcd-raft基本看的差不多了。

一个感想:Go语言的确非常简单,但是其CSP风格的确比较难适应,很tricky。


Related Issues not found

Please contact @keys961 to initialize the comment


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK