Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/18950#discussion_r134581946
--- Diff:
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Calculate the maximum no. of concurrent tasks that can run
currently.
+ */
+ def getMaxConTasks(): Int = {
+ // We can limit the no. of concurrent tasks by a job group and
multiple jobs can run with
+ // multiple stages. We need to get all the active stages belonging
to a job group to calculate
+ // the total no. of pending + running tasks to decide the maximum
no. of executors we need at
+ // that time to serve the outstanding tasks. This is capped by the
minimum of no. of
+ // outstanding tasks and the max concurrent limit specified for the
job group if any.
+ val stagesByJobGroup = stageIdToNumTasks.groupBy(x =>
jobIdToJobGroup(stageIdToJobId(x._1)))
+
+ def getMaxConTasks(
+ maxConTasks: Int,
+ stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int,
Int])]): Int = {
+ if (stagesByJobGroupItr.hasNext) {
+ val (jobGroupId, stages) = stagesByJobGroupItr.next
+ // Get the total running and pending tasks for a job group.
+ val totalIncompleteTasksForJobGroup =
getIncompleteTasksForJobGroup(0, stages.iterator)
+ val maxTasks = Math.min(jobGroupToMaxConTasks(jobGroupId),
+ totalIncompleteTasksForJobGroup)
+ if (doesSumOverflow(maxConTasks, maxTasks)) {
+ Int.MaxValue
+ } else {
+ getMaxConTasks(maxConTasks + maxTasks, stagesByJobGroupItr)
+ }
+ } else {
+ maxConTasks
+ }
+ }
+
+ // Get the total running & pending tasks for all stages in a job
group.
+ def getIncompleteTasksForJobGroup(totalTasks: Int, stagesItr:
Iterator[(Int, Int)]): Int = {
+ if (stagesItr.hasNext) {
+ val (stageId, numTasks) = stagesItr.next
+ val activeTasks = getIncompleteTasksForStage(stageId, numTasks)
+ if (doesSumOverflow(totalTasks, activeTasks)) {
+ Int.MaxValue
+ } else {
+ getIncompleteTasksForJobGroup(totalTasks + activeTasks,
stagesItr)
+ }
+ } else {
+ totalTasks
+ }
+ }
+
+ // Get the total running & pending tasks for a single stage.
+ def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+ var pendingTasks = numTasks
+ if (stageIdToTaskIndices.contains(stageId)) {
+ pendingTasks -= stageIdToTaskIndices(stageId).size
+ }
+ var runningTasks = 0
+ if (stageIdToCompleteTaskCount.contains(stageId)) {
+ runningTasks = stageIdToTaskIndices(stageId).size -
stageIdToCompleteTaskCount(stageId)
+ }
+ pendingTasks + runningTasks
+ }
--- End diff --
nit: add a blank line after this definition, just to make the call to
getMaxConTasks easier to see
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]