Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139162871
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
allocationManager.synchronized {
stageIdToNumSpeculativeTasks(stageId) =
stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+ maxConcurrentTasks = getMaxConTasks
+ logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
allocationManager.onSchedulerBacklogged()
}
}
/**
+ * Calculate the maximum no. of concurrent tasks that can run currently.
+ */
+ def getMaxConTasks(): Int = {
+ // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
+ // with multiple stages. We need to get all the active stages belonging to a job group to
+ // calculate the total no. of pending + running tasks to decide the maximum no. of executors
+ // we need at that time to serve the outstanding tasks. This is capped by the minimum of no.
--- End diff --
okay.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]