You can stop iterating over messages by calling connector.shutdown(). When
a consumer is restarted, it resumes from the last checkpointed offset in
ZK, assuming the consumer group id is the same.
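
For illustration, a minimal sketch of that shutdown/restart pattern with the
0.7-era high-level Java consumer (topic and group names are made up, and the
class names and generics, e.g. KafkaMessageStream, are from the 0.7 quickstart
as I remember it, so they may differ slightly between point releases):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class OrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        // Reusing the same group id is what lets a restarted consumer pick up
        // from the offset last checkpointed in ZK.
        props.put("groupid", "order-group");

        final ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // shutdown() unblocks the stream iterator below, so the otherwise
        // infinite consume loop ends cleanly (here wired to JVM shutdown).
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                connector.shutdown();
            }
        });

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("orderTopic", 1);
        Map<String, List<KafkaMessageStream<Message>>> streams =
            connector.createMessageStreams(topicCount);

        for (Message message : streams.get("orderTopic").get(0)) {
            ByteBuffer payload = message.payload();
            // process the payload ...
        }
    }
}

If the checkpointed offset no longer exists on the broker (e.g. because log
segments were deleted), the consumer instead hits OffsetOutOfRangeException
like the one further down in this thread; the high-level consumer has an
offset-reset property for that case (autooffset.reset in the 0.7 naming, set
to "smallest" or "largest"), if I remember the name correctly.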

Jun

On Tue, May 15, 2012 at 6:04 AM, navneet sharma <navneetsharma0...@gmail.com> wrote:

> Jun,
>
> Deleting the ZooKeeper directory or the broker logs is problematic and
> should not be done.
>
> How about killing a consumer process? Since it's in an infinite loop, I
> can't see any other clean way to stop it.
>
> Also, if the same consumer is restarted, is it treated in ZooKeeper as the
> same consumer as before, or as a different one? I checked the docs, but it's
> not very clear to me.
>
> Thanks,
> Navneet Sharma
>
> On Fri, May 11, 2012 at 8:07 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Navneet,
> >
> > Normally, you shouldn't delete the broker log yourself (it's GC-ed based on
> > the retention time configured at the broker). If this is for testing, then
> > you need to clean up the ZK data too.
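> >
> > For reference, a sketch of the broker-side retention settings involved (the
> > property names follow the 0.7-era server.properties and are worth
> > double-checking against the broker config docs; the values shown are just
> > examples):
> >
> >   # delete log segments older than this many hours
> >   log.retention.hours=168
> >   # roll to a new segment file once the current one reaches this size
> >   log.file.size=536870912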
> >
> > Jun
> >
> > On Fri, May 11, 2012 at 1:05 AM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> >
> > > I deleted the broker log file and that fixed the problem. But is there
> > > any better way to fix it?
> > >
> > > On Fri, May 11, 2012 at 12:39 PM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I tried the following scenario:
> > > > 1) Created a producer that sends messages to 3 topics.
> > > > 2) Created 3 consumers in the same group for 1 topic, so 2 topics should
> > > > be unread.
> > > > 3) After running the producer and consumers successfully multiple times,
> > > > I decided to delete the log file because it had grown very large.
> > > > 4) So in effect, the messages for the 2 topics that were unread got
> > > > deleted.
> > > > 5) I ran the above experiment again.
> > > > 6) Then I changed the consumer code and created 3 consumers, one for each
> > > > of the 3 topics, in 3 different groups. So now I wanted to read messages
> > > > for all 3 topics.
> > > >
> > > > But after that I am seeing the following exception in the broker log:
> > > > 20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers  - error when processing request FetchRequest(topic:orderTopic, part:0 offset:298534904 maxSize:307200)
> > > > kafka.common.OffsetOutOfRangeException: offset 298534904 is out of range
> > > >     at kafka.log.Log$.findRange(Log.scala:48)
> > > >     at kafka.log.Log.read(Log.scala:224)
> > > >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
> > > >     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
> > > >     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > >     at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
> > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
> > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
> > > >     at kafka.network.Processor.handle(SocketServer.scala:289)
> > > >     at kafka.network.Processor.read(SocketServer.scala:312)
> > > >     at kafka.network.Processor.run(SocketServer.scala:207)
> > > >     at java.lang.Thread.run(Thread.java:662)
> > > >
> > > > and this exception on the consumer side:
> > > > 12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable  - error in FetcherRunnable for orderTopic:1-1: fetched offset = 254633932: consumed offset = 254633932
> > > > kafka.common.InvalidMessageSizeException: invalid message size: 1681733685 only received bytes: 307196 at 254633932( possible causes (1) a single message larger than the fetch size; (2) log corruption )
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> > > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > >         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65)
> > > >         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60)
> > > >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62)
> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82)
> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68)
> > > >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > > >         at scala.collection.immutable.List.foreach(List.scala:45)
> > > >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:68)
> > > >
> > > >
> > > > In fact, the consumer is getting killed after throwing this exception.
> > > >
> > > > I feel I got into trouble because I deleted the logs in between, and the
> > > > consumer for some reason is still trying to retrieve messages from an
> > > > older offset. Is that the case?
> > > >
> > > > How to get over this problem?
> > > >
> > > > Thanks,
> > > > Navneet Sharma
> > > >
> > >
> >
>
