I have two Kafka brokers backed by three ZooKeeper (ZK) nodes. I want to test the brokers by running kafka-console-producer and kafka-console-consumer locally on each one.
So I SSH into one of my Kafka brokers in two different terminals. In terminal #1, I run the consumer like so:

```
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper a.b.c.d:2181 --topic test1
```

where `a.b.c.d` is the private IP of one of my three ZK nodes. Then, in terminal #2, I run the producer like so:

```
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
```

Both the consumer and the producer start without any issues. However, as soon as I "fire" a message at the `test1` topic from the producer terminal, by typing some text (such as "hello") and pressing ENTER, I immediately start seeing this:

```
[2017-01-17 19:45:57,353] WARN Error while fetching metadata with correlation id 0 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,372] WARN Error while fetching metadata with correlation id 1 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,477] WARN Error while fetching metadata with correlation id 2 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,582] WARN Error while fetching metadata with correlation id 3 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
```

...and it keeps going!
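Because the producer keeps retrying, the warnings pile up quickly. One way to check whether every retry fails the same way is to capture the producer's output (for example by appending `2>&1 | tee producer.log` to the producer command; the file name is only an example) and tally the warnings. Below is a minimal bash sketch; `summarize_metadata_errors` is a hypothetical helper, and it assumes each warning carries a `{topic=ERROR}` map like the lines above:

```shell
#!/usr/bin/env bash
# Hypothetical helper: summarize "Error while fetching metadata" warnings read from stdin.
# Prints "<count> <topic> <error>" lines, most frequent first.
# Assumes each warning contains a map such as {test1=LEADER_NOT_AVAILABLE};
# several entries in one map are assumed to be separated by ", ".
summarize_metadata_errors() {
  grep 'Error while fetching metadata' \
    | grep -o '{[^}]*}' \
    | tr -d '{}' \
    | tr ',' '\n' \
    | sed 's/^ *//' \
    | awk -F'=' 'NF == 2 { print $1, $2 }' \
    | sort | uniq -c | sort -rn \
    | awk '{ print $1, $2, $3 }'   # normalize the padding that uniq -c adds
}

# Example usage (producer.log is a hypothetical capture of the producer output):
#   summarize_metadata_errors < producer.log
```

Fed the four warnings above, it prints a single line: the count `4` followed by `test1 LEADER_NOT_AVAILABLE`.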
Meanwhile, in the consumer terminal, even though I don't get any errors when I start the consumer, after about 30 seconds I get the following warning:

```
[2017-01-17 19:46:07,292] WARN Fetching topic metadata with correlation id 1 for topics [Set(test1)] from broker [BrokerEndPoint(1,ip-x-y-z-w.ec2.internal,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
```

Interestingly, `ip-x-y-z-w.ec2.internal` is the private DNS name of the other Kafka broker, so perhaps this is some kind of failure in inter-broker communication? Any ideas as to what is going on here and what I can do to troubleshoot it?
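Since the endpoint in the consumer warning is the other broker's private DNS name, one quick check is whether that name resolves and whether port 9092 accepts TCP connections from the broker you are testing on. Below is a minimal bash sketch, assuming a Linux host with `getent`, coreutils `timeout`, and bash's `/dev/tcp` redirection; `parse_broker_endpoint` and `probe_endpoint` are hypothetical helpers, not Kafka tools:

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract "id host port" from a BrokerEndPoint(id,host,port)
# fragment in log text read from stdin.
parse_broker_endpoint() {
  grep -o 'BrokerEndPoint([^)]*)' \
    | sed -e 's/^BrokerEndPoint(//' -e 's/)$//' \
    | awk -F',' 'NF == 3 { print $1, $2, $3 }'
}

# Hypothetical helper: resolve the host via NSS (getent), then try to open a TCP
# connection to host:port with bash's /dev/tcp, giving up after 5 seconds.
probe_endpoint() {
  local host=$1 port=$2
  if ! getent hosts "$host" > /dev/null; then
    echo "DNS: $host does not resolve"
    return 1
  fi
  if timeout 5 bash -c "exec 3<>/dev/tcp/$host/$port" 2> /dev/null; then
    echo "TCP: $host:$port reachable"
  else
    echo "TCP: $host:$port unreachable"
    return 1
  fi
}

# Example usage (consumer.log is a hypothetical capture of the consumer output):
#   grep -m1 'BrokerEndPoint' consumer.log | parse_broker_endpoint
#   probe_endpoint ip-x-y-z-w.ec2.internal 9092
```

Only the parsing step is independent of the network. The probe just tells you whether the name resolves and the port is open from this host; it does not speak the Kafka protocol.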