72

Kafka -- Java消费者管理TCP连接

 4 years ago
source link: https://www.tuicool.com/articles/EB3uuaj
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
  1. 消费者会为每个要消费的分区创建与该分区 领导者副本 所在Broker的Socket连接
  2. 假设消费者要消费5个分区的数据,这5个分区各自的领导者副本分布在4台Broker上
    • 那么消费者在消费时会创建与这4台Broker的Socket连接

TCP连接数

日志详解

[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 ( id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者程序创建的 第一个TCP连接 ,该Socket用于发送 FindCoordinator 请求

此时消费者对要连接的Kafka集群 一无所知 ,因此它连接的Broker节点的ID为 -1 ,表示不知道要连接的Broker的任何信息

[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=’t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)

消费者 复用 刚刚创建的Socket连接,向Kafka集群发送 元数据请求 以获取 整个集群的信息

[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)

消费者程序开始发送 FindCoordinator 请求给第一步中连接的Broker,即 localhost:9092 (nodeId为 -1

[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094 } (org.apache.kafka.clients.NetworkClient:837)

十几毫秒后,消费者程序成功地获悉 Coordinator所在的Broker ,即 node_id=2,host=localhost,port=9094

[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 ( id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者此时已经知道 协调者Broker的连接信息 了,发起第二个Socket连接,创建连向 localhost:9094 的TCP连接

只有连接了Coordinator,消费者才能正常地开启 消费组的各种功能 以及 后续的消息消费

此时的id是由 Integer.MAX_VALUE 减去 Coordinator所在的Broker的Id 计算出来的,即 2147483647 - 2 = 2147483645

这种节点ID的标记方式是Kafka社区 特意为之 ,目的是要让 组协调请求真正的数据获取请求 使用 不同的Socket连接

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 ( id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 ( id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 ( id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者又分别创建了 新的TCP连接 ,主要用于 实际的消息获取

3类TCP连接

  1. 确定协调者获取集群元数据
  2. 连接协调者 ,令其执行组成员管理操作
  3. 执行 实际的消息获取

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK