Milind,

The spec that you listed seems correct. Perhaps you can send the same
message using the java producer. Then you can look at the on disk format of
the message and see how it differs from the one generated from your Erlang
producer.

Thanks,

Jun

On Wed, Sep 26, 2012 at 11:21 PM, Milind Parikh <milindpar...@gmail.com>wrote:

> I am writing an erlang driver for Kafka. I am using the spec from
>
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> .
> Just learnt something that I thought should be the ML for someone
> developing a different driver.
>
> My specific issue currently has to do with the PRODUCE request. It appears
> that the request header is getting parsed correctly. BUT the specific
> message does not seem to parsed with the topic of "test" and the message of
> "hi", partition 0, magic 0, compression 0.
>
>
> [2012-09-26 22:55:30,131] INFO Created log for 'test'-0
> (kafka.log.LogManager)
> [2012-09-26 22:55:30,134] INFO Begin registering broker topic
> /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
> [2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0
> (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at
>
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at
>
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:679)
> [2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of
> error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>     at kafka.log.Log.append(Log.scala:205)
>     at
>
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at
>
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:679)
> [2012-09-26 22:55:30,233] INFO End registering broker topic
> /brokers/topics/test/0 (kafka.server.KafkaZooKeeper)
> ^C[2012-09-26 22:56:18,290] INFO Shutting down Kafka server
> (kafka.server.KafkaServer)
> [2012-09-26 22:56:18,292] INFO shutdown scheduler kafka-logcleaner-
> (kafka.utils.KafkaScheduler)
> [2012-09-26 22:56:18,304] INFO shutdown scheduler kafka-logflusher-
> (kafka.utils.KafkaScheduler)
> [2012-09-26 22:56:18,364] INFO Closing zookeeper client...
> (kafka.server.KafkaZooKeeper)
> [2012-09-26 22:56:18,364] INFO Terminate ZkClient event thread.
> (org.I0Itec.zkclient.ZkEventThread)
> [2012-09-26 22:56:18,364] INFO zkActor stopped (kafka.log.LogManager)
> [2012-09-26 22:56:18,375] INFO EventThread shut down
> (org.apache.zookeeper.ClientCnxn)
> [2012-09-26 22:56:18,376] INFO Session: 0x13a0647aa7d0000 closed
> (org.apache.zookeeper.ZooKeeper)
> [2012-09-26 22:56:18,376] INFO Kafka server shut down completed
> (kafka.server.KafkaServer)
>
>
> BUT the specific message *does GET* parsed with the topic of "test" and the
> message of "hi", partition 0, *magic 1*, compression 0. *It's a case of
> RTFM. *
>
>
>
>
>  I produce the following byte map.
>
> |------- REQUEST HEADER ---------|-----MESSAGES_LENGTH ---|------MESSAGE
> ---|
>                  16 bytes                                       4
> bytes                      12 bytes
>
>
> REQUEST_HEADER : 16 bytes
>
>              | -- REQUEST_LENGTH-|---RQ-TYPE  --|-TP_LENGTH -|-----TOPIC
> ----------------------------------|---------PARTITION ------------|
>
>              1          2          3           4          5         6
>          7         8          9        10         11          12        13
>       14     15    16
>
>              0          0           0         28          0        0
>    0         4         116      101      115         116        0         0
>       0      0
>
> t          e         s             t
>
>
> MESSAGES_LENGTH : 4 bytes
>
>              1          2          3           4
>
>              0          0          0           12
> MESSAGE :  12 BYTES
>
>              |----- LENGTH --------------|           M        C
> |-----------CRC32-----------|-------PAYLOAD---- -|
>
>              1          2          3          4          5         6
>          7        8          9        10         11            12
>
>              0          0          0          8          1         0
> 216     147       42       172       104          105
>
> h              i
>
>
>
> Thanks
> Milind
>
>
> A PRODUCE request is
>
>            REQUEST_HEADER
>            MESSAGES_LENGTH
>            MESSAGES
>
>
> The REQUEST_HEADER
>
>             Request_Length:32/integer               % length of entire
> request except this field    in 4 bytes
>             0:16/integer                                     % Request type
> for produce is   0                in 2 bytes
>             TopicSize:16/integer                        % Topic Size
>                                        in 2 bytes
>             Topic/binary                                    %
> Topic                                                     in variable bytes
>             Partition:32/integer                          %
> Partition                                                 in 4 bytes
>
> The MESSAGES_LENGTH
>              Messages_Length:32/integer          % Length in bytes of the
> MESSAGES section
>
> The MESSAGES
>              Length:32/integer                           % Length in bytes
> of entire message excluding this field    in 4 bytes
>              Magic:8/integer                              % Magic Number
> (0|1)                                                     in 1 byte
>              Compression:8/integer                    % Compression
> (0|1|2)                                                    in 1 bytes
>              Checksum:32/integer                      % CRC32
> checksum                                                      in 4 bytes
>              Payload                                         % Message
> Payload                                                      in variable
> bytes
>

Reply via email to