13

Kafka 怎么不丢数据?

 3 years ago
source link: https://xie.infoq.cn/article/98858eb1dc7224840bcae52db
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

​ 在Kafka中broker一般有多个,它们组成一个高容错的集群。Broker的主要职责是接受producer和consumer的请求,并把消息持久化到本地磁盘。Broker以topic为单位将消息分布到不同的分区,每个分区可以有多个副本,通过数据冗余的方式实现容错。当partition存在多个副本时,只有一个是leader对外提供读写请求,其它均是follower不对外提供读写服务,只是同步leader中的数据,并在leader出现问题时,通过选举算法将其中的某一个提升为leader。

Broker以追加方式将消息写到磁盘文件,且每个分区中的消息被赋予了唯一整数标识偏移量offset,broker仅提供基于offset的读取方式,不维护各consumer当前已消费消息的offset值,而是由consumer各自维护当前offset。Consumer向broker读取数据时发送起始offset值,broker将之后的消息流式发送过去,Broker中保存的数据是有有效期的,一旦超过了有效期,对应的数据将被移除以释放磁盘空间,只要数据在有效期内,consumer可以重复消费。

Kafka Broker能够保证同一topic下同一partition内部的消息是有序的,但无法保证partition之间的消息全局有序,这意味着一个consumer读取某个topic下多个分区的消息时,和写入的顺序不一致。

一、存储

Broker中以/tmp/kafka_logs来设置消息文件存储目录,如果有多个分区将会在/tmp/kafka_logs下生成多个partition目录,partition目录的命名规则:topic名_分区序号。每个partition物理上是由多个segment组成,segment文件由index和log文件组成,每个log文件的大小由log.segment.bytes控制,写满后两个文件的命名规则:当前文件最小的offset值为名,20位数字字符长度,不足用0补充,例如:00000000000000000123.index。索引文件中的元数据指向数据文件中message的物理偏移位置,例如:该index文件中元数据[6,856]表示在分区中第123+6=129个消息,物理偏移offset是865。

每个分区内部的数据都是有序的,用一个offset来标识每一条数据的位置,但只是仅限于分区内的有序,offset不是message在分区中的实际存储位置,而是逻辑上的一个值,它唯一确定分区中的一条Message:

  • 相对offset:是该segment中的相对offset

  • Position:表示该条Message在数据文件中的绝对位置

每次有消息进入就往log文件中写入,那么就会造成大量的磁盘随机写入,所以引入数据缓存,将要写入的数据先缓存起来再统一写入,从而提升写入效率。kafka采用OS级别缓存pageCache,OS将闲置的memory用作disk caching,当数据写入时OS将数据写入pageCache,同时标记该page为dirty,当读取数据时,先从pageCache中查找,如果没有查到(发生缺页)则去磁盘中读取,由于数据在内存,存在系统down机内存数据丢失的风险。而对于broker来说数据只需要存储到内存,如果OS缓存失效就会导致kafka客户端commit的数据丢失,可以通过log.flush.interval.messages(一定的条数)和log.flush.interval.ms(一定的时间)设置强制写入到磁盘,所以kafka保证它存在于多个replica内存中,不保证被持久化到磁盘。

二、切分文件

index和log文件会存在多个文件,切分规则如下:

  • 当前日志分段文件的大小超过了 log.segment.bytes 配置的值(默认1GB)

  • 当前日志分段中消息的最大时间戳与系统的时间戳的差值大于log.roll.ms或log.roll.hours 的值(7天)

  • 索引文件的大小达到 log.index.size.max.bytes 配置的值默认值(10MB)

三、清理

清理日志实际上是清理过期的segment或者日志文件太大了需要删除最旧的数据,使得整体日志文件大小不超过指定的值,以维持日志文件的固定大小。log.cleanup.policy=delete表示采用删除日志的清理机制,默认5分钟执行一次日志删除,清理日志有三种策略:

  • 基于时间

配置log.retention.hours/minutes/ms(默认7天)参数来设定清除的时间,日志任务会清理超过指定阈值时间的segment文件,但是并不是根据最近修改时间(lastModifiedTime)来计算,而是根据Segment日志中最大的时间戳(largestTimeStamp)来计算,因为segment日志的lastModifiedTime可以被修改(分区副本进行了rebalance),lastModifiedTime并不能真实地反映出日志分段在磁盘的保留时间。

  • 基于日志大小

配置log.retention.bytes参数来设定清除的大小,日志任务会清理超过指定阈值大小的segment文件,该参数是日志文件的总大小,并不是单个Segment文件的大小。首先计算日志文件的总大小size和log.retention.size的差值diff,即需要删除的日志总大小,然后从日志文件中查找可删除的文件集合deletableSegments,之后就执行删除操作。

  • 基于起始偏移量

基于日志起始偏移量(logStartOffset)的删除策略依据是某segment日志的下一个segment的offset是否>=logStartOffset,若是则加入deletablesSegments。

32YJ3uJ.png!mobile

上图中第1个和第2个Segment会加入deletablesSegments集合,然后被删除。删除策略是以topic为级别的,所以不同的topic可以设置不同的删除策略。

三、压缩

日志压缩确保Kafka会为一个topic分区数据日志中保留message key的最后一个值,它解决了应用crash或应用在操作期间重启来重新加载缓存的场景。

IBNJJnu.png!mobile

