squito commented on a change in pull request #19194: [SPARK-20589] Allow
limiting task concurrency per stage
URL: https://github.com/apache/spark/pull/19194#discussion_r248848309
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -800,10 +874,47 @@ private[spark] class ExecutorAllocationManager(
allocationManager.synchronized {
stageIdToNumSpeculativeTasks(stageId) =
stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+ maxConcurrentTasks = getMaxConTasks
+ logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on
spec. task submitted.")
allocationManager.onSchedulerBacklogged()
}
}
+ /**
+ * Calculate the maximum no. of concurrent tasks that can run currently.
+ */
+ def getMaxConTasks(): Int = {
+ // We can limit the no. of concurrent tasks by a job group. A job group
can have multiple jobs
+ // with multiple stages. We need to get all the active stages belonging
to a job group to
+ // calculate the total no. of pending + running tasks to decide the
maximum no. of executors
+ // we need at that time to serve the outstanding tasks. This is capped
by the minimum no. of
+ // outstanding tasks and the max concurrent limit specified for the job
group if any.
+
+ def sumIncompleteTasksForStages: (Int, Int) => Int = (totalTasks,
stageId) => {
+ val activeTasks = totalPendingTasks(stageId) +
totalRunningTasks(stageId)
+ sumOrMax(totalTasks, activeTasks)
+ }
+ // Get the total running & pending tasks for all stages in a job group.
+ def getIncompleteTasksForJobGroup(stagesItr: Set[Int]): Int = {
+ stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+ }
+
+ def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int,
String])) => Int = {
+ (maxConTasks, x) => {
+ val totalIncompleteTasksForJobGroup =
getIncompleteTasksForJobGroup(x._2.keySet.toSet)
Review comment:
I think this would be a little easier to follow as
```scala
def sumIncompleteTasksForJobGroup: (Int, (String, Set[Int])) => Int = {
case (sumSoFar, (jobGroup, stages)) =>
val totalIncompleteTasksForJobGroup =
getIncompleteTasksForJobGroup(stages)
...
val stagesByJobGroup = stageIdToJobGroupId.groupBy(_._2).mapValues(_.keySet)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]