gardnervickers commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r661899296
##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
assertTrue(onlySegment.lazyOffsetIndex.file.exists())
assertTrue(onlySegment.lazyTimeIndex.file.exists())
}
+
+ @Test
+ def
testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit
= {
+ val logConfig = LogTestUtils.createLogConfig()
+ var log = createLog(logDir, logConfig)
+ // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+ // |---> logStartOffset
|---> active segment (empty)
+ //
|---> logEndOffset
+ for (i <- 0 until 9) {
+ val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+ log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+ log.roll()
+ }
+ assertEquals(10, log.logSegments.size)
+ assertEquals(0, log.logStartOffset)
+ assertEquals(9, log.activeSegment.baseOffset)
+ assertEquals(9, log.logEndOffset)
+ for (offset <- 1 until 10) {
+ val snapshotFileBeforeDeletion =
log.producerStateManager.snapshotFileForOffset(offset)
+ assertTrue(snapshotFileBeforeDeletion.isDefined)
+ assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+ }
+
+ // Increment the log start offset to 4.
+ // After this step, the segments should be:
+ // |---> logStartOffset
+ // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+ // |--->
active segment (empty)
+ // |--->
logEndOffset
+ val newLogStartOffset = 4
+ log.updateHighWatermark(log.logEndOffset)
+ log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+ assertEquals(4, log.logStartOffset)
+ assertEquals(9, log.logEndOffset)
+
+ // Append garbage to a segment at baseOffset 1, which is below the current log start offset 4.
+ // After this step, the segments should be:
+ //
+ // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+ // | |---> logStartOffset |--->
active segment (empty)
+ // | |--->
logEndOffset
+ // corrupt record inserted
+ //
+ val segmentToForceTruncation = log.logSegments.take(2).last
+ assertEquals(1, segmentToForceTruncation.baseOffset)
+ val bw = new BufferedWriter(new
FileWriter(segmentToForceTruncation.log.file))
+ bw.write("corruptRecord")
+ bw.close()
+ log.close()
+
+ // Reopen the log. This will do the following:
+ // - Truncate the segment above to which we appended garbage and will schedule async deletion of all other
+ // segments from base offsets 2 to 9.
+ // - The remaining segments at base offsets 0 and 1 will be lower than the current logStartOffset 4.
+ // This will cause async deletion of both remaining segments. Finally a single, active segment is created
+ // starting at logStartOffset 4.
+ //
+ // Expected segments after the log is opened again:
+ // [4-]
+ // |---> active segment (empty)
+ // |---> logStartOffset
+ // |---> logEndOffset
+ log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset,
lastShutdownClean = false)
+ assertEquals(1, log.logSegments.size)
+ assertEquals(4, log.logStartOffset)
+ assertEquals(4, log.activeSegment.baseOffset)
+ assertEquals(4, log.logEndOffset)
+
+ // Append records, roll the segments and check that the producer state snapshots are defined.
+ // The expected segments and producer state snapshots, after the appends are complete and segments are rolled,
+ // are as shown below:
+ // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+ // | | | | | |---> active segment (empty)
+ // | | | | | |---> logEndOffset
+ // | | | | | |
+ // | |------.------.------.------.-----> producer state snapshot
files are DEFINED for each offset in: [5-9]
+ // |----------------------------------------> logStartOffset
+ for (i <- 0 until 5) {
+ val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+ log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+ log.roll()
+ }
+ assertEquals(9, log.activeSegment.baseOffset)
+ assertEquals(9, log.logEndOffset)
+ for (offset <- 5 until 10) {
+ val snapshotFileBeforeDeletion =
log.producerStateManager.snapshotFileForOffset(offset)
+ assertTrue(snapshotFileBeforeDeletion.isDefined)
+ assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+ }
+
+ val offsetsWithSnapshotFiles = (1 until 5)
Review comment:
Yes that sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]