4

使用hashicorp Raft开发分布式服务 - charlieroro

 11 months ago
source link: https://www.cnblogs.com/charlieroro/p/17486646.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

使用hashicorp Raft开发高可用服务

开发raft时用到的比较主流的两个库是Etcd Raft 和hashicorp Raft,网上也有一些关于这两个库的讨论之前分析过etcd Raft,发现该库相对hashicorp Raft比较难以理解,其最大的问题是没有实现网络层,实现难度比较大,因此本文在实现时使用了hashicorp Raft。

下文中会参考consul的一致性协议来讲解如何实现Raft协议。

Raft概述

  • Log entry:Raft的主要单元。Raft将一致性问题分解为日志复制。日志是一个有序的表项,其包含了Raft的集群变更信息(如添加/移除节点)以及对应用数据的操作等。
  • FSM:Finite State Machine。FSM是有限状态的集合。当一条日志被Raft apply后,可以对FSM进行状态转换。相同顺序的日志在apply之后必须产生相同的结果,即行为必须是确定性的。
  • Peer set:指所有参与日志复制的成员。
  • Quorum:仲裁指peer set中的大部分成员:对于包含N个成员的peer set,仲裁要求有(N/2)+1个成员。如果出于某些原因导致仲裁节点不可用,则集群会变为unavailable状态,且新的日志也不会被commit。
  • Committed Entry:当一个Log entry持久化到仲裁数量的节点后,该认为该Log entry是Committed的。只有当Log entry 被Committed之后,它才会被FSM apply。
  • Leader:任何时间,peer set会选举一个节点作为leader。leader负责处理新的Log entry,并将其复制给follower,以及决定何时将Log entry判定为committed状态。

Raft机制简介

Raft节点总是处于三种状态之一: follower, candidate, leader。一开始,所有的节点都是follower,该状态下,节点可以从leader接收log,并参与选举,如果一段时间内没有接收到任何Log entry,则节点会自提升到candidate状态。在candidate状态下,节点会请求其他节点的选举,如果一个candidate接收到大部分节点(仲裁数目)的认同,就会被提升为leader。leader必须接收新的Log entry,并复制到所有其他follower。

如果用户无法接受旧的数据,则所有的请求必须由leader执行。

一旦一个集群有了leader,就可以接收新的Log entry。客户端可以请求leader追加一个新的Log entry。Leader会将Log entry写入持久化存储,并尝试将其复制给仲裁数目的follower。一旦Log entry被commit,就可以将该Log entry apply到FSM。FSM是应用特定的存储,在Consul中,使用 MemDB来维护集群状态。

无限量复制log的方式是不可取的。Raft 提供了一种机制,可以对当前状态进行快照并压缩log。由于 FSM 的抽象,FSM 的状态恢复必须与replay log的状态相同。Raft 可以捕获某个时刻的 FSM 状态,然后移除用于达到该状态的所有log。这些操作可以在没有用户干预的情况下自动执行,防止无限使用磁盘,同时最小化replay log所花费的时间。

Raft Consensus中所有的操作都必须经过Leader,因此需要保证所有的请求都能够发送到Leader节点,然后由Leader将请求发送给所有Follower,并等待大部分(仲裁数目的)节点处理完成该命令,leader通过选举机制产生。之后每个follower会执行如下操作:

  1. 在接收到命令之后,使用WAL方式将数据保存为Log entry
  2. 在成功写入log entry之后,将数据发给FSM进行处理
  3. 在FSM成功处理完数据之后,返回数据。之后Leader会注意到该节点已经成功完成数据处理。

如下表述来自Raft Protocol Overview,它的意思是,如果是查询类的请求,直接从FSM返回结果即可,如果是修改类的请求,则需要通过raft.Apply来保证变更的一致性

所有raft集群中的成员都知道当前的leader,当一个RPC请求到达一个非leader的成员时,它会将该请求转发给当前的leader。如果是查询类型的RPC,意味着它是只读的,leader会基于FSM的当前状态生成结果;如果是事务类型的RPC,意味着它需要修改状态,Leader会生成一条新的log entry,并执行Raft.Apply,当log entry被提交并apply到FSM之后,事务才算执行完成。

接口和原理描述

如下是官方给出的Raft Apply的流程图:

