Jun

I was just bashing myself for not having read the spec carefully;
particularly the magic byte.

The Java producer seems to use a different encoder for strings. I am, now,
able to correctly produce and consume messages (bytes in, bytes out) as per
the spec in erlang. (simple producer, simple consumer).

On disk, there seems to be a constant 10 byte gap between each message;
apart from the message itself (which ofcourse is variable length). This is
how I calculate offset. So far, it seems to work (next offset).
On Sep 28, 2012 8:33 AM, "Jun Rao" <jun...@gmail.com> wrote:

> Milind,
>
> The spec that you listed seems correct. Perhaps you can send the same
> message using the java producer. Then you can look at the on disk format of
> the message and see how it differs from the one generated from your Erlang
> producer.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 26, 2012 at 11:21 PM, Milind Parikh <milindpar...@gmail.com
> >wrote:
>
> > I am writing an erlang driver for Kafka. I am using the spec from
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > .
> > Just learnt something that I thought should be the ML for someone
> > developing a different driver.
> >
> > My specific issue currently has to do with the PRODUCE request. It
> appears
> > that the request header is getting parsed correctly. BUT the specific
> > message does not seem to parsed with the topic of "test" and the message
> of
> > "hi", partition 0, magic 0, compression 0.
> >
> >
> > [2012-09-26 22:55:30,131] INFO Created log for 'test'-0
> > (kafka.log.LogManager)
> > [2012-09-26 22:55:30,134] INFO Begin registering broker topic
> > /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0
> > (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of
> > error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,233] INFO End registering broker topic
> > /brokers/topics/test/0 (kafka.server.KafkaZooKeeper)
> > ^C[2012-09-26 22:56:18,290] INFO Shutting down Kafka server
> > (kafka.server.KafkaServer)
> > [2012-09-26 22:56:18,292] INFO shutdown scheduler kafka-logcleaner-
> > (kafka.utils.KafkaScheduler)
> > [2012-09-26 22:56:18,304] INFO shutdown scheduler kafka-logflusher-
> > (kafka.utils.KafkaScheduler)
> > [2012-09-26 22:56:18,364] INFO Closing zookeeper client...
> > (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:56:18,364] INFO Terminate ZkClient event thread.
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2012-09-26 22:56:18,364] INFO zkActor stopped (kafka.log.LogManager)
> > [2012-09-26 22:56:18,375] INFO EventThread shut down
> > (org.apache.zookeeper.ClientCnxn)
> > [2012-09-26 22:56:18,376] INFO Session: 0x13a0647aa7d0000 closed
> > (org.apache.zookeeper.ZooKeeper)
> > [2012-09-26 22:56:18,376] INFO Kafka server shut down completed
> > (kafka.server.KafkaServer)
> >
> >
> > BUT the specific message *does GET* parsed with the topic of "test" and
> the
> > message of "hi", partition 0, *magic 1*, compression 0. *It's a case of
> > RTFM. *
> >
> >
> >
> >
> >  I produce the following byte map.
> >
> > |------- REQUEST HEADER ---------|-----MESSAGES_LENGTH ---|------MESSAGE
> > ---|
> >                  16 bytes                                       4
> > bytes                      12 bytes
> >
> >
> > REQUEST_HEADER : 16 bytes
> >
> >              | -- REQUEST_LENGTH-|---RQ-TYPE  --|-TP_LENGTH -|-----TOPIC
> > ----------------------------------|---------PARTITION ------------|
> >
> >              1          2          3           4          5         6
> >          7         8          9        10         11          12
>  13
> >       14     15    16
> >
> >              0          0           0         28          0        0
> >    0         4         116      101      115         116        0
>   0
> >       0      0
> >
> > t          e         s             t
> >
> >
> > MESSAGES_LENGTH : 4 bytes
> >
> >              1          2          3           4
> >
> >              0          0          0           12
> > MESSAGE :  12 BYTES
> >
> >              |----- LENGTH --------------|           M        C
> > |-----------CRC32-----------|-------PAYLOAD---- -|
> >
> >              1          2          3          4          5         6
> >          7        8          9        10         11            12
> >
> >              0          0          0          8          1         0
> > 216     147       42       172       104          105
> >
> > h              i
> >
> >
> >
> > Thanks
> > Milind
> >
> >
> > A PRODUCE request is
> >
> >            REQUEST_HEADER
> >            MESSAGES_LENGTH
> >            MESSAGES
> >
> >
> > The REQUEST_HEADER
> >
> >             Request_Length:32/integer               % length of entire
> > request except this field    in 4 bytes
> >             0:16/integer                                     % Request
> type
> > for produce is   0                in 2 bytes
> >             TopicSize:16/integer                        % Topic Size
> >                                        in 2 bytes
> >             Topic/binary                                    %
> > Topic                                                     in variable
> bytes
> >             Partition:32/integer                          %
> > Partition                                                 in 4 bytes
> >
> > The MESSAGES_LENGTH
> >              Messages_Length:32/integer          % Length in bytes of the
> > MESSAGES section
> >
> > The MESSAGES
> >              Length:32/integer                           % Length in
> bytes
> > of entire message excluding this field    in 4 bytes
> >              Magic:8/integer                              % Magic Number
> > (0|1)                                                     in 1 byte
> >              Compression:8/integer                    % Compression
> > (0|1|2)                                                    in 1 bytes
> >              Checksum:32/integer                      % CRC32
> > checksum                                                      in 4 bytes
> >              Payload                                         % Message
> > Payload                                                      in variable
> > bytes
> >
>

Reply via email to