Jian,

I am not sure I understand this completely. Dropping packets in TCP
shouldn't cause corruption in the TCP buffer, right? Is this an issue in
Kafka or in the OS/JVM?

Thanks,

Jun

On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

> Jun:
> Yes, if the socket server can't consume the data quickly enough, TCP
> will drop some packets until the buffer overflows; the corrupted
> messages also appear in this situation. I ran a systemtap script to
> find the packet drops; you can also run "cat /proc/net/sockstat" to
> watch the TCP memory increase. I debugged the whole Kafka source code and
> traced the bug to file.mkdir() in LogManager.createLog.
>
> Jian Fan
>
> 2012/7/27 Jun Rao <jun...@gmail.com>
>
> > Thanks for the finding. Are you saying that this problem is caused by the
> > buffering in the Kafka socket server? How did you figure that out? Is this
> > problem exposed by the same test that caused the corrupted messages in the
> > broker?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com>
> > wrote:
> >
> > >     In a highly concurrent environment, the TCP server will drop some
> > > packets when the TCP buffer overflows. Then LogManager.createLog will
> > > create logs for topics that do not exist. But one thing is very strange:
> > > the log directories should be named like a-0, a-1, a-2, and so on, but
> > > file.mkdir() creates a log directory named just a. There seems to be a
> > > bug in file.mkdir() as called from LogManager.createLog.
> > >
> > > The exception message is:
> > >
> > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0
> > > (kafka.log.LogManager)
> > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on
> > > axx:0 (kafka.server.KafkaRequestHandlers)
> > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > at java.io.RandomAccessFile.open(Native Method)
> > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > at kafka.log.Log.<init>(Log.scala:116)
> > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:679)
> > >
> >
>
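The two behaviors discussed in the thread can be sketched in plain Java (the temp-directory paths below are illustrative, not Kafka's): java.io.File.mkdir() creates only the leaf directory and reports failure via a boolean return value rather than throwing, so an ignored return can silently mask a failed directory creation; and once a directory occupies the path where a log segment file is expected, RandomAccessFile — which kafka.utils.Utils.openChannel uses — fails with the same FileNotFoundException ("Is a directory" on Linux) seen in the stack trace.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;

public class CreateLogRepro {
    public static void main(String[] args) throws Exception {
        File tmp = new File(System.getProperty("java.io.tmpdir"), "kafka-repro");
        tmp.mkdirs();

        // 1. File.mkdir() creates only the leaf directory. When the parent is
        //    missing it returns false instead of throwing, so callers that
        //    ignore the return value never notice the failure.
        File nested = new File(tmp, "missing-parent/a-0");
        System.out.println("mkdir() -> " + nested.mkdir()); // false

        // 2. If a directory sits where the segment file should be, opening it
        //    with RandomAccessFile throws FileNotFoundException.
        File segment = new File(tmp, "00000000000000000000.kafka");
        segment.mkdir();
        try {
            new RandomAccessFile(segment, "rw");
        } catch (FileNotFoundException e) {
            System.out.println("FileNotFoundException: " + e.getMessage());
        } finally {
            segment.delete();
            tmp.delete();
        }
    }
}
```

This does not pinpoint the root cause debated in the thread, but it shows why a corrupted request that produces an unexpected directory layout surfaces later as "Is a directory" in loadSegments rather than at mkdir() time.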
