Jian,

All log directories in Kafka are created by LogManager.createLog(). As you can see, the directory always has the form topic-partitionId, so it's not clear how a directory named "a" could be created in your case. I will try to rerun your test and see if it can be reproduced.
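For reference, the parsing step that chokes on such a malformed directory name can be sketched as follows. This is a hypothetical Java reconstruction of the logic in kafka.utils.Utils.getTopicPartition (the real method is Scala, and details may differ): a directory name with no '-' separator makes lastIndexOf return -1, and substring then throws the StringIndexOutOfBoundsException seen in the restart trace quoted in this thread.

```java
// Hypothetical Java reconstruction of the directory-name parsing done at
// broker startup (kafka.utils.Utils.getTopicPartition in the Scala source);
// names are illustrative, not the exact 0.7 code.
public class TopicPartitionParse {

    // "ax-0" -> "ax"; a directory named just "a" has no '-', so
    // lastIndexOf returns -1 and substring(0, -1) throws
    // StringIndexOutOfBoundsException, aborting startup.
    static String topicOf(String dirName) {
        int index = dirName.lastIndexOf('-');
        return dirName.substring(0, index);
    }

    static int partitionOf(String dirName) {
        int index = dirName.lastIndexOf('-');
        return Integer.parseInt(dirName.substring(index + 1));
    }

    public static void main(String[] args) {
        System.out.println(topicOf("ax-0") + ":" + partitionOf("ax-0")); // ax:0
        try {
            topicOf("a"); // the corrupted directory name from the report
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("startup would abort: " + e);
        }
    }
}
```

So the mkdir call itself is likely fine; any name without a dash poisons the log directory scan on the next restart.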
Thanks,
Jun

On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

> Jay:
>
> You can try to send 600 thousand messages per second to the broker; you
> will find that TCP drops packets, so sometimes the topic "ax" will arrive
> as "a". I don't mean to solve the TCP problem at the application level; I
> just think there may be a bug in the file.mkdir() call in
> LogManager.createLog. It affects Kafka usage.
>
> Thanks
> Jian Fan
>
> 2012/7/29 Jay Kreps <jay.kr...@gmail.com>
>
> > Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so
> > it is supposed to either deliver packets in order or time out retrying.
> > In the case of the topic name, which is a size-delimited string, there
> > should be no way for it to drop a single byte in the middle of the
> > request like that. If that is in fact happening, I don't think it is
> > something we can hope to recover from at the application level...
> >
> > -Jay
> >
> > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com>
> > wrote:
> >
> > > Jun:
> > >
> > > Dropping packets in TCP is an OS/JVM issue, but it can also cause
> > > problems in Kafka. For example, the topic of a message is "ax", but it
> > > can change to "a" in the broker because some packets are dropped. The
> > > log directories should be named a-0, a-1, a-2 and so on, but
> > > file.mkdir() creates a log directory named just "a". There seem to be
> > > some bugs in the file.mkdir() call in LogManager.createLog. If you
> > > shut down the broker and restart it, the broker reports an exception
> > > like this:
> > >
> > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> > > startup.
> > > Prepare to shutdown (kafka.server.KafkaServerStartable)
> > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > >         at java.lang.String.substring(String.java:1949)
> > >         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >         at kafka.log.LogManager.<init>(LogManager.scala:65)
> > >         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > >         at kafka.Kafka$.main(Kafka.scala:50)
> > >         at kafka.Kafka.main(Kafka.scala)
> > >
> > > 2012/7/28 Jun Rao <jun...@gmail.com>
> > >
> > > > Jian,
> > > >
> > > > I am not sure I understand this completely. Dropping packets in TCP
> > > > shouldn't cause corruption in the TCP buffer, right? Is this an
> > > > issue in Kafka or in the OS/JVM?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com>
> > > > wrote:
> > > >
> > > > > Jun:
> > > > >
> > > > > Yes, if the socket server can't handle the packets quickly enough,
> > > > > TCP will drop some network packets until the buffer overflows; the
> > > > > corrupted messages also appear in this situation. I ran a systemtap
> > > > > script to find the packet dropping; you can also type
> > > > > "cat /proc/net/sockstat" to watch the TCP memory increase. I
> > > > > debugged the whole Kafka source code to find the bug in the
> > > > > file.mkdir() call in LogManager.createLog.
> > > > >
> > > > > Jian Fan
> > > > >
> > > > > 2012/7/27 Jun Rao <jun...@gmail.com>
> > > > >
> > > > > > Thanks for the finding.
> > > > > > Are you saying that this problem is caused by the buffering in
> > > > > > the Kafka socket server? How did you figure that out? Is this
> > > > > > problem exposed by the same test that caused the corrupted
> > > > > > messages in the broker?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > In a highly concurrent environment, the TCP server will drop
> > > > > > > some packets when the TCP buffer is full. Then
> > > > > > > LogManager.createLog will create logs for some non-existent
> > > > > > > topics. But one thing is very strange: the log directories
> > > > > > > should be named a-0, a-1, a-2 and so on, but file.mkdir()
> > > > > > > creates a log directory named just "a". There seems to be a
> > > > > > > bug in the file.mkdir() call in LogManager.createLog.
> > > > > > >
> > > > > > > The exception message is:
> > > > > > >
> > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0
> > > > > > > (kafka.log.LogManager)
> > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest
> > > > > > > on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka
> > > > > > > (Is a directory)
> > > > > > >         at java.io.RandomAccessFile.open(Native Method)
> > > > > > >         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > > > >         at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > > > >         at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > > > >         at kafka.log.Log.loadSegments(Log.scala:144)
> > > > > > >         at kafka.log.Log.<init>(Log.scala:116)
> > > > > > >         at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > > > >         at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > > > >         at
> > > > > > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > >         at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > >         at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > >         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > >         at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > >         at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > >         at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > >         at java.lang.Thread.run(Thread.java:679)
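As a footnote to the trace above: the "(Is a directory)" FileNotFoundException is what RandomAccessFile raises on Linux whenever a directory sits at a path where a regular file is expected, which is why a stray directory at the segment path keeps the log from opening. A minimal sketch of that failure mode, with an illustrative temp path standing in for /data/kafka (the mkdirs call here simulates the broken state, it is not the broker's code):

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;

// Sketch of the failure mode from the trace: a directory created at the
// path where the log segment file should live makes the subsequent
// RandomAccessFile open fail with FileNotFoundException "(Is a directory)".
public class MkdirCollision {
    public static void main(String[] args) throws Exception {
        File segment = new File(System.getProperty("java.io.tmpdir"),
                                "axx-0/00000000000000000000.kafka");
        segment.mkdirs(); // broken state: a directory where a file belongs
        try {
            new RandomAccessFile(segment, "rw").close();
        } catch (FileNotFoundException e) {
            System.out.println("open failed: " + e.getMessage());
        }
    }
}
```

This matches the report: the interesting bug is whatever handed a corrupted name to mkdir, not mkdir itself, since mkdir happily creates whatever name it is given.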