Jun:

Dropping packets in TCP is an OS/JVM issue, but it can also trigger a Kafka problem. For example, the topic of a message is ax, but it can become a on the broker because some packets are dropped. The log directory should then be created as a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory named just a. There seems to be a bug in the file.mkdir() call in LogManager.createLog. If you shut down the broker and restart it, the broker reports an exception like this (a small sketch of the parsing step that fails is at the bottom of this mail):
[2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
[2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
        at java.lang.String.substring(String.java:1949)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)

2012/7/28 Jun Rao <jun...@gmail.com>

> Jian,
>
> I am not sure I understand this completely. Dropping packets in TCP
> shouldn't cause corruption in the TCP buffer, right? Is this an issue in
> Kafka or in the OS/JVM?
>
> Thanks,
>
> Jun
>
> On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com>
> wrote:
>
> > Jun:
> >
> > Yes, if the socket server can't handle the packets quickly enough, the
> > TCP stack will drop some network packets when the buffer overflows; the
> > corrupted messages also appear in this situation. I ran a systemtap
> > script to find the packet dropping; you can also run "cat
> > /proc/net/sockstat" to watch the TCP memory grow. I debugged the whole
> > Kafka source code to find the bug in file.mkdir() of
> > LogManager.createLog.
> >
> > Jian Fan
> >
> > 2012/7/27 Jun Rao <jun...@gmail.com>
> >
> > > Thanks for the finding. Are you saying that this problem is caused by
> > > the buffering in the Kafka socket server? How did you figure that out?
> > > Is this problem exposed by the same test that caused the corrupted
> > > messages in the broker?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com>
> > > wrote:
> > >
> > > > In a highly concurrent environment, the TCP server will drop some
> > > > packets when the TCP buffer is full. Then LogManager.createLog will
> > > > create logs for topics that don't exist. One thing is very strange:
> > > > the log directory should be like a-0, a-1, a-2 and so on, but
> > > > file.mkdir() creates a log directory like a. Seems there is a bug in
> > > > file.mkdir() of LogManager.createLog.
> > > > The exception message is:
> > > >
> > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > >         at java.io.RandomAccessFile.open(Native Method)
> > > >         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > >         at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > >         at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > >         at kafka.log.Log.loadSegments(Log.scala:144)
> > > >         at kafka.log.Log.<init>(Log.scala:116)
> > > >         at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > >         at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > >         at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > >         at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > >         at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > >         at kafka.network.Processor.handle(SocketServer.scala:296)
> > > >         at kafka.network.Processor.read(SocketServer.scala:319)
> > > >         at kafka.network.Processor.run(SocketServer.scala:214)
> > > >         at java.lang.Thread.run(Thread.java:679)
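
P.S. For anyone following along, here is a minimal sketch of the parsing step that fails on restart. It is written from the first stack trace above, not copied from the real kafka.utils.Utils source, so treat the method body as my reconstruction: the log directory name seems to be split at the last '-' to recover (topic, partition), and a directory named plain a has no '-', so lastIndexOf returns -1 and substring throws exactly the "String index out of range: -1" shown above.

// Sketch only: reconstructs the parsing behaviour implied by the
// StringIndexOutOfBoundsException stack trace; not the actual Kafka code.
object TopicPartitionParseSketch {

  // Split a log directory name like "a-0" into (topic, partition),
  // the way Utils.getTopicPartition appears to per the stack trace.
  def getTopicPartition(dirName: String): (String, Int) = {
    val index = dirName.lastIndexOf('-')
    // If the directory is just "a" (no partition suffix), index is -1 and
    // substring(0, -1) throws StringIndexOutOfBoundsException.
    (dirName.substring(0, index), dirName.substring(index + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    println(getTopicPartition("a-0")) // fine: prints (a,0)
    println(getTopicPartition("a"))   // throws: String index out of range: -1
  }
}

If that is indeed what Utils.getTopicPartition does, it would also explain why the broker fails the same way on every restart: LogManager walks every directory under the log dir at startup and trips over the one that lacks a "-partition" suffix.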