The new error:

[2012-07-13 10:12:41,178] ERROR Error processing MultiProducerRequest on oxxxx:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 48 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:722)
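[Editor's note: the broker re-validates each (decompressed) message inside `Log.append`, which is where this exception is thrown. A minimal sketch of 0.7 producer properties that exercise the GZIP path — property names follow the 0.7 producer config, but the values are illustrative assumptions, not taken from this thread:]

```java
import java.util.Properties;

public class GzipProducerProps {
    // Producer properties for Kafka 0.7 with GZIP compression enabled.
    // Values below are illustrative assumptions, not this thread's config.
    public static Properties build() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");       // avoid async until the corruption is understood
        props.put("compression.codec", "1");      // 1 = GZIPCompressionCodec in 0.7
        props.put("max.message.size", "1000000"); // must not exceed the broker's limit
        return props;
    }
}
```

In 0.7, `compression.codec=0` disables compression and `1` selects GZIPCompressionCodec.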

2012/7/13 jjian fan <xiaofanhad...@gmail.com>

> OK, sometimes it has this error:
>
> [2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because of error (kafka.network.Processor)
> kafka.common.InvalidTopicException: topic name can't be empty
>     at kafka.log.LogManager.getLogPool(LogManager.scala:159)
>     at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:722)
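
[Editor's note: the trace above shows an empty topic string surviving all the way to `LogManager.getLogPool`, at which point the broker gives up and closes the socket. A client-side guard avoids losing the connection over one bad request; `requireValidTopic` is a hypothetical helper, not part of the Kafka API:]

```java
public class TopicGuard {
    // Rejects topic names the 0.7 broker refuses (null, empty, or blank),
    // so a bad request never reaches the socket.
    // Hypothetical helper, not part of the Kafka client API.
    public static String requireValidTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic name can't be empty");
        }
        return topic;
    }
}
```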
>
> 2012/7/13 jjian fan <xiaofanhad...@gmail.com>
>
>> sorry!
>>
>> The producer.type should be async!!
>>
>> I have changed it to sync in my new test!
>>
>> Best Regards!
>> Jian Fan
>>
>>
>> 2012/7/13 jjian fan <xiaofanhad...@gmail.com>
>>
>>> I post my code here:
>>>
>>> ProducerThread.java
>>> package com.tz.kafka;
>>>
>>> import java.io.Serializable;
>>> import java.util.Properties;
>>> import kafka.producer.ProducerConfig;
>>> import kafka.javaapi.producer.*;
>>> import java.util.*;
>>> import java.util.concurrent.CopyOnWriteArrayList;
>>>
>>> public class ProducerThread implements Runnable, Serializable {
>>>
>>>   private static final long serialVersionUID = 18977854555656L;
>>>   // private final kafka.javaapi.producer.Producer<Integer, String> producer;
>>>   private String topic;
>>>   private Properties props = new Properties();
>>>   private String messageStr;
>>>
>>>   public ProducerThread(String kafkatopic, String message) {
>>>     synchronized (this) {
>>>       props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
>>>       // props.put("broker.list", "4:192.168.75.104:9092");
>>>       props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>       props.put("producer.type", "sync");
>>>       props.put("compression.codec", "1");
>>>       props.put("batch.size", "5");
>>>       props.put("queue.enqueueTimeout.ms", "-1");
>>>       props.put("queue.size", "2000");
>>>       props.put("buffer.size", "10240000");
>>>       // props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>>>       props.put("zk.sessiontimeout.ms", "6000000");
>>>       props.put("zk.connectiontimeout.ms", "6000000");
>>>       props.put("socket.timeout.ms", "60000000");
>>>       props.put("connect.timeout.ms", "60000000");
>>>       props.put("max.message.size", "20000");
>>>       props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>>>       props.put("reconnect.interval.ms", "3000");
>>>       // Use the random partitioner: the key is null, the message is a String.
>>>       // producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
>>>       this.topic = kafkatopic;
>>>       this.messageStr = message;
>>>     }
>>>   }
>>>
>>>   public void run() {
>>>     synchronized (this) {
>>>       Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
>>>       long messageNo = 0;
>>>       long t = System.currentTimeMillis();
>>>       long r = System.currentTimeMillis();
>>>       long time = r - t;
>>>       long rate = 0;
>>>       List<String> messageSet = new CopyOnWriteArrayList<String>();
>>>       while (true) {
>>>         if (topic.length() > 0) {
>>>           messageSet.add(this.messageStr.toString());
>>>           ProducerData<String, String> data =
>>>               new ProducerData<String, String>(topic, null, messageSet);
>>>           producer.send(data);
>>>           messageSet.clear();
>>>           data = null;
>>>           messageNo++;
>>>         }
>>>         if (messageNo % 200000 == 0) {
>>>           r = System.currentTimeMillis();
>>>           time = r - t;
>>>           rate = 200000000 / time;
>>>           System.out.println(this.topic + " send message per second:" + rate);
>>>           t = r;
>>>         }
>>>       }
>>>     }
>>>   }
>>> }
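
[Editor's note: one plausible cause of the corrupt-message errors in this thread (an assumption, not confirmed here): with `producer.type=async`, `send()` only enqueues a reference to the batch, so the `messageSet.clear()` that follows can empty a batch that is still waiting to be serialized. The sketch below replaces the Kafka classes with a plain queue to make the hazard self-contained:]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncAliasingDemo {
    // Simulates an async producer: send() enqueues the batch BY REFERENCE,
    // the way an async send path hands data to a background thread.
    static final BlockingQueue<List<String>> inFlight =
        new ArrayBlockingQueue<List<String>>(10);

    static void send(List<String> batch) throws InterruptedException {
        inFlight.put(batch); // no defensive copy is made
    }

    // Reusing one list and clearing it after send() empties the enqueued batch.
    public static int reusedListBatchSize() throws InterruptedException {
        List<String> messageSet = new ArrayList<String>();
        messageSet.add("XXXX");
        send(messageSet);
        messageSet.clear();            // mutates the batch still in the queue
        return inFlight.take().size(); // 0: the "sent" batch is now empty
    }

    // Allocating a fresh list per send leaves the in-flight batch intact.
    public static int freshListBatchSize() throws InterruptedException {
        List<String> messageSet = new ArrayList<String>();
        messageSet.add("XXXX");
        send(messageSet);
        messageSet = new ArrayList<String>(); // reuse the variable, not the list
        return inFlight.take().size();        // 1
    }
}
```

If this aliasing is the cause, either allocating a fresh list per send (as in `freshListBatchSize`) or staying with `producer.type=sync` should make the errors disappear.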
>>>
>>> ProducerThreadTest1.java
>>>
>>> package com.tz.kafka;
>>>
>>> import java.util.concurrent.ThreadPoolExecutor;
>>> import java.util.concurrent.TimeUnit;
>>> import java.util.concurrent.LinkedBlockingQueue;
>>>
>>> public class ProducerThreadTest1 {
>>>
>>>   /**
>>>    * @param args thread count, message size, base topic name
>>>    * @throws InterruptedException
>>>    */
>>>   public static void main(String[] args) throws InterruptedException {
>>>     int i = Integer.parseInt(args[0]);
>>>     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>>>         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>>>         new ThreadPoolExecutor.DiscardOldestPolicy());
>>>     int messageSize = Integer.parseInt(args[1]);
>>>     StringBuffer messageStr = new StringBuffer();
>>>     for (int messagesize = 0; messagesize < messageSize; messagesize++) {
>>>       messageStr.append("X");
>>>     }
>>>     String topic = args[2];
>>>     for (int j = 0; j < i; j++) {
>>>       topic += "x";
>>>       threadPool.execute(new ProducerThread(topic, messageStr.toString()));
>>>       Thread.sleep(1000);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> the shell script kafkaThreadTest.sh is like this:
>>>
>>> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>>>
>>> I deploy the script on ten servers!
>>>
>>> Thanks!
>>> Best Regards!
>>>
>>> Jian Fan
>>>
>>> 2012/7/13 Jun Rao <jun...@gmail.com>
>>>
>>>> That seems like a Kafka bug. Do you have a script that can reproduce
>>>> this?
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhad...@gmail.com>
>>>> wrote:
>>>>
>>>> > HI:
>>>> > I use kafka 0.7.1; here is the stack trace in the kafka server:
>>>> >
>>>> > ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
>>>> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>> >     at kafka.log.Log.append(Log.scala:205)
>>>> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>>> >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>>> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>>> >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>>> >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> >     at kafka.network.Processor.handle(SocketServer.scala:296)
>>>> >     at kafka.network.Processor.read(SocketServer.scala:319)
>>>> >     at kafka.network.Processor.run(SocketServer.scala:214)
>>>> >     at java.lang.Thread.run(Thread.java:722)
>>>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because of error (kafka.network.Processor)
>>>> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>> >     at kafka.log.Log.append(Log.scala:205)
>>>> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>>> >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>>> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>>> >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>>> >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> >     at kafka.network.Processor.handle(SocketServer.scala:296)
>>>> >     at kafka.network.Processor.read(SocketServer.scala:319)
>>>> >     at kafka.network.Processor.run(SocketServer.scala:214)
>>>> >     at java.lang.Thread.run(Thread.java:722)
>>>> >
>>>> > here is the stack trace in the kafka producer:
>>>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in 60000 ms (kafka.producer.SyncProducer)
>>>> > java.net.ConnectException: Connection refused
>>>> >     at sun.nio.ch.Net.connect(Native Method)
>>>> >     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>>>> >     at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>>>> >     at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>>>> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>>>> >     at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>>>> >     at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>>>> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>>>> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>>>> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>>>> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>>>> >     at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>>> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>>>> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>>>> >
>>>> > The kafka producer is a multi-threaded program.
>>>> >
>>>> > Thanks!
>>>> >
>>>> > Best Regards!
>>>> >
>>>> >
>>>> > 2012/7/13 Neha Narkhede <neha.narkh...@gmail.com>
>>>> >
>>>> > > In addition to Jun's question,
>>>> > >
>>>> > > which version are you using ? Do you have a reproducible test case ?
>>>> > >
>>>> > > Thanks,
>>>> > > Neha
>>>> > >
>>>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <jun...@gmail.com> wrote:
>>>> > > > What's the stack trace?
>>>> > > >
>>>> > > > Thanks,
>>>> > > >
>>>> > > > Jun
>>>> > > >
>>>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>>>> xiaofanhad...@gmail.com>
>>>> > > wrote:
>>>> > > >
>>>> > > >> HI:
>>>> > > >>
>>>> > > >> Guys, I test Kafka in our high-concurrency test environment, and I always get the following error message:
>>>> > > >>
>>>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2 (kafka.server.KafkaRequestHandlers)
>>>> > > >> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
>>>> > > >>
>>>> > > >> Can anyone help? Thanks!
>>>> > > >>
>>>> > > >> Best Regards
>>>> > > >>
>>>> > > >> Jian Fan
>>>> > > >>
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>
