Sorry! The producer.type should be async!!
I have change it to sync in my new test! Best Regards! Jian Fan 2012/7/13 jjian fan <xiaofanhad...@gmail.com> > I post my code here: > > ProducerThread.java > package com.tz.kafka; > > > import java.io.Serializable; > import java.util.Properties; > import kafka.producer.ProducerConfig; > import kafka.javaapi.producer.*; > import java.util.*; > import java.util.concurrent.CopyOnWriteArrayList; > > public class ProducerThread implements Runnable ,Serializable > { > /** > * > */ > private static final long serialVersionUID = 18977854555656L; > //private final kafka.javaapi.producer.Producer<Integer, String> producer; > private String topic; > private Properties props = new Properties(); > private String messageStr; > public ProducerThread(String kafkatopic,String message) > { > synchronized(this){ > props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181, > 192.168.75.65:2181"); > //props.put("broker.list", "4:192.168.75.104:9092"); > //props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("producer.type", "sync"); > props.put("compression.codec", "1"); > props.put("batch.size", "5"); > props.put("queue.enqueueTimeout.ms", "-1"); > props.put("queue.size", "2000"); > props.put("buffer.size", "10240000"); > //props.put("event.handler", "kafka.producer.async.EventHandler<T>"); > props.put("zk.sessiontimeout.ms", "6000000"); > props.put("zk.connectiontimeout.ms", "6000000"); > props.put("socket.timeout.ms", "60000000"); > props.put("connect.timeout.ms", "60000000"); > props.put("max.message.size", "20000"); > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE)); > props.put("reconnect.interval.ms", "3000"); > // Use random partitioner. Don't need the key type. Just set it to > Integer. > // The message is of type String. 
> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new > ProducerConfig(props)); > //producer = new kafka.javaapi.producer.Producer<String, String>(new > ProducerConfig(props)); > this.topic = kafkatopic; > this.messageStr = message; > > } > } > > public void run() { > synchronized(this) { > Producer<String, String> producer = new Producer<String, String>(new > ProducerConfig(props)); > //producer. > long messageNo = 0; > long t = System.currentTimeMillis(); > long r = System.currentTimeMillis(); > long time = r-t; > long rate = 0; > List<String> messageSet = new CopyOnWriteArrayList<String>(); > while(true) > { > if(topic.length() > 0 ) > { > messageSet.add(this.messageStr.toString()); > ProducerData<String, String> data = new ProducerData<String, > String>(topic,null,messageSet); > > producer.send(data); > messageSet.clear(); > data = null; > messageNo++; > > } > > if(messageNo % 200000 ==0) > { > r = System.currentTimeMillis(); > time = r-t; > rate = 200000000/time; > System.out.println(this.topic + " send message per second:"+rate); > t = r; > } > > } > } > } > } > > ProducerThreadTest1.java > > package com.tz.kafka; > > import java.util.concurrent.ThreadPoolExecutor; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.LinkedBlockingQueue; > > public class ProducerThreadTest1 { > > /** > * @param args > * @throws InterruptedException > */ > public static void main(String[] args) throws InterruptedException { > // TODO Auto-generated method stub > int i = Integer.parseInt(args[0]); > ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5, > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i), > new ThreadPoolExecutor.DiscardOldestPolicy()); > int messageSize = Integer.parseInt(args[1]); > StringBuffer messageStr = new StringBuffer(); > for(int messagesize=0;messagesize<messageSize;messagesize++) > { > messageStr.append("X"); > } > String topic = args[2]; > for(int j=0;j < i; j++) > { > topic += "x"; > 
threadPool.execute(new ProducerThread(topic,messageStr.toString())); > Thread.sleep(1000); > > } > } > > } > > > the shell scripte kafkaThreadTest.sh like this: > > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a > > I deploy the shell at ten servers! > > Thanks! > Best Regards! > > Jian Fan > > 2012/7/13 Jun Rao <jun...@gmail.com> > >> That seems like a Kafka bug. Do you have a script that can reproduce this? >> >> Thanks, >> >> Jun >> >> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> >> wrote: >> >> > HI: >> > I use kafka0.7.1, here is the stack trace in kafka server: >> > >> > ERROR Error processing MultiProducerRequest on bxx:2 >> > (kafka.server.KafkaRequestHandlers) >> > kafka.message.InvalidMessageException: message is invalid, compression >> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >> > at >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >> > at scala.collection.Iterator$class.foreach(Iterator.scala:631) >> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >> > at kafka.log.Log.append(Log.scala:205) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> 
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >> > at >> > >> > >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >> > at kafka.network.Processor.handle(SocketServer.scala:296) >> > at kafka.network.Processor.read(SocketServer.scala:319) >> > at kafka.network.Processor.run(SocketServer.scala:214) >> > at java.lang.Thread.run(Thread.java:722) >> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13because >> > of error (kafka.network.Processor) >> > kafka.message.InvalidMessageException: message is invalid, compression >> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0 >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) >> > at >> > >> > >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >> > at >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >> > at 
scala.collection.Iterator$class.foreach(Iterator.scala:631) >> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >> > at kafka.message.MessageSet.foreach(MessageSet.scala:87) >> > at kafka.log.Log.append(Log.scala:205) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >> > at >> > >> > >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >> > at >> > >> > >> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) >> > at kafka.network.Processor.handle(SocketServer.scala:296) >> > at kafka.network.Processor.read(SocketServer.scala:319) >> > at kafka.network.Processor.run(SocketServer.scala:214) >> > at java.lang.Thread.run(Thread.java:722) >> > >> > here is the track stace in kafka producer: >> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in >> > 60000 ms (kafka.producer.SyncProducer) >> > 
java.net.ConnectException: Connection refused >> > at sun.nio.ch.Net.connect(Native Method) >> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525) >> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) >> > at >> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) >> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92) >> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) >> > at >> > >> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58) >> > at >> > >> > >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) >> > at >> > >> > >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) >> > at >> > >> > >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95) >> > at >> > >> > >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71) >> > at scala.collection.immutable.Stream.foreach(Stream.scala:254) >> > at >> > >> > >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) >> > at >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41) >> > >> > The kafka producer is multi-thread program. >> > >> > Thanks! >> > >> > Best Regards! >> > >> > >> > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com> >> > >> > > In addition to Jun's question, >> > > >> > > which version are you using ? Do you have a reproducible test case ? >> > > >> > > Thanks, >> > > Neha >> > > >> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote: >> > > > What's the stack trace? 
>> > > > >> > > > Thanks, >> > > > >> > > > Jun >> > > > >> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan < >> xiaofanhad...@gmail.com> >> > > wrote: >> > > > >> > > >> HI: >> > > >> >> > > >> Guys, I test kafka in our test high concurrent environment, I always >> > get >> > > the >> > > >> error message as follows: >> > > >> >> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 >> > > >> (kafka.server.KafkaRequestHandlers) >> > > >> kafka.message.InvalidMessageException: message is invalid, >> compression >> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init >> offset: 0 >> > > >> >> > > >> Can anyone help? Thanks! >> > > >> >> > > >> Best Regards >> > > >> >> > > >> Jian Fan >> > > >> >> > > >> > >> > >