Kafka日志压缩机制是细粒度key级别的保留机制,而不是基于时间的粗粒度。压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当需要消费这些offset消息时,将会拿到比这个offset大的offset对应的消息。

日志压缩提供了如下保证:

  • 所有跟上消费的consumer能消费到所有写入的消息,这些消息有连续的序列号。Topic的min.compaction.lag.ms用于保证消息写入多久后才会被压缩

  • 消息的顺序会被保留。压缩不会重排序消息,只是移除其中一部分

  • 消息的offset不会变更。这是消息在日志中的永久标志

  • 任何从头开始处理日志的consumer至少会拿到每个key的最终状态

Kafka支持GZIP、Snappy、LZ4 三种压缩算法,可通过compression.type 设定具体的压缩算法。通过设置log.cleaner.enable=true启用cleaner(默认false),log.cleanup.policy=compact启用压缩策略(默认delete)。压缩算法是要占用很大一部分cpu资源的并且耗时也是不小的,而压缩的目的很大程度上是为了提升网络传输的性能。

UrYjYn6.png!mobile

四、分区

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),被分布在集群中的多个broker上。

bIbqqqm.png!mobile

分区机制

Producer在生产数据时可以为每条消息指定Key,消息被发送到broker时会根据分区规则选择被存储到哪一个分区,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。

分区规则

默认情况下,Kafka根据消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:

Y3aURf2.png!mobile

其中numPartitions就是Tpoic的分区总数。

在创建Topic时候可以配置num.partitions来指定默认的分区数。为topic创建分区时,分区数最好是broker数量的整数倍,这样才能使一个topic的分区均匀的分布在Kafka集群中。如果消息key为null时,那么producer将会把这条消息发送给随机的一个partition,然后把这个分区加入到缓存中以备后续的null key直接使用,但是会每隔10分钟清除缓存。当往Broker发送消息时修改了topic的分区数,producer可以在最多topic.metadata.refresh.interval.ms的时间之后动态感知到分区数的变化,并且可以将数据发送到新添加的分区中。

确定分区

通过将topic的消息分散到多个broker的多个分区,理论上一个topic的分区越多,整个集群的吞吐量就越大,但是每个分区都是自身的消耗,所以并不是分区越多越好,但是可以遵循一定的步骤来确定分区数:

创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量,假设它们的值分别是Tp和Tc,单位可以是MB/s,总的目标吞吐量是Tt,那么分区数 =  Tt / max(Tp, Tc)。

消费均衡

Consumer要确定从哪一个分区去取数据来消费,选择规则如下:

6NRZF3F.png!mobile

五、副本

分区的副本被称为replica,每个分区可以有多个副本,并且在副本集中会存在一个leader和多个follower,均匀的分布在多个broker。当leader节点挂掉之后,会从副本集中重新选出一个副本作为leader继续提供服务实现故障转移。副本的个数可以通过broker配置文件来设定,leader处理所有的read-write请求,follower只需要与leader同步数据即可。

YBbM3m.png!mobile

副本同步中有ISR副本的概念,ISR副本是leader和所有能够与leader保持基本同步的follower副本集合,由leader维护。由于消息复制延迟受到最慢同步副本的限制,因此快速检测出慢副本并将其从 ISR 中删除非常重要。如果follow副本和leader数据同步速度过慢导致【落后太多】,该follower将会被剔除出ISR副本,由min.insync.replicas设置ISR中follower的最小个数。

同步机制

Producer将消息发送到partition的leader上并写入其本地log后,其他follower将从leader pull数据,producer需要等待request.required.acks个副本同步完成才算成功提交消息,该参数有如下配置:

  • acks=0 生产者无需等待leader返回确认,但是无法保证消息是否被leader收到

  • acks=1 生产者需要等待leader副本成功写入日志。

  • acks=-1 leader副本和所有follower都写入日志才会向producer发送确认信息

副本同步的主要参数:

num.replica.fetchers:从一个broker同步数据的拉取线程数,可增加该broker的IO并行度。默认值:1

replica.fetch.wait.max.ms:对于follower replica而言,每个fetch请求的最大间隔时间,这个值应该比replica.lag.time.max.ms要小,否则对于吞吐量特别低的topic可能会导致ISR频繁抖动。默认值:500

replica.lag.time.max.ms:超时时间,即当follower在该时间内没有发送fetch请求或者在这个时间内没有追上leader当前的log end offset,则将这个follower从ISR中移除。默认值:10000(10S)

replica.fetch.min.bytes:每次fetch请求最少拉取的数据量,如果不满足这个条件,那么要等待 replica.fetch.wait.max.ms。默认值:1

不同步的原因:

  • 慢副本

在一定周期时间内follower不能追赶上leader。最常见的原因之一是I/O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。

  • 卡住副本

在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。

  • 新启动副本

当用户给topic增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

宕机恢复

  • 少部分副本宕机

当leader宕机了,会从follower选择一个作为leader,当宕机的旧leader重新恢复时,会把之前commit的数据清空,重新从新leader中pull数据。

  • 全部副本宕机

当全部副本宕机了有两种恢复方式:

等待ISR集合中的副本恢复后选举leader。等待时间较长,降低可用性

选择第一个恢复的副本作为新的leader,无论是否在ISR中,但是并未包含之前leader commit的数据,因此造成数据丢失

offset存储

Kafka新版本已默认将消费的offset迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。以消费的Group、Topic、Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到ACK。

更多文章请加入公众号

nuuqYvU.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK