gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501957800
##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1226,6 +1225,104 @@ class LogTest {
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    // Sleep to breach the retention period
+    mockTime.sleep(1000 * 60 + 1)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Increment the log start offset to exclude the first two segments.
+    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+    val cleaner = new Cleaner(id = 0,
+      offsetMap = new FakeOffsetMap(Int.MaxValue),
+      ioBufferSize = 64 * 1024,
+      maxIoBufferSize = 64 * 1024,
+      dupBufferLoadFactor = 0.75,
+      throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
+      time = mockTime,
+      checkDone = _ => {})
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "a".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "b".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals("expected a snapshot file per segment base offset, except the first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Clean segments, this should delete everything except the active segment since there only
+    // exists the key "a".
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion tasks
+    mockTime.sleep(1)
+    assertEquals("expected a snapshot file per segment base offset, excluding the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testLoadingLogCleansOrphanedProducerStateSnapshots(): Unit = {
+    val orphanedSnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    Files.createFile(orphanedSnapshotFile)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+    createLog(logDir, logConfig)
+    assertEquals("expected orphaned producer state snapshot file to be cleaned up", 0, ProducerStateManager.listSnapshotFiles(logDir).size)

Review comment:
       It's being deleted during producer state loading, because we truncate producer state to match the bounds of the log, and the snapshot file written out at offset 42 is higher than the log end offset of the empty log. The test name is not very clear in this case though. I will fix the name and add another test which checks that we keep around the largest stray producer state snapshot file.
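
       For illustration only, here is a minimal sketch of the truncation behaviour described above, written as if it sat inside LogTest so it can reuse the existing logDir and mockTime fields. The direct call to ProducerStateManager.truncateAndReload and the TopicPartition value are assumptions about the loading path, not code from this PR:

       // Hypothetical sketch (not from the PR): a snapshot at offset 42 lies beyond the
       // end of an empty log, so truncating producer state to the log's bounds should
       // remove it.
       @Test
       def testTruncationRemovesSnapshotBeyondLogEndOffset(): Unit = {
         Files.createFile(Log.producerSnapshotFile(logDir, 42).toPath)
         val stateManager = new ProducerStateManager(new TopicPartition("test-topic", 0), logDir)
         // Truncate to the bounds of an empty log: logStartOffset = 0, logEndOffset = 0.
         stateManager.truncateAndReload(0L, 0L, mockTime.milliseconds())
         assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size)
       }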