26

Kafka 最新版配置

 4 years ago
source link: https://www.tuicool.com/articles/mqyUNfY
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

当前基于kafaka最新版 kafka_2.12-2.2.1.tgz 进行配置 。

官网地址: http://kafka.apache.org/intro

kafka的一些基础知识 参考: http://www.hechunbo.com/index.php/archives/140.html

最新版 kafka_2.12-2.2.1.tgz 进行配置 。单机生产者消费者图解配配置,多机模拟配置。以及文件读写配置,经验掌握,集成zookeeper不用再安装

  1. 配置java环境安装jdk

    参考 http://www.hechunbo.com/index.php/archives/132.html

  2. 解压kafaka

    [root@localhost hcb]# tar -zxvf kafka_2.12-2.2.1.tgz -C /usr/local
  3. 启动zookeeper .因为最新版 已经包含有zookeeper 所以不用另外安装了

    [root@localhost kafka_2.12-2.2.1]# bin/zookeeper-server-start.sh config/zookeeper.properties 
    [2019-06-22 17:47:49,667] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  4. 重新开一个连接 。输入jps 发现多了一个进程

    [root@localhost ~]# jps
    3136 Jps
    2842 QuorumPeerMain
  5. 启动kafka

    [root@localhost kafka_2.12-2.2.1]# ./bin/kafka-server-start.sh config/server.properties 
    [2019-06-22 17:51:18,786] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2019-06-22 17:51:20,624] INFO starting (kafka.server.KafkaServer)
  6. 再开一个连接 输入jps查看当前运行的进程

    发现多了一个kafka

    [root@localhost ~]# jps
    3504 Jps
    2842 QuorumPeerMain
    3147 Kafka
  7. 创建一个topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    [root@localhost kafka_2.12-2.2.1]#
  8. 查看topic消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
  9. 发送消息 到test

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    hi,>welcome to to kafka    
    >hi ,how are you
  10. 消费者取消息

    [root@localhost kafka_2.12-2.2.1]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    hi,welcome to to kafka
    hi ,how are you

    生产者发送消息以后,消费者有通知 ,

    Ar6FbaI.png!web

  11. 进行多台机子测试

    因为我们是单台机子,所以把配置文件复制两份,更改端口和id配置进行第二台,第三台的模拟

    1. [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-1.properties
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-2.properties

      修改第二台机子的配置

      vi config/server-1.properties
      log.dirs=/tmp/kafka-logs-1
      listeners=PLAINTEXT://:9093
      broker.id=1

    iEJbQbY.png!web

    修改第三台机子

    vi config/server-2.properties
    log.dirs=/tmp/kafka-logs-2
    listeners=PLAINTEXT://:9094
    broker.id=2

    fYN7nmn.png!web

  12. 启动新模拟的两台服务器

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-1.properties 
    [2019-06-22 18:23:56,237] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

    新开连接 继续启动第三台,顺便查看下当前的进程 。发现有两个kafka存在了

    [root@localhost ~]# jps
    4370 ConsoleProducer
    2842 QuorumPeerMain
    5642 Jps
    3147 Kafka
    4955 ConsoleConsumer
    5278 Kafka
    [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
    ^C[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-2.properties 
    [2019-06-22 18:27:31,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

    新开一个连接 ,查看下当前进程 ,三个kafka正常启动了

    [root@localhost ~]# jps
    4370 ConsoleProducer
    6307 Jps
    2842 QuorumPeerMain
    3147 Kafka
    4955 ConsoleConsumer
    5948 Kafka
    5278 Kafka
  13. 创建一个带有备份的topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replication-topic
  14. 查看哪个borke【kafka服务器】在工作

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

    leader:哪个broker在读写

    replicas:当前可以正常工作的kafka集群。当leader挂掉时会自动替补

    isr:同步消息的列表集合

  15. 查看我们之前创建的topic消息

    当时我们只有一个kafka服务器。可以看只leader是0,替被和备份的都是0,

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:segment.bytes=1073741824
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
  16. 在新的topic中发布新的消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replication-topic
    >message one
    >message two
  17. 消费者去获取消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092 --from-beginning --topic my-replication-topic
    message one
    message two
  18. 检查当前的leader

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
  19. 模拟leader1挂掉以后的状态

    把leader1关掉

    检查leader1的进程

    ps aux 显示用户当前的所有进程 。并根据grep后面的内容进行搜索

    用kill杀死相关进程

    [root@localhost kafka_2.12-2.2.1]# ps aux | grep server-1.properties
    root       5278  3.5 20.5 3232460 205560 pts/5  Sl+  18:23   1:06 /usr/local/jdk1.8.0_211/bin/java -Xmx1G
    [root@localhost kafka_2.12-2.2.1]# kill -9 5278
  20. 再次检查当前topic的消息

    发现leader已经从1变成了2.

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
  21. 使用kafka connect 导入导出数据

    souce connector 从text.txt读取文件 ,把内容发送到connect-test., sink connector 从conect-test读写消息

    [root@localhost kafka_2.12-2.2.1]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties  config/connect-file-sink.properties 
    [2019-06-22 19:05:55,493] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)

    进行jps分发现多了一个ConnectStandalone的进程

    [root@localhost ~]# jps
    4370 ConsoleProducer
    9478 Jps
    9160 ConnectStandalone
    2842 QuorumPeerMain
    3147 Kafka
    4955 ConsoleConsumer
    5948 Kafka

    显示文件内容

    more 命令类似 cat ,不过会以一页一页的形式显示,更方便使用者逐页阅读,

    [root@localhost kafka_2.12-2.2.1]# more test.sink.txt 
    foo
    bar

    使用消费者控制 台显示

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}

    继续测试

    生产者进行消息追加

    [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3" > test.txt
    [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3\new append" > test.txt

    消费者进行实时显示

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    {"schema":{"type":"string","optional":false},"payload":"dddd"}
    {"schema":{"type":"string","optional":false},"payload":"aaaaaaad"}
    {"schema":{"type":"string","optional":false},"payload":"dd"}
    ^[[A^[[A^[[B{"schema":{"type":"string","optional":false},"payload":"1\\2\\2\\3"}
    {"schema":{"type":"string","optional":false},"payload":"ew append"}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK