Rahul,

The first thing to check is whether the log on the server is corrupted.
For that, you can run the DumpLogSegments tool against the log segments
of the topic partition that is reporting errors.
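For example (a sketch only; the log path and topic name "events" here are placeholders, and the exact invocation varies by Kafka version -- newer releases take a --files flag, while older ones accept the segment files as plain arguments):

```shell
# Dump a log segment so you can verify each message's offset, size,
# and checksum. Point it at the segment file(s) under your log.dir
# for the affected topic-partition (path below is hypothetical).
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka-logs/events-0/00000000000000000000.log
```

If the tool throws while iterating the segment, or prints a message whose size or checksum does not match, the log is corrupted at that offset.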
If the log is fine, the next thing to check is the fetch offset that
the Kafka spout is using; if the fetch offset is invalid, the consumer
will not be able to make any progress.
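One way to check which offsets are actually valid on the broker is the GetOffsetShell tool (again a sketch -- the broker host and topic name are placeholders, and newer versions replace the --server URL with a --broker-list flag):

```shell
# Ask the broker for the latest valid offset (--time -1; use -2 for
# the earliest) of topic "events", partition 0, and compare it with
# the offset the spout is fetching from.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --server kafka://broker-host:9092 \
  --topic events --partition 0 --time -1 --offsets 1
```

If the offset the spout has stored falls outside the range between the earliest and latest offsets the broker reports, resetting the spout's offset should let it make progress again.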

Thanks,
Neha

On Tue, Jul 31, 2012 at 2:09 PM, Rahul Biswas <rbis...@rocketfuelinc.com> wrote:
> Hi,
>
> We are running into this exception quite frequently now. We are using Storm's 
> Kafka Spout as a consumer and as part of a logging message it tries to print 
> out the size of the underlying message, when the validation on the size seems 
> to fail.
>
> The full stack trace is as below --
> kafka.common.InvalidMessageSizeException: invalid message size: 161758511 
> only received bytes: 215831 at 10738556( possible causes (1) a single message 
> larger than the fetch size; (2) log corruption )
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>         at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>         at 
> scala.collection.TraversableOnce$class.size(TraversableOnce.scala:100)
>         at kafka.message.MessageSet.size(MessageSet.scala:87)
>         at storm.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:111)
>         at storm.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:87)
>         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:208)
>         at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:412)
>         at 
> backtype.storm.daemon.task$mk_task$iter__3281__3285$fn__3286$fn__3287$fn__3288.invoke(task.clj:262)
>         at clojure.lang.AFn.applyToHelper(AFn.java:159)
>         at clojure.lang.AFn.applyTo(AFn.java:151)
>         at clojure.core$apply.invoke(core.clj:601)
>         at backtype.storm.util$async_loop$fn__451.invoke(util.clj:369)
>         at clojure.lang.AFn.run(AFn.java:24)
>         at java.lang.Thread.run(Thread.java:619)
> There are also negative message sizes (for a different consumer with a fetch 
> size of 5MB) --
> kafka.common.InvalidMessageSizeException: invalid message size: -2012627084 
> only received bytes: 5242876 at 1054940626667( possible causes (1) a single 
> message larger than the fetch size; (2) log corruption )
>
> thanks.
> Rahul