5

java 从零开始实现消息队列 mq-10-pull message ack 消费者主动拉取消息消费状态回执

 2 years ago
source link: https://houbb.github.io/2022/04/15/mq-10-pull-message-ack
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

大家好,我是老马。

上一节我们只实现了拉取消息的实现,但是缺少了消费状态回执。

这一节我们一起来学习下如何实现状态回执。

回执状态的设计

我们规定如下几种回执状态:

  [java]
package com.github.houbb.mq.common.constant;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public final class MessageStatusConst {

    private MessageStatusConst(){}

    /**
     * 待消费
     * ps: 生产者推送到 broker 的初始化状态
     */
    public static final String WAIT_CONSUMER = "W";

    /**
     * 推送给消费端处理中
     * ps: broker 准备推送时,首先将状态更新为 P,等待推送结果
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_PROCESS = "TCP";

    /**
     * 推送给消费端成功
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_SUCCESS = "TCS";

    /**
     * 推送给消费端失败
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_FAILED = "TCF";

    /**
     * 消费完成
     */
    public static final String CONSUMER_SUCCESS = "CS";

    /**
     * 消费失败
     */
    public static final String CONSUMER_FAILED = "CF";

    /**
     * 稍后消费
     * @since 0.1.0
     */
    public static final String CONSUMER_LATER = "CL";

}

消费者状态回执

我们在消费之后,添加状态回执:

  [java]
for(MqMessage mqMessage : mqMessageList) {
    IMqConsumerListenerContext context = new MqConsumerListenerContext();
    final String messageId = mqMessage.getTraceId();
    ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
    log.info("消息:{} 消费结果 {}", messageId, consumerStatus);

    // 状态同步更新
    MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
    log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));
}

回执实现,根据 messageId 更新对应的消息消费状态。

  [java]
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
    final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
    req.setMessageId(messageId);
    req.setMessageStatus(consumerStatus.getCode());
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_CONSUMER_STATUS);

    // 重试
    return Retryer.<MqCommonResp>newInstance()
            .maxAttempt(consumerStatusMaxAttempt)
            .callable(new Callable<MqCommonResp>() {
                @Override
                public MqCommonResp call() throws Exception {
                    Channel channel = getChannel(null);
                    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                    if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                        throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
                    }
                    return resp;
                }
            }).retryCall();
}

Broker 回执处理

  [java]
// 消费者消费状态 ACK
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {
    MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
    final String messageId = req.getMessageId();
    final String messageStatus = req.getMessageStatus();
    return mqBrokerPersist.updateStatus(messageId, messageStatus);
}

这里是基于本地 map 更新状态的,性能比较差。

后续会以 mysql 实现。

  [java]
public MqCommonResp updateStatus(String messageId, String status) {
    // 这里性能比较差,所以不可以用于生产。仅作为测试验证
    for(List<MqMessagePersistPut> list : map.values()) {
        for(MqMessagePersistPut put : list) {
            MqMessage mqMessage = put.getMqMessage();
            if(mqMessage.getTraceId().equals(messageId)) {
                put.setMessageStatus(status);
                break;
            }
        }
    }

    MqCommonResp commonResp = new MqCommonResp();
    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
    return commonResp;
}

对于消息状态的细化,更加便于我们后续的管理,和问题的定位。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

rpc-从零开始实现 rpc https://github.com/houbb/rpc


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK