Is there another broker running on that ip? If the replication factor is
larger than 1, the follower will be fetching data from the leader just like
a regular consumer.

Thanks,

Jun

On Tue, Jan 27, 2015 at 9:52 AM, Scott Reynolds <sreyno...@twilio.com>
wrote:

> On my brokers I am seeing this error log message:
>
> Closing socket for /X because of error (X is the ip address of a consumer)
> > 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> > 2015-01-27_17:32:58.29890       at
> > sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > 2015-01-27_17:32:58.29891       at
> > sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> > 2015-01-27_17:32:58.29892       at
> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> > 2015-01-27_17:32:58.29892       at
> > kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> > 2015-01-27_17:32:58.29893       at
> > kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> > 2015-01-27_17:32:58.29893       at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29894       at
> > kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> > 2015-01-27_17:32:58.29895       at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29895       at
> > kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> > 2015-01-27_17:32:58.29896       at
> > kafka.network.Processor.write(SocketServer.scala:375)
> > 2015-01-27_17:32:58.29896       at
> > kafka.network.Processor.run(SocketServer.scala:247)
> > 2015-01-27_17:32:58.29897       at java.lang.Thread.run(Thread.java:745)
> >
>
> This is because the Processor doesn't handle java.io.IOException and it
> falls through to the catch all.
>
> My consumers seem actually really happy. So I don't think there is a real
> issue here. But I could use some help figuring out if there is.
>
> We are using the Java consumer like so:
>
> > final ConsumerConnector consumer =
> > kafka.consumer.Consumer.createJavaConsumerConnector(config);
> >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >     topicCountMap.put(topicName, new Integer(1));
> >     final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >     final KafkaStream<byte[], byte[]> stream =
> > consumerMap.get(topicName).get(0);
> >
>
> and we just iterate over the stream
>
> Questions:
> 1. What class is the one that makes the network connection to the consumer?
> 2. Do legit cases exist where the consumer would close its socket
> connection ? Zookeeper issues ? Consumer too far behind ?
>

Reply via email to