2

GO实现Redis:GO实现Redis集群(5) - csgopher

 1 year ago
source link: https://www.cnblogs.com/csgopher/p/17249367.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
  • 采用一致性hash算法将key分散到不同的节点,客户端可以连接到集群中任意一个节点
  • https://github.com/csgopher/go-redis
  • 本文涉及以下文件:
    consistenthash:实现添加和选择节点方法
    standalone_database:单机database
    client:客户端
    client_pool:实现连接池
    cluster_database:对key进行路由
    com:与其他节点通信
    router,ping,keys,del,select:各类命令的转发具体逻辑

一致性哈希

为什么需要一致性 hash?
在采用分片方式建立分布式缓存时,我们面临的第一个问题是如何决定存储数据的节点。最自然的方式是参考 hash 表的做法,假设集群中存在 n 个节点,我们用 node = hashCode(key) % n 来决定所属的节点。
普通 hash 算法解决了如何选择节点的问题,但在分布式系统中经常出现增加节点或某个节点宕机的情况。若节点数 n 发生变化, 大多数 key 根据 node = hashCode(key) % n 计算出的节点都会改变。这意味着若要在 n 变化后维持系统正常运转,需要将大多数数据在节点间进行重新分布。这个操作会消耗大量的时间和带宽等资源,这在生产环境下是不可接受的。
算法原理
一致性 hash 算法的目的是在节点数量 n 变化时, 使尽可能少的 key 需要进行节点间重新分布。一致性 hash 算法将数据 key 和服务器地址 addr 散列到 2^32 的空间中。
我们将 2^32 个整数首尾相连形成一个环,首先计算服务器地址 addr 的 hash 值放置在环上。然后计算 key 的 hash 值放置在环上,顺时针查找,将数据放在找到的第一个节点上。
在增加或删除节点时只有该节点附近的数据需要重新分布,从而解决了上述问题。
如果服务器节点较少则比较容易出现数据分布不均匀的问题,一般来说环上的节点越多数据分布越均匀。我们不需要真的增加一台服务器,只需要将实际的服务器节点映射为几个虚拟节点放在环上即可。
参考:https://www.cnblogs.com/Finley/p/14038398.html

lib/consistenthash/consistenthash.go

// HashFunc maps a byte slice to a position on the 32-bit hash ring.
// Its signature matches crc32.ChecksumIEEE, the default used by NewNodeMap.
type HashFunc func(data []byte) uint32

// NodeMap is a consistent-hash ring of node addresses.
type NodeMap struct {
   hashFunc    HashFunc
   nodeHashs   []int          // sorted hashes of every node (the ring), kept ordered for binary search
   nodehashMap map[int]string // node hash -> node address
}

// NewNodeMap creates an empty consistent-hash ring using fn as the hash
// function. Passing nil selects crc32.ChecksumIEEE as the default.
func NewNodeMap(fn HashFunc) *NodeMap {
	if fn == nil {
		fn = crc32.ChecksumIEEE
	}
	return &NodeMap{
		hashFunc:    fn,
		nodehashMap: make(map[int]string),
	}
}

// IsEmpty reports whether no node has been added to the ring yet.
func (m *NodeMap) IsEmpty() bool {
   return len(m.nodeHashs) == 0
}

// AddNode places each given node address on the hash ring, skipping empty
// strings, and re-sorts the ring so PickNode can binary-search it.
func (m *NodeMap) AddNode(keys ...string) {
	for _, node := range keys {
		if node == "" {
			continue
		}
		h := int(m.hashFunc([]byte(node)))
		m.nodehashMap[h] = node
		m.nodeHashs = append(m.nodeHashs, h)
	}
	sort.Ints(m.nodeHashs)
}

// PickNode returns the node responsible for key: the first node clockwise
// on the ring whose hash is >= the key's hash, wrapping around to the
// smallest node hash when the key hashes past the largest one.
// It returns "" when the ring is empty.
func (m *NodeMap) PickNode(key string) string {
	if m.IsEmpty() {
		return ""
	}

	target := int(m.hashFunc([]byte(key)))

	// smallest index with nodeHashs[idx] >= target; len(nodeHashs) if none
	idx := sort.SearchInts(m.nodeHashs, target)

	// wrap around the ring when the key is past the last node
	idx %= len(m.nodeHashs)

	return m.nodehashMap[m.nodeHashs[idx]]
}

HashFunc:hash函数定义,Go的hash函数就是这样定义的
NodeMap:存储所有节点和节点的hash

  • nodeHashs:各个节点的hash值,顺序的
  • nodehashMap<hash, 节点>

AddNode:添加节点到一致性哈希中
PickNode:选择节点。使用二分查找,如果hash比nodeHashs中最大的hash还要大,idx=0

database/standalone_database.go

// StandaloneDatabase is the single-node database: a set of logical DBs
// plus the AOF persistence handler.
type StandaloneDatabase struct {
   dbSet []*DB
   aofHandler *aof.AofHandler
}

func NewStandaloneDatabase() *StandaloneDatabase {
  ......
}

把database/database改名为database/standalone_database,再增加一个cluster_database用于对key的路由

resp/client/client.go

// Client is a pipeline mode redis client: requests are written without
// waiting for replies, and replies are matched back to requests in order.
type Client struct {
   conn        net.Conn
   pendingReqs chan *request // wait to send
   waitingReqs chan *request // waiting response
   ticker      *time.Ticker  // drives the periodic heartbeat (set in Start)
   addr        string        // server address, kept for reconnecting

   working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
}

// request is a single command in flight to the redis server, carrying the
// slot its reply will be stored into.
type request struct {
   id        uint64
   args      [][]byte
   reply     resp.Reply
   heartbeat bool
   waiting   *wait.Wait // signalled when the reply arrives or the request fails
   err       error
}

const (
   chanSize = 256            // capacity of the pending/waiting queues
   maxWait  = 3 * time.Second // per-request reply timeout
)

// MakeClient creates a new client
func MakeClient(addr string) (*Client, error) {
   conn, err := net.Dial("tcp", addr)
   if err != nil {
      return nil, err
   }
   return &Client{
      addr:        addr,
      conn:        conn,
      pendingReqs: make(chan *request, chanSize),
      waitingReqs: make(chan *request, chanSize),
      working:     &sync.WaitGroup{},
   }, nil
}

// Start starts asynchronous goroutines
func (client *Client) Start() {
   client.ticker = time.NewTicker(10 * time.Second)
   go client.handleWrite()
   go func() {
      err := client.handleRead()
      if err != nil {
         logger.Error(err)
      }
   }()
   go client.heartbeat()
}

// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {
   client.ticker.Stop()
   // stop new request
   close(client.pendingReqs)

   // wait stop process
   client.working.Wait()

   // clean
   _ = client.conn.Close()
   close(client.waitingReqs)
}

// handleConnectionError attempts to re-establish the TCP connection after
// a write failure. Note the triggering error itself is never inspected;
// only the close/redial results decide the outcome.
func (client *Client) handleConnectionError(err error) error {
   // close the old connection, tolerating "already closed"
   err1 := client.conn.Close()
   if err1 != nil {
      if opErr, ok := err1.(*net.OpError); ok {
         if opErr.Err.Error() != "use of closed network connection" {
            return err1
         }
      } else {
         return err1
      }
   }
   // redial the same address and restart the read loop on the new conn
   conn, err1 := net.Dial("tcp", client.addr)
   if err1 != nil {
      logger.Error(err1)
      return err1
   }
   client.conn = conn
   go func() {
      _ = client.handleRead()
   }()
   return nil
}

// heartbeat issues a PING on every tick of client.ticker (armed in Start)
// to keep the connection alive.
func (client *Client) heartbeat() {
   for range client.ticker.C {
      client.doHeartbeat()
   }
}

// handleWrite drains pendingReqs, writing each request to the server; it
// exits once pendingReqs is closed by Close.
func (client *Client) handleWrite() {
	for {
		req, ok := <-client.pendingReqs
		if !ok {
			return
		}
		client.doRequest(req)
	}
}

// Send sends a request to redis server
func (client *Client) Send(args [][]byte) resp.Reply {
   request := &request{
      args:      args,
      heartbeat: false,
      waiting:   &wait.Wait{},
   }
   request.waiting.Add(1)
   client.working.Add(1)
   defer client.working.Done()
   client.pendingReqs <- request
   timeout := request.waiting.WaitWithTimeout(maxWait)
   if timeout {
      return reply.MakeErrReply("server time out")
   }
   if request.err != nil {
      return reply.MakeErrReply("request failed")
   }
   return request.reply
}

// doHeartbeat sends a single PING through the normal request pipeline and
// waits (bounded by maxWait) for it to complete; the reply is discarded.
//
// Fix: the local variable was named `request`, shadowing the request type
// within this function; renamed to req.
func (client *Client) doHeartbeat() {
	req := &request{
		args:      [][]byte{[]byte("PING")},
		heartbeat: true,
		waiting:   &wait.Wait{},
	}
	req.waiting.Add(1)
	client.working.Add(1)
	defer client.working.Done()
	client.pendingReqs <- req
	req.waiting.WaitWithTimeout(maxWait)
}

// doRequest serializes req as a RESP multi-bulk command and writes it to
// the server, attempting up to three reconnects on write failure. On
// success the request joins waitingReqs to be matched with its reply; on
// final failure the request is marked failed and its waiter released.
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
	payload := reply.MakeMultiBulkReply(req.args).ToBytes()
	_, err := client.conn.Write(payload)
	for attempt := 0; err != nil && attempt < 3; attempt++ {
		if err = client.handleConnectionError(err); err == nil {
			_, err = client.conn.Write(payload)
		}
	}
	if err != nil {
		req.err = err
		req.waiting.Done()
		return
	}
	client.waitingReqs <- req
}

// finishRequest pairs a server reply with the oldest waiting request and
// releases its waiter. A panic (e.g. receiving from the closed
// waitingReqs during shutdown) is recovered and logged.
//
// Fix: the parameter was named `reply`, shadowing the reply package, and
// the local was named `request`, shadowing the request type; both renamed.
func (client *Client) finishRequest(serverReply resp.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	req := <-client.waitingReqs
	if req == nil {
		return
	}
	req.reply = serverReply
	if req.waiting != nil {
		req.waiting.Done()
	}
}

// handleRead consumes the RESP parse stream from the connection, turning
// each payload (or parse error) into a completed request, until the
// stream channel is closed.
func (client *Client) handleRead() error {
	for payload := range parser.ParseStream(client.conn) {
		if payload.Err == nil {
			client.finishRequest(payload.Data)
		} else {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
		}
	}
	return nil
}

client:Redis客户端,具体看:https://www.cnblogs.com/Finley/p/14028402.html

go.mod

require github.com/jolestar/go-commons-pool/v2 v2.1.2

key的转发需要当前节点存储其他节点的连接,互相作为客户端,使用连接池将其他连接池化

cluster/client_pool.go

// connectionFactory creates pooled pipeline clients for one peer node; it
// implements the go-commons-pool PooledObjectFactory interface.
type connectionFactory struct {
   Peer string // address of the remote node to dial
}

// MakeObject creates and starts a pipeline client connected to f.Peer,
// wrapped for the pool.
func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
	conn, err := client.MakeClient(f.Peer)
	if err != nil {
		return nil, err
	}
	conn.Start()
	return pool.NewPooledObject(conn), nil
}

// DestroyObject closes the pooled client; it rejects objects of any other
// type with an error.
func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
	conn, ok := object.Object.(*client.Client)
	if !ok {
		return errors.New("type mismatch")
	}
	conn.Close()
	return nil
}

// ValidateObject is a no-op: every pooled connection is considered valid.
func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
   // do validate
   return true
}

// ActivateObject is a no-op; nothing is needed before a borrow.
func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
   // do activate
   return nil
}

// PassivateObject is a no-op; nothing is needed on return to the pool.
func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
   // do passivate
   return nil
}

client_pool:使用连接池的NewObjectPoolWithDefaultConfig创建连接,需要实现PooledObjectFactory接口

redis.conf

self 127.0.0.1:6379
peers 127.0.0.1:6380

配置中写自己和其他节点的地址

cluster/cluster_database.go

// clusterDatabase routes each command either to the local standalone
// database or to the peer that owns the command's key.
type clusterDatabase struct {
   self           string // this node's own address (from config)
   nodes          []string // every node in the cluster, including self
   peerPicker     *consistenthash.NodeMap // consistent-hash ring for key -> node
   peerConnection map[string]*pool.ObjectPool // per-peer client connection pools
   db             databaseface.Database // the local standalone database
}

// MakeClusterDatabase builds the cluster layer: a local standalone
// database, a consistent-hash ring covering every peer plus this node,
// and one connection pool per remote peer (never for self).
func MakeClusterDatabase() *clusterDatabase {
	cluster := &clusterDatabase{
		self:           config.Properties.Self,
		db:             database.NewStandaloneDatabase(),
		peerPicker:     consistenthash.NewNodeMap(nil),
		peerConnection: make(map[string]*pool.ObjectPool),
	}
	// peers first, then self — same membership as the configured cluster
	nodes := make([]string, 0, len(config.Properties.Peers)+1)
	nodes = append(nodes, config.Properties.Peers...)
	nodes = append(nodes, config.Properties.Self)
	cluster.peerPicker.AddNode(nodes...)
	ctx := context.Background()
	for _, peer := range config.Properties.Peers {
		cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
			Peer: peer,
		})
	}
	cluster.nodes = nodes
	return cluster
}

// Close shuts down the underlying local database; peer pools are not
// closed here.
func (cluster *clusterDatabase) Close() {
	cluster.db.Close()
}

func (cluster *ClusterDatabase) AfterClientClose(c resp.Connection) {
	cluster.db.AfterClientClose(c)
}

// CmdFunc is the signature of a cluster routing handler: it receives the
// cluster, the issuing connection, and the full command line (name plus
// arguments), and returns the reply to send back to the client.
type CmdFunc func(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply

cluster_database用于对key的路由
clusterDatabase:
nodes:所有节点
peerPicker :节点的添加和选择
peerConnection:Map<node, 连接池>
db:单机database
CmdFunc:表示Redis的指令类型

cluster/com.go

func (cluster *clusterDatabase) getPeerClient(peer string) (*client.Client, error) {
   factory, ok := cluster.peerConnection[peer]
   if !ok {
      return nil, errors.New("connection factory not found")
   }
   raw, err := factory.BorrowObject(context.Background())
   if err != nil {
      return nil, err
   }
   conn, ok := raw.(*client.Client)
   if !ok {
      return nil, errors.New("connection factory make wrong type")
   }
   return conn, nil
}

func (cluster *clusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {
   connectionFactory, ok := cluster.peerConnection[peer]
   if !ok {
      return errors.New("connection factory not found")
   }
   return connectionFactory.ReturnObject(context.Background(), peerClient)
}

func (cluster *clusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {
   if peer == cluster.self {
      return cluster.db.Exec(c, args)
   }
   peerClient, err := cluster.getPeerClient(peer)
   if err != nil {
      return reply.MakeErrReply(err.Error())
   }
   defer func() {
      _ = cluster.returnPeerClient(peer, peerClient)
   }()
   peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex())))
   return peerClient.Send(args)
}

