The exception may be caused by a TCP buffer overflow; please check the paper:
http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf

Thanks!

2012/8/2 jjian fan <xiaofanhad...@gmail.com>

> Jun:
>
>     How powerful is the broker's server? You can deploy more producer
> clients to increase the pressure on the broker. In my test, we send 300
> thousand messages per second to the broker with a message size of 1024
> bytes. In this scenario, these exceptions are often seen.
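>
>     (For a rough sense of scale: 300,000 messages/sec at 1024 bytes each
> is about 293 MB/s, or roughly 2.3 Gbit/s of payload before protocol
> overhead, which is more than twice what a single 1 GbE link can carry,
> so sustained socket-buffer pressure is expected at this rate.)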
>
> Thanks!
> Jian Fan
>
> 2012/8/1 Jun Rao <jun...@gmail.com>
>
>> Jian,
>>
>> The message format is documented in the Message class and is as
>> follows:
>>
>> /**
>>  * A message. The format of an N byte message is the following:
>>  *
>>  * If magic byte is 0
>>  *
>>  * 1. 1 byte "magic" identifier to allow format changes
>>  *
>>  * 2. 4 byte CRC32 of the payload
>>  *
>>  * 3. N - 5 byte payload
>>  *
>>  * If magic byte is 1
>>  *
>>  * 1. 1 byte "magic" identifier to allow format changes
>>  *
>>  * 2. 1 byte "attributes" identifier to allow annotations on the message
>> independent of the version (e.g. compression enabled, type of codec used)
>>  *
>>  * 3. 4 byte CRC32 of the payload
>>  *
>>  * 4. N - 6 byte payload
>>  *
>>  */
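>>
>> For illustration, here is a minimal sketch of decoding and checksumming
>> a magic-1 message per the layout above (my own reconstruction for
>> clarity, not the actual Message class code):
>>
>> import java.nio.ByteBuffer
>> import java.util.zip.CRC32
>>
>> // Decode one magic-1 message and verify the payload CRC. Sketch only.
>> def decodeMessage(buf: ByteBuffer): Array[Byte] = {
>>   val magic = buf.get()                        // 1 byte "magic"
>>   require(magic == 1, "sketch handles magic 1 only")
>>   val attributes = buf.get()                   // 1 byte "attributes"
>>   val expectedCrc = buf.getInt() & 0xffffffffL // 4 byte CRC32 of payload
>>   val payload = new Array[Byte](buf.remaining) // N - 6 byte payload
>>   buf.get(payload)
>>   val crc = new CRC32
>>   crc.update(payload)
>>   require(crc.getValue == expectedCrc, "corrupt message: CRC mismatch")
>>   payload
>> }
>>
>> Note that a CRC mismatch here would flag corruption of the payload, but
>> not of surrounding request fields such as the topic name.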
>>
>> The flow is the following:
>> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends
>> the bytes over the socket.
>> 2. On the server side:
>> 2.1 Processor.read reads the bytes off the socket and deserializes them
>> into a MultiProduceRequest
>> 2.2 The request is then handled in KafkaRequestHandler
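>>
>> As a sketch of the framing in steps 1 and 2.1 (assuming the usual
>> 4-byte size prefix on each request; the real BoundedByteBufferReceive
>> also enforces a maximum request size and handles partial reads across
>> select() cycles, so this is a simplified blocking version):
>>
>> import java.nio.ByteBuffer
>> import java.nio.channels.SocketChannel
>>
>> // Read one size-delimited request: a 4-byte size, then that many
>> // bytes of request body.
>> def readRequest(channel: SocketChannel): ByteBuffer = {
>>   val sizeBuf = ByteBuffer.allocate(4)
>>   while (sizeBuf.hasRemaining) channel.read(sizeBuf)
>>   sizeBuf.flip()
>>   val body = ByteBuffer.allocate(sizeBuf.getInt())
>>   while (body.hasRemaining) channel.read(body)
>>   body.flip()
>>   body
>> }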
>>
>> BTW, I ran your test for a couple of days, but couldn't reproduce the
>> exception. In your test, how frequently do you see the exceptions?
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhad...@gmail.com>
>> wrote:
>>
>> > Jun:
>> >
>> >    Can you give more detail on the ByteBuffer structure of messages,
>> > and on the process of sending and receiving them?
>> >
>> > Thanks
>> >
>> > Jian Fan
>> >
>> >
>> > 2012/7/31 Jun Rao <jun...@gmail.com>
>> >
>> > > Jian,
>> > >
>> > > Thanks for the patch. It may not be the right fix, though, since it
>> > > fixes the symptom but not the cause. For each produce request, the
>> > > broker does the following: (1) read all bytes of the request into
>> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of
>> > > the request are ready, deserialize the bytes into a ProducerRequest
>> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the
>> > > request by adding topic data to logs.
>> > >
>> > > What you observed is that in step 3, a topic name is corrupted somehow.
>> > > However, this means that the corresponding ProducerRequest is
>> > > corrupted. Assuming there is no corruption at the network layer (very
>> > > unlikely), the corruption must have happened in step 1 or step 2. So,
>> > > instead of patching a corrupted topic name, we should understand why a
>> > > ProducerRequest can be corrupted and fix the cause. BTW, whatever
>> > > caused the corrupted topic could be causing the corrupted messages too.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com>
>> > > wrote:
>> > >
>> > > > Jun:
>> > > >
>> > > >   Hi. I found why the error appears. In a highly concurrent
>> > > > environment, the TCP server will drop some packets when the TCP
>> > > > buffer overflows. So there is a chance that "topic" contains one or
>> > > > more characters that encode to bytes that include NULL (0).
>> > > >   I have submitted the patch to KAFKA-411; please check it! A guard
>> > > > along those lines is sketched below.
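>> > > >
>> > > >   For illustration only (this is my sketch of the idea, not the
>> > > > actual KAFKA-411 patch):
>> > > >
>> > > > // Hypothetical guard: reject a topic name whose characters include
>> > > > // NUL (0), which would indicate a corrupted request rather than a
>> > > > // legitimate topic.
>> > > > def validateTopic(topic: String): Unit = {
>> > > >   require(topic.nonEmpty, "empty topic")
>> > > >   require(!topic.exists(_ == '\u0000'),
>> > > >           "topic contains NUL: likely corruption")
>> > > > }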
>> > > >
>> > > > Thanks!
>> > > > Jian Fan
>> > > >
>> > > > 2012/7/30 Jun Rao <jun...@gmail.com>
>> > > >
>> > > > > Jian,
>> > > > >
>> > > > > All log directories in Kafka are created by LogManager.createLog().
>> > > > > As you can see, the directory always has the form topic-partitionId.
>> > > > > So, it's not clear how a directory named "a" can be created in your
>> > > > > case. I will try to rerun your test and see if it can be reproduced.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Jay:
>> > > > > >
>> > > > > >    You can try to send 600 thousand messages per second to the
>> > > > > > broker; you will find that TCP drops packets, so sometimes the
>> > > > > > topic ax will become a. I don't mean to solve the TCP problem at
>> > > > > > the application level; I just found that there is maybe a bug in
>> > > > > > file.mkdir() in LogManager.createLog. It will affect Kafka usage.
>> > > > > >
>> > > > > > Thanks
>> > > > > > Jian Fan
>> > > > > >
>> > > > > > 2012/7/29 Jay Kreps <jay.kr...@gmail.com>
>> > > > > >
>> > > > > > > Hmm, that is not my understanding of TCP. TCP is a reliable
>> > > > > > > protocol, so it is supposed to either deliver packets in order
>> > > > > > > or time out retrying. In the case of the topic name, that is a
>> > > > > > > size-delimited string; there should be no way for it to drop a
>> > > > > > > single byte in the middle of the request like that. If that is
>> > > > > > > in fact happening, I don't think it is something we can hope to
>> > > > > > > recover from at the application level...
>> > > > > > >
>> > > > > > > -Jay
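>> > > > > > >
>> > > > > > > For context, a sketch of how a size-delimited string comes off
>> > > > > > > the wire (assuming a 2-byte length prefix, as in
>> > > > > > > Utils.readShortString; a reconstruction, not the actual code):
>> > > > > > >
>> > > > > > > // The length arrives before the bytes, so for "ax" to become
>> > > > > > > // "a", TCP would have to silently lose exactly one byte
>> > > > > > > // mid-stream, which the protocol is designed to prevent.
>> > > > > > > def readShortString(buf: java.nio.ByteBuffer): String = {
>> > > > > > >   val size = buf.getShort()
>> > > > > > >   val bytes = new Array[Byte](size)
>> > > > > > >   buf.get(bytes)
>> > > > > > >   new String(bytes, "UTF-8")
>> > > > > > > }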
>> > > > > > >
>> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Jun:
>> > > > > > > >    Dropping packets in TCP is an issue of the OS/JVM, but it
>> > > > > > > > can also cause Kafka issues!
>> > > > > > > >    For example, the topic of a message is ax, but it can
>> > > > > > > > change to a in the broker because some packets are dropped.
>> > > > > > > > The log directories should be like a-0, a-1, a-2 and so on,
>> > > > > > > > but file.mkdir() creates a log directory like a. There seem
>> > > > > > > > to be some bugs in file.mkdir() in LogManager.createLog.
>> > > > > > > >    If you shut down the broker and restart it, the broker
>> > > > > > > > will report an exception like this:
>> > > > > > > >
>> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
>> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
>> > > > > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>> > > > > > > >     at java.lang.String.substring(String.java:1949)
>> > > > > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>> > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > > > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
>> > > > > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>> > > > > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
>> > > > > > > >     at kafka.Kafka.main(Kafka.scala)
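>> > > > > > > >
>> > > > > > > > A plausible reconstruction of the failing parse (my guess
>> > > > > > > > from the stack trace, not the actual Utils.scala code): the
>> > > > > > > > directory name is split at its last '-', and a name like "a"
>> > > > > > > > with no dash makes lastIndexOf return -1:
>> > > > > > > >
>> > > > > > > > // getTopicPartition("xyz-0") == ("xyz", 0), but for "a"
>> > > > > > > > // lastIndexOf('-') is -1, so substring(0, -1) throws
>> > > > > > > > // StringIndexOutOfBoundsException: -1, matching the trace.
>> > > > > > > > def getTopicPartition(dirName: String): (String, Int) = {
>> > > > > > > >   val index = dirName.lastIndexOf('-')
>> > > > > > > >   (dirName.substring(0, index),
>> > > > > > > >    dirName.substring(index + 1).toInt)
>> > > > > > > > }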
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > 2012/7/28 Jun Rao <jun...@gmail.com>
>> > > > > > > >
>> > > > > > > > > Jian,
>> > > > > > > > >
>> > > > > > > > > I am not sure if I understand this completely. Dropping
>> > > > > > > > > packets in TCP shouldn't cause corruption in the TCP
>> > > > > > > > > buffer, right? Is this an issue in Kafka or in the OS/JVM?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Jun:
>> > > > > > > > > > Yes, if the socket server can't handle the packets
>> > > > > > > > > > quickly, the TCP protocol will drop some network packets
>> > > > > > > > > > until the buffer overflows; the corrupted messages also
>> > > > > > > > > > appear in this situation! I ran a systemtap script to
>> > > > > > > > > > find the packet dropping; you can also type "cat
>> > > > > > > > > > /proc/net/sockstat" to see the TCP memory increase. I
>> > > > > > > > > > debugged the whole Kafka source code to find the bug in
>> > > > > > > > > > file.mkdir() in LogManager.createLog.
>> > > > > > > > > >
>> > > > > > > > > > JIan Fan
>> > > > > > > > > >
>> > > > > > > > > > 2012/7/27 Jun Rao <jun...@gmail.com>
>> > > > > > > > > >
>> > > > > > > > > > > Thanks for the finding. Are you saying that this
>> > > > > > > > > > > problem is caused by the buffering in the Kafka socket
>> > > > > > > > > > > server? How did you figure that out? Is this problem
>> > > > > > > > > > > exposed by the same test that caused the corrupted
>> > > > > > > > > > > messages in the broker?
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > >     In a highly concurrent environment, the TCP
>> > > > > > > > > > > > server will drop some packets when the TCP buffer
>> > > > > > > > > > > > overflows. Then LogManager.createLog will create logs
>> > > > > > > > > > > > for non-existent topics. But one thing is very
>> > > > > > > > > > > > strange: the log directories should be like a-0, a-1,
>> > > > > > > > > > > > a-2 and so on, but file.mkdir() creates a log
>> > > > > > > > > > > > directory like a. There seems to be some bug in
>> > > > > > > > > > > > file.mkdir() in LogManager.createLog.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The exception message is:
>> > > > > > > > > > > >
>> > > > > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
>> > > > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
>> > > > > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
>> > > > > > > > > > > > at java.io.RandomAccessFile.open(Native Method)
>> > > > > > > > > > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
>> > > > > > > > > > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
>> > > > > > > > > > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
>> > > > > > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
>> > > > > > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
>> > > > > > > > > > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
>> > > > > > > > > > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > > > > > > > > > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > > > > > > > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > > > > > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> > > > > > > > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > > > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > > > > > > > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
>> > > > > > > > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
>> > > > > > > > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
>> > > > > > > > > > > > at java.lang.Thread.run(Thread.java:679)
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
