This indicates that the messages sent to the broker are corrupted. Typically, this happens because either the producer is somehow sending corrupted data or the network is flaky. Are you using the Java producer? Is this reproducible?
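To illustrate what "corrupted" means here: as far as I recall, the broker decides a message is invalid by comparing a checksum stored with the message against one recomputed over the payload. The sketch below only mimics that idea with java.util.zip.CRC32. The class and method names (CrcCheckSketch, checksum, isValid) are made up for illustration and are not Kafka's API, and the exact bytes the broker covers are an assumption.

```java
import java.util.Arrays;
import java.util.zip.CRC32;

// Illustrative only: mimics a checksum-based validity check.
// These are hypothetical names, not Kafka's actual classes.
final class CrcCheckSketch {

    // Compute the CRC32 of a payload, as a sender would before transmitting it.
    static long checksum(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Receiver-side check: recompute the CRC32 and compare with the stored value.
    static boolean isValid(long storedChecksum, byte[] payload) {
        return storedChecksum == checksum(payload);
    }

    public static void main(String[] args) {
        // A fixed-length 2048-byte payload, like the messages in the report.
        byte[] payload = new byte[2048];
        Arrays.fill(payload, (byte) 'x');
        long stored = checksum(payload);

        System.out.println("intact:    " + isValid(stored, payload));

        // Simulate corruption in transit by flipping a single bit.
        byte[] damaged = payload.clone();
        damaged[100] ^= 0x01;
        System.out.println("corrupted: " + isValid(stored, damaged));
    }
}
```

If a check like this fails only under load, one thing worth ruling out is a buffer that is shared or reused between sender threads after the checksum has been computed.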
Thanks,

Jun

On Fri, May 17, 2013 at 7:08 AM, Jason Weiss <[email protected]> wrote:

> I have a simple multi-threaded app trying to send numerous fixed-length,
> 2048 byte or 3072 byte messages into an Apache Kafka 0.7.2 cluster (3
> machines) running in AWS on some AWS AMIs. When the messaging volume
> increases rapidly, a spike, I start running into lots of problems,
> specifically InvalidMessageException errors.
>
> I'm using a default Kafka server config with the exception of bumping up
> the network threads from 3 to 4. Zookeeper is in the mix as well. I'm not
> using any compression (none). Here is my producer config:
>
> props.put("zk.connect", zkConnect);
> props.put("batch.num.messages", 500);
> props.put("queue.buffering.max.messages", 30000);
> props.put("serializer.class", "kafka.serializer.StringEncoder");
>
> I'm at a loss on what knobs I need to turn to fix this. Can anyone on the
> list offer any insight into this?
>
> [2013-05-17 13:55:16,715] ERROR Error processing MultiProducerRequest on ETL:0 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 3078 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
>     at kafka.log.Log.append(Log.scala:218)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:679)
