You can stop the iterating of messages by calling connector.shutdown. When a consumer is restarted, it resumes from the last checkpointed offset in ZK, assuming the consuming group id is the same.
Jun On Tue, May 15, 2012 at 6:04 AM, navneet sharma <navneetsharma0...@gmail.com > wrote: > Jun, > > Deleting zookeeper directory or broker logs is problematic and should not > be done. > > How about killing a consumer process? Since, its in a infinite loop, i > can't see any other clean option to stop it. > > Also, if the same consumer is restarted, is it treated as the same previous > consumer in zookeeper or a different one? I checked the docs but its not > very clear to me. > > Thanks, > Navneet Sharma > > On Fri, May 11, 2012 at 8:07 PM, Jun Rao <jun...@gmail.com> wrote: > > > Navneet, > > > > Normally, you shouldn't delete the broker log yourself (it's GC-ed based > on > > the retention time configured at the broker). If this is for testing, > then > > you need to clean up the ZK data too. > > > > Jun > > > > On Fri, May 11, 2012 at 1:05 AM, navneet sharma < > > navneetsharma0...@gmail.com > > > wrote: > > > > > i deleted the broker log file and that fixed the problem. But is there > > any > > > better way to fix? > > > > > > On Fri, May 11, 2012 at 12:39 PM, navneet sharma < > > > navneetsharma0...@gmail.com> wrote: > > > > > > > Hi, > > > > > > > > I tried following scenario: > > > > 1) Created producer for sending messages to 3 topics. > > > > 2) Created 3 consumers in same group for 1 topic, so 2 topics should > be > > > > unread. > > > > 3) After successful execution of consumer-producer for multiple > times, > > i > > > > thought to delete the log file because it grew very large. > > > > 4) So in effect, the messages for 2 topics which were unread got > > deleted. > > > > 5) I ran the above experiment again. > > > > 6) Now, changed the consumer code - and created 3 consumers for each > > of 3 > > > > topics in 3 different groups. So, now i wanted to read messages for > > all 3 > > > > topics. > > > > > > > > But, after that i am seeing following exception in broker log::: > > > > 20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers - > > > error > > > > when processing request FetchRequest(topic:orderTopic, part:0 > > > > offset:298534904 maxSize:307200) > > > > kafka.common.OffsetOutOfRangeException: offset 298534904 is out of > > range > > > > at kafka.log.Log$.findRange(Log.scala:48) > > > > at kafka.log.Log.read(Log.scala:224) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105) > > > > at > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > > > > at > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > > > > at > > > > > > > > > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) > > > > at > > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45) > > > > at > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45) > > > > at kafka.network.Processor.handle(SocketServer.scala:289) > > > > at kafka.network.Processor.read(SocketServer.scala:312) > > > > at kafka.network.Processor.run(SocketServer.scala:207) > > > > at java.lang.Thread.run(Thread.java:662) > > > > > > > > and this exception at consumer side::: > > > > 12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable > - > > > > error in FetcherRunnable for orderTopic:1-1: fetched offset = > > 254633932: > > > > consumed offset = 254633932 > > > > kafka.common.InvalidMessageSizeException: invalid message size: > > > 1681733685 > > > > only received bytes: 307196 at 254633932( possible causes (1) a > single > > > > message larger than the fetch size; (2) log corruption ) > > > > at > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103) > > > > at > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138) > > > > at > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82) > > > > at > > > > > > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > > > > at > > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > > > > at > > > > > > > > > > kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65) > > > > at > > > > > > > > > > kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60) > > > > at > > > > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62) > > > > at > > > > > > > > > > kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82) > > > > at > > > > > > > > > > kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68) > > > > at > > > > > > > > > > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) > > > > at scala.collection.immutable.List.foreach(List.scala:45) > > > > at > kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:68) > > > > > > > > > > > > in fact consumer is getting killed after throwing this exception. > > > > > > > > i feel i got into trouble because i deleted the logs in between and > > > > consumer for some reason is still trying to retrieve messages from an > > > older > > > > offset. Is that the case? > > > > > > > > How to get over this problem? > > > > > > > > Thanks, > > > > Navneet Sharma > > > > > > > > > >