116

RocketMQ 事务消息入门介绍

 5 years ago
source link: https://mp.weixin.qq.com/s/0AOv8O8_iADLgEfgyKykvQ?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

jIzyyqB.jpg!web

说明

周五的时候发了篇: Rocketmq4.3支持事务啦!!! ,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。

说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,后续再具体细节进行开篇说明。

主题

  • 引出分布式事务相关内容。

  • RocketMQ事务消息。

  • RocketMQ事务消息如何使用。

  • RocketMQ事务消息是怎么实现的。

  • 为什么需要事务消息会查机制。

  • RocketMQ是怎么进行事务消息会查的。

  • RocketMQ对于分布式事务解决还有那些局限性?以及说明。

引出分布式事务相关内容

这里主要是想说明下,是什么背景下面产生了此类问题。

首先我们来说说 事务 ,说道事务,首先让我想到的就是我大学的时候,老师经常举例转账的事情,例子是这样的:

银行转账!张三转100块到李四的账户,这其实需要两条SQL语句:

  • 给张三的账户减去100元

  • 给李四的账户加上100元

如果在第一条SQL语句执行成功后,在执行第二条SQL语句之前,程序被中断了(可能是抛出了某个异常,也可能是其他什么原因),那么李四的账户没有加上100元,而张三却减去了100元。这肯定是不行的!

你现在可能已经知道什么是事务了吧!事务中的多个操作,要么完全成功,要么完全失败!不可能存在成功一半的情况!也就是说给张三的账户减去100元如果成功了,那么给李四的账户加上100元的操作也必须是成功的;否则给张三减去100元,以及给李四加上100元都是失败的!

如果在单个服务里面,我们使用spring的话,我们使用一个注解就解决了这个问题( @Transactional注解 就解决了,基本上就没有下文什么事情了。)

随着各各公司服务越来越复杂,数据量越来越多,慢慢的公司就引入了微服务以及分库分表等技术,这些技术的确非常的有用,但是在解决事务方面带来了一下问题。

强调下:分布式事务,我们一般都是强调的最终一致性,而不是强一致性!!!

主要概括为3类:

  • 基于单个JVM,数据库分库分表了(跨多个数据库)。

  • 基于多JVM,服务拆分了(不跨数据库)。

  • 基于多JVM,服务拆分了 并且数据库分库分表了。

正是为了解决上面的3类问题,才引入了分布式事务相关的技术来解决这些问题。

分布式事务解决方案有很多,本篇下面需要说的RocketMQ事务消息仅仅是其中的一种解决方案。

RocketMQ事务消息

这里主要是参考RocketMQ官方文档里面的内容,参考地址:

http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/

Half(Prepare) Message

指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

消息回查

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

执行流程图

Jbuyqqq.jpg!web

  1. 发送方向 MQ 服务端发送消息。

  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

  3. 发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。

  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。

  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。

RocketMQ事务消息如何使用

1、引入 rocketmq-client

2uIRNvU.png!web

目前还没有,等几天就会有了,或者自己下载源码进行打包到自己的私有仓库也是一样的,经过和4.2.0的引入差不多,就是换成4.3.0即可。

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>

2、编写Producer

主要参考RocketMQ4.3.0里面的例子,地址为:

https://github.com/apache/rocketmq/tree/release-4.3.0/example/src/main/java/org/apache/rocketmq/example/transaction

BJj2Qbn.jpg!web

备注:和普通消息发送 DefaultMQProducer 有所不同,这里使用的是 TransactionMQProducer

j63QnaZ.jpg!web

关键点:自己实现TransactionListener接口,并实现 executeLocalTransaction方法 (执行本地事务的,一般就是操作DB相关内容)和 checkLocalTransaction方法 (用来提供给broker进行会查本地事务消息的,把本地事务执行的结果存储到redis或者DB中都可以,为会查做数据准备)。

还是以:给张三的账户减去100元, 给李四的账户加上100元为例。这个时候我们发送就相当于做的是 张三的账户减去100元的操作,并且操作DB成功了,下面就是另外对李四的账户加上100元的操作了 ,这个时候就是相当于定义刚刚发送事务消息的topic,消费端和我们普通的消费端没有什么区别。

思考:假如这个时候我们消费端失败了怎么办呢?(这个问题后续有讨论,消费失败有2种,第一种是超时了,我们重试即可,第二种是真的处理失败了?该怎么办呢?)

RocketMQ事务消息是怎么实现的

第一感觉和定时消息做法非常类似,但是比定时消息要复杂。定时消息内容在: RocketMQ(九):消息发送(续) 里面提到过。

本质:定时消息先把定时消息的topic修改为SCHEDULE_TOPIC_XXXX,之后一系列处理……,我们这个事务消息也是一样,先发送 Half(Prepare) Message 消息的时候,其实topic内容也继续了修改( RMQ_SYS_TRANS_HALF_TOPIC ),所有consumer是不可见的(如果提交就用真实topic,如果需要回滚那么就那临时的topic内容删除即可。)

Half(Prepare) Message修改topic断点如下:

MveUJrn.jpg!web

本篇仅仅是一个入门介绍,里面具体细节后续章节会进行分析。

思考:如果把发送普通消息和本地执行逻辑放在一个事务里面,如果执行事务成功就发送普通消息,如果失败就回滚好像也是可以,那么这种事务消息相对其有什么优势或者好处呢??? 思考下。

为什么需要事务消息会查机制

其实这个内容在, RocketMQ事务消息 介绍里面也说明了,由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。那么RocketMQ到底怎么做的呢?下面马上就会介绍。

RocketMQ是怎么进行事务消息会查的

每60s会对 Half(Prepare) Message 的topic主题为 RMQ_SYS_TRANS_HALF_TOPIC 的消息进行check。

myEveab.jpg!webjQjMFfn.jpg!web

broker会调用自己实现TransactionListener接口的 checkLocalTransaction方法

2yErYf7.jpg!web

备注:就是RocketMQ已经实现这个机制,今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,后续再具体细节进行开篇说明。

RocketMQ对于分布式事务解决还有那些局限性?以及说明

在RocketMQ事务消息如何使用的时候我们提到, 如果消费失败怎么办? 消费失败有2种,第一种是超时了,我们重试即可,第二种是真的处理失败了?仔细思考下,这块还是蛮复杂的,假如需要有7-8个业务模块呢,其中执行到第6个业务模块就失败呢?  这种重试好几次还是失败,我们该如何处理呢??? 是回滚前面5个操作吗?好复杂好复杂, 为什么RocketMQ不提供自动回滚呢? 希望大家思考下,如果失败率比较大,那么就是系统问题需要优化代码业务……

据零度了解,很多这块是通过人工介入以及T+1对账的以及补偿机制等。

结束语

本人水平有限,难免会有一些理解偏差的地方,如果发现,欢迎各位积极指出,感谢!!!

如果读完觉得有收获的话,欢迎点赞、关注、加公众号【匠心零度】,查阅更多精彩历史!!!

JV3EVfF.gif


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK