Answered my own question: the payload of a compressed message is itself an encoded MessageSet (complete with per-message headers), not just the gzip of a single message's payload.
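For anyone else who hits this, here is a minimal sketch of the 0.7 framing as I understand it. `encode_message` is a hypothetical helper for illustration, not the client's actual API:

```python
import gzip
import struct
import zlib

def encode_message(payload: bytes, compression: int = 0) -> bytes:
    # Kafka 0.7 message: [length int32][magic=1][compression byte][crc32 of payload][payload]
    body = struct.pack(">BBI", 1, compression, zlib.crc32(payload) & 0xFFFFFFFF) + payload
    return struct.pack(">i", len(body)) + body

# Wrong (what I was doing): gzip just the raw payload bytes.
wrong = encode_message(gzip.compress(b"test"), compression=1)

# Right: encode the inner message as a complete one-message MessageSet
# first, then gzip that byte string and use it as the payload of an
# outer message with compression=1.
inner_set = encode_message(b"test", compression=0)
right = encode_message(gzip.compress(inner_set), compression=1)
```

The broker gunzips the outer payload and iterates over the result as a MessageSet, which is why the inner headers have to be there.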
-David

On Sep 26, 2012, at 7:43 PM, David Arthur wrote:

> I'm working on adding gzip support to the Python client, and I'm running into
> some issues. I think I might not understand exactly how the compression is
> supposed to be implemented.
>
> My initial approach was to set the compression byte to 1 to indicate gzip,
> and then simply gzip the payload. Here is an example request sent to Kafka
> (with a byte-by-byte breakdown). I am sending the payload "test" to "my-topic"
> partition 0:
>
> \x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00
>
> \x00 \x00 \x00 6
> '--------------'
> request length = 54
>
> \x00 \x00
> '-------'
> type = 0
>
> \x00 \x08  m y - t o p i c
> '-------' '-------------'
> len = 8    topic
>
> \x00 \x00 \x00 \x00
> '-----------------'
> partition = 0
>
> \x00 \x00 \x00 "
> '--------------'
> messageset length = 34
>
> \x00 \x00 \x00 \x1e
> '-----------------'
> message length = 30
>
> \x01              magic = 1
> \x01              compression = 1
> \xbb\x83\x82\xe0  checksum = -1149009184
>
> \x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00 =
> "test" gzipped, length = 24
>
> This all seems fine to me, but Kafka is throwing a strange error message:
>
> [2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 (kafka.network.Processor)
> [2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 (kafka.request.logger)
> [2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) (kafka.request.logger)
> [2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,260] TRACE size of data = 30 (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,276] TRACE size of data = 1952805748 (kafka.message.ByteBufferMessageSet)
> [2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 (kafka.server.KafkaRequestHandlers)
> kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only received bytes: 0 at 0 (possible causes (1) a single message larger than the fetch size; (2) log corruption)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:680)
>
> The strangest part is that the "invalid message size" 1952805748 is the
> decompressed message payload "test" represented as an int32. Any ideas?
>
> -David
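The 1952805748 in the error falls straight out of the bytes: after gunzipping the (incorrectly bare) payload, the broker parses the result as a MessageSet, so the first four bytes of "test" get read as a big-endian int32 message length. A quick check:

```python
import struct

# b"test" is \x74\x65\x73\x74; interpreted as a big-endian int32
# "message length" it becomes 0x74657374 = 1952805748.
(size,) = struct.unpack(">i", b"test")
print(size)  # 1952805748
```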