I've got this now, so I can just use the offset() value directly for
my next fetch.

Now I just need to work out a better way to handle failures and
minimize duplicated messages within a compressed message.
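
A possible approach for the duplicates, sketched below against the 0.7
javaapi used elsewhere in this thread: if a run dies part-way through a
compressed batch, re-fetch from the same offset and skip however many
messages of that batch were already handled. The names lastFetchOffset,
processedInBatch, loadCheckpoint*() and process() are placeholders for
illustration, not real API.

    // Sketch only -- every message inside the re-fetched compressed batch
    // comes back again, so skip the ones the failed run already processed.
    long lastFetchOffset  = loadCheckpointOffset();  // hypothetical: offset the batch was fetched at
    int  processedInBatch = loadCheckpointCount();   // hypothetical: messages already handled

    ByteBufferMessageSet msgs = consumer.fetch(
            new FetchRequest(kafkaConf.topic, hostPartition, lastFetchOffset,
                    kafkaConf.fetchSizeBytes));

    int seen = 0;
    for (java.util.Iterator<MessageAndOffset> it = msgs.iterator(); it.hasNext();) {
        MessageAndOffset mao = it.next();
        seen++;
        if (seen <= processedInBatch) continue;  // duplicate from the failed run
        process(mao.message());                  // hypothetical handler
        processedInBatch = seen;                 // checkpoint this counter durably
    }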

thanks
Ray

On 9/27/12, Raymond Ng <raymond...@gmail.com> wrote:
> Hi Jun
>
> so to work out the offset for the next compressed message within the
> ByteBufferMessageSet, which method can give me this info?  I've come
> across the shallow iterator, but apparently it is not exposed externally.
>
> I need the correct offset for the next fetch; I'm currently using 0.7.1.
>
> thanks
> Ray
>
> On 9/27/12, Jun Rao <jun...@gmail.com> wrote:
>> MessageAndOffset.offset() gives you the offset of the next message, and
>> you can use it directly. For compressed messages, the offset only moves
>> once we cross the compressed batch.
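
Put together as a minimal sketch against the 0.7 javaapi (topic, partition,
offsetStart, fetchSizeBytes and handle() below are placeholder names, not
taken from the original mails): the deep iterator hands back the decompressed
messages one by one, and the offset() of the last message returned is what
goes into the next FetchRequest.

    long nextOffset = offsetStart;

    ByteBufferMessageSet msgs = consumer.fetch(
            new FetchRequest(topic, partition, nextOffset, fetchSizeBytes));

    for (java.util.Iterator<MessageAndOffset> it = msgs.iterator(); it.hasNext();) {
        MessageAndOffset mao = it.next();
        handle(mao.message());      // hypothetical handler
        // for compressed messages this only moves forward once the whole
        // batch has been crossed, per the note above
        nextOffset = mao.offset();
    }
    // nextOffset can now be used as-is for the next fetch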
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Sep 26, 2012 at 11:58 AM, Raymond Ng <raymond...@gmail.com>
>> wrote:
>>
>>> Hi Jun
>>>
>>> just to give you a bit more info on how I work out the offset for the
>>> next fetch; please advise if this is correct:
>>>
>>>     ByteBufferMessageSet msgs = consumer.fetch(
>>>             new FetchRequest(
>>>                     kafkaConf.topic,
>>>                     hostPartition,
>>>                     offsetStart,
>>>                     kafkaConf.fetchSizeBytes));
>>>
>>>     java.util.Iterator<MessageAndOffset> msgSet = msgs.iterator();
>>>     Long finalOffset = 0L;
>>>     int cnt = 0;
>>>
>>>     while (msgSet.hasNext()) {
>>>         cnt++;
>>>         MessageAndOffset mao = msgSet.next();
>>>         Long off = mao.offset();
>>>
>>>         if (off.compareTo(finalOffset) > 0) finalOffset = off;
>>>
>>>         if (maoMap == null) maoMap = new HashMap<Integer, MessageAndOffset>();
>>>         maoMap.put(cnt, mao);
>>>
>>>         if (msgAwaiting == null) msgAwaiting = new LinkedList<Integer>();
>>>         msgAwaiting.add(cnt);
>>>     }
>>>     finalOffset += msgs.getInitialOffset();
>>>     offsetStart = finalOffset;  // for next fetch
>>> Ray
>>>
>>> On Wed, Sep 26, 2012 at 6:53 PM, Raymond Ng <raymond...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > thanks for your reply; however, I'm still getting the InvalidMessageSize
>>> > exception. As I mentioned before, my consumer fetch size is set to 10 MB,
>>> > and the producer batches messages up to 3 MB and GZIPs them before
>>> > sending to Kafka. DumpLogSegments reported no errors (grepping for
>>> > "isvalid: false" returned no results).
>>> >
>>> > I'm also getting the following error from the broker; I'd appreciate your
>>> > advice on this:
>>> >
>>> > [2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on
>>> > probe-data26:0 (kafka.server.KafkaRequestHandlers)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> >     at kafka.log.Log.append(Log.scala:205)
>>> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>> >     at kafka.network.Processor.handle(SocketServer.scala:296)
>>> >     at kafka.network.Processor.read(SocketServer.scala:319)
>>> >     at kafka.network.Processor.run(SocketServer.scala:214)
>>> >     at java.lang.Thread.run(Thread.java:636)
>>> > [2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35
>>> > because of error (kafka.network.Processor)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> >     at kafka.log.Log.append(Log.scala:205)
>>> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>>> >     at kafka.network.Processor.handle(SocketServer.scala:296)
>>> >     at kafka.network.Processor.read(SocketServer.scala:319)
>>> >     at kafka.network.Processor.run(SocketServer.scala:214)
>>> >     at java.lang.Thread.run(Thread.java:636)
>>> > [2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on
>>> > probe-data26:0 (kafka.server.KafkaRequestHandlers)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao <jun...@gmail.com> wrote:
>>> >
>>> >> Raymond,
>>> >>
>>> >> Your understanding is correct. There are two common reasons for
>>> >> getting InvalidMessageSizeException: (1) the fetch size is smaller than
>>> >> the largest message (compressed messages in the same batch are treated
>>> >> as one message), and (2) the offset in the fetch request is wrong. Both
>>> >> can be verified using the DumpLogSegments tool.
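
As a purely illustrative check of cause (1), using only the sizes mentioned in
this thread (nothing below is Kafka API, just arithmetic): the whole GZIP'ed
batch is stored as a single message, so the consumer fetch size has to exceed
the largest compressed batch plus a small header overhead.

    int maxRawBatchBytes   = 3 * 1024 * 1024;    // producer-side cap on raw data per batch
    int consumerFetchBytes = 10 * 1024 * 1024;   // consumer fetch size

    // the GZIP'ed batch should not exceed the raw 3 MB here, so:
    if (consumerFetchBytes > maxRawBatchBytes) {
        System.out.println("fetch size covers a full batch; suspect the offset, i.e. cause (2)");
    } else {
        System.out.println("fetch size may be smaller than a batch, i.e. cause (1)");
    }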
>>> >>
>>> >> Thanks,
>>> >>
>>> >> Jun
>>> >>
>>> >> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com>
>>> wrote:
>>> >>
>>> >> > Hi
>>> >> >
>>> >> > I'd like to know how SimpleConsumer.fetch and
>>> >> > ByteBufferMessageSet.iterator work in terms of dealing with GZIP'd
>>> >> > messages.
>>> >> >
>>> >> > Is it right to say that SimpleConsumer.fetch will fetch up to the
>>> >> > amount specified in the fetch size regardless of whether a complete
>>> >> > GZIP batch has been retrieved, and that ByteBufferMessageSet.iterator
>>> >> > is responsible for converting the fetched MessageSet into a set of
>>> >> > complete MessageAndOffset objects and discarding any incomplete batch
>>> >> > due to the fetch size limit?
>>> >> >
>>> >> > I'm asking because I'm regularly getting InvalidMessageSizeException
>>> >> > after a consumer fetch. I can see this topic has been discussed in
>>> >> > other mail threads, and I've also used DumpLogSegments to check the
>>> >> > Kafka file reported in the exception; the tool reported nothing
>>> >> > negative in terms of the offset.
>>> >> >
>>> >> > I'm using GZIP compression to send messages to Kafka, and a limit of
>>> >> > 3 MB of raw data per batch has been imposed; the batch will be smaller
>>> >> > once GZIP'ed on its way to Kafka.
>>> >> >
>>> >> > On the consumer side, the fetch size is set to 10 MB to allow plenty
>>> >> > of leeway to fetch at least one full batch of GZIP'd content.
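
For reference, that setup maps to roughly the following settings; the property
names below are from memory of the 0.7 configuration docs and should be
double-checked against the exact version in use rather than taken as
authoritative.

    // Producer side: enable GZIP compression (0 = none, 1 = gzip in 0.7).
    java.util.Properties producerProps = new java.util.Properties();
    producerProps.put("compression.codec", "1");

    // Consumer side: fetch size large enough for at least one full compressed batch.
    java.util.Properties consumerProps = new java.util.Properties();
    consumerProps.put("fetch.size", String.valueOf(10 * 1024 * 1024));

    // The broker's maximum message size may also need checking if a compressed
    // batch ever exceeds the broker default.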
>>> >> >
>>> >> > What else could be wrong in this setup?
>>> >> >
>>> >> > --
>>> >> > Rgds
>>> >> > Ray
>>> >> >
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > Rgds
>>> > Ray
>>> >
>>>
>>>
>>>
>>> --
>>> Rgds
>>> Ray
>>>
>>
>
>
> --
> Rgds
> Ray
>


-- 
Rgds
Ray
