6

RocketMQ源码分析之消息发送

 3 years ago
source link: https://zhuanlan.zhihu.com/p/58026650
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RocketMQ源码分析之消息发送

中华儿女多奇志,不爱无码爱代码

一:前言

上接《RocketMQ源码分析之服务发现》,从这篇文章开始要进入消息发送、存储、消费的源码阅读历程了,本篇要解读的便是RocketMQ消息发送过程。

二:架构设计

在上篇文章《RocketMQ源码分析之服务发现》中,我们提到了Broker在启动后会周期性的向NameSrv注册自身及Topic路由信息,而生产者同样会周期性的从NameSrv上拉取更新本地的Topic路由信息。当生产者开始发送某一Topic的消息时,便会从本地的路由表中找到Topic对于的路由,选择合适的Broker发送消息。

对于Topic路由信息来说,RocketMQ不是简单的记录了Broker地址,而是抽象出了Queue的概念,每个Topic路由中会包括若干Queue,即Topic与Queue是一对多的关系。每个Queue都记录了自己所属的Broker,对于同一Topic而言,它的多个Queue可能指向同一个Broker。

生产者在发送消息的时候,会根据消息的Topic,选出对应路由信息,再挑选出某个Queue,将消息发送至Queue对应的Broker。

下面是我阅读源码后根据自己的理解画的消息发送架构图。

v2-b889bb84fd769e7c3dfadf6b56a62218_720w.jpg

BrokerA和BrokerB都属于Master节点,并且都配置了接收主题TopicX的消息,当它们启动后,都会向NameSrv注册自身包含的Topic路由信息。而Producer在发送TopicX的消息时,便会将消息平均发送到每个Queue(queue1,queue2,queue3,queue4),从而发送到Queue对应的Broker。

看完整体架构图,如果你还想继续探索消息发送的具体过程,便可以和我一起,接着往下读读RocketMQ的源码。

三:源码解读

使用RocketMQ 发送消息代码如下所示:

DefaultMQProducer producer = new DefaultMQProducer(topic);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message(topic,
        "testTag",
        "testKey",
        "testMessage".getBytes());
SendResult result = producer.send(msg);

消息对应的Topic信息以及具体内容被封装在了Message中,并交由DefaultMQProducer,调用send()进行发送。DefaultMQProducer 只是一个面向调用方的代理,真正的生产者是DefaultMQProducerImpl,而消息发送的具体实现,便在DefaultMQProducerImpl中的这个方法内:

private SendResult sendDefaultImpl(//
            Message msg,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback, final long timeout//
    ) 

下面就从这个方法开始,让我们一步一步,揭开RocketMQ的神秘面纱。

第一步:找到Topic对应的路由信息。

  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

寻找Topic路由也没那么简单,我们分析下tryToFindTopicPublishInfo(final String topic)中的逻辑。

首先当然是进行本地查表,本地路由信息存放在topicPublishInfoTable中。但是如果本地没有,则会向NameSrv发起请求,获取路由信息,更新本地路由表。接着再次尝试从本地路由表中获取路由信息。

// 本地查表
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 本地缓存中没有,便向NameSrv发起请求,更新本地路由缓存。
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

如果比较幸运,从NameSrv上查询到了,此处便会直接返回所找到的路由信息:topicPublishInfo。但是如果Topic事先没有在任何Broker上进行配置,那么Broker在向NameSrv注册路由信息时便不会带上该Topic的路由,所以生产者也就无法从NameSrv中查询到该Topic的路由了。

if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
    return topicPublishInfo;
} else {
    // 再次查询Topic路由
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
}

对于这种没有事先配置Topic的情况,RocketMQ不会直接抛出错误,而是会走到上面的else分支里,再次调用 updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer),从NameSrv 获取路由信息。

这种做法看似毫无意义,因为前面已经从NameSrv中没有查询到,你再试一次就能查到了?与其瞎猜测,我们倒不如跟进方法内部一探究竟。

   public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                      DefaultMQProducer defaultMQProducer) {
        ......
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) {
            // 查询默认Topic TBW102的路由信息
            topicRouteData =
                    this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                            defaultMQProducer.getCreateTopicKey(), 1000 * 3);
        }
        ......
        // 克隆一份,放到路由表中
        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        ......
        this.topicRouteTable.put(topic, cloneTopicRouteData);
}

这次调用 updateTopicRouteInfoFromNameServer()时,传入的参数 isDefault 为true,那么代码自然就进入了上面的 if 分支中。这里依旧是调用getDefaultTopicRouteInfoFromNameServer( defaultMQProducer.getCreateTopicKey(),1000*3) 从NameSrv查询Topic路由,不过此次不是查询消息所属Topic的路由信息,而是查询RocketMQ设置的一个默认Topic的路由,该默认Topic为 TBW102 ,这个Topic就是用来创建其他Topic所用的。

如果某Broker配置了 autoCreateTopicEnable,允许自动创建Topic,那么在该Broker启动后,便会向自己的路由表中插入TBW102这个Topic,并注册到NameSrv,表明处理该Topic类型的消息。

