kowshik commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r655788102



##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def 
testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit 
= {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], 
[7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                  
         |---> active segment (empty)
+    //                                                                         
         |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = 
log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                              |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> 
active segment (empty)
+    //                                                                 |---> 
logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
+
+    // Append garbage to a segment at baseOffset 1, which is below the current 
log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //           |                  |---> logStartOffset               |---> 
active segment  (empty)
+    //           |                                                     |---> 
logEndOffset
+    // corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
+    val bw = new BufferedWriter(new 
FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment above to which we appended garbage and will 
schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be lower than the 
current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally a 
single, active segment is created
+    //   starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, 
lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments and check that the producer state 
snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends 
are complete and segments are rolled,
+    // is as shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //  |      |      |      |      |      |---> active segment (empty)
+    //  |      |      |      |      |      |---> logEndOffset
+    //  |      |      |      |      |      |
+    //  |      |------.------.------.------.-----> producer state snapshot 
files are DEFINED for each offset in: [5-9]
+    //  |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = 
log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)

Review comment:
       Hmm, this check needs to be split into 2 steps, because the files are not 
actually deleted until `mockTime.sleep(logConfig.fileDeleteDelayMs)` is called 
later.
   
   1. Here we need to check that the entries for offsets `1-4` have been removed 
from the `ProducerStateManager` map, and that the snapshot files have only been 
renamed with the `.deleted` suffix.
   
   2. Below, after the call to `mockTime.sleep(logConfig.fileDeleteDelayMs)`, we 
need to additionally check that the snapshot files for offsets `1-4` have 
actually been deleted, as expected.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to