The new error: [2012-07-13 10:12:41,178] ERROR Error processing MultiProducerRequest on oxxxx:0 (kafka.server.KafkaRequestHandlers) kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 48 curr offset: 0 init offset: 0 at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) at kafka.message.MessageSet.foreach(MessageSet.scala:87) at kafka.log.Log.append(Log.scala:205) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.network.Processor.handle(SocketServer.scala:296) at kafka.network.Processor.read(SocketServer.scala:319) at kafka.network.Processor.run(SocketServer.scala:214) at java.lang.Thread.run(Thread.java:722)
2012/7/13 jjian fan <xiaofanhad...@gmail.com> > OK,sometime it has this error : > > [2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because > of error (kafka.network.Processor) > kafka.common.InvalidTopicException: topic name can't be empty > at kafka.log.LogManager.getLogPool(LogManager.scala:159) > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195) > at > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) > at > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) > at > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) > at > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:722) > > 2012/7/13 jjian fan <xiaofanhad...@gmail.com> > >> sorry! >> >> The producer.type shoud be async!! >> >> I have change it to sync in my new test! >> >> Best Regards! >> Jian Fan >> >> >> 2012/7/13 jjian fan <xiaofanhad...@gmail.com> >> >>> I post my code here: >>> >>> ProducerThread.java >>> package com.tz.kafka; >>> >>> >>> import java.io.Serializable; >>> import java.util.Properties; >>> import kafka.producer.ProducerConfig; >>> import kafka.javaapi.producer.*; >>> import java.util.*; >>> import java.util.concurrent.CopyOnWriteArrayList; >>> >>> public class ProducerThread implements Runnable ,Serializable >>> { >>> /** >>> * >>> */ >>> private static final long serialVersionUID = 18977854555656L; >>> //private final kafka.javaapi.producer.Producer<Integer, String> >>> producer; >>> private String topic; >>> private Properties props = new Properties(); >>> private String messageStr; >>> public ProducerThread(String kafkatopic,String message) >>> { >>> synchronized(this){ >>> props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181, >>> 192.168.75.65:2181"); >>> //props.put("broker.list", "4:192.168.75.104:9092"); >>> //props.put("serializer.class", "kafka.serializer.StringEncoder"); >>> props.put("serializer.class", "kafka.serializer.StringEncoder"); >>> props.put("producer.type", "sync"); >>> props.put("compression.codec", "1"); >>> props.put("batch.size", "5"); >>> props.put("queue.enqueueTimeout.ms", "-1"); >>> props.put("queue.size", "2000"); >>> props.put("buffer.size", "10240000"); >>> //props.put("event.handler", "kafka.producer.async.EventHandler<T>"); >>> props.put("zk.sessiontimeout.ms", "6000000"); >>> props.put("zk.connectiontimeout.ms", "6000000"); >>> props.put("socket.timeout.ms", "60000000"); >>> props.put("connect.timeout.ms", "60000000"); >>> props.put("max.message.size", "20000"); >>> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE)); >>> props.put("reconnect.interval.ms", "3000"); >>> // Use random partitioner. Don't need the key type. Just set it to >>> Integer. >>> // The message is of type String. >>> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new >>> ProducerConfig(props)); >>> //producer = new kafka.javaapi.producer.Producer<String, >>> String>(new ProducerConfig(props)); >>> this.topic = kafkatopic; >>> this.messageStr = message; >>> >>> } >>> } >>> >>> public void run() { >>> synchronized(this) { >>> Producer<String, String> producer = new Producer<String, String>(new >>> ProducerConfig(props)); >>> //producer. >>> long messageNo = 0; >>> long t = System.currentTimeMillis(); >>> long r = System.currentTimeMillis(); >>> long time = r-t; >>> long rate = 0; >>> List<String> messageSet = new CopyOnWriteArrayList<String>(); >>> while(true) >>> { >>> if(topic.length() > 0 ) >>> { >>> messageSet.add(this.messageStr.toString()); >>> ProducerData<String, String> data = new ProducerData<String, >>> String>(topic,null,messageSet); >>> >>> producer.send(data); >>> messageSet.clear(); >>> data = null; >>> messageNo++; >>> >>> } >>> >>> if(messageNo % 200000 ==0) >>> { >>> r = System.currentTimeMillis(); >>> time = r-t; >>> rate = 200000000/time; >>> System.out.println(this.topic + " send message per second:"+rate); >>> t = r; >>> } >>> >>> } >>> } >>> } >>> } >>> >>> ProducerThreadTest1.java >>> >>> package com.tz.kafka; >>> >>> import java.util.concurrent.ThreadPoolExecutor; >>> import java.util.concurrent.TimeUnit; >>> import java.util.concurrent.LinkedBlockingQueue; >>> >>> public class ProducerThreadTest1 { >>> >>> /** >>> * @param args >>> * @throws InterruptedException >>> */ >>> public static void main(String[] args) throws InterruptedException { >>> // TODO Auto-generated method stub >>> int i = Integer.parseInt(args[0]); >>> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5, >>> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i), >>> new ThreadPoolExecutor.DiscardOldestPolicy()); >>> int messageSize = Integer.parseInt(args[1]); >>> StringBuffer messageStr = new StringBuffer(); >>> for(int messagesize=0;messagesize<messageSize;messagesize++) >>> { >>> messageStr.append("X"); >>> } >>> String topic = args[2]; >>> for(int j=0;j < i; j++) >>> { >>> topic += "x"; >>> threadPool.execute(new ProducerThread(topic,messageStr.toString())); >>> Thread.sleep(1000); >>> >>> } >>> } >>> >>> } >>> >>> >>> the shell scripte kafkaThreadTest.sh like this: >>> >>> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a >>> >>> I deploy the shell at ten servers! >>> >>> Thanks! >>> Best Regards! >>> >>> Jian Fan >>> >>> 2012/7/13 Jun Rao <jun...@gmail.com> >>> >>>> That seems like a Kafka bug. Do you have a script that can reproduce >>>> this? >>>> >>>> Thanks, >>>> >>>> Jun >>>> >>>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> >>>> wrote: >>>> >>>> > HI: >>>> > I use kafka0.7.1, here is the stack trace in kafka server: >>>> > >>>> > ERROR Error processing MultiProducerRequest on bxx:2 >>>> > (kafka.server.KafkaRequestHandlers) >>>> > kafka.message.InvalidMessageException: message is invalid, compression >>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>>> > at >>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631) >>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >>>> > at kafka.log.Log.append(Log.scala:205) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>>> > at >>>> > >>>> > >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>>> > at >>>> > >>>> > >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >>>> > at >>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>>> > at kafka.network.Processor.handle(SocketServer.scala:296) >>>> > at kafka.network.Processor.read(SocketServer.scala:319) >>>> > at kafka.network.Processor.run(SocketServer.scala:214) >>>> > at java.lang.Thread.run(Thread.java:722) >>>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13because >>>> > of error (kafka.network.Processor) >>>> > kafka.message.InvalidMessageException: message is invalid, compression >>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >>>> > at >>>> > >>>> > >>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>>> > at >>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631) >>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >>>> > at kafka.log.Log.append(Log.scala:205) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>>> > at >>>> > >>>> > >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>>> > at >>>> > >>>> > >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >>>> > at >>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>>> > at >>>> > >>>> > >>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>>> > at kafka.network.Processor.handle(SocketServer.scala:296) >>>> > at kafka.network.Processor.read(SocketServer.scala:319) >>>> > at kafka.network.Processor.run(SocketServer.scala:214) >>>> > at java.lang.Thread.run(Thread.java:722) >>>> > >>>> > here is the track stace in kafka producer: >>>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt >>>> in >>>> > 60000 ms (kafka.producer.SyncProducer) >>>> > java.net.ConnectException: Connection refused >>>> > at sun.nio.ch.Net.connect(Native Method) >>>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525) >>>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) >>>> > at >>>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) >>>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92) >>>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) >>>> > at >>>> > >>>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58) >>>> > at >>>> > >>>> > >>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) >>>> > at >>>> > >>>> > >>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) >>>> > at >>>> > >>>> > >>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95) >>>> > at >>>> > >>>> > >>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71) >>>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254) >>>> > at >>>> > >>>> > >>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) >>>> > at >>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41) >>>> > >>>> > The kafka producer is multi-thread program. >>>> > >>>> > Thanks! >>>> > >>>> > Best Regards! >>>> > >>>> > >>>> > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com> >>>> > >>>> > > In addition to Jun's question, >>>> > > >>>> > > which version are you using ? Do you have a reproducible test case ? >>>> > > >>>> > > Thanks, >>>> > > Neha >>>> > > >>>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote: >>>> > > > What's the stack trace? >>>> > > > >>>> > > > Thanks, >>>> > > > >>>> > > > Jun >>>> > > > >>>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan < >>>> xiaofanhad...@gmail.com> >>>> > > wrote: >>>> > > > >>>> > > >> HI: >>>> > > >> >>>> > > >> Guys, I test kafka in our test high cocunnrent enivorment, I >>>> always >>>> > get >>>> > > the >>>> > > >> error message as follows: >>>> > > >> >>>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 >>>> > > >> (kafka.server.KafkaRequestHandlers) >>>> > > >> kafka.message.InvalidMessageException: message is invalid, >>>> compression >>>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init >>>> offset: 0 >>>> > > >> >>>> > > >> Can anyone help? Thanks! >>>> > > >> >>>> > > >> Best Regards >>>> > > >> >>>> > > >> Jian Fan >>>> > > >> >>>> > > >>>> > >>>> >>> >>> >> >