41

Kafka从上手到实践-Kafka集群:Kafka Listeners

 5 years ago
source link: http://www.devtalking.com/articles/kafka-practice-16/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

这一章节主要对和Listener相关的四个配置项做以详细解释。 listenersadvertised.listenerslistener.security.protocol.mapinter.broker.listener.name 这四个配置项可能是大家最容易混淆和最不容易理解的。

在解释这些配置项之前,我们先来明确几个概念。

  • 部署Broker的阿里云ECS称为Host Machine。
  • 在阿里云ECS里启动的Producer或者Consumer,比如使用Kafka CLI启动的称为Internal Client。
  • 在大家的IDEA中使用Java编写的,或者第三方的Producer/Consumer,称为External Client。
  • Host Machine具有外网IP和内网IP。
  • Internal Client可以同时和Host Machine的外网IP及内网IP通信。
  • External Client只能和Host Machine的外网IP通信。
  • 多个阿里云ECS之间可以同时通过外网IP及内网IP通信。
    • 既在这个特定的场景下,Host Machine之间可以同时通过外网IP及内网IP通信。
    • 再换句话说就是不同Host Machine上的Broker之间可以同时通过外网IP及内网IP通信。

6F7Bnyq.png!web

如上图所示,是一个很常见的Kafka集群场景,涵盖了上述的概念。图中那些通信虚线箭头就是靠Kafka的Listener建立的,并且是通过Kafka中不同的Listener建立的,这些Listener分为Internal Listener和External Listener。如下图所示:

ZRb263f.png!web

那么这些Listener的创建以及内外部如何通信都是由上面那四个配置项决定的。

listener.security.protocol.map

先来看 listener.security.protocol.map 配置项,在上一章节中介绍过,它是配置监听者的安全协议的,比如 PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL 。因为它是以Key/Value的形式配置的,所以往往我们也使用该参数给Listener命名:

listener.security.protocol.map=EXTERNAL_LISTENER_CLIENTS:SSL,INTERNAL_LISTENER_CLIENTS:PLAINTEXT,INTERNAL_LISTENER_BROKER:PLAINTEXT

使用Key作为Listener的名称。就如上图所示,Internal Producer、External Producer、Internal Consumer、External Consumer和Broker通信以及Broker之间互相通信时都很有可能使用不同的Listener。这些不同的Listener有监听内网IP的,有监听外网IP的,还有不同安全协议的,所以使用Key来表示更加直观。当然这只是一种非官方的用法,Key本质上还是代表了安全协议,如果只有一个安全协议,多个Listener的话,那么这些Listener所谓的名称肯定都是相同的。

listeners

listeners 就是主要用来定义Kafka Broker的Listener的配置项。

listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092,INTERNAL_LISTENER_CLIENTS://阿里云ECS内网IP:9093,INTERNAL_LISTENER_BROKER://阿里云ECS内网IP:9094

上面的配置表示,这个Broker定义了三个Listener,一个External Listener,用于External Producer和External Consumer连接使用。也许因为业务场景的关系,Internal Producer和Broker之间使用不同的安全协议进行连接,所以定义了两个不同协议的Internal Listener,分别用于Internal Producer和Broker之间连接使用。

通过之前的章节,我们知道Kafka是由Zookeeper进行管理的,由Zookeeper负责Leader选举,Broker Rebalance等工作。所以External Producer和External Consumer其实是通过Zookeeper中提供的信息和Broker通信交互的。所以 listeners 中配置的信息都会发布到Zookeeper中,但是这样就会把Broker的所有Listener信息都暴露给了外部Clients,在安全上是存在隐患的,我们希望只把给外部Clients使用的Listener暴露出去,此时就需要用到下面这个配置项了。

advertised.listeners

advertised.listeners 参数的作用就是将Broker的Listener信息发布到Zookeeper中,供Clients(Producer/Consumer)使用。如果配置了 advertised.listeners ,那么就不会将 listeners 配置的信息发布到Zookeeper中去了:

advertised.listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092

这里在Zookeeper中发布了供External Clients(Producer/Consumer)使用的Listener EXTERNAL_LISTENER_CLIENTS 。所以 advertised.listeners 配置项实现了只把给外部Clients使用的Listener暴露出去的需求。

inter.broker.listener.name

这个配置项从名称就可以看出它的作用了,就是指定一个 listener.security.protocol.map 配置项中配置的Key,或者说指定一个或一类Listener的名称,将它作为Internal Listener。这个Listener 专门用于Kafka集群中Broker之间的通信

inter.broker.listener.name=INTERNAL_LISTENER_BROKER

listener 和 advertised.listeners 的关系

先来看看 KafkaConfig.scalaSocketServer.scala 源码中的这几行代码片段:

// KafkaConfig.scala
...

val ListenersProp = "listeners"

...

def dataPlaneListeners: Seq[EndPoint] = {
    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
      case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
      case None => listeners
    }
  }

...

def listeners: Seq[EndPoint] = {
    Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
      CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
    }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
  }  

// SocketServer.scala

  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
      if (startupProcessors) {
        startControlPlaneProcessor()
        startDataPlaneProcessors()
      }
    }

...

  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
    endpoints.foreach { endpoint =>
      val dataPlaneAcceptor = createAcceptor(endpoint)
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
      dataPlaneAcceptor.awaitStartup()
      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
    }
  }

startup() 方法是Kafka Broker创建启动Socket连接的入口,既用来创建Acceptor线程的入口,该线程负责处理Socket连接。 createDataPlaneAcceptorsAndProcessors() 方法的第二个参数 config.dataPlaneListeners 可以看到取的就是 listeners 配置项的内容。

/**
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  val socketAddress =
    if (host == null || host.trim.isEmpty)
      new InetSocketAddress(port)
    else
      new InetSocketAddress(host, port)
  val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)

  try {
    serverChannel.socket.bind(socketAddress)
    info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
  } catch {
    case e: SocketException =>
      throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
  }
  serverChannel
}

跟到里面,可以看到如果没有配置 listeners ,那么会使用网卡地址创建Socket连接,对于阿里云ECS,就是内网IP。

再来看看 KafkaServer.scala 源码中的这几行代码片段:

...

val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)

...

private[server] def createBrokerInfo: BrokerInfo = {
    val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
    zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
      val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
      require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
        s" advertised listeners are already registered by broker ${broker.id}")
    }

    val listeners = config.advertisedListeners.map { endpoint =>
      if (endpoint.port == 0)
        endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
      else
        endpoint
    }

    val updatedEndpoints = listeners.map(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
        endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
      else
        endpoint
    )

    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)
 }

从上面的代码可以看到, advertised.listeners 主要用于向Zookeeper注册Broker的连接信息,但是不参与创建Socket连接。

所以从这几处源码内容可以得出结论,Kafka Broker真正建立通信连接使用的是 listeners 配置项里的内容,而 advertised.listeners 只用于向Zookeeper注册Broker的连接信息,既向Client暴露Broker对外的连接信息(Endpoint)。

另外在 KafkaConfig.scala 源码中还有有这么几行代码:

val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSet

require(advertisedListenerNames.contains(interBrokerListenerName),
      s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
      s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames),
      s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
      s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
      s"are ${listenerNames.map(_.value).mkString(",")}"

从上面的代码片段可以得出两个结论:

  • advertised.listeners 配置项中配置的Listener名称或者说安全协议必须在 listeners 中存在。因为真正创建连接的是 listeners 中的信息。
  • inter.broker.listener.name 配置项中配置的Listener名称或者说安全协议必须在 advertised.listeners 中存在。因为Broker之间也是要通过 advertised.listeners 配置项获取Internal Listener信息的。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK