Hi Jun,

Just to give you a bit more info on how I work out the offset for
the next fetch. Please advise if this is correct:

                ByteBufferMessageSet msgs = consumer.fetch(
                        new FetchRequest(
                                kafkaConf.topic,
                                hostPartition,
                                offsetStart,
                                kafkaConf.fetchSizeBytes));

                java.util.Iterator<MessageAndOffset> msgSet = msgs.iterator();
                Long finalOffset = 0L;
                int cnt = 0;

                while (msgSet.hasNext()) {
                    cnt++;
                    MessageAndOffset mao = msgSet.next();
                    Long off = mao.offset();

                    if (off.compareTo(finalOffset) > 0) finalOffset = off;

                    if (maoMap == null) maoMap = new HashMap<Integer, MessageAndOffset>();

                    maoMap.put(cnt, mao);
                    if (msgAwaiting == null) msgAwaiting = new LinkedList<Integer>();
                    msgAwaiting.add(cnt);
                }
                finalOffset += msgs.getInitialOffset();
                offsetStart = finalOffset;  // for next fetch
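
A point worth checking in the loop above: if mao.offset() already returns an
absolute position in the log (this is an assumption about the Kafka version in
use, not something confirmed in this thread), then adding
msgs.getInitialOffset() at the end would count the starting offset twice and
send the next fetch to a wrong offset. Below is a minimal sketch of the
bookkeeping under that assumption. NextOffset and nextFetchOffset are names
made up for illustration, and plain long values stand in for the results of
MessageAndOffset.offset().

```java
import java.util.List;

/** Hypothetical helper, not part of Kafka: picks the offset for the next fetch. */
final class NextOffset {
    private NextOffset() {}

    /**
     * @param offsetStart    offset used for the fetch that just completed
     * @param messageOffsets values seen from MessageAndOffset.offset() while
     *                       iterating, ASSUMED to be absolute log offsets
     * @return offset to use for the next fetch
     */
    static long nextFetchOffset(long offsetStart, List<Long> messageOffsets) {
        long next = offsetStart;        // empty fetch: retry from the same place
        for (long off : messageOffsets) {
            if (off > next) {
                next = off;             // keep the furthest position reached
            }
        }
        return next;                    // no initial offset is added on top
    }
}
```

With offsetStart = 1000 and iterator offsets 1100, 1250 and 1400, this gives
1400 for the next fetch; an empty fetch leaves the offset at 1000. If the
offsets turn out to be relative to the fetch instead, the addition of the
initial offset in the original loop would be the right thing to do.
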
Ray

On Wed, Sep 26, 2012 at 6:53 PM, Raymond Ng <raymond...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for your reply. However, I'm still getting the InvalidMessageSize
> exception. As I mentioned before, my consumer fetch size is set to 10MB and
> the producer batches messages up to 3MB and GZIPs them before sending to
> Kafka. DumpLogSegments reported no errors (no results when grepping for
> "isvalid: false").
>
> I'm also getting the following error from the broker server and would
> appreciate your advice on this:
>
> [2012-09-26 17:31:42,921] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:636)
> [2012-09-26 17:31:43,001] ERROR Closing socket for /192.168.210.35 because of error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 320200 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:636)
> [2012-09-26 17:31:43,371] ERROR Error processing ProduceRequest on probe-data26:0 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression codec: GZIPCompressionCodec size: 102923 curr offset: 0 init offset: 0
>
>
>
>
>
> On Tue, Sep 25, 2012 at 6:20 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Raymond,
>>
>> Your understanding is correct. There are 2 common reasons for
>> getting InvalidMessageSizeException: (1) the fetch size is smaller than
>> the largest message (compressed messages in the same batch are treated
>> as 1 message), (2) the offset in the fetch request is wrong. Both can be
>> verified using the DumpLogSegments tool.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 25, 2012 at 7:25 AM, Raymond Ng <raymond...@gmail.com> wrote:
>>
>> > Hi
>> >
>> > I'd like to know how SimpleConsumer.fetch and
>> > ByteBufferMessageSet.iterator work when dealing with GZIP'd
>> > messages.
>> >
>> > Is it right to say that SimpleConsumer.fetch will fetch up to the amount
>> > specified in the fetch size regardless of whether a complete GZIP batch
>> > has been retrieved, and that ByteBufferMessageSet.iterator is responsible
>> > for converting the fetched MessageSet into a set of complete
>> > MessageAndOffset objects, discarding any batch left incomplete by the
>> > fetch size limit?
>> >
>> > I'm asking because I'm regularly getting InvalidMessageSizeException
>> > after a consumer fetch. I can see this topic has already been discussed
>> > in other mail threads, and I've used DumpLogSegments to check the Kafka
>> > file reported in the exception; the tool reported nothing wrong with the
>> > offsets.
>> >
>> > I'm using GZIP compression to send msgs to Kafka. Each batch is limited
>> > to 3MB of raw data, so it will be smaller once GZIP'ed and sent to
>> > Kafka.
>> >
>> > On the consumer, the fetch size is set to 10 MB to allow plenty of
>> > leeway to fetch at least 1 full batch of GZIP'd content.
>> >
>> > What else could be wrong in this setup?
>> >
>> > --
>> > Rgds
>> > Ray
>> >
>>
>
>
>
> --
> Rgds
> Ray
>



-- 
Rgds
Ray