clientleader:mainleader:fsmleader:replicate (each peer)follower:main (each peer)follower:fsm (each peer)opt[leader commit index isahead of peer commitindex]opt[quorum commit index has increased]applyCh to dispatchLogs1store logs to disk2triggerCh3Transport.AppendEntries RPC4store logs to disk5fsmMutateCh apply committed logs6fsm.Apply7respond success=true8update commitment9commitCh10fsmMutateCh11fsm.Apply12future.respond13clientleader:mainleader:fsmleader:replicate (each peer)follower:main (each peer)follower:fsm (each peer)

Apply是数据进入Raft的接口,整个Raft的主要作用是维护数据操作的一致性。在上图中,由两个apply:一个是Raft.Apply(内部会通过applyCh传递log),其也是外部数据的入口。另一个是FSM.Apply,其数据源头是Raft.Apply。FSM基于Raft实现了一致性读写。在上图中可以看到,leader的FSM.Apply是在数据commit成功(仲裁成功)之后才执行的,这样就能以Raft的方式保证分布式场景下应用数据的一致性,可以将FSM.Apply理解为应用数据的写入操作。

Raft中的一条log表示一个操作。使用hashicorp/raft时应该将实现分为两层:一层是底层的Raft,支持Raft数据的存储、快照等,集群的选举和恢复等,这一部分由Raft模块自实现;另一层是应用层,需要由用户实现FSM接口,FSM的接口并不对外,在Raft的处理过程中会调用FSM的接口来实现应用数据的存储、备份和恢复等操作。这两层都有数据的读写和快照实现,因此在理解上需要进行区分。

Raft节点的初始化

如果是新建的Raft节点,可以使用BootstrapCluster方法初始化该节点。为避免非新的节点被初始化,在调用BootstrapCluster前可以使用raft.HasExistingState来判断实例中是否包含相关状态(logs,当前term或snapshot):

if (s.config.Bootstrap) && !s.config.ReadReplica {
  hasState, err := raft.HasExistingState(log, stable, snap)
  if err != nil {
    return err
  }
  if !hasState {
    configuration := raft.Configuration{
      Servers: []raft.Server{
        {
          ID:      s.config.RaftConfig.LocalID,
          Address: trans.LocalAddr(),
        },
      },
    }
    if err := raft.BootstrapCluster(s.config.RaftConfig,
      log, stable, snap, trans, configuration); err != nil {
      return err
    }
  }
}

Raft节点的创建

Raft节点的创建方法如下,如果存储非空,则Raft会尝试恢复该节点:

