Jun:

    What about the broker's server capacity? You could deploy more producer
clients to increase the pressure on the broker. In my test, we send 300
thousand messages per second to the broker, with a message size of 1024
bytes. In this scenario, these exceptions appear frequently.

Thanks!
Jian Fan

2012/8/1 Jun Rao <jun...@gmail.com>

> Jian,
>
> The message format is documented in the Message class and has the following
> format.
>
> /**
>  * A message. The format of an N byte message is the following:
>  *
>  * If magic byte is 0
>  *
>  * 1. 1 byte "magic" identifier to allow format changes
>  *
>  * 2. 4 byte CRC32 of the payload
>  *
>  * 3. N - 5 byte payload
>  *
>  * If magic byte is 1
>  *
>  * 1. 1 byte "magic" identifier to allow format changes
>  *
>  * 2. 1 byte "attributes" identifier to allow annotations on the message
> independent of the version (e.g. compression enabled, type of codec used)
>  *
>  * 3. 4 byte CRC32 of the payload
>  *
>  * 4. N - 6 byte payload
>  *
>  */
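For illustration, here is a minimal sketch of reading that layout out of a ByteBuffer. The MessageParser name is made up for this example; it is not Kafka's actual Message class, just a direct transcription of the format documented above.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical parser for the wire layout documented above
// (illustrative only; not Kafka's actual Message class).
public class MessageParser {
    public static byte[] parsePayload(ByteBuffer buf) {
        byte magic = buf.get();                      // 1 byte "magic" identifier
        if (magic == 1) {
            buf.get();                               // 1 byte "attributes" (codec flags, etc.)
        }
        long storedCrc = buf.getInt() & 0xFFFFFFFFL; // 4 byte CRC32 of the payload
        byte[] payload = new byte[buf.remaining()];  // remaining N - 5 (or N - 6) bytes
        buf.get(payload);

        CRC32 crc = new CRC32();
        crc.update(payload);                         // recompute and verify the checksum
        if (crc.getValue() != storedCrc)
            throw new IllegalStateException("CRC mismatch: corrupted message");
        return payload;
    }
}
```

Note that the CRC only covers the payload, so a corrupted topic name elsewhere in the request would not be caught by this check.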
>
> The flow is the following:
> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends
> the bytes over the socket.
> 2. On the server side:
> 2.1 Processor.read reads the bytes off the socket and deserializes them
> into a MultiProduceRequest.
> 2.2 The request is then handled in KafkaRequestHandler.
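A simplified sketch of what step 2.1 amounts to: read one size-prefixed request fully off the socket before deserializing anything. The FrameReader name is illustrative; the real logic lives in BoundedByteBufferReceive and SocketServer.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Illustrative sketch of step 2.1: read one size-prefixed request
// completely off the wire before handing the bytes to the deserializer.
public class FrameReader {
    public static ByteBuffer readRequest(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        int size = din.readInt();       // 4-byte size prefix written by the producer
        byte[] body = new byte[size];
        din.readFully(body);            // block until the entire request has arrived
        return ByteBuffer.wrap(body);   // only now is it safe to deserialize
    }
}
```

Because the whole frame is buffered before deserialization, a corrupted request implies the bytes were already wrong by the time they were read, which is why Jun focuses on steps 1 and 2 below.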
>
> BTW, I ran your test for a couple of days, but couldn't reproduce the
> exception. In your test, how frequently do you see the exceptions?
>
> Thanks,
>
> Jun
>
> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhad...@gmail.com> wrote:
>
> > Jun:
> >
> >    Can you give more detail on the ByteBuffer structure of messages and
> > on the process of sending and receiving them?
> >
> > Thanks
> >
> > Jian Fan
> >
> >
> > 2012/7/31 Jun Rao <jun...@gmail.com>
> >
> > > Jian,
> > >
> > > Thanks for the patch. It may not be the right fix though, since it
> > > fixes the symptom but not the cause. For each produce request, the
> > > broker does the following: (1) read all bytes of the request into
> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of
> > > the request are ready, deserialize the bytes into a ProducerRequest
> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the
> > > request by adding topic data to logs.
> > >
> > > What you observed is that in step 3, a topic name is corrupted somehow.
> > > However, this means that the corresponding ProducerRequest is
> > > corrupted. Assuming there is no corruption at the network layer (very
> > > unlikely), the corruption must have happened in step 1 or step 2. So,
> > > instead of patching a corrupted topic name, we should understand why a
> > > ProducerRequest can be corrupted and fix the cause. BTW, whatever
> > > caused the corrupted topic could be causing the corrupted messages too.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com>
> > > wrote:
> > >
> > > > Jun:
> > > >
> > > >   Hi. I found why the error appears. In a highly concurrent
> > > > environment, the TCP server will drop some packets when the TCP
> > > > buffer overflows. So there is a chance that "topic" contains one or
> > > > more characters that encode to bytes that include NULL (0).
> > > >   I have submitted the patch to KAFKA-411, please check it!
> > > >
> > > > Thanks!
> > > > Jian Fan
> > > >
> > > > 2012/7/30 Jun Rao <jun...@gmail.com>
> > > >
> > > > > Jian,
> > > > >
> > > > > All log directories in kafka are created by LogManager.createLog().
> > > > > As you can see, the directory always has the form of
> > > > > topic-partitionId. So, it's not clear how a directory of "a" can be
> > > > > created in your case. I will try to rerun your test and see if it
> > > > > can be reproduced.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <
> xiaofanhad...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Jay:
> > > > > >
> > > > > >    You can try sending 600 thousand messages per second to the
> > > > > > broker; you will find that TCP drops packets, so sometimes the
> > > > > > topic ax becomes a. I don't mean to solve the TCP problem at the
> > > > > > application level; I just found there may be a bug in
> > > > > > file.mkdir() in LogManager.createLog. It will affect Kafka usage.
> > > > > >
> > > > > > Thanks
> > > > > > Jian Fan
> > > > > >
> > > > > > 2012/7/29 Jay Kreps <jay.kr...@gmail.com>
> > > > > >
> > > > > > > Hmm, that is not my understanding of TCP. TCP is a reliable
> > > > > > > protocol, so it is supposed to either deliver packets in order
> > > > > > > or time out retrying. In the case of the topic name, which is a
> > > > > > > size-delimited string, there should be no way for it to drop a
> > > > > > > single byte in the middle of the request like that. If that is
> > > > > > > in fact happening, I don't think it is something we can hope to
> > > > > > > recover from at the application level...
> > > > > > >
> > > > > > > -Jay
> > > > > > >
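To make Jay's point concrete, here is an illustrative sketch of a size-delimited string on the wire. The ShortString name is made up for this example; it is modeled on, not copied from, Kafka's wire-format helpers. Because the length is read before the bytes, a byte lost mid-stream underflows or mis-frames the buffer and fails loudly, rather than silently turning "ax" into "a".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of a size-delimited string on the wire
// (hypothetical helper, not Kafka's actual implementation).
public class ShortString {
    public static void write(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length); // 2-byte length prefix
        buf.put(bytes);                     // then the raw bytes
    }

    public static String read(ByteBuffer buf) {
        int len = buf.getShort();           // the length is read first...
        byte[] bytes = new byte[len];
        buf.get(bytes);                     // ...so a lost byte underflows the buffer
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```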
> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <
> > > xiaofanhad...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Jun:
> > > > > > > >    Dropping packets in TCP is an OS/JVM issue, but it can
> > > > > > > > also cause Kafka issues!
> > > > > > > >    For example, the topic of a message is ax, but it can
> > > > > > > > change to a on the broker because some packets are dropped.
> > > > > > > > The log directory should be named like a-0, a-1, a-2, and so
> > > > > > > > on, but file.mkdir() creates a log directory named just a.
> > > > > > > > There seems to be a bug in file.mkdir() in
> > > > > > > > LogManager.createLog.
> > > > > > > >    If you shut down the broker and restart it, the broker
> > > > > > > > will report an exception like this:
> > > > > > > >
> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > > > > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > > > > > > >     at java.lang.String.substring(String.java:1949)
> > > > > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > > > > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > > > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > > > > > > >     at kafka.Kafka.main(Kafka.scala)
> > > > > > > >
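The StringIndexOutOfBoundsException above points at kafka.utils.Utils.getTopicPartition, which splits a log directory name on its last '-'. A sketch modeled loosely on that behavior (not the exact source) shows why a directory named just "a" is fatal at startup:

```java
// Illustrative sketch of splitting a "topic-partition" directory name
// (modeled loosely on kafka.utils.Utils.getTopicPartition; not the exact source).
public class TopicPartition {
    public static String[] parse(String dirName) {
        int index = dirName.lastIndexOf('-');
        // For a directory named "a" there is no '-', lastIndexOf returns -1,
        // and substring(0, -1) throws StringIndexOutOfBoundsException.
        return new String[]{ dirName.substring(0, index), dirName.substring(index + 1) };
    }
}
```

So any stray directory without a "-partitionId" suffix in the log directory makes the broker fail to start, which matches the trace above.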
> > > > > > > >
> > > > > > > > 2012/7/28 Jun Rao <jun...@gmail.com>
> > > > > > > >
> > > > > > > > > Jian,
> > > > > > > > >
> > > > > > > > > I am not sure I understand this completely. Dropping
> > > > > > > > > packets in TCP shouldn't cause corruption in the TCP
> > > > > > > > > buffer, right? Is this an issue in Kafka or in the OS/JVM?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <
> > > > > xiaofanhad...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Jun:
> > > > > > > > > > Yes, if the socket server can't handle the packets
> > > > > > > > > > quickly, the TCP protocol will drop some network packets
> > > > > > > > > > when the buffer overflows; the corrupted messages also
> > > > > > > > > > appear in this situation! I ran a systemtap script to
> > > > > > > > > > find the packet dropping; you can also type "cat
> > > > > > > > > > /proc/net/sockstat" to watch the TCP memory increase. I
> > > > > > > > > > debugged the whole Kafka source code to find the bug in
> > > > > > > > > > file.mkdir() in LogManager.createLog.
> > > > > > > > > >
> > > > > > > > > > JIan Fan
> > > > > > > > > >
> > > > > > > > > > 2012/7/27 Jun Rao <jun...@gmail.com>
> > > > > > > > > >
> > > > > > > > > > > Thanks for the finding. Are you saying that this
> > > > > > > > > > > problem is caused by the buffering in the Kafka socket
> > > > > > > > > > > server? How did you figure that out? Is this problem
> > > > > > > > > > > exposed by the same test that caused the corrupted
> > > > > > > > > > > messages in the broker?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <
> > > > > > > xiaofanhad...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > >     In a highly concurrent environment, the TCP
> > > > > > > > > > > > server will drop some packets when the TCP buffer
> > > > > > > > > > > > overflows. Then LogManager.createLog will create logs
> > > > > > > > > > > > for non-existent topics. But one thing is very
> > > > > > > > > > > > strange: the log directory should be named like a-0,
> > > > > > > > > > > > a-1, a-2, and so on, but file.mkdir() creates a log
> > > > > > > > > > > > directory named just a. There seems to be a bug in
> > > > > > > > > > > > file.mkdir() in LogManager.createLog.
> > > > > > > > > > > >
> > > > > > > > > > > > The exception message is
> > > > > > > > > > > >
> > > > > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > > > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > > > > > > > > > > at java.io.RandomAccessFile.open(Native Method)
> > > > > > > > > > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > > > > > > > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > > > > > > > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > > > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > > > > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
> > > > > > > > > > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > > > > > > > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > > > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > > > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > > > > > > at java.lang.Thread.run(Thread.java:679)
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
