This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of 
finalize for RemoteBlockPushResolver
b1e57a2b359 is described below

commit b1e57a2b359d7d9fbf07adfba10db97f38b99bde
Author: zhaomin <zhaomin1...@163.com>
AuthorDate: Wed Oct 18 01:20:08 2023 -0500

    [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for 
RemoteBlockPushResolver
    
    ### What changes were proposed in this pull request?
    
    Use java.lang.ref.Cleaner instead of finalize() for RemoteBlockPushResolver.
    
    ### Why are the changes needed?
    
    The finalize() method has been deprecated since Java 9 and will be removed 
in a future release; java.lang.ref.Cleaner is the recommended replacement.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43371 from zhaomin1423/45315.
    
    Authored-by: zhaomin <zhaomin1...@163.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 101 +++++++++++++--------
 .../network/shuffle/ShuffleTestAccessor.scala      |   2 +-
 2 files changed, 64 insertions(+), 39 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a915d0eccb0..14fefebe089 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.lang.ref.Cleaner;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
@@ -94,6 +95,7 @@ import org.apache.spark.network.util.TransportConf;
  */
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
+  private static final Cleaner CLEANER = Cleaner.create();
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
 
   public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
@@ -481,7 +483,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) -> 
shuffleInfo.shuffleMergePartitions
       .forEach((shuffleMergeId, partitionInfo) -> {
         synchronized (partitionInfo) {
-          partitionInfo.closeAllFilesAndDeleteIfNeeded(false);
+          partitionInfo.cleanable.clean();
         }
       }));
     if (cleanupLocalDirs) {
@@ -537,7 +539,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     partitions
       .forEach((partitionId, partitionInfo) -> {
         synchronized (partitionInfo) {
-          partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          partitionInfo.cleanable.clean();
+          partitionInfo.deleteAllFiles();
         }
       });
   }
@@ -822,7 +825,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
                 msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, 
partition.reduceId,
                 ioe.getMessage());
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            partition.cleanable.clean();
           }
         }
       }
@@ -1720,6 +1723,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     // The meta file for a particular merged shuffle contains all the map 
indices that belong to
     // every chunk. The entry per chunk is a serialized bitmap.
     private final MergeShuffleFile metaFile;
+    private final Cleaner.Cleanable cleanable;
     // Location offset of the last successfully merged block for this shuffle 
partition
     private long dataFilePos;
     // Track the map index whose block is being merged for this shuffle 
partition
@@ -1756,6 +1760,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       this.dataFilePos = 0;
       this.mapTracker = new RoaringBitmap();
       this.chunkTracker = new RoaringBitmap();
+      this.cleanable = CLEANER.register(this, new ResourceCleaner(dataChannel, 
indexFile,
+        metaFile, appAttemptShuffleMergeId, reduceId));
     }
 
     public long getDataFilePos() {
@@ -1864,36 +1870,13 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       metaFile.getChannel().truncate(metaFile.getPos());
     }
 
-    void closeAllFilesAndDeleteIfNeeded(boolean delete) {
-      try {
-        if (dataChannel.isOpen()) {
-          dataChannel.close();
-        }
-        if (delete) {
-          dataFile.delete();
-        }
-      } catch (IOException ioe) {
-        logger.warn("Error closing data channel for {} reduceId {}",
-            appAttemptShuffleMergeId, reduceId);
-      }
-      try {
-        metaFile.close();
-        if (delete) {
-          metaFile.delete();
-        }
-      } catch (IOException ioe) {
-        logger.warn("Error closing meta file for {} reduceId {}",
-            appAttemptShuffleMergeId, reduceId);
-        }
-      try {
-        indexFile.close();
-        if (delete) {
-          indexFile.delete();
-        }
-      } catch (IOException ioe) {
-        logger.warn("Error closing index file for {} reduceId {}",
-            appAttemptShuffleMergeId, reduceId);
+    private void deleteAllFiles() {
+      if (!dataFile.delete()) {
+        logger.info("Error deleting data file for {} reduceId {}",
+          appAttemptShuffleMergeId, reduceId);
       }
+      metaFile.delete();
+      indexFile.delete();
     }
 
     @Override
@@ -1904,11 +1887,6 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           reduceId);
     }
 
-    @Override
-    protected void finalize() throws Throwable {
-      closeAllFilesAndDeleteIfNeeded(false);
-    }
-
     @VisibleForTesting
     MergeShuffleFile getIndexFile() {
       return indexFile;
@@ -1933,6 +1911,53 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     int getNumIOExceptions() {
       return numIOExceptions;
     }
+
+    @VisibleForTesting
+    Cleaner.Cleanable getCleanable() {
+      return cleanable;
+    }
+
+    private record ResourceCleaner(
+        FileChannel dataChannel,
+        MergeShuffleFile indexFile,
+        MergeShuffleFile metaFile,
+        AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+        int reduceId) implements Runnable {
+
+      @Override
+      public void run() {
+        closeAllFiles(dataChannel, indexFile, metaFile, 
appAttemptShuffleMergeId,
+          reduceId);
+      }
+
+      private void closeAllFiles(
+          FileChannel dataChannel,
+          MergeShuffleFile indexFile,
+          MergeShuffleFile metaFile,
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          int reduceId) {
+        try {
+          if (dataChannel.isOpen()) {
+            dataChannel.close();
+          }
+        } catch (IOException ioe) {
+          logger.warn("Error closing data channel for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
+        }
+        try {
+          metaFile.close();
+        } catch (IOException ioe) {
+          logger.warn("Error closing meta file for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
+        }
+        try {
+          indexFile.close();
+        } catch (IOException ioe) {
+          logger.warn("Error closing index file for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
+        }
+      }
+    }
   }
 
   /**
@@ -2108,7 +2133,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       }
     }
 
-    void delete() throws IOException {
+    void delete() {
       try {
         if (null != file) {
           file.delete();
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index 092dee1a1e8..6e8e581c699 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -187,7 +187,7 @@ object ShuffleTestAccessor {
   }
 
   def closePartitionFiles(partitionInfo: AppShufflePartitionInfo): Unit = {
-    partitionInfo.closeAllFilesAndDeleteIfNeeded(false)
+    partitionInfo.getCleanable.clean()
   }
 
   def clearAppShuffleInfo(mergeMgr: RemoteBlockPushResolver): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to