Jun, I was just bashing myself for not having read the spec carefully, particularly the magic byte.

The Java producer seems to use a different encoder for strings. I am now able to correctly produce and consume messages (bytes in, bytes out) as per the spec in Erlang (simple producer, simple consumer). On disk there seems to be a constant 10-byte gap between each message, apart from the message itself (which of course is variable length). This is how I calculate the next offset, and so far it seems to work.
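For anyone reading along, a minimal sketch of that offset arithmetic, under the assumption that the 10-byte gap is the 4-byte length prefix plus the 6 bytes of magic-1 header (magic, compression, CRC32) that precede each payload; next_offset/2 is just an illustrative name, not part of any driver:

    %% Illustrative sketch only: given the bytes starting at the current
    %% offset, the next offset is the current offset plus the 4-byte length
    %% prefix plus the message length it announces (for magic 1 that is
    %% 1 magic + 1 compression + 4 CRC32 + payload, hence the constant
    %% 10-byte gap before each variable-length payload).
    next_offset(Offset, <<MsgLen:32, _Rest/binary>>) ->
        Offset + 4 + MsgLen.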
On Sep 28, 2012 8:33 AM, "Jun Rao" <jun...@gmail.com> wrote:

> Milind,
>
> The spec that you listed seems correct. Perhaps you can send the same
> message using the Java producer. Then you can look at the on-disk format
> of the message and see how it differs from the one generated by your
> Erlang producer.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 26, 2012 at 11:21 PM, Milind Parikh <milindpar...@gmail.com> wrote:
>
> > I am writing an Erlang driver for Kafka. I am using the spec from
> > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka.
> > Just learnt something that I thought should be on the ML for someone
> > developing a different driver.
> >
> > My specific issue currently has to do with the PRODUCE request. It appears
> > that the request header is getting parsed correctly, BUT the specific
> > message does not seem to be parsed with the topic of "test" and the
> > message of "hi", partition 0, magic 0, compression 0.
> >
> > [2012-09-26 22:55:30,131] INFO Created log for 'test'-0 (kafka.log.LogManager)
> > [2012-09-26 22:55:30,134] INFO Begin registering broker topic /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >         at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >         at kafka.log.Log.append(Log.scala:205)
> >         at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >         at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >         at kafka.network.Processor.handle(SocketServer.scala:296)
> >         at kafka.network.Processor.read(SocketServer.scala:319)
> >         at kafka.network.Processor.run(SocketServer.scala:214)
> >         at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >         at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >         at kafka.log.Log.append(Log.scala:205)
> >         at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >         at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >         at kafka.network.Processor.handle(SocketServer.scala:296)
> >         at kafka.network.Processor.read(SocketServer.scala:319)
> >         at kafka.network.Processor.run(SocketServer.scala:214)
> >         at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,233] INFO End registering broker topic /brokers/topics/test/0 (kafka.server.KafkaZooKeeper)
> > ^C[2012-09-26 22:56:18,290] INFO Shutting down Kafka server (kafka.server.KafkaServer)
> > [2012-09-26 22:56:18,292] INFO shutdown scheduler kafka-logcleaner- (kafka.utils.KafkaScheduler)
> > [2012-09-26 22:56:18,304] INFO shutdown scheduler kafka-logflusher- (kafka.utils.KafkaScheduler)
> > [2012-09-26 22:56:18,364] INFO Closing zookeeper client... (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:56:18,364] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
> > [2012-09-26 22:56:18,364] INFO zkActor stopped (kafka.log.LogManager)
> > [2012-09-26 22:56:18,375] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
> > [2012-09-26 22:56:18,376] INFO Session: 0x13a0647aa7d0000 closed (org.apache.zookeeper.ZooKeeper)
> > [2012-09-26 22:56:18,376] INFO Kafka server shut down completed (kafka.server.KafkaServer)
> >
> > BUT the specific message *does GET* parsed with the topic of "test" and
> > the message of "hi", partition 0, *magic 1*, compression 0. *It's a case
> > of RTFM.*
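The likely explanation (an assumption drawn from the spec and the logs above, not something the wiki page states outright) is that a magic-0 message carries no compression byte at all, so a magic-0 header followed by a compression byte shifts the CRC32 and payload by one byte and trips the size/CRC check. A decoding sketch of the two layouts in Erlang binary syntax, with parse_message/1 as a made-up name:

    %% Illustrative sketch only: the two message layouts implied above.
    %% Returns the decoded message plus whatever bytes follow it.
    parse_message(<<Len:32, 0:8, Crc:32, Rest/binary>>) ->
        %% magic 0: Len = 1 (magic) + 4 (CRC32) + payload size
        PayloadSize = Len - 5,
        <<Payload:PayloadSize/binary, Tail/binary>> = Rest,
        {{magic0, Crc, Payload}, Tail};
    parse_message(<<Len:32, 1:8, Compression:8, Crc:32, Rest/binary>>) ->
        %% magic 1: Len = 1 (magic) + 1 (compression) + 4 (CRC32) + payload size
        PayloadSize = Len - 6,
        <<Payload:PayloadSize/binary, Tail/binary>> = Rest,
        {{magic1, Compression, Crc, Payload}, Tail}.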
> > I produce the following byte map:
> >
> > |------ REQUEST_HEADER ------|------ MESSAGES_LENGTH ------|------ MESSAGE ------|
> >            16 bytes                     4 bytes                   12 bytes
> >
> > REQUEST_HEADER : 16 bytes
> >
> > |-- REQUEST_LENGTH --|-- RQ_TYPE --|-- TP_LENGTH --|------- TOPIC -------|----- PARTITION -----|
> >    1    2    3    4      5    6        7    8        9   10   11   12     13   14   15   16
> >    0    0    0   28      0    0        0    4      116  101  115  116      0    0    0    0
> >                                                       t    e    s    t
> >
> > MESSAGES_LENGTH : 4 bytes
> >
> >    1    2    3    4
> >    0    0    0   12
> >
> > MESSAGE : 12 bytes
> >
> > |------ LENGTH ------| M | C |------- CRC32 -------|-- PAYLOAD --|
> >    1    2    3    4    5   6    7    8    9   10      11   12
> >    0    0    0    8    1   0  216  147   42  172     104  105
> >                                                        h    i
> >
> > Thanks
> > Milind
> >
> >
> > A PRODUCE request is
> >
> >     REQUEST_HEADER
> >     MESSAGES_LENGTH
> >     MESSAGES
> >
> > The REQUEST_HEADER
> >
> >     Request_Length:32/integer   % length of the entire request, excluding this field, in 4 bytes
> >     0:16/integer                % request type; 0 for PRODUCE, in 2 bytes
> >     TopicSize:16/integer        % topic size in 2 bytes
> >     Topic/binary                % topic in TopicSize (variable) bytes
> >     Partition:32/integer        % partition in 4 bytes
> >
> > The MESSAGES_LENGTH
> >
> >     Messages_Length:32/integer  % length in bytes of the MESSAGES section
> >
> > The MESSAGES
> >
> >     Length:32/integer           % length in bytes of the entire message, excluding this field, in 4 bytes
> >     Magic:8/integer             % magic number (0|1) in 1 byte
> >     Compression:8/integer       % compression (0|1|2) in 1 byte
> >     Checksum:32/integer         % CRC32 checksum in 4 bytes
> >     Payload                     % message payload in variable bytes
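To close the loop, a compact Erlang sketch of an encoder that follows this byte map for a single uncompressed magic-1 message; encode_produce/3 and encode_message/1 are made-up names, and the whole thing is only an illustration of the layout above, not an existing driver API:

    %% Illustrative sketch only: build a PRODUCE request for one
    %% uncompressed magic-1 message, following the layout quoted above.
    encode_produce(Topic, Partition, Payload)
      when is_binary(Topic), is_binary(Payload) ->
        Message = encode_message(Payload),
        Body    = <<0:16,                        %% request type: 0 = PRODUCE
                    (byte_size(Topic)):16,       %% TP_LENGTH
                    Topic/binary,                %% TOPIC
                    Partition:32,                %% PARTITION
                    (byte_size(Message)):32,     %% MESSAGES_LENGTH
                    Message/binary>>,            %% MESSAGES
        <<(byte_size(Body)):32, Body/binary>>.   %% REQUEST_LENGTH prefix

    encode_message(Payload) ->
        Inner = <<1:8,                           %% magic 1
                  0:8,                           %% compression: 0 = none
                  (erlang:crc32(Payload)):32,    %% CRC32 of the payload
                  Payload/binary>>,
        <<(byte_size(Inner)):32, Inner/binary>>. %% per-message LENGTH prefix

With Topic = <<"test">>, Partition = 0 and Payload = <<"hi">>, this should reproduce the 32-byte map above (REQUEST_LENGTH 28, per-message LENGTH 8).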