OK, sometimes it has this error: [2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because of error (kafka.network.Processor) kafka.common.InvalidTopicException: topic name can't be empty at kafka.log.LogManager.getLogPool(LogManager.scala:159) at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.network.Processor.handle(SocketServer.scala:296) at kafka.network.Processor.read(SocketServer.scala:319) at kafka.network.Processor.run(SocketServer.scala:214) at java.lang.Thread.run(Thread.java:722)
2012/7/13 jjian fan <xiaofanhad...@gmail.com> > sorry! > > The producer.type should be async!! > > I have changed it to sync in my new test! > > Best Regards! > Jian Fan > > > 2012/7/13 jjian fan <xiaofanhad...@gmail.com> > >> I post my code here: >> >> ProducerThread.java >> package com.tz.kafka; >> >> >> import java.io.Serializable; >> import java.util.Properties; >> import kafka.producer.ProducerConfig; >> import kafka.javaapi.producer.*; >> import java.util.*; >> import java.util.concurrent.CopyOnWriteArrayList; >> >> public class ProducerThread implements Runnable ,Serializable >> { >> /** >> * >> */ >> private static final long serialVersionUID = 18977854555656L; >> //private final kafka.javaapi.producer.Producer<Integer, String> producer; >> private String topic; >> private Properties props = new Properties(); >> private String messageStr; >> public ProducerThread(String kafkatopic,String message) >> { >> synchronized(this){ >> props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181, >> 192.168.75.65:2181"); >> //props.put("broker.list", "4:192.168.75.104:9092"); >> //props.put("serializer.class", "kafka.serializer.StringEncoder"); >> props.put("serializer.class", "kafka.serializer.StringEncoder"); >> props.put("producer.type", "sync"); >> props.put("compression.codec", "1"); >> props.put("batch.size", "5"); >> props.put("queue.enqueueTimeout.ms", "-1"); >> props.put("queue.size", "2000"); >> props.put("buffer.size", "10240000"); >> //props.put("event.handler", "kafka.producer.async.EventHandler<T>"); >> props.put("zk.sessiontimeout.ms", "6000000"); >> props.put("zk.connectiontimeout.ms", "6000000"); >> props.put("socket.timeout.ms", "60000000"); >> props.put("connect.timeout.ms", "60000000"); >> props.put("max.message.size", "20000"); >> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE)); >> props.put("reconnect.interval.ms", "3000"); >> // Use random partitioner. Don't need the key type. Just set it to >> Integer. 
>> // The message is of type String. >> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new >> ProducerConfig(props)); >> //producer = new kafka.javaapi.producer.Producer<String, String>(new >> ProducerConfig(props)); >> this.topic = kafkatopic; >> this.messageStr = message; >> >> } >> } >> >> public void run() { >> synchronized(this) { >> Producer<String, String> producer = new Producer<String, String>(new >> ProducerConfig(props)); >> //producer. >> long messageNo = 0; >> long t = System.currentTimeMillis(); >> long r = System.currentTimeMillis(); >> long time = r-t; >> long rate = 0; >> List<String> messageSet = new CopyOnWriteArrayList<String>(); >> while(true) >> { >> if(topic.length() > 0 ) >> { >> messageSet.add(this.messageStr.toString()); >> ProducerData<String, String> data = new ProducerData<String, >> String>(topic,null,messageSet); >> >> producer.send(data); >> messageSet.clear(); >> data = null; >> messageNo++; >> >> } >> >> if(messageNo % 200000 ==0) >> { >> r = System.currentTimeMillis(); >> time = r-t; >> rate = 200000000/time; >> System.out.println(this.topic + " send message per second:"+rate); >> t = r; >> } >> >> } >> } >> } >> } >> >> ProducerThreadTest1.java >> >> package com.tz.kafka; >> >> import java.util.concurrent.ThreadPoolExecutor; >> import java.util.concurrent.TimeUnit; >> import java.util.concurrent.LinkedBlockingQueue; >> >> public class ProducerThreadTest1 { >> >> /** >> * @param args >> * @throws InterruptedException >> */ >> public static void main(String[] args) throws InterruptedException { >> // TODO Auto-generated method stub >> int i = Integer.parseInt(args[0]); >> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5, >> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i), >> new ThreadPoolExecutor.DiscardOldestPolicy()); >> int messageSize = Integer.parseInt(args[1]); >> StringBuffer messageStr = new StringBuffer(); >> for(int messagesize=0;messagesize<messageSize;messagesize++) >> { >> 
messageStr.append("X"); >> } >> String topic = args[2]; >> for(int j=0;j < i; j++) >> { >> topic += "x"; >> threadPool.execute(new ProducerThread(topic,messageStr.toString())); >> Thread.sleep(1000); >> >> } >> } >> >> } >> >> >> the shell scripte kafkaThreadTest.sh like this: >> >> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a >> >> I deploy the shell at ten servers! >> >> Thanks! >> Best Regards! >> >> Jian Fan >> >> 2012/7/13 Jun Rao <jun...@gmail.com> >> >>> That seems like a Kafka bug. Do you have a script that can reproduce >>> this? >>> >>> Thanks, >>> >>> Jun >>> >>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> >>> wrote: >>> >>> > HI: >>> > I use kafka0.7.1, here is the stack trace in kafka server: >>> > >>> > ERROR Error processing MultiProducerRequest on bxx:2 >>> > (kafka.server.KafkaRequestHandlers) >>> > kafka.message.InvalidMessageException: message is invalid, compression >>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>> > at >>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631) >>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >>> > at kafka.log.Log.append(Log.scala:205) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >>> > at >>> > >>> > >>> 
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> > at >>> > >>> > >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> > at >>> > >>> > >>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >>> > at >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>> > at kafka.network.Processor.handle(SocketServer.scala:296) >>> > at kafka.network.Processor.read(SocketServer.scala:319) >>> > at kafka.network.Processor.run(SocketServer.scala:214) >>> > at java.lang.Thread.run(Thread.java:722) >>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13because >>> > of error (kafka.network.Processor) >>> > kafka.message.InvalidMessageException: message is invalid, compression >>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >>> > at >>> > >>> > >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>> > at >>> 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631) >>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >>> > at kafka.log.Log.append(Log.scala:205) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> > at >>> > >>> > >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> > at >>> > >>> > >>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >>> > at >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>> > at >>> > >>> > >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >>> > at kafka.network.Processor.handle(SocketServer.scala:296) >>> > at kafka.network.Processor.read(SocketServer.scala:319) >>> > at kafka.network.Processor.run(SocketServer.scala:214) >>> > at 
java.lang.Thread.run(Thread.java:722) >>> > >>> > here is the stack trace in the kafka producer: >>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt >>> in >>> > 60000 ms (kafka.producer.SyncProducer) >>> > java.net.ConnectException: Connection refused >>> > at sun.nio.ch.Net.connect(Native Method) >>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525) >>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) >>> > at >>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) >>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92) >>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) >>> > at >>> > >>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58) >>> > at >>> > >>> > >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) >>> > at >>> > >>> > >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) >>> > at >>> > >>> > >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95) >>> > at >>> > >>> > >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71) >>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254) >>> > at >>> > >>> > >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) >>> > at >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41) >>> > >>> > The kafka producer is a multi-threaded program. >>> > >>> > Thanks! >>> > >>> > Best Regards! >>> > >>> > >>> > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com> >>> > >>> > > In addition to Jun's question, >>> > > >>> > > which version are you using? Do you have a reproducible test case? >>> > > >>> > > Thanks, >>> > > Neha >>> > > >>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote: >>> > > > What's the stack trace? 
>>> > > > >>> > > > Thanks, >>> > > > >>> > > > Jun >>> > > > >>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan < >>> xiaofanhad...@gmail.com> >>> > > wrote: >>> > > > >>> > > >> Hi: >>> > > >> >>> > > >> Guys, I am testing kafka in our high-concurrency test environment, and I >>> always >>> > get >>> > > the >>> > > >> error message as follows: >>> > > >> >>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 >>> > > >> (kafka.server.KafkaRequestHandlers) >>> > > >> kafka.message.InvalidMessageException: message is invalid, >>> compression >>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init >>> offset: 0 >>> > > >> >>> > > >> Can anyone help? Thanks! >>> > > >> >>> > > >> Best Regards >>> > > >> >>> > > >> Jian Fan >>> > > >> >>> > > >>> > >>> >> >> >