func NewRaft(conf *Config,
    fsm FSM,
    logs LogStore,
    stable StableStore,
    snaps SnapshotStore,
    trans Transport) (*Raft, error) {
  • fsm:由应用实现,用于处理应用数据。FSM中的数据来自底层的Raft log

  • logsstablesnapslogs(存储Raft log) ,stable(保存Raft选举信息,如角色、term等信息) 可以使用raftboltdb.New进行初始化, snaps用于Leader和follower之间的批量数据同步以及(手动或自动)集群恢复,可以使用raft.NewFileSnapshotStoreraft.NewFileSnapshotStoreWithLogger进行初始化。

  • trans:Transport是raft集群内部节点之间的信息通道,节点之间需要通过该通道来同步log、选举leader等。下面接口中的AppendEntriesPipelineAppendEntries方法用于log同步,RequestVote用于leader选举,InstallSnapshot用于在follower 的log落后过多的情况下,给follower发送snapshot(批量log)。

    可以使用raft.NewTCPTransportraft.NewTCPTransportWithLoggerraft.NewNetworkTransportWithConfig方法来初始化trans。

    type Transport interface {
      ...
      AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
      AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
    
      // RequestVote sends the appropriate RPC to the target node.
      RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
      InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
      ...
    }
    

NewRaft方法中会运行如下后台任务:

r.goFunc(r.run)           //处理角色变更和RPC请求
r.goFunc(r.runFSM)        //负责将logs apply到FSM
r.goFunc(r.runSnapshots)  //管理FSM的snapshot

监控Leader变化

为保证数据的一致性,只能通过leader写入数据,因此需要及时了解leader的变更信息,在Raft的配置中有一个变量NotifyCh chan<- bool,当Raft变为leader时会将true写入该chan,通过读取该chan来判断本节点是否是leader。在初始化Raft配置的时候传入即可:

  leaderNotifyCh := make(chan bool, 10)
  raftConfig.NotifyCh = leaderNotifyCh

还有其他方式可以获取leader变更状态:

如下方法可以生成一个chan,当本节点变为Leader时会发送true,当本节点丢失Leader角色时发送false,该方法的用途与上述方式相同,但由于该方法没有缓存,可能导致丢失变更信号,因此推荐使用上面的方式。

func (r *Raft) LeaderCh() <-chan bool

实现FSM

至此已经完成了Raft的初始化。下面就是要实现初始化函数中要求实现的内容,主要就是实现FSM接口。其中logsstablesnapstrans已经提到,使用现成的方法初始化即可。对于存储来说,也可以根据需要采用其他方式,如S3。

下面是LogStoreSnapshotStoreStableStore的接口定义。

type LogStore interface {//用于存储Raft log
  // FirstIndex returns the first index written. 0 for no entries.
  FirstIndex() (uint64, error)

  // LastIndex returns the last index written. 0 for no entries.
  LastIndex() (uint64, error)

  // GetLog gets a log entry at a given index.
  GetLog(index uint64, log *Log) error

  // StoreLog stores a log entry.
  StoreLog(log *Log) error

  // StoreLogs stores multiple log entries.
  StoreLogs(logs []*Log) error

  // DeleteRange deletes a range of log entries. The range is inclusive.
  DeleteRange(min, max uint64) error
}
type SnapshotStore interface {//用于快照的生成和恢复
  // Create is used to begin a snapshot at a given index and term, and with
  // the given committed configuration. The version parameter controls
  // which snapshot version to create.
  Create(version SnapshotVersion, index, term uint64, configuration Configuration,
    configurationIndex uint64, trans Transport) (SnapshotSink, error)

  // List is used to list the available snapshots in the store.
  // It should return then in descending order, with the highest index first.
  List() ([]*SnapshotMeta, error)

  // Open takes a snapshot ID and provides a ReadCloser. Once close is
  // called it is assumed the snapshot is no longer needed.
  Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
type StableStore interface { //用于存储集群元数据
  Set(key []byte, val []byte) error

  // Get returns the value for key, or an empty byte slice if key was not found.
  Get(key []byte) ([]byte, error)

  SetUint64(key []byte, val uint64) error

  // GetUint64 returns the uint64 value for key, or 0 if key was not found.
  GetUint64(key []byte) (uint64, error)
}

FSM基于Raft来实现,包含三个方法:

  • Apply:在Raft完成commit索引之后,保存应用数据。
  • Snapshot:用于支持log压缩,可以保存某个时间点的FSM快照。需要注意的是,由于ApplySnapshot运行在同一个线程中(如runrunFSM线程),因此要求函数能够快速返回,否则会阻塞Apply的执行。在实现中,该函数只需捕获指向当前状态的指针,而对于IO开销较大的操作,则放到FSMSnapshot.Persist中执行。
  • Restore:用于从snapshot恢复FSM
type FSM interface {
  // Apply is called once a log entry is committed by a majority of the cluster.
  //
  // Apply should apply the log to the FSM. Apply must be deterministic and
  // produce the same result on all peers in the cluster.
  //
  // The returned value is returned to the client as the ApplyFuture.Response.
  Apply(*Log) interface{}

  // Snapshot returns an FSMSnapshot used to: support log compaction, to
  // restore the FSM to a previous state, or to bring out-of-date followers up
  // to a recent log index.
  //
  // The Snapshot implementation should return quickly, because Apply can not
  // be called while Snapshot is running. Generally this means Snapshot should
  // only capture a pointer to the state, and any expensive IO should happen
  // as part of FSMSnapshot.Persist.
  //
  // Apply and Snapshot are always called from the same thread, but Apply will
  // be called concurrently with FSMSnapshot.Persist. This means the FSM should
  // be implemented to allow for concurrent updates while a snapshot is happening.
  Snapshot() (FSMSnapshot, error)

  // Restore is used to restore an FSM from a snapshot. It is not called
  // concurrently with any other command. The FSM must discard all previous
  // state before restoring the snapshot.
  Restore(snapshot io.ReadCloser) error
}

FSMSnapshot是实现快照需要实现的另一个接口,用于保存持久化FSM状态,后续可以通过FSM.Restore方法恢复FSM。该接口不会阻塞Raft.Apply,但在持久化FSM的数据时需要保证不影响Raft.Apply的并发访问。

FSMSnapshot.Persist的入参sink是调用SnapshotStore.Creates时的返回值。如果是通过raft.NewFileSnapshotStore初始化了SnapshotStore,则入参sink的类型就是FileSnapshotStore

FSMSnapshot.Persist执行结束之后需要执行SnapshotSink.Close() ,如果出现错误,则执行SnapshotSink.Cancel()

// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
  // Persist should dump all necessary state to the WriteCloser 'sink',
  // and call sink.Close() when finished or call sink.Cancel() on error.
  Persist(sink SnapshotSink) error

  // Release is invoked when we are finished with the snapshot.
  Release()
}

FSM的备份和恢复

FSM的备份和恢复的逻辑比较难理解,一方面备份的数据存储在Raft中,FSM接口是由Raft主动调用的,另一方面又需要由用户实现FSM的备份和恢复逻辑,因此需要了解Raft是如何与FSM交互的。

FSM依赖snapshot来实现备份和恢复,snapshot中保存的也都是FSM信息。

何时备份和恢复
备份的时机
  • 当用户执行RecoverCluster接口时会调用FSM.Snapshot触发创建一个新的FSM snapshot

  • 手动调用如下接口也会触发创建FSM snapshot:

    func (r *Raft) Snapshot() SnapshotFuture
    
  • Raft自动备份也会触发创建FSM snapshot,默认时间为[120s, 240s]之间的随机时间。

恢复的时机
  • 当用户执行RecoverCluster接口时会调用FSM.Restore,用于手动恢复集群

  • 当用户执行Raft.Restore接口时会调用FSM.Restore,用于手动恢复集群

  • 通过NewRaft创建Raft节点时会尝试恢复snapshot(Raft.restoreSnapshot-->Raft.tryRestoreSingleSnapshot-->fsmRestoreAndMeasure-->fsm.Restore)

因此在正常情况下,Raft会不定期创建snapshot,且在创建Raft节点(新建或重启)的时候也会尝试通过snapshot来恢复FSM。

备份和恢复的内部逻辑

FSM的备份和恢复与SnapshotStore接口息息相关。

在备份FSM时的逻辑如下,首先通过SnapshotStore.Create创建一个snapshot,然后初始化一个FSMSnapshot实例,并通过FSMSnapshot.Persist将FSM保存到创建出的snapshot中:

sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans) //创建一个snapshot
snapshot, err := fsm.Snapshot() //初始化一个FSMSnapshot实例
snapshot.Persist(sink)          //调用FSMSnapshot.Persist将FSM保存到上面的snapshot中

恢复FSM的逻辑如下,首先通过SnapshotStore.List获取snapshots,然后通过SnapshotStore.Open逐个打开获取到的snapshot,最后调用FSM.Restore恢复FSM,其入参可以看做是snapshot的文件描述符:

snapshots, err = snaps.List()
for _, snapshot := range snapshots {
  _, source, err = snaps.Open(snapshot.ID)
  crc := newCountingReadCloser(source)
  err = fsm.Restore(crc)
  // Close the source after the restore has completed
  source.Close()
}

下面以consul的实现为例看下它是如何进行FSM的备份和恢复的。

FSM.Snapshot()的作用就是返回一个SnapshotSink接口对象,进而调用SnapshotSink.Persist来持久化FSM。

下面是consul的SnapshotSink实现,逻辑比较简单,它将FSM持久化到了一个snapshot中,注意它在写入snapshot前做了编码(编码类型为ChunkingStateType):

// Persist saves the FSM snapshot out to the given sink.
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
  ...
  // Write the header
  header := SnapshotHeader{
    LastIndex: s.state.LastIndex(),
  }
  encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
  if err := encoder.Encode(&header); err != nil {
    sink.Cancel()
    return err
  }
  ...

  if _, err := sink.Write([]byte{byte(structs.ChunkingStateType)}); err != nil {
    return err
  }
  if err := encoder.Encode(s.chunkState); err != nil {
    return err
  }

  return nil
}

