I have a simple multi-threaded app that sends a large number of fixed-length
messages (2048 or 3072 bytes each) to an Apache Kafka 0.7.2 cluster of 3
machines running on AWS AMIs. When the message volume spikes, I start running
into lots of errors, specifically InvalidMessageException.
I'm using the default Kafka server config, except that I raised the number of
network threads from 3 to 4. ZooKeeper is in the mix as well. I'm not using any
compression. Here is my producer config:
props.put("zk.connect", zkConnect);
props.put("batch.num.messages", 500);
props.put("queue.buffering.max.messages", 30000);
props.put("serializer.class", "kafka.serializer.StringEncoder");
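One thing that may be worth ruling out in the settings above: two of the values are stored as Integer objects rather than Strings. java.util.Properties.getProperty() returns null for any value that is not a String, so code that reads settings through getProperty() will not see them. Whether the Kafka 0.7.2 producer reads its config that way is an assumption I have not verified, and this may be unrelated to the InvalidMessageException. The sketch below only demonstrates the java.util.Properties behavior; the class name ProducerPropsSketch and the ZooKeeper address are hypothetical placeholders.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Builds the same four settings as above, but stores every value as a String.
    // The property names are copied from the config above, not checked against Kafka.
    static Properties buildProps(String zkConnect) {
        Properties props = new Properties();
        props.put("zk.connect", zkConnect);
        props.put("batch.num.messages", "500");
        props.put("queue.buffering.max.messages", "30000");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Integer value, as in the original config.
        Properties asInt = new Properties();
        asInt.put("batch.num.messages", 500);
        // getProperty() only returns String values, so this prints "null".
        System.out.println(asInt.getProperty("batch.num.messages"));

        // String value: getProperty() returns it, so this prints "500".
        Properties asString = buildProps("localhost:2181"); // placeholder address
        System.out.println(asString.getProperty("batch.num.messages"));
    }
}
```

If the values do turn out to be ignored, the producer would be running with its default batch and queue sizes rather than the ones listed.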
I'm at a loss as to which knobs I need to turn to fix this. Can anyone on the
list offer any insight? Here is the error from the broker log:
[2013-05-17 13:55:16,715] ERROR Error processing MultiProducerRequest on ETL:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 3078 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
    at kafka.log.Log.append(Log.scala:218)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)