6

rocketmq-client-go 消费MessageQueue消息失败

 5 months ago
source link: https://vearne.cc/archives/40045
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

rocketmq-client-go 消费MessageQueue消息失败

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

前几天突然收到线上rocketmq报警,rocketmq队列中的消息有堆积。

打开rocketmq的管理界面,发现broker-4:MessageQueue[4] 上没有消费者,所以消息一直在堆积。重启client,问题仍然无法解决。

5801811a-1a4c-11ee-8aa3-5626e1cdcfe1.jpeg

查看client的日志,萌叔发现有如下可疑的日志

time="2023-07-01T23:03:28+08:00" level=error 
msg="fetch offset of mq from broker error" 
MessageQueue="MessageQueue [topic=account-to-redis, brokerName=broker-4, queueId=3]"
consumerGroup=CG-account 
underlayError="broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"

“fetch offset of mq from broker error” 这应该就是导致broker-4:MessageQueue[4]一直无法被消费的原因

2. 解决问题

通过网上搜索,萌叔发现了参考资料1.rocketmq-client-go注册消费者组的问题 根据这份资料,此问题是rocketmq-client-go的一个bug。并已经在最新版本中被修复。

offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
    ...
    cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)

    res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
    if err != nil {
        return -1, err
    }

    // 增加判断逻辑
    if res.Code == internal.ResQueryNotFount {
        return -1, nil // 从最小offset开始消费
    }

    // 原先会从这里返回error
    if res.Code != internal.ResSuccess {
        return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
    }

    off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
    if err != nil {
        return -1, err
    }

    return off, nil
}

也就是说当无法从broker 获取offset的时候,直接从最小Offset开始消费。

升级rockect-client-go版本之后,问题解决。

Q:那么为什么笔者的client程序运行了这么长时间,还是一直反复的报

"broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"

consumer group的信息一直没有同步给broker吗?

3.1 发出创建consumer group的指令

在RocketMQ中,需要使用mqadmin创建 consumer group(详情见参考资料5)

mqadmin updateSubGroup -n 172.24.30.100:9876 -g consume-group-name  -c xdf-test1

UpdateSubGroupSubCommand.java

public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setConsumeBroadcastEnable(false);
        subscriptionGroupConfig.setConsumeFromMinEnable(false);

        // groupName
        subscriptionGroupConfig.setGroupName(commandLine.getOptionValue('g').trim());


        if (commandLine.hasOption('b')) { // 针对特定broker 都发送一次Command
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();

            defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
            System.out.printf("create subscription group to %s success.%n", addr);
            System.out.printf("%s", subscriptionGroupConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();
            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {  // 针对集群中的每一个broker(master)都发送一次Command
                try { 
                    defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
                    System.out.printf("create subscription group to %s success.%n", addr);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000 * 1);
                }
            }
            System.out.printf("%s", subscriptionGroupConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}

最终使用MQClientAPIImpl.createSubscriptionGroup

发出Command

  public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
      final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
      RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);

      byte[] body = RemotingSerializable.encode(config);
      request.setBody(body);

      RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
          request, timeoutMillis);
      assert response != null;
      switch (response.getCode()) {
          case ResponseCode.SUCCESS: {
              return;
          }
          default:
              break;
      }
      throw new MQClientException(response.getCode(), response.getRemark());

  }

3.2 Broker收到Command的处理逻辑

SubscriptionGroupManager.java

public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
    Map<String, String> newAttributes = request(config);
    Map<String, String> currentAttributes = current(config.getGroupName());

    Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes(
        this.subscriptionGroupTable.get(config.getGroupName()) == null,
        SubscriptionGroupAttributes.ALL,
        ImmutableMap.copyOf(currentAttributes),
        ImmutableMap.copyOf(newAttributes));

    config.setAttributes(finalAttributes);

        // 存储到缓存中
    SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
    if (old != null) {
        log.info("update subscription group config, old: {} new: {}", old, config);
    } else {
        log.info("create new subscription group, {}", config);
    }

    long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
    dataVersion.nextVersion(stateMachineVersion);

        // 持久化(默认以文件形式保存到磁盘)
    this.persist();
}

rocketMQ的broker端中,默认offset的是以json文件的形式持久化到磁盘文件中,

文件路径为${user.home}/store/config/consumerOffset.json。

其内容示例如下:

{
    "offsetTable": {
        "test-topic@test-group": {
            "0": 52116, 
            "1": 52088
        }
    }
}

当client向broker请求fetch offset of mq from broker, 如果lastOffset >=0,直接返回lastOffset

否则根据consumer group指定的策略ConsumeFromWhere,有所不同

  • CONSUME_FROM_LAST_OFFSET 从maxOffset开始消费
  • CONSUME_FROM_FIRST_OFFSET 从0开始消费
  • CONSUME_FROM_TIMESTAMP 根据时间戳请求查找offset,从对应的offset开始消费

看完代码,大家已经明白:UpdateSubGroupSubCommand 只会发给broker一次。如果发送的过程中网络出现异常,或者SubscriptionGroupManager执行失败,都可能导致consumer group的信息在broker端不存在。

1.rocketmq-client-go注册消费者组的问题
2.push consumer can‘t consume all queues, and get error … maybe this group consumer boot first”
3. fix query not found
4.rocketMQ–offset管理
5.官方运维管理命令mqadmin使用手册


微信公众号
阅读数: 564

Recommend

  • 64
    • 微信 mp.weixin.qq.com 6 years ago
    • Cache

    你知道android的MessageQueue.IdleHandler吗?

    你知道android的MessageQueue.IdleHandler吗? Original yang...

  • 21
    • www.linkedkeeper.com 4 years ago
    • Cache

    RocketMQ & Kafka 消息消费与消息重试

    RocketMQ & Kafka 消息消费与消息重试 张松然 10余年资深架构经验,对高性能网关、多线程、NIO、HTTP/TCP 长连接等技术...

  • 27

    提到Android里的消息机制,便会提到Message、Handler、Looper、MessageQueue这四个类,我先简单介绍以下这4个类 之间的爱恨情仇。 Message 消息的封装类,里边存储了消息的详细信息,以及要传递的数据...

  • 9

    本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。 DefaultMQPushConsumer不区分普通消息和...

  • 9

    本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。 本章节重点讲解DefaultMQPushConsumer的代码逻辑。 DefaultMQPushConsumer使用样例按照惯例还是先看一下DefaultMQPushConsum...

  • 7

    跟我学RocketMQ之消息消费源码解析(2) | 朝·闻·道本文我们接着分析RocketMQ消息消费的逻辑。 接上文,DefaultMQPushConsumerImpl启动过程中,启动了consumeMessageService消息消费线程。 if (this.getMessageListenerInner() instanceof...

  • 7

    《深入理解Android 卷III》即将发布,作者是张大伟。此书填补了深入理解Android Framework卷中的一个主要空白,即Android Framework中和UI相关的部分。在一个特别讲究颜值的时代,本书分析了Android 4.2中WindowManagerService、ViewRoot、Input系统、StatusBar、W...

  • 8

    由于《深入理解Android 卷一》和《深入理解Android卷二》不再出版,而知识的传播不应该因为纸质媒介的问题而中断,所以我将在CSDN博客中全文转发这两本书的全部内容  第2章  深入理解Java Binder和MessageQueue 本章主要内容: ·  ...

  • 19

    RocketMQ -- 消息消费队列与索引文件发布于 3 月 20 日在store目录下,除了commilog目录,还有consumequeue和index目录。consumequeue是消息消费队列存储目...

  • 7
    • www.cnblogs.com 1 year ago
    • Cache

    【RocketMQ】消息的消费 - shanml

    【RocketMQ】消息的消费 上一讲

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK