52

入门Apache Kafka需要了解的方方面面

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650230181&%3Bidx=1&%3Bsn=599630c608388173379c0e759fb6c7fc
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击关注上方“ 知了小巷 ”,

设为“置顶或星标”,第一时间送达干货。

入门Apache Kafka需要了解的方方面面

Apache Kafka是什么?

Apache Kafka是一个开源的 分布式消息引擎系统

Apache Kafka是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)。

Kafka社区的早期定位是:一个分布式、分区化且带备份功能的提交日志(Commit Log)服务。

除了Kafka之外,还有诸如Pulsar、RabbitMQ等一类的系统都被称为 Messaging System

,由于消息系统具备消息 传输中转和持久化以及分布式自管理能力,因此加上

引擎

两个字更好一些。

关于Messaging System可参考:

https://www.sciencedirect.com/topics/computer-science/messaging-system

A messaging system is responsible for transferring data from one application to another so the applications can focus on data without getting bogged down on data transmission and sharing. Distributed messaging is based on the concept of reliable message queuing. Messages are queued asynchronously between client applications and messaging system. There are two types of messaging patterns . The first one is a point-to-point and the other one is “publish–subscribe” (pub-sub) messaging system. Most of the messaging systems follow the pub-sub pattern.

Kafka支持点对点模式和发布订阅模式。

JMS也支持上述两种模式。

点对点模式:

生产者发送一条消息到消息引擎,只有一个消费者能收到。

发布订阅模式:

发布者发送到指定主题topic的消息,只要是订阅了指定topic的订阅者就都可以收到消息。

比如,简单的描述这样一个事实:

系统A1、A2将 消息 发送给 消息引擎系统 ,系统B、C、D...从 消息引擎系统

中读取A1、A2发送的

消息

。A1、A2与B、C、D...没有直接交互(不需要建立TCP连接)。但是A1、A2与消息引擎系统之间是通过网络进行消息发送,B、C、D...也是通过网络从消息引擎系统消费消息。

上述事实涉及两个重要的点:

  1. 消息引擎传输的对象是: 消息

  2. 消息引擎设计机制:即实现 如何传输消息 这个功能【发送、持久化、消费】

再进一步就是:

  • 消息长什么样子,即消息格式

  • 如何高效低延迟高吞吐不丢失...地传输消息

这里的消息是指Kafka处理的主要对象,而我们“发送“的json字符串或者日志行是消息的一部分(value值)。

消息格式

  1. 能够充分表达业务语义、不能有歧义

  2. 最大限度保证可重用性和通用性

简单一点的,比如XML可扩展标记语言(Extensible Markup Language)?或者常用的数据交换格式JSON(JavaScript Object Notation)?

JSON: 使用方便,序列化过程的数据也非常便于开发者阅读,缺点是当数据量很大的时候,性能不太好;

XML:

序列化和反序列化是目前效率最低的,但可承载数据类型和数据量是最大的。

再比如Google Protocol Buffers、Apache Thrift?Avro?

Thrift: 大量语言都支持Thrift,传输支持文本图片视频,性能很好,反序列化和序列化都非常高效,数据量大也不会有太大影响,Thrift还提供RPC功能。缺点是Thrift是静态的数据交换,需要先确定好数据结构。

Protocol Buffers: 是一种二进制的数据格式,具有更高的传输、打包和解包效率,可以传输一切Thrift可以传输的东西。

Avro:

基于二进制数据传输,设计之初就用来支持数据密集型应用,适合于远程或本地大规模数据的存储和交换。

Kafka的消息:纯二进制的字节序列。消息还是结构化的,用到的时候转为二进制的字节序列(序列化)。

在源码层面:

位于clients模块。

以前是final class Record

/**
* A record: a serialized key and value along with the associated CRC and other fields
*/

