yabola commented on code in PR #38560:
URL: https://github.com/apache/spark/pull/38560#discussion_r1030397010


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -452,22 +489,69 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo 
appShuffleInfo) {
   }
 
   /**
-   * Clean up all the AppShufflePartitionInfo and the finalized shuffle 
partitions in DB for
-   * a specific shuffleMergeId. This is done since there is a higher 
shuffleMergeId request made
-   * for a shuffleId, therefore clean up older shuffleMergeId partitions. The 
cleanup will be
-   * executed the mergedShuffleCleaner thread.
+   * Clean up the outdated finalized or unfinalized shuffle partitions. The 
cleanup will be executed in the
+   * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1. 
there is a higher shuffleMergeId request
+   * made for a shuffleId, therefore clean up older shuffleMergeId partitions. 
2. Application requires to clean up the
+   * expired or unused specific shuffleId partitions
    */
+  @VisibleForTesting
+  void submitRemoveShuffleMergeTask(
+      AppShuffleInfo shuffleInfo, Integer shuffleId,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    AppShuffleMergePartitionsInfo mergePartitionsInfo = 
shuffleInfo.shuffles.get(shuffleId);
+    AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId(
+        shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+    if (!mergePartitionsInfo.isFinalized()) {
+      Map<Integer, AppShufflePartitionInfo> partitionsToClean = 
mergePartitionsInfo.shuffleMergePartitions;
+      submitCleanupTask(() ->
+          closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, 
higherShuffleMergeId));
+    } else {
+      int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions;
+      submitCleanupTask(() ->
+          deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, 
partitionsToClean, higherShuffleMergeId));
+    }
+  }
+
   @VisibleForTesting
   void closeAndDeleteOutdatedPartitions(
-      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
-      Map<Integer, AppShufflePartitionInfo> partitions) {
-    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+      AppAttemptShuffleMergeId shuffleMergeId,
+      Map<Integer, AppShufflePartitionInfo> partitions,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+    higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
     partitions
-      .forEach((partitionId, partitionInfo) -> {
-        synchronized (partitionInfo) {
-          partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
-        }
-      });
+        .forEach((partitionId, partitionInfo) -> {
+          synchronized (partitionInfo) {
+            partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          }
+        });
+  }
+
+  @VisibleForTesting
+  void deleteOutdatedFinalizedPartitions(
+      AppShuffleInfo shuffleInfo,
+      AppAttemptShuffleMergeId shuffleMergeId,
+      int[] outdatedFinalizedPartitions,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    int shuffleId = shuffleMergeId.shuffleId;
+    int mergeId = shuffleMergeId.shuffleMergeId;
+    removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+    higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
+    Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> {
+      try {
+        File metaFile =
+            shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, 
partition);
+        File indexFile = new File(
+            shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, 
partition));
+        File dataFile =
+            shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, 
partition);
+        metaFile.delete();
+        indexFile.delete();
+        dataFile.delete();
+      } catch (Exception e) {
+        logger.error("Error delete shuffle files for {}", shuffleMergeId, e);
+      }

Review Comment:
   Thanks for your review. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to