MessageAndOffset.offset() gives you the offset of the next message and you can
use it directly. For compressed messages, the offset only moves forward once
the iterator has crossed the whole compressed batch.
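
So on your side the loop can be as simple as something like this (a rough,
untested sketch using the same classes as in your snippet below; topic,
partition and sizes are placeholders):

    long offsetStart = 0L;                      // or wherever the last run left off

    while (true) {                              // add your own back-off / stop condition
        ByteBufferMessageSet msgs = consumer.fetch(
                new FetchRequest("my-topic", 0, offsetStart, 10 * 1024 * 1024));

        java.util.Iterator<MessageAndOffset> it = msgs.iterator();
        while (it.hasNext()) {
            MessageAndOffset mao = it.next();
            // ... handle mao.message() ...

            // offset() already points at the next message, so it can be fed
            // straight back into the next FetchRequest; there is no need to add
            // getInitialOffset(). Inside a compressed batch the value stays the
            // same and only jumps forward once the whole batch has been iterated.
            offsetStart = mao.offset();
        }
    }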
Thanks,

Jun

On Wed, Sep 26, 2012 at 11:58 AM, Raymond Ng <raymond...@gmail.com> wrote:

> Hi Jun
>
> just to give you a bit more info in terms of how I work out the offset for
> the next fetch, please advise if this is correct
>
>     ByteBufferMessageSet msgs = consumer.fetch(
>             new FetchRequest(
>                     kafkaConf.topic,
>                     hostPartition,
>                     offsetStart,
>                     kafkaConf.fetchSizeBytes));
>
>     java.util.Iterator<MessageAndOffset> msgSet = msgs.iterator();
>     Long finalOffset = 0L;
>     int cnt = 0;
>
>     while (msgSet.hasNext()) {
>         cnt++;
>         MessageAndOffset mao = msgSet.next();
>         Long off = mao.offset();
>
>         if (off.compareTo(finalOffset) > 0) finalOffset = off;
>
>         if (maoMap == null) maoMap = new HashMap<Integer, MessageAndOffset>();
>         maoMap.put(cnt, mao);
>
>         if (msgAwaiting == null) msgAwaiting = new LinkedList<Integer>();
>         msgAwaiting.add(cnt);
>     }
>     finalOffset += msgs.getInitialOffset();
>     offsetStart = finalOffset;   // for next fetch
>
> Ray
>
> On Wed, Sep 26, 2012 at 6:53 PM, Raymond Ng <raymond...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > thanks for your reply, however I'm still getting the InvalidMessageSize
> > exception. As I mentioned before, my consumer fetch size is set to 10MB
> > and the producer is batching the messages up to 3MB and GZIP'ing them
> > when sending to kafka. DumpLogSegments reported no error (no result when
> > grepping "isvalid: false").
> >
> > I'm also getting the following error from the broker server, appreciate
> > your advice on this
> >
> > [2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on
> > probe-data26:0 (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:636)
> > [2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35 because
> > of error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:636)
> > [2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on
> > probe-data26:0 (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0
> >
> >
> > On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> >> Raymond,
> >>
> >> Your understanding is correct. There are 2 common reasons for getting
> >> InvalidMessageSizeException: (1) the fetch size is smaller than the
> >> largest message (compressed messages in the same batch are treated as
> >> 1 message), (2) the offset in the fetch request is wrong. Both can be
> >> verified using the DumpLogSegment tool.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > I'd like to know how SimpleConsumer.fetch and
> >> > ByteBufferMessageSet.iterator work in terms of dealing with GZIP'd
> >> > messages
> >> >
> >> > is it right to say that SimpleConsumer.fetch will fetch up to the amount
> >> > specified in the fetch size regardless of whether a complete GZIP batch
> >> > has been retrieved, and ByteBufferMessageSet.iterator is responsible for
> >> > converting the fetched MessageSet into a set of complete MessageAndOffset
> >> > objects and discarding any incomplete batch due to the fetch size limit?
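> >> >
> >> > i.e. my working assumption is that a loop like the one below can never
> >> > see a partial batch - whatever the iterator hands back is a whole,
> >> > decompressed message, and a batch that was cut off by the fetch size
> >> > simply isn't returned and will come back complete on the next fetch
> >> > (my own sketch, topic/partition/offset/sizes are made up):
> >> >
> >> >     long nextOffset = 0L;   // wherever the previous fetch left off
> >> >     ByteBufferMessageSet set = consumer.fetch(
> >> >             new FetchRequest("my-topic", 0, nextOffset, 10 * 1024 * 1024));
> >> >
> >> >     int complete = 0;
> >> >     java.util.Iterator<MessageAndOffset> it = set.iterator();
> >> >     while (it.hasNext()) {
> >> >         MessageAndOffset mao = it.next();   // a whole message, never a fragment?
> >> >         complete++;
> >> >     }
> >> >     // complete == number of fully retrieved messages in this 10 MB window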
> >> >
> >> > I'm asking because I'm regularly getting InvalidMessageSizeException
> >> > after a consumer fetch, and I can see this topic being discussed in
> >> > other mail threads already. I've also used DumpLogSegments to check the
> >> > kafka file reported in the exception; nothing negative was reported by
> >> > the tool in terms of the offset
> >> >
> >> > I'm using GZIP compression to send msgs to kafka, and a limit of 3MB of
> >> > raw data per batch has been imposed; the batch will be smaller still
> >> > once GZIP'ed before going to kafka
> >> >
> >> > On the consumer the fetch size is set to 10 MB to allow for plenty of
> >> > leeway to fetch at least 1 full batch of GZIP'd content
> >> >
> >> > what else can be wrong in this setup ?
> >> >
> >> > --
> >> > Rgds
> >> > Ray
> >> >
> >>
> >
> >
> > --
> > Rgds
> > Ray
> >
>
>
> --
> Rgds
> Ray
>