I have a simple multi-threaded app trying to send numerous fixed-length, 2048 
byte or 3072 byte messages into an Apache Kafka 0.7.2 cluster (3 machines) 
running in AWS on some AWS AMIs. When the messaging volume increases rapidly, a 
spike, I start running into lots of problems, specifically 
InvalidMessageException errors.

I'm using a default Kafka server config with the exception of bumping up the 
network threads from 3 to 4. Zookeeper is in the mix as well. I'm not using any 
compression (none). Here is my producer config:

            props.put("zk.connect", zkConnect);
            props.put("batch.num.messages", 500);
            props.put("queue.buffering.max.messages", 30000);
            props.put("serializer.class", "kafka.serializer.StringEncoder");


I'm at a loss on what knobs I need to turn to fix this. Can anyone on the list 
offer any insight into this?

[2013-05-17 13:55:16,715] ERROR Error processing MultiProducerRequest on ETL:0 
(kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: 
NoCompressionCodec size: 3078 curr offset: 0 init offset: 0
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at 
kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
at kafka.log.Log.append(Log.scala:218)
at 
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at 
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at 
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at 
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at 
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at 
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:679)

This electronic message contains information which may be confidential or 
privileged. The information is intended for the use of the individual or entity 
named above. If you are not the intended recipient, be aware that any 
disclosure, copying, distribution or use of the contents of this information is 
prohibited. If you have received this electronic transmission in error, please 
notify us by e-mail at ([email protected]) immediately.

Reply via email to