BTW, it seems you are referring to the producer, not the consumer as claimed in the title.
On Mon, Apr 13, 2015 at 5:54 PM, François Méthot <fmetho...@gmail.com> wrote: > It could be that the maximum number of connections was reached for client > IP you are using. The default is 10. But it can be changed. I had similar > intermittent issues because of that. > > The property is max.connections.per.ip > > Le 2015-04-12 11:20 PM, "kaybin wong" <kaybinw...@gmail.com> a écrit : > > > > hi there. > > i got a really issue,consumer(java client) sometimes works,but sometimes > > not. > > i had read the sources code, but got nothing,can u help me ? > > > > --------------------------------------------log------------------------------------ > > [root@slave3 kafka_2.10-0.8.2.0]# ./bin/kafka-console-producer.sh > > --broker-list 192.168.1.159:2181 --topic testX > > [2015-04-13 10:11:03,218] WARN Property topic is not valid > > (kafka.utils.VerifiableProperties) > > a^Hte > > [2015-04-13 10:11:26,441] WARN Fetching topic metadata with correlation > id > > 0 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,446] ERROR fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics 
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > > socket has likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > > [2015-04-13 10:11:26,452] WARN Fetching topic metadata with correlation > id > > 1 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > at > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > > at > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > > at > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > 
> > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,455] ERROR Failed to collate messages by topic, > > partition due to: fetching topic metadata for topics [Set(testX)] from > > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed > > (kafka.producer.async.DefaultEventHandler) > > [2015-04-13 10:11:26,560] WARN Fetching topic metadata with correlation > id > > 2 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > 
> at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,562] ERROR fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > > socket has likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > > [2015-04-13 10:11:26,566] WARN Fetching topic metadata with correlation > id > > 3 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > at > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > > at > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > > at > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > 
> > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,568] ERROR Failed to collate messages by topic, > > partition due to: fetching topic metadata for topics [Set(testX)] from > > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed > > (kafka.producer.async.DefaultEventHandler) > > [2015-04-13 10:11:26,670] WARN Fetching topic metadata with correlation > id > > 4 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > 
> at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,672] ERROR fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > > socket has likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > > [2015-04-13 10:11:26,676] WARN Fetching topic metadata with correlation > id > > 5 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > at > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > > at > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > > at > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > 
> > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,677] ERROR Failed to collate messages by topic, > > partition due to: fetching topic metadata for topics [Set(testX)] from > > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed > > (kafka.producer.async.DefaultEventHandler) > > [2015-04-13 10:11:26,780] WARN Fetching topic metadata with correlation > id > > 6 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > 
> at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,781] ERROR fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > > socket has likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > > [2015-04-13 10:11:26,784] WARN Fetching topic metadata with correlation > id > > 7 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. 
> > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > at > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > > at > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > > at > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > 
> > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,786] ERROR Failed to collate messages by topic, > > partition due to: fetching topic metadata for topics [Set(testX)] from > > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed > > (kafka.producer.async.DefaultEventHandler) > > [2015-04-13 10:11:26,888] WARN Fetching topic metadata with correlation > id > > 8 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] > > failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > 
> at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > [2015-04-13 10:11:26,889] ERROR fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] > > failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > > socket has likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) > > at > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > > [2015-04-13 10:11:26,893] ERROR Failed to send requests for topics testX > > with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) > > [2015-04-13 10:11:26,894] ERROR Error in handling batch of 1 events > > (kafka.producer.async.ProducerSendThread) > > kafka.common.FailedToSendMessageException: Failed to send messages after > 3 > > tries. 
> > at > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > > at > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > > at > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > at > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > -- -- Guozhang