9

Kafak 的吞吐量为何这么高呢?

 3 years ago
source link: https://xie.infoq.cn/article/82538d9f09ba75d1005dceb07
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Producer的主要功能就是向broker中某个topic的某个分区发送消息,主要由分区器(partitioner)实现。如何选择分区,然后高吞吐的可靠发送到broker是producer的重点。

发送消息

首先,Kafka producer提供了一个默认的分区器。如果消息指定了key则分区器会根据key的哈希值来选择目标分区,若该消息未指定key,则分区器使用轮询的方式确认目标分区,也可自定义分区策略。确认分区后分区器需要寻找该分区对应的leader所在的broker,然后向leader写入数据。

当producer发送请求给broker后,broker响应结果的超时时间,默认30s。具体流程如图:

FBr2YjA.png!mobile

发送流程如下:

  1. 首先要构造一个 ProducerRecord 对象,包含Topic、Partition、Key。

  2. 调用send()方法进行消息发送。

  3. 因为消息要到网络上进行传输,所以必须进行序列化。

  4. 数据传到分区器,如果 ProducerRecord 对象已指定了分区,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区。

  5. 这条记录会被添加到一个记录批次里面,批次里所有的消息会被发送到相同的topic和partition,然后由一个独立的线程把这些记录批次发送到相应的broker上。

  6. Broker成功接收到消息,表示发送成功,返回消息的RecordMetadata元数据(包括Topic、Partiton和分区中的偏移量),发送失败可以选择重试或者直接抛出异常。

主要配置

bootstrap.servers

该属性指定 brokers 的地址清单。

buffer.memory

该参数用来设置producer端内存缓冲区的大小,默认值为32M。如果应用程序发送消息的速度超过发送到服务器的速度,那么会导致生产者内存不足,导致send()方法会被阻塞,如果阻塞的时间超过了max.block.ms配置的时长则会抛出异常

batch.size

该参数用于设置批次发送的阈值,默认16KB

etires

该参数用于当生产者发送消息到broker失败时的重试次数,默认每隔(retry.backoff.on)100ms重试

request.timeout.ms

该参数用于当producer发送请求给broker后,broker响应结果的超时时间,默认30s

发送方式

  • 直接发送

该方式是最简单的发送数据方式,不管消息是否可靠到达,本质上也是一种异步方式吞吐量最高,但无法保证消息的可靠性。

  • 同步发送

可以明确知道消息发送的结果,然后对结果进行处理,由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送,导致吞吐量下降。

  • 异步发送

在调用send方法发送消息的同时指定一个回调函数,broker在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,比如记录异常日志再统一处理。

幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在broker端只会被持久化一次,数据不丟不重,也就是exactly-once,但是在分布式系统中,出现网络分区是不可避免的,例如以下情况:

  • kafka broker在回复ack时,出现网络故障或者是full gc导致ack timeout,producer将会重发。

  • producer挂了,新的producer并没有old producer的状态数据。

  • producer发送rpc到broker异常,无法得知消息是否写入只能重试。

幂等性的目的就是为了解决重复的问题at least once + 幂等 = exactly once,幂等性是通过两个关键信息保证:

PID

每个producer初始化时会由broker分配一个唯一的PID来标识唯一的producer,producer每次启动都不一样。

sequence numbers

Broker端通过接收到message的PID和sequence numbers 信息进行校验。最新版本中可通过设置enable.idempotence = true开启幂等性,但是Kafka的幂等性是有条件的:

  • 只能保证producer在单个会话内不丟不重,如果producer出现意外挂掉再重启是无法保证的,无法做到跨会话级别的不丢不重。

  • 幂等性不能跨多个partition,只能保证单个 partition 内的幂等性。

事务性

幂等性提供了单会话单partition的Exactly-Once语义的实现,正是因为幂等性不提供跨partition和跨会话场景下的保证,因此需要一种更强的事务保证,能够原子处理多个partition的写入操作,数据要么全部写入成功,要么全部失败,kafka是事务性类似于二阶段提交,总的来说就是能够实现【跨分区的原子写入】。

kafka的事务性可以保证:

  • 跨会话的幂等性写入,即使中间故障,恢复后依然可以保持幂等性。

  • 跨会话的事务恢复,如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成。

  • 跨多个Topic-Partition的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。

事务属性实现前提是幂等性,即配置事务属性还必须还得配置幂等性,但是幂等性是可以独立使用的,不需要依赖事务属性。如果需要保证消息的事务性,需保证如下配置:

  • enable.idempotence = true,transactional.id不设置:只支持幂等性。

  • enable.idempotence = true,transactional.id设置:支持事务属性和幂等性。

  • enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性。

  • enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错。

更多文章请加入公众号

nuuqYvU.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK