Neha, I tried calling shutdown on ConsumerConnector. The only problem I have with it is that it forces me to close the connection thereby forcing me to open the connection every single time I want to fetch new set of messages. Here is my sample code - http://pastebin.com/6NBHtteL
But it's not a big deal. I am writing a Zookeeper based KafakSpout. It's possible for me to open the connection everytime. I don't think it's going to be huge performance problem at our scale. I am still getting those exceptions inspite of calling ConsumerConnector.shutdown. *But I noticed that it's caught and handled by Kafka code. It's being logged by Kafka with INFO log level. *It lets me continue inspite of the exception. Ideally I would have liked to have a cleaner INFO log. This puts a stack trace in the Info log which if possible should be avoided. If you want I can file a jira issue to get rid of it. Regards, Vaibhav On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > Vaibhav, > > The cleaner way of exiting the consumer iterator loop is by calling > the shutdown() API on the ZookeeperConsumerConnector. But probably > there is a reason you are not able to use that approach ? > > Thanks, > Neha > > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com> > wrote: > > I tried breaking the loop in the middle and here is what I got: > > > > 10158 [FetchRunnable-0] INFO kafka.consumer.SimpleConsumer - Reconnect > in > > multifetch due to socket error: > > java.nio.channels.ClosedByInterruptException > > at > > > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270) > > at kafka.utils.Utils$.read(Utils.scala:538) > > at > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:55) > > at > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at > kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177) > > at > kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117) > > at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115) > > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60) > > 10163 [FetchRunnable-0] INFO kafka.consumer.FetcherRunnable - > > FecherRunnable Thread[FetchRunnable-0,5,main] interrupted > > > > I get this exception one per broker. I get three exceptions because we > have > > three brokers. > > > > I am calling KafakStream.clear() just before breaking the loop. > > > > Is there any way to break the stream cleanly? Or am I just suppose to > catch > > this exception? > > (My fetch size if 1 MB and batch size is small for now - 5. But I don't > > think we can ever match batch size with fetch size accurately because > each > > message size is not fixed). > > > > Regards, > > Vaibhav > > > > > > On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote: > > > >> Yes, it knows. The consumer offset is only advanced every time a > message is > >> iterated over. > >> > >> Thanks, > >> > >> Jun > >> > >> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com > >> >wrote: > >> > >> > The inner loop keeps running. If I break it in the middle, is Kafka > >> broker > >> > going to know that rest of the mesaages in the stream were not > delivered? > >> > > >> > Regards, > >> > Vaibhav > >> > GumGum > >> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> > wrote: > >> > > >> > > Hi all, > >> > > > >> > > Is there any way to get a fixed amount of messages using Zookeeper > >> based > >> > > consumer (ConsumerConnector)? > >> > > > >> > > I know that with SimpleConsumer you can pass fetchSize as an > argument > >> and > >> > > limit the number of messages coming back. > >> > > > >> > > This sample code creates 4 threads that keep consuming forever. > >> > > > >> > > > >> > > // consume the messages in the threads > >> > > for(final KafkaStream<Message> stream: streams) { > >> > > executor.submit(new Runnable() { > >> > > public void run() { > >> > > for(MessageAndMetadata msgAndMetadata: stream) { > >> > > // process message (msgAndMetadata.message()) > >> > > } > >> > > } > >> > > }); > >> > > } > >> > > > >> > > Regards, > >> > > Vaibhav > >> > > > >> > > > >> > > > >> > > >> >