Hi Jun,

Thanks for your reply. However, I'm still getting the InvalidMessageSizeException. As I mentioned before, my consumer fetch size is set to 10MB, and the producer batches messages up to 3MB of raw data and GZIPs them when sending to Kafka. DumpLogSegments reported no error (no result when grepping for "isvalid: false").
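For reference, the producer side looks roughly like this (Kafka 0.7 API, trimmed down from my actual code; the ZK address and batch contents are stand-ins):

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig, ProducerData}

val props = new Properties()
props.put("zk.connect", "zkhost:2181")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("compression.codec", "1")  // 1 = GZIP

val producer = new Producer[String, String](new ProducerConfig(props))

// my batching code accumulates raw messages until they total ~3MB and then
// sends them as one batch; Kafka GZIPs the batch into a single wrapper
// message, so what actually goes over the wire is smaller than 3MB
val batch: Seq[String] = Seq("...")  // stand-in for the accumulated messages
producer.send(new ProducerData[String, String]("probe-data26", batch))
producer.close()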
I'm also getting the following error from the broker server; appreciate your advice on this:

[2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at kafka.message.MessageSet.foreach(MessageSet.scala:87)
        at kafka.log.Log.append(Log.scala:205)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
        at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:636)

[2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35 because of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
        ... (same stack trace as above)

[2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0

On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao
<jun...@gmail.com> wrote:

> Raymond,
>
> Your understanding is correct. There are 2 common reasons for getting
> InvalidMessageSizeException: (1) the fetch size is smaller than the
> largest message (compressed messages in the same batch are treated as 1
> message), (2) the offset in the fetch request is wrong. Both can be
> verified using the DumpLogSegments tool.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com> wrote:
>
> > Hi
> >
> > I'd like to know how SimpleConsumer.fetch and ByteBufferMessageSet.iterator
> > work in terms of dealing with GZIP'd messages.
> >
> > Is it right to say that SimpleConsumer.fetch will fetch up to the amount
> > specified in the fetch size, regardless of whether a complete GZIP batch
> > has been retrieved, and that ByteBufferMessageSet.iterator is responsible
> > for converting the fetched MessageSet into a set of complete
> > MessageAndOffset objects and discarding any batch left incomplete by the
> > fetch size limit?
> >
> > I'm asking because I'm regularly getting InvalidMessageSizeException after
> > a consumer fetch. I can see this topic being discussed in other mail
> > threads already, and I've also used DumpLogSegments to check the kafka
> > file reported in the exception; the tool reported nothing negative in
> > terms of the offset.
> >
> > I'm using GZIP compression to send msgs to kafka, with a limit of 3MB of
> > raw data per batch imposed; the batch will be smaller once GZIP'ed to
> > kafka.
> >
> > On the consumer the fetch size is set to 10MB to allow plenty of leeway
> > to fetch at least 1 full batch of GZIP'd content.
> >
> > What else can be wrong in this setup?
> >
> > --
> > Rgds
> > Ray
>

--
Rgds
Ray
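PS: in case it helps, my consumer loop is roughly the following (Kafka 0.7 API, trimmed down from my actual code; the host, starting offset, and message handling are stand-ins):

import kafka.api.FetchRequest
import kafka.consumer.SimpleConsumer

val fetchSize = 10 * 1024 * 1024  // 10MB, as in my config
val consumer = new SimpleConsumer("kafkahost", 9092, 30000, fetchSize)

var offset = 0L
// fetch() returns up to fetchSize raw bytes, which may end mid-batch
val msgs = consumer.fetch(new FetchRequest("probe-data26", 0, offset, fetchSize))
for (msgAndOffset <- msgs) {
  // by this point the iterator has already inflated the GZIP'ed wrapper
  // message; my understanding is that a batch truncated by the fetch size
  // limit is dropped rather than surfaced here
  offset = msgAndOffset.offset  // byte offset to use for the next fetch
}

Given that, I'd have thought a 10MB fetch always covers at least one complete batch, since each batch is at most 3MB before compression.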