Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134581946
  
    --- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run 
currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      // We can limit the no. of concurrent tasks by a job group and 
multiple jobs can run with
    +      // multiple stages. We need to get all the active stages belonging 
to a job group to calculate
    +      // the total no. of pending + running tasks to decide the maximum 
no. of executors we need at
    +      // that time to serve the outstanding tasks. This is capped by the 
minimum of no. of
    +      // outstanding tasks and the max concurrent limit specified for the 
job group if any.
    +      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
    +
    +      def getMaxConTasks(
    +          maxConTasks: Int,
    +          stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, 
Int])]): Int = {
    +        if (stagesByJobGroupItr.hasNext) {
    +          val (jobGroupId, stages) = stagesByJobGroupItr.next
    +          // Get the total running and pending tasks for a job group.
    +          val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(0, stages.iterator)
    +          val maxTasks = Math.min(jobGroupToMaxConTasks(jobGroupId),
    +            totalIncompleteTasksForJobGroup)
    +          if (doesSumOverflow(maxConTasks, maxTasks)) {
    +            Int.MaxValue
    +          } else {
    +            getMaxConTasks(maxConTasks + maxTasks, stagesByJobGroupItr)
    +          }
    +        } else {
    +          maxConTasks
    +        }
    +      }
    +
    +      // Get the total running & pending tasks for all stages in a job 
group.
    +      def getIncompleteTasksForJobGroup(totalTasks: Int, stagesItr: 
Iterator[(Int, Int)]): Int = {
    +        if (stagesItr.hasNext) {
    +          val (stageId, numTasks) = stagesItr.next
    +          val activeTasks = getIncompleteTasksForStage(stageId, numTasks)
    +          if (doesSumOverflow(totalTasks, activeTasks)) {
    +            Int.MaxValue
    +          } else {
    +            getIncompleteTasksForJobGroup(totalTasks + activeTasks, 
stagesItr)
    +          }
    +        } else {
    +          totalTasks
    +        }
    +      }
    +
    +      // Get the total running & pending tasks for a single stage.
    +      def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
    +        var pendingTasks = numTasks
    +        if (stageIdToTaskIndices.contains(stageId)) {
    +          pendingTasks -= stageIdToTaskIndices(stageId).size
    +        }
    +        var runningTasks = 0
    +        if (stageIdToCompleteTaskCount.contains(stageId)) {
    +          runningTasks = stageIdToTaskIndices(stageId).size - 
stageIdToCompleteTaskCount(stageId)
    +        }
    +        pendingTasks + runningTasks
    +      }
    --- End diff --
    
    nit: add a newline after this, just to make the call to getMaxConTasks 
easier to see


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to