Answered my own question:

The gzipped payload is itself an encoded MessageSet, not just the gzip of a
single message's payload. In other words: encode the messages as a normal
uncompressed MessageSet first, gzip that whole blob, and wrap the result in a
single outer message with the compression byte set.
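
A minimal sketch of what that means in Python (function names are my own; the
header layout — 4-byte length, magic=1, compression byte, CRC32 of the payload,
then the payload — follows the byte breakdown in the quoted message below):

```python
import gzip
import io
import struct
import zlib

def encode_message(payload, compression=0):
    # One Kafka 0.7 message: magic=1, compression byte, CRC32 of the
    # payload, then the payload, all prefixed with a 4-byte length.
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    body = struct.pack(">BBI", 1, compression, crc) + payload
    return struct.pack(">I", len(body)) + body

def encode_gzip_message_set(payloads):
    # Step 1: encode each payload as an ordinary uncompressed message and
    # concatenate them -- this inner blob is a complete MessageSet.
    inner = b"".join(encode_message(p, compression=0) for p in payloads)
    # Step 2: gzip the whole inner MessageSet and wrap the result in one
    # outer message whose compression byte is 1.
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as f:
        f.write(inner)
    return encode_message(buf.getvalue(), compression=1)
```

The broker then decompresses the outer payload and iterates it as a MessageSet,
which is why gzipping the bare payload bytes (as in the quoted attempt) trips
up its iterator.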

-David

On Sep 26, 2012, at 7:43 PM, David Arthur wrote:

> I'm working on adding gzip support to the Python client, and I'm running into 
> some issues. I think I might not understand exactly how the compression is 
> supposed to be implemented.
> 
> My initial approach was to set the compression byte to 1 to indicate gzip, 
> and then simply gzip the payload. Here is an example request sent to Kafka 
> (with byte-by-byte breakdown). I am sending the payload "test" to "my-topic" 
> partition 0:
> 
> \x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00
> 
> \x00 \x00 \x00 6  
> '--------------'
>  request length = 54
> 
> \x00 \x00
> '-------'
>   type = 0
> 
> \x00 \x08 m y - t o p i c
> '-------' '-------------'
>   len = 8     topic
> 
> \x00 \x00 \x00 \x00
> '-----------------'
>   partition = 0
>   
> \x00 \x00 \x00 "
> '--------------'
>  messageset length = 34
> 
> \x00 \x00 \x00 \x1e
> '-----------------'
>   message length = 30
> 
> \x01 magic = 1
> \x01 compression = 1
> \xbb\x83\x82\xe0 checksum = -1149009184
> 
> \x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00 = 
> "test" gzipped, length=24
> 
> This all seems fine to me, but Kafka is throwing a strange error message:
> 
> [2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 
> (kafka.network.Processor)
> [2012-09-26 19:36:55,253] TRACE Handling produce request from 
> /127.0.0.1:64196 (kafka.request.logger)
> [2012-09-26 19:36:55,256] TRACE Producer request 
> ProducerRequest(my-topic,0,34) (kafka.request.logger)
> [2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = 
> true (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 
> (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,260] TRACE size of data = 30 
> (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 
> (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = 
> true (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 
> (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] TRACE size of data = 1952805748 
> (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 
> (kafka.server.KafkaRequestHandlers)
> kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 
> only received bytes: 0 at 0( possible causes (1) a single message larger than 
> the fetch size; (2) log corruption )
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>       at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>       at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>       at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>       at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>       at kafka.log.Log.append(Log.scala:205)
>       at 
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>       at 
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>       at kafka.network.Processor.handle(SocketServer.scala:296)
>       at kafka.network.Processor.read(SocketServer.scala:319)
>       at kafka.network.Processor.run(SocketServer.scala:214)
>       at java.lang.Thread.run(Thread.java:680)
> 
> The strangest part is that the "invalid message size" 1952805748 is the 
> decompressed message payload "test" represented as an int32. Any ideas?
> 
> -David
> 
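
Incidentally, the "invalid message size: 1952805748" in the log above is
consistent with this: the broker decompresses the payload, expects a
MessageSet, and reads the first four decompressed bytes — the raw "test" —
as the next message's length field. A quick check:

```python
import struct

# The four ASCII bytes of "test" interpreted as a big-endian int32
# give exactly the size the broker complained about.
size = struct.unpack(">i", b"test")[0]
print(size)  # 1952805748
```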
