3

【轻松上手 RocketMQ 专栏】事务消息与幂等性

 2 years ago
source link: https://my.oschina.net/apacherocketmq/blog/5276535
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
这一讲,我们结合业务场景来讲讲事务消息以及消息的幂等性。

1、业务场景

在电商场景中,一般付款成功后,会给用户发放优惠券,一来给用户优惠,二来激励用户继续消费。

3946b151-61aa-42ff-b42c-6507a97d1979.png

上面的场景:在电商系统中,会出现付款成功后、准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。

但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。

对于这种场景的解决方案:引入消息中间件MQ来解耦。

7a1b18f7-42e3-4041-94cd-3af37f52aa24.png

  • 1、支付订单
  • 2、发送支付消息
  • 3、消费支付消息
  • 4、发放优惠券

上面这个是发放优惠券的流程图。

但是上述流程中,存在MQ不可用、消息重复的异常情况,进而导致:

  • 产生付款成功,发优惠券失败
  • 优惠券重复发放

怎样才能确保付款成功后进行优惠券发放并且不会重复呢?这需要引入事务消息和幂等性处理。

2、事务消息

首先我们来了解下事务消息。分布式事务是一种抽象的概念。

那具体的实现呢?

是有很多种实现的。

在这里,主要介绍:RocketMQ的事务消息。

事务消息的流程图

a840b9d7-2f81-4fdd-a038-8b55f311a781.png
流程步骤:
  • 1、生产者发送half消息
  • 2、MQ回复ACK确认消息
  • 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
  • 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
  • 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
  • 6、消费者对消息进行消费,下发优惠券

3、如何使用事务消息

上面,大概知道了事务消息的流程。

接下来,要知道如何使用。

还是以付款下发优惠券为例。

3.1 发送half消息-MQ回复ACK确认消息


 @Override
    public void finishedOrder(String orderNo, String phoneNumber) {

try {
          // 退房事务消息,topic:完成订单
        Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));

// 发送half消息
            TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);

} catch (MQClientException e) {

}

}

3.2 执行本地事务:付款

@Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {
            // 修改订单的状态
            orderService.payOrder();

// 成功 提交prepare消息
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 执行本地事务失败 回滚prepare消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

3.3 MQ定时器回调查询half消息状态

@Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

try {
            //查询订单状态
            Integer orderStatus = orderService.getOrderStatus();
            if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) {          //返回commit消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                //返回rollback消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            // 查询订单状态失败
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

3.4 消费者进行消费,下发优惠券

 @Bean(value = "orderFinishedConsumer")
    public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
        consumer.setNamesrvAddr(namesrvAddress);
        //topic:完成订单
        consumer.subscribe(orderFinishedTopic, "*");
        consumer.setMessageListener(orderFinishedMessageListener);
        consumer.start();
        return consumer;
    }

监听器:OrderFinishedMessageListener
@Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            
                //下发优惠券
                couponService.distributeCoupon();

}
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

4、知其然知其所以然

你看完上面,已经知道如何使用事务消息。

接下来,你需要了解其底层原理:看看源码(面试常问)

step1:首先看发送half消息的代码:

2673ff31-b3ed-41c4-843d-9a7640aa4736.png

step2:进入代码里面:

ac70825c-e655-4937-bdd1-dd6f8490bdf6.png

step3:其实就是默认调用了DefaultMQProducer#sendMessageInTransaction。

public TransactionSendResult sendMessageInTransaction(final Message msg,
        ...省略一堆代码

SendResult sendResult = null;
        // 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            //发送消息--重点0
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            //消息发送成功
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {

localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //执行本地事务,executeLocalTransaction需要子类去具体实现
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

try {
            // 最后,给broker发送提交或者回滚事务的RPC请求
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        // 组装结果返回
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:

  • 简单的数据校验
  • 给消息添加属性,表明这个事务消息
  • 发送消息,且返回消息的结果--重点0
  • 根据消息不同结果,进行不同的处理
  • 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果--重点1
  • 最后,根据本地事务的结果,给broker发送Commit或rollback的消息--重点2

上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。

接下来,我们深入了解一些细节:

我们先研究一下重点0:sendResult = this.send(msg); 我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。

step4:接着到SendMessageProcessor#sendMessage

eeadb71c-3aad-48ef-ac62-2d6b339049ea.png

step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage-->TransactionalMessageBridge#putHalfMessage-->TransactionalMessageBridge#parseHalfMessageInner

fdabf67d-faf8-4a7c-868c-c3679316a9b3.png

step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。

比如:执行本地事务:付款

step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);

public void endTransaction(
        // 省略一堆代码
        //事务id
        String transactionId = sendResult.getTransactionId();
        // broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // 根据事务消息和本地事务的执行结果,发送不同的结果给broker
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        //发送给broker
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

到这个时候,我们已经把消息从生产者发送到了broker里面。

那接下来,我们就需要了解broker是如何处理事务消息的。

step8: 事务消息如何回查

直接看代码注解即可
TransactionalMessageCheckService#onWaitEnd

@Override
    protected void onWaitEnd() {
        //timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        //checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        //回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

step9:进入check方法:TransactionalMessageServiceImpl#check。

直接看注解即可

@Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            //RMQ_SYS_TRANS_HALF_TOPIC主题
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            //获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            //数据校验
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            //遍历队列
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                //根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue
                //RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
                //RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下
                MessageQueue opQueue = getOpQueue(messageQueue);
                //messageQueue队列的偏移量
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //opQueue队列的偏移量
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                //如果其中一个队列的偏移量小于0,就跳过
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                //doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                //空消息的次数
                int getMessageNullCount = 1;
                //RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量
                long newOffset = halfOffset;
                //RMQ_SYS_TRANS_HALF_TOPIC的偏移量
                long i = halfOffset;
                while (true) {
                    //限制每次最多处理的时间是60s
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    //removeMap包含当前信息,则跳过,处理下一条信息
                    //removeMap的信息填充是在上面的fillOpRemoveMap
                    //fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条,
                    //如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时
                    //会添加到removeMap中,表示已处理过
                    if (removeMap.containsKey(i)) {
                        log.info("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        //根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        //如果消息为空
                        if (msgExt == null) {
                            //则根据允许重复次数进行操作,默认重试一次  MAX_RETRY_COUNT_WHEN_HALF_NULL=1
                            //如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            //如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                //其他原因,则将偏移量i设置为:getResult.getPullResult().getNextBeginOffset(),重新拉取
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        //判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
                        //needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数,
                        // 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法,
                        // 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。

//needSkip依据:如果事务消息超过文件的过期时间,
                        // 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        //消息的存储时间大于开始时间,中断while循环
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        //该消息已存储的时间=系统当前时间-消息存储的时间戳
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        //checkImmunityTime:检测事务的时间
                        //transactionTimeout:事务消息的超时时间
                        long checkImmunityTime = transactionTimeout;
                        //用户设定的checkImmunityTimeStr
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            //checkImmunityTime=Long.valueOf(checkImmunityTimeStr)
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    //最近进度=当前消息进度+1
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {//如果当前时间小于事务超时时间,则结束while循环
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        //是否需要回查,判断依据如下:
                        //消息已存储的时间大于事务超时时间
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
                            if (!putBackHalfMsgQueue(msgExt, i)) {//11
                                continue;
                            }
                            //重点:进行事务回查(异步)
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            //加载已处理的消息进行筛选
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                //保存half消息队列的回查进度
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                //保存处理队列opQueue的处理今夕
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }

}

step10:继续深入研究一下:resolveHalfMsg

public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //针对每个待反查的half消息,进行回查本地事务结果
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }

step11:继续追进sendCheckMessage(msgExt)方法

/**
     * 发送回查消息
     * @param msgExt
     * @throws Exception
     */
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        //原主题
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        //原队列id
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            //回调查询本地事务状态
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

到这里,基本上把事务消息的流程和实现细节走了一遍。

利用事务消息,我们可以确保消息在事务提交后一定能发送成功到RocketMQ,接下来,我们继续处理消息可能重复的问题。

5、消息重复的原因分析

现在,我们首先来思考一下,绘制一下关于这个问题的蓝图:

1、目的:解决优惠券重复发放的问题
2、解决方案:待定
3、原因:未知

方法论: 想要解决这个问题-->得设计解决方案-->找到问题原因

  • 原因1:生产者多次发送同一条消息,导致消费者多次消息同一条消息,因此重复发放优惠券
  • 原因2:生产者发送一次消息,但消费者多次消费同一条消息,因此重复发送优惠券
  • 比如原因1,用户支付订单后,订单系统处理有点慢,这会让支付系统以为请求超时,这时支付系统会再次调用订单系统。这就会导致订单系统多次发送同一条支付消息。

  • 原因1,消息重试,网络异常,都会导致生产者多次发送同一条消息,这里不细说,想要细究,欢迎留言。

  • 比如原因2,用户支付成功,订单系统和支付系统交互也没超时,顺利发送一条支付消息,这个时候优惠券系统也成功消费支付消息,这时发放了一张优惠券。但意外来了,这个时候优惠券系统崩了,但还没来得及提交消费进度offset到RocketMQ。因而重启优惠券系统后,又会重新消费一次支付消息,从而重复发放优惠券。

我们知道问题原因后,就可以开始设计解决方案了

  • 针对原因1:生产者多次发送同一条消息,导致消费者多次消息同一条消息,因此重复发放优惠券
    • 确保那就是要确保生产者能成功发送有且只有一次消息,但需要确保消费者只消费一次。
  • 对阵原因2:生产者发送一次消息,但消费者多次消费同一条消息,因此重复发送优惠券
    • 那就是要确保消费者只消费一次。但奈何这个不太现实,为什么呢?因为服务器重启,上线升级版本,这是非常常见的现象。因此,只消费一次,不太现实。

上面这2种方案,都涉及到消费者,都不能完美解决重复消费消息的问题

其实我们想想,我们可以用事务消息的方法,来确保消息一定能发送成功到RocketMQ,这个时候我们只需要解决消费者的消费问题即可。

0a954f1c-e00d-4269-8993-65c14abe23ed.png

6、幂等性

想要解决重复消息问题,我们需要引入幂等性机制。

什么是幂等性

就是无论别人对你的接口请求多少次,你都需要保证接口调用一次和多次的结果是相同的。

天上飞的理论,得有落地实现。

幂等性,就是理论。那具体的落地实现一般有:

业务判断法

举个例子,在电商系统中,有订单id,这个时候在优惠券系统每消费一条支付消息,同步插入一条订单数据,能插入成功,证明之前这个订单没被消费过,发送优惠券。插入失败,则证明这个订单之前已经被消费过了。不做任何操作即可。0084ca42-9f72-4a39-b759-3c02f0ee6bea.png

Redis缓存法

在并发量特别高的订单系统中,支付消息会特别多,这个时候,如果用业务判断法,插入数据库,容易存在瓶颈。这个时候如果想要提高并发量,可以考虑使用Redis。

Redis缓存订单id,如果这个订单id已经被消费过后,会存在Redis中。当这个订单id再次被消费时,就会被过滤,不操作。

这也是一种幂等性的实现方法。但Redis容易丢数据,这也是需要考虑的。bfd89d00-8104-4749-9510-17bfac89153c.png

今天我们通过电商优惠券发放的场景,介绍基于RocketMQ事务消息实现分布式事务和消息幂等。

用什么幂等性的具体方案,得看你的使用场景。

  • 如果你的并发量不高,直接用数据库即可解决。

  • 如果你允许有误差,允许重复发放优惠券,只追求高并发量,直接用Redis即可。

  • 如果你既要并发量,也要准确性,可以结合数据库+Redis的方案,但这种方案实现复杂度比较高。

所以,选择什么方案,真的得看你的使用场景。

你也可以看看自己公司的分布式事务、幂等性方案是如何实现的?

好了,今天的分享就到这结束了,欢迎交流。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK