Hi,

We are evaluating Kafka 0.8 as a queue system for our product. Our
architecture is simple: a single producer sends messages to any of the
topics on the broker. For each topic, a thread runs every 10 seconds and
consumes from its corresponding topic. For each stream we use a thread
pool to increase the degree of parallelism. However, when shutting down
the consumer, I get the following exception on the broker:


[2014-02-01 13:06:43,240] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcher.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
 at sun.nio.ch.IOUtil.read(IOUtil.java:206)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
 at kafka.utils.Utils$.read(Utils.scala:395)
 at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:347)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:619)




I am using the high-level consumer, and the consumer configuration is given
below:




 Properties props = new Properties();
 props.put("zookeeper.connect", "localhost:2181");
 props.put("group.id", groupid);
 props.put("zookeeper.session.timeout.ms", "12000");
 props.put("zookeeper.sync.time.ms", "200");
 // props.put("auto.commit.interval.ms", "1000");
 props.put("auto.commit.enable", "false");
 // props.put("consumer.timeout.ms", "10000");
 props.put("auto.offset.reset", "smallest");
 props.put("socket.timeout.ms", "10000");



Am I missing something, or is this an expected exception?


PS: We are using the Java client.


Thanks in advance,




Ranjith Venkatesan

