Hi, I tried to reproduce this problem locally, but couldn't. I set up 1 server to run the broker and used another server to run 10 instances of ProducerThreadTest1 with the parameters you provided. No exceptions showed up in the broker log after the tests had been running for 5 minutes.
Could you share your detailed setup? What kind of servers were you using? Did you change any config on the broker? How long did you have to run the test before the exception showed up?

Thanks,

Jun

On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

> I post my code here:
>
> ProducerThread.java
>
> package com.tz.kafka;
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable ,Serializable
> {
>     /**
>      *
>      */
>     private static final long serialVersionUID = 18977854555656L;
>     //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>     private String topic;
>     private Properties props = new Properties();
>     private String messageStr;
>
>     public ProducerThread(String kafkatopic,String message)
>     {
>         synchronized(this){
>             props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181, 192.168.75.65:2181");
>             //props.put("broker.list", "4:192.168.75.104:9092");
>             //props.put("serializer.class", "kafka.serializer.StringEncoder");
>             props.put("serializer.class", "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("compression.codec", "1");
>             props.put("batch.size", "5");
>             props.put("queue.enqueueTimeout.ms", "-1");
>             props.put("queue.size", "2000");
>             props.put("buffer.size", "10240000");
>             //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>             props.put("zk.sessiontimeout.ms", "6000000");
>             props.put("zk.connectiontimeout.ms", "6000000");
>             props.put("socket.timeout.ms", "60000000");
>             props.put("connect.timeout.ms", "60000000");
>             props.put("max.message.size", "20000");
>             props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>             props.put("reconnect.interval.ms", "3000");
>             // Use random partitioner. Don't need the key type. Just set it to Integer.
>             // The message is of type String.
>             //producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>             //producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
>             this.topic = kafkatopic;
>             this.messageStr = message;
>         }
>     }
>
>     public void run() {
>         synchronized(this) {
>             Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
>             //producer.
>             long messageNo = 0;
>             long t = System.currentTimeMillis();
>             long r = System.currentTimeMillis();
>             long time = r-t;
>             long rate = 0;
>             List<String> messageSet = new CopyOnWriteArrayList<String>();
>             while(true)
>             {
>                 if(topic.length() > 0 )
>                 {
>                     messageSet.add(this.messageStr.toString());
>                     ProducerData<String, String> data = new ProducerData<String, String>(topic,null,messageSet);
>
>                     producer.send(data);
>                     messageSet.clear();
>                     data = null;
>                     messageNo++;
>                 }
>
>                 if(messageNo % 200000 ==0)
>                 {
>                     r = System.currentTimeMillis();
>                     time = r-t;
>                     rate = 200000000/time;
>                     System.out.println(this.topic + " send message per second:"+rate);
>                     t = r;
>                 }
>             }
>         }
>     }
> }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
>     /**
>      * @param args
>      * @throws InterruptedException
>      */
>     public static void main(String[] args) throws InterruptedException {
>         // TODO Auto-generated method stub
>         int i = Integer.parseInt(args[0]);
>         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>             new ThreadPoolExecutor.DiscardOldestPolicy());
>         int messageSize = Integer.parseInt(args[1]);
>         StringBuffer messageStr = new StringBuffer();
>         for(int messagesize=0;messagesize<messageSize;messagesize++)
>         {
>             messageStr.append("X");
>         }
>         String topic = args[2];
>         for(int j=0;j < i; j++)
>         {
>             topic += "x";
>             threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>             Thread.sleep(1000);
>         }
>     }
>
> }
>
> The shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deploy the script on ten servers!
>
> Thanks!
> Best Regards!
>
> Jian Fan
>
> 2012/7/13 Jun Rao <jun...@gmail.com>
>
> > That seems like a Kafka bug. Do you have a script that can reproduce this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> wrote:
> >
> > > Hi:
> > > I use Kafka 0.7.1. Here is the stack trace in the Kafka server:
> > >
> > > ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
> > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because of error (kafka.network.Processor)
> > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > >
> > > Here is the stack trace in the Kafka producer:
> > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in 60000 ms (kafka.producer.SyncProducer)
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect(Native Method)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > >
> > > The Kafka producer is a multi-threaded program.
> > >
> > > Thanks!
> > >
> > > Best Regards!
> > >
> > > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com>
> > >
> > > > In addition to Jun's question,
> > > >
> > > > which version are you using? Do you have a reproducible test case?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > What's the stack trace?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xiaofanhad...@gmail.com> wrote:
> > > > >
> > > > >> Hi:
> > > > >>
> > > > >> Guys, I am testing Kafka in our high-concurrency test environment, and I always get the following error message:
> > > > >>
> > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 (kafka.server.KafkaRequestHandlers)
> > > > >> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
> > > > >>
> > > > >> Can anyone help? Thanks!
> > > > >>
> > > > >> Best Regards
> > > > >>
> > > > >> Jian Fan
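
For anyone trying to read the numbers in the broker error, here is a rough sketch of how a 1024-byte payload could produce `size: 1030` and `curr offset: 1034`, and of what a checksum-style validity check looks like. It rests on assumptions that are not stated anywhere in this thread: that a 0.7-era message is framed as a 4-byte length prefix, then a 1-byte magic value, a 1-byte attributes field and a 4-byte CRC32, followed by the payload, and that the broker rejects a message whose stored CRC does not match its payload. `MessageFrameSketch` and its methods are hypothetical names for illustration only, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustration only: the header layout below is an assumption, not taken from the thread.
final class MessageFrameSketch {
    static final int LENGTH_PREFIX_BYTES = 4; // assumed per-message length prefix
    static final int MAGIC_BYTES = 1;         // assumed format version byte
    static final int ATTRIBUTE_BYTES = 1;     // assumed compression attributes byte
    static final int CRC_BYTES = 4;           // assumed CRC32 of the payload

    // Message size as it would be reported without the length prefix.
    static int messageSize(int payloadBytes) {
        return MAGIC_BYTES + ATTRIBUTE_BYTES + CRC_BYTES + payloadBytes;
    }

    // Bytes consumed in the message set once the length prefix is included.
    static int framedSize(int payloadBytes) {
        return LENGTH_PREFIX_BYTES + messageSize(payloadBytes);
    }

    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Builds one framed message using the assumed layout.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(framedSize(payload.length));
        buf.putInt(messageSize(payload.length));
        buf.put((byte) 1);                // magic
        buf.put((byte) 0);                // attributes: no compression
        buf.putInt((int) crcOf(payload)); // low 32 bits of the CRC
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Recomputes the CRC over the payload and compares it with the stored value.
    static boolean isValid(ByteBuffer framed) {
        ByteBuffer buf = framed.duplicate(); // leave the caller's position untouched
        int size = buf.getInt();
        buf.get();                           // skip magic
        buf.get();                           // skip attributes
        long storedCrc = buf.getInt() & 0xffffffffL;
        byte[] payload = new byte[size - MAGIC_BYTES - ATTRIBUTE_BYTES - CRC_BYTES];
        buf.get(payload);
        return storedCrc == crcOf(payload);
    }
}
```

Under these assumptions, `messageSize(1024)` gives 1030 and `framedSize(1024)` gives 1034, which lines up with the test being run with a 1024-byte message. A single altered byte in the payload is enough to make `isValid` return false, so corruption of the bytes somewhere between the producer and the broker's log append is one way an error like this can arise.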