Neha :

    I have create the bug in jira with the name of kafka-411, pls check it!

Thanks!
Jian Fan

2012/7/21 Neha Narkhede <neha.narkh...@gmail.com>

> Jian,
>
> This is a bug in Kafka. Would you mind filing a JIRA with the testcase you
> have and the error log ?
>
> Thanks,
> Neha
>
> On Fri, Jul 13, 2012 at 7:49 PM, jjian fan <xiaofanhad...@gmail.com>
> wrote:
>
> > Jun
> >
> >   I have enabled kafka.producer.SyncProducer in my test, In produce side
> > ,there are no InvalidMessageException throw. So, error must be in server
> > side.
> >
> > Thanks!
> > Jian Fan
> >
> > 2012/7/14 Jun Rao <jun...@gmail.com>
> >
> > > Jian,
> > >
> > > Thanks. We will take a look.
> > >
> > > Another thing. Could you try enabling debug level logging in
> > > kafka.producer.SyncProducer while running your test? This will enable
> > > message verification on the producer side and will tell us if the
> > > corruption was introduced on the producer side or the broker side.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xiaofanhad...@gmail.com>
> > > wrote:
> > >
> > > > I post my code here:
> > > >
> > > > ProducerThread.java
> > > > package com.tz.kafka;
> > > >
> > > >
> > > > import java.io.Serializable;
> > > > import java.util.Properties;
> > > > import kafka.producer.ProducerConfig;
> > > > import kafka.javaapi.producer.*;
> > > > import java.util.*;
> > > > import java.util.concurrent.CopyOnWriteArrayList;
> > > >
> > > > public class ProducerThread implements Runnable ,Serializable
> > > > {
> > > >   /**
> > > >  *
> > > >  */
> > > > private static final long serialVersionUID = 18977854555656L;
> > > > //private final kafka.javaapi.producer.Producer<Integer, String>
> > > producer;
> > > >   private String topic;
> > > >   private Properties props = new Properties();
> > > >       private String messageStr;
> > > >   public  ProducerThread(String kafkatopic,String message)
> > > >   {
> > > >     synchronized(this){
> > > >     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> > > > 192.168.75.65:2181");
> > > > //props.put("broker.list", "4:192.168.75.104:9092");
> > > > //props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > > props.put("producer.type", "sync");
> > > > props.put("compression.codec", "1");
> > > > props.put("batch.size", "5");
> > > > props.put("queue.enqueueTimeout.ms", "-1");
> > > > props.put("queue.size", "2000");
> > > > props.put("buffer.size", "10240000");
> > > > //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> > > > props.put("zk.sessiontimeout.ms", "6000000");
> > > > props.put("zk.connectiontimeout.ms", "6000000");
> > > > props.put("socket.timeout.ms", "60000000");
> > > > props.put("connect.timeout.ms", "60000000");
> > > > props.put("max.message.size", "20000");
> > > > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> > > > props.put("reconnect.interval.ms", "3000");
> > > >     // Use random partitioner. Don't need the key type. Just set it
> to
> > > > Integer.
> > > >     // The message is of type String.
> > > > //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > > > ProducerConfig(props));
> > > >     //producer = new kafka.javaapi.producer.Producer<String,
> > String>(new
> > > > ProducerConfig(props));
> > > >     this.topic = kafkatopic;
> > > >     this.messageStr = message;
> > > >
> > > >   }
> > > >   }
> > > >
> > > >   public void run() {
> > > > synchronized(this) {
> > > > Producer<String, String> producer  = new Producer<String, String>(new
> > > > ProducerConfig(props));
> > > >     //producer.
> > > > long messageNo = 0;
> > > >     long t = System.currentTimeMillis();
> > > >     long r = System.currentTimeMillis();
> > > >     long time = r-t;
> > > >     long rate = 0;
> > > >     List<String> messageSet = new CopyOnWriteArrayList<String>();
> > > >     while(true)
> > > >     {
> > > >       if(topic.length() > 0 )
> > > >       {
> > > >      messageSet.add(this.messageStr.toString());
> > > >          ProducerData<String, String> data = new ProducerData<String,
> > > > String>(topic,null,messageSet);
> > > >
> > > >          producer.send(data);
> > > >          messageSet.clear();
> > > >          data = null;
> > > >          messageNo++;
> > > >
> > > >       }
> > > >
> > > >       if(messageNo % 200000 ==0)
> > > >       {
> > > >       r = System.currentTimeMillis();
> > > >       time = r-t;
> > > >       rate = 200000000/time;
> > > >       System.out.println(this.topic + " send message per
> > second:"+rate);
> > > >       t = r;
> > > >       }
> > > >
> > > >      }
> > > > }
> > > >   }
> > > >     }
> > > >
> > > > ProducerThreadTest1.java
> > > >
> > > > package com.tz.kafka;
> > > >
> > > > import java.util.concurrent.ThreadPoolExecutor;
> > > > import java.util.concurrent.TimeUnit;
> > > > import java.util.concurrent.LinkedBlockingQueue;
> > > >
> > > > public class ProducerThreadTest1 {
> > > >
> > > > /**
> > > >  * @param args
> > > >  * @throws InterruptedException
> > > >  */
> > > > public static void main(String[] args) throws InterruptedException {
> > > > // TODO Auto-generated method stub
> > > > int i = Integer.parseInt(args[0]);
> > > >  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> > > > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> > > > new ThreadPoolExecutor.DiscardOldestPolicy());
> > > > int messageSize = Integer.parseInt(args[1]);
> > > >  StringBuffer messageStr = new StringBuffer();
> > > > for(int messagesize=0;messagesize<messageSize;messagesize++)
> > > >      {
> > > >      messageStr.append("X");
> > > >      }
> > > > String topic = args[2];
> > > > for(int j=0;j < i; j++)
> > > > {
> > > >    topic += "x";
> > > >    threadPool.execute(new
> ProducerThread(topic,messageStr.toString()));
> > > >    Thread.sleep(1000);
> > > >
> > > > }
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > > the shell scripte kafkaThreadTest.sh like this:
> > > >
> > > > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> > > >
> > > > I deploy the shell at ten servers!
> > > >
> > > > Thanks!
> > > > Best Regards!
> > > >
> > > > Jian Fan
> > > >
> > > > 2012/7/13 Jun Rao <jun...@gmail.com>
> > > >
> > > > > That seems like a Kafka bug. Do you have a script that can
> reproduce
> > > > this?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <
> xiaofanhad...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > HI:
> > > > > > I use kafka0.7.1, here is the stack trace in kafka server:
> > > > > >
> > > > > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > > > > (kafka.server.KafkaRequestHandlers)
> > > > > > kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> > offset: 0
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > at
> > > > >
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > > at
> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > > at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > at
> > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > at java.lang.Thread.run(Thread.java:722)
> > > > > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> > > > /192.168.75.13because
> > > > > > of error (kafka.network.Processor)
> > > > > > kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> > offset: 0
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > at
> > > > >
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > > at
> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > > at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > at
> > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > at java.lang.Thread.run(Thread.java:722)
> > > > > >
> > > > > > here is the track stace in kafka producer:
> > > > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next
> > attempt
> > > > in
> > > > > > 60000 ms (kafka.producer.SyncProducer)
> > > > > > java.net.ConnectException: Connection refused
> > > > > > at sun.nio.ch.Net.connect(Native Method)
> > > > > > at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > > > at
> > > > >
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > > > at
> > > > >
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > > > >
> > > > > > The kafka producer is multi-thread program.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Best Regards!
> > > > > >
> > > > > >
> > > > > > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com>
> > > > > >
> > > > > > > In addition to Jun's question,
> > > > > > >
> > > > > > > which version are you using ? Do you have a reproducible test
> > case
> > > ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com>
> > wrote:
> > > > > > > > What's the stack trace?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> > > > xiaofanhad...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> HI:
> > > > > > > >>
> > > > > > > >> Guys, I test kafka in our test high cocunnrent enivorment, I
> > > > always
> > > > > > get
> > > > > > > the
> > > > > > > >> error message as follows:
> > > > > > > >>
> > > > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > > > >> kafka.message.InvalidMessageException: message is invalid,
> > > > > compression
> > > > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> > > > offset:
> > > > > 0
> > > > > > > >>
> > > > > > > >> Can anyone help? Thanks!
> > > > > > > >>
> > > > > > > >> Best Regards
> > > > > > > >>
> > > > > > > >> Jian Fan
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to