0x01 Environment
kafka 0.11.0.0
zookeeper 3.4.10
jdk 1.8
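0x02 Problem
After deploying Kafka on Alibaba Cloud with an intranet cluster configuration, I found that machines at the office could not connect. The configuration was as follows: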
#listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://59.110.243.142:9093
However, starting the Kafka cluster failed with the following exception:
kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use.
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
    at kafka.network.Acceptor.<init>(SocketServer.scala:252)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.network.SocketServer.startup(SocketServer.scala:83)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:65)
    at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:323)
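The root cause at the bottom of the trace, java.net.BindException: Address already in use, is the generic error the OS reports when a second listener tries to bind a port that is already taken. A minimal sketch of that situation in Python, independent of Kafka (try_bind is a hypothetical helper, and port 0 is used only so the OS picks a free port for the demo):

```python
import socket


def try_bind(host: str, port: int) -> socket.socket:
    """Bind and listen on host:port, roughly what a broker does at startup."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen()
    except OSError:
        # Do not leak the socket if the bind fails.
        sock.close()
        raise
    return sock


if __name__ == "__main__":
    first = try_bind("0.0.0.0", 0)      # port 0: let the OS choose a free port
    port = first.getsockname()[1]
    try:
        try_bind("0.0.0.0", port)       # second listener on the same port
    except OSError as exc:
        print(f"bind to 0.0.0.0:{port} failed: {exc}")
    finally:
        first.close()
```

The second call fails for the same reason the broker did: something else already holds the port.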
0x03 Cause
Looking through the source code:
/*
 * Create a server socket to listen for connections on.
 */
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  val socketAddress =
    if(host == null || host.trim.isEmpty)
      new InetSocketAddress(port)
    else
      new InetSocketAddress(host, port)
  val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
  try {
    serverChannel.socket.bind(socketAddress)
    info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
  } catch {
    case e: SocketException =>
      throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
  }
  serverChannel
}
/** ********* Socket Server Configuration ***********/
val Port = 9092
val HostName: String = new String("")
...
/*********** Socket Server Configuration ***********/
.define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
.define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
.define(ListenersProp, STRING, null, HIGH, ListenersDoc)
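The code shows that when host is null or empty, the socket binds to all interfaces on the given port. The KafkaConfig loading code shows that, when listeners is not set, this port is the port setting from the broker config, which defaults to 9092. The documentation describes it as follows: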
port | DEPRECATED: only used when `listeners` is not set. Use `listeners` instead. the port to listen and accept connections on | 9092 | high |
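In other words, the bind port can be set either through listeners or through the standalone port setting. advertised.listeners only controls the address handed out to clients, so with listeners commented out the broker still tried to bind the default port 9092, which was already in use.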
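The fallback described above can be modelled in a few lines. This is a simplified sketch, not Kafka's actual parsing code: parse_properties and bind_endpoint are hypothetical helpers, only the first listener is considered, and the defaults mirror the Defaults values quoted from the source.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 9092   # Defaults.Port in the quoted source
DEFAULT_HOST = ""     # Defaults.HostName: empty means all interfaces

BROKEN = """\
#listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://59.110.243.142:9093
"""

FIXED = """\
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://59.110.243.142:9093
"""


def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def bind_endpoint(props: dict) -> tuple:
    """Return the (host, port) the broker would try to bind (simplified model)."""
    listeners = props.get("listeners")
    if listeners:
        # Assumption: look at the first listener only.
        url = urlsplit(listeners.split(",")[0].strip())
        return (url.hostname or "", url.port)
    # listeners unset: fall back to the deprecated host.name / port settings.
    # advertised.listeners is deliberately ignored; it does not affect binding.
    return (props.get("host.name", DEFAULT_HOST), int(props.get("port", DEFAULT_PORT)))


if __name__ == "__main__":
    print(bind_endpoint(parse_properties(BROKEN)))  # ('', 9092)
    print(bind_endpoint(parse_properties(FIXED)))   # ('0.0.0.0', 9093)
```

With listeners commented out, the model lands on port 9092 no matter what advertised.listeners says, which matches the address in the exception.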
0x04 Solution
Change the configuration to:
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://59.110.243.142:9093
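After restarting the broker, a quick way to check the result from an office machine is a plain TCP connect to the advertised address. The sketch below uses a hypothetical can_connect helper. It only shows that the port is reachable (security group, firewall, bind address), not that Kafka clients will receive correct metadata.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Host and port taken from advertised.listeners above.
    print(can_connect("59.110.243.142", 9093))
```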