This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver b1e57a2b359 is described below commit b1e57a2b359d7d9fbf07adfba10db97f38b99bde Author: zhaomin <zhaomin1...@163.com> AuthorDate: Wed Oct 18 01:20:08 2023 -0500 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver ### What changes were proposed in this pull request? Use java.lang.ref.Cleaner instead of finalize() for RemoteBlockPushResolver. ### Why are the changes needed? The finalize() method has been deprecated since Java 9 and will be removed in a future release; java.lang.ref.Cleaner is the recommended replacement. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43371 from zhaomin1423/45315. Authored-by: zhaomin <zhaomin1...@163.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/shuffle/RemoteBlockPushResolver.java | 101 +++++++++++++-------- .../network/shuffle/ShuffleTestAccessor.scala | 2 +- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index a915d0eccb0..14fefebe089 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -21,6 +21,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.ref.Cleaner; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; @@ -94,6
+95,7 @@ import org.apache.spark.network.util.TransportConf; */ public class RemoteBlockPushResolver implements MergedShuffleFileManager { + private static final Cleaner CLEANER = Cleaner.create(); private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class); public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged"; @@ -481,7 +483,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions .forEach((shuffleMergeId, partitionInfo) -> { synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(false); + partitionInfo.cleanable.clean(); } })); if (cleanupLocalDirs) { @@ -537,7 +539,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { partitions .forEach((partitionId, partitionInfo) -> { synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + partitionInfo.cleanable.clean(); + partitionInfo.deleteAllFiles(); } }); } @@ -822,7 +825,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId, ioe.getMessage()); } finally { - partition.closeAllFilesAndDeleteIfNeeded(false); + partition.cleanable.clean(); } } } @@ -1720,6 +1723,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // The meta file for a particular merged shuffle contains all the map indices that belong to // every chunk. The entry per chunk is a serialized bitmap. 
private final MergeShuffleFile metaFile; + private final Cleaner.Cleanable cleanable; // Location offset of the last successfully merged block for this shuffle partition private long dataFilePos; // Track the map index whose block is being merged for this shuffle partition @@ -1756,6 +1760,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { this.dataFilePos = 0; this.mapTracker = new RoaringBitmap(); this.chunkTracker = new RoaringBitmap(); + this.cleanable = CLEANER.register(this, new ResourceCleaner(dataChannel, indexFile, + metaFile, appAttemptShuffleMergeId, reduceId)); } public long getDataFilePos() { @@ -1864,36 +1870,13 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { metaFile.getChannel().truncate(metaFile.getPos()); } - void closeAllFilesAndDeleteIfNeeded(boolean delete) { - try { - if (dataChannel.isOpen()) { - dataChannel.close(); - } - if (delete) { - dataFile.delete(); - } - } catch (IOException ioe) { - logger.warn("Error closing data channel for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); - } - try { - metaFile.close(); - if (delete) { - metaFile.delete(); - } - } catch (IOException ioe) { - logger.warn("Error closing meta file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); - } - try { - indexFile.close(); - if (delete) { - indexFile.delete(); - } - } catch (IOException ioe) { - logger.warn("Error closing index file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + private void deleteAllFiles() { + if (!dataFile.delete()) { + logger.info("Error deleting data file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); } + metaFile.delete(); + indexFile.delete(); } @Override @@ -1904,11 +1887,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { reduceId); } - @Override - protected void finalize() throws Throwable { - closeAllFilesAndDeleteIfNeeded(false); - } - @VisibleForTesting MergeShuffleFile getIndexFile() { return 
indexFile; @@ -1933,6 +1911,53 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { int getNumIOExceptions() { return numIOExceptions; } + + @VisibleForTesting + Cleaner.Cleanable getCleanable() { + return cleanable; + } + + private record ResourceCleaner( + FileChannel dataChannel, + MergeShuffleFile indexFile, + MergeShuffleFile metaFile, + AppAttemptShuffleMergeId appAttemptShuffleMergeId, + int reduceId) implements Runnable { + + @Override + public void run() { + closeAllFiles(dataChannel, indexFile, metaFile, appAttemptShuffleMergeId, + reduceId); + } + + private void closeAllFiles( + FileChannel dataChannel, + MergeShuffleFile indexFile, + MergeShuffleFile metaFile, + AppAttemptShuffleMergeId appAttemptShuffleMergeId, + int reduceId) { + try { + if (dataChannel.isOpen()) { + dataChannel.close(); + } + } catch (IOException ioe) { + logger.warn("Error closing data channel for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + try { + metaFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing meta file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + try { + indexFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing index file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + } + } } /** @@ -2108,7 +2133,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } - void delete() throws IOException { + void delete() { try { if (null != file) { file.delete(); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala index 092dee1a1e8..6e8e581c699 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala @@ -187,7 +187,7 @@ object 
ShuffleTestAccessor { } def closePartitionFiles(partitionInfo: AppShufflePartitionInfo): Unit = { - partitionInfo.closeAllFilesAndDeleteIfNeeded(false) + partitionInfo.getCleanable.clean() } def clearAppShuffleInfo(mergeMgr: RemoteBlockPushResolver): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org