1. I don't think this is something Kafka can do. We never rename directories once we create them. I'm a little at a loss as to how/why the OS would do it either, though.

2. Can you check whether it ran recovery (you would see a bunch of messages about recovering each topic)? This process goes through each message sequentially and checks the CRC. If it did run, it is unlikely that there was a problem in the log itself.
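For intuition, the sequential recovery pass Jay describes can be sketched roughly as follows. This is a hypothetical, simplified model, not Kafka's actual code: here each record is just a 4-byte big-endian length, a CRC32 of the payload, then the payload (real 0.7 messages also carry a magic byte and attributes before the CRC):

```python
import struct
import zlib

def recover_log(path):
    """Walk a log of length-prefixed, CRC-protected records and return
    the byte offset of the last valid record boundary; recovery would
    truncate everything past that point."""
    valid_up_to = 0
    with open(path, "rb") as f:
        while True:
            header = f.read(4)
            if len(header) < 4:
                break  # clean end of file
            (size,) = struct.unpack(">i", header)
            if size < 4:
                break  # nonsense length: garbage bytes at the tail
            payload = f.read(size)
            if len(payload) < size:
                break  # partial write at the tail
            (crc,) = struct.unpack(">I", payload[:4])
            if zlib.crc32(payload[4:]) & 0xFFFFFFFF != crc:
                break  # CRC mismatch: corrupt record, stop here
            valid_up_to += 4 + size
    return valid_up_to
```

Because the scan stops at the first bad record, corruption in already-flushed data earlier in the segment would be caught, but only if recovery actually runs over that segment.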
-Jay

On Tue, Jul 16, 2013 at 3:04 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:

> Thanks for the response Jay,
>
> 1. Do you think the corrupt partition directory names are happening at the
> Kafka level or the file system level? If it's at the file system level,
> perhaps we can investigate either changing FS options or selecting a
> different filesystem for the log files. Is there a recommended filesystem,
> or recommended filesystem options?
>
> 2. I'm a little confused about terminology, but from what I observed the
> disk corruption was in the last segment for the partition. At the time I
> investigated, the last segment file for the bad partition was ~300mb and
> the corrupt message was at around the 288mb byte offset (our segment files
> seem to get rotated at 512mb, which I believe is the default). When the
> server came back up, the corrupt last segment file was still being written
> to beyond the corrupt message. Is this the behavior you're describing?
>
> Blake
>
>
> On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > 1. The corrupt directory names are not something we can really handle.
> > The names have to work with ZooKeeper, so if weird characters get
> > inserted I think the best we can do is give an error. I believe we have
> > fixed the 0-byte file problem in 0.8.
> > 2. The assumption for log recovery is that data that has been flushed to
> > disk is stable, but data which has not been flushed can be in any state
> > (garbage bytes, missing, partial writes, etc.). Recovery runs only on
> > the last segment of unflushed data (e.g. the file with the highest
> > number) and only in the case of an unclean shutdown. This is basically
> > an optimization: if we recovered the whole log on startup, startup could
> > take hours for large logs (e.g. 5TB at, say, 10MB/sec). From what you
> > are saying, it sounds like you had disk corruption prior to the last
> > segment; in other words, flushed data became corrupt. This is not
> > handled in 0.7.
> > In 0.8 you would have the option of just deleting the problematic data
> > and restoring from replicas.
> >
> > -Jay
> >
> >
> > On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
> >
> > > Hi everyone,
> > >
> > > Last week, one of our production Kafka 0.7.1 servers had a hardware
> > > failure that resulted in an unclean restart. When the server came back
> > > up 5 minutes later, there were two topic corruption problems that we
> > > had to handle to get the pipeline working again.
> > >
> > > 1. The Kafka log directory had malformed partition directory names
> > > [1]. The partition directories were shortened names of our topic
> > > naming scheme. Inside some of these directories was a 0-byte .kafka
> > > file. This prevented the Kafka server process from starting. To
> > > resolve this, we manually removed the malformed topic directories and
> > > the empty log files.
> > >
> > > Is this a known issue? If so, has it been addressed in 0.8? This seems
> > > like a bug.
> > >
> > > 2. One of the partitions had a corrupt log message (crc32 check
> > > failure) that prevented consumers from advancing on that partition
> > > [2]. To resolve this issue we had to hexdump the log file, seek to the
> > > byte offset provided by the consumer exception, and remove the corrupt
> > > message at the frame boundary. After removing the corrupt message, we
> > > created a task to parse the log file and put it back into our producer
> > > / consumer pipeline. The InvalidMessageException never bubbled up to
> > > our consumer thread, preventing us from handling this error without
> > > manual manipulation of the log files (our consumer code was catching
> > > InvalidMessageException, but it never reached that point).
> > >
> > > Is a corrupt log file a state that Kafka is supposed to handle, or is
> > > our consumer code supposed to handle it? I'm confused about how we can
> > > avoid manually cleaning up log files with a hex editor if we get into
> > > this state again.
> > >
> > > Thanks!
> > >
> > > Blake
> > >
> > > [1] Corrupt partition directories in the broker data directories:
> > >
> > > 0  [main] INFO  kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> > > 1  [main] INFO  kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> > > 2  [main] INFO  kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> > > 5  [main] INFO  kafka.server.KafkaServer - Starting Kafka server...
> > > 17 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-18'
> > > 32 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> > > 46 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-15'
> > > 46 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> > > 47 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-25'
> > > 47 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> > > 47 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-27'
> > > 48 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> > > 48 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-9'
> > > 48 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
> > > 49 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-8'
> > > 49 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
> > > 49 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-29'
> > > 49 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
> > > 50 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-34'
> > > 50 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
> > > 50 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-19'
> > > 51 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
> > > 51 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-16'
> > > 51 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
> > > 51 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-27'
> > > 52 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
> > > 52 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-34'
> > > 52 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
> > > 53 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-21'
> > > 53 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
> > > 53 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-48'
> > > 54 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
> > > 54 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-40'
> > > 55 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
> > > 55 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-6'
> > > 55 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
> > > 56 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-25'
> > > 56 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
> > > 56 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-33'
> > > 57 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
> > > 57 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-35'
> > > 58 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
> > > 58 [main] INFO  kafka.log.LogManager - Loading log 'datapoints-wr'
> > > 58 [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
> > > 60 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
> > > java.lang.NumberFormatException: For input string: "wr"
> > >     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > >     at java.lang.Integer.parseInt(Integer.java:481)
> > >     at java.lang.Integer.parseInt(Integer.java:514)
> > >     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> > >     at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > >     at kafka.Kafka$.main(Kafka.scala:50)
> > >     at kafka.Kafka.main(Kafka.scala)
> > > 61 [main] INFO  kafka.server.KafkaServer - Shutting down Kafka server
> > > 62 [main] INFO  kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> > > 62 [main] INFO  kafka.server.KafkaServer - Kafka server shut down completed
> > >
> > > [blake@kafka03 kafka]$ ls
> > > dat                      datapoints-increment-40  datapoints-write-26
> > > datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> > > datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> > > datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> > > datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> > > datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> > > datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> > > datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> > > datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> > > datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> > > datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> > > datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> > > datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> > > datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> > > datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> > > datapoints-increment-21  datapoints-wr            datapoints-write-4
> > > datapoints-increment-22  datapoints-writ          datapoints-write-40
> > > datapoints-increment-23  datapoints-write-0       datapoints-write-41
> > > datapoints-increment-24  datapoints-write-1       datapoints-write-42
> > > datapoints-increment-25  datapoints-write-10      datapoints-write-43
> > > datapoints-increment-26  datapoints-write-11      datapoints-write-44
> > > datapoints-increment-27  datapoints-write-12      datapoints-write-45
> > > datapoints-increment-28  datapoints-write-13      datapoints-write-46
> > > datapoints-increment-29  datapoints-write-14      datapoints-write-47
> > > datapoints-increment-3   datapoints-write-15      datapoints-write-48
> > > datapoints-increment-30  datapoints-write-16      datapoints-write-49
> > > datapoints-increment-31  datapoints-write-17      datapoints-write-5
> > > datapoints-increment-32  datapoints-write-18      datapoints-write-6
> > > datapoints-increment-33  datapoints-write-19      datapoints-write-7
> > > datapoints-increment-34  datapoints-write-2       datapoints-write-8
> > > datapoints-increment-35  datapoints-write-20      datapoints-write-9
> > > datapoints-increment-36  datapoints-write-21      series-index-0
> > > datapoints-increment-37  datapoints-write-22      series-index-1
> > > datapoints-increment-38  datapoints-write-23      series-index-2
> > > datapoints-increment-39  datapoints-write-24      series-index-3
> > > datapoints-increment-4   datapoints-write-25      series-index-4
> > > [blake@kafka03 kafka]$ cd datapoints-wr
> > > [blake@kafka03 datapoints-wr]$ ls -lah
> > > total 8.0K
> > > drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> > > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> > > -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
> > >
> > > [2] In our consumer logs:
> > >
> > > 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
> > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
> > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > >     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> > >     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> > >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> > >     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> > >     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > >     at scala.collection.immutable.List.foreach(List.scala:76)
> > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
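The manual surgery Blake describes in [2] (seeking to the offset from the exception and cutting the corrupt message out at the frame boundary) can be scripted rather than done with a hex editor. A minimal sketch, assuming simple length-prefixed framing (a 4-byte big-endian size before each message, as in Kafka's on-disk log) and using a hypothetical `excise_frame` helper; the caller would still have to translate the exception's logical offset into a byte position within the segment file:

```python
import struct

def excise_frame(src_path, dst_path, bad_offset):
    """Write a copy of src_path to dst_path with the single
    length-prefixed frame that starts at bad_offset removed.
    bad_offset is the byte position of the corrupt frame's size
    prefix within the segment file (hypothetical helper)."""
    with open(src_path, "rb") as f:
        data = f.read()
    # each frame is a 4-byte big-endian size followed by that many bytes
    (size,) = struct.unpack(">i", data[bad_offset:bad_offset + 4])
    end = bad_offset + 4 + size
    with open(dst_path, "wb") as f:
        f.write(data[:bad_offset])
        f.write(data[end:])
```

Note that splicing a frame out of a 0.7 segment shifts the offsets of every message after it, so consumers tracking offsets past the excised frame would need adjusting; this only illustrates the frame-boundary cut itself.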