

kafka生产者你不得不知的那些事儿 - JAVA旭阳
source link: https://www.cnblogs.com/alvinscript/p/17422784.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

kafka生产者作为消息发送中很重要的一环,这里面可是大有文章,你知道生产者消息发送的流程吗?知道消息是如何发往哪个分区的吗?如何保证生产者消息的可靠性吗?如何保证消息发送的顺序吗?如果对于这些问题还比较模糊的话,那么很有必要看看这篇文章了,本文主要是基于kafka3.x版本讲解。
生产者流程
kafka生产者最重要的就是消息发送的整个流程,我们来看下究竟是怎么一回事把。
在消息发送的过程中,涉及到了两个线程——main
线程和 Sender
线程。在 main
线程中创建了一个双端队列 RecordAccumulator
。main
线程将消息发送给 RecordAccumulator
,Sender
线程不断从 RecordAccumulator
中拉取消息发送到 Kafka Broker
。
- 在主线程中由
kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator
, 也称为消息收集器)中。
- 拦截器: 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
- 序列化器: 用于在网络传输中将数据序列化为字节流进行传输,保证数据不会丢失。
- 分区器: 用于按照一定的规则将数据分发到不同的kafka broker节点中
Sender
线程负责从RecordAccumulator
获取消息并将其发送到Kafka
中。
RecordAccumulator
主要用来缓存消息以便Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator
缓存的大小可以通过生产者客户端参数buffer.memory
配置,默认值为33554432B
,即32M
。- 主线程中发送过来的消息都会被迫加到
RecordAccumulator
的某个双端队列(Deque
)中,RecordAccumulator
内部为每个分区都维护了一个双端队列,即Deque<ProducerBatch>
, 消息写入缓存时,追加到双端队列的尾部。 Sender
读取消息时,从双端队列的头部读取。ProducerBatch
是指一个消息批次;与此同时,会将较小的ProducerBatch
凑成一个较大ProducerBatch
,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch
大小可以通过batch.size
控制,默认16kb
。Sender
线程会在有数据积累到batch.size
,默认16kb,或者如果数据迟迟未达到batch.size
,Sender
线程等待linger.ms
设置的时间到了之后就会获取数据。linger.ms
单位ms
,默认值是0ms
,表示没有延迟。
Sender
从RecordAccumulator
获取缓存的消息之后,会将数据封装成网络请求<Node,Request>
的形式,这样就可以将Request
请求发往各个Node
了。- 请求在从
sender
线程发往Kafka
之前还会保存到InFlightRequests
中,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求。InFlightRequests
默认每个分区下最多缓存5个请求,可以通过配置参数为max.in.flight.request.per. connection
修改。 - 请求
Request
通过通道Selector
发送到kafka
节点。 - 发送后,需要等待kafka的应答机制,取决于配置项
acks
.
- 0:生产者发送过来的数据,不需要等待数据落盘就应答。
- 1:生产者发送过来的数据,
Leader
收到数据后应答。 - -1(all):生产者发送过来的数据,Leader和副本节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
Request
请求接受到kafka的响应结果,如果成功的话,从InFlightRequests
清除请求,否则的话需要进行重发操作,可以通过配置项retries
决定,当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647
。- 清理消息累加器
RecordAccumulator
中的数据。
生产者重要参数
现在我们来看看kafka生产者中常用且关键的配置参数。
bootstrap.servers
生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092
,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker
里查找到其他 broker
信息。
key.serializer
和value.serializer
指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory
RecordAccumulator
缓冲区总大小,默认 32m。
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms
如果数据迟迟未达到 batch.size
,kafka等待这个时间之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms
之间。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB 一般情况下,这个默认值就可以满足大多数的应用场景了。
compression.type
这个参数用来指定消息的压缩方式,默认值为“none
",即默认情况下,消息不会被压缩。该参数还可以配置为 "gzip
","snappy
" 和 "lz4
"。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;
acks
acks
的值为0,1和-1或者all。
- 0表示
Producer
往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。 - 1表示
Producer
往集群发送数据只要Leader
成功写入消息就可以发送下一条,只确保Leader
接收成功。 - -1 或 all表示
Producer
往集群发送数据需要所有的ISR Follower
都完成从Leader
的同步才会发送下一条,确保Leader发送
成功和所有的副本都成功接收。安全性最高,但是效率最低。
max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries
和retry.backoff.ms
当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。在kafka3.4.0默认是 int 最大值,2147483647
。如果设置了重试,还想保证消息的有序性,需要设置max.in.flight.requests.per.connection
=1否则在重试此失败消息的时候,其他的消息可能发送成功了。另外retry.backoff.ms
控制两次重试之间的时间间隔,默认是 100ms。
更多kafka生产者的配置可以查阅官网https://kafka.apache.org/documentation/#producerconfigs
。
生产者发送消息API
生产者发送demo
通常情况下,生产者发送消息分为以下4个步骤:
(1)配置生产者客户端参数及创建相应的生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例
我们直接上代码。
- 引入maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.0</version>
</dependency>
- 核心发送逻辑
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
- 消息对象
ProducerRecord
kafka发送时主要构造出ProducerRecord
对象,包含发送的主题,partition,key,value等。
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
三种发送模式
kafka提供了3种发送消息的模式,发后即忘,同步发送和异步发送,我们直接上代码。
- 发后即忘(
fire-and-forget
)
发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。 在大多数情况下,这种发送方式没有问题。 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。 这种发送方式的性能最高,可靠性最差。
Future<RecordMetadata> send = producer.send(rcd);
- 同步发送(
sync
****)
只需在上面种发送方式的基础上,再调用一下 get()方法即可,该方法时阻塞的。
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
- 带回调异步发送(
async
****)
回调函数会在 producer
收到 ack
时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata
和Exception
,如果 Exception
为 null
,说明消息发送成功,如果 Exception
不为 null
,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
}
生产者发送核心机制
生产者分区机制
kafka设计上存在分区的,它有下面两个好处:
- 便于合理使用存储资源,每个
Partition
在一个Broker
上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker
上。合理控制分区的任务,可以实现负载均衡的效果。 - 提高并行度和吞吐量,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
那究竟生产者是按照什么样的策略发往到不同的分区呢?
根据生产者的发送流程,其中会经过分区器,默认情况下是使用DefaultPartitioner
,具体逻辑如下:
- 按指定分区发送
kafka
发送消息的时候构造消息对象ProducerRecord
,可以传入指定的partition
, 那么消息就会发送这个指定的分区。例如partition=0,所有数据写入分区0。
// 发送消息到0号分区
kafkaProducer.send(new
ProducerRecord<>("first", 0, Integer.toString(i), "hello " + i));
- 没有指明
partition
值但有key
的情况下,将key
的hash
值与topic
的partition
数进行取余得到partition
值;
例如:key1
的hash
值=5, key2
的hash
值=6 ,topic
的partition
数=2,那么key1
对应的value1
写入1号分区,key2
对应的value2
写入0号分区。
- 既没有
partition
值又没有key
值的情况下,Kafka采用Sticky Partition
(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch
已满或者已完成,Kafka
再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms
设置的时间到, Kafka
再随机一个分区进行使用(如果还是0会继续随机)。
- 自定义分区器
如果默认的分区规则不满足需求,我们也可以自定义一个分区器。比如我们实现一个分区器实现,发送过来的数据中如果包含 alvin
,就发往 0 号分区,不包含 alvin
,就发往 1 号分区。
- 实现分区器接口
Partitioner
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含 alvin
if (msgValue.contains("alvin")){
partition = 0;
}else {
partition = 1;
}
// 返回分区号
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
- 配置分区器
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.alvin.kafka.producer.MyPartitioner");
// 发送消息 略~~
如何提高生产者吞吐量?
对比着前面kafka生产者的发送流程,kafka生产者提供的一些配置参数可以有助于提高生产者的吞吐量。
参数名称 | 描述 |
---|---|
buffer.memory |
RecordAccumulator 缓冲区总大小,默认 32m。适当增加该值,可以提高吞吐量。 |
batch.size |
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms |
如果数据迟迟未达到 batch.size ,sender 线程等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
compression.type |
指定消息的压缩方式,默认值为“none ",即默认情况下,消息不会被压缩。该参数还可以配置为 "gzip ","snappy " 和 "lz4 "。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。 |
如何保证生产者消息的可靠性?
为了保证消息发送的可靠性,kafka
在 producer
里面提供了消息确认机制。我们可以通过配置来决 定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 producer
时通过 acks
参数指定。
- acks=0
生产者发送过来的数据,不需要等数据落盘应答。
- acks=1(默认值)
生产者发送过来的数据,Leader
收到数据后应答。
- acks=-1或者all
生产者发送过来的数据,Leader
和ISR
队列里面的所有节点收齐数据后应答。
ISR
概念:(同步副本)。每个分区的leader
会维护一个ISR
列表,ISR
列表里面就是follower
副本 的Borker
编 号 , 只 有 跟 得 上Leader
的follower
副 本 才 能 加 入 到ISR
里 面 , 这 个 是 通 过replica.lag.time.max.ms
=30000(默认值)参数配置的,只有ISR
里的成员才有被选为leader
的可能。
如果Leader
收到数据,所有Follower
都开始同步数据,但有一个Follower
,因为某种故障,迟迟不能与Leader
进行同步,那这个问题怎么解决呢?
Leader
维护了一个动态的in-sync replica set
(ISR
),意为和Leader
保持同步的Follower+Leader
集合(leader:0,isr:0,1,2)
。如果Follower
长时间未向Leader
发送通信请求或同步数据,则该Follower
将被踢出ISR
。该时间阈值由replica.lag.time.max.ms
参数设定,默认30s
。
小结:数据完全可靠条件 = ACK
级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2。
acks=0
,生产者发送过来数据就不管了,可靠性差,效率高;acks=1
,生产者发送过来数据Leader
应答,可靠性中等,效率中等;acks=-1或者all
,生产者发送过来数据Leader
和ISR
队列里面所有Follwer
应答,可靠性高,效率低;
在生产环境中,acks=0
很少使用;acks=1
,一般用于传输普通日志,允许丢个别数据;acks=-1
,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
如何保证消息只发送一次?
kafka作为分布式消息系统,难免会出现重复消息或者丢消息的情况,会存在3种数据传递语义。
- 最多一次(At Most Once)
ack级别设置为0, 可以保证数据不重复,但是不能保证数据不丢失, 所以叫做最多一次。
- 至少一次(At Least Once)
ack级别设置为-1 + 分区副本大于等于2 + ISR
里应答的最小副本数量大于等于2可能会出现至少一次的消息。比如下图中在发送过程Leader节点宕机,消息就会重试,就有可能出现消息的重复。
At Least Once
可以保证数据不丢失,但是不能保证数据不重复。
- 精确一次(Exactly Once)
对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。这在kafka中可以通过幂等性和事务的特性实现。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
幂等性,简单来说,就是一个操作重复做,每次的结果都一样。开启幂等性功能,参数enable.idempotence
设置为 true即可,在3.x版本中默认情况下也是true。具体实现原理如下:
- 每一个
producer
在初始化时会生成一个producer_id
,并为每个目标partition
维护一个“序列号”。 producer
每发送一条消息,会将<producer_id
,分区>对应的“序列号”加 1。broker
服务端端会为每一对<producer_id,分区>
维护一个序列号,对于每收到的一条消息,会判断服务端 的SN_old
和接收到的消息中的SN_new
进行对比:
-
- 如果
SN_OLD+1
=SN_NEW
,正常情况 - 如果
SN_old+1
>SN_new
,说明是重复写入的数据,直接丢弃 - 如果
SN_old+1
<SN_new
,说明中间有数据尚未写入,或者是发生了乱序,或者是数据丢失,将抛出严重异常:OutOfOrderSequenceException
。
- 如果
如何保证生产者消息的顺序?
根据前面的生产者发送流程可以知道,要想保证消息投递的顺序性:
- 首先要保证单分区,因为单分区内是有序的,多分区,分区与分区间无序。
- kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1
- kafka在1.x及以后版本保证数据单分区有序,条件如下:
- 未开启幂等性,
max.in.flight.requests.per.connection
需要设置为1。 - 开启幂等性,
max.in.flight.requests.per.connection
需要设置小于等于5。
因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer
发来的最近5个request
的元数据,故无论如何,都可以保证最近5个request
的数据都是有序的。
本文总结了kafka生产者整个消息发送的流程,只有明白了这个流程以后,那么我们对于一些生产者消息发送的一些问题才有更加深刻的理解。
欢迎关注个人公众号【JAVA旭阳】交流学习
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK