tgravescs commented on a change in pull request #27583: [SPARK-29149][YARN]
Update YARN cluster manager For Stage Level Scheduling
URL: https://github.com/apache/spark/pull/27583#discussion_r385168310
##########
File path:
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -166,69 +195,188 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
- // A map to store preferred hostname and possible task numbers running on it.
- private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
- // Number of tasks that have locality preferences in active stages
- private[yarn] var numLocalityAwareTasks: Int = 0
-
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
- new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource,
resolver)
+ new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
+
+ // The default profile is always present so we need to initialize the
datastructures keyed by
+ // ResourceProfile id to ensure its present if things start running before a
request for
+ // executors could add it. This approach is easier then going and special
casing everywhere.
+ private def initDefaultProfile(): Unit = synchronized {
+ allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+ new HashMap[String, mutable.Set[ContainerId]]()
+ runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID,
mutable.HashSet[String]())
+ numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
new AtomicInteger(0)
+ targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+ SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+ rpIdToYarnResource(DEFAULT_RESOURCE_PROFILE_ID) = defaultResource
+ rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+ ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+ }
+
+ initDefaultProfile()
- def getNumExecutorsRunning: Int = runningExecutors.size()
+ def getNumExecutorsRunning: Int = synchronized {
+ runningExecutorsPerResourceProfileId.values.map(_.size).sum
+ }
+
+ def getNumLocalityAwareTasks: Int = synchronized {
+ numLocalityAwareTasksPerResourceProfileId.values.sum
+ }
- def getNumReleasedContainers: Int = releasedContainers.size()
+ def getNumExecutorsStarting: Int = {
Review comment:
I went through all the variables; they are all protected via a higher-up
call. We could add more synchronized blocks if we want to nest them
(synchronization is re-entrant), just to make it more readable.
For instance, this one is only called from allocateResources, which is
synchronized, and that is the case with most of these.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]