squito commented on a change in pull request #19194: [SPARK-20589] Allow 
limiting task concurrency per stage
URL: https://github.com/apache/spark/pull/19194#discussion_r248848309
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 ##########
 @@ -800,10 +874,47 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumSpeculativeTasks(stageId) =
           stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        maxConcurrentTasks = getMaxConTasks
+        logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
         allocationManager.onSchedulerBacklogged()
       }
     }
 
+    /**
+     * Calculate the maximum no. of concurrent tasks that can run currently.
+     */
+    def getMaxConTasks(): Int = {
+      // We can limit the no. of concurrent tasks by a job group. A job group 
can have multiple jobs
+      // with multiple stages. We need to get all the active stages belonging 
to a job group to
+      // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+      // we need at that time to serve the outstanding tasks. This is capped 
by the minimum no. of
+      // outstanding tasks and the max concurrent limit specified for the job 
group if any.
+
+      def sumIncompleteTasksForStages: (Int, Int) => Int = (totalTasks, 
stageId) => {
+        val activeTasks = totalPendingTasks(stageId) + 
totalRunningTasks(stageId)
+        sumOrMax(totalTasks, activeTasks)
+      }
+      // Get the total running & pending tasks for all stages in a job group.
+      def getIncompleteTasksForJobGroup(stagesItr: Set[Int]): Int = {
+        stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+      }
+
+      def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, 
String])) => Int = {
+        (maxConTasks, x) => {
+          val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2.keySet.toSet)
 
 Review comment:
   I think this would be a little easier to follow as
   
   ```scala
   def sumIncompleteTasksForJobGroup: (Int, (String, Set[Int])) => Int = {
     (sumSoFar, (jobGroup, stages)) =>
       val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(stages)
   ...
   val stagesByJobGroup = stageIdToJobGroupId.groupBy(_._2).mapValues(_.keySet)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to