tgravescs commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r458133638
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,6 +872,23 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ /**
+ * Currently we only know when a task set has an unschedulable task, we
don't know
+ * the exact number and since the allocation manager isn't tied closely
with the scheduler,
+ * we use the number of tasks sets that are unschedulable as a heuristic
to add more executors.
+ */
+ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+ val attempts = resourceProfileIdToStageAttempt.getOrElse(rp,
Set.empty).toSeq
+ attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+ }
+
+ def hasPendingUnschedulableTasks: Boolean = {
Review comment:
This isn't used anymore, correct? If it is unused, please remove it.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1014,6 +1034,18 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId,
task.stageAttemptId))
}
+ private[scheduler] def handleUnschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId,
stageAttemptId))
+ }
+
+ private[scheduler] def handleUnschedulableTaskSetRemoved(
+ stageId: Int,
Review comment:
Indentation here should be 4 spaces.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]