Jian,

Thanks for the patch. It may not be the right fix, though, since it
addresses the symptom, not the cause. For each produce request, the broker does
the following: (1) read all bytes of the request into
a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
request are ready, deserialize the bytes into a ProducerRequest
(KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request
by adding topic data to logs.
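To make step 2 concrete: the topic name on the wire is a size-delimited string, i.e. a length prefix followed by exactly that many bytes. A minimal sketch of such an encoding (the class and method names are illustrative, not the actual Kafka code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of a size-delimited ("short") string, the kind of
// field a produce request uses for topic names: a 2-byte length N followed
// by N bytes of UTF-8. Not the actual Kafka wire code.
public class ShortString {
    static void write(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length); // length prefix first
        buf.put(bytes);                     // then the payload
    }

    static String read(ByteBuffer buf) {
        short size = buf.getShort();        // read the length prefix
        byte[] bytes = new byte[size];
        buf.get(bytes);                     // read exactly that many bytes
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        write(buf, "ax");
        buf.flip();
        System.out.println(read(buf)); // prints ax
    }
}
```

Because the reader trusts the length prefix, a byte lost mid-stream shifts every later field and makes the whole request fail to deserialize, rather than quietly shortening a topic like "ax" to "a".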

What you observed is that in step 3, a topic name is somehow corrupted.
However, this means that the corresponding ProducerRequest is corrupted.
Assuming there is no corruption at the network layer (which is very
unlikely), the corruption must have happened in step 1 or step 2. So,
instead of patching a corrupted topic name, we should understand why a
ProducerRequest can be corrupted and fix the cause. BTW, whatever caused
the corrupted topic could be causing the corrupted messages too.

Thanks,

Jun

On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

> Jun:
>
>   Hi. I found why the error appears. In a highly concurrent environment,
> the TCP server will drop some packets when the TCP buffer overflows. So
> there is a chance that "topic" contains one or more characters that encode
> to bytes that include NULL (0).
>   I have submitted the patch to KAFKA-411, please check it!
>
> Thanks!
> Jian Fan
>
> 2012/7/30 Jun Rao <jun...@gmail.com>
>
> > Jian,
> >
> > All log directories in kafka are created by LogManager.createLog(). As you
> > can see, the directory always has the form of topic-partitionId. So, it's
> > not clear how a directory of "a" can be created in your case. I will try to
> > rerun your test and see if it can be reproduced.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com>
> > wrote:
> >
> > > Jay:
> > >
> > >    You can try to send 600 thousand messages per second to the broker;
> > > you will find that TCP drops packets, so sometimes the topic ax will
> > > become a. I don't mean to solve the TCP problem at the application
> > > level; I just found there may be a bug in file.mkdir() of
> > > LogManager.createLog. It will affect Kafka usage.
> > >
> > > Thanks
> > > Jian Fan
> > >
> > > 2012/7/29 Jay Kreps <jay.kr...@gmail.com>
> > >
> > > > Hmm, that is not my understanding of TCP. TCP is a reliable protocol,
> > > > so it is supposed to either deliver packets in order or time out
> > > > retrying. In the case of the topic name, that is a size-delimited
> > > > string, so there should be no way for it to drop a single byte in the
> > > > middle of the request like that. If that is in fact happening, I don't
> > > > think it is something we can hope to recover from at the application
> > > > level...
> > > >
> > > > -Jay
> > > >
> > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com>
> > > > wrote:
> > > >
> > > > > Jun:
> > > > >    Dropping packets in TCP is an issue of the OS/JVM, but it can
> > > > > also cause some Kafka issues!
> > > > >    For example, the topic of a message is ax, but it can change to a
> > > > > in the broker because some packets are dropped, so the log
> > > > > directories should be like a-0, a-1, a-2 and so on, but file.mkdir()
> > > > > creates a log directory like a. Seems there is some bug in
> > > > > file.mkdir() of LogManager.createLog.
> > > > >    If you shut down the broker and restart it, the broker will
> > > > > report an exception like this:
> > > > >
> > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> > > > > startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > > > >     at java.lang.String.substring(String.java:1949)
> > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > > > >     at kafka.Kafka.main(Kafka.scala)
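The StringIndexOutOfBoundsException in the trace above is consistent with parsing a log directory name of the form topic-partitionId: a directory named just "a" has no '-', so lastIndexOf returns -1 and substring throws. A hypothetical reconstruction of that parse (illustrative names, not the actual Utils.getTopicPartition code):

```java
// Hypothetical sketch of splitting a log directory name "topic-partitionId"
// at the last '-'. Not the actual Kafka Utils code.
public class TopicPartition {
    static String[] parse(String dirName) {
        int index = dirName.lastIndexOf('-');
        // For a directory named just "a", index is -1 and the substring
        // calls below throw StringIndexOutOfBoundsException, matching the
        // startup failure in the trace.
        return new String[] {
            dirName.substring(0, index),
            dirName.substring(index + 1)
        };
    }

    public static void main(String[] args) {
        String[] tp = parse("axx-0");
        System.out.println(tp[0] + " / " + tp[1]); // prints axx / 0
        try {
            parse("a");
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("parse(\"a\") throws: " + e);
        }
    }
}
```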
> > > > >
> > > > >
> > > > > 2012/7/28 Jun Rao <jun...@gmail.com>
> > > > >
> > > > > > Jian,
> > > > > >
> > > > > > I am not sure if I understand this completely. Dropping packets
> > > > > > in TCP shouldn't cause corruption in the TCP buffer, right? Is
> > > > > > this an issue in Kafka or the OS/JVM?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Jun:
> > > > > > > Yes, if the socket server can't handle the packets quickly, the
> > > > > > > TCP protocol will drop some network packets until the buffer
> > > > > > > overflows; the corrupted messages also appear in this situation!
> > > > > > > I ran a systemtap script to find the packet dropping; you can
> > > > > > > also type "cat /proc/net/sockstat" to see the TCP memory
> > > > > > > increase. I debugged the whole Kafka source code to find the bug
> > > > > > > in file.mkdir() of LogManager.createLog.
> > > > > > >
> > > > > > > Jian Fan
> > > > > > >
> > > > > > > 2012/7/27 Jun Rao <jun...@gmail.com>
> > > > > > >
> > > > > > > > Thanks for the finding. Are you saying that this problem is
> > > > > > > > caused by the buffering in the Kafka socket server? How did
> > > > > > > > you figure that out? Is this problem exposed by the same test
> > > > > > > > that caused the corrupted messages in the broker?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > >     In a highly concurrent environment, the TCP server will
> > > > > > > > > drop some packets when the TCP buffer overflows. Then
> > > > > > > > > LogManager.createLog will create logs for some non-existent
> > > > > > > > > topics. But one thing is very strange: the log directories
> > > > > > > > > should be like a-0, a-1, a-2 and so on, but file.mkdir()
> > > > > > > > > creates a log directory like a. Seems there is some bug in
> > > > > > > > > file.mkdir() of LogManager.createLog.
> > > > > > > > >
> > > > > > > > > The exception message is:
> > > > > > > > >
> > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > > > > > > > at java.io.RandomAccessFile.open(Native Method)
> > > > > > > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > > > > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > > > > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
> > > > > > > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > > > > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > > > > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > > > at java.lang.Thread.run(Thread.java:679)
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
