gardnervickers commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r655747538



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager

Review comment:
       @kowshik @junrao Right, we'll still end up with a fully in-sync 
`ProducerStateManager` after `LogLoader.load(..)` runs. 
   
   The problem can still occur though because we delete snapshots using both 
the "intermediate" and "real" `ProducerStateManager` prior to 
`removeStraySnapshots` at the end of `LogLoader.load`. 
   
   1. `recoverSegment` can delete snapshots with the intermediate 
`ProducerStateManager`
   2. `removeAndDeleteSegmentsAsync` will use the "real" `ProducerStateManager` 
to schedule async deletion. It may have a stale view of the snapshots present 
on the filesystem if step 1 deleted snapshots, causing the rename to fail.
   3. At the end of `LogLoader.load`, we will `removeStraySnapshots`, which 
will fix up any discrepancies between the contents of the log dir and the 
"real" `ProducerStateManager`.
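   
   To make steps 1 and 2 concrete, here is a minimal, self-contained sketch of the stale-view race. It is not the Kafka code: `SnapshotView`, `StaleViewDemo`, the `<offset>.snapshot` file naming, and the hard-coded `.deleted` suffix are hypothetical stand-ins for `ProducerStateManager`, `SnapshotFile`, and `Log.DeletionSuffix`. It assumes the failed rename surfaces as a `NoSuchFileException`, which is what `Files.move` throws for a missing source file.

```scala
import java.nio.file.{Files, NoSuchFileException, Path}
import scala.collection.mutable

// Hypothetical stand-in for a ProducerStateManager: an in-memory view of the
// snapshot files in a directory. Two views over one directory can diverge.
final class SnapshotView(dir: Path) {
  private val snapshots = mutable.TreeMap.empty[Long, Path]

  // Record that a snapshot file for this offset is believed to exist.
  def track(offset: Long): Unit =
    snapshots.put(offset, dir.resolve(s"$offset.snapshot"))

  // Analogous to removeAndDeleteSnapshot: forget the entry and delete the file.
  def removeAndDelete(offset: Long): Unit =
    snapshots.remove(offset).foreach(p => Files.deleteIfExists(p))

  // Analogous to removeAndMarkSnapshotForDeletion: forget the entry and rename
  // the file. If the file is already gone, swallow the error and return None.
  def removeAndMarkForDeletion(offset: Long): Option[Path] =
    snapshots.remove(offset).flatMap { file =>
      // ".deleted" is an assumed suffix for this sketch.
      val renamed = file.resolveSibling(file.getFileName.toString + ".deleted")
      try Some(Files.move(file, renamed))
      catch { case _: NoSuchFileException => None }
    }
}

object StaleViewDemo {
  // Returns (result for the stale offset, result for the untouched offset).
  def run(): (Option[Path], Option[Path]) = {
    val dir = Files.createTempDirectory("snapshots")
    val offsets = Seq(10L, 20L)
    offsets.foreach(o => Files.createFile(dir.resolve(s"$o.snapshot")))

    val intermediate = new SnapshotView(dir)
    val real = new SnapshotView(dir)
    offsets.foreach { o => intermediate.track(o); real.track(o) }

    // Step 1: the intermediate view deletes a snapshot file.
    intermediate.removeAndDelete(10L)

    // Step 2: the real view still tracks offset 10, so the rename fails.
    val stale = real.removeAndMarkForDeletion(10L)
    // A snapshot that nobody else touched is renamed normally.
    val untouched = real.removeAndMarkForDeletion(20L)
    (stale, untouched)
  }
}
```

   In this sketch the "real" view only learns that offset 10 is gone when its rename fails, which is why returning `None` rather than propagating the exception keeps the async-delete path safe until `removeStraySnapshots` reconciles the two views.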
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