public final class Record { // ...}

KAFKA-4816之后重构为:

interface Record

/**
* A log record is a tuple consisting of a unique offset in the log, a sequence number assigned by
* the producer, a timestamp, a key and a value.
*/

public interface Record { // ...}

目前Record的版本:kafka/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java

RecordBatch具体实现,默认是:

DefaultRecordBatch

// Defines the record format versions supported by Kafka.
// 到目前为止,Kafka Record有三个版本
public enum RecordVersion {
V0(0), V1(1), V2(2);
}

// Newer versions (magic versions 2 and above) will generally contain many records regardless of compression.
public interface RecordBatch extends Iterable<Record> {...}

v0版本:

crc32:校验值。校验范围为magic至value之间。
magic:消息格式版本号。此版本的magic值为0。
attributes:消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE 无压缩,1表示GZIP,2表示SNAPPY,3表示LZ4。其余位保留。
keyLength:表示消息的key的长度。如果key为-1,则表示没有设置key,即key=null
key:可为null。如果没有key,则无此字段。
valueLength:实际消息体的长度。如果为-1,则表示消息为空。
value:消息体。可以为空,比如墓碑(tombstone)消息。或者如我们自己的json字符串数据。

最小字节 = 8 + 4 + 4 + 1 + 1 + 4 + 4 = 26 个bits

v1版本:

v1版本的日志比v0多了一个字段:timestamp,日志时间戳。

该字段可以表示为两种时间戳:

  1. 消息日志生成的时间。

  2. 消息追加到日志文件里的时间。

由attributes里的第四位来判断,0:表示timestamp类型为日志生成时间,1:表示为日志写入日志文件里的时间。

v2版本:

v2版本的消息格式与v1有很大不同。v2版本的消息格式去掉了crc字段,增加了length(消息总长度〉、 timestampDelta(时间戳增量)、 offsetDelta (位移增量)以及headers信息,而且 attributes字段被弃用了。

可以看看默认实现:DefaultRecord。

DefaultRecord

注释比较详细:

/**
* This class implements the inner record format for magic 2 and above. The schema is as follows:
*
*
* Record =>
* Length => Varint
* Attributes => Int8
* TimestampDelta => Varlong
* OffsetDelta => Varint
* Key => Bytes
* Value => Bytes
* Headers => [HeaderKey HeaderValue]
* HeaderKey => String
* HeaderValue => Bytes
*
* Note that in this schema, the Bytes and String types use a variable length integer to represent
* the length of the field. The array type used for the headers also uses a Varint for the number of
* headers.
*
* The current record attributes are depicted below:
*
* ----------------
* | Unused (0-7) |
* ----------------
*
* The offset and timestamp deltas compute the difference relative to the base offset and
* base timestamp of the batch that this record is contained in.
*/

public class DefaultRecord implements Record { // ...}

Kafka有哪些需要了解的“专业名词”

Topic TopicPartition Log LogSegment

KafkaProducer KafkaConsumer

Broker Kafka

Replica Partition

Consumer Group

Kafka数据存储目录:

server.properties

$ vi server.properties
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/xxxxx/kafka/kafkalogs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 默认每个Topic在不指定的情况下是2个分区(一般默认3个,视具体情况而定)
num.partitions=2

Topic:example

TopicPartition:0和1

$ ls kafkalogs/
example-0
example-1

Topic

Topic就是一个字符串,翻译为主题,可以为不同业务、不同应用甚至是不同数据都创建专属的主题。

public class Topic {
// Kafka内部使用的两个Topic
// __consumer_offsets
// __transaction_state
public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));

// ...
}

TopicPartition

一个字符串topic,一个int的partition号。

主题+分区号。

/**
* A topic name and partition number
*/

public final class TopicPartition implements Serializable {
private static final long serialVersionUID = -613627415771699627L;

private int hash = 0;
private final int partition;
private final String topic;

public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
// ...
}

具体文件在磁盘上长成下面的样子:

example-0是Topic和TopicPartition。

log文件有有.index和.log两个。

$ ls example-0/
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint

Log

An append-only log for storing messages.

Kafka使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,自然也就避免了缓慢的随机I/O操作,采取性能较好的

顺序I/O写操作

,这也是实现Kafka高吞吐量特性的一个重要手段。

kafka/core/src/main/scala/kafka/log/Log.scala

@threadsafe
class Log(...) extends Logging with KafkaMetricsGroup { // ...}

LogSegment

如果不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此Kafka可配置定期地删除消息以回收磁盘空间,通过日志段(Log Segment)机制来实现定期删除。

日志段,一组文件xxx.index和xxx.log称之为segment file,分别表示为segment索引文件和内容文件。log文件里面记录的是真正的Kafka消息;index文件记录的是offset(位置、位移或者偏移量)索引,是从逻辑上的offset到物理文件位置的映射。

每一个日志段有一个初始offset,这个offset小等于当前日志段里所有消息中最小的offset,但是大于之前所有日志段里的所有offset。

在Kafka底层,一个日志会被近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

文件名:

  • base_offset.index

  • base_offset.log

kafka/core/src/main/scala/kafka/log/LogSegment.scala

@nonthreadsafe
class LogSegment private[log] (...) extends Logging { // ...}

上面是Kafka 消息本身相关 的概念,下面是消息是通过什么来发送和消费的。

发送消息的客户端应用程序被称为Producer,接收消息的客户端应用被称为Consumer,两者都是客户端,都位于Kafka源码中的 clients模块 中。

KafkaProducer

KafkaProducer实现了Producer接口

kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

// KafkaProducer
public class KafkaProducer<K, V> implements Producer<K, V> { // ...}

// The interface for the KafkaProducer
public interface Producer<K, V> extends Closeable { // ...}

KafkaConsumer

KafkaConsumer实现了Consumer接口

kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

// KafkaConsumer
public class KafkaConsumer<K, V> implements Consumer<K, V> { // ...}

// interface Consumer
public interface Consumer<K, V> extends Closeable { // ...}

Kafka 集群节点 ,被称为Broker。

Broker

Broker就是一个case class,并提供了object Broker的apply方法。

重点是它有一个EndPoint集合,维护着endPointsMap,EndPoint(主机、端口、监听者名称、安全协议)。

Broker

负责接收和处理客户端发送过来的请求,以及对消息进行持久化。

/**
* A Kafka broker.
*
* @param id a broker id
* @param endPoints a collection of EndPoint. Each end-point is (host, port, listener name, security protocol).
* @param rack an optional rack
* @param features supported features
*/

case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], features: Features[SupportedVersionRange]) { // ...}

Kafka

Kafka服务的启动类,jps -l可以看到:

$ jps -l
73890 sun.tools.jps.Jps
52957 kafka.Kafka

源码位置在:

kafka/core/src/main/scala/kafka/Kafka.scala

Kafka.scala里的

main方法

,一路走下去,会调到KafkaServer.scala的startup方法,里面会调用createBrokerInfo方法,BrokerInfo里面传入Broker。

查看 kafka服务的启动脚本

kafka/bin/kafka-server-start.sh

Kafka服务启动主类,即是

kafka.Kafka

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

6ry6niU.png!mobile

Replica

Replication,即备份机制。

备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。

kafka/core/src/main/scala/kafka/cluster/Replica.scala

class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Logging { // ...}

哪个地方会用到Replica呢??当然是 Partition ....

kafka/core/src/main/scala/kafka/cluster/Partition.scala

class Partition(...) extends Logging with KafkaMetricsGroup { // ...}

Partition:

用来表示一个主题 分区的数据结构。https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2

cwiki#Key data structures有相关说明。

经过源码重构之后,现在已经不是上面cwiki描述的样子了。

看看 IsrState ,IsrState是一个密封起来的特质:

// 被提交到ZK的in-sync副本,Int类型的集合
sealed trait IsrState {
/**
* Includes only the in-sync replicas which have been committed to ZK.
*/

def isr: Set[Int]
// ...
}

分区中的所有副本统称为 AR(Assigned Repllicas) 。所有与leader副本保持一定程度同步的副本(包括Leader)组成 ISR(In-Sync Replicas)ISR集合是AR集合中的一个子集 。消息会先发送到leader副本,然后 follower副本 才能 从leader副本中拉取消息进行同步 ,同步期间,follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成 OSR(Out-Sync Relipcas) ,由此可见:AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。

副本的数量是可以配置的,这些副本保存着相同的数据,但却有不同的角色和作用。Kafka 定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。实际上使用两个case object来标识的【case object用于模式匹配、可以序列化】。

object QuotaType  {
case object Fetch extends QuotaType
case object Produce extends QuotaType
case object Request extends QuotaType
case object ControllerMutation extends QuotaType
// LeaderReplication
case object LeaderReplication extends QuotaType
// FollowerReplication
case object FollowerReplication extends QuotaType
case object AlterLogDirsReplication extends QuotaType

def toClientQuotaType(quotaType: QuotaType): ClientQuotaType = {
quotaType match {
case QuotaType.Fetch => ClientQuotaType.FETCH
case QuotaType.Produce => ClientQuotaType.PRODUCE
case QuotaType.Request => ClientQuotaType.REQUEST
case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType")
}
}
}

只有LeaderReplication对外提供服务,这里的对外指的是与客户端程序进行交互;而FollowerReplication只是被动地追随领导者副本而已,不能与外界进行交互。

Kafka中副本的工作机制

生产者总是向LeaderReplication写消息;而消费者总是从LeaderReplication读消息。至于FollowerReplication,只做一件事:向LeaderReplication发送请求,请求Leader把最新生产的消息发给它,这样它能保持与Leader的同步。

副本机制可以保证数据的持久化或消息不丢失。而Partition,分区,类似Sharding、Region等,是把集中的数据按照某种规则分散到不同节点上去,实现分布式系统最基本的 Scalability(伸缩性)

Kafka中的分区机制

将每个主题划分成多个分区(Partition),每个分区下面有一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,比如向一个默认2个分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中。

Kafka的三层消息架构

  1. 第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本

  2. 第二层是分区层,每个分区的N个副本中只能有一个充当Leader角色,对外提供服务;其它N-1个副本是Follower副本,只是提供数据冗余之用

  3. 第三层是消息层,分区中包含若干条消息,每条消息的offset从0开始,依次递增

最后,客户端程序只能与分区的LeaderReplication进行交互。

Consumer Group

消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。

引入消费者组,主要是为了提升消费者端的吞吐量。

多个消费者实例同时消费,加速整个消费端的吞吐量(TPS)。

消费者组除了能够分担不同的任务之外,还可以进行相互写作,自动检测消费者端实例健康状况,会自动把出现Failed的实例之前负责的分区转移给其他活着的消费者,也就是重平衡 (Rebalance)

需要注意:

同一个组下有多少消费者实例不是看进程数或线程数,而是看创建的KafkaConsumer实例数。

Offset

Consumer在消费消息的过程中会把当前消费到了分区的哪个位置上给记录下来,就是Consumer Offset。Consumer Offset和log分区里面的Offset是不一样的,log分区里面的Offset是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的Offset值就是固定的了。Consumer Offset是随时变化的,Consumer消费到哪儿就会移动到哪儿,而且每个Consumer有着自己的Consumer Offset。

Kafka流处理

Kafka社区的思考:

与其使用Kafka把数据从一个系统传递到下一个系统中做处理,为何不直接在Kafka上实现一套流处理框架呢?

基于流处理的考虑,Kafka社区于0.10.0.0版本正式推出了流处理组件Kafka Streams,从此Kafka正式“变身”为分布式的流处理平台,而不仅仅是消息引擎系统。

Kafka作为流处理平台的优势:

  1. 更容易实现端到端的精确一致性或正确性(Correctness)

    要实现正确性和提供能够推导时间的工具。实现正确性是流处理能够匹敌批处理的基石。正确性一直是批处理的强项,而实现正确性的基石则是要求 框架能提供精确一次处理语义,即处理一条消息有且只有一次机会能够影响系统状态 。目前主流的大数据流处理框架都宣称实现了精确一次处理语义,但这是有限定条件的,即它们 只能实现框架内的精确一次处理语义,无法实现端到端的 。因为当这些框架与外部消息引擎系统结合使用时,它们 无法影响到外部系统的处理语义 ,所以如果你搭建了一套环境使得Spark或Flink从Kafka读取消息之后进行有状态的数据计算,最后再写回Kafka,那么只能保证在Spark或Flink内部,这条消息对于状态的影响只有一次。但是 计算结果有可能多次写入到Kafka

    ,因为它们不能控制Kafka的语义处理。而对于Kafka:所有的数据流转和计算都在Kafka内部完成,所以Kafka可以实现端到端的精确一次处理语义。

    Apache Flink支持端到端的exactly-once,其使用了两阶段提交SinkFunction和Kafka的事务。

  2. 对单纯流计算的定位

除了消息引擎和流处理平台之外,Kafka还能够用于 分布式存储 系统。

https://www.confluent.io/blog/okay-store-data-apache-kafka/

Kafka发行版

  1. Apache Kafka

    社区版Kafka:

    https://github.com/apache/kafka

    目前开发人数最多、版本迭代速度最快的Kafka。使用Apache Kafka碰到任何问题并提交问题到社区,社区都会比较及时地响应。Apache Kafka没有提供任何监控框架或工具,不过可以使用Kafka-Eagle或Kafka Manager。

  2. Confluent Kafka

    https://www.confluent.io/

    Confluent公司,主要从事商业化Kafka工具开发,并在此基础上发布了Confluent Kafka。Confluent Kafka提供了一些Apache Kafka没有的高级特性,比如跨数据中心备份、Schema注册中心以及集群监控工具等。Confluent Kafka目前分为免费版和企业版两种。不过,Confluent公司暂时没有发展国内业务的计划,相关的资料以及技术支持都很欠缺,很多国内Confluent Kafka使用者甚至无法找到对应的中文文档,因此目前Confluent Kafka在国内的普及率比较低。

  3. Cloudera/Hortonworks Kafka

    https://www.cloudera.com/products/open-source/apache-hadoop/apache-kafka.html

    Cloudera提供的CDH和Hortonworks提供的HDP是非常著名的大数据平台(合并后推出CDP等),里面集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。不管是CDH还是HDP里面都集成了Apache Kafka,也可以称为CDH Kafka和HDP Kafka。对于类似大数据云公司提供的Kafka,内嵌Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

Kafka版本演进

Kafka版本:

0.7、0.8、0.9、0.10、0.11、1.0 和 2.0、2.1、2.2、2.3、2.4、2.5、2.6

2.6.0 is the latest release. The current stable version is 2.6.0.

作为技术选型和架构评估的一部分,重点关注 在kafka中,哪些版本引入了哪些重大的功能改进?

Kafka 从0.7版本升到0.8 之后,正式引入了副本机制。有了副本备份机制,Kafka就能够比较好地做到消息不丢失。这个时候的客户端API,是使用ZK进行访问的。其中,Producer客户端默认使用同步方式发送消息,吞吐量不高。虽然也支持异步发送,但实际场景中可能会造成消息丢失,因此 0.8.2.0版本 引入了新版本Producer API,Producer端需要指定Broker地址。

建议尽量使用比较新的版本 。如果不能升级大版本,建议至少要升级到 0.8.2.2 这个版本,因为该版本中老版本消费者API是比较稳定的。另外即使升到了0.8.2.2,也不要使用新版本Producer API,此时它的Bug还非常多。

0.9大版本 增加了基础的安全认证/权限功能,同时使用Java重写了新版本消费者API,另外还引入了Kafka Connect组件用于实现高性能的数据抽取。新版本Producer API在0.9这个版本中算比较稳定了。如果使用0.9版本,最好不要使用新版本Consumer API,因为Bug非常多。

0.10.0.0版本 引入了Kafka Streams。从这个版本起,Kafka正式升级成分布式流处理平台,只是暂时还不能上生产。如果生产环境依然在使用0.10大版本,建议至少升级到0.10.2.2,然后使用新版本Consumer API。0.10.2.2修复了一个可能导致Producer性能降低的Bug。

0.11.0.0版本 引入了两个重量级的功能变更:一个是提供幂等性Producer API以及事务(Transaction)API;另一个是对Kafka消息格式做了重构。如果对1.0版本是否适用于线上环境依然感到困惑,则至少将环境升级到0.11.0.3,因为这个版本的消息引擎功能已经非常完善了。

1.0和2.0版本 ,主要还是Kafka Streams的各种改进,在消息引擎方面并未引入太多的重大功能特性。

决定是否采用哪个版本,仍要弄清楚这个版本解决了旧版本的哪些痛点,而且没有增加其他新问题。

往期精选

Spark Core基础面试题总结(下)

Spark源码解析-Yarn部署流程(ApplicationMaster)

Spark源码解析-Yarn部署流程(SparkSubmit)

Spark Core基础面试题总结(上)

Spark技术栈-Scala

数据中台实战系列笔记

浅谈OLAP系统核心技术点(建议收藏)

HBase基础面试题总结

Hive基础面试题总结

MapReduce和YARN基础面试题总结

HDFS基础面试题总结

nIzeUzu.png!mobile


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK