8

RocketMQ架构设计中的”暴力美学”(1)

 1 year ago
source link: https://vearne.cc/archives/39616
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

人们在潜意识里,总会觉得复杂且精巧的东西是好东西。但是这个复杂这个词在软件架构设计中,却不一定是好事情。因为过于精巧和复杂的系统往往意味着系统更难以维护,出现问题后,故障更难排查。萌叔前段时间在阅读和分享RocketMQ的过程中,发现它有很多设计非常的简单粗暴,堪称”暴力美学”的典范,同时又给人眼前一亮的感觉(还能这么玩)。

1. NameServer高可用

在RocketMQ的架构体系中,NameServer的作用类似于注册中心,Broker会周期性的向NameServer发送心跳,注册Topic信息。Producer和Consumer会向NameServer查询某个Topic的路由信息(Topic位于哪个Broker)
Topic路由信息

{
    "OrderTopicConf": "",
    "queueDatas": [{
        "brokerName": "broker-3",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }, {
        "brokerName": "broker-4",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }],
    ...
}

如上面所示,某个Topic位于broker-3和broker-4,每个Broker上有4个MessageQueue

那么如何保证部分NameServer实例宕机后,注册中心的功能仍然能够正常运转呢?

按照正常点的想法,肯定是NameServer分成Master和Slave,然后Master和Slave之间在加上数据同步,如果Master宕机了,只要进行主从切换即可。这种做法肯定没问题,毕竟Hadoop中的NameNode也就是这么做的。

RocketMQ中的实现要简单的多,每个NameServer相互独立,并且它们之间没有通信。Broker会向每一个NameServer、NameServer1、NameServer2等等发送心跳,心跳中包含有它所维护的每个Topic的信息),这样每个NameServer就都含有路由信息。

等到Producer和Consumer向NameServer查询路由信息时,它会尝试向每一个NameServer进行请求。

下面的代码萌叔使用的是rocketmq-client-go的代码,因为rocketmq-client-go比Java的代码更加明显。
internal/route.go#queryTopicRouteInfoFromServer

func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
    request := &GetRouteInfoRequestHeader{
        Topic: topic,
    }

    var (
        response *remote.RemotingCommand
        err      error
    )
   ...
   // 遍历每一个NameServer
    for i := 0; i < s.Size(); i++ { 
        rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
        ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
        response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)

        if err == nil {
            cancel()
            break
        }
        cancel()
    }
    if err != nil {
        rlog.Error("connect to namesrv failed.", map[string]interface{}{
            "namesrv": s,
            "topic":   topic,
        })
        return nil, primitive.NewRemotingErr(err.Error())
    }

    switch response.Code {
    case ResSuccess:
        if response.Body == nil {
            return nil, primitive.NewMQClientErr(response.Code, response.Remark)
        }
        routeData := &TopicRouteData{}

        err = routeData.decode(string(response.Body))
        if err != nil {
            rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{
                rlog.LogKeyUnderlayError: err,
                "topic":                  topic,
            })
            return nil, err
        }
        return routeData, nil
    case ResTopicNotExist:
        return nil, ErrTopicNotExist
    default:
        return nil, primitive.NewMQClientErr(response.Code, response.Remark)
    }
}

请我喝瓶饮料

微信支付码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK