14

Apache Kafka内核深度剖析

 4 years ago
source link: https://insights.thoughtworks.cn/apache-kafka/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

摘要

目前来说市面上可以选择的消息队列非常多,像activemq,rabbitmq,zeromq已经被大多数人耳熟能详,特别像activemq早期应用在企业中的总线通信,基本作为企业级IT设施解决方案中不可或缺的一部分。目前来说Kafka已经非常稳定,并且逐步应用更加广泛,已经算不得新生事物,但是不可否认Kafka一枝独秀如同雨后春笋,非常耀眼,今天我们仔细分解一下Kafka,了解一下它的内幕。以下的内容版本基于当前最新的Kafka稳定版本2.4.0。文章主要包含以下内容:

  • Kafka为什么快
  • Kafka为什么稳
  • Kafka该怎么用

该文章为开篇引导之做,后续会有对应的HBase,Spark,Kylin,Pulsar等相关组件的剖析。

Kafka为什么快

快是一个相对概念,没有对比就没有伤害,因此通常我们说Kafka是相对于我们常见的activemq,rabbitmq这类会发生IO,并且主要依托于IO来做信息传递的消息队列,像zeromq这种基本纯粹依靠内存做信息流传递的消息队列,当然会更快,但是此类消息队列只有特殊场景下会使用,不在对比之列。

因此当我们说Kakfa快的时候,通常是基于以下场景:

  • 吞吐量: 当我们需要每秒处理几十万上百万message的时候,相对其他MQ,Kafka处理的更快。
  • 高并发: 当具有百万以及千万的consumer的时候,同等配置的机器下,Kafka所拥有的Producer和Consumer会更多。
  • 磁盘锁: 相对其他MQ,Kafka在进行IO操作的时候,其同步锁住IO的场景更少,发生等待的时间更短。

那么基于以上几点,我们来仔细探讨一下,为什么Kafka就快了。

消息队列的推拉模型

首先,如果我们单纯站在Consumer的角度来看“Kafka快”,是一个伪命题,因为相比其他MQ,Kafka从Producer产生一条Message到Consumer消费这条Message来看它的时间一定是大于等于其他MQ的,背后的原因涉及到消息队列设计的两种模型:推模型和拉模型,如下图所示:

E3a2Q3f.png!web

对于拉模型来说,Producer产生Message后,会主动发送给MQ Server,为了提升性能和减少开支,部分Client还会设计成批量发送,但是无论是单条还是批量,Producer都会主动推送消息到MQ Server,当MQ Server接收到消息后,对于拉模型,MQ Server不会主动发送消息到Consumer,同时也不会维持和记录消息的offset,Consumer会自动设置定时器到服务端去询问是否有新的消息产生,通常时间是不超过100ms询问一次,一旦产生新的消息则会同步到本地,并且修改和记录offset,服务端可以辅助存储offset,但是不会主动记录和校验offset的合理性,同时Consumer可以完全自主的维护offset以便实现自定义的信息读取。

对于推模型来说,服务端收到Message后,首先会记录消息的信息,并且从自己的元信息数据库中查询对应的消息的Consumer有谁,由于服务器和Consumer在链接的时候建立了长链接,因此可以直接发送消息到Consumer。

Kafka是基于拉模型的消息队列,因此从Consumer获取消息的角度来说,延迟会小于等于轮询的周期,所以会比推模型的消息队列具有更高的消息获取延迟,但是推模型同样又其问题。首先,由于服务器需要记录对应的Consumer的元信息,包括消息该发给谁,offset是多少,同时需要向Consumer推送消息,必然会带来系列的问题:假如这一刻网络不好,Consumer没有收到,消息没有发成功怎么办?假设消息发出去了,我怎么知道它有没有收到?因此服务器和Consumer之间需要首先多层确认口令,以达到至少消费一次,仅且消费一次等特性。

Kafka此类的拉模型将这一块功能都交由Consumer自动维护,因此服务器减少了更多的不必要的开支,因此从同等资源的角度来讲,Kafka具备链接的Producer和Consumer将会更多,极大的降低了消息堵塞的情况,因此看起来更快了。

OS Page Cache和Buffer Cache

太阳底下无新鲜事,对于一个框架来说,要想运行的更快,通常能用的手段也就那么几招,Kafka在将这一招用到了极致,其中之一就是极大化的使用了OS的Cache,主要是Page Cache和Buffer Cache。对于这两个Cache,使用Linux的同学通常不会陌生,例如我们在Linux下执行free命令的时候会看到如下的输出:

63QFjqZ.png!web

(图片来自网络)

会有两列名为buffers和cached,也有一行名为“-/+ buffers/cache”。这两个信息的具体解释如下:

