wankunde commented on code in PR #38560:
URL: https://github.com/apache/spark/pull/38560#discussion_r1029992768


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -452,22 +489,69 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
   }
 
   /**
-   * Clean up all the AppShufflePartitionInfo and the finalized shuffle partitions in DB for
-   * a specific shuffleMergeId. This is done since there is a higher shuffleMergeId request made
-   * for a shuffleId, therefore clean up older shuffleMergeId partitions. The cleanup will be
-   * executed the mergedShuffleCleaner thread.
+   * Clean up the outdated finalized or unfinalized shuffle partitions. The cleanup will be executed in the
+   * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1. there is a higher shuffleMergeId request
+   * made for a shuffleId, therefore clean up older shuffleMergeId partitions. 2. Application requires to clean up the
+   * expired or unused specific shuffleId partitions
    */
+  @VisibleForTesting
+  void submitRemoveShuffleMergeTask(
+      AppShuffleInfo shuffleInfo, Integer shuffleId,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId);
+    AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId(
+        shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId);
+    if (!mergePartitionsInfo.isFinalized()) {
+      Map<Integer, AppShufflePartitionInfo> partitionsToClean = mergePartitionsInfo.shuffleMergePartitions;
+      submitCleanupTask(() ->
+          closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, higherShuffleMergeId));
+    } else {
+      int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions;
+      submitCleanupTask(() ->
+          deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, partitionsToClean, higherShuffleMergeId));
+    }
+  }
+
   @VisibleForTesting
   void closeAndDeleteOutdatedPartitions(
-      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
-      Map<Integer, AppShufflePartitionInfo> partitions) {
-    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+      AppAttemptShuffleMergeId shuffleMergeId,
+      Map<Integer, AppShufflePartitionInfo> partitions,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+    higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
     partitions
-      .forEach((partitionId, partitionInfo) -> {
-        synchronized (partitionInfo) {
-          partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
-        }
-      });
+        .forEach((partitionId, partitionInfo) -> {
+          synchronized (partitionInfo) {
+            partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          }
+        });
+  }
+
+  @VisibleForTesting
+  void deleteOutdatedFinalizedPartitions(
+      AppShuffleInfo shuffleInfo,
+      AppAttemptShuffleMergeId shuffleMergeId,
+      int[] outdatedFinalizedPartitions,
+      Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+    int shuffleId = shuffleMergeId.shuffleId;
+    int mergeId = shuffleMergeId.shuffleMergeId;
+    removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+    higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
+    Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> {
+      try {
+        File metaFile =
+            shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition);
+        File indexFile = new File(
+            shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition));
+        File dataFile =
+            shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition);
+        metaFile.delete();
+        indexFile.delete();
+        dataFile.delete();
+      } catch (Exception e) {
+        logger.error("Error delete shuffle files for {}", shuffleMergeId, e);
+      }

Review Comment:
   As in the `closeAllFilesAndDeleteIfNeeded` method, can we continue deleting
   the other files if one `delete()` fails?
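
To illustrate the suggestion, here is a minimal sketch of best-effort deletion in which each file is handled independently, so one failure does not prevent the remaining deletes. It is not the PR's code: the class `BestEffortDelete` and the method `deleteAll` are hypothetical names, and a non-empty directory stands in for a file that cannot be deleted.

```java
import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RemoteBlockPushResolver.
public class BestEffortDelete {

  /**
   * Attempts to delete every file and returns the ones that could not be
   * deleted. A failure on one file never stops the loop.
   */
  static List<File> deleteAll(List<File> files) {
    List<File> failed = new ArrayList<>();
    for (File file : files) {
      try {
        // File.delete() signals most failures by returning false, not by throwing.
        if (!file.delete() && file.exists()) {
          failed.add(file);
        }
      } catch (SecurityException e) {
        // Thrown only when a security manager denies access; record and continue.
        failed.add(file);
      }
    }
    return failed;
  }

  public static void main(String[] args) throws Exception {
    File dir = Files.createTempDirectory("merged").toFile();
    // A non-empty directory cannot be removed by File.delete().
    File blocked = new File(dir, "blocked");
    blocked.mkdir();
    File child = new File(blocked, "child");
    child.createNewFile();
    File index = new File(dir, "shuffle.index");
    File data = new File(dir, "shuffle.data");
    index.createNewFile();
    data.createNewFile();

    List<File> failed = deleteAll(Arrays.asList(blocked, index, data));
    System.out.println("failed: " + failed);
    System.out.println("index still exists: " + index.exists());
    System.out.println("data still exists: " + data.exists());

    // Clean up the temporary files created for this demonstration.
    child.delete();
    blocked.delete();
    dir.delete();
  }
}
```

Collecting the failures instead of logging inside the loop is only one option; logging each failed path and continuing would achieve the same effect.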



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -712,6 +794,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
+      appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds));

Review Comment:
   The `finalizedPartitions` will be empty after the shuffle service restarts,
   which will cause the merged shuffle files to leak.



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    // Find all shuffle blocks on executors that are no longer running
-    val blocksToDeleteByShuffleService =
-      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]]
     if (externalShuffleServiceRemoveShuffleEnabled) {
-      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
-        shuffleStatus.withMapStatuses { mapStatuses =>
-          mapStatuses.foreach { mapStatus =>
-            // Check if the executor has been deallocated
-            if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
-              val blocksToDel =
-                shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
-              if (blocksToDel.nonEmpty) {
-                val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
-                  new mutable.HashSet[BlockId])
-                blocks ++= blocksToDel
+      val shuffleClient = externalBlockStoreClient.get
+      // Find all shuffle blocks on executors that are no longer running
+      val blocksToDelete = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+      mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+        case Some(shuffleStatus) =>
+          shuffleStatus.withMapStatuses { mapStatuses =>
+            mapStatuses.foreach { mapStatus =>
+              // Check if the executor has been deallocated
+              if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+                val blocksToDel = shuffleManager.shuffleBlockResolver
+                  .getBlocksForShuffle(shuffleId, mapStatus.mapId)
+                if (blocksToDel.nonEmpty) {
+                  val blocks = blocksToDelete.getOrElseUpdate(mapStatus.location,
+                    new mutable.HashSet[BlockId])
+                  blocks ++= blocksToDel
+                }

Review Comment:
   What if the shuffle status does not exist?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

