sorry!

The producer.type should be async!!

I have changed it to sync in my new test!

Best Regards!
Jian Fan

2012/7/13 jjian fan <xiaofanhad...@gmail.com>

> I post my code here:
>
> ProducerThread.java
> package com.tz.kafka;
>
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable ,Serializable
> {
>   /**
>  *
>  */
>  private static final long serialVersionUID = 18977854555656L;
> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private String topic;
>   private Properties props = new Properties();
>       private String messageStr;
>   public  ProducerThread(String kafkatopic,String message)
>   {
>     synchronized(this){
>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> 192.168.75.65:2181");
>  //props.put("broker.list", "4:192.168.75.104:9092");
> //props.put("serializer.class", "kafka.serializer.StringEncoder");
>  props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("producer.type", "sync");
>  props.put("compression.codec", "1");
> props.put("batch.size", "5");
>  props.put("queue.enqueueTimeout.ms", "-1");
> props.put("queue.size", "2000");
>  props.put("buffer.size", "10240000");
> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>  props.put("zk.sessiontimeout.ms", "6000000");
> props.put("zk.connectiontimeout.ms", "6000000");
>  props.put("socket.timeout.ms", "60000000");
> props.put("connect.timeout.ms", "60000000");
>  props.put("max.message.size", "20000");
> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>  props.put("reconnect.interval.ms", "3000");
>     // Use random partitioner. Don't need the key type. Just set it to
> Integer.
>     // The message is of type String.
> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> ProducerConfig(props));
>     //producer = new kafka.javaapi.producer.Producer<String, String>(new
> ProducerConfig(props));
>     this.topic = kafkatopic;
>     this.messageStr = message;
>
>   }
>   }
>
>   public void run() {
> synchronized(this) {
>  Producer<String, String> producer  = new Producer<String, String>(new
> ProducerConfig(props));
>     //producer.
>  long messageNo = 0;
>     long t = System.currentTimeMillis();
>     long r = System.currentTimeMillis();
>     long time = r-t;
>     long rate = 0;
>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>     while(true)
>     {
>       if(topic.length() > 0 )
>       {
>      messageSet.add(this.messageStr.toString());
>          ProducerData<String, String> data = new ProducerData<String,
> String>(topic,null,messageSet);
>
>          producer.send(data);
>          messageSet.clear();
>          data = null;
>          messageNo++;
>
>       }
>
>       if(messageNo % 200000 ==0)
>       {
>       r = System.currentTimeMillis();
>       time = r-t;
>       rate = 200000000/time;
>       System.out.println(this.topic + " send message per second:"+rate);
>       t = r;
>       }
>
>      }
> }
>   }
>     }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
> /**
>  * @param args
>  * @throws InterruptedException
>  */
>  public static void main(String[] args) throws InterruptedException {
> // TODO Auto-generated method stub
>  int i = Integer.parseInt(args[0]);
>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>  TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> new ThreadPoolExecutor.DiscardOldestPolicy());
>  int messageSize = Integer.parseInt(args[1]);
>  StringBuffer messageStr = new StringBuffer();
>  for(int messagesize=0;messagesize<messageSize;messagesize++)
>      {
>      messageStr.append("X");
>      }
>  String topic = args[2];
> for(int j=0;j < i; j++)
> {
>    topic += "x";
>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>    Thread.sleep(1000);
>
> }
>  }
>
> }
>
>
> The shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deploy the shell script on ten servers!
>
> Thanks!
> Best Regards!
>
> Jian Fan
>
> 2012/7/13 Jun Rao <jun...@gmail.com>
>
>> That seems like a Kafka bug. Do you have a script that can reproduce this?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com>
>> wrote:
>>
>> > HI:
>> > I use kafka 0.7.1; here is the stack trace from the kafka server:
>> >
>> >  ERROR Error processing MultiProducerRequest on bxx:2
>> > (kafka.server.KafkaRequestHandlers)
>> > kafka.message.InvalidMessageException: message is invalid, compression
>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> > at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> > at kafka.log.Log.append(Log.scala:205)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>> > at kafka.network.Processor.read(SocketServer.scala:319)
>> > at kafka.network.Processor.run(SocketServer.scala:214)
>> > at java.lang.Thread.run(Thread.java:722)
>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
>> > of error (kafka.network.Processor)
>> > kafka.message.InvalidMessageException: message is invalid, compression
>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> > at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> > at kafka.log.Log.append(Log.scala:205)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>> > at kafka.network.Processor.read(SocketServer.scala:319)
>> > at kafka.network.Processor.run(SocketServer.scala:214)
>> > at java.lang.Thread.run(Thread.java:722)
>> >
>> > here is the stack trace in the kafka producer:
>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in
>> > 60000 ms (kafka.producer.SyncProducer)
>> > java.net.ConnectException: Connection refused
>> > at sun.nio.ch.Net.connect(Native Method)
>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>> > at
>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>> > at
>> >
>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>> > at
>> >
>> >
>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>> > at
>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>> >
>> > The kafka producer is a multi-threaded program.
>> >
>> > Thanks!
>> >
>> > Best Regards!
>> >
>> >
>> > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com>
>> >
>> > > In addition to Jun's question,
>> > >
>> > > which version are you using ? Do you have a reproducible test case ?
>> > >
>> > > Thanks,
>> > > Neha
>> > >
>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote:
>> > > > What's the stack trace?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>> xiaofanhad...@gmail.com>
>> > > wrote:
>> > > >
>> > > >> HI:
>> > > >>
>> > > >> Guys, I am testing kafka in our high-concurrency test environment, and I
>> > > >> always get the following error message:
>> > > >>
>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>> > > >> (kafka.server.KafkaRequestHandlers)
>> > > >> kafka.message.InvalidMessageException: message is invalid, compression
>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
>> > > >>
>> > > >> Can anyone help? Thanks!
>> > > >>
>> > > >> Best Regards
>> > > >>
>> > > >> Jian Fan
>> > > >>
>> > >
>> >
>>
>
>

Reply via email to