tgravescs commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r455153176
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
}
}
+ override def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+ val stageId = unschedulableTaskSetAdded.stageId
+ val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+ allocationManager.synchronized {
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ unschedulableTaskSets.add(stageAttempt)
+ }
+ allocationManager.onSchedulerBacklogged()
+ }
+
+ override def onUnschedulableTaskSetRemoved(
+ unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
+ val stageId = unschedulableTaskSetRemoved.stageId
+ val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+ allocationManager.synchronized {
+ // Clear unschedulableTaskSets since at least one task becomes schedulable now
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
Review comment:
Similarly, move stageAttempt outside of the synchronized block.
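For illustration, a minimal sketch of the reordering being suggested; the body of this handler is cut off in the diff above, so the remove call is an assumption based on context, not the final PR code:

    override def onUnschedulableTaskSetRemoved(
        unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
      // Build the key outside the lock; only the shared set is touched while synchronized
      val stageAttempt = StageAttempt(
        unschedulableTaskSetRemoved.stageId, unschedulableTaskSetRemoved.stageAttemptId)
      allocationManager.synchronized {
        // Clear the entry since at least one task of this attempt is schedulable again
        unschedulableTaskSets.remove(stageAttempt)
      }
    }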
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager(
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
  tasksPerExecutor).toInt
- if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+
+ val maxNeededWithSpeculationLocalityOffset =
+ if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
// If we have pending speculative tasks and only need a single executor, allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}
+
+ if (unschedulableTaskSets > 0) {
+ // Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than
Review comment:
Can we simplify this comment to be something like:
Request additional executors to account for task sets that have tasks that are
unschedulable due to blacklisting when the active executor count has already
reached the max needed we would normally get.
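For reference, a sketch of how the suggested wording could read in context. The diff is truncated above, so maxNeededForUnschedulables, the executorMonitor.executorCountWithResourceProfile call, and rpId are assumed names here, not necessarily what the PR ends up with:

    if (unschedulableTaskSets > 0) {
      // Request additional executors to account for task sets that have tasks that are
      // unschedulable due to blacklisting when the active executor count has already
      // reached the max needed we would normally get.
      val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
        tasksPerExecutor).toInt
      math.max(maxNeededWithSpeculationLocalityOffset,
        executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
    } else {
      maxNeededWithSpeculationLocalityOffset
    }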
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
}
}
+ override def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+ val stageId = unschedulableTaskSetAdded.stageId
+ val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+ allocationManager.synchronized {
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
Review comment:
Setting stageAttempt can be done outside of synchronized.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface {
*/
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
+ /**
+ * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+ * is enabled
+ */
+ def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
+
+ /**
+ * Called when an unschedulable taskset becomes schedulable and dynamic allocation
+ * is enabled
Review comment:
add period
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +717,8 @@ private[spark] class ExecutorAllocationManager(
// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
- if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+ if (stageAttemptToNumTasks.isEmpty &&
+ stageAttemptToNumSpeculativeTasks.isEmpty) {
Review comment:
Revert this change as it's unneeded.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,26 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}
+ /**
+ * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+ * dynamic allocation is enabled
+ */
+ def unschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
+ }
+
+ /**
+ * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
+ * allocation is enabled
Review comment:
nit add period
##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface {
*/
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
+ /**
+ * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+ * is enabled
Review comment:
nit add period.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1014,6 +1034,20 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
}
+ private[scheduler] def handleUnschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(
+ SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
+ }
+
+ private[scheduler] def handleUnschedulableTaskSetRemoved(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(
+ SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
Review comment:
Same here, put on the previous line.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+ // task found due to blacklisting. This way we only need to keep track of the unschedulable
+ // tasksets which is an indirect way to get the current number of unschedulable tasks.
+ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+ val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+ attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+ }
+
+ def hasPendingUnschedulableTasks: Boolean = {
+ val attemptSets = resourceProfileIdToStageAttempt.values
+ attemptSets.exists { attempts =>
+ attempts.exists(unschedulableTaskSets.contains(_))
+ }
+ }
+
def hasPendingTasks: Boolean = {
- hasPendingSpeculativeTasks || hasPendingRegularTasks
+ hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks
Review comment:
this doesn't make sense to me. Did you have a specific bug without this?
The tasks themselves are counted as regular tasks.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +717,8 @@ private[spark] class ExecutorAllocationManager(
Review comment:
I think we should remove the stage attempt from unschedulableTaskSets when the stage
completes, in case it fails or something and the UnschedulableTaskSetRemoved event is
never sent.
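A sketch of the kind of cleanup being suggested, added to the listener's per-stage cleanup; the handler name and the StageInfo accessors are assumptions based on the surrounding diff context, and the exact placement is not from the PR:

    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
      val stageAttempt = StageAttempt(
        stageCompleted.stageInfo.stageId, stageCompleted.stageInfo.attemptNumber())
      allocationManager.synchronized {
        // Drop the attempt here as well, so a stage that is aborted and never sends
        // UnschedulableTaskSetRemoved does not keep the allocation manager backlogged
        unschedulableTaskSets.remove(stageAttempt)
        // ... existing cleanup of the other per-stage-attempt maps stays as-is ...
      }
    }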
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
}
}
+ override def onUnschedulableTaskSetAdded(
+ unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+ val stageId = unschedulableTaskSetAdded.stageId
+ val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+ allocationManager.synchronized {
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ unschedulableTaskSets.add(stageAttempt)
+ }
+ allocationManager.onSchedulerBacklogged()
Review comment:
this should be moved inside of synchronized
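Combining this with the earlier suggestion to hoist the StageAttempt construction, a minimal sketch of how the handler might look (a sketch only, not the final PR code):

    override def onUnschedulableTaskSetAdded(
        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
      val stageAttempt = StageAttempt(
        unschedulableTaskSetAdded.stageId, unschedulableTaskSetAdded.stageAttemptId)
      allocationManager.synchronized {
        unschedulableTaskSets.add(stageAttempt)
        // Signal the backlog while still holding the lock, as suggested above
        allocationManager.onSchedulerBacklogged()
      }
    }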
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,26 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}
+ /**
+ * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+ * dynamic allocation is enabled
Review comment:
nit, add period at end
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+ // task found due to blacklisting. This way we only need to keep track of the unschedulable
+ // tasksets which is an indirect way to get the current number of unschedulable tasks.
+ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+ val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+ attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+ }
+
+ def hasPendingUnschedulableTasks: Boolean = {
+ val attemptSets = resourceProfileIdToStageAttempt.values
+ attemptSets.exists { attempts =>
+ attempts.exists(unschedulableTaskSets.contains(_))
+ }
+ }
+
def hasPendingTasks: Boolean = {
- hasPendingSpeculativeTasks || hasPendingRegularTasks
+ hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks
}
def totalPendingTasksPerResourceProfile(rp: Int): Int = {
- pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp)
+ pendingTasksPerResourceProfile(rp) +
+ pendingSpeculativeTasksPerResourceProfile(rp)
Review comment:
revert
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+ // task found due to blacklisting. This way we only need to keep track of the unschedulable
+ // tasksets which is an indirect way to get the current number of unschedulable tasks.
Review comment:
So we should clarify this further. I realize it's an indirect way, but it's not really
the number of unschedulable tasks. Perhaps we can change this to say something a bit
more generic like:
Currently we only know when a task set has an unschedulable task; we don't know the
exact number, and since the allocation manager isn't tied closely with the scheduler,
we use the number of task sets that are unschedulable as a heuristic to add more
executors.
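For reference, how the more generic wording might sit over the method from the diff above (a sketch; using count instead of filter(...).size is only an idiomatic touch, not something the reviewer asked for):

    // Currently we only know when a task set has an unschedulable task; we don't know the
    // exact number, and since the allocation manager isn't tied closely with the scheduler,
    // we use the number of task sets that are unschedulable as a heuristic to add more
    // executors.
    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
      attempts.count(attempt => unschedulableTaskSets.contains(attempt))
    }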
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1014,6 +1034,20 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
}
+ private[scheduler] def handleUnschedulableTaskSetAdded(
+ stageId: Int,
+ stageAttemptId: Int): Unit = {
+ listenerBus.post(
+ SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
Review comment:
I think this will fit on the previous line.
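For reference, the one-line form being suggested, assuming the usual 100-character line limit leaves room:

    listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))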