This seems to happen when fetching the metadata. Are you using a VIP as broker.list?

Thanks,

Jun


On Fri, Aug 8, 2014 at 2:42 PM, S. Zhou <myx...@yahoo.com.invalid> wrote:

> Thanks Guozhang. Any ideas on what could be wrong on that machine? We set
> up multiple producers in the same way but only one has this issue.
>
>
> On Friday, August 8, 2014 2:41 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> This might be due to some issue on that producer machine, the "producer
> queue full and message sent rate low" is likely to be the result of the
> frequent connection timeout, but not the cause of it.
>
> Guozhang
>
>
> On Fri, Aug 8, 2014 at 2:30 PM, S. Zhou <myx...@yahoo.com.invalid> wrote:
>
> > A Kafka producer frequently timeout when connecting to a remote Kafka
> > cluster while producers on other machine (same data center) can connect to
> > the Kafka cluster with no problem. From the monitoring, the
> > ProductQueueSize is always full and message sent rate is low. We use Kafka
> > 0.8. We set "batch.num.messages=10000" and "queue.buffering.max.ms=5000".
> >
> > Here is the error message:
> > [2014-08-08 17:52:02,786] ProducerSendThread producer.SyncProducer ERROR Producer connection to kafka-XXX.com:9092 unsuccessful
> > java.net.ConnectException: Connection timed out
> >     at sun.nio.ch.Net.connect(Native Method)
> >     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> >     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >     at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> >     at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >     at kafka.utils.Utils$.swallow(Utils.scala:187)
> >     at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> >     at kafka.utils.Utils$.swallowError(Utils.scala:46)
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> --
> -- Guozhang
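
For anyone hitting the same trace: the ConnectException is raised while the producer opens a socket to a broker for the metadata fetch, so a plain TCP check run from the affected machine may help separate a network problem from a producer setting. Below is a minimal Python sketch, assuming the broker list is a comma-separated string of host:port entries. The function names parse_broker_list and check_brokers and the 5-second default timeout are illustrative choices, not part of Kafka.

```python
import socket
import time


def parse_broker_list(broker_list):
    """Split 'host1:9092,host2:9092' into a list of (host, port) pairs."""
    pairs = []
    for entry in broker_list.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # rpartition keeps everything before the last ':' as the host.
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def check_brokers(broker_list, timeout_s=5.0):
    """Try a TCP connect to every entry in the broker list.

    Returns a dict mapping (host, port) to (reachable, seconds, error),
    where error is None on success. Only the TCP handshake is tested;
    no Kafka request is sent.
    """
    results = {}
    for host, port in parse_broker_list(broker_list):
        start = time.monotonic()
        try:
            with socket.create_connection((host, port), timeout=timeout_s):
                results[(host, port)] = (True, time.monotonic() - start, None)
        except OSError as exc:
            # Covers timeouts, refused connections and DNS failures.
            results[(host, port)] = (False, time.monotonic() - start, repr(exc))
    return results
```

Calling check_brokers("kafka-XXX.com:9092") with the real hostnames substituted, on both the failing machine and a healthy one, would show whether the timeout is specific to that host. If a VIP is listed, it may be worth checking the individual broker addresses behind it as well.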