Ngone51 commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r516635097



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +169,32 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getShufflePushMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) {
+      maxNumExecutors
+    } else {
+      Int.MaxValue
+    }
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)

Review comment:
       nit: 2 indentations for the continuation line

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1281,6 +1309,12 @@ private[spark] class DAGScheduler(
     stage match {
       case s: ShuffleMapStage =>
         outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+        // Only generate merger location for a given shuffle dependency once. This way, even if
+        // this stage gets retried, it would still be merging blocks using the same set of
+        // shuffle services.
+        if (s.shuffleDep.shuffleMergeEnabled) {

Review comment:
       I don't think you need to add the variable `_shuffleMergeEnabled` to each shuffle dependency. You can add it as a global variable of `DAGScheduler`.
   
   And then, you don't need `setShuffleMergeEnabled()` either. You don't need to call `setShuffleMergeEnabled(false)` when `_mergerLocs` is empty. The caller can decide not to push shuffle after realizing `_mergerLocs` is empty.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with DRA kicking in and significantly reducing the number of available active

Review comment:
       Could you replace `DRA` with its full form? It seems other people usually call it "dynamic executor allocation", so I wonder whether some people might not get the abbreviation.

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)
+    val minMergersThreshold = math.max(minMergersStaticThreshold,
+      math.floor(numMergersNeeded * minMergersThresholdRatio).toInt)

Review comment:
       > If numMergersNeeded < minMergersThreshold we would want to return the mergers obtained from BlockManagerMaster. 
   
   So if `numMergersNeeded=3`, `minMergersThreshold=5`, and BlockManagerMaster returns 100 mergers, you want to return all 100 mergers here? If so, that doesn't look reasonable.
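
   To make the scenario concrete, a small sketch using the formulas from the diff above (the concrete values, including `minMergersStaticThreshold = 5` and `minMergersThresholdRatio = 1.0`, are assumptions chosen to reproduce numMergersNeeded=3 and minMergersThreshold=5):

```scala
object MergerThresholdScenario {
  def main(args: Array[String]): Unit = {
    // Illustrative numbers; the formulas are the ones from the diff above.
    val numPartitions = 30
    val tasksPerExecutor = 10
    val maxNumExecutors = 200
    val minMergersStaticThreshold = 5
    val minMergersThresholdRatio = 1.0

    val numMergersNeeded = math.min(
      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)
    val minMergersThreshold = math.max(minMergersStaticThreshold,
      math.floor(numMergersNeeded * minMergersThresholdRatio).toInt)

    // Prints "needed=3, threshold=5": fewer mergers are needed than the
    // threshold, yet falling back to every merger BlockManagerMaster knows
    // about (say 100) would hand back far more than the 3 actually needed.
    println(s"needed=$numMergersNeeded, threshold=$minMergersThreshold")
  }
}
```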

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
 
   }
 
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1

Review comment:
       hmm.. why do we need to set a threshold on `shuffleMergerLocations`? I think the number of external shuffle services should be static within a cluster (no matter whether the application's resource allocation mode is dynamic or static) and wouldn't grow without bound.
   
   Besides, the oldest merger that gets removed may store more merged shuffle data than the others... it would be better if we could evict based on merged shuffle data size... but that should be another topic.
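
   A minimal sketch of the eviction behaviour being questioned, assuming `shuffleMergerLocations` is an insertion-ordered map such as a `mutable.LinkedHashMap` (its declaration is not part of the diff above, and the cap of 2 is illustrative):

```scala
import scala.collection.mutable

object OldestMergerEviction {
  def main(args: Array[String]): Unit = {
    // Assumed stand-in for the map in BlockManagerMasterEndpoint.
    val shuffleMergerLocations = new mutable.LinkedHashMap[String, String]()
    val maxRetainedMergerLocations = 2  // illustrative cap

    for (host <- Seq("host-a", "host-b", "host-c")) {
      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
        // Evicts whichever host registered first, regardless of how much
        // merged shuffle data that host's shuffle service currently holds.
        shuffleMergerLocations -= shuffleMergerLocations.head._1
      }
      shuffleMergerLocations(host) = s"shuffle-service@$host"
    }
    println(shuffleMergerLocations.keys.mkString(", "))  // host-b, host-c
  }
}
```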

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with DRA kicking in and significantly reducing the number of available active
+   * executors, we would still be able to get sufficient shuffle service locations for
+   * block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"List of shuffle push merger locations " +
+      s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")

Review comment:
       You're calling `getMergerLocs` before `setMergerLocs`..
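
   For clarity, a sketch of the intended ordering, reusing the names from the diff above (an outline only, not code from the PR; it assumes `setMergerLocs` is the setter counterpart of `getMergerLocs`):

```scala
// Set the merger locations on the dependency first...
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
  stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
stage.shuffleDep.setMergerLocs(mergerLocs)
// ...and only then read them back for the debug log, so getMergerLocs is no
// longer called before setMergerLocs.
logDebug(s"List of shuffle push merger locations " +
  s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
```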

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1927,4 +1927,34 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle")

Review comment:
       Yes, please.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
 
   }
 
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1
+      }
+      shuffleMergerLocations(shuffleServerId.host) = shuffleServerId

Review comment:
       I wonder why you need a separate `shuffleMergerLocations`, since it doesn't differ much from the existing `BlockManagerId` info. I guess this is because of the dynamic allocation case, where a node may have no running executors but only an external shuffle service... right?
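
   A self-contained sketch of that guess (all names and values below are stand-ins; 7337 is assumed as the external shuffle service port): keyed by host and pointing at the shuffle service port, the merger entry can outlive the executors on that host.

```scala
import scala.collection.mutable

object MergerVsExecutorLocation {
  // Simplified stand-in for Spark's BlockManagerId.
  final case class Loc(executorId: String, host: String, port: Int)

  def main(args: Array[String]): Unit = {
    val externalShuffleServicePort = 7337  // assumed shuffle service port
    val executorBlockManagerPort = 43211   // arbitrary per-executor port

    // Executor-level view: keyed by executor id, dropped when the executor goes away.
    val blockManagerInfo = mutable.Map(
      "exec-1" -> Loc("exec-1", "host-a", executorBlockManagerPort))

    // Host-level view: keyed by host and pointing at the external shuffle
    // service port, so it stays usable for block push/merge even after the
    // host's executors are deallocated.
    val shuffleMergerLocations = mutable.Map(
      "host-a" -> Loc("exec-1", "host-a", externalShuffleServicePort))

    blockManagerInfo -= "exec-1"  // dynamic allocation removes the executor
    println(shuffleMergerLocations.get("host-a"))  // Some(Loc(exec-1,host-a,7337))
  }
}
```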



