Kafka Cluster Public Network Access Problem

Posted by CaiJiahe on August 17, 2017

0x01 Environment

kafka 0.11.0.0
zookeeper 3.4.10
jdk 1.8

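0x02 Problem

After deploying the cluster on Alibaba Cloud with the usual intranet cluster configuration, machines at my company could not connect to it. The broker configuration was as follows (note that `listeners` is commented out):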

	#listeners=PLAINTEXT://0.0.0.0:9093
	advertised.listeners=PLAINTEXT://59.110.243.142:9093

However, starting the Kafka cluster failed with the following exception:

	kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use.
			at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
			at kafka.network.Acceptor.<init>(SocketServer.scala:252)
			at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)
			at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)
			at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
			at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
			at kafka.network.SocketServer.startup(SocketServer.scala:83)
			at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
			at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
			at kafka.Kafka$.main(Kafka.scala:65)
			at kafka.Kafka.main(Kafka.scala)
	Caused by: java.net.BindException: Address already in use
			at sun.nio.ch.Net.bind0(Native Method)
			at sun.nio.ch.Net.bind(Net.java:433)
			at sun.nio.ch.Net.bind(Net.java:425)
			at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
			at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
			at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
			at kafka.network.Acceptor.openServerSocket(SocketServer.scala:323)

0x03 Cause

Looking through the source code:

	/*
	 * Create a server socket to listen for connections on.
	 */
	private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
	  val socketAddress =
	    if(host == null || host.trim.isEmpty)
	      new InetSocketAddress(port)
	    else
	      new InetSocketAddress(host, port)
	  val serverChannel = ServerSocketChannel.open()
	  serverChannel.configureBlocking(false)
	  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
	    serverChannel.socket().setReceiveBufferSize(recvBufferSize)

	  try {
	    serverChannel.socket.bind(socketAddress)
	    info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
	  } catch {
	    case e: SocketException =>
	      throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
	  }
	  serverChannel
	}

	/** ********* Socket Server Configuration ***********/
	val Port = 9092
	val HostName: String = new String("")

	...

	/*********** Socket Server Configuration ***********/
	.define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
	.define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
	.define(ListenersProp, STRING, null, HIGH, ListenersDoc)

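If host is null or empty, the socket is bound to the wildcard address (0.0.0.0) on the port that is passed in. The configuration-loading code in KafkaConfig shows that this port is the broker config `port`, whose default value is 9092. The documentation describes it as follows: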

`port`: DEPRECATED: only used when `listeners` is not set. Use `listeners` instead. The port to listen and accept connections on. Default: 9092. Importance: high.
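To make the bind behaviour concrete, here is a minimal Java sketch of the same host/port branch. It is not Kafka code: the class `WildcardBindDemo` and its methods are hypothetical names used only for illustration. It shows that an empty host resolves to the wildcard address, and that a second bind to a port that already has a listener is rejected, which is the situation behind the `Address already in use` error above.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of Kafka.
class WildcardBindDemo {

    // Same decision as openServerSocket: no host means "all interfaces".
    static InetSocketAddress resolve(String host, int port) {
        if (host == null || host.trim().isEmpty()) {
            return new InetSocketAddress(port);
        }
        return new InetSocketAddress(host, port);
    }

    // Returns true if binding a second socket to an occupied port is rejected.
    static boolean secondBindFails() {
        try (ServerSocketChannel first = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port, so the demo never collides with a real service.
            first.socket().bind(resolve("", 0));
            int port = first.socket().getLocalPort();
            try (ServerSocketChannel second = ServerSocketChannel.open()) {
                second.socket().bind(resolve(null, port));
                return false;
            } catch (BindException e) {
                // Same exception type as in the broker's stack trace.
                return true;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wildcard: " + resolve("", 9092).getAddress().isAnyLocalAddress());
        System.out.println("second bind rejected: " + secondBindFails());
    }
}
```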

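In other words, the listening port can be set either through `listeners` or through the standalone `port` setting. Because `listeners` was commented out, the broker fell back to `port` and tried to bind 0.0.0.0:9092, which was already in use. `advertised.listeners` only controls the address handed out to clients, not the socket the broker binds.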
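The fallback can be sketched in a few lines of Java. This is a simplified illustration, not Kafka's implementation: the class `ListenerFallbackSketch` and the method `effectiveListener` are hypothetical, and the sketch assumes the deprecated broker settings are named `host.name` and `port`, with the defaults shown in the source excerpt above.

```java
import java.util.Properties;

// Hypothetical sketch of the listener fallback, not Kafka's own code.
class ListenerFallbackSketch {
    static final int DEFAULT_PORT = 9092;   // Defaults.Port in the excerpt
    static final String DEFAULT_HOST = "";  // Defaults.HostName in the excerpt

    // Returns the listener the broker would bind, given its properties.
    static String effectiveListener(Properties props) {
        String listeners = props.getProperty("listeners");
        if (listeners != null && !listeners.trim().isEmpty()) {
            return listeners.trim();
        }
        // listeners is unset: fall back to the deprecated host.name and port.
        String host = props.getProperty("host.name", DEFAULT_HOST);
        String port = props.getProperty("port", String.valueOf(DEFAULT_PORT));
        return "PLAINTEXT://" + host + ":" + port;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Only advertised.listeners is set, as in the failing configuration.
        props.setProperty("advertised.listeners", "PLAINTEXT://59.110.243.142:9093");
        System.out.println(effectiveListener(props));
    }
}
```

With only `advertised.listeners` set, the sketch resolves to an empty host on port 9092, which matches the address in the exception message.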

0x04 Solution

Change the configuration to:

	listeners=PLAINTEXT://0.0.0.0:9093
	advertised.listeners=PLAINTEXT://59.110.243.142:9093
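
After restarting the broker, a quick way to confirm from an office machine that the advertised address is reachable is a plain TCP connect. The sketch below is only a connectivity probe under stated assumptions: the class `PortCheck` and the method `canConnect` are hypothetical, and the 3 second timeout is an arbitrary choice. A successful connect shows that the port is open; it does not prove that a Kafka client can produce or consume.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity probe, not a Kafka client.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address and port taken from advertised.listeners above.
        System.out.println(canConnect("59.110.243.142", 9093, 3000));
    }
}
```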