I am writing an Erlang driver for Kafka, using the spec from
https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka.
I just learnt something that I thought should be on the ML for anyone
developing a different driver.

My specific issue has to do with the PRODUCE request. It appears
that the request header is getting parsed correctly, BUT the message
itself does not seem to be parsed when sent with the topic "test", the
message "hi", partition 0, magic 0, compression 0.


[2012-09-26 22:55:30,131] INFO Created log for 'test'-0 (kafka.log.LogManager)
[2012-09-26 22:55:30,134] INFO Begin registering broker topic /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
[2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)
[2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)
[2012-09-26 22:55:30,233] INFO End registering broker topic /brokers/topics/test/0 (kafka.server.KafkaZooKeeper)
^C[2012-09-26 22:56:18,290] INFO Shutting down Kafka server (kafka.server.KafkaServer)
[2012-09-26 22:56:18,292] INFO shutdown scheduler kafka-logcleaner- (kafka.utils.KafkaScheduler)
[2012-09-26 22:56:18,304] INFO shutdown scheduler kafka-logflusher- (kafka.utils.KafkaScheduler)
[2012-09-26 22:56:18,364] INFO Closing zookeeper client... (kafka.server.KafkaZooKeeper)
[2012-09-26 22:56:18,364] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2012-09-26 22:56:18,364] INFO zkActor stopped (kafka.log.LogManager)
[2012-09-26 22:56:18,375] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2012-09-26 22:56:18,376] INFO Session: 0x13a0647aa7d0000 closed (org.apache.zookeeper.ZooKeeper)
[2012-09-26 22:56:18,376] INFO Kafka server shut down completed (kafka.server.KafkaServer)


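BUT the same message *does GET* parsed with the topic "test", the
message "hi", partition 0, *magic 1*, compression 0. *It's a case of
RTFM*: per the spec, magic 0 means the message carries no compression
byte, and magic 1 means the compression byte is present. Since I always
send the compression byte, the magic number has to be 1.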
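To illustrate why the magic number matters, here is a minimal sketch. It assumes the message layout from the spec (magic 0: no compression byte; magic 1: one compression byte before the CRC32). The module and function names are made up for this example, and this is a simplified reading of the wire format, not the broker's actual parsing code.

```erlang
-module(kafka_magic_sketch).
-export([decode_message/1, demo/0]).

%% Decode one message body, i.e. the bytes that follow the 4-byte LENGTH field.
%% Magic 0: the CRC32 comes right after the magic byte.
%% Magic 1: a compression byte sits between the magic byte and the CRC32.
decode_message(<<0:8, Crc:32/integer, Payload/binary>>) ->
    check(0, none, Crc, Payload);
decode_message(<<1:8, Compression:8, Crc:32/integer, Payload/binary>>) ->
    check(1, Compression, Crc, Payload).

%% Accept the message only if the stored checksum matches the payload.
check(Magic, Compression, Crc, Payload) ->
    case erlang:crc32(Payload) of
        Crc -> {ok, Magic, Compression, Payload};
        _   -> {error, invalid_message}
    end.

%% Same bytes twice; only the magic byte differs.
demo() ->
    Crc  = erlang:crc32(<<"hi">>),
    Bad  = <<0:8, 0:8, Crc:32/integer, "hi">>,   % magic 0 plus a compression byte
    Good = <<1:8, 0:8, Crc:32/integer, "hi">>,   % magic 1 plus a compression byte
    {decode_message(Bad), decode_message(Good)}.
```

With magic 0, the decoder takes the compression byte as the first byte of the CRC32, so the checksum and payload are both shifted by one byte and the check should fail. `kafka_magic_sketch:demo()` should return `{{error, invalid_message}, {ok, 1, 0, <<"hi">>}}`.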




 I produce the following byte map.

|----- REQUEST_HEADER -----|----- MESSAGES_LENGTH -----|----- MESSAGE -----|
          16 bytes                    4 bytes                 12 bytes


REQUEST_HEADER : 16 bytes

|--REQUEST_LENGTH---|-RQ-TYPE-|TP_LENGTH|-------TOPIC-------|-----PARTITION-----|
    1    2    3    4    5    6    7    8    9   10   11   12   13   14   15   16
    0    0    0   28    0    0    0    4  116  101  115  116    0    0    0    0
                                            t    e    s    t


MESSAGES_LENGTH : 4 bytes

    1    2    3    4
    0    0    0   12


MESSAGE : 12 bytes

|------LENGTH-------|-M--|-C--|-------CRC32-------|-PAYLOAD-|
    1    2    3    4    5    6    7    8    9   10   11   12
    0    0    0    8    1    0  216  147   42  172  104  105
                                                      h    i
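As a cross-check of the byte map above, here is a small sketch that decodes those 32 bytes back into fields using Erlang binary pattern matching. The module and function names are hypothetical, and the decoder only handles magic 1 messages, which is an assumption made to keep it short.

```erlang
-module(kafka_bytemap_sketch).
-export([decode_produce/1, example/0]).

%% Split a full PRODUCE request into topic, partition and messages.
%% Each length field must agree with the number of bytes that follow it,
%% otherwise the pattern match fails.
decode_produce(<<ReqLen:32/integer, Request:ReqLen/binary>>) ->
    <<0:16/integer, TopicSize:16/integer, Topic:TopicSize/binary,
      Partition:32/integer,
      MsgsLen:32/integer, Messages:MsgsLen/binary>> = Request,
    {Topic, Partition, decode_messages(Messages)}.

%% Walk the MESSAGES section one length-prefixed message at a time.
decode_messages(<<>>) ->
    [];
decode_messages(<<Len:32/integer, Body:Len/binary, Rest/binary>>) ->
    %% Assumes magic 1, so a compression byte precedes the CRC32.
    <<1:8, Compression:8, Crc:32/integer, Payload/binary>> = Body,
    [{Compression, Crc, Payload} | decode_messages(Rest)].

%% The exact bytes from the byte map.
example() ->
    Bytes = <<0,0,0,28,  0,0,  0,4,  116,101,115,116,  0,0,0,0,
              0,0,0,12,
              0,0,0,8,  1,  0,  216,147,42,172,  104,105>>,
    decode_produce(Bytes).
```

`kafka_bytemap_sketch:example()` returns `{<<"test">>, 0, [{0, 3633523372, <<"hi">>}]}`, where 3633523372 is the four CRC32 bytes 216, 147, 42, 172 read as one unsigned integer. The three length fields line up as 28 = 32 - 4, 12 = one whole message, and 8 = 12 - 4.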



Thanks
Milind


A PRODUCE request is

           REQUEST_HEADER
           MESSAGES_LENGTH
           MESSAGES


The REQUEST_HEADER

            Request_Length:32/integer    % length of entire request except this field, in 4 bytes
            0:16/integer                 % request type for produce is 0, in 2 bytes
            TopicSize:16/integer         % topic size, in 2 bytes
            Topic/binary                 % topic, in variable bytes
            Partition:32/integer         % partition, in 4 bytes

The MESSAGES_LENGTH

            Messages_Length:32/integer   % length in bytes of the MESSAGES section, in 4 bytes

The MESSAGES

            Length:32/integer            % length in bytes of entire message excluding this field, in 4 bytes
            Magic:8/integer              % magic number (0|1), in 1 byte
            Compression:8/integer        % compression (0|1|2), in 1 byte
            Checksum:32/integer          % CRC32 checksum, in 4 bytes
            Payload                      % message payload, in variable bytes
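The layout above maps almost directly onto Erlang bit syntax. The following is a minimal sketch of an encoder, assuming uncompressed messages with magic 1; the module and function names are made up for illustration and are not taken from any existing driver.

```erlang
-module(kafka_produce_sketch).
-export([encode_message/1, encode_produce/3]).

%% Encode one uncompressed message.
%% Magic is 1 because the compression byte is included.
encode_message(Payload) when is_binary(Payload) ->
    Magic = 1,
    Compression = 0,
    Checksum = erlang:crc32(Payload),
    Body = <<Magic:8/integer, Compression:8/integer,
             Checksum:32/integer, Payload/binary>>,
    %% Length covers everything in the message except the length field itself.
    <<(byte_size(Body)):32/integer, Body/binary>>.

%% Encode a full PRODUCE request for a topic, a partition and a list of payloads.
encode_produce(Topic, Partition, Payloads) when is_binary(Topic) ->
    Messages = iolist_to_binary([encode_message(P) || P <- Payloads]),
    Request = <<0:16/integer,                       % request type: produce
                (byte_size(Topic)):16/integer, Topic/binary,
                Partition:32/integer,
                (byte_size(Messages)):32/integer, Messages/binary>>,
    %% Request_Length covers the entire request except this field.
    <<(byte_size(Request)):32/integer, Request/binary>>.
```

Calling `kafka_produce_sketch:encode_produce(<<"test">>, 0, [<<"hi">>])` should yield a 32-byte binary that starts with `0,0,0,28` and ends with the payload bytes `104,105`.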
