mridulm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r514552978



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -95,6 +97,29 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled
+  private[this] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)
+
+  def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {

Review comment:
       Make this `private[spark]`
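   
   For illustration, a minimal sketch of the suggested visibility change (the body is unchanged from the diff above):
   
   ```scala
   // Suggested: restrict the setter to Spark internals.
   private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
     _shuffleMergeEnabled = shuffleMergeEnabled
   }
   ```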

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -95,6 +97,29 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled
+  private[this] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)
+
+  def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
+    _shuffleMergeEnabled = shuffleMergeEnabled
+  }
+
+  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled
+
+  /**
+   * Stores the location of the list of chosen external shuffle services for handling the
+   * shuffle merge requests from mappers in this shuffle map stage.
+   */
+  private[this] var _mergerLocs: Seq[BlockManagerId] = Nil
+
+  def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {

Review comment:
       Make this `private[spark]`.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {

Review comment:
       How about a subinterface extending `ShuffleDriverComponents` for push based shuffle - with a suitable default implementation ?
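   
   For illustration only, one possible shape (the trait name and its placement are hypothetical, not something this PR defines):
   
   ```scala
   import org.apache.spark.shuffle.api.ShuffleDriverComponents
   import org.apache.spark.storage.BlockManagerId
   
   // Hypothetical extension point for push based shuffle; a non-push-based
   // implementation keeps the default and simply returns no mergers.
   trait MergerAwareShuffleDriverComponents extends ShuffleDriverComponents {
     def getMergerLocations(numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = Nil
   }
   ```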

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2541,6 +2541,15 @@ private[spark] object Utils extends Logging {
     master == "local" || master.startsWith("local[")
   }
 
+  /**
+   * Push based shuffle can only be enabled when external shuffle service is enabled.
+   * In the initial version, we cannot support push based shuffle and adaptive execution
+   * at the same time. Will improve this in a later version.
+   */
+  def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+    conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED)

Review comment:
       I look forward to the day when the second condition can be removed :-)
   It will be relevant for both k8s and Spark Streaming !
   
   +CC @dongjoon-hyun you might be interested in this in the future.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
##########
@@ -92,4 +93,10 @@ private[spark] trait SchedulerBackend {
    */
   def maxNumConcurrentTasks(rp: ResourceProfile): Int
 
+  /**
+   * Get the list of both active and dead executors host locations for push based shuffle
+   * @return List of external shuffle services locations
+   */
+  def getMergerLocations(numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = Nil

Review comment:
       nit: Rename to be more descriptive and specific ?
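   
   For instance (the name below is purely illustrative, not a requirement):
   
   ```scala
   // e.g. make the "push merger" intent explicit in the name.
   def getShufflePushMergerLocations(numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = Nil
   ```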

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")

Review comment:
       Make the message more descriptive ?
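   
   For example, something along these lines (wording is only a suggestion, using the `mergerLocs` just computed above):
   
   ```scala
   // Say which stage the candidate mergers belong to instead of logging bare hosts.
   logDebug(s"Candidate merger locations for ${stage.name} (shuffle " +
     s"${stage.shuffleDep.shuffleId}): ${mergerLocs.map(_.host).mkString(", ")}")
   ```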

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)

Review comment:
       This should also consider the case when dynamic allocation is disabled.
   `maxNumExecutors` is being set to `DYN_ALLOCATION_MAX_EXECUTORS` - which will become `Int.MaxValue` if DRA is not enabled.
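   
   A rough sketch of the idea (assumes the surrounding `YarnSchedulerBackend` context; the fallback via `EXECUTOR_INSTANCES` is just one option, not something this PR proposes):
   
   ```scala
   // Cap numMergersNeeded by a bound that is meaningful even without DRA,
   // instead of DYN_ALLOCATION_MAX_EXECUTORS which defaults to Int.MaxValue.
   val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.conf)) {
     sc.conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
   } else {
     sc.conf.get(EXECUTOR_INSTANCES).getOrElse(1)
   }
   val numMergersNeeded = math.min(
     math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors)
   ```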

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)
+    val minMergersThreshold = math.max(minMergersStaticThreshold,
+      math.floor(numMergersNeeded * minMergersThresholdRatio).toInt)
+    val mergerLocations = blockManagerMaster
+      .getMergerLocations(numMergersNeeded, scheduler.nodeBlacklist())

Review comment:
       Can we add a note on the constraints under which this method gets invoked ? (in SchedulerBackend.scala preferably).
   For example:
   - Will it happen with stage retries ?
   - Will it happen with shuffle retry (same shuffleId, different stages - when computed in different jobs) ?
   etc.
   
   This impacts some of the discussion we might have here.
   For example,
   * As the blacklist is dynamic, how does this interact with stage retries ?
   * As `blockManagerMaster.getMergerLocations` randomizes the merger locations, how does it impact the relative ordering of mergers ?
   etc.

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+        .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)
+    val minMergersThreshold = math.max(minMergersStaticThreshold,
+      math.floor(numMergersNeeded * minMergersThresholdRatio).toInt)

Review comment:
       This variable is a bit misleading.
   It is not the minimum number of mergers - if `numMergersNeeded` is less than `minMergersThreshold`, we will still return mergers as long as we obtained `numMergersNeeded` from the block manager.
   
   I am wondering if this is what was intended in this method ?
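   
   A made-up example of the scenario I mean (the static threshold of 5 and ratio of 0.8 are illustrative values, not the PR's defaults):
   
   ```scala
   // A small stage: only 3 mergers are "needed", but the threshold resolves to 5.
   val numMergersNeeded = 3
   val minMergersThreshold = math.max(5, math.floor(numMergersNeeded * 0.8).toInt)  // == 5
   // If the block manager returns 3 mergers, they would presumably still be used
   // even though 3 < minMergersThreshold, so "min" is not really a lower bound here.
   ```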

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
       QQ: Are we planning on exposing merger locations in any of the Spark listener events ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


