kowshik commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r655768001
########## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ##########

```diff
@@ -828,15 +834,50 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   * is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   * SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
+      // during log recovery, and use it to issue deletions prior to creating the "real"
+      // producer state manager.
+      //
+      // In any case, removeAndMarkSnapshotForDeletion is intended to be used for snapshot file
+      // deletion, so ignoring the exception here just means that the intended operation was
+      // already completed.
+      try {
+        snapshot.renameTo(Log.DeletedFileSuffix)
+      } catch {
+        case _: NoSuchFileException =>
+          return None
+      }
+      return Some(snapshot)
```

Review comment:
It seems like you could drop the `return` keyword here.
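To illustrate the reviewer's suggestion, here is a hedged, self-contained sketch of the same control flow written so the try/catch is the method's last expression, making both `return` keywords unnecessary. The `SnapshotFile` stand-in, the demo offsets, and the hard-coded failure at offset 42 are invented for this example; this is not the real Kafka code.

```scala
import java.nio.file.NoSuchFileException
import java.util.concurrent.ConcurrentSkipListMap

// Simplified stand-in for kafka.log.SnapshotFile, invented for this demo only.
case class SnapshotFile(offset: Long) {
  // Simulate the delete race: pretend the file backing offset 42 is already gone.
  def renameTo(suffix: String): Unit =
    if (offset == 42L) throw new NoSuchFileException(s"$offset$suffix")
}

object SnapshotDemo {
  val snapshots = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()

  // Same logic as the patch, but the try/catch is the final expression of the
  // flatMap body, so no `return` (and no NonLocalReturnControl throw) is needed.
  def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] =
    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot =>
      try {
        snapshot.renameTo(".deleted")
        Some(snapshot) // rename succeeded: hand back the renamed snapshot
      } catch {
        case _: NoSuchFileException => None // file already gone: treat as done
      }
    }
}
```

Dropping `return` also avoids the subtlety that `return` inside a Scala closure is implemented as a non-local return via a thrown `NonLocalReturnControl`, which a broad `catch` elsewhere could accidentally swallow.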
########## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ##########

```diff
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   * is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   * SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
```

Review comment:
Let's assume `PSM` refers to `ProducerStateManager`.

@junrao @gardnervickers That feels right to me, thanks for the explanation! A couple of things I wanted to ask:

1. Should we update the comment [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogLoader.scala#L144-L146) to say:
```
// Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
// during log recovery may have created or deleted some snapshots
// without the LoadLogParams.producerStateManager instance witnessing the changes.
```
2.
`PSM.removeStraySnapshots` and its params could have a better name. Should we name it differently, e.g. `PSM.reloadEssentialSnapshots(essentialSegmentBaseOffsets: Seq[Long])`?

=== SUMMARY OF CASES ===

I thought it would be useful to summarize. There are a few different cases that arise whenever `PSM.removeAndMarkSnapshotForDeletion()` is invoked on the "real" PSM instance. I believe all cases are handled by the current code, as explained below.

**Straightforward cases:**

1. Snapshot entry is present in the real `PSM` instance and the snapshot file is present. This is a straightforward case where we remove the entry and rename the file.
2. Snapshot entry is absent in the real `PSM` instance and the snapshot file is absent. This is also a straightforward case where we do nothing.

**Corner cases:**

1. Snapshot entry is present in the real `PSM` instance, but the snapshot file is absent. This can happen when the intermediate PSM deleted the snapshot file. In this case, we ignore the failure in the file rename.
2. Snapshot entry is absent in the real `PSM` instance, but the snapshot file is present. This can happen when the intermediate PSM takes a snapshot, but the real `PSM` doesn't have the entry (yet). This is handled by the call to `PSM.removeStraySnapshots` [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogLoader.scala#L147), which corrects such discrepancies by loading all snapshots from disk and eliminating those that don't match the list of segment base offsets post recovery.
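The cleanup described in corner case 2 can be pictured with a small sketch. `keepOnlyEssentialSnapshots` below is a hypothetical simplification of what `PSM.removeStraySnapshots` does, invented for illustration (the real method works on `SnapshotFile` metadata and has additional handling, so treat this only as the core filtering idea):

```scala
// Hypothetical simplification of stray-snapshot cleanup: split the snapshot
// offsets found on disk into those matching a surviving segment base offset
// (kept) and strays (to be removed).
def keepOnlyEssentialSnapshots(snapshotOffsets: Seq[Long],
                               segmentBaseOffsets: Seq[Long]): (Seq[Long], Seq[Long]) = {
  val essential = segmentBaseOffsets.toSet
  snapshotOffsets.partition(essential.contains)
}
```

For example, with snapshots on disk at offsets 1, 2, 4, 9 and surviving segments at base offsets 4 and 9, the snapshots at 1 and 2 are the strays.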
########## File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala ##########

```diff
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                    |---> logStartOffset                                        |---> active segment (empty)
+    //                                                                                |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                          |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> active segment (empty)
+    //                                                                 |---> logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
+
+    // Append garbage to a segment at baseOffset 1, which is below the current log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //          |                |---> logStartOffset                  |---> active segment (empty)
+    //          |                                                      |---> logEndOffset
+    //  corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
+    val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment above to which we appended garbage and will schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be lower than the current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally a single, active segment is created
+    //   starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments and check that the producer state snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends are complete and segments are rolled,
+    // are as shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //   |      |      |      |      |     |---> active segment (empty)
+    //   |      |      |      |      |     |---> logEndOffset
+    //   |      |      |      |      |     |
+    //   |      |------.------.------.------.-----> producer state snapshot files are DEFINED for each offset in: [5-9]
+    //   |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)
```

Review comment:
Hmm, this check needs to be split into 2 steps, because we don't really delete the files until `mockTime.sleep(logConfig.fileDeleteDelayMs)` is done later:

1. Here we need to check that the entries for offsets `1-4` have been removed from the `ProducerStateManager` map, and that the files have only been renamed to carry the `.deleted` suffix.
2. Below, after `mockTime.sleep(logConfig.fileDeleteDelayMs)`, we need to additionally check that the snapshot files for offsets `1-4` have been deleted as expected.

########## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ##########

```diff
@@ -828,15 +834,50 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   * is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   * SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
+      // during log recovery, and use it to issue deletions prior to creating the "real"
+      // producer state manager.
+      //
+      // In any case, removeAndMarkSnapshotForDeletion is intended to be used for snapshot file
+      // deletion, so ignoring the exception here just means that the intended operation was
+      // already completed.
+      try {
+        snapshot.renameTo(Log.DeletedFileSuffix)
+      } catch {
+        case _: NoSuchFileException =>
+          return None
```

Review comment:
Should we log this failure?
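The two-step check being requested mirrors the two-phase delete itself: first the snapshot file is renamed to carry a `.deleted` suffix, and only after the file-delete delay elapses is it physically removed. A minimal, self-contained sketch of that pattern follows; the `TwoPhaseDeleteDemo` object, its helper names, and the file names are invented for illustration and are not the Kafka implementation:

```scala
import java.nio.file.{Files, Path}

object TwoPhaseDeleteDemo {
  // Phase 1: the snapshot disappears under its original name but survives
  // on disk under a ".deleted" suffix until the delete delay elapses.
  def markForDeletion(file: Path): Path = {
    val renamed = file.resolveSibling(file.getFileName.toString + ".deleted")
    Files.move(file, renamed)
    renamed
  }

  // Phase 2: after the delay (mockTime.sleep(fileDeleteDelayMs) in the test),
  // the renamed file is physically removed; returns true if it existed.
  def physicallyDelete(renamed: Path): Boolean =
    Files.deleteIfExists(renamed)
}
```

The test assertions then split along the same seam: after phase 1, assert the original name is gone and the `.deleted` file exists; after phase 2, assert the `.deleted` file is gone too.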
########## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ##########

```diff
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   * is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   * SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
```

Review comment:
@junrao That's a good point. Yes, we should get rid of the temporary PSM. I've created a JIRA tracking this improvement: https://issues.apache.org/jira/browse/KAFKA-12977. It is currently assigned to me and I'll follow up on it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org