otterc commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066083711


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+    val mergerLocations =
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+        mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+      } else {
+        Seq.empty[BlockManagerId]
+      }

Review Comment:
   Nit: why did we add this code here when the rest of the related code comes 
30 to 35 lines later? Keeping related code together improves readability, 
so I would suggest moving it next to the rest.



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -366,8 +372,19 @@ class BlockManagerMasterEndpoint(
         }
       }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        mergerLocations.map { bmId =>
+          Future[Boolean] {
+            shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+              RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)

Review Comment:
   Shouldn't we declare the `DELETE_ALL_MERGED_SHUFFLE` constant somewhere else? It 
seems odd that `BlockManagerMasterEndpoint`, which is in core, now has a code 
dependency on `RemoteBlockPushResolver`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteCurrentMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteCurrentMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() ->
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+        } else {
+          submitCleanupTask(() ->
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                mergePartitionsInfo.getReduceIds(), false));
+        }
+      } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+                "application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+      } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+        // cleanup request for newer shuffle - remove the outdated data we 
have.
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() ->
+              closeAndDeleteOutdatedPartitions(

Review Comment:
   Why are we asynchronously removing the files and the data in the db?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##########
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Are we not even going to log whether the server succeeded in removing the 
shuffle merge data? We do that for `removeBlocks`, so I am wondering why we 
don't do the same here. At the very least, it would make it more visible when 
there was a problem at the server while deleting merged shuffle data.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteCurrentMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteCurrentMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() ->
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+        } else {
+          submitCleanupTask(() ->
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                mergePartitionsInfo.getReduceIds(), false));
+        }
+      } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+                "application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));

Review Comment:
   Could you please add comments explaining each of these conditions? Otherwise 
the logic is hard to understand.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java:
##########
@@ -84,4 +85,9 @@ public MergedBlockMeta getMergedBlockMeta(
   public String[] getMergedBlockDirs(String appId) {
     throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
   }
+
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {

Review Comment:
   Nit: I would keep the signature consistent with the other methods, which 
accept `appId`, `shuffleId`, and `shuffleMergeId` rather than the wrapping message.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteCurrentMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteCurrentMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() ->
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+        } else {
+          submitCleanupTask(() ->
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                mergePartitionsInfo.getReduceIds(), false));
+        }
+      } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+                "application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+      } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {

Review Comment:
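   `shuffleMergeId > mergePartitionsInfo.shuffleMergeId` will always be true at 
this point: the equal case (including `DELETE_ALL_MERGED_SHUFFLE`, which maps 
`shuffleMergeId` to the current id) is handled by the first branch, and the 
less-than case throws in the second branch, so this can simply be an `else`.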



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -95,6 +96,12 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
+
+  /**
+   * The flag for deleting the current merged shuffle data.
+   */
+  public static final int DELETE_ALL_MERGED_SHUFFLE = -1;

Review Comment:
   Nit: the comment conflicts with the name of the constant. Is this meant to delete 
shuffle data belonging to all the `shuffleMergeIds`, or only to the `currentMergeShuffleId`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to