func (s *snapshot) Release() {
  s.state.Close()
}

备份时将FSM保存在了snapshot中,恢复时读取并解码对应类型的snapshot即可:

// Restore streams in the snapshot and replaces the current state store with a
// new one based on the snapshot if all goes OK during the restore.
func (c *FSM) Restore(old io.ReadCloser) error {
  defer old.Close()
  ...

  handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
    switch {
    case msg == structs.ChunkingStateType: //解码数据
      chunkState := &raftchunking.State{
        ChunkMap: make(raftchunking.ChunkMap),
      }
      if err := dec.Decode(chunkState); err != nil {
        return err
      }
      if err := c.chunker.State(chunkState); err != nil {
        return err
      }
      ...
    default:
      if msg >= 64 {
        return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul OSS cannot restore it", msg)
      } else {
        return fmt.Errorf("Unrecognized msg type %d", msg)
      }
    }
    return nil
  }
  if err := ReadSnapshot(old, handler); err != nil {
    return err
  }
  ...

  return nil
}
// ReadSnapshot decodes each message type and utilizes the handler function to
// process each message type individually
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
  // Create a decoder
  dec := codec.NewDecoder(r, structs.MsgpackHandle)

  // Read in the header
  var header SnapshotHeader
  if err := dec.Decode(&header); err != nil {
    return err
  }

  // Populate the new state
  msgType := make([]byte, 1)
  for {
    // Read the message type
    _, err := r.Read(msgType)
    if err == io.EOF {
      return nil
    } else if err != nil {
      return err
    }

    // Decode
    msg := structs.MessageType(msgType[0])

    if err := handler(&header, msg, dec); err != nil {
      return err
    }
  }
}

至此已经完成了Raft的开发介绍。需要注意的是,FSM接口都是Raft内部调用的,用户并不会直接与之交互

更多参见:Raft Developer Documentation

Raft关键对外接口

Raft节点管理

将节点添加到集群中,节点刚添加到集群中时状态是staging,当其ready之后就会被提升为voter,参与选举。如果节点已经是voter,则该操作会更新服务地址。该方法必须在leader上调用

func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

如下方法用于添加一个只接收log entry、但不参与投票或commit log的节点:该方法必须在leader上调用

func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

将节点从集群中移除,如果移除的节点是leader,则会触发leader选举。该方法必须在leader上调用

func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

取消节点的投票权,节点不再参与投票或commit log。该方法必须在leader上调用

func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

重新加载节点配置:

func (r *Raft) ReloadConfig(rc ReloadableConfig) error
Raft数据的存储和读取

用于阻塞等待FSM apply所有操作。该方法必须在leader上调用

func (r *Raft) Barrier(timeout time.Duration) Future

apply一个命令到FSM,该方法必须在leader上调用

func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture

从上面接口可以看到,在Raft协议中,必须通过leader才能写入(apply)数据,在非leader的节点上执行Apply()会返回ErrNotLeader的错误。

Apply方法会调用LogStore接口的StoreLogs方法存储log(cmd)。Raft.applyCh负责将log发送给FSM进行处理,最后通过dispatchLogs将log分发给其他节点(dispatchLogs会调用Transport.AppendEntries来将log分发给对端)。

在分布式环境中,外部请求可能通过LB转发到非leader节点上,此时非leader节点需要将请求转发到leader节点上进行处理,在consul中会通过ForwardRPC将请求转发给leader,再由leader执行Apply操作。

当集群中的节点少于仲裁数目时,集群将无法正常运作,此时可以手动调用如下接口尝试恢复集群,但这样会可能会导致原本正在复制的日志被commit。

最佳方式是停止所有节点,并在所有节点上运行RecoverCluster,当集群重启之后,会发生选举,Raft也会恢复运作。

func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
  snaps SnapshotStore, trans Transport, configuration Configuration) error 

通过如下方式可以让集群使用外部snapshot(如备份的snapshot)。注意该操作只适用于DR,且只能在Leader上运行

func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error

获取节点的状态信息

func (r *Raft) Stats() map[string]string

返回当前leader的地址和集群ID。如果当前没有leader则返回空:

func (r *Raft) LeaderWithID() (ServerAddress, ServerID)
节点数据交互

各个节点之间主要通过RPC来交互log和选举信息,可以分为RPC客户端和RPC服务端。

RPC客户端通过调用Transport接口方法来传递数据(如Leader执行Raft.Apply log之后会调用Transport.AppendEntries来分发log)。

RPC服务端的实现如下,其处理了不同类型的RPC请求,如AppendEntriesRequest就是Leader执行Transport.AppendEntries传递的请求内容:

func (r *Raft) processRPC(rpc RPC) {
  if err := r.checkRPCHeader(rpc); err != nil {
    rpc.Respond(nil, err)
    return
  }

  switch cmd := rpc.Command.(type) {
  case *AppendEntriesRequest:
    r.appendEntries(rpc, cmd)
  case *RequestVoteRequest:
    r.requestVote(rpc, cmd)
  case *InstallSnapshotRequest:
    r.installSnapshot(rpc, cmd)
  case *TimeoutNowRequest:
    r.timeoutNow(rpc, cmd)
  default:
    r.logger.Error("got unexpected command",
      "command", hclog.Fmt("%#v", rpc.Command))
    rpc.Respond(nil, fmt.Errorf("unexpected command"))
  }
}

实现Raft时需要考虑如下几点:

  • 实现FSM接口,包含FSMFSMSnapshot这两个接口
  • 如何实现Raft节点的自动发现,包含节点的加入和退出
  • 客户端和应用的交互接口,主要用于应用数据的增删改等查等操作,对FSM的修改必须通过Raft.Apply接口实现,以保证FSM的数据一致性,而在读取应用数据时,如果要求数据强一致,则需要从leader的FSM读取,否则也可以从follower的FSM读取
  • 在非Leader节点接收到客户端的修改类请求后,如何将请求转发给Leader节点

在此次实现Raft的过程中,主要参考了stcacheconsul的源代码,其中FSM的实现参考了前者,而Raft的初始化和节点发现参考了后者。

源代码结构如下:

- src
    discovery #节点发现代码
    raft      #raft管理代码
    rpc       #请求转发代码
    service   #主服务管理代码
  • discovery:采用serf来实现节点发现,它底层采用的还是memberlist,通过gossip来管理节点。

  • rpc:实现了非Leader节点向Leader节点转发请求的功能,本demo仅实现了/api/v1/set接口转发,对于/api/v1/get接口,则直接从本节点的FSM中获取数据,因此get接口不是强一致性的。

    使用如下命令可以生成rpc模块的pb.go文件:

    $ protoc --go_out=. --go_opt=paths=source_relative  --go-grpc_out=. --go-grpc_opt=paths=source_relative ./forward.proto
    

启动demo

下面启动3个节点来组成Raft集群:

入参描述如下:

  • httpAddress:与用户交互的服务
  • raftTCPAddress:Raft服务
  • rpcAddress:请求转发的gRpc服务
  • serfAddress:serf节点发现服务
  • dataDir:Raft存储路径,创建Raft节点时会用到
  • bootstrap:该节点是否需要使用bootstrap方式启动
  • joinAddress:加入Raft集群的地址,为serfAddress,可以添加多个,如add1,add2
  1. 第一个节点启动时并没有需要加入的集群,因此第一个节点以bootstrap方式启动,启动后成为leader。

    $ raft-example --httpAddress 0.0.0.0:5000 --raftTCPAddress 192.168.1.42:6000 --rpcAddress=0.0.0.0:7000 --serfAddress 192.168.1.42:8000 --dataDir /Users/charlie.liu/home/raftDatadir/node0 --bootstrap true
    

    注意:raftTCPAddress不能为0.0.0.0,否则raft会报错误:"local bind address is not advertisable"serfAddress的地址最好也不要使用0.0.0.0

  2. 启动第2、3个节点,后续的节点启动的时候需要加入集群,启动的时候指定第一个节点的地址:

    $ raft-example --httpAddress 0.0.0.0:5001 --raftTCPAddress 192.168.1.42:6001 --rpcAddress=0.0.0.0:7001 --serfAddress 192.168.1.42:8001 --dataDir /Users/charlie.liu/home/raftDatadir/node1 --joinAddress 192.168.1.42:8000
    
    $ raft-example --httpAddress 0.0.0.0:5002 --raftTCPAddress 192.168.1.42:6002 --rpcAddress=0.0.0.0:7002 --serfAddress 192.168.1.42:8002 --dataDir /Users/charlie.liu/home/raftDatadir/node2 --joinAddress 192.168.1.42:8000
    

