If you don't want to shut down the connector each time, you can also set
consumer.timeout.ms. That way, the iterator's hasNext() will throw a
ConsumerTimeoutException if no new messages arrive within the timeout. The
iterator is re-entrant, so you can keep using it after catching the
exception.
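
For example, a minimal sketch of that pattern (0.7-era API and property
names from memory; the ZooKeeper address, group, topic, and timeout value
are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import kafka.consumer.*;   // Consumer, ConsumerConfig, KafkaStream, ...
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");   // placeholder ZK address
    props.put("groupid", "mygroup");             // placeholder group
    props.put("consumer.timeout.ms", "5000");    // give up after 5s idle

    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    KafkaStream<Message> stream =
        connector.createMessageStreams(Collections.singletonMap("mytopic", 1))
                 .get("mytopic").get(0);

    ConsumerIterator<Message> it = stream.iterator();
    try {
      while (it.hasNext()) {
        // process it.next().message()
      }
    } catch (ConsumerTimeoutException e) {
      // No message arrived within consumer.timeout.ms; hasNext() can be
      // called again later to resume consuming from the same stream.
    }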

Thanks,

Jun

On Thu, Jul 12, 2012 at 11:24 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:

> Neha,
>
> I tried calling shutdown on ConsumerConnector. The only problem I have
> with it is that it forces me to close the connection, thereby forcing me
> to reopen the connection every single time I want to fetch a new set of
> messages. Here is my sample code - http://pastebin.com/6NBHtteL
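>
> In outline it looks something like this (a rough sketch, not the exact
> pastebin code; props and batchSize are placeholders):
>
>   ConsumerConnector connector =
>       Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>   KafkaStream<Message> stream =
>       connector.createMessageStreams(Collections.singletonMap("mytopic", 1))
>                .get("mytopic").get(0);
>   int fetched = 0;
>   for (MessageAndMetadata msgAndMetadata : stream) {
>     // process msgAndMetadata.message()
>     if (++fetched >= batchSize) break;  // stop after a fixed batch
>   }
>   connector.shutdown();  // closes the connection; reopened on next fetch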
>
> But it's not a big deal. I am writing a ZooKeeper-based KafkaSpout. It's
> possible for me to open the connection every time. I don't think it's
> going to be a huge performance problem at our scale.
>
> I am still getting those exceptions in spite of calling
> ConsumerConnector.shutdown. *But I noticed that it's caught and handled by
> Kafka code, and logged by Kafka at INFO level.* It lets me continue in
> spite of the exception.
>
> Ideally I would like a cleaner INFO log. This puts a stack trace in the
> INFO log, which should be avoided if possible. If you want, I can file a
> JIRA issue to get rid of it.
>
> Regards,
> Vaibhav
>
>
>
>
> On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Vaibhav,
> >
> > The cleaner way to exit the consumer iterator loop is to call the
> > shutdown() API on the ZookeeperConsumerConnector. But perhaps there is
> > a reason you are not able to use that approach?
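> >
> > For example (a minimal sketch; connector and executor are whatever you
> > created earlier):
> >
> >   connector.shutdown();  // blocked iterators then exit cleanly
> >   executor.shutdown();   // then stop the consuming threads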
> >
> > Thanks,
> > Neha
> >
> > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> > > I tried breaking the loop in the middle and here is what I got:
> > >
> > > 10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in multifetch due to socket error:
> > > java.nio.channels.ClosedByInterruptException
> > >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
> > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > >     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > >     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> > >     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> > >     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> > > 10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
> > >
> > > I get this exception once per broker: three exceptions, because we
> > > have three brokers.
> > >
> > > I am calling KafkaStream.clear() just before breaking the loop.
> > >
> > > Is there any way to break out of the stream cleanly? Or am I just
> > > supposed to catch this exception?
> > > (My fetch size is 1 MB and my batch size is small for now - 5. But I
> > > don't think we can ever match batch size to fetch size accurately,
> > > because message sizes are not fixed.)
> > >
> > > Regards,
> > > Vaibhav
> > >
> > >
> > > On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > >> Yes, it knows. The consumer offset is advanced only when a message is
> > >> iterated over.
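> > >>
> > >> (With the ZooKeeper-based consumer, the iterated offsets are committed
> > >> periodically; 0.7-era property names, values illustrative:
> > >>
> > >>   props.put("autocommit.enable", "true");        // the default
> > >>   props.put("autocommit.interval.ms", "10000");  // commit cadence
> > >>
> > >> so a message you never iterate over is never committed.)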
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> > >>
> > >> > The inner loop keeps running. If I break it in the middle, is the
> > >> > Kafka broker going to know that the rest of the messages in the
> > >> > stream were not delivered?
> > >> >
> > >> > Regards,
> > >> > Vaibhav
> > >> > GumGum
> > >> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > Is there any way to get a fixed number of messages using the
> > >> > > ZooKeeper-based consumer (ConsumerConnector)?
> > >> > >
> > >> > > I know that with SimpleConsumer you can pass fetchSize as an
> > >> > > argument and limit the number of messages coming back.
> > >> > >
> > >> > > This sample code creates 4 threads that keep consuming forever.
> > >> > >
> > >> > >
> > >> > > // consume the messages in the threads
> > >> > > for(final KafkaStream<Message> stream: streams) {
> > >> > >   executor.submit(new Runnable() {
> > >> > >     public void run() {
> > >> > >       for(MessageAndMetadata msgAndMetadata: stream) {
> > >> > >         // process message (msgAndMetadata.message())
> > >> > >       }
> > >> > >     }
> > >> > >   });
> > >> > > }
> > >> > >
> > >> > > Regards,
> > >> > > Vaibhav
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> >
>
