Vaibhav,

The cleaner way of exiting the consumer iterator loop is to call
the shutdown() API on the ZookeeperConsumerConnector. But perhaps
there is a reason you are not able to use that approach?
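
For illustration, that shutdown path can be modeled without a broker.
In this sketch a BlockingQueue stands in for the KafkaStream, and
shutdown() is represented by enqueuing a sentinel message so the
iterator loop exits cleanly instead of being interrupted (this mirrors
how the connector unblocks the stream iterators internally; the queue
and the sentinel string here are stand-ins, not Kafka APIs):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ShutdownSketch {
    // Sentinel playing the role of the connector's internal shutdown command.
    static final String SHUTDOWN = "__shutdown__";

    // Consume until the sentinel arrives; return how many messages were seen.
    static int consume(BlockingQueue<String> stream) throws InterruptedException {
        int count = 0;
        while (true) {
            String msg = stream.take();
            if (SHUTDOWN.equals(msg)) break; // clean exit, no interrupt needed
            count++;                         // process the message here
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> consumed = executor.submit(() -> consume(stream));

        for (int i = 0; i < 5; i++) {
            stream.put("message-" + i);
        }
        stream.put(SHUTDOWN); // analogous to calling connector.shutdown()

        System.out.println("consumed " + consumed.get()); // consumed 5
        executor.shutdown();
    }
}
```

The point of the sentinel is that the consumer thread leaves its loop
voluntarily, so no ClosedByInterruptException is ever raised.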

Thanks,
Neha

On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> I tried breaking the loop in the middle and here is what I got:
>
> 10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in
> multifetch due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> 10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  -
> FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
>
> I get this exception once per broker: three exceptions in all, since we
> have three brokers.
>
> I am calling KafkaStream.clear() just before breaking the loop.
>
> Is there any way to break out of the stream cleanly? Or am I just supposed
> to catch this exception?
> (My fetch size is 1 MB and batch size is small for now - 5. But I don't
> think we can ever match batch size with fetch size exactly, because each
> message's size is not fixed.)
>
> Regards,
> Vaibhav
>
>
> On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> Yes, it knows. The consumer offset is only advanced every time a message is
>> iterated over.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com
>> >wrote:
>>
>> > The inner loop keeps running. If I break it in the middle, is the Kafka
>> > broker going to know that the rest of the messages in the stream were
>> > not delivered?
>> >
>> > Regards,
>> > Vaibhav
>> > GumGum
>> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > > Is there any way to get a fixed amount of messages using Zookeeper
>> based
>> > > consumer (ConsumerConnector)?
>> > >
>> > > I know that with SimpleConsumer you can pass fetchSize as an argument
>> and
>> > > limit the number of messages coming back.
>> > >
>> > > This sample code creates 4 threads that keep consuming forever.
>> > >
>> > >
>> > > // consume the messages in the threads
>> > > for(final KafkaStream<Message> stream: streams) {
>> > >   executor.submit(new Runnable() {
>> > >     public void run() {
>> > >       for(MessageAndMetadata msgAndMetadata: stream) {
>> > >         // process message (msgAndMetadata.message())
>> > >       }
>> > >     }
>> > >   });
>> > > }
>> > >
>> > > Regards,
>> > > Vaibhav
>> > >
>> > >
>> > >
>> >
>>
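
On the original question of pulling a fixed number of messages with the
ZooKeeper-based consumer: the usual pattern is to count messages inside
the iterator loop and stop once the limit is reached, then call
shutdown() on the connector (offsets advance only for messages actually
iterated, per Jun's point above). A broker-free sketch of that counting
logic, with a plain Iterator standing in for the KafkaStream:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BoundedConsumeSketch {
    // Consume at most 'limit' messages from the stream, then stop.
    // In real code the caller would invoke connector.shutdown() afterwards,
    // so messages never iterated remain undelivered and their offsets
    // uncommitted.
    static int consumeAtMost(Iterator<String> stream, int limit) {
        int consumed = 0;
        while (consumed < limit && stream.hasNext()) {
            stream.next(); // process the message; the offset advances here
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        System.out.println(consumeAtMost(messages.iterator(), 5)); // prints 5
    }
}
```

Because message sizes vary, counting iterated messages like this is the
reliable way to bound consumption; fetchSize only bounds bytes per fetch,
not message count.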