所以当消息所属的Topic,暂且叫Topic X吧,它本身没有在任何Broker上配置的时候,生产者就会查询Topic TBW102的路由信息,暂时作为Topic X的的路由,并插入到本地路由表中。当TopicX利用该路由发送到 Broker后,Broker发现自己并没有该Topic信息后,便会创建好该Topic,并更新到NameSrv中,表明后续接收TopicX的消息。

到这里,我们大概了解了获取Topic路由的流程,整理下就是下面几步。

  1. 先从本地缓存的路由表中查询;
  2. 没有找到的话,便向NameSrv发起请求,更新本地路由表,再次查询。
  3. 如果仍然没有查询到,表明Topic没有事先配置,则用Topic TBW102向NameSrv发起查询,返回TBW102的路由信息,暂时作为Topic的路由。

好吧,Topic 的路由已经找到了,我们就要进入消息发送的第二个步骤。

第二步:选择某个Queue用来发送消息

前面提到每个Topic的路由信息中可能包含若干Queue,那么这些Queue是从哪来的呢?不管怎么样,这些Queue的信息肯定是NameSrv返回的。生产者从NameSrv拉取的路由信息为TopicRouteData,我们不妨先来看下它的结构:

public class TopicRouteData extends RemotingSerializable {
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    ......
}

queueDatas 中包含了Topic对应的所有Queue信息,其中QueueData结构如下:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
}

看上面的QueueData结构,好像有点奇怪,里面只是简单的记录了brokerName和两个int类型的QueueNums。

其实对于RokcetMQ来说,Queue是比较抽象的一个概念,并不是说某个具体的队列。Topic、QueueData以及Broker是 1:1:1 的,QueueData本质上是记录某个Topic在某个Broker上的所有路由信息。

  • brokerName:这个很容易理解,Queue所属的Broker;
  • readQueueNums:该Broker上,针对该Topic,配置的读队列个数;
  • writeQueueNums:该Broker上,针对该Topic,配置的写队列个数。

前面的第一步中,当生产者从NameSrv获取到Topic对于的TopicRouteData时,会将其转成TopicPublishInfo,存放在本地路由表中。

// Update Pub info
{
    // 转换成TopicPublishInfo
    TopicPublishInfo publishInfo =
            topicRouteData2TopicPublishInfo(topic, topicRouteData);
    publishInfo.setHaveTopicRouterInfo(true);
    Iterator<Entry<String, MQProducerInner>> it =
            this.producerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            // 更新本地路由表
            impl.updateTopicPublishInfo(topic, publishInfo);
        }
    }
}

在topicRouteData2TopicPublishInfo(topic, topicRouteData)内部转化过程中,便会遍历TopicRouteData中的QueueData,按照配置的读写队列个数,生成MessageQueue,存放在本地queue表中。

List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);

// 开始遍历queueDatas
for (QueueData qd : qds) {
    if (PermName.isWriteable(qd.getPerm())) {
        // 查询QueueData对应的BrokerData
        BrokerData brokerData = null;
        for (BrokerData bd : route.getBrokerDatas()) {
            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                brokerData = bd;
                break;
            }
        }

        if (null == brokerData) {
            continue;
        }
        // 只有Master节点的Broker才能接收消息,对于非Master节点的需要过滤掉
        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            continue;
        }

        // 按照QueueData配置的写队列个数,生成对应数量的MessageQueue。
        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
            info.getMessageQueueList().add(mq);
        }
    }
}

到这里,我相信大家应该清楚了Queue的来源,废话不多说,让我们接着之前的话题:生产者如何选择某个Queue进行消息发送。

回到 sendDefaultImpl()中,当拿到路由信息后,要开始进行消息发送。下面这部分代码我省略了部分。简单点说,主要逻辑就是在消息发送的基础上加上了超时机制重试机制。当选择某个Queue发送消息失败后,只要还没有超时,且没有超出最大重试次数,就会再次选择某个Queue进行重试。

// 在超时时间及重试次数内进行重试
for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // 选择某个Queue 用来发送消息
    MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    if (tmpmq != null) {
        mq = tmpmq;
        brokersSent[times] = mq.getBrokerName();
        try {
            // 进行消息发送
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
            endTimestamp = System.currentTimeMillis();
            ......
        }
        catch (Exception e) {
            endTimestamp = System.currentTimeMillis();
            continue;
        }
    } else {
        break;
    }
} 

if (sendResult != null) {
    return sendResult;
}

由上面代码可知,选择Queue的具体逻辑在topicPublishInfo.selectOneMessageQueue(lastBrokerName)中。这里在调用时传入了lastBrokerName,目前我们还不知道是为了什么,所以进入方法内部看看吧。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName != null) {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }

        return null;
    }
    else {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        return this.messageQueueList.get(pos);
    }
}

我们来分析下上面这段逻辑:

  1. 当lastBrokerName不为空时,将计数器进行自增,再遍历TopicPulishInfo中的MessageQueue列表,按照计数器数值对MessageQueue总个数进行取模,再根据取模结果,取出MessageQueue列表中的某个Queue,并判断Queue所属Broker的Name是否和lastBrokerName一致,一致则继续遍历。
  2. 当lastBrokerName为空时,同样将计数器进行自增,按照计数器数值对MessageQueue总个数进行取模,再根据取模结果,取出MessageQueue列表中的某个Queue,直接返回。

这段逻辑的主要部分就是利用计数器,来进行Queue的负载均衡。另外,聪明的你也一定已经猜到了lastBrokerName的作用了。

当某条消息第一次发送时,lastBrokerName 为空,此时就是直接取模进行负载均衡操作。但是如果消息发送失败,就会触发重试机制,发送失败有可能是因为Broker出现来某些故障,或者某些网络连通性问题,所以当消息第N次重试时,就要避开第N-1次时消息发往的Broker,也就是lastBrokerName。

好了,我们已经了解了选择Queue 的来源及消息发送时Queue的负载均衡以及重试机制。下面让我们来看看消息的核心发送过程。

第三步:消息发送的核心过程

消息的网络传输在 DefaultMQProducerImpl的sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout)中。

接下来就要庖丁解牛,拆解下发送流程了。

首先,要获取Queue所属Broker的地址:

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

拿到Broker地址后,要将消息内容及其他信息封装进请求头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
......

接着调用MQClientAPIImpl的sendMessage()方法:

 SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                    brokerAddr,// 1
                    mq.getBrokerName(),// 2
                    msg,// 3
                    requestHeader,// 4
                    timeout,// 5
                    communicationMode,// 6
                    sendCallback// 7
                    );

sendMessage()内部便是创建请求,调用封装的Netty进行网络传输了。

首先创建请求:

RemotingCommand request = null;
if (sendSmartMsg) {
    SendMessageRequestHeaderV2 requestHeaderV2 =
            SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
}
else {
    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}

这里按照是否发送 smartMsg ,创建了不同请求命令号的请求,接下来,按照发送方式(单向、同步、异步),调用不同的发送函数:

switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
        return null;
    case SYNC:
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
    default:
        assert false;
        break;
}

这里解释下三种发送方式。

  • 单向:只管发送,不管是否发送成功;
  • 同步:阻塞至拿到发送结果;
  • 异步:发送后直接返回,在回调函数中等待发送结果。

看到这儿,消息的发送就已经结束了,成功的从生产者传输到了Broker。

四:一个小细节

在我调试的过程中,我发现了一个小细节,在双Master模式下,如果不断的发送某一未提前配置Topic信息的消息时,消息会按照前面所说的,均匀发布到每个Broker上。

[msg1,msg2]-->BrokerA

[msg3,msg4]-->BrokerB

但是如果msg2发送后等到一段时间,再发送msg3,那么该Topic后续的所有消息(msg3、msg4、......)都会发送到BrokerA,BrokerB将不会收到任何消息。

So, Why ?

前面提到,对于未提前配置路由的Topic X,会在消息发送时,先利用默认Topic的路由进行消息发送,Broker收到消息后会创建Topic X,并更新到NameSrv,所以一般流程是这样的。

1:找不到Topic X的路由信息,暂时先用默认Topic的路由[BrokerA,BrokerB]。

1:选择BrokerA发送msg1,发送结束,BrokerA创建Topic X,更新到NameSrv。

2:选择BrokerB发送msg3,发送结束,BrokerB创建Topic X,更新到NameSrv。

3:生产者周期性的从NameSrv拉取路由信息,更新到本地,本地路由表中 Topic X 对应路由为:[BrokerA,BrokerB]。

4:后续的消息发送会从 BrokerA 和 BrokerB 中进行负载。

但是如果msg2发送结束后,等待一段时间再发送msg3,流程就会变成这样。

1:msg1发送结束,BrokerA创建Topic X,更新到NameSrv。

2:生产者暂停消息发送。

3:生产者周期性的从NameSrv拉取路由信息,更新到本地,本地路由表中 Topic X 对应路由为:[BrokerA]。

4:发送msg3时,本地路由表中已经有Topic X 的路由信息,即BrokerA,因此所有的消息都会发往 BrokerA。

哈哈,通过这个小细节,你是否对RocketMQ消息发送的理解更加深刻了呢?

五:结尾

回顾下整个发送流程,无非就是第三节所说的三步:

1:获取路由信息

2:按照负载均衡方式,选择路由

3:根据选择出的路由,发送消息到Broker。

看似简单的三步,其实包含了很多细节,如 :

  • Topic 没有提前配置的情况下如何进行路由选择?
  • 消息发送失败的情况下如何重试?

生产者的使命到这已经完成,消息投递到Broker后,Broker又将如何处理并存储消息呢?

对 RocketMQ 感兴趣的同学,如果觉得分析的还不错,别忘记点个关注 。后面的文章,我将和大家一起,继续剖析 RocketMQ 源码。

欢迎大家点个赞,关注下!

文章链接:

汪先生:RocketMQ源码分析之服务发现

汪先生:RocketMQ源码分析之消息发送

汪先生:RocketMQ源码分析之消息存储

汪先生:RocketMQ源码分析之消息刷盘

汪先生:RocketMQ源码分析之ConsumeQueue

听说喜欢点关注的同学都长得帅


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK