22

跟我学RocketMQ之拉模式消费的两种方式

 3 years ago
source link: http://wuwenliang.net/2020/02/20/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E6%8B%89%E6%A8%A1%E5%BC%8F%E6%B6%88%E8%B4%B9%E7%9A%84%E4%B8%A4%E7%A7%8D%E6%96%B9%E5%BC%8F/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

今天我们聊聊RocketMQ基于拉模式的两种消费方式。

对于消费而言,RocketMQ提供了推拉两种方式,我们常用的是基于长轮询的DefaultPushConsumer,它具有实时性好,易开发等特点。

但同时由于是长轮询,因此在大量消息消费的场景下,可能导致broker端CPU负载较大等情况,因此我们会在这种情况下选择使用拉模式的

PullConsumer或者MQPullConsumerScheduleService+PullTaskCallback这两种方式进行更为灵活的消费。

PullConsumer消费示例代码

首先我们看下基于PullConsumer方式如何进行消费。这里引用官方的样例代码进行说明:

public class PullConsumer {

我们定义了一个Map,key为指定的队列,value为这个队列拉取数据的最后位置,保存每个对列的拉取进度(offset),这里只是用作示例。实际开发中,这里需要基于持久化到Redis或者MySQL等外部存储设施中。

//Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {

首先需要定义消费者组,实例化一个DefaultMQPullConsumer消费者对象,标记消费者组。

为消费者设置NameServer地址,保证消费者客户端能够从NameServer获取到broker地址,从而执行消息消费流程。

String group_name = "test_pull_consumer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();
System.err.println("consumer start");

通过consumer.fetchSubscribeMessageQueues(TOPIC)方法获取指定TOPIC下的所有队列,默认有4个。

//    从TopicTest这个主题去获取所有的队列(默认会有4个队列)
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");

对获取到MessageQueue集合进行遍历,拉取数据并执行具体的消费过程。

//    遍历每一个队列,进行拉取数据
for (MessageQueue mq : mqs) {
    System.out.println("Consume from the queue: " + mq);

通过while(true) 不间断地从队列中拉取数据。默认情况下,每次拉取32条,这里需要显式地传入拉取开始offset,通过getMessageQueueOffset(mq)方法获取,从我们持久化的设施中得到对应MessageQueue的拉取进度(可以认为是消费进度)。

拉取结束后,在持久化设施(这里是一个Map)保存下次拉取的开始offset,也就是本次拉取结束的下一个offset。(通过pullResult.getNextBeginOffset()获取)

while (true) {
    try {
        //    从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
        System.out.println(pullResult);
        System.out.println(pullResult.getPullStatus());
        System.out.println();
        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

从pullResult拉取结果中获取拉取状态,如果是FOUND则表明消息拉取成功;获取消息列表,并循环进行消费。其余均认为未拉取到消息,不做处理。

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for(MessageExt msg : list){
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        System.out.println("没有新的数据啦...");
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    consumer.shutdown();
}

拉取结束之后,手动显式调用该方法,刷新对应队列MessageQueue的拉取进度;

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
    offseTable.put(mq, offset);
}

获取对应MessageQueue的消息消费进度Offset

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

小结PullConsumer消费方式

从上述代码样例中可以看出,PullConsumer方式需要我们显式地存储消费进度,并且在消费过程中要根据情况进行消费进度的更新与存储。

如果开发者稍有不慎,忘记保存offset,则每次都会从第一条进行拉取,这样很容易造成消息重复。如果是生产环境,则后果不忍想象。

另外,我们还需要通过额外的存储手段对offset进行保存,并且尽量保证该设施的稳定可靠,否则还是会引起重复消费的问题。

基于此,我建议使用MQPullConsumerScheduleService+PullTaskCallback这种消费方式,那它具体如何使用呢?

MQPullConsumerScheduleService+PullTaskCallback消费方式

基于上述分析的PullConsumer使用一些不便之处,我这里建议使用MQPullConsumerScheduleService+PullTaskCallback方式进行消费。我们还是按照习惯方式,直接上代码。

step1: 声明并实例化一个MQPullConsumerScheduleService对象,通过构造方法传递消费者组;

String group_name = "test_pull_consumer_name";

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

为消费者设置NameServer地址,以便能够获取broker地址,开启消费过程。

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

设置消费方式为集群模式;

scheduleService.setMessageModel(MessageModel.CLUSTERING);

step2: 调用registerPullTaskCallback(topic, pullTaskCallback) ,将开发者实现的PullTaskCallback消息拉取实现类注册给MQPullConsumerScheduleService。并绑定到指定的topic下;

scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

step3: 开发者需要实现PullTaskCallback的doPullTask消息拉取回调方法,这里使用匿名内部类的方式。如果是Spring项目,我们可以定义一个Bean实现PullTaskCallback接口,并将该Bean的引用设置到一个实例化好的MQPullConsumerScheduleService对象中。

@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {

通过 PullTaskContext上下文获取到消息拉取实例对象MQPullConsumer;

MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
try {

获取当前的消费进度,即:从哪儿开始消费,如果offset小于0则指定从0开始。

// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
    offset = 0;

从对应的offset拉取指定数量的消息,默认32条,返回结果为PullResult。

通过pullResult.getPullStatus()判断拉取结果,如果为FOUND,则开始消费流程;其他状态不做处理。

PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
    case FOUND:
        List<MessageExt> list = pullResult.getMsgFoundList();
        for(MessageExt msg : list){
            //消费数据...
            ystem.out.println(new String(msg.getBody()));
        }
        break;
    case NO_MATCHED_MSG:
        break;
    case NO_NEW_MSG:
    case OFFSET_ILLEGAL:
        break;
    default:
        break;
}

step4: 重点来了,这里通过调用updateConsumeOffset,更新消费进度,将下次消费开始时的offset更新到broker。并不需要客户端本地保存消费进度。

设置下次拉取时间,定时进行拉取调度。

    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
    // 设置再过3000ms后重新拉取
    context.setPullNextDelayTimeMillis(3000);

}
...省略catch块...

启动拉取过程。

        scheduleService.start();
    }
}

小结MQPullConsumerScheduleService消费方式

从代码中我们能够清晰的看出,MQPullConsumerScheduleService的优势:

  1. MQPullConsumerScheduleService基于定时任务,消费端能够灵活控制拉取频率
  2. MQPullConsumerScheduleService支持提交消费进度到broker,不需要消费端进行保存
  3. MQPullConsumerScheduleService本身基于PullConsumer,定制化程度高,使用起来不易出错

可以说,MQPullConsumerScheduleService既保留了PullConsumer的优势,还对其进行了一定程序的增强;通过直接提交消费offset到broker,降低了客户端的开发量,较少了消费重复的风险。

因此笔者提倡在实际开发中,使用MQPullConsumerScheduleService进行拉模式的消息消费。

这里做个小预告,在后续的文章中,笔者将对MQPullConsumerScheduleService消费方式源码实现进行解析,请拭目以待。

为了方便读者和笔者后续开发,这里贴出两种方式的完整源码实现,以略去重读文章的繁琐:

PullConsumer样例

public class PullConsumer {
    //Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //    从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //    遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);

            SINGLE_MQ: while (true) {
                try {
                    //    从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for(MessageExt msg : list){
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            System.out.println("没有新的数据啦...");
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }


    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }


    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

MQPullConsumerScheduleService代码样例

public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";

        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        scheduleService.setMessageModel(MessageModel.CLUSTERING);

        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
                try {
                    // 从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for(MessageExt msg : list){
                            //消费数据
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);

                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}




版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK