Is there another broker running on that IP? If the replication factor is larger than 1, the followers fetch data from the leader just like a regular consumer, so the peer in that log line could be a broker rather than a consumer.
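If it helps, here is a rough sketch of one way to check that against the broker log. It is not Kafka code: the class PeerCheck, its methods, and the broker addresses are all made up for illustration, and it assumes the peer in the "Closing socket for /X" line is a plain IPv4 address.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: decides whether the peer named in a
// "Closing socket for /X because of error" broker log line is another broker.
final class PeerCheck {
    // Matches the IPv4 address that follows "Closing socket for /" (assumed format).
    private static final Pattern PEER =
        Pattern.compile("Closing socket for /([0-9.]+)");

    // Returns the peer address from the log line, or null if the line does not match.
    static String extractPeer(String logLine) {
        Matcher m = PEER.matcher(logLine);
        return m.find() ? m.group(1) : null;
    }

    // True if the peer is one of the known broker addresses, in which case the
    // connection may belong to a follower fetching from the leader.
    static boolean isBroker(String peer, Set<String> brokerIps) {
        return peer != null && brokerIps.contains(peer);
    }

    public static void main(String[] args) {
        // Assumed broker addresses; replace with the addresses of your own brokers.
        Set<String> brokers = new HashSet<String>(Arrays.asList("10.0.0.1", "10.0.0.2"));
        String line = "Closing socket for /10.0.0.2 because of error";
        String peer = extractPeer(line);
        System.out.println(peer + " is a broker: " + isBroker(peer, brokers));
    }
}
```

A match only says the address belongs to a broker host. A consumer running on the same machine would look the same, so treat the result as a hint.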

Thanks,

Jun

On Tue, Jan 27, 2015 at 9:52 AM, Scott Reynolds <sreyno...@twilio.com> wrote:

> On my brokers I am seeing this error log message:
>
> Closing socket for /X because of error (X is the ip address of a consumer)
>
> > 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> > 2015-01-27_17:32:58.29890 at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > 2015-01-27_17:32:58.29891 at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> > 2015-01-27_17:32:58.29892 at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> > 2015-01-27_17:32:58.29892 at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> > 2015-01-27_17:32:58.29893 at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> > 2015-01-27_17:32:58.29893 at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29894 at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> > 2015-01-27_17:32:58.29895 at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29895 at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> > 2015-01-27_17:32:58.29896 at kafka.network.Processor.write(SocketServer.scala:375)
> > 2015-01-27_17:32:58.29896 at kafka.network.Processor.run(SocketServer.scala:247)
> > 2015-01-27_17:32:58.29897 at java.lang.Thread.run(Thread.java:745)
>
> This is because the Processor doesn't handle java.io.IOException and it
> falls through to the catch all.
>
> My consumers seem actually really happy. So I don't think there is a real
> issue here. But I could use some help figuring out if there is.
>
> We are using the Java consumer like so:
>
> > final ConsumerConnector consumer =
> >     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put(topicName, new Integer(1));
> > final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >     consumer.createMessageStreams(topicCountMap);
> > final KafkaStream<byte[], byte[]> stream =
> >     consumerMap.get(topicName).get(0);
>
> and we just iterate over the stream
>
> Questions:
> 1. What class is the one that makes the network connection to the consumer?
> 2. Do legit cases exist where the consumer would close its socket
> connection ? Zookeeper issues ? Consumer too far behind ?