Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139818031
--- Diff:
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
allocationManager.synchronized {
stageIdToNumSpeculativeTasks(stageId) =
stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+ maxConcurrentTasks = getMaxConTasks
+ logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on
spec. task submitted.")
allocationManager.onSchedulerBacklogged()
}
}
/**
+ * Calculate the maximum no. of concurrent tasks that can run
currently.
+ */
+ def getMaxConTasks(): Int = {
+ // We can limit the no. of concurrent tasks by a job group. A job
group can have multiple jobs
+ // with multiple stages. We need to get all the active stages
belonging to a job group to
+ // calculate the total no. of pending + running tasks to decide the
maximum no. of executors
+ // we need at that time to serve the outstanding tasks. This is
capped by the minimum no. of
+ // outstanding tasks and the max concurrent limit specified for the
job group if any.
+
+ def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+ totalPendingTasks(stageId) + totalRunningTasks(stageId)
+ }
+
+ def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int =
(totalTasks, stageToNumTasks) => {
+ val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1,
stageToNumTasks._2)
+ sumOrMax(totalTasks, activeTasks)
+ }
+ // Get the total running & pending tasks for all stages in a job
group.
+ def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int,
Int]): Int = {
+ stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+ }
+
+ def sumIncompleteTasksForJobGroup: (Int, (String,
mutable.HashMap[Int, Int])) => Int = {
+ (maxConTasks, x) => {
+ val totalIncompleteTasksForJobGroup =
getIncompleteTasksForJobGroup(x._2)
+ val maxTasks = Math.min(jobGroupToMaxConTasks(x._1),
totalIncompleteTasksForJobGroup)
+ sumOrMax(maxConTasks, maxTasks)
+ }
+ }
+
+ def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b))
Int.MaxValue else (a + b)
+
+ def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+ val stagesByJobGroup = stageIdToNumTasks.groupBy(x =>
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --
You could just store `stageIdToJobGroupId`. That simplifies this a bit, and
then you don't need to store `jobIdToJobGroup` at all, I think.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]