That seems like a Kafka bug. Do you have a script that can reproduce this? Thanks,
Jun On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> wrote: > HI: > I use kafka0.7.1, here is the stack trace in kafka server: > > ERROR Error processing MultiProducerRequest on bxx:2 > (kafka.server.KafkaRequestHandlers) > kafka.message.InvalidMessageException: message is invalid, compression > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) > at kafka.message.MessageSet.foreach(MessageSet.scala:87) > at kafka.log.Log.append(Log.scala:205) > at > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) > at > > 
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:722) > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because > of error (kafka.network.Processor) > kafka.message.InvalidMessageException: message is invalid, compression > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) > at kafka.message.MessageSet.foreach(MessageSet.scala:87) > at kafka.log.Log.append(Log.scala:205) > at > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) > at > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:722) > > here is the stack trace in the kafka producer: > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in > 60000 ms (kafka.producer.SyncProducer) > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect(Native Method) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) > at kafka.producer.SyncProducer.send(SyncProducer.scala:92) > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) > at > kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58) > at > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) > at > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) > at > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95) > at > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71) > at 
scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41) > > The kafka producer is a multi-threaded program. > > Thanks! > > Best Regards! > > > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com> > > > In addition to Jun's question, > > > > which version are you using? Do you have a reproducible test case? > > > > Thanks, > > Neha > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote: > > > What's the stack trace? > > > > > > Thanks, > > > > > > Jun > > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xiaofanhad...@gmail.com> > > wrote: > > > > > >> HI: > > >> > > >> Guys, I am testing kafka in our high-concurrency test environment, and I always > get > > the > > >> error message as follows: > > >> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 > > >> (kafka.server.KafkaRequestHandlers) > > >> kafka.message.InvalidMessageException: message is invalid, compression > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0 > > >> > > >> Can anyone help? Thanks! > > >> > > >> Best Regards > > >> > > >> Jian Fan > > >> > > >