gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501979388



##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1226,6 +1225,104 @@ class LogTest {
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0,
+      retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
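+    // Each roll() below creates a new segment and takes a producer state snapshot at the
+    // new segment's base offset, so three appends separated by two rolls leave two snapshots.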
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
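+    // Segment deletion is bounded by the high watermark, so advance it to the log end first.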
+    log.updateHighWatermark(log.logEndOffset)
+
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    // Sleep to breach the retention period
+    mockTime.sleep(1000 * 60 + 1)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Increment the log start offset to exclude the first two segments.
+    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact,
+      fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
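+    // Build a Cleaner directly so compaction can be run synchronously from the test
+    // instead of relying on the background cleaner thread.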
+    val cleaner = new Cleaner(id = 0,
+      offsetMap = new FakeOffsetMap(Int.MaxValue),
+      ioBufferSize = 64 * 1024,
+      maxIoBufferSize = 64 * 1024,
+      dupBufferLoadFactor = 0.75,
+      throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
+      time = mockTime,
+      checkDone = _ => {})
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "a".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "b".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals("expected a snapshot file per segment base offset, except the 
first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Clean the segments; this should delete everything except the active segment,
+    // since the only key present is "a".
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expected a snapshot file per segment base offset, excluding 
the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testLoadingLogCleansOrphanedProducerStateSnapshots(): Unit = {
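+    // The log created below is empty, so a snapshot at offset 42 sits past the log end
+    // offset, has no corresponding segment, and should be removed when the log loads.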
+    val orphanedSnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    Files.createFile(orphanedSnapshotFile)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    createLog(logDir, logConfig)
+    assertEquals("expected orphaned producer state snapshot file to be cleaned 
up", 0, ProducerStateManager.listSnapshotFiles(logDir).size)

Review comment:
       I added some extra context to the existing test, and created a new test, `testLoadingLogKeepsLargestStrayProducerStateSnapshot`, which verifies through the log that the largest stray snapshot within the log's end offset is retained.
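
       For reference, the new test is roughly along these lines (a sketch of the idea rather than the exact code from the PR; the record count, the stray offsets 2 and 3, and the final assertions are illustrative):

```scala
  @Test
  def testLoadingLogKeepsLargestStrayProducerStateSnapshot(): Unit = {
    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
    var log = createLog(logDir, logConfig)
    val pid1 = 1L
    val epoch = 0.toShort

    // Append a few records so the log end offset is ahead of the stray snapshots created below.
    for (seq <- 0 to 3)
      log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
        producerEpoch = epoch, sequence = seq), leaderEpoch = 0)
    log.updateHighWatermark(log.logEndOffset)
    log.close()

    // Drop stray snapshot files at offsets within the log end offset that no segment
    // accounts for; on reload, only the largest of them should survive.
    Files.createFile(Log.producerSnapshotFile(logDir, 2).toPath)
    Files.createFile(Log.producerSnapshotFile(logDir, 3).toPath)

    log = createLog(logDir, logConfig)
    val snapshotOffsets = ProducerStateManager.listSnapshotFiles(logDir).map(_.offset)
    assertTrue("expected the largest stray snapshot to be retained", snapshotOffsets.contains(3L))
    assertFalse("expected smaller stray snapshots to be removed", snapshotOffsets.contains(2L))
  }
```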



