kowshik commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r655768001



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,15 +834,50 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletedFileSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
+      // during log recovery, and use it to issue deletions prior to creating the "real"
+      // producer state manager.
+      //
+      // In any case, removeAndMarkSnapshotForDeletion is intended to be used for snapshot file
+      // deletion, so ignoring the exception here just means that the intended operation was
+      // already completed.
+      try {
+        snapshot.renameTo(Log.DeletedFileSuffix)
+      } catch {
+        case _: NoSuchFileException =>
+          return None
+      }
+      return Some(snapshot)

Review comment:
       It seems like you could drop the `return` keyword here.
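   For reference, a minimal sketch of how the method could read without the `return`s (purely illustrative, reusing the `snapshots` map and `Log.DeletedFileSuffix` from this PR); the last expression of each branch becomes the block's value:
   
   ```scala
   private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
     Option(snapshots.remove(snapshotOffset)).flatMap { snapshot =>
       try {
         // Rename succeeded: hand the renamed snapshot back for async deletion.
         snapshot.renameTo(Log.DeletedFileSuffix)
         Some(snapshot)
       } catch {
         // Rename raced with a delete; the file is already gone, which is fine.
         case _: NoSuchFileException => None
       }
     }
   }
   ```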

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletedFileSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager

Review comment:
       Let's assume `PSM` refers to `ProducerStateManager`.
   
   @junrao @gardnervickers That feels right to me, thanks for the explanation! A couple of things I wanted to ask:
   
   1. Should we update the comment [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogLoader.scala#L144-L146) to say:
   
   ```
   // Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
   // during log recovery may have created or deleted some snapshots
   // without the LoadLogParams.producerStateManager instance witnessing the changes.
   ```
   2. `PSM.removeStraySnapshots` and its params could have better names. Should we call it something different, like `PSM.reloadEssentialSnapshots(essentialSegmentBaseOffsets: Seq[Long])`? (A rough sketch of the idea follows at the end of this comment.)
   
   === SUMMARY OF CASES ===
   
   I thought it would be useful to summarize. There are a few different cases that arise whenever `PSM.removeAndMarkSnapshotForDeletion()` is invoked on the "real" PSM instance. I believe all cases are handled by the current code, as explained below:
   
   **Straightforward cases:**
   1. Snapshot entry is present in the real `PSM` instance and the snapshot file is present. This is a straightforward case where we remove the entry and rename the file.
   2. Snapshot entry is absent in the real `PSM` instance and the snapshot file is absent. This is also straightforward: we do nothing.
   
   **Corner cases:**
   1. Snapshot entry is present in the real `PSM` instance, but the snapshot file is absent. This can happen because the intermediate PSM deleted the snapshot file. In this case, we ignore the failure in the file rename.
   2. Snapshot entry is absent in the real `PSM` instance, but the snapshot file is present. This can happen when the intermediate PSM takes a snapshot, but the real `PSM` doesn't have the entry (yet). This is handled by the call to `PSM.removeStraySnapshots` [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogLoader.scala#L147), which corrects such discrepancies by loading all snapshots from disk and eliminating those that don't match the list of segment base offsets post recovery.
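   For what it's worth, here is the rough sketch promised above of what the renamed method could look like. This is purely illustrative, not the PR's actual code: `loadSnapshotsFromDisk()` is a hypothetical helper that re-scans the log dir, and the real `removeStraySnapshots` additionally retains the latest snapshot even when no segment matches it, which this sketch omits:
   
   ```scala
   // Hypothetical sketch only; names and behavior are simplified for discussion.
   private[log] def reloadEssentialSnapshots(essentialSegmentBaseOffsets: Seq[Long]): Unit = {
     snapshots.clear()
     for (snapshot <- loadSnapshotsFromDisk()) {
       // Keep a snapshot only if it matches a surviving segment's base offset;
       // anything else is a stray left behind by the intermediate PSM.
       if (essentialSegmentBaseOffsets.contains(snapshot.offset))
         snapshots.put(snapshot.offset, snapshot)
       else
         snapshot.deleteIfExists()
     }
   }
   ```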

##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                           |---> active segment (empty)
+    //                                                                                  |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                              |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> active segment (empty)
+    //                                                                 |---> logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
+
+    // Append garbage to a segment at baseOffset 1, which is below the current log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //           |                  |---> logStartOffset               |---> active segment (empty)
+    //           |                                                     |---> logEndOffset
+    // corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
+    val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment above, to which we appended garbage, and schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be lower than the current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally, a single active segment is created,
+    //   starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments, and check that the producer state snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends are complete and segments are rolled,
+    // are as shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //  |      |      |      |      |      |---> active segment (empty)
+    //  |      |      |      |      |      |---> logEndOffset
+    //  |      |      |      |      |      |
+    //  |      |------.------.------.------.-----> producer state snapshot files are DEFINED for each offset in: [5-9]
+    //  |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)

Review comment:
       Hmm, this check needs to be split into 2 steps, because we don't actually delete the files until `mockTime.sleep(logConfig.fileDeleteDelayMs)` happens later in the test.
   
   (1) Here, we need to check that the entries for offsets `1-4` have been removed from the `ProducerStateManager` map, and that the files have merely been renamed with the `.deleted` suffix.
   
   (2) Below, after `mockTime.sleep(logConfig.fileDeleteDelayMs)`, we need to additionally check that the snapshot files for offsets `1-4` have been deleted as expected.
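   For concreteness, a rough sketch of the two-step check (hedged: the helper names follow `kafka.log.Log` as of this PR, `offsetsToVerify` is my own placeholder, and the exact assertions may need adjusting):
   
   ```scala
   val offsetsToVerify = (1 until 5).toList
   
   // (1) Before the delete delay elapses: PSM entries are removed and the
   //     snapshot files have only been renamed with the .deleted suffix.
   offsetsToVerify.foreach { offset =>
     assertTrue(log.producerStateManager.snapshotFileForOffset(offset).isEmpty)
     val snapshotFile = Log.producerSnapshotFile(logDir, offset)
     assertFalse(snapshotFile.exists)
     assertTrue(new File(snapshotFile.getPath + Log.DeletedFileSuffix).exists)
   }
   
   // (2) After the delete delay elapses: the renamed files are physically deleted.
   mockTime.sleep(logConfig.fileDeleteDelayMs + 1)
   offsetsToVerify.foreach { offset =>
     val snapshotFile = Log.producerSnapshotFile(logDir, offset)
     assertFalse(snapshotFile.exists)
     assertFalse(new File(snapshotFile.getPath + Log.DeletedFileSuffix).exists)
   }
   ```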

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,15 +834,50 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the 
provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the 
provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the 
Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and 
the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the 
resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): 
Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was 
deleted already.
+      // This can happen due to the way we construct an intermediate producer 
state manager
+      // during log recovery, and use it to issue deletions prior to creating 
the "real"
+      // producer state manager.
+      //
+      // In any case, removeAndMarkSnapshotForDeletion is intended to be used 
for snapshot file
+      // deletion, so ignoring the exception here just means that the intended 
operation was
+      // already completed.
+      try {
+        snapshot.renameTo(Log.DeletedFileSuffix)
+      } catch {
+        case _: NoSuchFileException =>
+          return None

Review comment:
       Should we log this failure?
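   For example, something like the following sketch (assuming `ProducerStateManager` mixes in `Logging`, and with the `return` dropped per the other comment):
   
   ```scala
   } catch {
     case _: NoSuchFileException =>
       // The rename raced with a delete; note it for debuggability
       // instead of failing silently.
       info(s"Failed to rename producer state snapshot ${snapshot.file.getAbsolutePath} " +
         "with deletion suffix because it was renamed or deleted already")
       None
   }
   ```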

##########
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##########
@@ -1535,4 +1534,122 @@ class LogLoaderTest {
     assertTrue(onlySegment.lazyOffsetIndex.file.exists())
     assertTrue(onlySegment.lazyTimeIndex.file.exists())
   }
+
+  @Test
+  def testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                           |---> active segment (empty)
+    //                                                                                  |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(10, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 1 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    // Increment the log start offset to 4.
+    // After this step, the segments should be:
+    //                              |---> logStartOffset
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                                                                 |---> active segment (empty)
+    //                                                                 |---> logEndOffset
+    val newLogStartOffset = 4
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(9, log.logEndOffset)
+
+    // Append garbage to a segment at baseOffset 1, which is below the current log start offset 4.
+    // After this step, the segments should be:
+    //
+    // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //           |                  |---> logStartOffset               |---> active segment (empty)
+    //           |                                                     |---> logEndOffset
+    // corrupt record inserted
+    //
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    assertEquals(1, segmentToForceTruncation.baseOffset)
+    val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will do the following:
+    // - Truncate the segment above, to which we appended garbage, and schedule async deletion of all other
+    //   segments from base offsets 2 to 9.
+    // - The remaining segments at base offsets 0 and 1 will be lower than the current logStartOffset 4.
+    //   This will cause async deletion of both remaining segments. Finally, a single active segment is created,
+    //   starting at logStartOffset 4.
+    //
+    // Expected segments after the log is opened again:
+    // [4-]
+    //  |---> active segment (empty)
+    //  |---> logStartOffset
+    //  |---> logEndOffset
+    log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, lastShutdownClean = false)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.activeSegment.baseOffset)
+    assertEquals(4, log.logEndOffset)
+
+    // Append records, roll the segments, and check that the producer state snapshots are defined.
+    // The expected segments and producer state snapshots, after the appends are complete and segments are rolled,
+    // are as shown below:
+    // [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //  |      |      |      |      |      |---> active segment (empty)
+    //  |      |      |      |      |      |---> logEndOffset
+    //  |      |      |      |      |      |
+    //  |      |------.------.------.------.-----> producer state snapshot files are DEFINED for each offset in: [5-9]
+    //  |----------------------------------------> logStartOffset
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(9, log.activeSegment.baseOffset)
+    assertEquals(9, log.logEndOffset)
+    for (offset <- 5 until 10) {
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset)
+      assertTrue(snapshotFileBeforeDeletion.isDefined)
+      assertTrue(snapshotFileBeforeDeletion.get.file.exists)
+    }
+
+    val offsetsWithSnapshotFiles = (1 until 5)

Review comment:
       Hmm, this check needs to be split into 2 steps, because we don't actually delete the files until `mockTime.sleep(logConfig.fileDeleteDelayMs)` happens later in the test.
   
   1. Here, we need to check that the entries for offsets `1-4` have been removed from the `ProducerStateManager` map, and that the snapshot files have merely been renamed with the `.deleted` suffix.
   
   2. Below, after the call to `mockTime.sleep(logConfig.fileDeleteDelayMs)`, we need to additionally check that the snapshot files for offsets `1-4` have been deleted as expected.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletedFileSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager

Review comment:
       @junrao That's a good point. Yes, we should get rid of the temporary PSM. I've created a JIRA tracking this improvement: https://issues.apache.org/jira/browse/KAFKA-12977. It is currently assigned to me and I'll follow up on it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

