junrao commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r654568852



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2390,16 +2390,23 @@ object Log extends Logging {
                                       producerStateManager: ProducerStateManager,
                                       logPrefix: String): Unit = {
     segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+    val snapshotsToDelete = if (deleteProducerStateSnapshots)
+      segmentsToDelete.flatMap { segment =>
+        producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset)}
+    else
+      Seq()
 
     def deleteSegments(): Unit = {
       info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
       val parentDir = dir.getParent
       maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
         segmentsToDelete.foreach { segment =>
           segment.deleteIfExists()
-          if (deleteProducerStateSnapshots)
-            producerStateManager.removeAndDeleteSnapshot(segment.baseOffset)
         }
+        snapshotsToDelete.foreach( snapshot => {

Review comment:
       Could we just do the following?
    ```
            snapshotsToDelete.foreach { snapshot =>
              snapshot.deleteIfExists()
            }
    ```
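The change under discussion splits snapshot removal into a synchronous "rename with a deleted suffix" step and an asynchronous physical-delete step. As a rough, standalone sketch of that pattern (hypothetical object and method names, plain `java.nio.file`, not the PR's actual code):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object SnapshotDeletion {
  val DeletedFileSuffix = ".deleted"

  // Synchronous phase: rename the snapshot so it is no longer visible under
  // its live name. Returns None if the file was already gone, mirroring the
  // race the PR tolerates when a snapshot is deleted out from under us.
  def markForDeletion(snapshot: Path): Option[Path] = {
    val renamed = snapshot.resolveSibling(snapshot.getFileName.toString + DeletedFileSuffix)
    try {
      Some(Files.move(snapshot, renamed, StandardCopyOption.ATOMIC_MOVE))
    } catch {
      case _: java.io.IOException => None
    }
  }

  // Asynchronous phase: physically delete everything previously marked.
  def deleteMarked(marked: Seq[Path]): Unit =
    marked.foreach(p => Files.deleteIfExists(p))
}
```

The point of the two phases is that the rename is cheap and can happen while holding the log lock, while the actual file deletion can be scheduled later.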
   

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
    * ProducerStateManager, and deletes the backing snapshot file.
    */
-  private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+  private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
     Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
   }
+
+  /**
+   * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+   * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+   *
+   * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+   *       is deleted without this ProducerStateManager instance knowing, the resulting exception on
+   *       SnapshotFile rename will be ignored and None will be returned.
+   */
+  private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+    Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+      // If the file cannot be renamed, it likely means that the file was deleted already.
+      // This can happen due to the way we construct an intermediate producer state manager
Review comment:
       Agree with Kowshik. It's not clear if the comment is still valid.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########

Review comment:
    @gardnervickers : Good point. It seems that LogLoader.recoverSegment() can both remove and add snapshots, both of which will be missing from the "real" ProducerStateManager captured in LoadLogParams. This can lead to the missing file issue you pointed out and also potentially cause LogLoader.load() to do an unnecessary, expensive Log.rebuildProducerState().
   
   @kowshik : I am wondering if we should let LogLoader reload the snapshots in the "real" ProducerStateManager before calling Log.rebuildProducerState() in LogLoader.load().
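A minimal, hypothetical illustration of what "reloading the snapshots" could mean: rescan the partition directory and rebuild the offset-to-file map, so the in-memory metadata agrees with whatever files a recovery pass added or removed behind the manager's back. The class and method names below (`MiniPsm`, `reloadSnapshots`) are invented for illustration; this is not Kafka's implementation.

```scala
import java.io.File
import scala.collection.concurrent.TrieMap

// Toy stand-in for ProducerStateManager, keeping only the snapshot bookkeeping.
class MiniPsm(dir: File) {
  // offset -> snapshot file, analogous to ProducerStateManager's snapshots map
  val snapshots = TrieMap.empty[Long, File]

  // Re-list the directory and rebuild the map from the ".snapshot" files on disk.
  def reloadSnapshots(): Unit = {
    snapshots.clear()
    val files = Option(dir.listFiles()).getOrElse(Array.empty[File])
    files.filter(_.getName.endsWith(".snapshot")).foreach { f =>
      val offset = f.getName.stripSuffix(".snapshot").toLong
      snapshots.put(offset, f)
    }
  }
}
```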

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########

Review comment:
    @gardnervickers : @kowshik mentioned that params.producerStateManager.removeStraySnapshots(params.segments.baseOffsets.toSeq) in LogLoader.load() actually reloads the snapshots after log recovery. So, it seems that the issue you mentioned may not be a problem?
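For context, a simplified sketch of the kind of cleanup a call like removeStraySnapshots performs: drop snapshot metadata whose offset no longer matches any live segment base offset. This is an illustrative approximation only; Kafka's actual implementation has additional retention rules (e.g. around the latest snapshot), which are omitted here.

```scala
object StraySnapshots {
  // Remove every snapshot entry whose offset is not a live segment base
  // offset, returning the removed snapshot file names.
  def removeStraySnapshots(snapshots: scala.collection.mutable.Map[Long, String],
                           segmentBaseOffsets: Seq[Long]): Seq[String] = {
    val retained = segmentBaseOffsets.toSet
    // Materialize the stray offsets first so we do not mutate while iterating.
    val stray = snapshots.keys.filterNot(retained.contains).toSeq
    stray.map { offset => snapshots.remove(offset).get }
  }
}
```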

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########

Review comment:
    @gardnervickers : Thanks for the explanation. Makes sense.
   
   @kowshik : The source of all the confusion is that we use the real PSM in some cases while using a temporary PSM in other cases during recovery. The temporary PSM in recoverSegment() is used in 4 different places.
   
   1. In recoverLog(). This is the case where we could just pass in the real PSM.
   2. In completeSwapOperations(). We try to avoid recovering the segment here in https://github.com/apache/kafka/pull/10763.
   3 and 4. In loadSegmentFiles(). We probably need to clean up this part of the logic a bit. If we are missing the index file or the index file is corrupted, typically we can just rebuild the index without changing the PSM. If the segment is truncated while rebuilding the index, we actually want to follow the process in step 1, by just removing the rest of the segments. So, we could also get rid of the temporary PSM in this case.
   
   I am wondering if we could have a separate PR to get rid of the temporary PSM completely?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
