16

RocketMQ Producer 深入学习

 3 years ago
source link: https://www.sevenyuan.cn/2020/10/31/jms/2020-10-31-RocketMQ-Learning-Producer/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nUj63uR.jpg!mobile

byyYFvn.png!mobile

主要分为两个流程:

1.1 正常消息的发送、提交

(1) producer 发送 Half 消息

(2) broker 本地写入 Half 消息(将 Topic 改成 RMQ_SYS_TRANS_HALF_TOPIC,该阶段 Consumer 由于没有订阅关系,无法消费)

(3) producer 根据 broker 写入消息结果,成功的话,执行本地事务;写入消息失败,producer 不执行本地事务

(4) 根据本地事务结果,往 broker 发送 Commit 或者 Rollback(如果是 Commit,将会将 Half 消息转回 Real Topic,生成消息索引,订阅者可以进行消费)

1.2 补偿流程:

(1) 半消息发送成功,但 broker 没收到 Commit 或 Rollback,进行状态回查(上图的第五步)

(2) Producer 收到回查消息,检查本地事务状态

(3) 根据本地事务状态,重新发送 Commit 或者 Rollback

补偿阶段用于解决消息 Commit 或者 Rollback 发送超时或者失败的情况

如果使用了事务消息,业务方需要实现该接口:

org.apache.rocketmq.client.producer.TransactionListener
public interface TransactionListener{
    /**
* 当发送 Half 消息成功后,执行本地事务
*/
    LocalTransactionStateexecuteLocalTransaction(final Message msg, final Object arg);

    /**
* 如果 Commit 或者 Rollback 发送丢失,broker 进行消息回查,检查本地事务状态,重新发送 Commit 或 Rollback
*/
    LocalTransactionStatecheckLocalTransaction(final MessageExt msg);
}

2、如何选择发送目的地(MessageQueue)

手动指定 MessageQueue 目的:分区有序性,实现顺序消费

例如 Topic 创建策略,默认分配单 Broker 上分配 8 个 MessageQueue。

Producer 在了解到要发往的 Topic 有 8 个消息队列,默认情况将会轮询发送,尽量让每个队列存储到的消息数量一致。

为了实现顺序消费,需要 Producer 在发送的时候,指定发送的目的地 - 特定的 MessageQueue,这时需要指定选择分区策略(MessageSelector)以及特定的分区键入参(arg)

可以查看具体的发送选择分区逻辑:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(Message msg, MessageQueueSelector selector, Object arg,final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout){
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

    MessageQueue mq = null;
    List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
    Message userMessage = MessageAccessor.cloneMessage(msg);
    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
    userMessage.setTopic(userTopic);
    
    // 这一步 selector.select(messageQueueList, userMessage, arg),选出特定的 MessageQueue
    mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
    long costTime = System.currentTimeMillis() - beginStartTime;
    // 发送
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
}

待补充

分组的作用,例如消费者分组 consumerGroup 是为了消息消费的负载均衡,区分不同的消费者

生产者分组 producerGroup 的作用是为了 事务消息 的回查,根据分组进行二次确认,后续相关内容需要深入研究…


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK