1

我的 Kafka 旅程 - Linux下的安装 & 基础命令 - Sol·wang

 1 year ago
source link: https://www.cnblogs.com/Sol-wang/p/16691608.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

安装解压缩工具 tar

# 检查是否安装了解压缩工具 tar
yum list tar
# 如未安装 tar
yum install tar -y

安装必备的 java

# 检查是否安装了 java-openjdk,这里选择 java-1.8.0.openjdk 版
yum list java-1.8.0.openjdk
# 如未安装 java-openjdk
yum install java-1.8.0.openjdk -y

安装 kafka

官网下载地址:https://kafka.apache.org/downloads,这里下载 kafka_2.13-3.2.3.tgz 版。

# 下载 kafka 安装包文件
curl -O https://downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
# 解压安装包
tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local

在 2.8 之前,kafka 依赖与 zookeeper;2.8 之后,zookeeper 的替代品 karft。
以下命令均在解压包(kafka)的根目录进行。

基于 zookeeper 方式

# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties
#
# 验证启动;zookeeper 默认端口2181,kafka 默认端口为9092
ss -plnts			# 端口列是否列出了 2181、9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh
# 停止 zookeeper
bin/zookeeper-server-stop.sh

基于 kraft 方式

# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 准备 uuid
bin/kafka-storage.sh random-uuid
# 配置 uuid
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
# 通过 kraft 配置 后台启动 kafka
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#
# 验证启动;kafka 默认端口为9092
ss -plnts			# 端口列是否列出了 9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh

开放端口:确保每个服务器所应用到的端口(2181 和 9092等)开放(参考firewall)。

Topic

# 查看已有的 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建一个新的 Topic,分区数1个,副本数3份
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create \
--partitions 1 --replication-factor 1
# 查看一个 Topic 详细
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe
# 修改一个 Topic 的属性:分区变更为2个(分区只能加,不能减少,对于消费者分不清数据所属)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2

Consumer

# 消费者订阅测试;新打开一个终端,连接到服务器,消费者订阅一个topic,接收消息测试
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning

Producer

# 生产者发送消息;新开一个终端窗体,连接到服务器,用来生产者发送消息(回车后输入要发送的消息内容)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}

切换到 消费者终端窗口,查看接收到订阅的 topic 消息。

这里用三台机组成集群模式

  • 把解压后的 /usr/local/kafka_2.13-3.2.3 文件夹分发到集群中的每个服务器
  • 配置文件 config/server.properties | config/kraft/server.properties 的配置
  • 启动每台服务器的 kafka

安装包分发到每台服务器

# 把解压后的 kafka 根目录文件夹分发到集群中的每台服务器
scp -r /usr/local/kafka_2.13-3.2.3 root@{服务器IP} /usr/local

zookeeper 方式

以 config/server.properties 文件为主的配置及启动

zookeeper 使用的端口:2181;kafka 使用的端口:9092。

# 新终端窗口登录到每台服务器,配置每台服务器的 config/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/server.properties
vi config/server.properties
# 以下两点必须的变更
# 	1、确保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2)
#	2、确保每台 config/server.properties 中的 zookeeper.connect 都相同
#	示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181
#
#
# 启动每台服务器的 zookeeper,kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

kraft 方式

以 config/kraft/server.properties 文件为主的配置及启动

kraft 使用的端口:9093;kafka 使用的端口:9092。

# 新终端窗口登录到每台服务器,配置每台服务器的 config/kraft/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/kraft/server.properties
vi config/kraft/server.properties
#以下三点必须的变更
#
# 	1、确保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3)
#	示例:node.id=1
#
#	2、确保每台 config/kraft/server.properties 中的 listeners 配置各自的主机名称或IP
#	示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093
#
#	3、确保每台  config/kraft/server.properties 中的 controller.quorum.voters 都相同
#	格式:controller.quorum.voters={node.id}@{host}:{port}
#	示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093
#
#
# 启动 kafka
# 先为集群生成一个uuid
bin/kafka-storage.sh random-uuid
# 分别为每台服务器配置相同的uuid后启动
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK