I deleted the broker log file, and that fixed the problem. But is there a better way to fix it?
On Fri, May 11, 2012 at 12:39 PM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> Hi,
>
> I tried following scenario:
> 1) Created producer for sending messages to 3 topics.
> 2) Created 3 consumers in same group for 1 topic, so 2 topics should be unread.
> 3) After successful execution of consumer-producer for multiple times, i thought to delete the log file because it grew very large.
> 4) So in effect, the messages for 2 topics which were unread got deleted.
> 5) I ran the above experiment again.
> 6) Now, changed the consumer code - and created 3 consumers for each of 3 topics in 3 different groups. So, now i wanted to read messages for all 3 topics.
>
> But, after that i am seeing following exception in broker log:::
> 20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers - error when processing request FetchRequest(topic:orderTopic, part:0 offset:298534904 maxSize:307200)
> kafka.common.OffsetOutOfRangeException: offset 298534904 is out of range
>     at kafka.log.Log$.findRange(Log.scala:48)
>     at kafka.log.Log.read(Log.scala:224)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
>     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
>     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
>     at kafka.network.Processor.handle(SocketServer.scala:289)
>     at kafka.network.Processor.read(SocketServer.scala:312)
>     at kafka.network.Processor.run(SocketServer.scala:207)
>     at java.lang.Thread.run(Thread.java:662)
>
> and this exception at consumer side:::
> 12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable for orderTopic:1-1: fetched offset = 254633932: consumed offset = 254633932
> kafka.common.InvalidMessageSizeException: invalid message size: 1681733685 only received bytes: 307196 at 254633932( possible causes (1) a single message larger than the fetch size; (2) log corruption )
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65)
>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62)
>     at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82)
>     at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68)
>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>     at scala.collection.immutable.List.foreach(List.scala:45)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:68)
>
> in fact consumer is getting killed after throwing this exception.
>
> i feel i got into trouble because i deleted the logs in between and consumer for some reason is still trying to retrieve messages from an older offset. Is that the case?
>
> How to get over this problem?
>
> Thanks,
> Navneet Sharma
>
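
For illustration only, here is a minimal Python sketch of the situation described in the quoted message. It assumes that a consumer's offset is a byte position in the partition log (as in Kafka 0.7) and that the group's last consumed offset is stored separately from the broker's log files, so it survives when those files are deleted. The function names, the `policy` parameter, and the example log end of 1000 are hypothetical; this is a model of the range check, not Kafka's API.

```python
class OffsetOutOfRange(Exception):
    """Raised when a requested offset lies outside the log's valid range."""


def check_fetch_offset(offset, log_start, log_end):
    # A fetch is only valid if the offset falls within the bytes
    # that currently exist in the log.
    if offset < log_start or offset > log_end:
        raise OffsetOutOfRange(
            f"offset {offset} is out of range [{log_start}, {log_end}]"
        )
    return offset


def resolve_offset(stored_offset, log_start, log_end, policy="smallest"):
    # Keep the stored offset when it is still valid; otherwise fall back
    # to the start or the end of the current log (hypothetical policy names).
    try:
        return check_fetch_offset(stored_offset, log_start, log_end)
    except OffsetOutOfRange:
        return log_start if policy == "smallest" else log_end


# The stored offset from the broker error above, checked against a log
# that was deleted and has since regrown to 1000 bytes (assumed value).
stale = 298534904
print(resolve_offset(stale, 0, 1000))             # falls back to log start
print(resolve_offset(stale, 0, 1000, "largest"))  # falls back to log end
```

If this model matches your setup, resetting the stored consumer offset is less drastic than deleting the broker log again. The Kafka 0.7 consumer configuration documents an `autooffset.reset` property for the out-of-range case; check the documentation for the version you run before relying on it.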