4

【轻松上手 RocketMQ 专栏】延迟消息源码解析与场景分析

 2 years ago
source link: https://my.oschina.net/apacherocketmq/blog/5276536
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
【轻松上手 RocketMQ 专栏】延迟消息源码解析与场景分析

这一讲,我们主要来讲延迟消息。

这一次我们结合业务来讲。

在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。

我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:

  • 启动一个定时任务,每30分钟,定时扫描一遍订单表
  • 如果订单是已付款,则跳过,不处理
  • 如果订单是未付款,但未超过30分钟,不处理
  • 如果订单是未付款,且超过30分钟,就取消订单 (补充:取消订单,其实就是下单的逆向流程) 39ce110d-a519-4bd4-97dc-e281ed309457.png

这个方案有什么缺点?

  • 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
  • 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
  • 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。

那针对上述的缺点,我们有没有好的解决方案:

  • 第一,避免扫描全表
  • 第二,谁没付款,就去取消谁,不要做多余的动作
  • 第三,要保证近实时取消订单。(近实时:1s左右)

说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息4abf446c-34c1-4760-8615-f4242d8d3e82.png

简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。

这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。

上面,介绍的都是方法论,下面就是具体的实操环节了。

下面,简单用一个demo介绍一下生产者

public class Producer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");

//设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

//构建消息
        Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 重点:设置延迟级别
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult sendResult = producer.send(message);
        // 打印发送结果
        System.out.println("发送结果:"+sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}

这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。

补充知识点

延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!80ed6b4b-8f6b-4317-b026-1a3f2c238d54.png

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("delay_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");

// 订阅主题
        consumer.subscribe("delayTopic","TagA");

// 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    //这里主要打印延迟时间≈当前时间-消息的生产时间
                    System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

consumer.start();
    }
}

总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。

知其然知其所以然

上面,你已经知道怎么使用了。

如果面试官问你:RocketMQ的延迟消息底层原理是什么?

那你接着看下去。

dcf58ebc-0ea0-4448-9d96-06e7eec12722.png

看图说话。

  • 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。

  • 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。

  • 第三,有消息后,消费者进行消息和后续处理。

上面这里,是一个总体流程图。

然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。

第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic

org.apache.rocketmq.store.CommitLog#putMessage
        //真正的topic
        String topic = msg.getTopic();
        //真正的队列Id
        int queueId = msg.getQueueId();

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 延迟级别大于0
            if (msg.getDelayTimeLevel() > 0) {
                // 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延迟topicSCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延迟级别,获取队列id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 消息的topic设置为延迟topic,不是设置真正的topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

省略部分封装msg的代码..
            //最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);

第二步:定时器扫描信息

  • 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {
        //通过AtomicBoolean 来确保 有且仅有一次执行start方法
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // 遍历所有 延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                // key为延迟级别
                Integer level = entry.getKey();
                // value 为 毫秒数
                Long timeDelay = entry.getValue();
                // 根据延迟级别 ,获取对应的offset
                Long offset = this.offsetTable.get(level);
                //
                if (null == offset) {
                    offset = 0L;
                }
                // 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行
                if (timeDelay != null) {
                    // 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup()
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
            // 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次
            this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
                public void run() {
                    try {
                        //持久化每个队列的消费offset
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }

2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
            //根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 消费偏移量
            long failScheduleOffset = offset;

if (cq != null) {
                // 根据消费偏移量从消息队列中获取所有有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        // 遍历所有消息
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            // 获取消息的物理偏移量
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            // 获取消息的物理长度
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();


                            //当前时间
                            long now = System.currentTimeMillis();
                            //消费时间
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一个偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //如果消费时间<当前时间,说明应该被消费了
                            long countdown = deliverTimestamp - now;

if (countdown <= 0) {
                                //根据物理偏移量和长度,获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

if (msgExt != null) {
                                    try {
                                        //构建真正 的消息
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

// 重新把消息发送到真正的消息队列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
                                  ...省略一堆不太重要的代码
                                       }
            //这里又重新添加一个新的任务,这次是100毫秒
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

第三步: 消费者后续处理(略)

最后用一张图来总结

cd6aaecf-bfe4-40e3-8b4e-556f345f61b0.png

好了,写完了,下期见,拜拜。

有问题的话,欢迎留言交流。

RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK