Raymond,

The consumer iterator will decompress the messages and then return to
the client. So you don't need to explicitly decompress data.

Thanks,
Neha

On Tue, Sep 11, 2012 at 8:30 AM, Raymond Ng <raymond...@gmail.com> wrote:
> Hi all
>
> I'm using kafka 0.7.0 and trying to process a message thats been GZIP
> compressed
>
> my test program which writes to a kafka queue has compression.codec = "1",
> and i can see the message has been written to the kafka server in a
> compressed format
>
> but I'm having trouble decompressing the message on the other end, my code
> as follows
>
> private void getMessage() {
>    SimpleConsumer consumer =
> kafkaPartitionsConns.getConsumer(currentPartition);
>    int hostPartition =
> kafkaPartitionsConns.getHostPartition(currentPartition);
>
>    ByteBufferMessageSet msgs = consumer.fetch(
>      new FetchRequest(
>        kafkaConf.topic,
>        hostPartition,
>        msgOffset,
>        kafkaConf.fetchSizeBytes));
>    java.util.Iterator <MessageAndOffset> compressedMsgSet = msgs.iterator();
>
>    while(compressedMsgSet.hasNext()) {
>     MessageAndOffset mao = compressedMsgSet.next();
>     Message msg = mao.message();
>     logger.info("msg : "+msg.toString());
>     Iterator decompressedMsgSet =
> CompressionUtils.decompress(mao.message()).iterator();
>     while (decompressedMsgSet.hasNext()) {
>      msgAwaiting.add(decompressedMsgSet.next());
>     }
>    }
> }
>
> 2012-09-11 16:16:24,581  INFO [Thread-3] KafkaTest.java - msg :
> message(magic = 1, attributes = 0, crc = 3908262406, payload =
> java.nio.HeapByteBuffer[pos=0 lim=7 cap=7])
> 2012-09-11 16:16:24,582 ERROR [Thread-3] PollableSourceRunner.java -
> Unhandled exception, logging and sleeping for 5000ms
> kafka.common.UnknownCodecException: Unknown Codec: NoCompressionCodec
>  at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:142)
>  at kafka.message.CompressionUtils.decompress(CompressionUtils.scala)
>
> I need to parse the message back to String for further processing, but
> can't work out why the original message didn't carry any compressionCodec
> any advice will be appreciated
>
> --
> Rgds
> Ray

Reply via email to