tgravescs commented on a change in pull request #27583: [SPARK-29149][YARN]  
Update YARN cluster manager For Stage Level Scheduling
URL: https://github.com/apache/spark/pull/27583#discussion_r385196205
 
 

 ##########
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ##########
 @@ -166,69 +195,188 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-  // A map to store preferred hostname and possible task numbers running on it.
-  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
-  // Number of tasks that have locality preferences in active stages
-  private[yarn] var numLocalityAwareTasks: Int = 0
-
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
-    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, 
resolver)
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
+
+  // The default profile is always present so we need to initialize the 
datastructures keyed by
+  // ResourceProfile id to ensure its present if things start running before a 
request for
+  // executors could add it. This approach is easier then going and special 
casing everywhere.
+  private def initDefaultProfile(): Unit = synchronized {
+    allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+      new HashMap[String, mutable.Set[ContainerId]]()
+    runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, 
mutable.HashSet[String]())
+    numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = 
new AtomicInteger(0)
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+    rpIdToYarnResource(DEFAULT_RESOURCE_PROFILE_ID) = defaultResource
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  }
+
+  initDefaultProfile()
 
-  def getNumExecutorsRunning: Int = runningExecutors.size()
+  def getNumExecutorsRunning: Int = synchronized {
+    runningExecutorsPerResourceProfileId.values.map(_.size).sum
+  }
+
+  def getNumLocalityAwareTasks: Int = synchronized {
+    numLocalityAwareTasksPerResourceProfileId.values.sum
+  }
 
-  def getNumReleasedContainers: Int = releasedContainers.size()
+  def getNumExecutorsStarting: Int = {
 
 Review comment:
   I went ahead and added more synchronized calls in each function where those 
variables are touched. I believe re-entrant synchronized is cheap, so it 
shouldn't add much overhead and should help with readability and guard against 
future breakages. If this is not what you intended, let me know.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to