47

Kafka基础知识总结

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650228962&%3Bidx=2&%3Bsn=46071eafdfc92f87d7091bd20076a75a
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击关注上方“ 知了小巷 ”,

设为“置顶或星标”,第一时间送达干货。

1.Kafka分区复制和多数据中心架构

A7ZRfyE.png!web

ry2a2ua.png!web

2.Kafka压测

Kafka官方自带压力测试脚本:

(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。

Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。 一般都是网络IO达到瓶颈。

3.Kafka的机器数量

Kafka机器数量 = 2*(峰值生产速度 * 副本数 / 100)+1

4.Kafka 的日志保存时间

默认7天,可修改

5.Kafka的硬盘大小

每天的数据量 * 7天 / 70%

6.Kafka监控

公司自己开发的监控器;

开源的监控器:KafkaManager、KafkaMonitor、kafkaeagle

7.Kakfa分区数

分区数并不是越多越好,一般分区数不要超过集群机器数量。 分区数越多占用内存越大(ISR等),一个节点集中的分区也就越多,当它宕机的时候,对系统的影响也就越大。

分区数一般设置为:3-10个

8.副本数设定

一般我们设置成2个或3个,很多企业设置为2个。

9.多少个Topic

通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。

10.Kafka丢不丢数据

Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。

Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。

Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。

Kafka消息送达语义说明

11.Kafka的ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。 ISR中包括Leader和Follower。 如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。

replica.lag.max.messages (延迟条数)和 replica.lag.time.max.ms (延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在 0.10 版本移除了 replica.lag.max.messages 参数,防止服务频繁的进出队列。

任意一个维度超过阈值都会把Follower剔除出ISR,存入 OSR(Outof-Sync Replicas) 列表, 新加入的Follower也会先存放在OSR中。

12.Kafka分区分配策略

在 Kafka内部存在两种默认的分区分配策略: Range和 RoundRobin。

Range是默认策略。 Range是对每个Topic而言的(即一个Topic一个Topic分), 首先对同一个Topic里面的 分区 按照 序号 进行排序,并对 消费者 按照 字母顺序 进行排序。 然后 用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。 如果除不尽,那么前面几个消费者线程将会多消费一个分区。

例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。

C1-0 将消费 0, 1, 2, 3 分区

C2-0 将消费 4, 5, 6 分区

C2-1 将消费 7, 8, 9 分区

第一步将所有主题分区组成 TopicAndPartition 列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。

13.Kafka中数据量计算

每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60= 1150条/每秒钟

平均每秒钟:1150条

低谷每秒钟:50条

高峰每秒钟:1150条*(2-20倍)=2300条-23000条

每条日志大小:0.5k-2k

每秒多少数据量:2.3M-20MB

14.Kafka挂掉

Kafka本身日志

Kafka生产端日志

Kaf'ka消费端日志

网络内存

短期没事

15.Kafka消息数据积压,Kafka消费能力不足怎么处理? 

1.如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2.如果是下游的数据处理不及时:提高每批次拉取的数量。 批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

16.Kafka幂等性

Producer的幂等性指的是当发送同一条消息时,数据在Server端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:

1.Producer( 幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重 )。

2.幂等性不能跨多个Topic-Partition, 只能保证单个Partition内的幂等性 ,当涉及多个 Topic-Partition时,这中间的状态并没有同步。

17.Kafka事务

Kafka从0.11版本开始引入了事务支持。 事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

1.Producer事务

为了实现跨分区跨会话的事务,需要 引入一个全局唯一的Transaction ID ,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka 引入了一个新的组件Transaction Coordinator。 Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个 内部Topic ,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

2.Consumer事务

上述事务机制主要是从Producer方面考虑, 对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。 这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

18.Kafka数据重复

幂等性+ack-1+事务

Kafka数据重复,可以再下一级: SparkStreaming、redis或者hive中dwd层 去重 ,去重的手段: 分组、按照id开窗只取第一个值。

19. Kafka参数优化

broker配置优化

1.broker参数配置(server.properties)\

网络和io操作线程配置优化

# broker处理消息的最大线程数(默认为3)

num.network.threads=cpu核数+1

# broker处理磁盘IO的线程数

num.io.threads=cpu核数*2

2.log数据文件刷盘策略

# 每当producer写入10000条消息时,刷数据到磁盘

log.flush.interval.messages=10000

# 每间隔1秒钟时间,刷数据到磁盘

log.flush.interval.ms=1000

3.日志保留策略配置

# 保留三天,也可以更短 

#(log.cleaner.delete.retention.ms)

log.retention.hours=72

4.Replica相关配置

offsets.topic.replication.factor:3

# 这个参数指 新创建一个topic时,默认的Replica数量, Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。

Producer优化(producer.properties)

buffer.memory:33554432 (32m)

#在Producer端用来存放尚未发送出去的Message的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由block.on.buffer.full的配置来决定。

compression.type:none

#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和broker的存储压力。

Consumer优化

num.consumer.fetchers:1

#启动Consumer的个数,适当增加可以提高并发度。

fetch.min.bytes:1

#每次Fetch Request至少要拿到多少字节的数据才可以返回。

fetch.wait.max.ms:100

#在Fetch Request获取的数据至少达到fetch.min.bytes之前,允许等待的最大时长。对应上面说到的Purgatory中请求的超时时间。

Kafka内存调整(kafka-server-start.sh)

默认内存1个G,生产环境尽量不要超过6个G。

export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

往期推荐:

Kafka消息送达语义说明

Apache Hadoop YARN:Client<-->ResourceManager源码解析

Apache Hadoop YARN:Client<-->ResourceManager源码DEBUG

Hadoop YARN:ApplicationMaster与ResourceManager交互源码解析

Hive企业级调优

HiveQL查询连续三天有销售记录的店铺

HiveQL实战蚂蚁森林低碳用户排名分析:解法一

HiveQL实战蚂蚁森林低碳用户排名分析:解法二

HiveQL实战蚂蚁森林植物申领统计分析

Hive-函数

Hive-查询

Hive-DML(Data Manipulation Language)数据操作语言

Hive-DDL(Data Definition Language)数据定义

Hive优化(整理版)

Spark Core之Shuffle解析

数据仓库开发规范

nIzeUzu.png!web

67Zzae3.gif

分享-点赞-在 看,谢谢 ~~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK