

消息队列(四)如何处理消息丢失的问题?
source link: https://www.fangzhipeng.com/javainterview/2021/05/29/6054a57.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、为什么消息会丢失?
跟消息重复问题类似,消息丢失也可能出现在生产者、MQ、消费者三者中。这三者导致消息丢失的原因是什么呢?
- 生产者:生产者推送消息到 MQ 中,由于网络抖动等原因消息没有推送到 MQ 中,或者消息推送到 MQ 中了但是 MQ 内部出错了,导致消息丢失。
- MQ:MQ 接收到消息后先把消息暂存在 OS Cache 中,消费者还没消费的时候 MQ 自己挂了,导致消息丢失。
- 消费者:消费者消费到了这条消息,但是还没来得及处理,消费者自己挂了,但是消费者已经告诉了 MQ 自己已经消费完了,导致消息丢失。
二、如何解决消息丢失的问题
不同的消息队列解决消息丢失的方法是不同的,下面分别介绍不同 MQ 是如何解决消息丢失问题的。
1、RabbitMQ
1.1 生产者导致消息丢失
RabbitMQ 有两种方案可以避免消息丢失,一种是 RabbitMQ 的事务机制,一种是 Confirm 模式。这里先看一下事务机制。
RabbitMQ 客户端中 Channel 接口有这么几个函数,channel.txSelect 用来开启一个事务,channel.txCommit 用来提交事务,channel.txRollback 用来回滚事务。为了避免消息丢失,我们可以在发送消息前,先开启执行 txSelect 方法开启一个事务,接着发送消息,如果消息投递失败,执行 txRollback 回滚事务,再执行重试操作重新发送,如果消息投递成功,执行 txCommit 方法提交事务。
这个方案可以保证我们的消息一定是投递成功的,但是几乎没有人使用这种方案。因为这个方案是同步阻塞的,也就是一条消息发送后,一定要等到 MQ 回应之后执行了提交或回滚事务操作,才能继续往下执行。大致过程如下图所示:
RabbitMQ 还提供了另一种方法避免生产者消息丢失问题,那就是 Confirm 模式。Confirm 模式是这样子的,生产者发送消息后,不需要等待 MQ 的回应,MQ 接收成功后,会回调生产者的 ack 接口通知生产者消息投递成功了,如果 MQ 接收失败,会回调 nack 接口通知生产者消息投递失败了,生产者可以重新对这条消息进行投递。大致过程如下图所示:
1.2 RabbitMQ 导致消息丢失
RabbitMQ 自己弄丢了数据是由于持久化导致的。通常 RabbitMQ 接收到消息之后写入 OS Cache 中,就会给生产者返回接收成功的回应,这时如果 RabbitMQ 挂了,消息也就丢失了。解决方法可以结合生产者的 Confirm 模式,配置 RabbitMQ 持久化到磁盘之后,才给生产者返回 ack 信号。
1.3 消费者导致消息丢失
RabbitMQ 在消费者端弄丢数据,是由于 RabbitMQ 的默认自动提交 ack 导致的。解决方法就是关闭 RabbitMQ 的自动响应 ack 即可。这样消息没有处理完成,消费者挂了,RabbitMQ 会认为消息没有处理成功,会再次推送消息给消费者处理。
2、Kafka
2.1 生产者导致消息丢失
对于 Kafka 来说,生产者基本不会弄丢消息,因为生产者发送消息会等待 Kafka 响应成功,如果响应失败,生产者会自动不断地重试。
2.2 Kafka 弄丢了数据
Kafka 通常会一台 leader + 两台 follower,当生产者消息刚写入 leader 成功,但是还没同步到 follower 时,leader 宕机了,此时会重新选举 leader,新的 leader 由于还未同步到这条数据,导致该条消息丢失。
解决办法是做一些配置,当有其他 follower 同步到了消息后才通知生产者消息接收成功了。配置如下:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
按上面的配置配置后,就可以保证在 Kafka Broker 端就算 leader 故障了,进行新 leader 选举切换时,也不会丢失数据。
2.3 消费者导致消息丢失
Kafka 消费端弄丢数据原因跟 RabbitMQ 类似,Kafka 消费者会在接收到消息的时候,会自动提交一个 offset 给 Kafka,告诉 Kafka 消息已经处理了。处理方法也跟 RabbitMQ 类似,关闭 offset 的自动提交即可。
3、RocketMQ
RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。
在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息流程大致如下图所示:
在上面的事务消息流程中,基于这三个业务流程:发送 half 消息 -> 处理其他业务 -> commit/rollback。我们来讨论下面的几种情况:
- 万一生产者发送 half 消息失败,怎么办?
可以做重试或记录消息到如文件、数据库等地方,直接给用户返回失败,本次请求失败。
- 万一生产者发送 half 消息成功,但是处理其他业务失败,又该怎么办呢?
生产者发送 rollback 请求回滚 RocketMQ 中该条消息,本次请求失败。
- 万一生产者发送 half 消息成功,但是 RocketMQ 由于某些原因如网络超时等导致没有响应,怎么处理?
由于 half 消息已发送成功,此时 RocketMQ 中已经有该条消息了,RocketMQ 会有一个补偿机制,补偿机制会回调你开发好的一个接口,询问你这条消息是要 commit 还是 rollback。
- 万一生产者发送 half 消息成功,但是请求 commit 或 rollback 的时候失败了呢?
这个问题与上面的问题一样,都是通过 RocketMQ 的补偿机制来处理。
本文分别从生产者、MQ 自身、消费者介绍了导致消息丢失的原因,消息丢失问题是一个比较常见但有必须解决的问题,通常使用消息队列的业务都是比较重要的业务,不能接受数据的丢失。
接着介绍了不同的 MQ 是如何解决消息丢失问题的。消费端导致的消息丢失都是由于数据还未处理成功确提前通知 MQ 消息已经处理成功了,禁止自动提交或异步操作即可,处理起来比较简单;生产者和 MQ 自身导致的消息丢失则比较难处理,RabbitMQ 使用了 Confirm 模式避免消息丢失;Kafka 则配置所有 follower 同步成功才给生产者响应推送消息成功;RocketMQ 则使用事务消息来保证消息的零丢失,针对不同的异常情况还提供了补偿机制进行处理。
作者:奈何花开
来源:https://xie.infoq.cn/article/b73758f12905a71b1efe1b4d2
本文为原创文章,转载请标明出处。
本文链接:https://www.fangzhipeng.com/javainterview/2021/05/29/6054a57.html
本文出自方志朋的博客
Recommend
-
7
常见问题分析RabbitMQ如何处理消息丢失 工具 ...
-
7
V2EX › 程序员 如何优雅的停止正在处理队列消息的 worker 呢? tedd · 11 分钟前 · 24 次点...
-
14
这篇文章主要围绕下列问题进行探讨:如何处理消息的丢失问题数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。RabbitMQ
-
11
在 JavaScript 中浮点数运算时经常出现 0.1+0.2=0.30000000000000004 这样的问题,除此之外还有一个不容忽视的大数危机(大数处理精度丢失)问题。之前也分享过这个问题,我在做个梳理分享给大家,
-
4
原文链接:Go语言如何操纵Kafka保证无消息丢失目前一些互联网公司会使用消息队列来做核心业务,因为是核心业务,所以对数据的最后一致...
-
7
1)使用后处理Alpha通道丢失的问题 2)ParticleSystem的Trail Material为空时运行会自动创建一个默认材质 3)C#使用中文进行枚举时, 是否会产生性能相关的问题 4)C#脚本存在未引用的命名空间,打包报错提示命名空间不存在UnityEditor...
-
3
消息队列配置config/autoload/async_queue.php 这里可以直接参考 default 拷贝多个队列配置即可每个新的配置都为一个新的队列,与其他队列互不干扰注意点: 如果多个项目 并且使用同个队列名称 且同...
-
13
RocketMQ消息丢失如何排查?-51CTO.COM RocketMQ消息丢失如何排查? 作者:小识 2022-03-31 08:26:44 我们在RocketMQ-Dashboard上其实就能看到每个队列broker端的offset(代理者位点)以及消息消...
-
4
作为后端程序员日常工作中难免会遇到要跟消息队列打交道的时候,而且在当下微服务的场景下,很多服务的性能不是我们自己能控制的,这不阿粉最近就遇到了一个场景,由于上游服务流量增加,发送到消息队列的消息增多,阿粉在处理消息的时候需要依赖下游的一个服务,可...
-
7
今天分享一下kafka的消息丢失问题,kafka的消息丢失是一个很值得关注的问题,根据消息的重要性,消息丢失的严重性也会进行放大,如何从最大程度上保证消息不丢失,要从生产者,消费者,broker几个端来说。 消息发送和接收流程 kafka生产者生产好消息...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK