11

RocketMQ Consumer 深入学习

 3 years ago
source link: https://www.sevenyuan.cn/2020/11/18/jms/2020-11-18-RocketMQ-Learning-Client/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RjUrY3.jpg!mobile

消息消费有两种模式:

meAR3af.png!mobile

1、并发消费

并发消费是默认的处理方法,一个消费者使用线程池技术,可以并发消费多条消息,提升机器的资源利用率。默认配置是 20 个线程,所以一台机器默认情况下,同一瞬间可以消费 20 个消息。

其中 ConsumeMessageConcurrentlyService 的构造函数如下:

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener){
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

2、顺序消费

有些业务场景,消息的消费需要顺序性,例如购物时,下订单、库存校验、支付、发送物流,虽然都属于「购物」这个场景的子任务,但他们之间是有顺序性的。如果它们业务处理通过消息解耦,那消息消费也得要有顺序性。

RocketMQ 的做法就是分区有序性,首先需要发送者,将有顺序的消息发往 Topic 下同一个 MessageQueue ,然后消费者,顺序地一个一个进行消费,消费失败将会一直重试,前面消息消费完成才能进行下一个,所以需要在业务上确保消息失败机制,避免消息阻塞。

7JVraer.png!mobile

幂等消费

RocketMQ 的设计中,是不保证消息的幂等性,这时候需要业务方自行保证,重复消费消费不会对数据造成影响,从数学意义上来说, f(x) = f(f(x)) ,多次计算的结果都是一致的。

RocketMQ 保证存储在 Broker 的消息最少投递一次,该特性保证消息一定会被消费,但由于网络抖动或者其它场景,导致一条消息可能被消费多次。

在相同业务类型的消息中,这里需要考虑两个场景

  • 并发消费
  • 消息消费超时后重复投递

第一个场景很好理解,一条相同类型的消息被不同的消费者同时拉取,可能是不同发送者同时发送的,例如喜闻乐见的 A B 转账问题。

第二个场景比较难遇到,默认情况,消息处理超过 15 分钟后,将会重新投递消费,如果原来服务器 A 还在处理中,重新投递的消息被服务器 B 拉取了;另一种就是手动重发消息,通过控制台可以重新发送一模一样的消息, MessageID 和消息体跟之前一样,这两种情况下也会造成消息重复消费。

于是设计上,考虑了使用 Redis 做分布式锁,通过竞争锁来避免同时消息,以及用 Redis 暂存消费状态,设计如下:

UjMbqem.png!mobile

注意点:

1、锁资源 key 的组装规则(【消费组】+【:】+【主题 topic】+【:】+【messageId 或者 messageKey】

2、锁对应的状态流转(Processing or Successed)

3、避免处理耗时超过锁 expire 时间,导致其它服务器订阅消息并成功消费。加入一个定时线程池,抢到锁资源后,组装定时任务,进行【续时】

4、任务成功后,修改状态为【Successed】,失效时间订为 1h;失败情况,清理掉所有锁资源和定时任务,返回失败重试策略

5、根据 Redis 中保存的状态,过滤重复的消息

在消息 SDK 代码实现上,通过装饰器模式,将 MessageConsumer 包装起来,在业务逻辑不需改动太大情况下,动态增加了幂等消费的功能。

负载均衡

mymUniv.png!mobile

上图展示了,在一个 pullRequestQueue ,可能获取到多个消息 MessageExt ,然后每个消息将会进入消费线程池中消费。

Consumer 端使用 RebalanceImpl 来实现负载均衡,所以想要理解拉取消息的流程,需要重点查看它实现。

Consumer 实例启动时,在工厂 MQClientInstance 中能够看到 new RebalanceService(this); ,启动了一个后台线程,每隔 20s 进行重平衡操作 mqClientFactory.doRebalance()

同样按照消费者的消费模式,重平衡逻辑处理分成两个 switch 分支,接下来讨论的是『并发消费』逻辑

一、获取 MessageQueue 列表

RebalanceImpl 维护了一份 map 结构的本地缓存 topicSubscribeInfoTable ,以 topic 维度保存了对应的 MesssageQueue 列表

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

二、获取在线的消费者终端列表

List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

findConsumerIdList 方法接受两个参数:Topic 主题和 ConsumerGroup 消费组

底层通过发送 RequestCode.GET_CONSUMER_LIST_BY_GROUP 请求码的 RemotingCommandBroker 查询在线消费者列表,拿到结果后反序列化

三、分配 MessageQueue

根据分配策略,确定当前消费者实例要从哪些 MessageQueue 获取消息

List<MessageQueue> allocateResult = 
    strategy.allocate(this.consumerGroup,
                      this.mQClientFactory.getClientId(),
                      mqAll,
                      cidAll);

neAjYv.png!mobile

默认分配策略是平均分配,取当前下标 index ,队列数取余机器数 mod ,然后按照区间给当前应用分配。

例如有 8 个队列,2 台在线服务器,那平均消费 4 个队列,4 4 分配;3 台服务器的,按照 3 3 2 分配。

注意:

目前遇到很多业务团队,在开发过程中,使用了相同的分组名,但是订阅信息不一致,例如之前已经部署了两台应用,本期开发时,新增了 Topic 后,反馈有些消息无法消费,查看 Topic 消费情况表现如下:

fIbiQjQ.png!mobile

根本原因就是前面说的 MessageQueue 平均分配后,之前的应用没有订阅新 Topic ,于是这些消息状态一直处于 Not Consumed Yet 解决方法就是统一订阅信息或者更换 ConsumerGroup 进行测试。

四、刷新本地缓存 & 构建请求列表

接下来,会根据前面分配的消息队列,来构建获取消息的请求 pullRequest 队列

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder){
        boolean changed = false;
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();
            ....
        }
        
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                    }
                } 
            }
        }
    }
    this.dispatchPullRequest(pullRequestList);
    return changed;
}

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList){
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}

public void executePullRequestImmediately(final PullRequest pullRequest){
    this.pullRequestQueue.put(pullRequest);
}

updateProcessQueueTableInRebalance 的作用:更新订阅关系

①、消费节点上下线

②、 Topic 的队列分区参数调整

以上两种行为,将会影响到消息订阅的分配,所以需要客户端在消费消息前,先确定自己被分配到哪几个 MessageQueue ,在构建 PullRequest 时,参数中带上监听的 queueId

最后,为过滤后的消息队列集合(mqSet)中的每个 MessageQueue 创建一个 ProcessQueue 对象,并存入 RebalanceImplprocessQueueTable 队列中。

接着构建 PullRequest ,并调用 dispatchPullRequest 方法,将拉取消息的请求放入到 pullRequestQueue 队列中,等待后面的 PullMessageService 取出来调用。

五、后台线程不停从 Broker 拉取消息

后台线程是: PullMessageService

org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Override
public void run(){
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}

pullRequestQueue 请求队列,就是前面重平衡服务,构建好放入该队列中的,然后在 PullMessageService 中的 run 方法,使用 while 死循环,不停的去 Broker 请求新消息

六、消息消费

在获取消息时,会注册一个回调接口,具体入口在 MQConsumerInner ,然后在 PullCallback 里调用 messageListener 进行消费,也就是我们写的业务处理逻辑。

在正常消费完成后,将 pullRequest 重新放回拉取消息的任务队列中,等待 PullMessageService 的下一次获取,拉取新消息。

正常消费,业务处理没有异常的话,将会返回 ConsumeReturnType.SUCCESS 表示成功确认,消费位点也能继续前进。

消费失败将会触发补偿机制

  • ConsumeMessageConcurrentlyService

并发模式下,它会将消息投递到 %retry 队列,更新当前位点,让后面的消息继续消费,如果该消息一直失败,默认最多重试 16 次就会丢到死信队列中。

  • ConsumeMessageOrderlyService

顺序模式需要注意下,出现失败它不会投递到重试队列,而是将一直在本地重试,直到消费成功为止,所以有可能出现某个 MessageQueue 消费卡住,并且后面消息都不能消费的场景,注意捕获业务处理异常。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest){
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    ...
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    // 构建回调接口
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult){
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        ...
                        // 这一步消息消费,进入设定的消费逻辑 messageListener
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
                            // 是否稍后才消费,重新扔回到请求队列中
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                        break;
            ...
                    default:
                        break;
                }
            }
        }
        @Override
        public void onException(Throwable e){
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
    ...
    try {
        // 从 Broker 端获取消息
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}

获取消息的这个方法中,有两个核心部分

①、构建消费回调函数

②、从 Broker 端获取新消息

回调接口中,设定了对新消息的处理逻辑,包括顺序消息的特殊处理,还有是否需要等待一段时间才消费,真正执行业务方设定的消费逻辑在 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest 中。

然后将回调函数作为参数,放入 this.pullAPIWrapper.pullKernelImpl 方法中,接收消息后,执行回调函数来处理消息。

到这一步为止,从消息获取到消息消费,执行本地业务逻辑的基本流程就基本了解清楚,后面的状态确认以及位点 offset 更新,感兴趣的可以再去跟踪一下。

小结

消费者的深入学习分成以下几部分

  • 消费模式
  • 幂等消费概念
  • 负载均衡

记录了并发模式和广播模式的区别,使用上需要注意的细节。

跟大家分享了一下在原生 RMQ 不支持幂等消费,同时不需要业务方做过多改造的情况下,通过封装 SDK,在里面实现幂等消费的方案。

最后梳理了一下消费者如何重平衡、构建拉取消息的请求最后消费消息的代码过程。

其中还有很多细节点还需要去了解,例如重平衡 doReblance 阶段,出现服务器上下线,还处于消费的 MessageQueue 如何处理(看了一下有加锁 lock 操作,避免两台服务器同时操作同一个队列)的代码如何实现等等

最后,MQ 的学习之旅,从点到面,还需要继续学习,之后再见:wave:~

PS:分类总结排版可以去看语雀笔记 https://www.yuque.com/books/share/2167ed0a-b9c9-4e1f-b648-1031b36cd144?# 《RocketMQ》


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK