66

大厂面试 Kafka,一定会问到的幂等性

 4 years ago
source link: https://www.tuicool.com/articles/viaYFvy
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

01 幂等性如此重要

Kafka作为分布式MQ,大量用于分布式系统中,如消息推送系统、业务平台系统(如结算平台),就拿结算来说,业务方作为上游把数据打到结算平台,如果一份数据被计算、处理了多次,产生的后果将会特别严重。

02 哪些因素影响幂等性

使用Kafka时,需要保证exactly-once语义。要知道在分布式系统中,出现网络分区是不可避免的,如果kafka broker 在回复ack时,出现网络故障或者是full gc导致ack timeout,producer将会重发,如何保证producer重试时不造成重复or乱序?又或者producer 挂了,新的producer并没有old producer的状态数据,这个时候如何保证幂等?即使Kafka 发送消息满足了幂等,consumer拉取到消息后,把消息交给线程池workers,workers线程对message的处理可能包含异步操作,又会出现以下情况:

  • 先commit,再执行业务逻辑:提交成功,处理失败 。造成丢失

  • 先执行业务逻辑,再commit:提交失败,执行成功。造成重复执行

  • 先执行业务逻辑,再commit:提交成功,异步执行fail。造成丢失

本文将针对以上问题作出讨论

03 Kafka保证发送幂等性

针对以上的问题,kafka在0.11版新增了幂等型producer和事务型producer。前者解决了单会话幂等性等问题,后者解决了多会话幂等性。

单会话幂等性

为解决producer重试引起的乱序和重复。Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个tp也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存(inSequence方法)。

ProducerStateManager.scala


nethrow mew UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch "+

引申:Kafka producer 对有序性做了哪些处理

假设我们有5个请求,batch1、batch2、batch3、batch4、batch5;如果只有batch2 ack failed,3、4、5都保存了,那2将会随下次batch重发而造成重复。我们可以设置 max.in.flight.requests.per.connection =1(客户端在单个连接上能够发送的未响应请求的个数)来解决乱序,但降低了系统吞吐。

新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。正常情况下 max.in.flight.requests.per.connection 大于1。 当重试请求到来且时,batch 会根据 seq重新添加到队列的合适位置,并把 max.in.flight.requests.per.connection设为1, 这样它 前面的 batch序号都比它小,只有前面的都发完了,它才能发。


多会话幂等性

在单会话幂等性中介绍,kafka通过引入pid和seq来实现单会话幂等性,但正是引入了pid,当应用重启时,新的producer并没有old producer的状态数据。可能重复保存。

Kafka事务通过隔离机制来实现多会话幂等性

kafka事务引入了transactionId 和Epoch,设置transactional.id后,一个transactionId只对应一个pid, 且Server 端会记录最新的 Epoch 值。这样有新的producer初始化时,会向TransactionCoordinator发送InitPIDRequest请求, TransactionCoordinator 已经有了这个 transactionId对应的 meta,会返回之前分配的 PID,并把 Epoch 自增 1 返回,这样当old producer恢复过来请求操作时,将被认为是无效producer抛出异常。     如果没有开启事务,TransactionCoordinator会为新的producer返回new pid,这样就起不到隔离效果,因此无法实现多会话幂等。

04 Consumer端幂等性

如上所述,consumer拉取到消息后,把消息交给线程池workers,workers对message的handle可能包含异步操作,又会出现以下情况:

  • 先commit,再执行业务逻辑:提交成功,处理失败 。造成丢失

  • 先执行业务逻辑,再commit:提交失败,执行成功。造成重复执行

  • 先执行业务逻辑,再commit:提交成功,异步执行fail。造成丢失

对此我们常用的方法时,works取到消息后先执行如下code:

如果喜欢我的文章,请长按二维码,关注靳刚同学, 同时您的转发也是对我最大的支持,谢谢!

JJvi6n7.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK