

Kafka 生产中常用运维命令
source link: https://chegva.com/5904.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kafka 生产中常用运维命令
Kafka Architecture Distilled
Kafka名词术语:
消息:Record。Kafka是消息引擎,这里的消息就是指Kafka处理的主要对象。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
Kafka运维常用操作:
创建主题:
kafka-topics.sh --create --zookeeper <zookeeper 地址> --replication-factor <副本数> --partitions <分区数> --topic <主题名>
kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --create --topic test --partitions 3 --replication-factor 2 --config retention.ms=604800000
查看主题列表:
kafka-topics.sh --list --zookeeper <zookeeper 地址>
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --list
查看主题详情:
kafka-topics.sh --describe --zookeeper <zookeeper 地址> --topic <主题名>
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --describe --topic test
kafka-topics.sh --bootstrap-server 172.16.157.3:9092,172.16.157.4:9092,172.16.157.5:9092 --describe --topic test
创建生产者:
kafka-console-producer.sh --broker-list <Kafka 服务器地址> --topic <主题名>
创建消费者:
kafka-console-consumer.sh --bootstrap-server <Kafka 服务器地址> --topic <主题名> --from-beginning
kafka-console-consumer.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.4:9092 --topic test --from-beginning
查看消费组:
kafka-consumer-groups.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --list
查看消费者组的消费情况:
kafka-consumer-groups.sh --bootstrap-server <Kafka 服务器地址> --group <消费者组名> --describe
kafka-consumer-groups.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --describe --group test-group
修改分区数:
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --alter --topic test --partitions 2
kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --alter --topic test --partitions 2
创建主题时同时设置消息保留时间:
bin/kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --create --topic test --partitions 3 --replication-factor 2 --config retention.ms=604800000 (7天)
修改topic消息保留时间:
kafka-configs.sh --alter --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --entity-type topics --entity-name test --add-config retention.ms=7200000
假设要设置主题级别参数max.message.bytes,那么命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
设置常规的主题级别参数,使用 –zookeeper。设置动态参数,使用 –bootstrap-server。
主题级别参数是在创建或修改主题时设置的,它们会应用于整个主题。这些参数包括分区数、副本因子、数据保留策略等。主题级别参数的设置会影响整个主题的行为,一旦设置完成,除非修改主题配置,否则不会随着时间的推移而改变。修改后生效要重启broker。
动态参数是在运行时通过命令行或API进行设置的参数,可以在不停止或重新启动 Kafka 服务的情况下进行更改。这些参数包括日志清理策略、消息最大字节数、消息最大保留时间等。通过动态参数,可以实时地对 Kafka 服务进行调整和优化,以满足实时需求和变化的业务场景。
修改主题限速:
这里主要是指设置Leader副本和Follower副本使用的带宽。有时候,想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka提供了这样的功能。假设有个主题,名为test,想让该主题各个分区的Leader副本和Follower副本在处理副本同步时,不得占用超过100MBps的带宽。注意是大写B,即每秒不超过100MB。
要达到这个目的,必须先设置Broker端参数leader.replication.throttled.rate和follower.replication.throttled.rate,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
这条命令结尾处的 –entity-name就是Broker ID。倘若该主题的副本分别在0、1、2、3多个Broker上,那么你还要依次为Broker 1、2、3执行这条命令。
设置好这个参数之后,还需要为该主题设置要限速的副本。在这个例子中,想要为所有副本都设置限速,因此统一使用通配符*来表示,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
主题分区迁移:
同样是使用kafka-reassign-partitions脚本,对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他Broker上。这种变更比较复杂。
删除主题:
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
删除主题的命令并不复杂,关键是删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。因此,通常情况下,需要耐心地等待一段时间。
位移重设还有另一个重要的途径:通过kafka-consumer-groups脚本。需要注意的是,这个功能是在Kafka 0.11版本中新引入的。这就是说,如果你使用的Kafka是0.11版本之前的,那么你只能使用API的方式来重设位移。
Kafka重设位移操作:
重设位移大致可以从两个维度来进行。
位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成给定的位移值。
时间维度。可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如30分钟前,然后让消费者直接将位移调回30分钟之前的位移值。
下面的这张表格罗列了7种重设策略。
Earliest策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是0,因为在生产环境中,很久远的消息会被Kafka自动删除,所以当前最早位移很可能是一个大于0的值。如果你想要重新消费主题的所有消息,那么可以使用Earliest策略。
Latest策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了15条消息,那么最新末端位移就是15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用Latest策略。
Current策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current策略就可以帮你实现这个功能。
表中第4行的Specified-Offset策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现corrupted消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用Specified-Offset策略来规避。
如果说Specified-Offset策略要求你指定位移的绝对数值的话,那么Shift-By-N策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前100条位移处,此时你需要指定N为-100。
刚刚讲到的这几种策略都是位移维度的,下面来聊聊从时间维度重设位移的DateTime和Duration策略。
DateTime允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天0点。
Duration策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是PnDTnHnMnS。如果你熟悉Java 8引入的Duration类的话,你应该不会对这个格式感到陌生。它就是一个符合ISO-8601规范的Duration格式,以字母P开头,后面由4部分组成,即D、H、M和S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到15分钟前,那么你就可以指定PT0H15M0S。
目前,重设消费者组位移的方式有两种。
通过消费者API来实现。
通过kafka-consumer-groups命令行脚本来实现。
比起API的方式,用命令行重设位移要简单得多。
Earliest策略直接指定--to-earliest:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest策略直接指定--to-latest:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略直接指定--to-current:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略直接指定--to-offset:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N策略直接指定--shift-by N:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
DateTime策略直接指定--to-datetime:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
最后是实现Duration策略,直接指定--by-duration:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
</div
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK