mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r973756877


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java:
##########
@@ -255,4 +255,17 @@ public void getMergedBlockMeta(
       MergedBlocksMetaListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Remove the shuffle merge data in shuffle services
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param shuffleId shuffle id.
+   *
+   * @since 3.4.0
+   */
+  public boolean removeShuffleMerge(String host, int port, int shuffleId) {

Review Comment:
   Pass the `shuffleMergeId` here as well (and everywhere else below where 
relevant).
   This keeps the protocol/API extensible for future use, if and when we 
want to clean up a specific merge id.
   
   Use a specific value to indicate cleanup of all shuffle merge ids (for 
example `-1`, since shuffle merge ids start at `0` and are bumped for new 
stage attempts of indeterminate stages).
   
   For now, we can fail the request if `shuffleMergeId` is not `-1` (in 
`RemoteBlockPushResolver.java`) and progressively add support for cleanup of 
a specific merge id in future versions as needed. This ensures that protocol 
changes are not required at that point.
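
A minimal sketch of the sentinel idea above, assuming a hypothetical helper class `ShuffleMergeCleanup` and constant `ALL_SHUFFLE_MERGE_IDS` (neither name comes from the PR). It only illustrates "accept `-1`, reject everything else for now"; the real check would live in `RemoteBlockPushResolver.java`.

```java
/**
 * Hypothetical helper illustrating the reviewer's suggestion.
 * The class, constant, and method names are assumptions, not Spark APIs.
 */
final class ShuffleMergeCleanup {
  /** Assumed sentinel: clean up every shuffle merge id of the shuffle. */
  static final int ALL_SHUFFLE_MERGE_IDS = -1;

  private ShuffleMergeCleanup() {}

  /**
   * Accepts only the sentinel for now. Real merge ids start at 0, so -1
   * can never collide with one. Support for a specific merge id could be
   * added later without changing the message format.
   */
  static void checkSupported(int shuffleMergeId) {
    if (shuffleMergeId != ALL_SHUFFLE_MERGE_IDS) {
      throw new UnsupportedOperationException(
          "Cleanup of a specific shuffleMergeId is not supported yet: " + shuffleMergeId);
    }
  }
}
```

Because the field is already part of the request, a later version could relax this check without touching the wire protocol.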



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;

Review Comment:
   Add `appAttemptId` as well here.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,20 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId) {

Review Comment:
   When adding support for `shuffleMergeId`, follow the same pattern as 
`finalizeShuffleMerge` - there are a few corner cases here, and that method 
handles them.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;
+  public final int shuffleId;
+
+  public RemoveShuffleMerge(String appId, int shuffleId) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.REMOVE_SHUFFLE_MERGE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, shuffleId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("appId", appId)
+      .append("shuffleId", shuffleId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge o = (RemoveShuffleMerge) other;
+      return Objects.equal(appId, o.appId)
+        && shuffleId == o.shuffleId;

Review Comment:
   nit: Compare primitives before objects, so the cheaper check runs first.
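
A small sketch of the ordering the nit asks for, using a hypothetical `ShuffleMergeKey` class and `java.util.Objects` instead of the Guava `Objects` imported in the PR, so that it stands alone. It is not the PR's final code.

```java
import java.util.Objects;

/** Hypothetical value class used only to illustrate comparison order. */
final class ShuffleMergeKey {
  final String appId;
  final int shuffleId;

  ShuffleMergeKey(String appId, int shuffleId) {
    this.appId = appId;
    this.shuffleId = shuffleId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    // instanceof is already false for null, so no separate null check is needed.
    if (!(other instanceof ShuffleMergeKey)) {
      return false;
    }
    ShuffleMergeKey o = (ShuffleMergeKey) other;
    // Primitive comparison first; the String comparison runs only if it passes.
    return shuffleId == o.shuffleId && Objects.equals(appId, o.appId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(appId, shuffleId);
  }
}
```

The result is the same in either order; only the short-circuit cost differs.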



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -364,8 +364,16 @@ class BlockManagerMasterEndpoint(
         }
       }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =

Review Comment:
   Do this only when push-based shuffle is enabled, to avoid the 
`getShufflePushMergerLocations` call otherwise.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1395,16 +1393,24 @@ private[spark] class DAGScheduler(
   }
 
   private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = {
-    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-    if (mergerLocs.nonEmpty) {
-      stage.shuffleDep.setMergerLocs(mergerLocs)
+    stage.shuffleDep.synchronized {
+      val oldMergeLocs = stage.shuffleDep.getMergerLocs
+      if (oldMergeLocs.isEmpty) {
+        val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+          stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+        if (mergerLocs.nonEmpty) {
+          stage.shuffleDep.setMergerLocs(mergerLocs)
+          mapOutputTracker.registerShufflePushMergerLocations(
+            stage.shuffleDep.shuffleId, mergerLocs)
+        }
+        logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
+          s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
+          s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+        mergerLocs
+      } else {
+        oldMergeLocs
+      }
     }
-
-    logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
-      s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
-      s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    mergerLocs
   }

Review Comment:
   We don't need to synchronize here - this runs within the context of the 
`DAGScheduler` lock.
   Given this, the change would be simpler:
   
   ```
   if (stage.shuffleDep.getMergerLocs.nonEmpty) return
   val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( ... )
   if (mergerLocs.nonEmpty) {
     stage.shuffleDep.setMergerLocs(mergerLocs)
     mapOutputTracker.registerShufflePushMergerLocations( ... )
     logDebug( ...)
   }
   ```
   
   
   We can also rename the method to `configureShufflePushMergerLocations`, 
since we no longer need to return the value.
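
The "configure once, return nothing" shape suggested above can be modelled in isolation. The following is a rough Java sketch with hypothetical names (`MergerLocationHolder`, `configureMergerLocations`, a `Supplier` standing in for the scheduler backend lookup, and a counter standing in for the `mapOutputTracker` registration). It assumes, as the comment states for `DAGScheduler`, that callers are already serialized, so it does no locking of its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the shuffle dependency's merger-location state. */
final class MergerLocationHolder {
  private List<String> mergerLocs = Collections.emptyList();
  private int registrations = 0;

  /**
   * Looks up and stores merger locations only if none are set yet.
   * Not thread-safe by design: assumes the caller serializes invocations.
   */
  void configureMergerLocations(Supplier<List<String>> lookup) {
    if (!mergerLocs.isEmpty()) {
      return; // already configured, skip the lookup entirely
    }
    List<String> found = lookup.get();
    if (!found.isEmpty()) {
      mergerLocs = Collections.unmodifiableList(new ArrayList<>(found));
      registrations++; // stands in for registering the locations elsewhere
    }
  }

  List<String> mergerLocs() {
    return mergerLocs;
  }

  int registrations() {
    return registrations;
  }
}
```

An empty lookup result leaves the holder unconfigured, so a later call retries the lookup; once locations are set, further calls are no-ops.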


