Vaibhav,

The cleaner way to exit the consumer iterator loop is to call the shutdown() API on the ZookeeperConsumerConnector. But there is probably a reason you are not able to use that approach?
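
A minimal sketch of that idea, assuming a stand-in connector rather than the real Kafka classes: the consumer reads a fixed number of messages and then asks the connector to shut down, so the loop ends on its own instead of being interrupted. StandInConnector, deliver(), next(), consumedOffset() and consumeBatch() are hypothetical names made up for this illustration; only shutdown() mirrors a call named in this thread, and its internals here are an assumption, not Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedConsumeSketch {

    // Hypothetical stand-in for a connector plus its stream. NOT the Kafka API.
    static class StandInConnector {
        private final BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        private volatile boolean closed = false;
        private long consumedOffset = 0;

        // Simulates a fetcher thread handing a message to the stream.
        void deliver(String message) {
            queue.add(Optional.of(message));
        }

        // Ends iteration cooperatively: set a flag and wake any blocked reader
        // with an empty marker, instead of interrupting the reading thread.
        void shutdown() {
            closed = true;
            queue.add(Optional.empty());
        }

        // Returns the next message, or null once shutdown() has been called.
        // The offset advances only when a message is actually handed out.
        String next() {
            if (closed) {
                return null;
            }
            try {
                Optional<String> item = queue.take();
                if (!item.isPresent()) {
                    return null; // woken by the shutdown marker
                }
                consumedOffset++;
                return item.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        long consumedOffset() {
            return consumedOffset;
        }
    }

    // Reads up to batchSize messages (batchSize must be positive), then shuts
    // the connector down so the loop exits without an interrupt.
    static List<String> consumeBatch(StandInConnector connector, int batchSize) {
        List<String> batch = new ArrayList<>();
        String message;
        while ((message = connector.next()) != null) {
            batch.add(message);
            if (batch.size() == batchSize) {
                connector.shutdown();
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        StandInConnector connector = new StandInConnector();
        for (int i = 0; i < 10; i++) {
            connector.deliver("m" + i);
        }
        List<String> batch = consumeBatch(connector, 5);
        System.out.println(batch + " consumed up to offset " + connector.consumedOffset());
    }
}
```

In this sketch the five messages left in the queue are never handed out, so the stand-in offset stays at 5. With the real connector the equivalent step would be calling shutdown() on the ConsumerConnector once the batch is full; how it unblocks the iterator internally is not something this sketch claims to reproduce.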

Thanks,
Neha

On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:

> I tried breaking the loop in the middle and here is what I got:
>
> 10158 [FetchRunnable-0] INFO kafka.consumer.SimpleConsumer - Reconnect in multifetch due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> 10163 [FetchRunnable-0] INFO kafka.consumer.FetcherRunnable - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
>
> I get this exception once per broker. I get three exceptions because we have
> three brokers.
>
> I am calling KafkaStream.clear() just before breaking the loop.
>
> Is there any way to break out of the stream cleanly? Or am I just supposed to
> catch this exception?
> (My fetch size is 1 MB and my batch size is small for now: 5. But I don't
> think we can ever match batch size with fetch size accurately, because the
> message size is not fixed.)
>
> Regards,
> Vaibhav
>
> On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> Yes, it knows. The consumer offset is only advanced every time a message is
>> iterated over.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com> wrote:
>>
>> > The inner loop keeps running. If I break it in the middle, is the Kafka
>> > broker going to know that the rest of the messages in the stream were not
>> > delivered?
>> >
>> > Regards,
>> > Vaibhav
>> > GumGum
>> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > > Is there any way to get a fixed number of messages using the
>> > > Zookeeper-based consumer (ConsumerConnector)?
>> > >
>> > > I know that with SimpleConsumer you can pass fetchSize as an argument
>> > > and limit the number of messages coming back.
>> > >
>> > > This sample code creates 4 threads that keep consuming forever.
>> > >
>> > > // consume the messages in the threads
>> > > for(final KafkaStream<Message> stream: streams) {
>> > >   executor.submit(new Runnable() {
>> > >     public void run() {
>> > >       for(MessageAndMetadata msgAndMetadata: stream) {
>> > >         // process message (msgAndMetadata.message())
>> > >       }
>> > >     }
>> > >   });
>> > > }
>> > >
>> > > Regards,
>> > > Vaibhav