Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139163109
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
     allocationManager.synchronized {
       stageIdToNumSpeculativeTasks(stageId) =
         stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+      maxConcurrentTasks = getMaxConTasks
+      logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
       allocationManager.onSchedulerBacklogged()
     }
   }

   /**
+   * Calculate the maximum no. of concurrent tasks that can run currently.
+   */
+  def getMaxConTasks(): Int = {
+    // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
+    // with multiple stages. We need to get all the active stages belonging to a job group to
+    // calculate the total no. of pending + running tasks to decide the maximum no. of executors
+    // we need at that time to serve the outstanding tasks. This is capped by the minimum of no.
+    // of outstanding tasks and the max concurrent limit specified for the job group if any.
+
+    def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+      var runningTasks = 0
+      if (stageIdToTaskIndices.contains(stageId)) {
+        runningTasks =
+          stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount.getOrElse(stageId, 0)
--- End diff ---
Yes. Nice catch. We do need to account for all the tasks for a stage, and this should include speculative ones as well.
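
For illustration, here is a minimal sketch of the counting this implies, written outside the real class. The maps `stageIdToNumSpeculativeTasks` and `stageIdToCompleteTaskCount` mirror the ones in the diff above; `stageIdToNumTasks`, `activeStages`, `maxConTasksForGroup`, and the enclosing object are hypothetical stand-ins, and the actual fix in the PR may differ:

```scala
import scala.collection.mutable

object MaxConTasksSketch {
  // Illustrative per-stage state. Only stageIdToNumSpeculativeTasks and
  // stageIdToCompleteTaskCount mirror maps in the diff above;
  // stageIdToNumTasks is a hypothetical stand-in for a stage's total
  // number of regular tasks.
  val stageIdToNumTasks = new mutable.HashMap[Int, Int]
  val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
  val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]

  // Incomplete work for a stage is all of its tasks, regular plus
  // speculative, minus the ones that have already completed.
  def getIncompleteTasksForStage(stageId: Int): Int = {
    val total = stageIdToNumTasks.getOrElse(stageId, 0) +
      stageIdToNumSpeculativeTasks.getOrElse(stageId, 0)
    total - stageIdToCompleteTaskCount.getOrElse(stageId, 0)
  }

  // Sum incomplete tasks over a job group's active stages and cap the
  // result by the group's configured concurrency limit, if one is set.
  def getMaxConTasks(activeStages: Seq[Int], maxConTasksForGroup: Option[Int]): Int = {
    val outstanding = activeStages.map(getIncompleteTasksForStage).sum
    maxConTasksForGroup.fold(outstanding)(limit => math.min(outstanding, limit))
  }
}
```

Counting speculative copies as outstanding work matters here because otherwise the allocation manager would size its executor target below what the scheduler is actually trying to run.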