Hi Jun, just to give you a bit more info on how I work out the offset for the next fetch, please advise if this is correct:
ByteBufferMessageSet msgs = consumer.fetch(
        new FetchRequest(kafkaConf.topic, hostPartition, offsetStart, kafkaConf.fetchSizeBytes));
java.util.Iterator<MessageAndOffset> msgSet = msgs.iterator();
Long finalOffset = 0L;
int cnt = 0;
while (msgSet.hasNext()) {
    cnt++;
    MessageAndOffset mao = msgSet.next();
    Long off = mao.offset();
    if (off.compareTo(finalOffset) > 0)
        finalOffset = off;
    if (maoMap == null)
        maoMap = new HashMap<Integer, MessageAndOffset>();
    maoMap.put(cnt, mao);
    if (msgAwaiting == null)
        msgAwaiting = new LinkedList<Integer>();
    msgAwaiting.add(cnt);
}
finalOffset += msgs.getInitialOffset();
offsetStart = finalOffset; // for next fetch
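For comparison, here is an alternative I was considering for advancing the offset, in case the largest inner-message offset isn't a valid fetch offset when the batch is GZIP'd. This is only a sketch and assumes the javaapi ByteBufferMessageSet exposes validBytes() alongside getInitialOffset() in this version -- please correct me if that assumption is wrong:

// Sketch only: advance the fetch offset by the number of valid bytes in the
// fetched set, rather than by the largest per-message offset seen while iterating.
// Assumes msgs.validBytes() is available on the javaapi wrapper (0.7) - please verify.
ByteBufferMessageSet msgs = consumer.fetch(
        new FetchRequest(kafkaConf.topic, hostPartition, offsetStart, kafkaConf.fetchSizeBytes));

java.util.Iterator<MessageAndOffset> it = msgs.iterator();
while (it.hasNext()) {
    MessageAndOffset mao = it.next();
    // hand mao off for processing as before (maoMap / msgAwaiting bookkeeping)
}

// next fetch starts right after the bytes we just consumed
offsetStart = msgs.getInitialOffset() + msgs.validBytes();

Happy to hear whether the max-offset approach above is in fact the intended usage.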
Ray

On Wed, Sep 26, 2012 at 6:53 PM, Raymond Ng <raymond...@gmail.com> wrote:

> Hi Jun,
>
> thanks for your reply, however I'm still getting the InvalidMessageSize
> exception. As I mentioned before, my consumer fetch size is set to 10 MB and
> the producer is batching the messages up to 3 MB and GZIP'ing them when
> sending to kafka. DumpLogSegments reported no error (no result when
> grepping "isvalid: false").
>
> I'm also getting the following error from the broker server; I'd appreciate
> your advice on this:
>
> [2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on
> probe-data26:0 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:636)
> [2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35 because of
> error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:636)
> [2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on
> probe-data26:0 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0
>
>
> On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Raymond,
>>
>> Your understanding is correct. There are 2 common reasons for
>> getting InvalidMessageSizeException: (1) the fetch size is smaller than the
>> largest message (compressed messages in the same batch are treated as 1
>> message), (2) the offset in the fetch request is wrong. Both can be
>> verified using the DumpLogSegments tool.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com> wrote:
>>
>> > Hi
>> >
>> > I'd like to know how SimpleConsumer.fetch and
>> > ByteBufferMessageSet.iterator work in terms of dealing with GZIP'd
>> > messages.
>> >
>> > Is it right to say that SimpleConsumer.fetch will fetch up to the amount
>> > specified in the fetch size regardless of whether a complete GZIP batch
>> > has been retrieved, and that ByteBufferMessageSet.iterator is responsible
>> > for converting the fetched MessageSet into a set of complete
>> > MessageAndOffset objects and discarding any incomplete batch due to the
>> > fetch size limit?
>> >
>> > I'm asking because I'm regularly getting InvalidMessageSizeException after
>> > a consumer fetch. I can see this topic being discussed in other mail
>> > threads already, and I've also used DumpLogSegments to check the kafka
>> > file reported in the exception; nothing negative was reported by the tool
>> > in terms of the offset.
>> >
>> > I'm using GZIP compression to send msgs to kafka; a limit of 3 MB of raw
>> > data per batch has been imposed, and the batch size will be smaller once
>> > GZIP'ed to kafka.
>> >
>> > On the consumer the fetch size is set to 10 MB to allow plenty of
>> > leeway to fetch at least 1 full batch of GZIP'd content.
>> >
>> > What else can be wrong in this setup?
>> >
>> > --
>> > Rgds
>> > Ray
>> >
>>
>
>
> --
> Rgds
> Ray

--
Rgds
Ray