venkata91 commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r516125221



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)

Review comment:
       Yes, you're right. I was wondering: without DRA, or without any max executors set for DRA, wouldn't bounding the max to `Int.MaxValue` be reasonable? `BlockManagerMasterEndpoint` would only return as many mergers as are available. Thoughts?

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)

Review comment:
       Yes, you're right. On second thought, without DRA, or without any max executors set for DRA, wouldn't bounding the max to `Int.MaxValue` be reasonable? `BlockManagerMasterEndpoint` would only return as many mergers as are available. Thoughts?
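
   To make the idea concrete, here is a rough sketch of the bounding I have in mind. This is an illustration only, not the final code; it assumes the DRA cap comes from `Utils.isDynamicAllocationEnabled` and `DYN_ALLOCATION_MAX_EXECUTORS`, and falls back to `Int.MaxValue` otherwise:

   ```scala
   import org.apache.spark.SparkConf
   import org.apache.spark.internal.config.DYN_ALLOCATION_MAX_EXECUTORS
   import org.apache.spark.util.Utils

   // Sketch only: without DRA, or without a DRA max configured, fall back to
   // Int.MaxValue; BlockManagerMasterEndpoint will anyway only return as many
   // mergers as are actually available.
   def boundedNumMergers(conf: SparkConf, numPartitions: Int, tasksPerExecutor: Int): Int = {
     val maxExecutors =
       if (Utils.isDynamicAllocationEnabled(conf)) conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       else Int.MaxValue
     math.min(
       math.max(1, math.ceil(numPartitions.toDouble / tasksPerExecutor).toInt),
       maxExecutors)
   }
   ```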

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
       Can you please elaborate a bit on exposing merger locations through `SparkListener` events? Are you thinking of passing merger locations to `ExecutorAllocationManager` so it can request new executors based on this info?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
       @mridulm Can you please elaborate a bit on exposing merger locations through `SparkListener` events? Are you thinking of passing merger locations to `ExecutorAllocationManager` so it can request new executors based on this info?
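
   In case it helps the discussion, here is a purely hypothetical sketch of the kind of event I imagined. The event class below does not exist anywhere in this PR; it only illustrates what surfacing merger locations through `SparkListener` events might look like:

   ```scala
   import org.apache.spark.scheduler.SparkListenerEvent
   import org.apache.spark.storage.BlockManagerId

   // Hypothetical event, not part of this PR: posted when merger locations are
   // chosen for a shuffle map stage, so listeners (e.g. an allocation policy)
   // could react to where merged shuffle data will live.
   case class SparkListenerStageMergerLocations(
       stageId: Int,
       stageAttemptId: Int,
       mergerLocs: Seq[BlockManagerId]) extends SparkListenerEvent
   ```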

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.

Review comment:
       This is my bad; earlier we didn't have DRA support and things changed recently. Will fix this one.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1927,4 +1927,31 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.based.enabled")
+      .doc("Set to 'true' to enable push based shuffle")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val MAX_MERGER_LOCATIONS_CACHED =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Max number of shuffle services hosts info cached to determine the 
locations of" +

Review comment:
       I tried describing it a bit more; let me know if this looks OK or needs more info.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1927,4 +1927,31 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.based.enabled")
+      .doc("Set to 'true' to enable push based shuffle")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val MAX_MERGER_LOCATIONS_CACHED =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Max number of shuffle services hosts info cached to determine the 
locations of" +
+        " shuffle services when pushing the blocks.")
+      .intConf
+      .createWithDefault(500)
+
+  private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+    ConfigBuilder("spark.shuffle.push.mergerLocations.minThresholdRatio")
+      .doc("Minimum percentage of shuffle services (merger locations) should 
be available with" +
+        " respect to numPartitions in order to enable push based shuffle for a 
stage.")

Review comment:
       I tried describing it a bit more; let me know if this looks OK or needs more info.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1927,4 +1927,31 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.based.enabled")
+      .doc("Set to 'true' to enable push based shuffle")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val MAX_MERGER_LOCATIONS_CACHED =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Max number of shuffle services hosts info cached to determine the 
locations of" +
+        " shuffle services when pushing the blocks.")
+      .intConf
+      .createWithDefault(500)
+
+  private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+    ConfigBuilder("spark.shuffle.push.mergerLocations.minThresholdRatio")
+      .doc("Minimum percentage of shuffle services (merger locations) should 
be available with" +
+        " respect to numPartitions in order to enable push based shuffle for a 
stage.")
+      .doubleConf
+      .createWithDefault(0.05)
+
+  private[spark] val MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+    ConfigBuilder("spark.shuffle.push.mergerLocations.minStaticThreshold")
+      .doc("Minimum number of shuffle services (merger locations) should be 
available in order" +
+        "to enable push based shuffle for a stage.")

Review comment:
       I tried describing it a bit more; let me know if this looks OK or needs more info.
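
   For context, this is roughly how the settings above would be combined in a job config. The values are purely illustrative (the static-threshold value in particular is made up, since its default isn't shown in this hunk):

   ```scala
   import org.apache.spark.SparkConf

   // Illustrative example of wiring the new push-based shuffle configs together.
   val conf = new SparkConf()
     .set("spark.shuffle.push.based.enabled", "true")
     // Cache info for at most 500 shuffle service hosts when choosing merger locations.
     .set("spark.shuffle.push.retainedMergerLocations", "500")
     // Require mergers for at least 5% of numPartitions before enabling merge for a stage.
     .set("spark.shuffle.push.mergerLocations.minThresholdRatio", "0.05")
     // Also require this many mergers in absolute terms (value here is made up).
     .set("spark.shuffle.push.mergerLocations.minStaticThreshold", "5")
   ```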

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)

Review comment:
       Yes, you're right. I totally forgot about static executors after having DRA for so long ;)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
       How about doing this as part of the client-side metrics changes? It could introduce a lot of changes if we do it as part of this PR.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -657,6 +679,14 @@ class BlockManagerMasterEndpoint(
     }
   }
 
+  private def getMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    // Copying the merger locations to a list so that the original mergerLocations won't be shuffled
+    val mergers = mergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq
+    Utils.randomize(mergers).take(numMergersNeeded)

Review comment:
       IIRC we had the same discussion internally before:
   1. With DRA we will eventually get executors on the hosts where we have merged blocks, since we pass the locality info to DRA, though it's not guaranteed all the time.
   2. Even if we prefer active executors, they can go down by the end of the stage, since different tasks complete at different times and executors can idle out during that window. So even if we prefer active executors as merger locations, it's not really guaranteed that those executors will remain alive for the subsequent stage.
   
   @tgravescs Thoughts?
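   
   Just to make the alternative concrete, a rough sketch of what preferring active executors could look like on top of the current random selection. The `activeExecutorHosts` argument is hypothetical (today we only have the cached `mergerLocations`), so this is only an illustration of the trade-off:

   ```scala
   import scala.util.Random
   import org.apache.spark.storage.BlockManagerId

   // Sketch: pick mergers on hosts that currently run executors first, then fill
   // the remainder randomly from the other cached shuffle service hosts.
   def chooseMergers(
       cachedMergers: Seq[BlockManagerId],
       activeExecutorHosts: Set[String],
       hostsToFilter: Set[String],
       numMergersNeeded: Int): Seq[BlockManagerId] = {
     val candidates = cachedMergers.filterNot(m => hostsToFilter.contains(m.host))
     val (preferred, rest) = candidates.partition(m => activeExecutorHosts.contains(m.host))
     (Random.shuffle(preferred) ++ Random.shuffle(rest)).take(numMergersNeeded)
   }
   ```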




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


