5

MQ系列4:NameServer 原理解析 - Hello-Brand

 1 year ago
source link: https://www.cnblogs.com/wzh2010/p/16607258.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析

1 关于NameServer

上一节的 MQ系列3:RocketMQ 架构分析,我们大致介绍了 RocketMQ的基本组件构成,包括 NameServer、Broker、Producer以及Consumer四部分。
NameServer,指的是服务可以根据给定的名字来进行资源或对象的地址定位,并获取有关的属性信息。在Rocket中也一样,NameServer是 RocketMQ 的服务注册中心(类似于 Kafka 集群 后面的 Zookeeper 集群一样, 对集群元数据进行管理),根据元数据(ip、port和router信息)来唯一定位服务。RocketMQ 需要先启动 NameServer ,再启动 Rocket 中的 Broker。

2 NameServer运行流程

2.1 注册

注册发生在Broker启动之后,启动后快速与NameServer建立长连接,并每30s对NameService发送一次心跳包,Broker会将自己的IP Address、Port、Router 等信息随着心跳一并注册到 NameServer中。

image

这里的RouterInfo 主要指Broker下包含哪些Topic信息,这种映射关系方便后面消息的生产和消费的时候进行寻址。

注册使用到的核心数据结构如下:
HashMap<String BrokerName, BrokerData> brokerAddrTable

  • HashMap 的 Key 是 Broker 的名称,存储了一个Broker服务所对应的属性信息。
  • Value 是个对象,数据结构如下:
字段 类型 说明
cluster String 所属的集群名称
broker String broker的名称
brokerAddress HashMap Broker的IP地址列表,包含一个Master IP地址列表 和 多个Slave IP地址列表
" Broker-A":{
	"cluster":"Broker-Cluster",
	"brokerName":"Broker-A",
	"cluster":{  // 1主2从
	   "0":"192.168.0.1:1234",
	   "1":"192.168.0.2:1234",
	   "2":"192.168.0.3:1234"
	}
}

2.2 注册信息更新

当你对你的Broker中的Topic信息进行更新了(增、删、改)怎么办,你才需要重新将信息注册到NameServer中。

  • 如果你创建了新的 Topic,Broker会向 NameServer 发送注册信息,接收到信息后会对每个Master 角色的Broker ,创建一个新的QueueData对象。
  • 如果你修改了Topic,则NameServer 会先把旧的 QueueData 删除,在加一个新的 QueueData。
  • 如果你删除了Topic,则NameServer 会将对应的 QueueData 删除。
image

使用到的核心数据结构如下:
HashMap<String topic, List<QueueData>> topicQueueTable

  • HashMap 的 Key 是 Topic 的名称,里面存储了Topic的所有属性信息。
  • Value 是个列表,列表的数据类型是 QueueData,列表的length就是Topic中的 Master角色的 Broker 个数。
  • QueueData的结构如下
字段 类型 说明
brokerName String broker名称
readQueueNums Long 读Queue的数量
writeQueueNums Long 写Queue的数量
perm Integer 权限 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
topicSyncFlag Long 同步的位置标识
{
  "topic-test":[ // topic名称,注意下面会用到
   { 
    "brokerName":"Broker-A",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 读写权限
	"topicSynFlag":12
   },
   { 
    "brokerName":"Broker-B",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 读写权限
	"topicSynFlag":12
   } 
 ]
}

参考RocketMQ源码如下,这边加了注释,方便理解:

    /**
     * 创建或者更新 MessageQueue 的数据
     * @param brokerName
     * @param topicConfig
     */
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName); // broker 名称
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());   // 读Queue的数量
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());  // 写Queue的数量
        queueData.setPerm(topicConfig.getPerm());  // 权限: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {  // 新增
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
        } else {   // 更新
            boolean addNewOne = true;   
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                                queueData);
                        it.remove();   // 先删除
                    }
                }
            }
            if (addNewOne) {
                queueDataList.add(queueData);   // 再添加
            }
        }
    }

2.3 异常清理

如果Broker挂掉,那么再被消息的生产者和消费者使用就会有问题了。这时候需要对已经宕掉的Broker进行清理,确保NamServer中注册的Broker服务信息都是Alive的。它的做法是这样的:

  • 前面我们说了,Broker每30s对NameService发送一次心跳包给NameServer
  • NameServer接收到心跳包的时候,会将当前时间戳更新到 brokerLiveTable 表的 lastUpdateTimestamp 字段中。
  • NameServer中会启动一个定时任务
  • 每10s(记住这边扫描是10s间隔,与上面心跳包区分开)扫描 一下 brokerLiveTable
  • 检查lastUpdateTimestamp字段,如果时间戳与当前时间相隔超过 120s(即两分钟),则认为 Broker 已经宕了,并会将broker清除出NameServer的注册表。
    image

使用到的核心数据结构如下:
HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable

  • HashMap 的 Key 是 Broker服务器的地址信息(IP+Port),里面存储了该Broker服务器的基本信息。
  • Value 是个对象,结构如下:
字段 类型 说明
lastUpdateTimestamp Long 最后一次收到心跳包的时间戳
dataVersion DataVersion 数据版本号对象
channel Channel netty的Channel,IO数据交互媒介
haServerAddr String master地址,初次请求的时候值为空,slave向NameServer注册之后返回

2.4 消息生产和消费

上面的步骤都完成之后,NameServer这个 "中央大脑" 正式开始投入使用。这时候 ,消息的生产和消费具体是怎么做的呢?

  • Producer 或者 Consumer 启动之后会和 NameServer 建立长连接
  • 定时(默认为每30s)从 NameServer 获取Routers信息,并将路由信息保存至Producer或者Consumer的本地。
  • Producer发送一条消息 hello-brand 到 topic (topic-test) 中
  • 因为名称为 topic-test 的 topic 存在于多个 broker中,所以需要如下几个步骤,才能找到具体的地址:
    • 先 根据 topic 名称 topic-test 查询 topicQueueTable , 选择一个并获取它的broker信息(包含brokerName)
    • 再根据已经获取到的brokerName 查询 brokerAddressTable 获取具体的Broker IP地址(一般包含1个Master和n个Slave的IP地址)
    • 拿到IP地址之后,生产者与broker建立连接,并发送消息
    • 消费者同理
      image
image

上述的流程图比较清晰的描述如下运转流程:

  • NameServer 作为整个 RocketMQ 的“中央大脑” ,负责对集群元数据进行管理,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
  • Broker 启动后,与 NameServer 保持长连接,每 30s 发送一次发送心跳包,来确保Broker是否存活。并将 Broker 信息 ( IP+、端口等信息)以及Broker中存储的Topic信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • NameServer有个定时任务,每10s扫描下brokerLiveTable表 , 如果检测到某个Broker 宕机(因为使用心跳机制, 如果检测超120s(两分钟)无上报心跳),则从路由注册表中将其移除。
  • 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(通过topic名称查询topicQueueTable获得broker名称,通过broker名称查询brokerAddressTable获取具体的Broker IP地址),然后根据负载均衡算法从列表中选择1台Broker ,建立连接通道,进行消息发送。
  • 消费者在订阅某个topic的消息之前从 NamerServer 获取 Broker 服务器地址列表(同上),包括关联的全部Topic队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。
  • 生产者和消费者默认每30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有Topic队列信息,更新到Client本地。

参考:
https://zhuanlan.zhihu.com/p/388807516


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK