Jun,

The messages are always available. My requirement is to get them in batches (without worrying about offsets) so that I can do batch aggregation. I want to tag each batch with a batch ID so that my bolts can commit the aggregated results to the database.
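Here is a minimal sketch of the batching loop I have in mind, assuming consumer.timeout.ms is set as you suggest below. The batch size, the batch-ID scheme, and the emitBatch helper are placeholders of mine, not part of the Kafka API:

import java.util.ArrayList;
import java.util.List;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class BatchingRunnable implements Runnable {

    private static final int BATCH_SIZE = 5; // small for now, see below

    private final KafkaStream<Message> stream;
    private long batchId = 0;

    public BatchingRunnable(KafkaStream<Message> stream) {
        this.stream = stream;
    }

    public void run() {
        // The iterator is re-enterable, so it is reused across batches.
        ConsumerIterator<Message> it = stream.iterator();
        while (true) {
            List<Message> batch = new ArrayList<Message>(BATCH_SIZE);
            try {
                // hasNext() blocks until a message arrives or
                // consumer.timeout.ms elapses, in which case it throws.
                while (batch.size() < BATCH_SIZE && it.hasNext()) {
                    MessageAndMetadata<Message> m = it.next();
                    batch.add(m.message());
                }
            } catch (ConsumerTimeoutException e) {
                // No new messages within the timeout; emit what we have.
            }
            if (!batch.isEmpty()) {
                emitBatch(batchId++, batch);
            }
        }
    }

    // Placeholder: aggregate the batch and let the bolts commit the
    // aggregated results to the database, keyed by batchId.
    private void emitBatch(long batchId, List<Message> batch) {
        // ...
    }
}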
Thanks for your quick reply,
Vaibhav

On Thu, Jul 12, 2012 at 11:33 AM, Jun Rao <jun...@gmail.com> wrote:

> If you don't want to shut down the connector each time, you can also set
> consumer.timeout.ms. This way, the iterator's hasNext will get an
> exception if no new messages are available after the timeout. The
> iterator is re-enterable.
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 11:24 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:
>
> > Neha,
> >
> > I tried calling shutdown on ConsumerConnector. The only problem I have
> > with it is that it forces me to close the connection, thereby forcing
> > me to open the connection every single time I want to fetch a new set
> > of messages. Here is my sample code - http://pastebin.com/6NBHtteL
> >
> > But it's not a big deal. I am writing a Zookeeper-based KafkaSpout.
> > It's possible for me to open the connection every time. I don't think
> > it's going to be a huge performance problem at our scale.
> >
> > I am still getting those exceptions in spite of calling
> > ConsumerConnector.shutdown. *But I noticed that it's caught and
> > handled by Kafka code. It's being logged by Kafka at the INFO log
> > level.* It lets me continue in spite of the exception.
> >
> > Ideally I would like a cleaner INFO log. This puts a stack trace in
> > the INFO log, which should be avoided if possible. If you want, I can
> > file a JIRA issue to get rid of it.
> >
> > Regards,
> > Vaibhav
> >
> > On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Vaibhav,
> > >
> > > The cleaner way of exiting the consumer iterator loop is by calling
> > > the shutdown() API on the ZookeeperConsumerConnector. But probably
> > > there is a reason you are not able to use that approach?
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> > > > I tried breaking the loop in the middle and here is what I got:
> > > >
> > > > 10158 [FetchRunnable-0] INFO kafka.consumer.SimpleConsumer - Reconnect in multifetch due to socket error:
> > > > java.nio.channels.ClosedByInterruptException
> > > >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
> > > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > >     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> > > >     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> > > >     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> > > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> > > > 10163 [FetchRunnable-0] INFO kafka.consumer.FetcherRunnable - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
> > > >
> > > > I get this exception once per broker; I get three exceptions
> > > > because we have three brokers.
> > > >
> > > > I am calling KafkaStream.clear() just before breaking the loop.
> > > >
> > > > Is there any way to break the stream cleanly? Or am I just
> > > > supposed to catch this exception?
> > > >
> > > > (My fetch size is 1 MB and the batch size is small for now - 5.
> > > > But I don't think we can ever match batch size with fetch size
> > > > accurately, because message sizes are not fixed.)
> > > >
> > > > Regards,
> > > > Vaibhav
> > > >
> > > > On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > >> Yes, it knows. The consumer offset is only advanced every time a
> > > >> message is iterated over.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> > > >>
> > > >> > The inner loop keeps running. If I break it in the middle, is
> > > >> > the Kafka broker going to know that the rest of the messages in
> > > >> > the stream were not delivered?
> > > >> >
> > > >> > Regards,
> > > >> > Vaibhav
> > > >> > GumGum
> > > >> >
> > > >> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > Is there any way to get a fixed number of messages using the
> > > >> > > Zookeeper-based consumer (ConsumerConnector)?
> > > >> > >
> > > >> > > I know that with SimpleConsumer you can pass fetchSize as an
> > > >> > > argument and limit the number of messages coming back.
> > > >> > >
> > > >> > > This sample code creates 4 threads that keep consuming forever:
> > > >> > >
> > > >> > > // consume the messages in the threads
> > > >> > > for (final KafkaStream<Message> stream : streams) {
> > > >> > >     executor.submit(new Runnable() {
> > > >> > >         public void run() {
> > > >> > >             for (MessageAndMetadata msgAndMetadata : stream) {
> > > >> > >                 // process message (msgAndMetadata.message())
> > > >> > >             }
> > > >> > >         }
> > > >> > >     });
> > > >> > > }
> > > >> > >
> > > >> > > Regards,
> > > >> > > Vaibhav
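For reference, a minimal sketch of how the consumer.timeout.ms approach discussed above could be wired up. The property values, topic name, and group name are placeholders, and this assumes the 0.7-era property names zk.connect and groupid:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class ConnectorSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");  // placeholder Zookeeper address
        props.put("groupid", "batch-aggregator");   // placeholder consumer group
        // Makes the iterator's hasNext() throw ConsumerTimeoutException
        // after 1 second with no new messages, instead of blocking forever.
        props.put("consumer.timeout.ms", "1000");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Four streams -> four consuming threads, as in the sample loop above.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 4);           // placeholder topic
        List<KafkaStream<Message>> streams =
            connector.createMessageStreams(topicCountMap).get("my-topic");

        // ... submit one Runnable per stream, as in the sample code above ...

        // shutdown() remains the clean way to stop all streams when done:
        // connector.shutdown();
    }
}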