pagecache:文件系统层级的缓存,从磁盘里读取的内容是存储到这里,这样程序读取磁盘内容就会非常快,比如使用Linux的grep和find等命令查找内容和文件时,第一次会慢很多,再次执行就快好多倍,几乎是瞬间。另外page cache的数据被修改过后,也即脏数据,等到写入磁盘时机到来时,会转移到buffer cache 而不是直接写入到磁盘。我们看到的cached这列的数值表示的是当前的页缓存(page cache)的占用量,page cache文件的页数据,页是逻辑上的概念,因此page cache是与文件系统同级的。

buffer cache:磁盘等块设备的缓冲,内存的这一部分是要写入到磁盘里的 。buffers列表示当前的块缓存(buffer cache)占用量,buffer cache用于缓存块设备(如磁盘)的块数据。块是物理上的概念,因此buffer cache是与块设备驱动程序同级的。

2Ive6vr.png!web

两者都是用来加速数据IO,将写入的页标记为dirty,然后向外部存储flush,读数据时首先读取缓存,如果未命中,再去外部存储读取,并且将读取来的数据也加入缓存。操作系统总是积极地将所有空闲内存都用作page cache和buffer cache,当os的内存不够用时也会用LRU等算法淘汰缓存页。

有了以上概念后,我们再看来Kafka是怎么利用这个特性的。首先,对于一次数据IO来说,通常会发生以下的流程:

Mn2IV36.jpg!web

  • 操作系统将数据从磁盘拷贝到内核区的pagecache
  • 用户程序将内核区的pagecache拷贝到用户区缓存
  • 用户程序将用户区的缓存拷贝到socket缓存中
  • 操作系统将socket缓存中的数据拷贝到网卡的buffer上,发送数据

可以发现一次IO请求操作进行了2次上下文切换和4次系统调用,而同一份数据在缓存中多次拷贝,实际上对于拷贝来说完全可以直接在内核态中进行,也就是省去第二和第三步骤,变成这样:

VJjyMrZ.jpg!web

正因为可以如此的修改数据的流程,于是Kafka在设计之初就参考此流程,尽可能大的利用os的page cache来对数据进行拷贝,尽量减少对磁盘的操作。如果kafka生产消费配合的好,那么数据完全走内存,这对集群的吞吐量提升是很大的。早期的操作系统中的page cache和buffer cache是分开的两块cache,后来发现同样的数据可能会被cache两次,于是大部分情况下两者都是合二为一的。

Kafka虽然使用JVM语言编写,在运行的时候脱离不了JVM和JVM的GC,但是Kafka并未自己去管理缓存,而是直接使用了OS的page cache作为缓存,这样做带来了以下好处:

  • JVM中的一切皆对象,所以无论对象的大小,总会有些额外的JVM的对象元数据浪费空间。
  • JVM自己的GC不受程序手动控制,所以如果使用JVM作为缓存,在遇到大对象或者频繁GC的时候会降低整个系统的吞吐量。
  • 程序异常退出或者重启,所有的缓存都将失效,在容灾架构下会影响快速恢复。而page cache因为是os的cache,即便程序退出,缓存依旧存在。

所以Kafka优化IO流程,充分利用page cache,其消耗的时间更短,吞吐量更高,相比其他MQ就更快了,用一张图来简述三者之间的关系如下:

UVRBN3j.png!web

当Producer和Consumer速率相差不大的情况下,Kafka几乎可以完全实现不落盘就完成信息的传输。

追加顺序写入

除了前面的重要特性之外,Kafka还有一个设计,就是对数据的持久化存储采用的顺序的追加写入,Kafka在将消息落到各个topic的partition文件时,只是顺序追加,充分的利用了磁盘顺序访问快的特性。

JvUrIru.png!web

(图片来自网络)

Kafka的文件存储按照topic下的partition来进行存储,每一个partition有各自的序列文件,各个partition的序列不共享,主要的划分按照消息的key进行hash决定落在哪个分区之上,我们先来详细解释一下Kafka的各个名词,以便充分理解其特点:

  • broker:Kafka中用来处理消息的服务器,也是Kafka集群的一个节点,多个节点形成一个Kafka集群。
  • topic:一个消息主题,每一个业务系统或者Consumer需要订阅一个或者多个主题来获取消息,Producer需要明确发生消息对于的topic,等于信息传递的口令名称。
  • partition:一个topic会拆分成多个partition落地到磁盘,在kafka配置的存储目录下按照对应的分区ID创建的文件夹进行文件的存储,磁盘可以见的最大的存储单元。
  • segment:一个partition会有多个segment文件来实际存储内容。
  • offset:每一个partition有自己的独立的序列编号,作用域仅在当前的partition之下,用来对对应的文件内容进行读取操作。
  • leader:每一个topic需要有一个leader来负责该topic的信息的写入,数据一致性的维护。
  • controller:每一个kafka集群会选择出一个broker来充当controller,负责决策每一个topic的leader是谁,监听集群broker信息的变化,维持集群状态的健康。

Rb6rua6.png!web

mINZVfa.png!web

可以看到最终落地到磁盘都是Segment文件,每一个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便老的 segment file快速被删除。因为Kafka处理消息的力度是到partition,因此只需要保持好partition对应的顺序处理,segment可以单独维护其状态。

segment的文件由index file和data file组成,落地在磁盘的后缀为.index和.log,文件按照序列编号生成,如下所示:

JjemMbn.png!web

(图片来自网络)

其中index维持着数据的物理地址,而data存储着数据的偏移地址,相互关联,这里看起来似乎和磁盘的顺序写入关系不大,想想HDFS的块存储,每次申请固定大小的块和这里的segment?是不是挺相似的?另外因为有index文的本身命名是以offset作为文件名的,在进行查找的时候可以快速根据需要查找的offset定位到对应的文件,再根据文件进行内容的检索。因此Kafka的查找流程为先根据要查找的offset对文件名称进行二分查找,找到对应的文件,再根据index的元数据的物理地址和log文件的偏移位置结合顺序读区到对应offset的位置的内容即可。

segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间,特别是在随机读取的场景下,Kafka非常不合适。所以因为Kafka特殊的存储设计,也让Kafka感觉起来,更快。

Kafka为什么稳

前面提到Kafka为什么快,除了快的特性之外,Kafka还有其他特点,那就是:稳。Kafka的稳体现在几个维度:

  • 数据安全,几乎不会丢数据。
  • 集群安全,发生故障几乎可以Consumer无感知切换。
  • 可用性强,即便部分partition不可用,剩余的partition的数据依旧不影响读取。
  • 流控限制,避免大量Consumer拖垮服务器的带宽。

限流机制

对于Kafka的稳,通常是由其整体架构设计决定,很多优秀的特性结合在一起,就更加的优秀,像Kafka的Qutota就是其中一个,既然是限流,那就意味着需要控制Consumer或者Producer的流量带宽,通常限制流量这件事需要在网卡上作处理,像常见的N路交换机或者高端路由器,所以对于Kafka来说,想要操控OS的网卡去控制流量显然具有非常高的难度,因此Kafka采用了另外一个特别的思路,即:没有办法控制网卡通过的流量大小,就控制返回数据的时间。对于JVM程序来说,就是一个wait或者seelp的事情。

所以对于Kafka来说,有一套特殊的时延计算规则,Kafka按照一个窗口来统计单位时间传输的流量,当流量大小超过设置的阈值的时候,触发流量控制,将当前请求丢入Kafka的Qutota Manager,等到延迟时间到达后,再次返回数据。我们通过Kafka的ClientQutotaManager类中的方法来看:

biqyQjq.png!web

这几行代码代表了Kafka的限流计算逻辑,大概的思路为:假设我们设定当前流量上限不超过T,根据窗口计算出当前的速率为O,如果O超过了T,那么会进行限速,限速的公示为:

X = (O - T)/ T * W

X为需要延迟的时间,让我举一个形象的例子,假设我们限定流量不超过10MB/s,过去5秒(公示中的W,窗口区间)内通过的流量为100MB,则延迟的时间为:(100-5*10)/ 10=5秒。这样就能够保障在下一个窗口运行完成后,整个流量的大小是不会超过限制的。通过KafkaApis里面对Producer和Consumer的call back代码可以看到对限流的延迟返回:

juquA3j.png!web

对于kafka的限流来讲,默认是按照client id或者user来进行限流的,从实际使用的角度来说,意义不是很大,基于topic或者partition分区级别的限流,相对使用场景更大,ThoughtWroks曾经帮助某客户修改Kafka核心源码,实现了基于topic的流量控制。

竞选机制

Kafka背后的元信息重度依赖Zookeeper,再次我们不解释Zookeeper本身,而是关注Kafka到底是如何使用zk的,首先一张图解释Kafka对zk的重度依赖:

Y7J3MbA.png!web

(图片来源于网络)

利用zk除了本身信息的存储之外,最重要的就是Kafka利用zk实现选举机制,其中以controller为主要的介绍,首先controller作为Kafka的心脏,主要负责着包括不限于以下重要事项:

VV3uQvU.png!web

也就是说Controller是Kafka的核心角色,对于Controller来说,采用公平竞争,任何一个Broker都有可能成为Controller,保障了集群的健壮性,对于Controller来说,其选举流程如下:

  • 先获取 zk 的 /cotroller 节点的信息,获取 controller 的 broker id,如果该节点不存在(比如集群刚创建时),* 那么获取的 controller id 为-1。
  • 如果 controller id 不为-1,即 controller 已经存在,直接结束流程。
  • 如果 controller id 为-1,证明 controller 还不存在,这时候当前 broker 开始在 zk 注册 controller;。
  • 如果注册成功,那么当前 broker 就成为了 controller,这时候开始调用 onBecomingLeader() 方法,正式初始化 controller(注意:controller 节点是临时节点,如果当前 controller 与 zk 的 session 断开,那么 controller 的临时节点会消失,会触发 controller 的重新选举)。
  • 如果注册失败(刚好 controller 被其他 broker 创建了、抛出异常等),那么直接返回。

其代码直接通过KafkaController可以看到:

AzE3MnE.png!web

一旦Controller选举出来之后,则其他Broker会监听zk的变化,来响应集群中Controller挂掉的情况:

vU7JviQ.png!web

从而触发新的Controller选举动作。对于Kafka来说,整个设计非常紧凑,代码质量相当高,很多设计也非常具有借鉴意义,类似的功能在Kafka中有非常多的特性体现,这些特性结合一起,形成了Kafka整个稳定的局面。

Kafka该怎么用

虽然Kafka整体看起来非常优秀,但是Kafka也不是全能的银弹,必然有其对应的短板,那么对于Kafka如何,或者如何能用的更好,则需要经过实际的实践才能得感悟的出。经过归纳和总结,能够发现以下不同的使用场景和特点。

  • Kafka 并不合适高频交易系统:Kafka虽然具有非常高的吞吐量和性能,但是不可否认,Kafka在单条消息的低延迟方面依旧不如传统MQ,毕竟依托推模型的MQ能够在实时消息发送的场景下取得先天的优势。
  • Kafka并不具备完善的事务机制:0.11之后Kafka新增了事务机制,可以保障Producer的批量提交,为了保障不会读取到脏数据,Consumer可以通过对消息状态的过滤过滤掉不合适的数据,但是依旧保留了读取所有数据的操作,即便如此Kafka的事务机制依旧不完备,背后主要的原因是Kafka对client并不感冒,所以不会统一所有的通用协议,因此在类似仅且被消费一次等场景下,效果非常依赖于客户端的实现。
  • Kafka的异地容灾方案非常复杂:对于Kafka来说,如果要实现跨机房的无感知切换,就需要支持跨集群的代理,因为Kafka特殊的append log的设计机制,导致同样的offset在不同的broker和不同的内容上无法复用,也就是文件一旦被拷贝到另外一台服务器上,将不可读取,相比类似基于数据库的MQ,很难实现数据的跨集群同步,同时对于offset的复现也非常难,曾经帮助客户实现了一套跨机房的Kafka 集群Proxy,投入了非常大的成本。
  • Kafka Controller架构无法充分利用集群资源:Kafka Controller类似于Es的去中心化思想,按照竞选规则从集群中选择一台服务器作为Controller,意味着改服务器即承担着Controller的职责,同时又承担着Broker的职责,导致在海量消息的压迫下,该服务器的资源很容易成为集群的瓶颈,导致集群资源无法最大化。Controller虽然支持HA但是并不支持分布式,也就意味着如果要想Kafka的性能最优,每一台服务器至少都需要达到最高配置。
  • Kafka不具备非常智能的分区均衡能力:通常在设计落地存储的时候,对于热点或者要求性能足够高的场景下,会是SSD和HD的结合,同时如果集群存在磁盘容量大小不均等的情况,对于Kafka来说会有非常严重的问题,Kafka的分区产生是按照paratition的个数进行统计,将新的分区创建在个数最少的磁盘上,见下图:

vUfEBbn.png!web

曾经我帮助某企业修改了分区创建规则,考虑了容量的情况,也就是按照磁盘容量进行分区的选择,紧接着带来第二个问题:容量大的磁盘具备更多的分区,则会导致大量的IO都压向该盘,最后问题又落回IO,会影响该磁盘的其他topic的性能。所以在考虑MQ系统的时候,需要合理的手动设置Kafka的分区规则。。

结尾

Kafka并不是唯一的解决方案,像几年前新生势头挺厉害的pulsar,以取代Kafka的口号冲入市场,也许会成为下一个解决Kafka部分痛点的框架,下文再讲述pulsar。

更多精彩洞见,请关注微信公众号ThoughtWorks洞见


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK