1. I don't think this is something Kafka can do. We never rename
directories once we create them. I'm a little at a loss as to how or why
the OS would do it either, though.
2. Can you check whether it ran recovery (you would see a bunch of
messages about recovering each topic)? This process goes through each
message sequentially and checks the CRC. If that did happen, it is unlikely
that there was a problem in the log itself.
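
To make the sequential CRC check concrete, here is a rough sketch. It is
not Kafka's recovery code and does not use Kafka's on-disk format: it
assumes a simplified, hypothetical frame of a 4-byte big-endian payload
length, a 4-byte CRC32, and then the payload. The names `frame` and
`first_invalid_offset` are made up for the example.

```python
import struct
import zlib

# Hypothetical frame header (NOT Kafka's real layout):
# 4-byte big-endian payload length, then 4-byte CRC32 of the payload.
HEADER = struct.Struct(">II")


def frame(payload: bytes) -> bytes:
    """Encode one payload in the simplified frame layout."""
    return HEADER.pack(len(payload), zlib.crc32(payload) & 0xFFFFFFFF) + payload


def first_invalid_offset(log: bytes) -> int:
    """Walk the frames in order and return the byte offset of the first
    frame that is truncated or fails its CRC check.

    Returns len(log) when every frame is valid.
    """
    pos = 0
    while pos < len(log):
        if pos + HEADER.size > len(log):
            return pos  # partial header at the tail
        length, crc = HEADER.unpack_from(log, pos)
        start = pos + HEADER.size
        payload = log[start:start + length]
        if len(payload) < length:
            return pos  # partial payload at the tail
        if (zlib.crc32(payload) & 0xFFFFFFFF) != crc:
            return pos  # stored CRC does not match the bytes on disk
        pos = start + length
    return pos
```

Under these assumptions, a recovery pass could truncate the segment at the
returned offset, since everything before it has been verified.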

-Jay


On Tue, Jul 16, 2013 at 3:04 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:

> Thanks for the response Jay,
>
> 1. Do you think the corrupt partition directory names are happening at the
> Kafka level or the file system level? If it's at the file system level,
> perhaps we can investigate either changing FS options or selecting a
> different filesystem for the log files. Is there a recommended filesystem
> or set of filesystem options?
>
> 2. I'm a little confused about terminology, but from what I observed the
> disk corruption was in the last segment for the partition. At the time I
> investigated, the last segment file for the bad partition was ~300 MB and
> the corrupt message was at around the 288 MB byte offset (our segment files
> seem to get rotated at 512 MB, which I believe is the default). When the
> server came back up, the corrupt last segment file was still being written
> to beyond the corrupt message. Is this the behavior you're describing?
>
> Blake
>
>
> On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > 1. The corrupt directory names are not something we can really handle. The
> > names have to work with ZooKeeper, so if weird characters get inserted I
> > think the best we can do is give an error. I believe we have fixed the
> > 0-byte file problem in 0.8.
> > 2. The assumption for log recovery is that data that has been flushed to
> > disk is stable, but data which has not been flushed can be in any state
> > (garbage bytes, missing, partial writes, etc.). Recovery runs only on the
> > last segment of unflushed data (e.g. the file with the highest number) and
> > only in the case of an unclean shutdown. This is basically an
> > optimization--if we recovered the whole log on startup, startup could take
> > hours for large logs (e.g. 5TB at say 10MB/sec). From what you are saying,
> > it sounds like you had disk corruption prior to the last segment--in other
> > words, flushed data became corrupt. This is not handled in 0.7. In 0.8 you
> > would have the option of just deleting the problematic data and restoring
> > from replicas.
> >
> > -Jay
> >
> >
> > On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
> >
> > > Hi everyone,
> > >
> > > Last week, one of our production Kafka 0.7.1 servers had a hardware
> > > failure that resulted in an unclean restart. When the server came back
> > > up 5 minutes later, there were two topic corruption problems that we had
> > > to handle to get the pipeline working again.
> > >
> > > 1. The Kafka log directory had malformed partition directory names [1].
> > > The malformed directory names were truncated versions of names in our
> > > topic naming scheme. Inside some of these directories was a 0-byte
> > > .kafka file. This prevented the Kafka server process from starting. To
> > > resolve this, we manually removed the malformed topic directories and
> > > the empty log files.
> > >
> > > Is this a known issue? If so, has it been addressed in 0.8? This seems
> > > like a bug.
> > >
> > > 2. One of the partitions had a corrupt log message (CRC32 check failure)
> > > that prevented consumers from advancing on that partition [2]. To resolve
> > > this issue we had to hexdump the log file, seek to the byte offset
> > > provided by the consumer exception, and remove the corrupt message at the
> > > frame boundary. After removing the corrupt message, we created a task to
> > > parse the log file and put it back into our producer / consumer pipeline.
> > > The InvalidMessageException never bubbled up to our consumer thread,
> > > preventing us from handling this error without manual manipulation of the
> > > log files (our consumer code was catching InvalidMessageException, but it
> > > never reached that point).
> > >
> > > Is a corrupt log file a state that Kafka is supposed to handle, or is our
> > > consumer code supposed to handle it? I'm confused about how we can avoid
> > > manually cleaning up log files with a hex editor if we get into this
> > > state again.
> > >
> > > Thanks!
> > >
> > > Blake
> > >
> > > [1] Corrupt partition directories in the broker data directories:
> > >
> > > 0    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> > > topic  datapoints-write : 50
> > > 1    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> > > topic  datapoints-increment : 50
> > > 2    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> > > topic  series-index : 5
> > > 5    [main] INFO  kafka.server.KafkaServer  - Starting Kafka server...
> > > 17   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-18'
> > > 32   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable
> > > mode, recovery false
> > > 46   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-15'
> > > 46   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-15/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 47   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-25'
> > > 47   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-25/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 47   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-27'
> > > 48   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable
> > > mode, recovery false
> > > 48   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-9'
> > > 48   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable
> > > mode, recovery false
> > > 49   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-8'
> > > 49   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-8/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 49   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-29'
> > > 49   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-29/00000000008053064625.kafka in
> > > mutable mode, recovery false
> > > 50   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-34'
> > > 50   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-34/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 50   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-19'
> > > 51   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-19/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 51   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-16'
> > > 51   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-16/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 51   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-27'
> > > 52   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-27/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 52   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-34'
> > > 52   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable
> > > mode, recovery false
> > > 53   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-21'
> > > 53   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable
> > > mode, recovery false
> > > 53   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-48'
> > > 54   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable
> > > mode, recovery false
> > > 54   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-40'
> > > 55   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-40/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 55   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-increment-6'
> > > 55   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-increment-6/00000000000000000000.kafka in
> > > mutable mode, recovery false
> > > 56   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-25'
> > > 56   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable
> > > mode, recovery false
> > > 56   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-33'
> > > 57   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable
> > > mode, recovery false
> > > 57   [main] INFO  kafka.log.LogManager  - Loading log
> > > 'datapoints-write-35'
> > > 58   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable
> > > mode, recovery false
> > > 58   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-wr'
> > > 58   [main] INFO  kafka.log.Log  - Loading the last segment
> > > /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode,
> > > recovery false
> > > 60   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error
> > > during KafkaServerStable startup. Prepare to shutdown
> > > java.lang.NumberFormatException: For input string: "wr"
> > >         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > >         at java.lang.Integer.parseInt(Integer.java:481)
> > >         at java.lang.Integer.parseInt(Integer.java:514)
> > >         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> > >         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > >         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >         at kafka.log.LogManager.<init>(LogManager.scala:65)
> > >         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > >         at kafka.Kafka$.main(Kafka.scala:50)
> > >         at kafka.Kafka.main(Kafka.scala)
> > > 61   [main] INFO  kafka.server.KafkaServer  - Shutting down Kafka
> > > server
> > > 62   [main] INFO  kafka.utils.KafkaScheduler  - shutdown scheduler
> > > kafka-logcleaner-
> > > 62   [main] INFO  kafka.server.KafkaServer  - Kafka server shut down
> > > completed
> > >
> > > [blake@kafka03 kafka]$ ls
> > > dat                      datapoints-increment-40  datapoints-write-26
> > > datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> > > datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> > > datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> > > datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> > > datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> > > datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> > > datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> > > datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> > > datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> > > datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> > > datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> > > datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> > > datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> > > datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> > > datapoints-increment-21  datapoints-wr            datapoints-write-4
> > > datapoints-increment-22  datapoints-writ          datapoints-write-40
> > > datapoints-increment-23  datapoints-write-0       datapoints-write-41
> > > datapoints-increment-24  datapoints-write-1       datapoints-write-42
> > > datapoints-increment-25  datapoints-write-10      datapoints-write-43
> > > datapoints-increment-26  datapoints-write-11      datapoints-write-44
> > > datapoints-increment-27  datapoints-write-12      datapoints-write-45
> > > datapoints-increment-28  datapoints-write-13      datapoints-write-46
> > > datapoints-increment-29  datapoints-write-14      datapoints-write-47
> > > datapoints-increment-3   datapoints-write-15      datapoints-write-48
> > > datapoints-increment-30  datapoints-write-16      datapoints-write-49
> > > datapoints-increment-31  datapoints-write-17      datapoints-write-5
> > > datapoints-increment-32  datapoints-write-18      datapoints-write-6
> > > datapoints-increment-33  datapoints-write-19      datapoints-write-7
> > > datapoints-increment-34  datapoints-write-2       datapoints-write-8
> > > datapoints-increment-35  datapoints-write-20      datapoints-write-9
> > > datapoints-increment-36  datapoints-write-21      series-index-0
> > > datapoints-increment-37  datapoints-write-22      series-index-1
> > > datapoints-increment-38  datapoints-write-23      series-index-2
> > > datapoints-increment-39  datapoints-write-24      series-index-3
> > > datapoints-increment-4   datapoints-write-25      series-index-4
> > > [blake@kafka03 kafka]$ cd datapoints-wr
> > > [blake@kafka03 datapoints-wr]$ ls -lah
> > > total 8.0K
> > > drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> > > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> > > -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
> > >
> > >
> > >
> > > [2] In our consumer logs:
> > >
> > > 013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR
> > > kafka.consumer.FetcherRunnable  - error in FetcherRunnable
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 79 curr offset: 93706191248 init
> > > offset: 93706191165
> > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > >         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> > >         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> > >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> > >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > >         at scala.collection.immutable.List.foreach(List.scala:76)
> > >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> > >
> >
>
