I am writing an Erlang driver for Kafka, using the spec from https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka. I just learnt something that I thought should be on the ML for anyone developing a different driver.
My specific issue currently has to do with the PRODUCE request. It appears that the request header is getting parsed correctly.

BUT the specific message does not seem to be parsed with the topic of "test" and the message of "hi", partition 0, magic 0, compression 0.

[2012-09-26 22:55:30,131] INFO Created log for 'test'-0 (kafka.log.LogManager)
[2012-09-26 22:55:30,134] INFO Begin registering broker topic /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
[2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)
[2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)
[2012-09-26 22:55:30,233] INFO End registering broker topic /brokers/topics/test/0 (kafka.server.KafkaZooKeeper)
^C[2012-09-26 22:56:18,290] INFO Shutting down Kafka server (kafka.server.KafkaServer)
[2012-09-26 22:56:18,292] INFO shutdown scheduler kafka-logcleaner- (kafka.utils.KafkaScheduler)
[2012-09-26 22:56:18,304] INFO shutdown scheduler kafka-logflusher- (kafka.utils.KafkaScheduler)
[2012-09-26 22:56:18,364] INFO Closing zookeeper client... (kafka.server.KafkaZooKeeper)
[2012-09-26 22:56:18,364] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2012-09-26 22:56:18,364] INFO zkActor stopped (kafka.log.LogManager)
[2012-09-26 22:56:18,375] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2012-09-26 22:56:18,376] INFO Session: 0x13a0647aa7d0000 closed (org.apache.zookeeper.ZooKeeper)
[2012-09-26 22:56:18,376] INFO Kafka server shut down completed (kafka.server.KafkaServer)

BUT the specific message *does GET* parsed with the topic of "test" and the message of "hi", partition 0, *magic 1*, compression 0. *It's a case of RTFM.*

I produce the following byte map.

    |------ REQUEST HEADER ------|-- MESSAGES_LENGTH --|------ MESSAGE ------|
               16 bytes                  4 bytes               12 bytes

REQUEST_HEADER : 16 bytes

    |-- REQUEST_LENGTH -|-RQ-TYPE-|TP_LENGTH|------ TOPIC ------|---- PARTITION ----|
        1    2    3    4    5    6    7    8    9   10   11   12   13   14   15   16
        0    0    0   28    0    0    0    4  116  101  115  116    0    0    0    0
                                                t    e    s    t

MESSAGES_LENGTH : 4 bytes

        1    2    3    4
        0    0    0   12

MESSAGE : 12 bytes

    |----- LENGTH ------|-M--|-C--|------ CRC32 ------|-PAYLOAD-|
        1    2    3    4    5    6    7    8    9   10   11   12
        0    0    0    8    1    0  216  147   42  172  104  105
                                                          h    i

Thanks,
Milind

A PRODUCE request is

    REQUEST_HEADER MESSAGES_LENGTH MESSAGES

The REQUEST_HEADER

    Request_Length:32/integer   % Length of the entire request, excluding this field, in 4 bytes
    0:16/integer                % Request type (0 for produce) in 2 bytes
    TopicSize:16/integer        % Topic size in 2 bytes
    Topic/binary                % Topic in variable bytes
    Partition:32/integer        % Partition in 4 bytes

The MESSAGES_LENGTH

    Messages_Length:32/integer  % Length in bytes of the MESSAGES section

The MESSAGES

    Length:32/integer           % Length in bytes of the entire message, excluding this field, in 4 bytes
    Magic:8/integer             % Magic number (0|1) in 1 byte
    Compression:8/integer       % Compression (0|1|2) in 1 byte
    Checksum:32/integer         % CRC32 checksum in 4 bytes
    Payload                     % Message payload in variable bytes
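
For anyone who wants to check their own encoder against the byte map, here is a minimal Erlang sketch of the layout described above. The module name kafka_produce_sketch and the function names are made up for illustration and are not part of any existing driver. It assumes a single uncompressed message with magic 1 and a CRC32 taken over the payload only, which is what the byte map for "hi" suggests. As far as I understand the wire format, a magic 0 message carries no compression byte at all, which would explain why the magic 0 attempt was rejected.

```erlang
-module(kafka_produce_sketch).
-export([message/1, produce_request/3]).

%% Encode one message: Length, Magic, Compression, Checksum, Payload.
%% Assumption: magic 1, no compression, CRC32 computed over the payload only.
message(Payload) when is_binary(Payload) ->
    Magic = 1,
    Compression = 0,
    Checksum = erlang:crc32(Payload),
    Body = <<Magic:8, Compression:8, Checksum:32, Payload/binary>>,
    %% Length covers everything in the message except the length field itself.
    <<(byte_size(Body)):32, Body/binary>>.

%% Encode REQUEST_HEADER, MESSAGES_LENGTH and MESSAGES for a single message.
produce_request(Topic, Partition, Payload)
  when is_binary(Topic), is_integer(Partition), is_binary(Payload) ->
    Messages = message(Payload),
    Rest = <<0:16,                               % request type: produce
             (byte_size(Topic)):16, Topic/binary,
             Partition:32,
             (byte_size(Messages)):32, Messages/binary>>,
    %% Request_Length covers the entire request except this 4-byte field.
    <<(byte_size(Rest)):32, Rest/binary>>.
```

Calling kafka_produce_sketch:produce_request(<<"test">>, 0, <<"hi">>) should give back the same 32 bytes as the byte map: 16 bytes of header, 4 bytes of MESSAGES_LENGTH and the 12-byte message.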