I tried breaking the loop in the middle and here is what I got: 10158 [FetchRunnable-0] INFO kafka.consumer.SimpleConsumer - Reconnect in multifetch due to socket error: java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270) at kafka.utils.Utils$.read(Utils.scala:538) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) at kafka.network.Receive$class.readCompletely(Transmission.scala:55) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177) at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117) at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115) at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60) 10163 [FetchRunnable-0] INFO kafka.consumer.FetcherRunnable - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
I get this exception one per broker. I get three exceptions because we have three brokers. I am calling KafakStream.clear() just before breaking the loop. Is there any way to break the stream cleanly? Or am I just suppose to catch this exception? (My fetch size if 1 MB and batch size is small for now - 5. But I don't think we can ever match batch size with fetch size accurately because each message size is not fixed). Regards, Vaibhav On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote: > Yes, it knows. The consumer offset is only advanced every time a message is > iterated over. > > Thanks, > > Jun > > On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com > >wrote: > > > The inner loop keeps running. If I break it in the middle, is Kafka > broker > > going to know that rest of the mesaages in the stream were not delivered? > > > > Regards, > > Vaibhav > > GumGum > > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote: > > > > > Hi all, > > > > > > Is there any way to get a fixed amount of messages using Zookeeper > based > > > consumer (ConsumerConnector)? > > > > > > I know that with SimpleConsumer you can pass fetchSize as an argument > and > > > limit the number of messages coming back. > > > > > > This sample code creates 4 threads that keep consuming forever. > > > > > > > > > // consume the messages in the threads > > > for(final KafkaStream<Message> stream: streams) { > > > executor.submit(new Runnable() { > > > public void run() { > > > for(MessageAndMetadata msgAndMetadata: stream) { > > > // process message (msgAndMetadata.message()) > > > } > > > } > > > }); > > > } > > > > > > Regards, > > > Vaibhav > > > > > > > > > > > >