I've got this now, so I can just use the offset() value directly for my next fetch.
Now I just need to work out a better way to handle failures and to minimize duplicated messages within a compressed message set.

thanks
Ray

On 9/27/12, Raymond Ng <raymond...@gmail.com> wrote:
> Hi Jun
>
> So, to work out the offset for the next compressed message within the
> ByteBufferMessageSet, which method can give me this info? I've come across
> the shallow iterator, but apparently it is not exposed externally.
>
> I need the correct offset for the next fetch; I'm currently on 0.7.1.
>
> thanks
> Ray
>
> On 9/27/12, Jun Rao <jun...@gmail.com> wrote:
>> MessageAndOffset.offset() gives you the offset of the next message, and
>> you can use it directly. For compressed messages, the offset only moves
>> once we cross the compressed batch.
>>
>> Thanks,
>>
>> Jun
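In code, Jun's advice amounts to something like the minimal sketch below (assuming the 0.7 javaapi classes that appear later in this thread; the class name NextOffsetSketch and the consumer/topic/partition/startOffset/fetchSizeBytes parameters are placeholders, not names from the original mails):

    import java.util.Iterator;
    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.MessageAndOffset;

    // Fetch one batch and return the offset to use for the next fetch, taken
    // straight from MessageAndOffset.offset(). For a compressed batch the
    // offset only advances once the whole batch has been consumed, so the last
    // offset() seen is safe to reuse as-is.
    public class NextOffsetSketch {
        static long fetchOnce(SimpleConsumer consumer, String topic, int partition,
                              long startOffset, int fetchSizeBytes) {
            ByteBufferMessageSet msgs = consumer.fetch(
                    new FetchRequest(topic, partition, startOffset, fetchSizeBytes));

            long nextOffset = startOffset;
            Iterator<MessageAndOffset> it = msgs.iterator();
            while (it.hasNext()) {
                MessageAndOffset mao = it.next();
                // ... hand mao.message() to the application here ...
                nextOffset = mao.offset();   // already absolute, so no getInitialOffset() adjustment
            }
            return nextOffset;               // offset for the next FetchRequest
        }
    }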
>>
>> On Wed, Sep 26, 2012 at 11:58 AM, Raymond Ng <raymond...@gmail.com> wrote:
>>> Hi Jun
>>>
>>> Just to give you a bit more info on how I work out the offset for the
>>> next fetch; please advise if this is correct:
>>>
>>>     ByteBufferMessageSet msgs = consumer.fetch(
>>>             new FetchRequest(kafkaConf.topic, hostPartition,
>>>                              offsetStart, kafkaConf.fetchSizeBytes));
>>>
>>>     // maoMap and msgAwaiting are fields of the enclosing class
>>>     java.util.Iterator<MessageAndOffset> msgSet = msgs.iterator();
>>>     long finalOffset = 0L;
>>>     int cnt = 0;
>>>
>>>     while (msgSet.hasNext()) {
>>>         cnt++;
>>>         MessageAndOffset mao = msgSet.next();
>>>         long off = mao.offset();
>>>
>>>         if (off > finalOffset) finalOffset = off;   // keep the highest offset seen
>>>
>>>         if (maoMap == null) maoMap = new HashMap<Integer, MessageAndOffset>();
>>>         maoMap.put(cnt, mao);
>>>
>>>         if (msgAwaiting == null) msgAwaiting = new LinkedList<Integer>();
>>>         msgAwaiting.add(cnt);
>>>     }
>>>     finalOffset += msgs.getInitialOffset();
>>>     offsetStart = finalOffset;   // for next fetch
>>>
>>> Ray
>>>
>>> On Wed, Sep 26, 2012 at 6:53 PM, Raymond Ng <raymond...@gmail.com> wrote:
>>>> Hi Jun,
>>>>
>>>> Thanks for your reply; however, I'm still getting the
>>>> InvalidMessageSizeException. As I mentioned before, my consumer fetch
>>>> size is set to 10 MB, and the producer batches messages up to 3 MB and
>>>> GZIPs them before sending to Kafka. DumpLogSegments reported no errors
>>>> (no results when grepping for "isvalid: false").
>>>>
>>>> I'm also getting the following error from the broker server; I'd
>>>> appreciate your advice on this:
>>>>
>>>> [2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
>>>> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>>     at kafka.log.Log.append(Log.scala:205)
>>>>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>>>>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>>>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>>>     at kafka.network.Processor.handle(SocketServer.scala:296)
>>>>     at kafka.network.Processor.read(SocketServer.scala:319)
>>>>     at kafka.network.Processor.run(SocketServer.scala:214)
>>>>     at java.lang.Thread.run(Thread.java:636)
>>>> [2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35 because of error (kafka.network.Processor)
>>>> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>>     at kafka.log.Log.append(Log.scala:205)
>>>>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>>>>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>>>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>>>     at kafka.network.Processor.handle(SocketServer.scala:296)
>>>>     at kafka.network.Processor.read(SocketServer.scala:319)
>>>>     at kafka.network.Processor.run(SocketServer.scala:214)
>>>>     at java.lang.Thread.run(Thread.java:636)
>>>> [2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
>>>> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0
>>>>
>>>> On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>> Raymond,
>>>>>
>>>>> Your understanding is correct. There are two common reasons for getting
>>>>> an InvalidMessageSizeException: (1) the fetch size is smaller than the
>>>>> largest message (compressed messages in the same batch are treated as
>>>>> one message), or (2) the offset in the fetch request is wrong. Both can
>>>>> be verified using the DumpLogSegments tool.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
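A rough sketch (not from the original mails) of how a SimpleConsumer loop can react to those two causes: grow the fetch size when a fetch yields no messages, and only once a generous ceiling is reached treat the request offset itself as suspect. The doubling policy and the maxFetchSize bound are illustrative assumptions; on 0.7 a batch truncated by a too-small fetch size may also surface as an InvalidMessageSizeException from the iterator, in which case the same enlargement applies.

    import java.util.Iterator;
    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.MessageAndOffset;

    // If a fetch returns no messages, the fetch size has most likely truncated a
    // compressed batch (reason 1): enlarge it and retry. If even a generous fetch
    // size never yields messages, re-check the request offset (reason 2), e.g.
    // against the DumpLogSegments output for the segment in question.
    public class FetchWithGrowingSizeSketch {
        static long fetchAdaptively(SimpleConsumer consumer, String topic, int partition,
                                    long offset, int fetchSize, int maxFetchSize) {
            while (true) {
                ByteBufferMessageSet msgs = consumer.fetch(
                        new FetchRequest(topic, partition, offset, fetchSize));

                long nextOffset = offset;
                int count = 0;
                Iterator<MessageAndOffset> it = msgs.iterator();
                while (it.hasNext()) {
                    MessageAndOffset mao = it.next();
                    // ... hand mao.message() to the application here ...
                    nextOffset = mao.offset();
                    count++;
                }
                if (count > 0) {
                    return nextOffset;          // progress made: offset for the next fetch
                }
                if (fetchSize >= maxFetchSize) {
                    return nextOffset;          // still nothing: suspect the offset, not the size
                }
                fetchSize = Math.min(fetchSize * 2, maxFetchSize);   // batch larger than fetch size: enlarge and retry
            }
        }
    }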
>>>>>
>>>>> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com> wrote:
>>>>>> Hi
>>>>>>
>>>>>> I'd like to know how SimpleConsumer.fetch and ByteBufferMessageSet.iterator
>>>>>> work when dealing with GZIP'd messages.
>>>>>>
>>>>>> Is it right to say that SimpleConsumer.fetch will fetch up to the amount
>>>>>> specified in the fetch size regardless of whether a complete GZIP batch has
>>>>>> been retrieved, and that ByteBufferMessageSet.iterator is responsible for
>>>>>> converting the fetched MessageSet into a set of complete MessageAndOffset
>>>>>> objects, discarding any batch left incomplete by the fetch size limit?
>>>>>>
>>>>>> I'm asking because I regularly get an InvalidMessageSizeException after a
>>>>>> consumer fetch. I can see this topic has been discussed in other mail
>>>>>> threads, and I've also used DumpLogSegments to check the Kafka file
>>>>>> reported in the exception; the tool reported nothing negative in terms of
>>>>>> the offset.
>>>>>>
>>>>>> I'm using GZIP compression to send messages to Kafka, with a limit of 3 MB
>>>>>> of raw data per batch, so each batch is smaller still once GZIP'd and sent
>>>>>> to Kafka.
>>>>>>
>>>>>> On the consumer, the fetch size is set to 10 MB to allow plenty of leeway
>>>>>> to fetch at least one full batch of GZIP'd content.
>>>>>>
>>>>>> What else can be wrong in this setup?
>>>>>>
>>>>>> --
>>>>>> Rgds
>>>>>> Ray
>>>>>
>>>>
>>>> --
>>>> Rgds
>>>> Ray
>>>
>>> --
>>> Rgds
>>> Ray
>
> --
> Rgds
> Ray

--
Rgds
Ray
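For reference, a rough sketch of the producer side described in that last mail, with GZIP enabled. This is reconstructed from memory of the 0.7 javaapi producer, so treat the class and property names as assumptions; the ZooKeeper address and serializer are illustrative, the topic name is taken from the broker log above, and in real code the Producer would be created once and reused rather than per batch.

    import java.util.List;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.javaapi.producer.ProducerData;
    import kafka.producer.ProducerConfig;

    // Send one GZIP'd batch: the caller caps the batch at ~3 MB of raw data, and
    // the whole list goes out as a single compressed message set, which the
    // consumer later sees as one large message. The consumer's fetch size must
    // therefore exceed the compressed size of the largest such batch.
    public class GzipProducerSketch {
        static void sendBatch(List<String> batch) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");                        // illustrative ZooKeeper address
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("compression.codec", "1");                              // 1 = GZIP in the 0.7 producer config

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            try {
                producer.send(new ProducerData<String, String>("probe-data26", batch));
            } finally {
                producer.close();
            }
        }
    }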