Thanks for the finding. Are you saying that this problem is caused by the buffering in Kafka socket server? How did you figure that out? Is this problem exposed by the same test that caused the corrupted messages in the broker?
Thanks, Jun On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com> wrote: > In high cocurrent environment, the tcp server will drop some package > when the tcp buffer is over. Then LogManager.createlog will create some > no-exists topic log. But one thing is very strange, the log directory > should be like a-0,a-1, a-2 and so on ,but file.mkdir() create log > directory like a. Seems some bug in file.mkdir() of LogManager.createlog. > > the exception message is > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 > (kafka.log.LogManager) > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on > axx:0 (kafka.server.KafkaRequestHandlers) > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka > (Is a directory) > at java.io.RandomAccessFile.open(Native Method) > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233) > at kafka.utils.Utils$.openChannel(Utils.scala:324) > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75) > at kafka.log.Log.loadSegments(Log.scala:144) > at kafka.log.Log.<init>(Log.scala:116) > at kafka.log.LogManager.createLog(LogManager.scala:159) > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214) > at > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) > at > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:679) >