This sounds like Kafka not being entirely robust to disk corruption, which seems entirely possible and normal. I'd simply delete that log file and let replica replay catch it up at broker startup.
Trying to guard against all possible disk corruption bugs sounds very difficult to me; it seems better to let the operator handle corruption on a case-by-case basis.

On Tue, Sep 6, 2016 at 7:14 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

> I'm not from the Kafka dev team, so I won't be able to comment on whether
> this is an expected way to fail or if it needs to be handled in a
> cleaner/more robust manner (or, at the very least, with a better exception
> message). Since you have put in the effort to write a test case and narrow
> it down to this specific flow, maybe you can send a mail to their dev
> mailing list and/or create a JIRA to report this.
>
> -Jaikiran
>
>
> On Tuesday 30 August 2016 12:07 PM, Gaurav Agarwal wrote:
>
>> Kafka version: 0.10.0
>>
>> Exception Trace
>> --------------------
>> java.util.NoSuchElementException
>> at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
>> at kafka.log.LogSegment.recover(LogSegment.scala:189)
>> at kafka.log.Log.recoverLog(Log.scala:268)
>> at kafka.log.Log.loadSegments(Log.scala:243)
>> at kafka.log.Log.<init>(Log.scala:101)
>> at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
>> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
>> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
>> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> Test Code (the same exception trace is seen in broker logs as well, on prod
>> machines with exactly the same log files as given in this mini test)
>> ---------
>>
>> val logProps = new Properties()
>> logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
>> val config = LogConfig(logProps)
>> val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
>> var log = new Log(cp, config, 0, time.scheduler, time)
>>
>>
>> On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>>
>>> Can you paste the entire exception stacktrace please?
>>>
>>> -Jaikiran
>>>
>>> On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:
>>>
>>>> Hi there, just wanted to bump up the thread one more time to check if
>>>> someone can point us in the right direction... This one was quite a
>>>> serious failure that took down many of our Kafka brokers.
>>>>
>>>> On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We are facing a weird problem where the Kafka broker fails to start due
>>>>> to an unhandled exception while 'recovering' a log segment. I have been
>>>>> able to isolate the problem to a single record, and I'm providing the
>>>>> details below:
>>>>>
>>>>> During Kafka restart, if the index files are corrupted or don't exist,
>>>>> the broker tries to 'recover' a LogSegment and rebuild the indexes -
>>>>> LogSegment.recover().
>>>>> In the main while loop here, which iterates over the entries in the log
>>>>> (while(iter.hasNext) { val entry = iter.next ... }), I get an entry whose
>>>>> complete underlying byte buffer is as follows:
>>>>>
>>>>> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
>>>>> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
>>>>> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
>>>>> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
>>>>> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
>>>>> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
>>>>> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
>>>>> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
>>>>> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
>>>>> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
>>>>> 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
>>>>> 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
>>>>> 48, 72]
>>>>>
>>>>> A toString() on this entry yields:
>>>>>
>>>>> *MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251,
>>>>> key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197
>>>>> cap=197]),4449011)*
>>>>>
>>>>> It appears that this record is corrupt, and deserializing/decompressing
>>>>> it causes exceptions which are unhandled. Specifically, in the 0.10.0
>>>>> version this call fails with NoSuchElementException:
>>>>>
>>>>> ByteBufferMessageSet.deepIterator(entry).next().offset
>>>>>
>>>>> Note: This message was written to disk by a *kafka 0.10.0 broker running
>>>>> snappy jar version 1.1.1.7* (which is known to have some read-time bugs).
>>>>>
>>>>> The log file itself is 512MB large, and this message appears at around
>>>>> 4MB into the file.
>>>>>
>>>>> We have upgraded snappy, but should this condition be handled correctly?
>>>>> What is the correct behavior here? Should the exception be handled and
>>>>> the log file truncated? At the moment this causes Kafka to crash
>>>>> completely, with no recovery path except deleting the bad data file
>>>>> manually and then starting Kafka.
>>>>>
>>>>> --
>>>>>
>>>>> cheers,
>>>>>
>>>>> gaurav
>>>>>
>>>>>
>>>>> A test case to reproduce the crash:
>>>>>
>>>>> @Test
>>>>> def testCorruptLog() {
>>>>>   val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
>>>>>     -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0,
>>>>>     0, -79, -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16,
>>>>>     74, 20, 49, 48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52,
>>>>>     52, 54, 48, 56, 48, 0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118,
>>>>>     110, 101, 114, 97, 46, 109, 111, 100, 101, 108, 46, 105, 110, 118,
>>>>>     101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114, 101, 102,
>>>>>     101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42, 26,
>>>>>     57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
>>>>>     111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116,
>>>>>     101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77,
>>>>>     5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
>>>>>     53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54,
>>>>>     53, 52, 52, 54, 48, 56, 48, 72)
>>>>>
>>>>>   val msg = new Message(ByteBuffer.wrap(buf), None, None)
>>>>>   val entry = new MessageAndOffset(msg, 4449011L)
>>>>>   val deepIterator: Iterator[MessageAndOffset] =
>>>>>     ByteBufferMessageSet.deepIterator(entry)
>>>>>   deepIterator.next().offset
>>>>> }
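
To make the "handle and truncate" option concrete: below is a minimal sketch, using the 0.10.0 classes that appear in the trace above. firstInnerOffset is a hypothetical helper invented for illustration, not anything that exists in Kafka; it only shows where a guard around the deep-iteration step could sit so that a corrupt compressed wrapper message becomes a signal to the caller instead of an unhandled crash.

import java.io.IOException
import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

// Hypothetical helper, not part of Kafka: return the first inner offset of a
// compressed wrapper entry, or None if decompression shows the payload is
// corrupt (the NoSuchElementException from the trace above, or an IOException
// surfacing from the snappy decoder).
def firstInnerOffset(entry: MessageAndOffset): Option[Long] =
  try {
    val inner = ByteBufferMessageSet.deepIterator(entry)
    if (inner.hasNext) Some(inner.next().offset) else None
  } catch {
    case _: NoSuchElementException | _: IOException => None
  }

Recovery already stops iterating and truncates the segment when message validation fails, so a None result here could feed that same path. Whether silent truncation is the right policy, rather than failing loudly and leaving the decision to the operator, is exactly the open question in the thread.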