Jun: I ran the test for more than 2 days! The packet receive rate of the broker in my test is about 20 MB/s-60 MB/s. The messages are compressed! You can change each producer to java -Xmx10G -jar kafkaThreadTest.jar 10 1024 a and try that! All servers use CentOS 6.2!
The broker config is as follows:

# The id of the broker. This must be set to a unique integer for each broker.
brokerid=3

# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=192.168.75.102

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# The number of processor threads the socket server uses for receiving and answering requests.
# Defaults to the number of cores on the machine
num.threads=24

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=20971520

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=20971520

# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=204857600

############################# Log Basics #############################

# The directory under which to store log files
log.dir=/data/kafka

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=4

# Overrides for the default given by num.partitions on a per-topic basis
topic.partition.count.map=test2:1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=4000

# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=4000

# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000

# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=336

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
log.retention.size=2073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Enable connecting to zookeeper
enable.zookeeper=true

# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=100000
zk.synctime.ms=10000
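In other words, with these settings each log is flushed after 4000 messages or after 4000 ms, whichever comes first (the flusher wakes up every 3000 ms); each partition retains about two full 1 GB segments (log.retention.size is roughly 2 GB) for at most 14 days; and the cleanup check runs once a minute.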
2012/7/28 Jun Rao <jun...@gmail.com>

> Hi,
>
> I was trying to reproduce this problem locally, but couldn't. I set up 1
> server to run the broker and used another server to run 10 instances of
> ProducerThreadTest1 with the parameters you provided. No exceptions showed
> up in the broker log after the tests had been running for 5 minutes.
>
> Could you share your detailed setup? What kind of servers were you using?
> Did you change any config on the broker? How long did you have to run the
> test before the exception showed up?
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xiaofanhad...@gmail.com> wrote:
>
> > I post my code here:
> >
> > ProducerThread.java
> >
> > package com.tz.kafka;
> >
> > import java.io.Serializable;
> > import java.util.List;
> > import java.util.Properties;
> > import java.util.concurrent.CopyOnWriteArrayList;
> >
> > import kafka.javaapi.producer.Producer;
> > import kafka.javaapi.producer.ProducerData;
> > import kafka.producer.ProducerConfig;
> >
> > public class ProducerThread implements Runnable, Serializable
> > {
> >     private static final long serialVersionUID = 18977854555656L;
> >
> >     private String topic;
> >     private Properties props = new Properties();
> >     private String messageStr;
> >
> >     public ProducerThread(String kafkatopic, String message)
> >     {
> >         synchronized(this) {
> >             props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
> >             //props.put("broker.list", "4:192.168.75.104:9092");
> >             props.put("serializer.class", "kafka.serializer.StringEncoder");
> >             props.put("producer.type", "sync");
> >             props.put("compression.codec", "1");
> >             props.put("batch.size", "5");
> >             props.put("queue.enqueueTimeout.ms", "-1");
> >             props.put("queue.size", "2000");
> >             props.put("buffer.size", "10240000");
> >             props.put("zk.sessiontimeout.ms", "6000000");
> >             props.put("zk.connectiontimeout.ms", "6000000");
> >             props.put("socket.timeout.ms", "60000000");
> >             props.put("connect.timeout.ms", "60000000");
> >             props.put("max.message.size", "20000");
> >             props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> >             props.put("reconnect.interval.ms", "3000");
> >             // Use the random partitioner; no key is needed. The message is of type String.
> >             this.topic = kafkatopic;
> >             this.messageStr = message;
> >         }
> >     }
> >
> >     public void run() {
> >         synchronized(this) {
> >             Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
> >             long messageNo = 0;
> >             long t = System.currentTimeMillis();
> >             long r = System.currentTimeMillis();
> >             long time = r - t;
> >             long rate = 0;
> >             List<String> messageSet = new CopyOnWriteArrayList<String>();
> >             while (true)
> >             {
> >                 if (topic.length() > 0)
> >                 {
> >                     messageSet.add(this.messageStr);
> >                     ProducerData<String, String> data = new ProducerData<String, String>(topic, null, messageSet);
> >                     producer.send(data);
> >                     messageSet.clear();
> >                     messageNo++;
> >                 }
> >                 if (messageNo % 200000 == 0)
> >                 {
> >                     r = System.currentTimeMillis();
> >                     time = r - t;
> >                     rate = 200000000 / time; // 200000 msgs * 1000 ms/s / elapsed ms = msgs/sec
> >                     System.out.println(this.topic + " send message per second: " + rate);
> >                     t = r;
> >                 }
> >             }
> >         }
> >     }
> > }
> >
> > ProducerThreadTest1.java
> >
> > package com.tz.kafka;
> >
> > import java.util.concurrent.LinkedBlockingQueue;
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> >
> > public class ProducerThreadTest1 {
> >
> >     /**
> >      * @param args thread count, message size in bytes, topic prefix
> >      */
> >     public static void main(String[] args) throws InterruptedException {
> >         int i = Integer.parseInt(args[0]); // number of producer threads
> >         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> >             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> >             new ThreadPoolExecutor.DiscardOldestPolicy());
> >         int messageSize = Integer.parseInt(args[1]);
> >         StringBuffer messageStr = new StringBuffer();
> >         for (int k = 0; k < messageSize; k++)
> >         {
> >             messageStr.append("X");
> >         }
> >         String topic = args[2];
> >         for (int j = 0; j < i; j++)
> >         {
> >             topic += "x"; // each thread gets its own topic: "ax", "axx", ...
> >             threadPool.execute(new ProducerThread(topic, messageStr.toString()));
> >             Thread.sleep(1000);
> >         }
> >     }
> > }
> >
> > The shell script kafkaThreadTest.sh looks like this:
> >
> > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> >
> > I deployed the script on ten servers!
> >
> > Thanks!
> > Best Regards!
> >
> > Jian Fan
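For readers skimming the thread, what each test thread boils down to is one sync Producer per thread, built from the same properties and looping on send(). The sketch below is condensed from ProducerThread.java above; the class name MinimalSendLoop is invented for illustration, while the Kafka 0.7 javaapi calls and property values are taken from the posted code.

import java.util.Collections;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

// Condensed sketch of the per-thread send loop from ProducerThread.java:
// each thread owns its own sync Producer and sends compressed payloads.
public class MinimalSendLoop implements Runnable {
    private final String topic;
    private final String payload;

    public MinimalSendLoop(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }

    public void run() {
        Properties props = new Properties();
        props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");
        props.put("compression.codec", "1"); // 1 = GZIP in Kafka 0.7
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        while (true) {
            // Each call synchronously sends a one-message batch to the topic.
            producer.send(new ProducerData<String, String>(
                    topic, null, Collections.singletonList(payload)));
        }
    }
}

Note that, as in the original code, no producer instance is shared across threads; each Runnable constructs its own.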
> > 2012/7/13 Jun Rao <jun...@gmail.com>
> >
> > > That seems like a Kafka bug. Do you have a script that can reproduce this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com> wrote:
> > >
> > > > HI:
> > > > I use kafka 0.7.1; here is the stack trace on the kafka server:
> > > >
> > > > ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because of error (kafka.network.Processor)
> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > >
> > > > Here is the stack trace on the kafka producer:
> > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in 60000 ms (kafka.producer.SyncProducer)
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect(Native Method)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > >
> > > > The kafka producer is a multi-threaded program.
> > > >
> > > > Thanks!
> > > >
> > > > Best Regards!
> > > >
> > > > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com>
> > > >
> > > > > In addition to Jun's question,
> > > > >
> > > > > which version are you using? Do you have a reproducible test case?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > What's the stack trace?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xiaofanhad...@gmail.com> wrote:
> > > > > >
> > > > > >> HI:
> > > > > >>
> > > > > >> Guys, I am testing kafka in our highly concurrent test environment, and I always get the error message as follows:
> > > > > >>
> > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > >> kafka.message.InvalidMessageException: message is invalid, compression
> > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
> > > > > >>
> > > > > >> Can anyone help? Thanks!
> > > > > >>
> > > > > >> Best Regards
> > > > > >>
> > > > > >> Jian Fan
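A closing note on what the broker-side exception is checking. In Kafka 0.7 each entry of a message set is a 4-byte length prefix followed by a message (a magic byte, an attributes byte when magic=1, a 4-byte CRC32, then the payload), and the broker validates every entry while appending the request to the log. The sketch below is an illustration of that kind of check, not the actual Kafka source; in the traces above, size 1030 with curr offset 1034 and init offset 0 is consistent with the very first entry of the request (4-byte prefix + 1030 bytes) failing validation.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageSetCheck {
    // Illustration only -- NOT the actual Kafka 0.7 source. Walks one
    // size-prefixed entry of a message set and rejects it when the length
    // prefix or CRC32 disagrees with the bytes, which is the kind of check
    // behind "message is invalid ... size: S curr offset: C init offset: I".
    public static void validateNext(ByteBuffer buf) {
        int initOffset = buf.position();
        int size = buf.getInt();                        // 4-byte length prefix
        if (size <= 0 || size > buf.remaining())        // claimed size vs. bytes actually present
            throw new IllegalStateException("truncated message, size: " + size
                    + " remaining: " + buf.remaining());
        byte magic = buf.get();                         // format version (0 or 1 in 0.7)
        byte attributes = (magic == 1) ? buf.get() : 0; // low 2 bits: compression codec
        int headerSize = (magic == 1) ? 6 : 5;          // magic [+ attributes] + 4-byte CRC
        if (size < headerSize)
            throw new IllegalStateException("header longer than message, size: " + size);
        long storedCrc = buf.getInt() & 0xFFFFFFFFL;    // CRC32 the producer computed
        byte[] payload = new byte[size - headerSize];
        buf.get(payload);
        CRC32 crc = new CRC32();
        crc.update(payload);
        if (crc.getValue() != storedCrc)                // payload corrupted on the wire
            throw new IllegalStateException("message is invalid, codec bits: " + (attributes & 0x3)
                    + " size: " + size
                    + " curr offset: " + (initOffset + 4 + size)
                    + " init offset: " + initOffset);
    }
}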