Hi Neha,

Thanks for the quick response. Am I looking for "isvalid: false" in the output of the DumpLogSegments tool? If so, I don't see that (at least in about 180 messages on a single partition; the invocation I used is in a P.S. below). I've also tried with an offset of -1 and ran into the same issue, so it can't be an invalid offset either, right?

As a quick test, I modified the Storm Kafka Spout and commented out the logging that tries to write out the message size (line 122 below):

https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java

I don't run into the same problem with that change. Is it possible we are running into a race condition here?
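For context, the code around that line does roughly the following. This is my paraphrase against the Kafka 0.7 Java API, not the spout's exact source; the broker host, topic, offset values, and the process() helper are stand-ins I made up for the sketch:

    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.Message;
    import kafka.message.MessageAndOffset;

    public class FetchSketch {
        public static void main(String[] args) {
            // Stand-in values; the spout tracks these per partition.
            long emittedToOffset = 0L;
            int fetchSizeBytes = 1024 * 1024;

            SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092, 30000, 64 * 1024);
            ByteBufferMessageSet msgs = consumer.fetch(
                    new FetchRequest("our-topic", 0, emittedToOffset, fetchSizeBytes));

            // Counting the messages for the log line forces a full pass that
            // decodes and validates every message header; this is the pass that
            // throws InvalidMessageSizeException in the stack trace below
            // (the MessageSet.size frame).
            int count = 0;
            for (MessageAndOffset mo : msgs) {
                count++;
            }
            System.out.println("fetched " + count + " messages");

            // The set is then walked a second time to actually emit tuples.
            for (MessageAndOffset mo : msgs) {
                process(mo.message(), mo.offset());
            }
            consumer.close();
        }

        // Stand-in for the spout's tuple emission.
        private static void process(Message m, long offset) { }
    }

Commenting out the counting pass is what makes the exception go away for us, so the two passes over the same ByteBufferMessageSet seem to be involved somehow.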
thanks,
Rahul

On Jul 31, 2012, at 2:28 PM, Neha Narkhede wrote:

> Rahul,
>
> One of the first things you can check is whether the log on the server
> is corrupted or not. For this, you can use the DumpLogSegments tool
> and point it at the log segments for the topic partition that you see
> errors for.
> If that is fine, then the next thing to check would be the fetch
> offset that the Kafka spout is using, since if the fetch offset is
> incorrect, you will not be able to make any progress.
>
> Thanks,
> Neha
>
> On Tue, Jul 31, 2012 at 2:09 PM, Rahul Biswas <rbis...@rocketfuelinc.com> wrote:
>> Hi,
>>
>> We are running into this exception quite frequently now. We are using
>> Storm's Kafka Spout as a consumer, and as part of a logging message it
>> tries to print out the size of the underlying message set; that size
>> validation is what fails.
>>
>> The full stack trace is below:
>>
>> kafka.common.InvalidMessageSizeException: invalid message size: 161758511
>> only received bytes: 215831 at 10738556( possible causes (1) a single
>> message larger than the fetch size; (2) log corruption )
>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>     at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:100)
>>     at kafka.message.MessageSet.size(MessageSet.scala:87)
>>     at storm.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:111)
>>     at storm.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:87)
>>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:208)
>>     at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:412)
>>     at backtype.storm.daemon.task$mk_task$iter__3281__3285$fn__3286$fn__3287$fn__3288.invoke(task.clj:262)
>>     at clojure.lang.AFn.applyToHelper(AFn.java:159)
>>     at clojure.lang.AFn.applyTo(AFn.java:151)
>>     at clojure.core$apply.invoke(core.clj:601)
>>     at backtype.storm.util$async_loop$fn__451.invoke(util.clj:369)
>>     at clojure.lang.AFn.run(AFn.java:24)
>>     at java.lang.Thread.run(Thread.java:619)
>>
>> We also see negative message sizes (for a different consumer with a fetch
>> size of 5MB):
>>
>> kafka.common.InvalidMessageSizeException: invalid message size: -2012627084
>> only received bytes: 5242876 at 1054940626667( possible causes (1) a single
>> message larger than the fetch size; (2) log corruption )
>>
>> thanks.
>> Rahul
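P.S. For reference, this is how I invoked the dump tool. I'm writing it from memory, and the exact arguments plus the segment-file path depend on the Kafka version and our log layout, so treat it as an example rather than the canonical usage:

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments /var/kafka-logs/our-topic-0/00000000000000000000.kafka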