I'm not from the Kafka dev team, so I can't comment on whether this is an expected way to fail or whether it should be handled in a cleaner/more robust manner (at the very least, probably with a better exception message). Since you have put in the effort to write a test case and narrow it down to this specific flow, maybe you can send a mail to their dev mailing list and/or create a JIRA to report this.

-Jaikiran

On Tuesday 30 August 2016 12:07 PM, Gaurav Agarwal wrote:
Kafka version: 0.10.0

Exception Trace
--------------------
java.util.NoSuchElementException
    at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
    at kafka.log.LogSegment.recover(LogSegment.scala:189)
    at kafka.log.Log.recoverLog(Log.scala:268)
    at kafka.log.Log.loadSegments(Log.scala:243)
    at kafka.log.Log.<init>(Log.scala:101)
    at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in broker logs on production
machines with exactly the same log files as used in this mini test)
---------

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time)


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

Can you paste the entire exception stacktrace please?

-Jaikiran

On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:

Hi there, just wanted to bump this thread one more time to check if
someone can point us in the right direction... This was quite a serious
failure that took down many of our Kafka brokers.

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

Hi All,
We are facing a weird problem where the Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record; the details are below:

During Kafka restart, if the index files are corrupted or don't exist, the
broker tries to 'recover' a LogSegment and rebuild the indexes in
LogSegment.recover(). In the main while loop there, which iterates over the
entries in the log (while(iter.hasNext) { val entry = iter.next ... }), I get
an entry whose complete underlying byte buffer is as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
48, 72]

A toString() on this entry yields:

*MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251,
key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]), 4449011)*
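
As a cross-check, the fields in that toString() can be read straight off the
first 14 bytes of the dump, assuming the standard v0 on-disk message layout of
crc(4), magic(1), attributes(1), key length(4), value length(4); the little
decoder below is only an illustration, not Kafka code:

import java.nio.ByteBuffer

// First 14 bytes of the dump above: exactly the v0 message header.
val header = ByteBuffer.wrap(Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59))
val crc         = header.getInt & 0xFFFFFFFFL // 0x521EA5DB = 1377740251, matches the crc above
val magic       = header.get()                // 0
val attributes  = header.get()                // 2 -> compression codec 2 = Snappy
val keyLength   = header.getInt               // -1 -> null key
val valueLength = header.getInt               // 0xC5 = 197 -> matches HeapByteBuffer[pos=0 lim=197]
// The value then begins with -126, 83, 78, 65, 80, 80, 89, 0 = 0x82 "SNAPPY" 0x00,
// i.e. the snappy-java framed-stream header, so snappy-java is what has to read it back.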

It appears that this record is corrupt, and deserializing/decompressing it
causes exceptions that are unhandled. Specifically, in version 0.10.0 this
call fails with NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset

Note: this message was written to disk using a *Kafka 0.10.0 broker running
snappy jar version 1.1.1.7* (a version known to have some read-time bugs).

The log file itself is 512 MB, and this message appears around the 4 MB mark
in the file.

We have since upgraded snappy, but should this condition be handled
gracefully? What is the correct behavior here? Should the exception be caught
and the log file truncated? At the moment it causes Kafka to crash completely,
with no recovery path other than manually deleting the bad data file and then
restarting Kafka.
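
For illustration, here is a rough sketch of the kind of defensive handling we
have in mind. This is not Kafka's actual recovery code, and
validBytesBeforeCorruption is just a hypothetical helper name; it assumes it
lives inside the Kafka source tree (e.g. next to LogTest) so that
ByteBufferMessageSet.deepIterator is visible. It stops at the first shallow
entry whose compressed payload cannot be read, so recovery could truncate
there instead of crashing the broker:

import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet, NoCompressionCodec}
import scala.util.control.NonFatal

// Hypothetical helper: count how many bytes of a segment can be read safely,
// stopping at the first shallow entry whose compressed payload cannot be
// deep-iterated. A tolerant recover() could truncate the segment to this size.
def validBytesBeforeCorruption(shallowEntries: Iterator[MessageAndOffset]): Int = {
  var validBytes = 0
  var corrupted = false
  while (!corrupted && shallowEntries.hasNext) {
    val entry = shallowEntries.next()
    try {
      entry.message.ensureValid()                        // CRC check on the wrapper message
      if (entry.message.compressionCodec != NoCompressionCodec)
        ByteBufferMessageSet.deepIterator(entry).next()  // the call that currently throws
      validBytes += MessageSet.entrySize(entry.message)
    } catch {
      case NonFatal(_) => corrupted = true               // truncate here instead of failing startup
    }
  }
  validBytes
}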

--

cheers,

gaurav


A test case to repro the crash

@Test
def testCorruptLog() {
  val buf = Array[Byte](
    82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
    80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
    16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
    49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
    39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
    46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
    101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
    26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
    111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
    114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
    34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
    34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
    48, 72)

  val msg = new Message(ByteBuffer.wrap(buf), None, None)
  val entry = new MessageAndOffset(msg, 4449011L)
  val deepIterator: Iterator[MessageAndOffset] = ByteBufferMessageSet.deepIterator(entry)

  deepIterator.next().offset
}


