rohitshekhar29 commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451260203



##########
File path: core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
##########
@@ -148,5 +148,39 @@ class TimeIndexTest {
     idx.close()
   }
 
-}
 
+  /**
+   * In certain cases, index files fail to have their pre-allocated 0 bytes trimmed from the tail
+   * when a new segment is rolled. This causes a silent failure at the next startup where all retention
+   * windows are breached purging out data whether or not the window was really breached.
+   * KAFKA-10207
+   */
+  @Test
+  def testLoadingUntrimmedIndex(): Unit = {
+    // A larger index size must be specified or the starting offset will round down
+    // preventing this issue from being reproduced. Configs default to 10mb.
+    val max1MbEntryCount = 100000
+    // Create a file that will exist on disk and be removed when we are done
+    val file = nonExistantTempFile()
+    file.deleteOnExit()
+    // create an index that can have up to 100000 entries, about 1mb
+    var idx2 = new TimeIndex(file, baseOffset = 0, max1MbEntryCount * 12)
+    // Append less than the maximum number of entries, leaving 0 bytes padding the end
+    for (i <- 1 until max1MbEntryCount)
+      idx2.maybeAppend(i, i)
+
+    idx2.flush()
+    // jvm 1.8.0_191 fails to always flush shrinking resize to zfs disk
+    // jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws
+    //Explicitly call close handler to unmap the file and avoid the index from being trimmed when saved
+    idx2.closeHandler()
+    // The next read of the index data from disk will have 12 0 bytes at the tail, when reading
+    // the buffer position starts at the end and the index is assumed to be full because it was
+    // supposed to be trimmed before the last save.
+    idx2 = new TimeIndex(file, baseOffset = 0, maxIndexSize = max1MbEntryCount * 12)
+    // This index should fail the sanity check as the last timestamp in the file is 0 which is
+    // less than the first timestamp in the file.
+    intercept[CorruptIndexException](idx2.sanityCheck())
+    idx2.close()
+  }
+}
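
The failure mode the test comments describe can be sketched without Kafka. This is a minimal illustration, not Kafka's implementation: it assumes the 12-byte entry size implied by `max1MbEntryCount * 12`, laid out as an 8-byte timestamp followed by a 4-byte relative offset, and `UntrimmedIndexSketch` with all of its members are hypothetical names. It shows why a reader that derives the entry count from the file length sees a last timestamp of 0 when the zero padding was never trimmed.

```scala
import java.nio.ByteBuffer

object UntrimmedIndexSketch {
  // Assumed entry layout: 8-byte timestamp followed by a 4-byte relative offset.
  val EntrySize = 12

  // Pre-allocate room for `capacity` entries but write only `written` of them.
  // ByteBuffer.allocate zero-fills, which stands in for the pre-allocated file.
  def buildIndex(capacity: Int, written: Int): ByteBuffer = {
    val buf = ByteBuffer.allocate(capacity * EntrySize)
    for (i <- 1 to written) {
      buf.putLong(i.toLong) // timestamp
      buf.putInt(i)         // relative offset
    }
    buf
  }

  // A reader that trusts the file length treats every 12-byte slot as an entry.
  def entriesByLength(buf: ByteBuffer): Int = buf.capacity() / EntrySize

  // Absolute read of the timestamp stored in the given slot.
  def timestampAt(buf: ByteBuffer, slot: Int): Long = buf.getLong(slot * EntrySize)

  // The condition the test relies on: the last timestamp is below the first one.
  def looksCorrupt(buf: ByteBuffer): Boolean = {
    val n = entriesByLength(buf)
    n > 0 && timestampAt(buf, n - 1) < timestampAt(buf, 0)
  }
}
```

With `buildIndex(capacity = 10, written = 9)` the final slot is all zeros, so `looksCorrupt` is true; with all ten slots written it is false.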

Review comment:
   @Johnny-Malizia Does it make sense to strengthen the existing sanity check? For example,
   for `OffsetIndex`, the sanity check could also verify that, when there is more than one
   entry, the file positions are non-zero and increasing. Currently, the following check is
   limited to the offset:
   `
   override def sanityCheck(): Unit = {
     if (_entries != 0 && _lastOffset < baseOffset)
   `
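
One possible reading of this suggestion, as a rough sketch under stated assumptions rather than a proposal for the actual Kafka code: `StrengthenedSanityCheck`, `IndexEntry`, and `validate` are hypothetical names, and the entries are held in an in-memory `Seq` instead of the memory-mapped buffer the real index uses. The sketch keeps the existing last-offset check and adds a pairwise check that offsets and file positions both strictly increase.

```scala
object StrengthenedSanityCheck {
  // Hypothetical in-memory view of one offset index entry.
  final case class IndexEntry(offset: Long, position: Int)

  // Returns a description of the first problem found, or None if the entries pass.
  def validate(baseOffset: Long, entries: Seq[IndexEntry]): Option[String] = {
    if (entries.isEmpty) None
    else if (entries.last.offset < baseOffset)
      // Mirrors the existing check: the last offset must not be below the base offset.
      Some(s"last offset ${entries.last.offset} is below base offset $baseOffset")
    else
      // Additional check: each adjacent pair must increase in offset and in position.
      entries.sliding(2).collectFirst {
        case Seq(prev, next) if next.offset <= prev.offset =>
          s"offset ${next.offset} does not increase after ${prev.offset}"
        case Seq(prev, next) if next.position <= prev.position =>
          s"position ${next.position} does not increase after ${prev.position}"
      }
  }
}
```

Under these assumptions, a zero-filled tail entry such as `IndexEntry(0, 0)` after `IndexEntry(2, 200)` passes the last-offset check when `baseOffset` is 0, but is rejected by the pairwise check.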