func (cluster *clusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {
   result := make(map[string]resp.Reply)
   for _, node := range cluster.nodes {
      relay := cluster.relay(node, c, args)
      result[node] = relay
   }
   return result
}

communication:与其他节点通信。执行模式有本地(自己执行),转发(别人执行),群发(所有节点执行)
getPeerClient :从连接池拿一个连接
returnPeerClient :归还连接
relay :转发指令给其他客户端,发送指令之前需要先发一下选择的db
broadcast :指令广播给所有节点

cluster/router.go

// makeRouter maps each supported (lowercased) command name to its cluster
// routing handler.
func makeRouter() map[string]CmdFunc {
	return map[string]CmdFunc{
		"ping":     ping,
		"del":      Del,
		"exists":   defaultFunc,
		"type":     defaultFunc,
		"rename":   Rename,
		"renamenx": Rename,
		"set":      defaultFunc,
		"setnx":    defaultFunc,
		"get":      defaultFunc,
		"getset":   defaultFunc,
		"flushdb":  FlushDB,
		"select":   execSelect,
	}
}

// defaultFunc is the default routing rule for single-key commands: pick
// the node that owns args[1] on the hash ring and relay the command there.
func defaultFunc(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	// Fix: the original indexed args[1] unconditionally and panicked when
	// a routed command arrived without a key argument.
	if len(args) < 2 {
		return reply.MakeErrReply("ERR wrong number of arguments")
	}
	key := string(args[1])
	peer := cluster.peerPicker.PickNode(key)
	return cluster.relay(peer, c, args)
}

defaultFunc:转发指令的默认实现

cluster/ping.go

// ping executes PING on the local database only; it is never forwarded.
func ping(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {
   return cluster.db.Exec(c, cmdAndArgs)
}

cluster/rename.go

// Rename routes RENAME/RENAMENX. Both the source and destination keys
// must hash to the same node; otherwise an error reply is returned, since
// a cross-node rename is not supported.
func Rename(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	if len(args) != 3 {
		return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
	}

	srcPeer := cluster.peerPicker.PickNode(string(args[1]))
	destPeer := cluster.peerPicker.PickNode(string(args[2]))
	if srcPeer != destPeer {
		return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
	}

	return cluster.relay(srcPeer, c, args)
}

Rename:修改key的name,两个key的hash必须在同一个节点中

cluster/keys.go

// FlushDB broadcasts the flush to every node and returns OK only when all
// of them succeeded; the first error found is reported instead.
func FlushDB(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	for _, v := range cluster.broadcast(c, args) {
		if reply.IsErrorReply(v) {
			return reply.MakeErrReply("error occurs: " + v.(reply.ErrorReply).Error())
		}
	}
	return &reply.OkReply{}
}

cluster/del.go

// Del broadcasts DEL to every node and returns the summed count of keys
// deleted cluster-wide, or an error reply if any node failed.
func Del(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
	replies := cluster.broadcast(c, args)
	var errReply reply.ErrorReply
	var deleted int64 = 0
	for _, v := range replies {
		if reply.IsErrorReply(v) {
			errReply = v.(reply.ErrorReply)
			break
		}
		intReply, ok := v.(*reply.IntReply)
		if !ok {
			// Fix: the original recorded the error but fell through and
			// dereferenced the nil intReply, panicking on any non-integer
			// reply. Stop iterating instead.
			errReply = reply.MakeErrReply("error")
			break
		}
		deleted += intReply.Code
	}

	if errReply == nil {
		return reply.MakeIntReply(deleted)
	}
	return reply.MakeErrReply("error occurs: " + errReply.Error())
}

cluster/select.go

// execSelect runs SELECT against the local database only; relay re-sends
// the connection's chosen db index before each forwarded command.
func execSelect(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {
   return cluster.db.Exec(c, cmdAndArgs)
}

cluster/cluster_database.go

// router maps lowercased command names to their cluster routing handlers.
var router = makeRouter()

// Exec dispatches one command line to its routing function, recovering
// from any panic into an unknown-error reply. Unknown commands get an
// explicit error reply.
func (cluster *clusterDatabase) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
	defer func() {
		if err := recover(); err != nil {
			logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
			result = &reply.UnknownErrReply{}
		}
	}()
	name := strings.ToLower(string(cmdLine[0]))
	if cmdFunc, ok := router[name]; ok {
		return cmdFunc(cluster, c, cmdLine)
	}
	return reply.MakeErrReply("ERR unknown command '" + name + "', or not supported in cluster mode")
}

resp/handler/handler.go

// MakeHandler chooses the database implementation: cluster mode when both
// a self address and at least one peer are configured, standalone mode
// otherwise.
func MakeHandler() *RespHandler {
	var db databaseface.Database
	if config.Properties.Self == "" || len(config.Properties.Peers) == 0 {
		db = database.NewStandaloneDatabase()
	} else {
		db = cluster.MakeClusterDatabase()
	}
	return &RespHandler{db: db}
}

MakeHandler:判断是单机还是集群

先go build,打开项目文件夹找到exe文件,把exe文件和redis.conf放到一个文件夹里,redis.conf改成如下,然后启动exe文件。再回到GoLand启动第二个节点6379。

bind 0.0.0.0
port 6380

appendonly yes
appendfilename appendonly.aof

self 127.0.0.1:6380
peers 127.0.0.1:6379

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK