mridulm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r516199843
##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
##########
@@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend(
    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+  override def getMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = {
+    // Currently this is naive way of calculating numMergersNeeded for a stage. In future,
+    // we can use better heuristics to calculate numMergersNeeded for a stage.
+    val tasksPerExecutor = sc.resourceProfileManager
+      .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
+    val numMergersNeeded = math.min(
+      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors)
Review comment:
Without DRA, shouldn't `maxNumExecutors` be the max executors statically configured for the Spark application?
Otherwise we are, in effect, setting `numMergersNeeded` to `numPartitions / tasksPerExecutor` always.
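To make that concrete, here is a minimal standalone sketch of the formula under discussion (not the PR's code; the object and method names are mine, and I use floating-point division so that `ceil` actually has an effect):

```scala
// Illustrative only: a self-contained reimplementation of the merger-count formula.
object MergerCountSketch {
  def numMergersNeeded(numPartitions: Int, tasksPerExecutor: Int, maxNumExecutors: Int): Int =
    math.min(
      math.max(1, math.ceil(numPartitions.toDouble / tasksPerExecutor).toInt),
      maxNumExecutors)

  def main(args: Array[String]): Unit = {
    // If maxNumExecutors is effectively unbounded (e.g. DRA off with a huge default),
    // the min() clamp never applies: 1000 partitions / 4 tasks per executor => 250 mergers.
    println(numMergersNeeded(1000, 4, Int.MaxValue)) // 250
    // Clamping to the statically configured executor count bounds the merger count.
    println(numMergersNeeded(1000, 4, 50))           // 50
  }
}
```

With `maxNumExecutors` tied to the statically configured executor count, the clamp is what keeps the merger count bounded.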
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
      execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }
Review comment:
No, I meant as part of the relevant `SparkListenerEvent`, so that UI and/or analysis tools can see where the mergers are, how many merged blocks are hosted by each shuffle service, etc.
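Purely as an illustration of that suggestion, here is a rough sketch of what such an event could look like; the class name and fields below are hypothetical and not part of this PR:

```scala
// Hypothetical sketch, not an API in this PR: a custom event carrying the merger
// locations picked for a shuffle map stage, so listeners/UI could record them.
import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.storage.BlockManagerId

case class SparkListenerStageMergerLocations(
    stageId: Int,
    stageAttemptId: Int,
    mergerLocs: Seq[BlockManagerId])
  extends SparkListenerEvent
```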
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -657,6 +679,14 @@ class BlockManagerMasterEndpoint(
}
}
+  private def getMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    // Copying the merger locations to a list so that the original mergerLocations won't be shuffled
+    val mergers = mergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq
+    Utils.randomize(mergers).take(numMergersNeeded)
Review comment:
Are you preferring hosts with active executors to increase the chance that one or more active executors are already on those nodes when reducers start, @tgravescs? Prioritizing active executors does increase the chances of that ... did we do experiments with that sort of prioritization, Venkat?
I agree with the other points @venkata91 raised - just that, with what Tom mentioned, there is a higher probability of existing executors already being there, so we would not necessarily need to wait for allocation to kick in.
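For clarity, a minimal sketch of that prioritization idea, assuming we have the set of hosts with active executors available (the object, method, and inputs below are illustrative, not the PR's implementation):

```scala
// Illustrative sketch only: bias merger selection towards hosts that already have
// active executors, then top up randomly from the remaining hosts.
import scala.util.Random
import org.apache.spark.storage.BlockManagerId

object MergerSelectionSketch {
  def chooseMergers(
      candidateMergers: Seq[BlockManagerId],  // assumed: all known shuffle-service locations
      activeExecutorHosts: Set[String],       // assumed: hosts with currently active executors
      numMergersNeeded: Int): Seq[BlockManagerId] = {
    val (withExecutors, withoutExecutors) =
      candidateMergers.partition(m => activeExecutorHosts.contains(m.host))
    // Hosts with active executors first (randomized among themselves), then the rest.
    (Random.shuffle(withExecutors) ++ Random.shuffle(withoutExecutors)).take(numMergersNeeded)
  }
}
```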