Jun,

The messages are always available. My requirement is to get them in batches
(without worrying about offsets) so that I can do batch aggregation.
I want to mark each batch with a batch id so that my bolts can commit the
aggregated results to the database.
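
Roughly what I have in mind (a sketch; nextBatchId() and commitBatch()
are placeholders for my own batching and commit logic, and it assumes
consumer.timeout.ms is set as you suggest below):

List<Message> batch = new ArrayList<Message>();
long batchId = nextBatchId();                  // placeholder
try {
  for (MessageAndMetadata msgAndMetadata : stream) {
    batch.add(msgAndMetadata.message());
    if (batch.size() == BATCH_SIZE) {
      commitBatch(batchId, batch);             // placeholder: aggregate + commit
      batch.clear();
      batchId = nextBatchId();
    }
  }
} catch (ConsumerTimeoutException e) {
  // consumer.timeout.ms elapsed; flush whatever is in the partial batch
  if (!batch.isEmpty()) {
    commitBatch(batchId, batch);
  }
}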

Thanks for your quick reply,
Vaibhav

On Thu, Jul 12, 2012 at 11:33 AM, Jun Rao <jun...@gmail.com> wrote:

> If you don't want to shut down the connector each time, you can also set
> consumer.timeout.ms. This way, the iterator's hasNext() will throw a
> ConsumerTimeoutException if no new messages are available within the
> timeout. The iterator remains usable afterwards.
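>
> For example (a rough sketch; the property names are as I remember them
> from the 0.7 consumer config, so double-check them):
>
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");   // example values
> props.put("groupid", "mygroup");
> // hasNext() throws ConsumerTimeoutException after 1s with no new messages
> props.put("consumer.timeout.ms", "1000");
> ConsumerConnector connector =
>     Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
> // create your streams as usual, then:
> try {
>   for (MessageAndMetadata msgAndMetadata : stream) {
>     process(msgAndMetadata.message());       // your own handler
>   }
> } catch (ConsumerTimeoutException e) {
>   // timed out; the same stream can be iterated again later
> }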
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 11:24 AM, Vaibhav Puranik <vpura...@gmail.com
> >wrote:
>
> > Neha,
> >
> > I tried calling shutdown on ConsumerConnector. The only problem I have
> > with it is that it closes the connection, forcing me to reopen it every
> > time I want to fetch a new set of messages.
> > Here is my sample code - http://pastebin.com/6NBHtteL
> >
> > But it's not a big deal. I am writing a ZooKeeper-based KafkaSpout, and
> > it's possible for me to open the connection every time. I don't think
> > it's going to be a huge performance problem at our scale.
> >
> > I am still getting those exceptions in spite of calling
> > ConsumerConnector.shutdown. *But I noticed that the exception is caught
> > and handled by the Kafka code, which logs it at INFO level.* It lets me
> > continue in spite of the exception.
> >
> > Ideally I would like a cleaner INFO log; printing a full stack trace at
> > INFO level should be avoided if possible. If you want, I can file a JIRA
> > issue to get rid of it.
> >
> > Regards,
> > Vaibhav
> >
> >
> >
> >
> > On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <neha.narkh...@gmail.com
> > >wrote:
> >
> > > Vaibhav,
> > >
> > > The cleaner way of exiting the consumer iterator loop is to call the
> > > shutdown() API on the ZookeeperConsumerConnector. But perhaps there is
> > > a reason you are not able to use that approach?
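> > >
> > > Roughly (a sketch, assuming the Java API):
> > >
> > > ConsumerConnector connector =
> > >     Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
> > > // ... consume from the streams, then, to exit the loop cleanly:
> > > connector.shutdown();   // releases the sockets and the ZK session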
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com>
> > > wrote:
> > > > I tried breaking the loop in the middle and here is what I got:
> > > >
> > > > 10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  -
> > > > Reconnect in multifetch due to socket error:
> > > > java.nio.channels.ClosedByInterruptException
> > > >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
> > > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > >     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> > > >     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> > > >     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> > > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> > > > 10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  -
> > > > FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
> > > >
> > > > I get one such exception per broker: three in total, because we have
> > > > three brokers.
> > > >
> > > > I am calling KafkaStream.clear() just before breaking the loop.
> > > >
> > > > Is there any way to break out of the stream cleanly? Or am I just
> > > > supposed to catch this exception?
> > > > (My fetch size is 1 MB and my batch size is small for now: 5. But I
> > > > don't think we can ever match batch size to fetch size exactly,
> > > > because message sizes are not fixed.)
> > > >
> > > > Regards,
> > > > Vaibhav
> > > >
> > > >
> > > > On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > >> Yes, it knows. The consumer offset is only advanced every time a
> > > >> message is iterated over.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <
> vpura...@gmail.com
> > > >> >wrote:
> > > >>
> > > >> > The inner loop keeps running. If I break it in the middle, is the
> > > >> > Kafka broker going to know that the rest of the messages in the
> > > >> > stream were not delivered?
> > > >> >
> > > >> > Regards,
> > > >> > Vaibhav
> > > >> > GumGum
> > > >> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com>
> > > wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > Is there any way to get a fixed number of messages using the
> > > >> > > ZooKeeper-based consumer (ConsumerConnector)?
> > > >> > >
> > > >> > > I know that with SimpleConsumer you can pass fetchSize as an
> > > >> > > argument and limit the number of messages coming back.
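> > > >> > >
> > > >> > > For reference, this is the pattern I mean (a sketch; the host,
> > > >> > > topic, partition, and offset are made up):
> > > >> > >
> > > >> > > SimpleConsumer consumer =
> > > >> > >     new SimpleConsumer("broker1", 9092, 30000, 1024 * 1024);
> > > >> > > // the last FetchRequest argument (fetchSize) caps the bytes
> > > >> > > // returned per fetch
> > > >> > > ByteBufferMessageSet messages =
> > > >> > >     consumer.fetch(new FetchRequest("mytopic", 0, 0L, 100000));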
> > > >> > >
> > > >> > > This sample code creates 4 threads that keep consuming forever.
> > > >> > >
> > > >> > >
> > > >> > > // consume the messages in the threads: one runnable per stream
> > > >> > > for (final KafkaStream<Message> stream : streams) {
> > > >> > >   executor.submit(new Runnable() {
> > > >> > >     public void run() {
> > > >> > >       // blocks waiting for messages; never exits on its own
> > > >> > >       for (MessageAndMetadata msgAndMetadata : stream) {
> > > >> > >         // process message (msgAndMetadata.message())
> > > >> > >       }
> > > >> > >     }
> > > >> > >   });
> > > >> > > }
> > > >> > >
> > > >> > > Regards,
> > > >> > > Vaibhav
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
>
