gardnervickers commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r656291670



##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def 
testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit 
= {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], 
[7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                  
         |---> active segment (empty)
+    //                                                                         
         |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = 
log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                              |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> 
active segment (empty)
+    //                                                                 |---> 
logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
+
+    // Append garbage to a segment at baseOffset 1, which is below the current 
log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //           |                  |---> logStartOffset               |---> 
active segment  (empty)
+    //           |                                                     |---> 
logEndOffset
+    // corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
+    val bw = new BufferedWriter(new 
FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment above to which we appended garbage and will 
schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be lower than the 
current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally a 
single, active segment is created
+    //   starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, 
lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments and check that the producer state 
snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends 
are complete and segments are rolled,
+    // is as shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //  |      |      |      |      |      |---> active segment (empty)
+    //  |      |      |      |      |      |---> logEndOffset
+    //  |      |      |      |      |      |
+    //  |      |------.------.------.------.-----> producer state snapshot 
files are DEFINED for each offset in: [5-9]
+    //  |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = 
log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)

Review comment:
       Hmm, the async retention process is not deleting snapshots 1-4 here, 
though; segment recovery is truncating the producer state. Why do we need to 
verify the segments are gone after `mockTime.sleep`? 
   
   I agree that it would be good to make sure that the producer state snapshots 
were removed from the `Log`'s `ProducerStateManager`, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to