gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501957800



##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1226,6 +1225,104 @@ class LogTest {
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    // Sleep to breach the retention period
+    mockTime.sleep(1000 * 60 + 1)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Increment the log start offset to exclude the first two segments.
+    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+    val cleaner = new Cleaner(id = 0,
+      offsetMap = new FakeOffsetMap(Int.MaxValue),
+      ioBufferSize = 64 * 1024,
+      maxIoBufferSize = 64 * 1024,
+      dupBufferLoadFactor = 0.75,
+      throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
+      time = mockTime,
+      checkDone = _ => {})
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals("expected a snapshot file per segment base offset, except the 
first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Clean segments, this should delete everything except the active segment since there only
+    // exists the key "a".
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expected a snapshot file per segment base offset, excluding 
the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testLoadingLogCleansOrphanedProducerStateSnapshots(): Unit = {
+    val orphanedSnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    Files.createFile(orphanedSnapshotFile)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    createLog(logDir, logConfig)
+    assertEquals("expected orphaned producer state snapshot file to be cleaned 
up", 0, ProducerStateManager.listSnapshotFiles(logDir).size)

Review comment:
   It's being deleted because, during producer state loading, we truncate producer state to match the bounds of the log, and the snapshot file written out at offset 42 is higher than the log end offset of the empty log. The test name is not very clear in this case, though.
   
   I will fix the name and add another test which checks that we keep the largest stray producer state snapshot file around.
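   For reference, a rough sketch of what that follow-up test could look like, reusing the helpers already used in the tests above (`createLog`, `logDir`, `ProducerStateManager.listSnapshotFiles`). The test name, the assumption that `Log.close()` takes a snapshot at the log end offset, and the exact offsets asserted are illustrative rather than taken from the final patch:
   
   ```scala
     @Test
     def testLoadingLogKeepsLargestStrayProducerStateSnapshot(): Unit = {
       val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
       val log = createLog(logDir, logConfig)
       val pid1 = 1L
       val epoch = 0.toShort
   
       // Build three segments, as in the tests above, which leaves snapshot files at the
       // base offsets of the second and third segments (1 and 2).
       log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
         producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
       log.roll()
       log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
         producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
       log.roll()
       log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
         producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
       log.updateHighWatermark(log.logEndOffset)
   
       // Closing the log takes one more snapshot at the log end offset (3). That snapshot does
       // not line up with any segment base offset, so it is "stray" from the segments' point of
       // view, but it is still within the bounds of the log.
       log.close()
       assertEquals(Seq(1L, 2L, 3L), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
   
       // Reload the log from the same directory. The behavior under test is that the stray
       // snapshot with the largest offset is retained, so producer state can be recovered
       // without rescanning the log.
       createLog(logDir, logConfig)
       assertTrue("expected the largest stray producer state snapshot to be retained",
         ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).contains(3L))
     }
   ```
   
   The final assertion is deliberately narrow: it only checks that the snapshot at the old log end offset survives the reload, which is the "keep the largest stray snapshot" behavior described above, without pinning down whichever additional snapshots recovery may rewrite.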




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