在节点启动之后,就可以在Leader的标准输出中可以看到Raft集群中的成员信息:

[INFO]  raft: updating configuration: command=AddVoter server-id=192.168.1.42:6002 server-addr=192.168.1.42:6002 servers="[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]"

使用/api/maintain/stats接口可以查看各个节点的状态,num_peers展示了对端节点数目,state展示了当前节点的角色。

$ curl 0.0.0.0:5000/api/maintain/stats|jq   //node0为Leader
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "0",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Leader",
  "term": "2"
}

$ curl 0.0.0.0:5001/api/maintain/stats|jq   //node2为Follower
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "15.996792ms",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "2"
}

$ curl 0.0.0.0:5002/api/maintain/stats|jq   //node2为Follower
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "76.764584ms",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "2"
}

Leader切换

停掉上述Demo中的Leader节点(node0),可以看到node1称为新的leader,且term变为4:

$ curl 0.0.0.0:5001/api/maintain/stats|jq  //新的Leader
{
  "applied_index": "15",
  "commit_index": "15",
  "fsm_pending": "0",
  "last_contact": "0",
  "last_log_index": "15",
  "last_log_term": "4",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Leader",
  "term": "4"
}

$ curl 0.0.0.0:5002/api/maintain/stats|jq
{
  "applied_index": "15",
  "commit_index": "15",
  "fsm_pending": "0",
  "last_contact": "42.735ms",
  "last_log_index": "15",
  "last_log_term": "4",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "4"
}

在本实现中,如果停止一个Raft节点,则Leader节点会一直打印连接该节点失败的日志,原因是在Ctrl+c停止Raft节点的时候没有调用Raft.RemoveServer来移除该节点。这种处理方式是合理的,因为当一个节点重启或故障的时候,不应该从Raft中移除,此时应该查明原因,恢复集群。

本实现中没有主动移除Raft节点的接口,也可以添加一个接口来调用Raft.RemoveServer,进而移除预期的节点,注意只能在Leader节点上执行Raft.RemoveServer

应用数据的读写

下面我们验证应用数据的写入和读取。

  1. 向非Leader节点写入数据,其会将写入请求转发给leader,由leader执行数据写入。下面展示向非Leader节写入数据的场景:

    $ curl 0.0.0.0:5001/api/maintain/stats|jq
    {
      "applied_index": "64",
      "commit_index": "64",
      "fsm_pending": "0",
      "last_contact": "4.312667ms",
      "last_log_index": "64",
      "last_log_term": "137",
      "last_snapshot_index": "0",
      "last_snapshot_term": "0",
      "latest_configuration": "[{Suffrage:Voter ID:0.0.0.0:7000 Address:192.168.1.42:6000} {Suffrage:Voter ID:0.0.0.0:7001 Address:192.168.1.42:6001} {Suffrage:Voter ID:0.0.0.0:7002 Address:192.168.1.42:6002}]",
      "latest_configuration_index": "0",
      "num_peers": "2",
      "protocol_version": "3",
      "protocol_version_max": "3",
      "protocol_version_min": "0",
      "snapshot_version_max": "1",
      "snapshot_version_min": "0",
      "state": "Follower",  #非Leader节点
      "term": "137"
    }
    
    $ curl -XPOST localhost:5001/api/v1/set --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey",
        "value" : "testValue"
    }'
    
  2. 向所有节点查询写入的数据,可以看到所有节点都可以查询到该数据:

    $ curl -XGET localhost:5000/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    
    $ curl -XGET localhost:5001/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    
    $curl -XGET localhost:5002/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    
  • 验证场景下,如果节点IP发生变动,可以通过删除--dataDir目录来清除集群元数据
  • 如果集群中的节点不足仲裁数目,则节点可能处理candidate状态,无法变为Leader,因此要保证集群中有足够的节点,避免一次停掉过多节点。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK