Kafka从上手到实践-Kafka集群:Kafka Listeners
source link: http://www.devtalking.com/articles/kafka-practice-16/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
这一章节主要对和Listener相关的四个配置项做以详细解释。 listeners
、 advertised.listeners
、 listener.security.protocol.map
、 inter.broker.listener.name
这四个配置项可能是大家最容易混淆和最不容易理解的。
在解释这些配置项之前,我们先来明确几个概念。
- 部署Broker的阿里云ECS称为Host Machine。
- 在阿里云ECS里启动的Producer或者Consumer,比如使用Kafka CLI启动的称为Internal Client。
- 在大家的IDEA中使用Java编写的,或者第三方的Producer/Consumer,称为External Client。
- Host Machine具有外网IP和内网IP。
- Internal Client可以同时和Host Machine的外网IP及内网IP通信。
- External Client只能和Host Machine的外网IP通信。
- 多个阿里云ECS之间可以同时通过外网IP及内网IP通信。
- 既在这个特定的场景下,Host Machine之间可以同时通过外网IP及内网IP通信。
- 再换句话说就是不同Host Machine上的Broker之间可以同时通过外网IP及内网IP通信。
如上图所示,是一个很常见的Kafka集群场景,涵盖了上述的概念。图中那些通信虚线箭头就是靠Kafka的Listener建立的,并且是通过Kafka中不同的Listener建立的,这些Listener分为Internal Listener和External Listener。如下图所示:
那么这些Listener的创建以及内外部如何通信都是由上面那四个配置项决定的。
listener.security.protocol.map
先来看 listener.security.protocol.map
配置项,在上一章节中介绍过,它是配置监听者的安全协议的,比如 PLAINTEXT
、 SSL
、 SASL_PLAINTEXT
、 SASL_SSL
。因为它是以Key/Value的形式配置的,所以往往我们也使用该参数给Listener命名:
listener.security.protocol.map=EXTERNAL_LISTENER_CLIENTS:SSL,INTERNAL_LISTENER_CLIENTS:PLAINTEXT,INTERNAL_LISTENER_BROKER:PLAINTEXT
使用Key作为Listener的名称。就如上图所示,Internal Producer、External Producer、Internal Consumer、External Consumer和Broker通信以及Broker之间互相通信时都很有可能使用不同的Listener。这些不同的Listener有监听内网IP的,有监听外网IP的,还有不同安全协议的,所以使用Key来表示更加直观。当然这只是一种非官方的用法,Key本质上还是代表了安全协议,如果只有一个安全协议,多个Listener的话,那么这些Listener所谓的名称肯定都是相同的。
listeners
listeners
就是主要用来定义Kafka Broker的Listener的配置项。
listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092,INTERNAL_LISTENER_CLIENTS://阿里云ECS内网IP:9093,INTERNAL_LISTENER_BROKER://阿里云ECS内网IP:9094
上面的配置表示,这个Broker定义了三个Listener,一个External Listener,用于External Producer和External Consumer连接使用。也许因为业务场景的关系,Internal Producer和Broker之间使用不同的安全协议进行连接,所以定义了两个不同协议的Internal Listener,分别用于Internal Producer和Broker之间连接使用。
通过之前的章节,我们知道Kafka是由Zookeeper进行管理的,由Zookeeper负责Leader选举,Broker Rebalance等工作。所以External Producer和External Consumer其实是通过Zookeeper中提供的信息和Broker通信交互的。所以 listeners
中配置的信息都会发布到Zookeeper中,但是这样就会把Broker的所有Listener信息都暴露给了外部Clients,在安全上是存在隐患的,我们希望只把给外部Clients使用的Listener暴露出去,此时就需要用到下面这个配置项了。
advertised.listeners
advertised.listeners
参数的作用就是将Broker的Listener信息发布到Zookeeper中,供Clients(Producer/Consumer)使用。如果配置了 advertised.listeners
,那么就不会将 listeners
配置的信息发布到Zookeeper中去了:
advertised.listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092
这里在Zookeeper中发布了供External Clients(Producer/Consumer)使用的Listener EXTERNAL_LISTENER_CLIENTS
。所以 advertised.listeners
配置项实现了只把给外部Clients使用的Listener暴露出去的需求。
inter.broker.listener.name
这个配置项从名称就可以看出它的作用了,就是指定一个 listener.security.protocol.map
配置项中配置的Key,或者说指定一个或一类Listener的名称,将它作为Internal Listener。这个Listener 专门用于Kafka集群中Broker之间的通信 :
inter.broker.listener.name=INTERNAL_LISTENER_BROKER
listener 和 advertised.listeners 的关系
先来看看 KafkaConfig.scala
和 SocketServer.scala
源码中的这几行代码片段:
// KafkaConfig.scala ... val ListenersProp = "listeners" ... def dataPlaneListeners: Seq[EndPoint] = { Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName) case None => listeners } } ... def listeners: Seq[EndPoint] = { Option(getString(KafkaConfig.ListenersProp)).map { listenerProp => CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap) }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } // SocketServer.scala def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides) createControlPlaneAcceptorAndProcessor(config.controlPlaneListener) createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners) if (startupProcessors) { startControlPlaneProcessor() startDataPlaneProcessors() } } ... private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = synchronized { endpoints.foreach { endpoint => val dataPlaneAcceptor = createAcceptor(endpoint) addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener) KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start() dataPlaneAcceptor.awaitStartup() dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) info(s"Created data-plane acceptor and processors for endpoint : $endpoint") } }
startup()
方法是Kafka Broker创建启动Socket连接的入口,既用来创建Acceptor线程的入口,该线程负责处理Socket连接。 createDataPlaneAcceptorsAndProcessors()
方法的第二个参数 config.dataPlaneListeners
可以看到取的就是 listeners
配置项的内容。
/** * Create a server socket to listen for connections on. */ private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if (host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort)) } catch { case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e) } serverChannel }
跟到里面,可以看到如果没有配置 listeners
,那么会使用网卡地址创建Socket连接,对于阿里云ECS,就是内网IP。
再来看看 KafkaServer.scala
源码中的这几行代码片段:
... val brokerInfo = createBrokerInfo val brokerEpoch = zkClient.registerBroker(brokerInfo) ... private[server] def createBrokerInfo: BrokerInfo = { val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}") zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + s" advertised listeners are already registered by broker ${broker.id}") } val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else endpoint } val updatedEndpoints = listeners.map(endpoint => if (endpoint.host == null || endpoint.host.trim.isEmpty) endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName) else endpoint ) val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort) }
从上面的代码可以看到, advertised.listeners
主要用于向Zookeeper注册Broker的连接信息,但是不参与创建Socket连接。
所以从这几处源码内容可以得出结论,Kafka Broker真正建立通信连接使用的是 listeners
配置项里的内容,而 advertised.listeners
只用于向Zookeeper注册Broker的连接信息,既向Client暴露Broker对外的连接信息(Endpoint)。
另外在 KafkaConfig.scala
源码中还有有这么几行代码:
val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet val listenerNames = listeners.map(_.listenerName).toSet require(advertisedListenerNames.contains(interBrokerListenerName), s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") require(advertisedListenerNames.subsetOf(listenerNames), s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"are ${listenerNames.map(_.value).mkString(",")}"
从上面的代码片段可以得出两个结论:
-
advertised.listeners
配置项中配置的Listener名称或者说安全协议必须在listeners
中存在。因为真正创建连接的是listeners
中的信息。 -
inter.broker.listener.name
配置项中配置的Listener名称或者说安全协议必须在advertised.listeners
中存在。因为Broker之间也是要通过advertised.listeners
配置项获取Internal Listener信息的。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK